diff options
author | Zbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl> | 2014-03-15 15:58:03 -0400 |
---|---|---|
committer | Zbigniew Jędrzejewski-Szmek <zbyszek@in.waw.pl> | 2014-03-17 01:55:48 -0400 |
commit | cc64d0175a3c2c974709e9962c00fbe04d74c43f (patch) | |
tree | b03dc9591925761c583a8b14c101c1052f1ace0d /src | |
parent | fdfccdbc985944a57017a25f44dd6acc1a937bab (diff) |
journal-remote: HTTP(s) support
The whole tool is made dependent on µhttpd availability. It should be
easy to make the µhttpd parts conditional, but since transfer over
HTTP seems to be the primary use case, currently this is not done.
Current implementation uses nested epoll loops: sd-event is used for
the external event loop, and µhttpd uses epoll in its own
loop. Unfortunately µhttpd does not expose enough information to add
the descriptors it uses to the external event loop. This means that
starvation of other events is possible, if one of the inner µhttpd
loops is constantly busy. This means that µhttpd servers should not
be mixed with other sources.
The TLS authentication parts haven't been really tested properly, and
should not be take too seriously.
Diffstat (limited to 'src')
-rw-r--r-- | src/journal/journal-remote-parse.c | 14 | ||||
-rw-r--r-- | src/journal/journal-remote-parse.h | 1 | ||||
-rw-r--r-- | src/journal/journal-remote.c | 491 | ||||
-rw-r--r-- | src/journal/microhttpd-util.c | 17 |
4 files changed, 490 insertions, 33 deletions
diff --git a/src/journal/journal-remote-parse.c b/src/journal/journal-remote-parse.c index ee2260c5a9..c961844c44 100644 --- a/src/journal/journal-remote-parse.c +++ b/src/journal/journal-remote-parse.c @@ -100,6 +100,20 @@ static int get_line(RemoteSource *source, char **line, size_t *size) { return 1; } +int push_data(RemoteSource *source, const char *data, size_t size) { + assert(source); + assert(source->state != STATE_EOF); + + if (!GREEDY_REALLOC(source->buf, source->size, + source->filled + size)) + return log_oom(); + + memcpy(source->buf + source->filled, data, size); + source->filled += size; + + return 0; +} + static int fill_fixed_size(RemoteSource *source, void **data, size_t size) { int n; char *newbuf = NULL; diff --git a/src/journal/journal-remote-parse.h b/src/journal/journal-remote-parse.h index 3bda97e2d4..c1506d118d 100644 --- a/src/journal/journal-remote-parse.h +++ b/src/journal/journal-remote-parse.h @@ -57,4 +57,5 @@ static inline int source_non_empty(RemoteSource *source) { void source_free(RemoteSource *source); int process_data(RemoteSource *source); +int push_data(RemoteSource *source, const char *data, size_t size); int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal); diff --git a/src/journal/journal-remote.c b/src/journal/journal-remote.c index f8979daca6..1a1ca2c480 100644 --- a/src/journal/journal-remote.c +++ b/src/journal/journal-remote.c @@ -41,6 +41,13 @@ #include "build.h" #include "macro.h" #include "strv.h" +#include "fileio.h" +#include "socket-util.h" +#include "microhttpd-util.h" + +#ifdef HAVE_GNUTLS +#include <gnutls/gnutls.h> +#endif #include "journal-remote-parse.h" #include "journal-remote-write.h" @@ -52,9 +59,15 @@ static char* arg_url = NULL; static char* arg_getter = NULL; static bool arg_stdin = false; static char* arg_listen_raw = NULL; +static char* arg_listen_http = NULL; +static char* arg_listen_https = NULL; static int arg_compress = true; static int arg_seal = false; +static char *key_pem = NULL; +static char *cert_pem = NULL; +static char *trust_pem = NULL; + /********************************************************************** ********************************************************************** **********************************************************************/ @@ -201,6 +214,17 @@ static int open_output(Writer *s, const char* url) { return r; } +/********************************************************************** + ********************************************************************** + **********************************************************************/ + +typedef struct MHDDaemonWrapper { + uint64_t fd; + struct MHD_Daemon *daemon; + + sd_event_source *event; +} MHDDaemonWrapper; + typedef struct RemoteServer { RemoteSource **sources; ssize_t sources_size; @@ -210,8 +234,13 @@ typedef struct RemoteServer { sd_event_source *sigterm_event, *sigint_event, *listen_event; Writer writer; + + Hashmap *daemons; } RemoteServer; +/* This should go away as soon as µhttpd allows state to be passed around. */ +static RemoteServer *server; + static int dispatch_raw_source_event(sd_event_source *event, int fd, uint32_t revents, @@ -220,6 +249,10 @@ static int dispatch_raw_connection_event(sd_event_source *event, int fd, uint32_t revents, void *userdata); +static int dispatch_http_event(sd_event_source *event, + int fd, + uint32_t revents, + void *userdata); static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) { assert(fd >= 0); @@ -325,6 +358,285 @@ static int setup_raw_socket(RemoteServer *s, const char *address) { ********************************************************************** **********************************************************************/ +static RemoteSource *request_meta(void **connection_cls) { + RemoteSource *source; + + assert(connection_cls); + if (*connection_cls) + return *connection_cls; + + source = new0(RemoteSource, 1); + if (!source) + return NULL; + source->fd = -1; + + log_debug("Added RemoteSource as connection metadata %p", source); + + *connection_cls = source; + return source; +} + +static void request_meta_free(void *cls, + struct MHD_Connection *connection, + void **connection_cls, + enum MHD_RequestTerminationCode toe) { + RemoteSource *s; + + assert(connection_cls); + s = *connection_cls; + + log_debug("Cleaning up connection metadata %p", s); + source_free(s); + *connection_cls = NULL; +} + +static int process_http_upload( + struct MHD_Connection *connection, + const char *upload_data, + size_t *upload_data_size, + RemoteSource *source) { + + bool finished = false; + int r; + + assert(source); + + log_debug("request_handler_upload: connection %p, %zu bytes", + connection, *upload_data_size); + + if (*upload_data_size) { + log_info("Received %zu bytes", *upload_data_size); + + r = push_data(source, upload_data, *upload_data_size); + if (r < 0) { + log_error("Failed to store received data of size %zu: %s", + *upload_data_size, strerror(-r)); + return respond_oom_internal(connection); + } + *upload_data_size = 0; + } else + finished = true; + + while (true) { + r = process_source(source, &server->writer, arg_compress, arg_seal); + if (r == -E2BIG) + log_warning("Entry too big, skipped"); + else if (r == -EAGAIN || r == -EWOULDBLOCK) + break; + else if (r < 0) { + log_warning("Failed to process data for connection %p", connection); + return respond_error(connection, MHD_HTTP_UNPROCESSABLE_ENTITY, + "Processing failed: %s", strerror(-r)); + } + } + + if (!finished) + return MHD_YES; + + /* The upload is finished */ + + if (source_non_empty(source)) { + log_warning("EOF reached with incomplete data"); + return respond_error(connection, MHD_HTTP_EXPECTATION_FAILED, + "Trailing data not processed."); + } + + return respond_error(connection, MHD_HTTP_ACCEPTED, "OK.\n"); +}; + +static int request_handler( + void *cls, + struct MHD_Connection *connection, + const char *url, + const char *method, + const char *version, + const char *upload_data, + size_t *upload_data_size, + void **connection_cls) { + + const char *header; + int r ,code; + + assert(connection); + assert(connection_cls); + assert(url); + assert(method); + + log_debug("Handling a connection %s %s %s", method, url, version); + + if (*connection_cls) + return process_http_upload(connection, + upload_data, upload_data_size, + *connection_cls); + + if (!streq(method, "POST")) + return respond_error(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE, + "Unsupported method.\n"); + + if (!streq(url, "/upload")) + return respond_error(connection, MHD_HTTP_NOT_FOUND, + "Not found.\n"); + + header = MHD_lookup_connection_value(connection, + MHD_HEADER_KIND, "Content-Type"); + if (!header || !streq(header, "application/vnd.fdo.journal")) + return respond_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE, + "Content-Type: application/vnd.fdo.journal" + " is required.\n"); + + if (trust_pem) { + r = check_permissions(connection, &code); + if (r < 0) + return code; + } + + if (!request_meta(connection_cls)) + return respond_oom(connection); + return MHD_YES; +} + +static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) { + struct MHD_OptionItem opts[] = { + { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free}, + { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger}, + { MHD_OPTION_LISTEN_SOCKET, fd}, + { MHD_OPTION_END}, + { MHD_OPTION_END}, + { MHD_OPTION_END}, + { MHD_OPTION_END}}; + int opts_pos = 3; + int flags = + MHD_USE_DEBUG | + MHD_USE_PEDANTIC_CHECKS | + MHD_USE_EPOLL_LINUX_ONLY | + MHD_USE_DUAL_STACK; + + const union MHD_DaemonInfo *info; + int r, epoll_fd; + MHDDaemonWrapper *d; + + assert(fd >= 0); + + r = fd_nonblock(fd, true); + if (r < 0) { + log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r)); + return r; + } + + if (https) { + opts[opts_pos++] = (struct MHD_OptionItem) + {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem}; + opts[opts_pos++] = (struct MHD_OptionItem) + {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem}; + + flags |= MHD_USE_SSL; + + if (trust_pem) + opts[opts_pos++] = (struct MHD_OptionItem) + {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem}; + } + + d = new(MHDDaemonWrapper, 1); + if (!d) + return log_oom(); + + d->fd = (uint64_t) fd; + + d->daemon = MHD_start_daemon(flags, 0, + NULL, NULL, + request_handler, NULL, + MHD_OPTION_ARRAY, opts, + MHD_OPTION_END); + if (!d->daemon) { + log_error("Failed to start µhttp daemon"); + r = -EINVAL; + goto error; + } + + log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)", + https ? "HTTPS" : "HTTP", fd, d); + + + info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY); + if (!info) { + log_error("µhttp returned NULL daemon info"); + r = -ENOTSUP; + goto error; + } + + epoll_fd = info->listen_fd; + if (epoll_fd < 0) { + log_error("µhttp epoll fd is invalid"); + r = -EUCLEAN; + goto error; + } + + r = sd_event_add_io(s->events, &d->event, + epoll_fd, EPOLLIN, dispatch_http_event, d); + if (r < 0) { + log_error("Failed to add event callback: %s", strerror(-r)); + goto error; + } + + r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func); + if (r < 0) { + log_oom(); + goto error; + } + + r = hashmap_put(s->daemons, &d->fd, d); + if (r < 0) { + log_error("Failed to add daemon to hashmap: %s", strerror(-r)); + goto error; + } + + s->active ++; + return 0; + +error: + MHD_stop_daemon(d->daemon); + free(d->daemon); + free(d); + return r; +} + +static int setup_microhttpd_socket(RemoteServer *s, + const char *address, + bool https) { + int fd; + + fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC); + if (fd < 0) + return fd; + + return setup_microhttpd_server(s, fd, https); +} + +static int dispatch_http_event(sd_event_source *event, + int fd, + uint32_t revents, + void *userdata) { + MHDDaemonWrapper *d = userdata; + int r; + + assert(d); + + log_info("%s", __func__); + + r = MHD_run(d->daemon); + if (r == MHD_NO) { + log_error("MHD_run failed!"); + // XXX: unregister daemon + return -EINVAL; + } + + return 1; /* work to do */ +} + +/********************************************************************** + ********************************************************************** + **********************************************************************/ + static int dispatch_sigterm(sd_event_source *event, const struct signalfd_siginfo *si, void *userdata) { @@ -369,6 +681,9 @@ static int remoteserver_init(RemoteServer *s) { setup_signals(s); + assert(server == NULL); + server = s; + n = sd_listen_fds(true); if (n < 0) { log_error("Failed to read listening file descriptors from environment: %s", @@ -426,6 +741,22 @@ static int remoteserver_init(RemoteServer *s) { output_name = arg_listen_raw; } + if (arg_listen_http) { + r = setup_microhttpd_socket(s, arg_listen_http, false); + if (r < 0) + return r; + + output_name = arg_listen_http; + } + + if (arg_listen_https) { + r = setup_microhttpd_socket(s, arg_listen_https, true); + if (r < 0) + return r; + + output_name = arg_listen_https; + } + if (arg_stdin) { log_info("Reading standard input..."); r = add_source(s, STDIN_FILENO, "stdin"); @@ -454,11 +785,20 @@ static int remoteserver_init(RemoteServer *s) { static int server_destroy(RemoteServer *s) { int r; ssize_t i; + MHDDaemonWrapper *d; r = writer_close(&s->writer); + while ((d = hashmap_steal_first(s->daemons))) { + MHD_stop_daemon(d->daemon); + sd_event_source_unref(d->event); + free(d); + } + + hashmap_free(s->daemons); + assert(s->sources_size == 0 || s->sources); - for(i = 0; i < s->sources_size; i++) + for (i = 0; i < s->sources_size; i++) remove_source(s, i); free(s->sources); @@ -506,56 +846,61 @@ static int dispatch_raw_source_event(sd_event_source *event, return r; } -static int dispatch_raw_connection_event(sd_event_source *event, - int fd, - uint32_t revents, - void *userdata) { - RemoteServer *s = userdata; - - SocketAddress addr = { - .size = sizeof(union sockaddr_union), - .type = SOCK_STREAM, - }; +static int accept_connection(const char* type, int fd, SocketAddress *addr) { int fd2, r; - log_debug("Accepting new connection on fd:%d", fd); - fd2 = accept4(fd, &addr.sockaddr.sa, &addr.size, SOCK_NONBLOCK|SOCK_CLOEXEC); + log_debug("Accepting new %s connection on fd:%d", type, fd); + fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC); if (fd2 < 0) { log_error("accept() on fd:%d failed: %m", fd); return -errno; } - switch(socket_address_family(&addr)) { + switch(socket_address_family(addr)) { case AF_INET: case AF_INET6: { char* _cleanup_free_ a = NULL; - r = socket_address_print(&addr, &a); + r = socket_address_print(addr, &a); if (r < 0) { log_error("socket_address_print(): %s", strerror(-r)); close(fd2); return r; } - log_info("Accepted %s connection from %s", - socket_address_family(&addr) == AF_INET ? "IP" : "IPv6", + log_info("Accepted %s %s connection from %s", + type, + socket_address_family(addr) == AF_INET ? "IP" : "IPv6", a); - break; + + return fd2; }; default: - log_error("Connection with unsupported family %d", - socket_address_family(&addr)); + log_error("Rejected %s connection with unsupported family %d", + type, socket_address_family(addr)); close(fd2); + return -EINVAL; } +} - r = add_source(s, fd2, NULL); - if (r < 0) - log_error("failed to create source from fd:%d: %s", fd2, strerror(-r)); +static int dispatch_raw_connection_event(sd_event_source *event, + int fd, + uint32_t revents, + void *userdata) { + RemoteServer *s = userdata; + int fd2; + SocketAddress addr = { + .size = sizeof(union sockaddr_union), + .type = SOCK_STREAM, + }; - return r; -} + fd2 = accept_connection("raw", fd, &addr); + if (fd2 < 0) + return fd2; + return add_source(s, fd2, NULL); +} /********************************************************************** ********************************************************************** @@ -568,6 +913,8 @@ static int help(void) { " --url=URL Read events from systemd-journal-gatewayd at URL\n" " --getter=COMMAND Read events from the output of COMMAND\n" " --listen-raw=ADDR Listen for connections at ADDR\n" + " --listen-http=ADDR Listen for HTTP connections at ADDR\n" + " --listen-https=ADDR Listen for HTTPS connections at ADDR\n" " --stdin Read events from standard input\n" " -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n" " --[no-]compress Use XZ-compression in the output journal (default: yes)\n" @@ -586,12 +933,17 @@ static int parse_argv(int argc, char *argv[]) { ARG_VERSION = 0x100, ARG_URL, ARG_LISTEN_RAW, + ARG_LISTEN_HTTP, + ARG_LISTEN_HTTPS, ARG_STDIN, ARG_GETTER, ARG_COMPRESS, ARG_NO_COMPRESS, ARG_SEAL, ARG_NO_SEAL, + ARG_KEY, + ARG_CERT, + ARG_TRUST, }; static const struct option options[] = { @@ -600,16 +952,21 @@ static int parse_argv(int argc, char *argv[]) { { "url", required_argument, NULL, ARG_URL }, { "getter", required_argument, NULL, ARG_GETTER }, { "listen-raw", required_argument, NULL, ARG_LISTEN_RAW }, + { "listen-http", required_argument, NULL, ARG_LISTEN_HTTP }, + { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS }, { "stdin", no_argument, NULL, ARG_STDIN }, { "output", required_argument, NULL, 'o' }, { "compress", no_argument, NULL, ARG_COMPRESS }, { "no-compress", no_argument, NULL, ARG_NO_COMPRESS }, { "seal", no_argument, NULL, ARG_SEAL }, { "no-seal", no_argument, NULL, ARG_NO_SEAL }, + { "key", required_argument, NULL, ARG_KEY }, + { "cert", required_argument, NULL, ARG_CERT }, + { "trust", required_argument, NULL, ARG_TRUST }, {} }; - int c; + int c, r; assert(argc >= 0); assert(argv); @@ -652,6 +1009,67 @@ static int parse_argv(int argc, char *argv[]) { arg_listen_raw = optarg; break; + case ARG_LISTEN_HTTP: + if (arg_listen_http) { + log_error("cannot currently use --listen-http more than once"); + return -EINVAL; + } + + arg_listen_http = optarg; + break; + + case ARG_LISTEN_HTTPS: + if (arg_listen_https) { + log_error("cannot currently use --listen-https more than once"); + return -EINVAL; + } + + arg_listen_https = optarg; + break; + + case ARG_KEY: + if (key_pem) { + log_error("Key file specified twice"); + return -EINVAL; + } + r = read_full_file(optarg, &key_pem, NULL); + if (r < 0) { + log_error("Failed to read key file: %s", strerror(-r)); + return r; + } + assert(key_pem); + break; + + case ARG_CERT: + if (cert_pem) { + log_error("Certificate file specified twice"); + return -EINVAL; + } + r = read_full_file(optarg, &cert_pem, NULL); + if (r < 0) { + log_error("Failed to read certificate file: %s", strerror(-r)); + return r; + } + assert(cert_pem); + break; + + case ARG_TRUST: +#ifdef HAVE_GNUTLS + if (trust_pem) { + log_error("CA certificate file specified twice"); + return -EINVAL; + } + r = read_full_file(optarg, &trust_pem, NULL); + if (r < 0) { + log_error("Failed to read CA certificate file: %s", strerror(-r)); + return r; + } + assert(trust_pem); + break; +#else + log_error("Option --trust is not available."); +#endif + case ARG_STDIN: arg_stdin = true; break; @@ -686,6 +1104,11 @@ static int parse_argv(int argc, char *argv[]) { return -EINVAL; } + if (arg_listen_https && !(key_pem && cert_pem)) { + log_error("Options --key and --cert must be used when https sources are specified"); + return -EINVAL; + } + if (optind < argc) { log_error("This program takes no positional arguments"); return -EINVAL; @@ -694,6 +1117,18 @@ static int parse_argv(int argc, char *argv[]) { return 1 /* work to do */; } +static int setup_gnutls_logger(void) { + if (!arg_listen_http && !arg_listen_https) + return 0; + +#ifdef HAVE_GNUTLS + gnutls_global_set_log_function(log_func_gnutls); + gnutls_global_set_log_level(GNUTLS_LOG_LEVEL); +#endif + + return 0; +} + int main(int argc, char **argv) { RemoteServer s = {}; int r, r2; @@ -706,6 +1141,10 @@ int main(int argc, char **argv) { if (r <= 0) return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE; + r = setup_gnutls_logger(); + if (r < 0) + return EXIT_FAILURE; + if (remoteserver_init(&s) < 0) return EXIT_FAILURE; diff --git a/src/journal/microhttpd-util.c b/src/journal/microhttpd-util.c index 17135abf8b..f7f12e1a8e 100644 --- a/src/journal/microhttpd-util.c +++ b/src/journal/microhttpd-util.c @@ -49,13 +49,14 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) { int respond_oom_internal(struct MHD_Connection *connection) { + const char *m = "Out of memory.\n"; + struct MHD_Response *response; - const char m[] = "Out of memory.\n"; int ret; assert(connection); - response = MHD_create_response_from_buffer(sizeof(m)-1, (char*) m, MHD_RESPMEM_PERSISTENT); + response = MHD_create_response_from_buffer(strlen(m), (char*) m, MHD_RESPMEM_PERSISTENT); if (!response) return MHD_NO; @@ -92,7 +93,7 @@ int respond_error(struct MHD_Connection *connection, return respond_oom(connection); } - log_debug("queing response %u: %s", code, m); + log_debug("Queing response %u: %s", code, m); MHD_add_response_header(response, "Content-Type", "text/plain"); r = MHD_queue_response(connection, code, response); MHD_destroy_response(response); @@ -227,8 +228,10 @@ int check_permissions(struct MHD_Connection *connection, int *code) { ci = MHD_get_connection_info(connection, MHD_CONNECTION_INFO_GNUTLS_SESSION); if (!ci) { - log_error("MHD_get_connection_info failed"); - return -EINVAL; + log_error("MHD_get_connection_info failed: session is unencrypted"); + *code = respond_error(connection, MHD_HTTP_FORBIDDEN, + "Encrypted connection is required"); + return -EPERM; } session = ci->tls_session; assert(session); @@ -247,11 +250,11 @@ int check_permissions(struct MHD_Connection *connection, int *code) { return -EPERM; } - log_info("Connection from %s", buf); + log_info("Connection from DN %s", buf); r = verify_cert_authorized(session); if (r < 0) { - log_error("Client is not authorized"); + log_warning("Client is not authorized"); *code = respond_error(connection, MHD_HTTP_UNAUTHORIZED, "Client certificate not signed by recognized authority"); } |