diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/journal/journal-remote-parse.c | 14 | ||||
| -rw-r--r-- | src/journal/journal-remote-parse.h | 1 | ||||
| -rw-r--r-- | src/journal/journal-remote.c | 491 | ||||
| -rw-r--r-- | src/journal/microhttpd-util.c | 17 | 
4 files changed, 490 insertions, 33 deletions
| diff --git a/src/journal/journal-remote-parse.c b/src/journal/journal-remote-parse.c index ee2260c5a9..c961844c44 100644 --- a/src/journal/journal-remote-parse.c +++ b/src/journal/journal-remote-parse.c @@ -100,6 +100,20 @@ static int get_line(RemoteSource *source, char **line, size_t *size) {          return 1;  } +int push_data(RemoteSource *source, const char *data, size_t size) { +        assert(source); +        assert(source->state != STATE_EOF); + +        if (!GREEDY_REALLOC(source->buf, source->size, +                            source->filled + size)) +                return log_oom(); + +        memcpy(source->buf + source->filled, data, size); +        source->filled += size; + +        return 0; +} +  static int fill_fixed_size(RemoteSource *source, void **data, size_t size) {          int n;          char *newbuf = NULL; diff --git a/src/journal/journal-remote-parse.h b/src/journal/journal-remote-parse.h index 3bda97e2d4..c1506d118d 100644 --- a/src/journal/journal-remote-parse.h +++ b/src/journal/journal-remote-parse.h @@ -57,4 +57,5 @@ static inline int source_non_empty(RemoteSource *source) {  void source_free(RemoteSource *source);  int process_data(RemoteSource *source); +int push_data(RemoteSource *source, const char *data, size_t size);  int process_source(RemoteSource *source, Writer *writer, bool compress, bool seal); diff --git a/src/journal/journal-remote.c b/src/journal/journal-remote.c index f8979daca6..1a1ca2c480 100644 --- a/src/journal/journal-remote.c +++ b/src/journal/journal-remote.c @@ -41,6 +41,13 @@  #include "build.h"  #include "macro.h"  #include "strv.h" +#include "fileio.h" +#include "socket-util.h" +#include "microhttpd-util.h" + +#ifdef HAVE_GNUTLS +#include <gnutls/gnutls.h> +#endif  #include "journal-remote-parse.h"  #include "journal-remote-write.h" @@ -52,9 +59,15 @@ static char* arg_url = NULL;  static char* arg_getter = NULL;  static bool arg_stdin = false;  static char* arg_listen_raw = NULL; +static char* arg_listen_http = NULL; +static char* arg_listen_https = NULL;  static int arg_compress = true;  static int arg_seal = false; +static char *key_pem = NULL; +static char *cert_pem = NULL; +static char *trust_pem = NULL; +  /**********************************************************************   **********************************************************************   **********************************************************************/ @@ -201,6 +214,17 @@ static int open_output(Writer *s, const char* url) {          return r;  } +/********************************************************************** + ********************************************************************** + **********************************************************************/ + +typedef struct MHDDaemonWrapper { +        uint64_t fd; +        struct MHD_Daemon *daemon; + +        sd_event_source *event; +} MHDDaemonWrapper; +  typedef struct RemoteServer {          RemoteSource **sources;          ssize_t sources_size; @@ -210,8 +234,13 @@ typedef struct RemoteServer {          sd_event_source *sigterm_event, *sigint_event, *listen_event;          Writer writer; + +        Hashmap *daemons;  } RemoteServer; +/* This should go away as soon as µhttpd allows state to be passed around. */ +static RemoteServer *server; +  static int dispatch_raw_source_event(sd_event_source *event,                                       int fd,                                       uint32_t revents, @@ -220,6 +249,10 @@ static int dispatch_raw_connection_event(sd_event_source *event,                                           int fd,                                           uint32_t revents,                                           void *userdata); +static int dispatch_http_event(sd_event_source *event, +                               int fd, +                               uint32_t revents, +                               void *userdata);  static int get_source_for_fd(RemoteServer *s, int fd, RemoteSource **source) {          assert(fd >= 0); @@ -325,6 +358,285 @@ static int setup_raw_socket(RemoteServer *s, const char *address) {   **********************************************************************   **********************************************************************/ +static RemoteSource *request_meta(void **connection_cls) { +        RemoteSource *source; + +        assert(connection_cls); +        if (*connection_cls) +                return *connection_cls; + +        source = new0(RemoteSource, 1); +        if (!source) +                return NULL; +        source->fd = -1; + +        log_debug("Added RemoteSource as connection metadata %p", source); + +        *connection_cls = source; +        return source; +} + +static void request_meta_free(void *cls, +                              struct MHD_Connection *connection, +                              void **connection_cls, +                              enum MHD_RequestTerminationCode toe) { +        RemoteSource *s; + +        assert(connection_cls); +        s = *connection_cls; + +        log_debug("Cleaning up connection metadata %p", s); +        source_free(s); +        *connection_cls = NULL; +} + +static int process_http_upload( +                struct MHD_Connection *connection, +                const char *upload_data, +                size_t *upload_data_size, +                RemoteSource *source) { + +        bool finished = false; +        int r; + +        assert(source); + +        log_debug("request_handler_upload: connection %p, %zu bytes", +                  connection, *upload_data_size); + +        if (*upload_data_size) { +                log_info("Received %zu bytes", *upload_data_size); + +                r = push_data(source, upload_data, *upload_data_size); +                if (r < 0) { +                        log_error("Failed to store received data of size %zu: %s", +                                  *upload_data_size, strerror(-r)); +                        return respond_oom_internal(connection); +                } +                *upload_data_size = 0; +        } else +                finished = true; + +        while (true) { +                r = process_source(source, &server->writer, arg_compress, arg_seal); +                if (r == -E2BIG) +                        log_warning("Entry too big, skipped"); +                else if (r == -EAGAIN || r == -EWOULDBLOCK) +                        break; +                else if (r < 0) { +                        log_warning("Failed to process data for connection %p", connection); +                        return respond_error(connection, MHD_HTTP_UNPROCESSABLE_ENTITY, +                                             "Processing failed: %s", strerror(-r)); +                } +        } + +        if (!finished) +                return MHD_YES; + +        /* The upload is finished */ + +        if (source_non_empty(source)) { +                log_warning("EOF reached with incomplete data"); +                return respond_error(connection, MHD_HTTP_EXPECTATION_FAILED, +                                     "Trailing data not processed."); +        } + +        return respond_error(connection, MHD_HTTP_ACCEPTED, "OK.\n"); +}; + +static int request_handler( +                void *cls, +                struct MHD_Connection *connection, +                const char *url, +                const char *method, +                const char *version, +                const char *upload_data, +                size_t *upload_data_size, +                void **connection_cls) { + +        const char *header; +        int r ,code; + +        assert(connection); +        assert(connection_cls); +        assert(url); +        assert(method); + +        log_debug("Handling a connection %s %s %s", method, url, version); + +        if (*connection_cls) +                return process_http_upload(connection, +                                           upload_data, upload_data_size, +                                           *connection_cls); + +        if (!streq(method, "POST")) +                return respond_error(connection, MHD_HTTP_METHOD_NOT_ACCEPTABLE, +                                     "Unsupported method.\n"); + +        if (!streq(url, "/upload")) +                return respond_error(connection, MHD_HTTP_NOT_FOUND, +                                     "Not found.\n"); + +        header = MHD_lookup_connection_value(connection, +                                             MHD_HEADER_KIND, "Content-Type"); +        if (!header || !streq(header, "application/vnd.fdo.journal")) +                return respond_error(connection, MHD_HTTP_UNSUPPORTED_MEDIA_TYPE, +                                     "Content-Type: application/vnd.fdo.journal" +                                     " is required.\n"); + +        if (trust_pem) { +                r = check_permissions(connection, &code); +                if (r < 0) +                        return code; +        } + +        if (!request_meta(connection_cls)) +                return respond_oom(connection); +        return MHD_YES; +} + +static int setup_microhttpd_server(RemoteServer *s, int fd, bool https) { +        struct MHD_OptionItem opts[] = { +                { MHD_OPTION_NOTIFY_COMPLETED, (intptr_t) request_meta_free}, +                { MHD_OPTION_EXTERNAL_LOGGER, (intptr_t) microhttpd_logger}, +                { MHD_OPTION_LISTEN_SOCKET, fd}, +                { MHD_OPTION_END}, +                { MHD_OPTION_END}, +                { MHD_OPTION_END}, +                { MHD_OPTION_END}}; +        int opts_pos = 3; +        int flags = +                MHD_USE_DEBUG | +                MHD_USE_PEDANTIC_CHECKS | +                MHD_USE_EPOLL_LINUX_ONLY | +                MHD_USE_DUAL_STACK; + +        const union MHD_DaemonInfo *info; +        int r, epoll_fd; +        MHDDaemonWrapper *d; + +        assert(fd >= 0); + +        r = fd_nonblock(fd, true); +        if (r < 0) { +                log_error("Failed to make fd:%d nonblocking: %s", fd, strerror(-r)); +                return r; +        } + +        if (https) { +                opts[opts_pos++] = (struct MHD_OptionItem) +                        {MHD_OPTION_HTTPS_MEM_KEY, 0, key_pem}; +                opts[opts_pos++] = (struct MHD_OptionItem) +                        {MHD_OPTION_HTTPS_MEM_CERT, 0, cert_pem}; + +                flags |= MHD_USE_SSL; + +                if (trust_pem) +                        opts[opts_pos++] = (struct MHD_OptionItem) +                                {MHD_OPTION_HTTPS_MEM_TRUST, 0, trust_pem}; +        } + +        d = new(MHDDaemonWrapper, 1); +        if (!d) +                return log_oom(); + +        d->fd = (uint64_t) fd; + +        d->daemon = MHD_start_daemon(flags, 0, +                                     NULL, NULL, +                                     request_handler, NULL, +                                     MHD_OPTION_ARRAY, opts, +                                     MHD_OPTION_END); +        if (!d->daemon) { +                log_error("Failed to start µhttp daemon"); +                r = -EINVAL; +                goto error; +        } + +        log_debug("Started MHD %s daemon on fd:%d (wrapper @ %p)", +                  https ? "HTTPS" : "HTTP", fd, d); + + +        info = MHD_get_daemon_info(d->daemon, MHD_DAEMON_INFO_EPOLL_FD_LINUX_ONLY); +        if (!info) { +                log_error("µhttp returned NULL daemon info"); +                r = -ENOTSUP; +                goto error; +        } + +        epoll_fd = info->listen_fd; +        if (epoll_fd < 0) { +                log_error("µhttp epoll fd is invalid"); +                r = -EUCLEAN; +                goto error; +        } + +        r = sd_event_add_io(s->events, &d->event, +                            epoll_fd, EPOLLIN, dispatch_http_event, d); +        if (r < 0) { +                log_error("Failed to add event callback: %s", strerror(-r)); +                goto error; +        } + +        r = hashmap_ensure_allocated(&s->daemons, uint64_hash_func, uint64_compare_func); +        if (r < 0) { +                log_oom(); +                goto error; +        } + +        r = hashmap_put(s->daemons, &d->fd, d); +        if (r < 0) { +                log_error("Failed to add daemon to hashmap: %s", strerror(-r)); +                goto error; +        } + +        s->active ++; +        return 0; + +error: +        MHD_stop_daemon(d->daemon); +        free(d->daemon); +        free(d); +        return r; +} + +static int setup_microhttpd_socket(RemoteServer *s, +                                   const char *address, +                                   bool https) { +        int fd; + +        fd = make_socket_fd(LOG_INFO, address, SOCK_STREAM | SOCK_CLOEXEC); +        if (fd < 0) +                return fd; + +        return setup_microhttpd_server(s, fd, https); +} + +static int dispatch_http_event(sd_event_source *event, +                               int fd, +                               uint32_t revents, +                               void *userdata) { +        MHDDaemonWrapper *d = userdata; +        int r; + +        assert(d); + +        log_info("%s", __func__); + +        r = MHD_run(d->daemon); +        if (r == MHD_NO) { +                log_error("MHD_run failed!"); +                // XXX: unregister daemon +                return -EINVAL; +        } + +        return 1; /* work to do */ +} + +/********************************************************************** + ********************************************************************** + **********************************************************************/ +  static int dispatch_sigterm(sd_event_source *event,                              const struct signalfd_siginfo *si,                              void *userdata) { @@ -369,6 +681,9 @@ static int remoteserver_init(RemoteServer *s) {          setup_signals(s); +        assert(server == NULL); +        server = s; +          n = sd_listen_fds(true);          if (n < 0) {                  log_error("Failed to read listening file descriptors from environment: %s", @@ -426,6 +741,22 @@ static int remoteserver_init(RemoteServer *s) {                  output_name = arg_listen_raw;          } +        if (arg_listen_http) { +                r = setup_microhttpd_socket(s, arg_listen_http, false); +                if (r < 0) +                        return r; + +                output_name = arg_listen_http; +        } + +        if (arg_listen_https) { +                r = setup_microhttpd_socket(s, arg_listen_https, true); +                if (r < 0) +                        return r; + +                output_name = arg_listen_https; +        } +          if (arg_stdin) {                  log_info("Reading standard input...");                  r = add_source(s, STDIN_FILENO, "stdin"); @@ -454,11 +785,20 @@ static int remoteserver_init(RemoteServer *s) {  static int server_destroy(RemoteServer *s) {          int r;          ssize_t i; +        MHDDaemonWrapper *d;          r = writer_close(&s->writer); +        while ((d = hashmap_steal_first(s->daemons))) { +                MHD_stop_daemon(d->daemon); +                sd_event_source_unref(d->event); +                free(d); +        } + +        hashmap_free(s->daemons); +          assert(s->sources_size == 0 || s->sources); -        for(i = 0; i < s->sources_size; i++) +        for (i = 0; i < s->sources_size; i++)                  remove_source(s, i);          free(s->sources); @@ -506,56 +846,61 @@ static int dispatch_raw_source_event(sd_event_source *event,          return r;  } -static int dispatch_raw_connection_event(sd_event_source *event, -                                         int fd, -                                         uint32_t revents, -                                         void *userdata) { -        RemoteServer *s = userdata; - -        SocketAddress addr = { -                .size = sizeof(union sockaddr_union), -                .type = SOCK_STREAM, -        }; +static int accept_connection(const char* type, int fd, SocketAddress *addr) {          int fd2, r; -        log_debug("Accepting new connection on fd:%d", fd); -        fd2 = accept4(fd, &addr.sockaddr.sa, &addr.size, SOCK_NONBLOCK|SOCK_CLOEXEC); +        log_debug("Accepting new %s connection on fd:%d", type, fd); +        fd2 = accept4(fd, &addr->sockaddr.sa, &addr->size, SOCK_NONBLOCK|SOCK_CLOEXEC);          if (fd2 < 0) {                  log_error("accept() on fd:%d failed: %m", fd);                  return -errno;          } -        switch(socket_address_family(&addr)) { +        switch(socket_address_family(addr)) {          case AF_INET:          case AF_INET6: {                  char* _cleanup_free_ a = NULL; -                r = socket_address_print(&addr, &a); +                r = socket_address_print(addr, &a);                  if (r < 0) {                          log_error("socket_address_print(): %s", strerror(-r));                          close(fd2);                          return r;                  } -                log_info("Accepted %s connection from %s", -                         socket_address_family(&addr) == AF_INET ? "IP" : "IPv6", +                log_info("Accepted %s %s connection from %s", +                         type, +                         socket_address_family(addr) == AF_INET ? "IP" : "IPv6",                           a); -                break; + +                return fd2;          };          default: -                log_error("Connection with unsupported family %d", -                          socket_address_family(&addr)); +                log_error("Rejected %s connection with unsupported family %d", +                          type, socket_address_family(addr));                  close(fd2); +                  return -EINVAL;          } +} -        r = add_source(s, fd2, NULL); -        if (r < 0) -                log_error("failed to create source from fd:%d: %s", fd2, strerror(-r)); +static int dispatch_raw_connection_event(sd_event_source *event, +                                         int fd, +                                         uint32_t revents, +                                         void *userdata) { +        RemoteServer *s = userdata; +        int fd2; +        SocketAddress addr = { +                .size = sizeof(union sockaddr_union), +                .type = SOCK_STREAM, +        }; -        return r; -} +        fd2 = accept_connection("raw", fd, &addr); +        if (fd2 < 0) +                return fd2; +        return add_source(s, fd2, NULL); +}  /**********************************************************************   ********************************************************************** @@ -568,6 +913,8 @@ static int help(void) {                 "  --url=URL            Read events from systemd-journal-gatewayd at URL\n"                 "  --getter=COMMAND     Read events from the output of COMMAND\n"                 "  --listen-raw=ADDR    Listen for connections at ADDR\n" +               "  --listen-http=ADDR   Listen for HTTP connections at ADDR\n" +               "  --listen-https=ADDR  Listen for HTTPS connections at ADDR\n"                 "  --stdin              Read events from standard input\n"                 "  -o --output=FILE|DIR Write output to FILE or DIR/external-*.journal\n"                 "  --[no-]compress      Use XZ-compression in the output journal (default: yes)\n" @@ -586,12 +933,17 @@ static int parse_argv(int argc, char *argv[]) {                  ARG_VERSION = 0x100,                  ARG_URL,                  ARG_LISTEN_RAW, +                ARG_LISTEN_HTTP, +                ARG_LISTEN_HTTPS,                  ARG_STDIN,                  ARG_GETTER,                  ARG_COMPRESS,                  ARG_NO_COMPRESS,                  ARG_SEAL,                  ARG_NO_SEAL, +                ARG_KEY, +                ARG_CERT, +                ARG_TRUST,          };          static const struct option options[] = { @@ -600,16 +952,21 @@ static int parse_argv(int argc, char *argv[]) {                  { "url",          required_argument, NULL, ARG_URL          },                  { "getter",       required_argument, NULL, ARG_GETTER       },                  { "listen-raw",   required_argument, NULL, ARG_LISTEN_RAW   }, +                { "listen-http",  required_argument, NULL, ARG_LISTEN_HTTP  }, +                { "listen-https", required_argument, NULL, ARG_LISTEN_HTTPS },                  { "stdin",        no_argument,       NULL, ARG_STDIN        },                  { "output",       required_argument, NULL, 'o'              },                  { "compress",     no_argument,       NULL, ARG_COMPRESS     },                  { "no-compress",  no_argument,       NULL, ARG_NO_COMPRESS  },                  { "seal",         no_argument,       NULL, ARG_SEAL         },                  { "no-seal",      no_argument,       NULL, ARG_NO_SEAL      }, +                { "key",          required_argument, NULL, ARG_KEY          }, +                { "cert",         required_argument, NULL, ARG_CERT         }, +                { "trust",        required_argument, NULL, ARG_TRUST        },                  {}          }; -        int c; +        int c, r;          assert(argc >= 0);          assert(argv); @@ -652,6 +1009,67 @@ static int parse_argv(int argc, char *argv[]) {                          arg_listen_raw = optarg;                          break; +                case ARG_LISTEN_HTTP: +                        if (arg_listen_http) { +                                log_error("cannot currently use --listen-http more than once"); +                                return -EINVAL; +                        } + +                        arg_listen_http = optarg; +                        break; + +                case ARG_LISTEN_HTTPS: +                        if (arg_listen_https) { +                                log_error("cannot currently use --listen-https more than once"); +                                return -EINVAL; +                        } + +                        arg_listen_https = optarg; +                        break; + +                case ARG_KEY: +                        if (key_pem) { +                                log_error("Key file specified twice"); +                                return -EINVAL; +                        } +                        r = read_full_file(optarg, &key_pem, NULL); +                        if (r < 0) { +                                log_error("Failed to read key file: %s", strerror(-r)); +                                return r; +                        } +                        assert(key_pem); +                        break; + +                case ARG_CERT: +                        if (cert_pem) { +                                log_error("Certificate file specified twice"); +                                return -EINVAL; +                        } +                        r = read_full_file(optarg, &cert_pem, NULL); +                        if (r < 0) { +                                log_error("Failed to read certificate file: %s", strerror(-r)); +                                return r; +                        } +                        assert(cert_pem); +                        break; + +                case ARG_TRUST: +#ifdef HAVE_GNUTLS +                        if (trust_pem) { +                                log_error("CA certificate file specified twice"); +                                return -EINVAL; +                        } +                        r = read_full_file(optarg, &trust_pem, NULL); +                        if (r < 0) { +                                log_error("Failed to read CA certificate file: %s", strerror(-r)); +                                return r; +                        } +                        assert(trust_pem); +                        break; +#else +                        log_error("Option --trust is not available."); +#endif +                  case ARG_STDIN:                          arg_stdin = true;                          break; @@ -686,6 +1104,11 @@ static int parse_argv(int argc, char *argv[]) {                          return -EINVAL;                  } +        if (arg_listen_https && !(key_pem && cert_pem)) { +                log_error("Options --key and --cert must be used when https sources are specified"); +                return -EINVAL; +        } +          if (optind < argc) {                  log_error("This program takes no positional arguments");                  return -EINVAL; @@ -694,6 +1117,18 @@ static int parse_argv(int argc, char *argv[]) {          return 1 /* work to do */;  } +static int setup_gnutls_logger(void) { +        if (!arg_listen_http && !arg_listen_https) +                return 0; + +#ifdef HAVE_GNUTLS +        gnutls_global_set_log_function(log_func_gnutls); +        gnutls_global_set_log_level(GNUTLS_LOG_LEVEL); +#endif + +        return 0; +} +  int main(int argc, char **argv) {          RemoteServer s = {};          int r, r2; @@ -706,6 +1141,10 @@ int main(int argc, char **argv) {          if (r <= 0)                  return r == 0 ? EXIT_SUCCESS : EXIT_FAILURE; +        r = setup_gnutls_logger(); +        if (r < 0) +                return EXIT_FAILURE; +          if (remoteserver_init(&s) < 0)                  return EXIT_FAILURE; diff --git a/src/journal/microhttpd-util.c b/src/journal/microhttpd-util.c index 17135abf8b..f7f12e1a8e 100644 --- a/src/journal/microhttpd-util.c +++ b/src/journal/microhttpd-util.c @@ -49,13 +49,14 @@ void microhttpd_logger(void *arg, const char *fmt, va_list ap) {  int respond_oom_internal(struct MHD_Connection *connection) { +        const char *m = "Out of memory.\n"; +          struct MHD_Response *response; -        const char m[] = "Out of memory.\n";          int ret;          assert(connection); -        response = MHD_create_response_from_buffer(sizeof(m)-1, (char*) m, MHD_RESPMEM_PERSISTENT); +        response = MHD_create_response_from_buffer(strlen(m), (char*) m, MHD_RESPMEM_PERSISTENT);          if (!response)                  return MHD_NO; @@ -92,7 +93,7 @@ int respond_error(struct MHD_Connection *connection,                  return respond_oom(connection);          } -        log_debug("queing response %u: %s", code, m); +        log_debug("Queing response %u: %s", code, m);          MHD_add_response_header(response, "Content-Type", "text/plain");          r = MHD_queue_response(connection, code, response);          MHD_destroy_response(response); @@ -227,8 +228,10 @@ int check_permissions(struct MHD_Connection *connection, int *code) {          ci = MHD_get_connection_info(connection,                                       MHD_CONNECTION_INFO_GNUTLS_SESSION);          if (!ci) { -                log_error("MHD_get_connection_info failed"); -                return -EINVAL; +                log_error("MHD_get_connection_info failed: session is unencrypted"); +                *code = respond_error(connection, MHD_HTTP_FORBIDDEN, +                                      "Encrypted connection is required"); +                return -EPERM;          }          session = ci->tls_session;          assert(session); @@ -247,11 +250,11 @@ int check_permissions(struct MHD_Connection *connection, int *code) {                  return -EPERM;          } -        log_info("Connection from %s", buf); +        log_info("Connection from DN %s", buf);          r = verify_cert_authorized(session);          if (r < 0) { -                log_error("Client is not authorized"); +                log_warning("Client is not authorized");                  *code = respond_error(connection, MHD_HTTP_UNAUTHORIZED,                                        "Client certificate not signed by recognized authority");          } | 
