summaryrefslogtreecommitdiff
path: root/src/journal-remote/journal-remote-parse.c
diff options
context:
space:
mode:
authorZbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>2014-07-11 23:20:08 -0400
committerZbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl>2014-07-15 22:34:42 -0400
commit92b10cbccdeef3896f45dc340eb7779c78577ede (patch)
tree2d608289582584f30aa27f0b3079457fb2ce326e /src/journal-remote/journal-remote-parse.c
parent874bc134ac6504c45e94174e37af13ff21a6bfe2 (diff)
journal-remote: avoid copying input data
Instead of copying fields into new memory allocations, simply keep pointers into the receive buffer. Data in this buffer is only copied when there is not enough space for new data and a large chunk of the buffer contains old data.
Diffstat (limited to 'src/journal-remote/journal-remote-parse.c')
-rw-r--r--src/journal-remote/journal-remote-parse.c133
1 files changed, 71 insertions, 62 deletions
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
index cdc920eb4c..dfb87d49af 100644
--- a/src/journal-remote/journal-remote-parse.c
+++ b/src/journal-remote/journal-remote-parse.c
@@ -70,24 +70,38 @@ RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
return source;
}
+static char* realloc_buffer(RemoteSource *source, size_t size) {
+ char *b, *old = source->buf;
+
+ b = GREEDY_REALLOC(source->buf, source->size, size);
+ if (!b)
+ return NULL;
+
+ iovw_rebase(&source->iovw, old, source->buf);
+
+ return b;
+}
+
static int get_line(RemoteSource *source, char **line, size_t *size) {
- ssize_t n, remain;
+ ssize_t n;
char *c = NULL;
- char *newbuf = NULL;
- size_t newsize = 0;
assert(source);
assert(source->state == STATE_LINE);
+ assert(source->offset <= source->filled);
assert(source->filled <= source->size);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
while (true) {
- if (source->buf)
- c = memchr(source->buf + source->scanned, '\n',
- source->filled - source->scanned);
- if (c != NULL)
- break;
+ if (source->buf) {
+ size_t start = MAX(source->scanned, source->offset);
+
+ c = memchr(source->buf + start, '\n',
+ source->filled - start);
+ if (c != NULL)
+ break;
+ }
source->scanned = source->filled;
if (source->scanned >= DATA_SIZE_MAX) {
@@ -100,15 +114,12 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
return -EWOULDBLOCK;
if (source->size - source->filled < LINE_CHUNK &&
- !GREEDY_REALLOC(source->buf, source->size,
- MIN(source->filled + LINE_CHUNK, DATA_SIZE_MAX)))
+ !realloc_buffer(source,
+ MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
return log_oom();
assert(source->size - source->filled >= LINE_CHUNK ||
- source->size == DATA_SIZE_MAX);
-
- // FIXME: the buffer probably needs to be bigger than DATA_SIZE_MAX
- // to accomodate such big fields.
+ source->size == ENTRY_SIZE_MAX);
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
@@ -123,23 +134,9 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
source->filled += n;
}
- *line = source->buf;
- *size = c + 1 - source->buf;
-
- /* Check if something remains */
- remain = source->buf + source->filled - c - 1;
- assert(remain >= 0);
- if (remain) {
- newsize = MAX(remain, LINE_CHUNK);
- newbuf = malloc(newsize);
- if (!newbuf)
- return log_oom();
- memcpy(newbuf, c + 1, remain);
- }
- source->buf = newbuf;
- source->size = newsize;
- source->filled = remain;
- source->scanned = 0;
+ *line = source->buf + source->offset;
+ *size = c + 1 - source->buf - source->offset;
+ source->offset += *size;
return 1;
}
@@ -148,8 +145,7 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
assert(source);
assert(source->state != STATE_EOF);
- if (!GREEDY_REALLOC(source->buf, source->size,
- source->filled + size)) {
+ if (!realloc_buffer(source, source->filled + size)) {
log_error("Failed to store received data of size %zu "
"(in addition to existing %zu bytes with %zu filled): %s",
size, source->size, source->filled, strerror(ENOMEM));
@@ -163,28 +159,27 @@ int push_data(RemoteSource *source, const char *data, size_t size) {
}
static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
- int n;
- char *newbuf = NULL;
- size_t newsize = 0, remain;
assert(source);
assert(source->state == STATE_DATA_START ||
source->state == STATE_DATA ||
source->state == STATE_DATA_FINISH);
assert(size <= DATA_SIZE_MAX);
+ assert(source->offset <= source->filled);
assert(source->filled <= source->size);
- assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0);
assert(source->fd >= 0);
assert(data);
- while(source->filled < size) {
+ while (source->filled - source->offset < size) {
+ int n;
+
if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
- if (!GREEDY_REALLOC(source->buf, source->size, size))
+ if (!realloc_buffer(source, source->offset + size))
return log_oom();
n = read(source->fd, source->buf + source->filled,
@@ -200,29 +195,15 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
source->filled += n;
}
- *data = source->buf;
-
- /* Check if something remains */
- assert(size <= source->filled);
- remain = source->filled - size;
- if (remain) {
- newsize = MAX(remain, LINE_CHUNK);
- newbuf = malloc(newsize);
- if (!newbuf)
- return log_oom();
- memcpy(newbuf, source->buf + size, remain);
- }
- source->buf = newbuf;
- source->size = newsize;
- source->filled = remain;
- source->scanned = 0;
+ *data = source->buf + source->offset;
+ source->offset += size;
return 1;
}
static int get_data_size(RemoteSource *source) {
int r;
- _cleanup_free_ void *data = NULL;
+ void *data;
assert(source);
assert(source->state == STATE_DATA_START);
@@ -260,7 +241,7 @@ static int get_data_data(RemoteSource *source, void **data) {
static int get_data_newline(RemoteSource *source) {
int r;
- _cleanup_free_ char *data = NULL;
+ char *data;
assert(source);
assert(source->state == STATE_DATA_FINISH);
@@ -350,15 +331,12 @@ int process_data(RemoteSource *source) {
if (n == 1) {
log_debug("Received empty line, event is ready");
- free(line);
return 1;
}
r = process_dunder(source, line, n);
- if (r != 0) {
- free(line);
+ if (r != 0)
return r < 0 ? r : 0;
- }
/* MESSAGE=xxx\n
or
@@ -377,7 +355,6 @@ int process_data(RemoteSource *source) {
r = iovw_put(&source->iovw, line, n);
if (r < 0) {
log_error("Failed to put line in iovect");
- free(line);
return r;
}
@@ -450,6 +427,7 @@ int process_data(RemoteSource *source) {
}
int process_source(RemoteSource *source, bool compress, bool seal) {
+ size_t remain, target;
int r;
assert(source);
@@ -480,5 +458,36 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
freeing:
iovw_free_contents(&source->iovw);
+
+ /* possibly reset buffer position */
+ remain = source->filled - source->offset;
+
+ if (remain == 0) /* no brainer */
+ source->offset = source->scanned = source->filled = 0;
+ else if (source->offset > source->size - source->filled &&
+ source->offset > remain) {
+ memcpy(source->buf, source->buf + source->offset, remain);
+ source->offset = source->scanned = 0;
+ source->filled = remain;
+ }
+
+ target = source->size;
+ while (target > 16 * LINE_CHUNK && remain < target / 2)
+ target /= 2;
+ if (target < source->size) {
+ char *tmp;
+
+ tmp = realloc(source->buf, target);
+ if (tmp)
+ log_warning("Failed to reallocate buffer to (smaller) size %zu",
+ target);
+ else {
+ log_debug("Reallocated buffer from %zu to %zu bytes",
+ source->size, target);
+ source->buf = tmp;
+ source->size = target;
+ }
+ }
+
return r;
}