summaryrefslogtreecommitdiff
path: root/src/journal-remote
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal-remote')
-rw-r--r--src/journal-remote/journal-remote-parse.c133
-rw-r--r--src/journal-remote/journal-remote-parse.h9
-rw-r--r--src/journal-remote/journal-remote-write.c11
-rw-r--r--src/journal-remote/journal-remote-write.h1
4 files changed, 85 insertions, 69 deletions
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
index cdc920eb4c..dfb87d49af 100644
--- a/src/journal-remote/journal-remote-parse.c
+++ b/src/journal-remote/journal-remote-parse.c
@@ -70,24 +70,38 @@ RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
return source;
}
+static char* realloc_buffer(RemoteSource *source, size_t size) {
+ char *b, *old = source->buf;
+
+ b = GREEDY_REALLOC(source->buf, source->size, size);
+ if (!b)
+ return NULL;
+
+ iovw_rebase(&source->iovw, old, source->buf);
+
+ return b;
+}
+
static int get_line(RemoteSource *source, char **line, size_t *size) {
- ssize_t n, remain;
+ ssize_t n;
char *c = NULL;
- char *newbuf = NULL;
- size_t newsize = 0;
assert(source);
assert(source->state == STATE_LINE);
+ assert(source->offset <= source->filled);
assert(source->filled <= source->size);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
while (true) {
- if (source->buf)
- c = memchr(source->buf + source->scanned, '\n',
- source->filled - source->scanned);
- if (c != NULL)
- break;
+ if (source->buf) {
+ size_t start = MAX(source->scanned, source->offset);
+
+ c = memchr(source->buf + start, '\n',
+ source->filled - start);
+ if (c != NULL)
+ break;
+ }
source->scanned = source->filled;
if (source->scanned >= DATA_SIZE_MAX) {
@@ -100,15 +114,12 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
return -EWOULDBLOCK;
if (source->size - source->filled < LINE_CHUNK &&
- !GREEDY_REALLOC(source->buf, source->size,
- MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
+ !realloc_buffer(source,
+ MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
return log_oom();
assert(source->size - source->filled >= LINE_CHUNK ||
- source->size == DATA_SIZE_MAX);
-
- // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
- // to accomodate such big fields.
+ source->size == ENTRY_SIZE_MAX);
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
@@ -123,23 +134,9 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
source->filled += n;
}
- *line = source->buf;
- *size = c + 1 - source->buf;
-
- /* Check if something remains */
- remain = source->buf + source->filled - c - 1;
- assert(remain >= 0);
- if (remain) {
- newsize = MAX(remain, LINE_CHUNK);
- newbuf = malloc(newsize);
- if (!newbuf)
- return log_oom();
- memcpy(newbuf, c + 1, remain);
- }
- source->buf = newbuf;
- source->size = newsize;
- source->filled = remain;
- source->scanned = 0;
+ *line = source->buf + source->offset;
+ *size = c + 1 - source->buf - source->offset;
+ source->offset += *size;
return 1;
}
@@ -148,8 +145,7 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
assert(source);
assert(source->state != STATE_EOF);
- if (!GREEDY_REALLOC(source->buf, source->size,
- source->filled + size)) {
+ if (!realloc_buffer(source, source->filled + size)) {
log_error("Failed to store received data of size %zu "
"(in addition to existing %zu bytes with %zu filled): %s",
size, source->size, source->filled, strerror(ENOMEM));
@@ -163,28 +159,27 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
}
static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
- int n;
- char *newbuf = NULL;
- size_t newsize = 0, remain;
assert(source);
assert(source->state == STATE_DATA_START ||
source->state == STATE_DATA ||
source->state == STATE_DATA_FINISH);
assert(size <= DATA_SIZE_MAX);
+ assert(source->offset <= source->filled);
assert(source->filled <= source->size);
- assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
assert(data);
- while(source->filled < size) {
+ while (source->filled - source->offset < size) {
+ int n;
+
if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
- if (!GREEDY_REALLOC(source->buf, source->size, size))
+ if (!realloc_buffer(source, source->offset + size))
return log_oom();
n = read(source->fd, source->buf + source->filled,
@@ -200,29 +195,15 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
source->filled += n;
}
- *data = source->buf;
-
- /* Check if something remains */
- assert(size <= source->filled);
- remain = source->filled - size;
- if (remain) {
- newsize = MAX(remain, LINE_CHUNK);
- newbuf = malloc(newsize);
- if (!newbuf)
- return log_oom();
- memcpy(newbuf, source->buf + size, remain);
- }
- source->buf = newbuf;
- source->size = newsize;
- source->filled = remain;
- source->scanned = 0;
+ *data = source->buf + source->offset;
+ source->offset += size;
return 1;
}
static int get_data_size(RemoteSource *source) {
int r;
- _cleanup_free_ void *data = NULL;
+ void *data;
assert(source);
assert(source->state == STATE_DATA_START);
@@ -260,7 +241,7 @@ static int get_data_data(RemoteSource *source, void **data) {
static int get_data_newline(RemoteSource *source) {
int r;
- _cleanup_free_ char *data = NULL;
+ char *data;
assert(source);
assert(source->state == STATE_DATA_FINISH);
@@ -350,15 +331,12 @@ int process_data(RemoteSource *source) {
if (n == 1) {
log_debug("Received empty line, event is ready");
- free(line);
return 1;
}
r = process_dunder(source, line, n);
- if (r != 0) {
- free(line);
+ if (r != 0)
return r < 0 ? r : 0;
- }
/* MESSAGE=xxx\n
or
@@ -377,7 +355,6 @@ int process_data(RemoteSource *source) {
r = iovw_put(&source->iovw, line, n);
if (r < 0) {
log_error("Failed to put line in iovect");
- free(line);
return r;
}
@@ -450,6 +427,7 @@ int process_data(RemoteSource *source) {
}
int process_source(RemoteSource *source, bool compress, bool seal) {
+ size_t remain, target;
int r;
assert(source);
@@ -480,5 +458,36 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
freeing:
iovw_free_contents(&source->iovw);
+
+ /* possibly reset buffer position */
+ remain = source->filled - source->offset;
+
+ if (remain == 0) /* no brainer */
+ source->offset = source->scanned = source->filled = 0;
+ else if (source->offset > source->size - source->filled &&
+ source->offset > remain) {
+ memcpy(source->buf, source->buf + source->offset, remain);
+ source->offset = source->scanned = 0;
+ source->filled = remain;
+ }
+
+ target = source->size;
+ while (target > 16 * LINE_CHUNK && remain < target / 2)
+ target /= 2;
+ if (target < source->size) {
+ char *tmp;
+
+ tmp = realloc(source->buf, target);
+ if (tmp)
+ log_warning("Failed to reallocate buffer to (smaller) size %zu",
+ target);
+ else {
+ log_debug("Reallocated buffer from %zu to %zu bytes",
+ source->size, target);
+ source->buf = tmp;
+ source->size = target;
+ }
+ }
+
return r;
}
diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h
index 07d6ddb67f..8499f4eb82 100644
--- a/src/journal-remote/journal-remote-parse.h
+++ b/src/journal-remote/journal-remote-parse.h
@@ -38,10 +38,11 @@ typedef struct RemoteSource {
bool passive_fd;
char *buf;
- size_t size;
- size_t scanned;
- size_t filled;
- size_t data_size;
+ size_t size; /* total size of the buffer */
+ size_t offset; /* offset to the beginning of live data in the buffer */
+ size_t scanned; /* number of bytes since the beginning of data without a newline */
+ size_t filled; /* total number of bytes in the buffer */
+ size_t data_size; /* size of the binary data chunk being processed */
struct iovec_wrapper iovw;
diff --git a/src/journal-remote/journal-remote-write.c b/src/journal-remote/journal-remote-write.c
index cdd06f9eff..bec4cb1f7b 100644
--- a/src/journal-remote/journal-remote-write.c
+++ b/src/journal-remote/journal-remote-write.c
@@ -31,8 +31,6 @@ int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) {
}
void iovw_free_contents(struct iovec_wrapper *iovw) {
- for (size_t j = 0; j < iovw->count; j++)
- free(iovw->iovec[j].iov_base);
free(iovw->iovec);
iovw->iovec = NULL;
iovw->size_bytes = iovw->count = 0;
@@ -41,12 +39,19 @@ void iovw_free_contents(struct iovec_wrapper *iovw) {
size_t iovw_size(struct iovec_wrapper *iovw) {
size_t n = 0, i;
- for(i = 0; i < iovw->count; i++)
+ for (i = 0; i < iovw->count; i++)
n += iovw->iovec[i].iov_len;
return n;
}
+void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new) {
+ size_t i;
+
+ for (i = 0; i < iovw->count; i++)
+ iovw->iovec[i].iov_base = (char*) iovw->iovec[i].iov_base - old + new;
+}
+
/**********************************************************************
**********************************************************************
**********************************************************************/
diff --git a/src/journal-remote/journal-remote-write.h b/src/journal-remote/journal-remote-write.h
index 9c5a641d2e..aa381c661e 100644
--- a/src/journal-remote/journal-remote-write.h
+++ b/src/journal-remote/journal-remote-write.h
@@ -36,6 +36,7 @@ struct iovec_wrapper {
int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len);
void iovw_free_contents(struct iovec_wrapper *iovw);
size_t iovw_size(struct iovec_wrapper *iovw);
+void iovw_rebase(struct iovec_wrapper *iovw, char *old, char *new);
typedef struct Writer {
JournalFile *journal;