summaryrefslogtreecommitdiff
path: root/src/journal-remote/journal-remote-parse.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal-remote/journal-remote-parse.c')
-rw-r--r--src/journal-remote/journal-remote-parse.c59
1 files changed, 35 insertions, 24 deletions
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
index d9dea8deb0..5ff05d3ad6 100644
--- a/src/journal-remote/journal-remote-parse.c
+++ b/src/journal-remote/journal-remote-parse.c
@@ -41,6 +41,7 @@ void source_free(RemoteSource *source) {
writer_unref(source->writer);
sd_event_source_unref(source->event);
+ sd_event_source_unref(source->buffer_event);
free(source);
}
@@ -111,21 +112,26 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
if (source->passive_fd)
/* we have to wait for some data to come to us */
- return -EWOULDBLOCK;
+ return -EAGAIN;
+ /* We know that source->filled is at most DATA_SIZE_MAX, so if
+ we reallocate it, we'll increase the size at least a bit. */
+ assert_cc(DATA_SIZE_MAX < ENTRY_SIZE_MAX);
if (source->size - source->filled < LINE_CHUNK &&
- !realloc_buffer(source,
- MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
+ !realloc_buffer(source, MIN(source->filled + LINE_CHUNK, ENTRY_SIZE_MAX)))
return log_oom();
+ assert(source->buf);
assert(source->size - source->filled >= LINE_CHUNK ||
source->size == ENTRY_SIZE_MAX);
- n = read(source->fd, source->buf + source->filled,
+ n = read(source->fd,
+ source->buf + source->filled,
source->size - source->filled);
if (n < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK)
- log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
+ if (errno != EAGAIN)
+ log_error_errno(errno, "read(%d, ..., %zu): %m",
+ source->fd,
source->size - source->filled);
return -errno;
} else if (n == 0)
@@ -177,7 +183,7 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
if (source->passive_fd)
/* we have to wait for some data to come to us */
- return -EWOULDBLOCK;
+ return -EAGAIN;
if (!realloc_buffer(source, source->offset + size))
return log_oom();
@@ -185,7 +191,7 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
n = read(source->fd, source->buf + source->filled,
source->size - source->filled);
if (n < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK)
+ if (errno != EAGAIN)
log_error_errno(errno, "read(%d, ..., %zu): %m", source->fd,
source->size - source->filled);
return -errno;
@@ -309,13 +315,13 @@ static int process_dunder(RemoteSource *source, char *line, size_t n) {
return 0;
}
-int process_data(RemoteSource *source) {
+static int process_data(RemoteSource *source) {
int r;
switch(source->state) {
case STATE_LINE: {
char *line, *sep;
- size_t n;
+ size_t n = 0;
assert(source->data_size == 0);
@@ -344,22 +350,25 @@ int process_data(RemoteSource *source) {
LLLLLLLL0011223344...\n
*/
sep = memchr(line, '=', n);
- if (sep)
+ if (sep) {
/* chomp newline */
n--;
- else
+
+ r = iovw_put(&source->iovw, line, n);
+ if (r < 0)
+ return r;
+ } else {
/* replace \n with = */
line[n-1] = '=';
- log_trace("Received: %.*s", (int) n, line);
- r = iovw_put(&source->iovw, line, n);
- if (r < 0) {
- log_error("Failed to put line in iovect");
- return r;
+ source->field_len = n;
+ source->state = STATE_DATA_START;
+
+ /* we cannot put the field in iovec until we have all data */
}
- if (!sep)
- source->state = STATE_DATA_START;
+ log_trace("Received: %.*s (%s)", (int) n, line, sep ? "text" : "binary");
+
return 0; /* continue */
}
@@ -382,6 +391,7 @@ int process_data(RemoteSource *source) {
case STATE_DATA: {
void *data;
+ char *field;
assert(source->data_size > 0);
@@ -396,11 +406,12 @@ int process_data(RemoteSource *source) {
assert(data);
- r = iovw_put(&source->iovw, data, source->data_size);
- if (r < 0) {
- log_error("failed to put binary buffer in iovect");
+ field = (char*) data - sizeof(uint64_t) - source->field_len;
+ memmove(field + sizeof(uint64_t), field, source->field_len);
+
+ r = iovw_put(&source->iovw, field + sizeof(uint64_t), source->field_len + source->data_size);
+ if (r < 0)
return r;
- }
source->state = STATE_DATA_FINISH;
@@ -438,7 +449,7 @@ int process_source(RemoteSource *source, bool compress, bool seal) {
return r;
/* We have a full event */
- log_trace("Received a full event from source@%p fd:%d (%s)",
+ log_trace("Received full event from source@%p fd:%d (%s)",
source, source->fd, source->name);
if (!source->iovw.count) {