summaryrefslogtreecommitdiff
path: root/src/journal-remote/journal-remote-parse.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal-remote/journal-remote-parse.c')
-rw-r--r--src/journal-remote/journal-remote-parse.c133
1 files changed, 71 insertions, 62 deletions
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
index cdc920eb4c..dfb87d49af 100644
--- a/src/journal-remote/journal-remote-parse.c
+++ b/src/journal-remote/journal-remote-parse.c
@@ -70,24 +70,38 @@ RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
return source;
}
+static char* realloc_buffer(RemoteSource *source, size_t size) {
+ char *b, *old = source->buf;
+
+ b = GREEDY_REALLOC(source->buf, source->size, size);
+ if (!b)
+ return NULL;
+
+ iovw_rebase(&source->iovw, old, source->buf);
+
+ return b;
+}
+
static int get_line(RemoteSource *source, char **line, size_t *size) {
- ssize_t n, remain;
+ ssize_t n;
char *c = NULL;
- char *newbuf = NULL;
- size_t newsize = 0;
assert(source);
assert(source->state == STATE_LINE);
+ assert(source->offset <= source->filled);
assert(source->filled <= source->size);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
while (true) {
- if (source->buf)
- c = memchr(source->buf + source->scanned, '\n',
- source->filled - source->scanned);
- if (c != NULL)
- break;
+ if (source->buf) {
+ size_t start = MAX(source->scanned, source->offset);
+
+ c = memchr(source->buf + start, '\n',
+ source->filled - start);
+ if (c != NULL)
+ break;
+ }
source->scanned = source->filled;
if (source->scanned >= DATA_SIZE_MAX) {
@@ -100,15 +114,12 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
return -EWOULDBLOCK;
if (source->size - source->filled < LINE_CHUNK &&
- !GREEDY_REALLOC(source->buf, source->size,
- MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
+ !realloc_buffer(source,
+ MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
return log_oom();
assert(source->size - source->filled >= LINE_CHUNK ||
- source->size == DATA_SIZE_MAX);
-
- // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
- // to accomodate such big fields.
+ source->size == ENTRY_SIZE_MAX);
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
@@ -123,23 +134,9 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
source->filled += n;
}
- *line = source->buf;
- *size = c + 1 - source->buf;
-
- /* Check if something remains */
- remain = source->buf + source->filled - c - 1;
- assert(remain >= 0);
- if (remain) {
- newsize = MAX(remain, LINE_CHUNK);
- newbuf = malloc(newsize);
- if (!newbuf)
- return log_oom();
- memcpy(newbuf, c + 1, remain);
- }
- source->buf = newbuf;
- source->size = newsize;
- source->filled = remain;
- source->scanned = 0;
+ *line = source->buf + source->offset;
+ *size = c + 1 - source->buf - source->offset;
+ source->offset += *size;
return 1;
}
@@ -148,8 +145,7 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
assert(source);
assert(source->state != STATE_EOF);
- if (!GREEDY_REALLOC(source->buf, source->size,
- source->filled + size)) {
+ if (!realloc_buffer(source, source->filled + size)) {
log_error("Failed to store received data of size %zu "
"(in addition to existing %zu bytes with %zu filled): %s",
size, source->size, source->filled, strerror(ENOMEM));
@@ -163,28 +159,27 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
}
static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
- int n;
- char *newbuf = NULL;
- size_t newsize = 0, remain;
assert(source);
assert(source->state == STATE_DATA_START ||
source->state == STATE_DATA ||
source->state == STATE_DATA_FINISH);
assert(size <= DATA_SIZE_MAX);
+ assert(source->offset <= source->filled);
assert(source->filled <= source->size);
- assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
assert(data);
- while(source->filled < size) {
+ while (source->filled - source->offset < size) {
+ int n;
+
if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
- if (!GREEDY_REALLOC(source->buf, source->size, size))
+ if (!realloc_buffer(source, source->offset + size))
return log_oom();
n = read(source->fd, source->buf + source->filled,
@@ -200,29 +195,15 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
source->filled += n;
}
- *data = source->buf;
-
- /* Check if something remains */
- assert(size <= source->filled);
- remain = source->filled - size;
- if (remain) {
- newsize = MAX(remain, LINE_CHUNK);
- newbuf = malloc(newsize);
- if (!newbuf)
- return log_oom();
- memcpy(newbuf, source->buf + size, remain);
- }
- source->buf = newbuf;
- source->size = newsize;
- source->filled = remain;
- source->scanned = 0;
+ *data = source->buf + source->offset;
+ source->offset += size;
return 1;
}
static int get_data_size(RemoteSource *source) {
int r;
- _cleanup_free_ void *data = NULL;
+ void *data;
assert(source);
assert(source->state == STATE_DATA_START);
@@ -260,7 +241,7 @@ static int get_data_data(RemoteSource *source, void **data) {
static int get_data_newline(RemoteSource *source) {
int r;
- _cleanup_free_ char *data = NULL;
+ char *data;
assert(source);
assert(source->state == STATE_DATA_FINISH);
@@ -350,15 +331,12 @@ int process_data(RemoteSource *source) {
if (n == 1) {
log_debug("Received empty line, event is ready");
- free(line);
return 1;
}
r = process_dunder(source, line, n);
- if (r != 0) {
- free(line);
+ if (r != 0)
return r < 0 ? r : 0;
- }
/* MESSAGE=xxx\n
or
@@ -377,7 +355,6 @@ int process_data(RemoteSource *source) {
r = iovw_put(&source->iovw, line, n);
if (r < 0) {
log_error("Failed to put line in iovect");
- free(line);
return r;
}
@@ -450,6 +427,7 @@ int process_data(RemoteSource *source) {
}
int process_source(RemoteSource *source, bool compress, bool seal) {
+ size_t remain, target;
int r;
assert(source);
@@ -480,5 +458,36 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
freeing:
iovw_free_contents(&source->iovw);
+
+ /* possibly reset buffer position */
+ remain = source->filled - source->offset;
+
+ if (remain == 0) /* no brainer */
+ source->offset = source->scanned = source->filled = 0;
+ else if (source->offset > source->size - source->filled &&
+ source->offset > remain) {
+ memcpy(source->buf, source->buf + source->offset, remain);
+ source->offset = source->scanned = 0;
+ source->filled = remain;
+ }
+
+ target = source->size;
+ while (target > 16 * LINE_CHUNK && remain < target / 2)
+ target /= 2;
+ if (target < source->size) {
+ char *tmp;
+
+ tmp = realloc(source->buf, target);
+ if (tmp)
+ log_warning("Failed to reallocate buffer to (smaller) size %zu",
+ target);
+ else {
+ log_debug("Reallocated buffer from %zu to %zu bytes",
+ source->size, target);
+ source->buf = tmp;
+ source->size = target;
+ }
+ }
+
return r;
}