summaryrefslogtreecommitdiff
path: root/src/journal-remote/journal-remote-parse.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal-remote/journal-remote-parse.c')
-rw-r--r--src/journal-remote/journal-remote-parse.c43
1 files changed, 37 insertions, 6 deletions
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c
index 90d50a7ad0..cdc920eb4c 100644
--- a/src/journal-remote/journal-remote-parse.c
+++ b/src/journal-remote/journal-remote-parse.c
@@ -28,19 +28,48 @@ void source_free(RemoteSource *source) {
if (!source)
return;
- if (source->fd >= 0) {
+ if (source->fd >= 0 && !source->passive_fd) {
log_debug("Closing fd:%d (%s)", source->fd, source->name);
safe_close(source->fd);
}
+
free(source->name);
free(source->buf);
iovw_free_contents(&source->iovw);
+ log_debug("Writer ref count %u", source->writer->n_ref);
+ writer_unref(source->writer);
+
sd_event_source_unref(source->event);
free(source);
}
+/**
+ * Initialize zero-filled source with given values. On success, takes
+ * ownerhship of fd and writer, otherwise does not touch them.
+ */
+RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) {
+
+ RemoteSource *source;
+
+ log_debug("Creating source for %sfd:%d (%s)",
+ passive_fd ? "passive " : "", fd, name);
+
+ assert(fd >= 0);
+
+ source = new0(RemoteSource, 1);
+ if (!source)
+ return NULL;
+
+ source->fd = fd;
+ source->passive_fd = passive_fd;
+ source->name = name;
+ source->writer = writer;
+
+ return source;
+}
+
static int get_line(RemoteSource *source, char **line, size_t *size) {
ssize_t n, remain;
char *c = NULL;
@@ -51,6 +80,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
assert(source->state == STATE_LINE);
assert(source->filled <= source->size);
assert(source->buf == NULL || source->size > 0);
+ assert(source->fd >= 0);
while (true) {
if (source->buf)
@@ -65,7 +95,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {
return -E2BIG;
}
- if (source->fd < 0)
+ if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
@@ -146,10 +176,11 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {
assert(source->scanned <= source->filled);
assert(source->buf != NULL || source->size == 0);
assert(source->buf == NULL || source->size > 0);
+ assert(source->fd >= 0);
assert(data);
while(source->filled < size) {
- if (source->fd < 0)
+ if (source->passive_fd)
/* we have to wait for some data to come to us */
return -EWOULDBLOCK;
@@ -418,11 +449,11 @@ int process_data(RemoteSource *source) {
}
}
-int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) {
+int process_source(RemoteSource *source, bool compress, bool seal) {
int r;
assert(source);
- assert(writer);
+ assert(source->writer);
r = process_data(source);
if (r <= 0)
@@ -440,7 +471,7 @@ int process_source(RemoteSource *source, Writer *writer, bool compress, bool sea
assert(source->iovw.iovec);
assert(source->iovw.count);
- r = writer_write(writer, &source->iovw, &source->ts, compress, seal);
+ r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal);
if (r < 0)
log_error("Failed to write entry of %zu bytes: %s",
iovw_size(&source->iovw), strerror(-r));