diff options
author | Zbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl> | 2014-07-02 00:15:37 -0400 |
---|---|---|
committer | Zbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl> | 2014-07-15 22:34:41 -0400 |
commit | 9ff48d0982fcb97923955685fe9fa4e0e67cb238 (patch) | |
tree | b26c25523cf45bfd1659565d2d902c24e8168724 /src | |
parent | a83f403760cb63b1bf7787e9ff325ffb6d891d39 (diff) |
journal-remote: rework fd and writer reference handling
Diffstat (limited to 'src')
-rw-r--r-- | src/journal-remote/journal-remote-parse.c | 43 | ||||
-rw-r--r-- | src/journal-remote/journal-remote-parse.h | 7 | ||||
-rw-r--r-- | src/journal-remote/journal-remote-write.c | 68 | ||||
-rw-r--r-- | src/journal-remote/journal-remote-write.h | 19 | ||||
-rw-r--r-- | src/journal-remote/journal-remote.c | 316 | ||||
-rw-r--r-- | src/journal-remote/journal-remote.h | 30 |
6 files changed, 276 insertions, 207 deletions
diff --git a/src/journal-remote/journal-remote-parse.c b/src/journal-remote/journal-remote-parse.c index 90d50a7ad0..cdc920eb4c 100644 --- a/src/journal-remote/journal-remote-parse.c +++ b/src/journal-remote/journal-remote-parse.c @@ -28,19 +28,48 @@ void source_free(RemoteSource *source) { if (!source) return; - if (source->fd >= 0) { + if (source->fd >= 0 && !source->passive_fd) { log_debug("Closing fd:%d (%s)", source->fd, source->name); safe_close(source->fd); } + free(source->name); free(source->buf); iovw_free_contents(&source->iovw); + log_debug("Writer ref count %u", source->writer->n_ref); + writer_unref(source->writer); + sd_event_source_unref(source->event); free(source); } +/** + * Initialize zero-filled source with given values. On success, takes + * ownerhship of fd and writer, otherwise does not touch them. + */ +RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer) { + + RemoteSource *source; + + log_debug("Creating source for %sfd:%d (%s)", + passive_fd ? "passive " : "", fd, name); + + assert(fd >= 0); + + source = new0(RemoteSource, 1); + if (!source) + return NULL; + + source->fd = fd; + source->passive_fd = passive_fd; + source->name = name; + source->writer = writer; + + return source; +} + static int get_line(RemoteSource *source, char **line, size_t *size) { ssize_t n, remain; char *c = NULL; @@ -51,6 +80,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) { assert(source->state == STATE_LINE); assert(source->filled <= source->size); assert(source->buf == NULL || source->size > 0); + assert(source->fd >= 0); while (true) { if (source->buf) @@ -65,7 +95,7 @@ static int get_line(RemoteSource *source, char **line, size_t *size) { return -E2BIG; } - if (source->fd < 0) + if (source->passive_fd) /* we have to wait for some data to come to us */ return -EWOULDBLOCK; @@ -146,10 +176,11 @@ static int fill_fixed_size(RemoteSource *source, void **data, size_t size) { assert(source->scanned <= source->filled); assert(source->buf != NULL || source->size == 0); assert(source->buf == NULL || source->size > 0); + assert(source->fd >= 0); assert(data); while(source->filled < size) { - if (source->fd < 0) + if (source->passive_fd) /* we have to wait for some data to come to us */ return -EWOULDBLOCK; @@ -418,11 +449,11 @@ int process_data(RemoteSource *source) { } } -int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal) { +int process_source(RemoteSource *source, bool compress, bool seal) { int r; assert(source); - assert(writer); + assert(source->writer); r = process_data(source); if (r <= 0) @@ -440,7 +471,7 @@ int process_source(RemoteSource *source, Writer *writer, bool compress, bool sea assert(source->iovw.iovec); assert(source->iovw.count); - r = writer_write(writer, &source->iovw, &source->ts, compress, seal); + r = writer_write(source->writer, &source->iovw, &source->ts, compress, seal); if (r < 0) log_error("Failed to write entry of %zu bytes: %s", iovw_size(&source->iovw), strerror(-r)); diff --git a/src/journal-remote/journal-remote-parse.h b/src/journal-remote/journal-remote-parse.h index 5b7f23650c..07d6ddb67f 100644 --- a/src/journal-remote/journal-remote-parse.h +++ b/src/journal-remote/journal-remote-parse.h @@ -35,6 +35,7 @@ typedef enum { typedef struct RemoteSource { char *name; int fd; + bool passive_fd; char *buf; size_t size; @@ -47,9 +48,13 @@ typedef struct RemoteSource { source_state state; dual_timestamp ts; + Writer *writer; + sd_event_source *event; } RemoteSource; +RemoteSource* source_new(int fd, bool passive_fd, char *name, Writer *writer); + static inline size_t source_non_empty(RemoteSource *source) { assert(source); @@ -59,4 +64,4 @@ static inline size_t source_non_empty(RemoteSource *source) { void source_free(RemoteSource *source); int process_data(RemoteSource *source); int push_data(RemoteSource *source, const char *data, size_t size); -int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal); +int process_source(RemoteSource *source, bool compress, bool seal); diff --git a/src/journal-remote/journal-remote-write.c b/src/journal-remote/journal-remote-write.c index 3b00ff5200..cdd06f9eff 100644 --- a/src/journal-remote/journal-remote-write.c +++ b/src/journal-remote/journal-remote-write.c @@ -19,6 +19,7 @@ along with systemd; If not, see <http://www.gnu.org/licenses/>. ***/ +#include "journal-remote.h" #include "journal-remote-write.h" int iovw_put(struct iovec_wrapper *iovw, void* data, size_t len) { @@ -64,32 +65,67 @@ static int do_rotate(JournalFile **f, bool compress, bool seal) { return r; } -int writer_init(Writer *s) { - assert(s); +Writer* writer_new(RemoteServer *server) { + Writer *w; - s->journal = NULL; + w = new0(Writer, 1); + if (!w) + return NULL; - memset(&s->metrics, 0xFF, sizeof(s->metrics)); + memset(&w->metrics, 0xFF, sizeof(w->metrics)); - s->mmap = mmap_cache_new(); - if (!s->mmap) - return log_oom(); + w->mmap = mmap_cache_new(); + if (!w->mmap) { + free(w); + return NULL; + } - s->seqnum = 0; + w->n_ref = 1; + w->server = server; - return 0; + return w; } -int writer_close(Writer *s) { - if (s->journal) { - journal_file_close(s->journal); - log_debug("Journal has been closed."); +Writer* writer_free(Writer *w) { + if (!w) + return NULL; + + if (w->journal) { + log_debug("Closing journal file %s.", w->journal->path); + journal_file_close(w->journal); } - if (s->mmap) - mmap_cache_unref(s->mmap); - return 0; + + if (w->server) { + w->server->event_count += w->seqnum; + if (w->hashmap_key) + hashmap_remove(w->server->writers, w->hashmap_key); + } + + free(w->hashmap_key); + + if (w->mmap) + mmap_cache_unref(w->mmap); + + free(w); + + return NULL; +} + +Writer* writer_unref(Writer *w) { + if (w && (-- w->n_ref <= 0)) + writer_free(w); + + return NULL; } +Writer* writer_ref(Writer *w) { + if (w) + assert_se(++ w->n_ref >= 2); + + return w; +} + + int writer_write(Writer *s, struct iovec_wrapper *iovw, dual_timestamp *ts, diff --git a/src/journal-remote/journal-remote-write.h b/src/journal-remote/journal-remote-write.h index 2ea5e6714f..9c5a641d2e 100644 --- a/src/journal-remote/journal-remote-write.h +++ b/src/journal-remote/journal-remote-write.h @@ -25,6 +25,8 @@ #include "journal-file.h" +typedef struct RemoteServer RemoteServer; + struct iovec_wrapper { struct iovec *iovec; size_t size_bytes; @@ -38,12 +40,25 @@ size_t iovw_size(struct iovec_wrapper *iovw); typedef struct Writer { JournalFile *journal; JournalMetrics metrics; + MMapCache *mmap; + RemoteServer *server; + char *hashmap_key; + uint64_t seqnum; + + int n_ref; } Writer; -int writer_init(Writer *s); -int writer_close(Writer *s); +Writer* writer_new(RemoteServer* server); +Writer* writer_free(Writer *w); + +Writer* writer_ref(Writer *w); +Writer* writer_unref(Writer *w); + +DEFINE_TRIVIAL_CLEANUP_FUNC(Writer*, writer_unref); +#define _cleanup_writer_unref_ _cleanup_(writer_unrefp) + int writer_write(Writer *s, struct iovec_wrapper *iovw, dual_timestamp *ts, diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c index d37fac3cea..e127b0b800 100644 --- a/src/journal-remote/journal-remote.c +++ b/src/journal-remote/journal-remote.c @@ -21,7 +21,6 @@ #include <errno.h> #include <fcntl.h> -#include <inttypes.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -33,7 +32,6 @@ #include <getopt.h> #include "sd-daemon.h" -#include "sd-event.h" #include "journal-file.h" #include "journald-native.h" #include "socket-util.h" @@ -41,17 +39,15 @@ #include "build.h" #include "macro.h" #include "strv.h" -#include "hashmap.h" #include "fileio.h" #include "conf-parser.h" -#include "microhttpd-util.h" #include "siphash24.h" #ifdef HAVE_GNUTLS #include <gnutls/gnutls.h> #endif -#include "journal-remote-parse.h" +#include "journal-remote.h" #include "journal-remote-write.h" #define REMOTE_JOURNAL_PATH "/var/log/journal/remote" @@ -71,7 +67,7 @@ static int arg_seal = false; static int http_socket = -1, https_socket = -1; static char** arg_gnutls_log = NULL; -static JournalWriteSplitMode arg_split_mode = JOURNAL_WRITE_SPLIT_NONE; +static JournalWriteSplitMode arg_split_mode = JOURNAL_WRITE_SPLIT_HOST; static char* arg_output = NULL; static char *arg_key = NULL; @@ -170,7 +166,7 @@ static int spawn_getter(const char *getter, const char *url) { return r; } -#define filename_escape(s) xescape((s), "./ ") +#define filename_escape(s) xescape((s), "/ ") static int open_output(Writer *w, const char* host) { _cleanup_free_ char *_output = NULL; @@ -223,26 +219,78 @@ static int open_output(Writer *w, const char* host) { ********************************************************************** **********************************************************************/ -typedef struct MHDDaemonWrapper { - uint64_t fd; - struct MHD_Daemon *daemon; +static int init_writer_hashmap(RemoteServer *s) { + static const struct { + hash_func_t hash_func; + compare_func_t compare_func; + } functions[] = { + [JOURNAL_WRITE_SPLIT_NONE] = {trivial_hash_func, + trivial_compare_func}, + [JOURNAL_WRITE_SPLIT_HOST] = {string_hash_func, + string_compare_func}, + }; + + assert(arg_split_mode >= 0 && arg_split_mode < (int) ELEMENTSOF(functions)); - sd_event_source *event; -} MHDDaemonWrapper; + s->writers = hashmap_new(functions[arg_split_mode].hash_func, + functions[arg_split_mode].compare_func); + if (!s->writers) + return log_oom(); + + return 0; +} + +static int get_writer(RemoteServer *s, const char *host, + Writer **writer) { + const void *key; + _cleanup_writer_unref_ Writer *w = NULL; + int r; + + switch(arg_split_mode) { + case JOURNAL_WRITE_SPLIT_NONE: + key = "one and only"; + break; + + case JOURNAL_WRITE_SPLIT_HOST: + assert(host); + key = host; + break; + + default: + assert_not_reached("what split mode?"); + } + + w = hashmap_get(s->writers, key); + if (w) + writer_ref(w); + else { + w = writer_new(s); + if (!w) + return log_oom(); + + if (arg_split_mode == JOURNAL_WRITE_SPLIT_HOST) { + w->hashmap_key = strdup(key); + if (!w->hashmap_key) + return log_oom(); + } -typedef struct RemoteServer { - RemoteSource **sources; - size_t sources_size; - size_t active; + r = open_output(w, host); + if (r < 0) + return r; - sd_event *events; - sd_event_source *sigterm_event, *sigint_event, *listen_event; + r = hashmap_put(s->writers, w->hashmap_key ?: key, w); + if (r < 0) + return r; + } - Hashmap *writers; + *writer = w; + w = NULL; + return 0; +} - bool check_trust; - Hashmap *daemons; -} RemoteServer; +/********************************************************************** + ********************************************************************** + **********************************************************************/ /* This should go away as soon as µhttpd allows state to be passed around. */ static RemoteServer *server; @@ -260,18 +308,31 @@ static int dispatch_http_event(sd_event_source *event, uint32_t revents, void *userdata); -static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) { +static int get_source_for_fd(RemoteServer *s, + int fd, char *name, RemoteSource **source) { + Writer *writer; + int r; + assert(fd >= 0); assert(source); if (!GREEDY_REALLOC0(s->sources, s->sources_size, fd + 1)) return log_oom(); + r = get_writer(s, name, &writer); + if (r < 0) { + log_warning("Failed to get writer for source %s: %s", + name, strerror(-r)); + return r; + } + if (s->sources[fd] == NULL) { - s->sources[fd] = new0(RemoteSource, 1); - if (!s->sources[fd]) + s->sources[fd] = source_new(fd, false, name, writer); + if (!s->sources[fd]) { + writer_unref(writer); return log_oom(); - s->sources[fd]->fd = -1; + } + s->active++; } @@ -296,36 +357,28 @@ static int remove_source(RemoteServer *s, int fd) { return 0; } -static int add_source(RemoteServer *s, int fd, const char* _name) { +static int add_source(RemoteServer *s, int fd, char* name, bool own_name) { RemoteSource *source; - char *name; int r; assert(s); assert(fd >= 0); - assert(_name); + assert(name); - log_debug("Creating source for fd:%d (%s)", fd, _name); - - name = strdup(_name); - if (!name) - return log_oom(); + if (!own_name) { + name = strdup(name); + if (!name) + return log_oom(); + } - r = get_source_for_fd(s, fd, &source); + r = get_source_for_fd(s, fd, name, &source); if (r < 0) { - log_error("Failed to create source for fd:%d (%s)", fd, name); - free(name); + log_error("Failed to create source for fd:%d (%s): %s", + fd, name, strerror(-r)); return r; } - assert(source); - assert(source->fd < 0); - assert(!source->name); - - source->fd = fd; - source->name = name; - r = sd_event_add_io(s->events, &source->event, fd, EPOLLIN|EPOLLRDHUP|EPOLLPRI, dispatch_raw_source_event, s); @@ -371,97 +424,29 @@ static int setup_raw_socket(RemoteServer *s, const char *address) { ********************************************************************** **********************************************************************/ -static int init_writer_hashmap(RemoteServer *s) { - static const struct { - hash_func_t hash_func; - compare_func_t compare_func; - } functions[] = { - [JOURNAL_WRITE_SPLIT_NONE] = {trivial_hash_func, - trivial_compare_func}, - [JOURNAL_WRITE_SPLIT_HOST] = {string_hash_func, - string_compare_func}, - }; - - assert(arg_split_mode >= 0 && arg_split_mode < (int) ELEMENTSOF(functions)); - - s->writers = hashmap_new(functions[arg_split_mode].hash_func, - functions[arg_split_mode].compare_func); - if (!s->writers) - return log_oom(); - - return 0; -} - -static int get_writer(RemoteServer *s, const char *host, sd_id128_t *machine, - Writer **writer) { - const void *key; - Writer *w; - int r; - - switch(arg_split_mode) { - case JOURNAL_WRITE_SPLIT_NONE: - key = "one and only"; - break; - - case JOURNAL_WRITE_SPLIT_HOST: - assert(host); - key = host; - break; - - default: - assert_not_reached("what split mode?"); - } - - w = hashmap_get(s->writers, key); - if (!w) { - w = new0(Writer, 1); - if (!w) - return -ENOMEM; - - r = writer_init(w); - if (r < 0) { - free(w); - return r; - } - - r = hashmap_put(s->writers, key, w); - if (r < 0) { - writer_close(w); - free(w); - return r; - } - - r = open_output(w, host); - if (r < 0) - return r; - } - - *writer = w; - return 0; -} - -/********************************************************************** - ********************************************************************** - **********************************************************************/ - static RemoteSource *request_meta(void **connection_cls, int fd, char *hostname) { RemoteSource *source; + Writer *writer; + int r; assert(connection_cls); - if (*connection_cls) { - free(hostname); + if (*connection_cls) return *connection_cls; + + r = get_writer(server, hostname, &writer); + if (r < 0) { + log_warning("Failed to get writer for source %s: %s", + hostname, strerror(-r)); + return NULL; } - source = new0(RemoteSource, 1); + source = source_new(fd, true, hostname, writer); if (!source) { - free(hostname); + log_oom(); + writer_unref(writer); return NULL; } - source->fd = -1; /* fd */ - source->name = hostname; - log_debug("Added RemoteSource as connection metadata %p", source); *connection_cls = source; @@ -488,26 +473,15 @@ static int process_http_upload( size_t *upload_data_size, RemoteSource *source) { - Writer *w; - int r; bool finished = false; size_t remaining; + int r; assert(source); log_debug("request_handler_upload: connection %p, %zu bytes", connection, *upload_data_size); - r = get_writer(server, source->name, NULL, &w); - if (r < 0) { - log_warning("Failed to get writer for source %s: %s", - source->name, strerror(-r)); - return mhd_respondf(connection, - MHD_HTTP_SERVICE_UNAVAILABLE, - "Failed to get writer for connection: %s.\n", - strerror(-r)); - } - if (*upload_data_size) { log_debug("Received %zu bytes", *upload_data_size); @@ -520,8 +494,7 @@ static int process_http_upload( finished = true; while (true) { - - r = process_source(source, w, arg_compress, arg_seal); + r = process_source(source, arg_compress, arg_seal); if (r == -EAGAIN || r == -EWOULDBLOCK) break; else if (r < 0) { @@ -566,7 +539,7 @@ static int request_handler( const char *header; int r, code, fd; - char *hostname; + _cleanup_free_ char *hostname = NULL; assert(connection); assert(connection_cls); @@ -627,6 +600,7 @@ static int request_handler( if (!request_meta(connection_cls, fd, hostname)) return respond_oom(connection); + hostname = NULL; return MHD_YES; } @@ -871,7 +845,7 @@ static int remoteserver_init(RemoteServer *s, else r = add_raw_socket(s, fd); } else if (sd_is_socket(fd, AF_UNSPEC, 0, true)) { - _cleanup_free_ char *hostname = NULL; + char *hostname; r = getnameinfo_pretty(fd, &hostname); if (r < 0) { @@ -881,7 +855,9 @@ static int remoteserver_init(RemoteServer *s, log_info("Received a connection socket (fd:%d) from %s", fd, hostname); - r = add_source(s, fd, hostname); + r = add_source(s, fd, hostname, true); + if (r < 0) + free(hostname); } else { log_error("Unknown socket passed on fd:%d", fd); @@ -917,7 +893,7 @@ static int remoteserver_init(RemoteServer *s, startswith(arg_url, "http://") ?: arg_url; - r = add_source(s, fd, hostname); + r = add_source(s, fd, (char*) hostname, false); if (r < 0) return r; @@ -966,7 +942,7 @@ static int remoteserver_init(RemoteServer *s, output_name = *file; } - r = add_source(s, fd, output_name); + r = add_source(s, fd, (char*) output_name, false); if (r < 0) return r; } @@ -987,9 +963,7 @@ static int remoteserver_init(RemoteServer *s, /* In this case we know what the writer will be called, so we can create it and verify that we can create output as expected. */ - Writer *w; - - r = get_writer(s, NULL, NULL, &w); + r = get_writer(s, NULL, &s->_single_writer); if (r < 0) return r; } @@ -997,26 +971,10 @@ static int remoteserver_init(RemoteServer *s, return 0; } -static int server_destroy(RemoteServer *s, uint64_t *event_count) { - int r; +static void server_destroy(RemoteServer *s) { size_t i; - Writer *w; MHDDaemonWrapper *d; - *event_count = 0; - - while ((w = hashmap_steal_first(s->writers))) { - log_info("seqnum %"PRIu64, w->seqnum); - *event_count += w->seqnum; - - r = writer_close(w); - if (r < 0) - log_warning("Failed to close writer: %s", strerror(-r)); - free(w); - } - - hashmap_free(s->writers); - while ((d = hashmap_steal_first(s->daemons))) { MHD_stop_daemon(d->daemon); sd_event_source_unref(d->event); @@ -1028,17 +986,17 @@ static int server_destroy(RemoteServer *s, uint64_t *event_count) { assert(s->sources_size == 0 || s->sources); for (i = 0; i < s->sources_size; i++) remove_source(s, i); - free(s->sources); + writer_unref(s->_single_writer); + hashmap_free(s->writers); + sd_event_source_unref(s->sigterm_event); sd_event_source_unref(s->sigint_event); sd_event_source_unref(s->listen_event); sd_event_unref(s->events); /* fds that we're listening on remain open... */ - - return r; } /********************************************************************** @@ -1050,7 +1008,6 @@ static int dispatch_raw_source_event(sd_event_source *event, uint32_t revents, void *userdata) { - Writer *w; RemoteServer *s = userdata; RemoteSource *source; int r; @@ -1059,14 +1016,7 @@ static int dispatch_raw_source_event(sd_event_source *event, source = s->sources[fd]; assert(source->fd == fd); - r = get_writer(s, source->name, NULL, &w); - if (r < 0) { - log_warning("Failed to get writer for source %s: %s", - source->name, strerror(-r)); - return r; - } - - r = process_source(source, w, arg_compress, arg_seal); + r = process_source(source, arg_compress, arg_seal); if (source->state == STATE_EOF) { size_t remaining; @@ -1145,7 +1095,7 @@ static int dispatch_raw_connection_event(sd_event_source *event, uint32_t revents, void *userdata) { RemoteServer *s = userdata; - int fd2; + int fd2, r; SocketAddress addr = { .size = sizeof(union sockaddr_union), .type = SOCK_STREAM, @@ -1156,7 +1106,10 @@ static int dispatch_raw_connection_event(sd_event_source *event, if (fd2 < 0) return fd2; - return add_source(s, fd2, hostname); + r = add_source(s, fd2, hostname, true); + if (r < 0) + free(hostname); + return r; } /********************************************************************** @@ -1537,9 +1490,8 @@ static int setup_gnutls_logger(char **categories) { int main(int argc, char **argv) { RemoteServer s = {}; - int r, r2; + int r; _cleanup_free_ char *key = NULL, *cert = NULL, *trust = NULL; - uint64_t entry_count; log_show_color(true); log_parse_environment(); @@ -1585,8 +1537,8 @@ int main(int argc, char **argv) { } } - r2 = server_destroy(&s, &entry_count); - log_info("Finishing after writing %" PRIu64 " entries", entry_count); + server_destroy(&s); + log_info("Finishing after writing %" PRIu64 " entries", s.event_count); sd_notify(false, "STATUS=Shutting down..."); @@ -1594,5 +1546,5 @@ int main(int argc, char **argv) { free(arg_cert); free(arg_trust); - return r >= 0 && r2 >= 0 ? EXIT_SUCCESS : EXIT_FAILURE; + return r >= 0 ? EXIT_SUCCESS : EXIT_FAILURE; } diff --git a/src/journal-remote/journal-remote.h b/src/journal-remote/journal-remote.h new file mode 100644 index 0000000000..0422cea082 --- /dev/null +++ b/src/journal-remote/journal-remote.h @@ -0,0 +1,30 @@ +#include <inttypes.h> + +#include "sd-event.h" +#include "hashmap.h" +#include "microhttpd-util.h" + +#include "journal-remote-parse.h" + +typedef struct MHDDaemonWrapper { + uint64_t fd; + struct MHD_Daemon *daemon; + + sd_event_source *event; +} MHDDaemonWrapper; + +typedef struct RemoteServer { + RemoteSource **sources; + size_t sources_size; + size_t active; + + sd_event *events; + sd_event_source *sigterm_event, *sigint_event, *listen_event; + + Hashmap *writers; + Writer *_single_writer; + uint64_t event_count; + + bool check_trust; + Hashmap *daemons; +} RemoteServer; |