diff options
author | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-06-09 03:13:06 -0400 |
---|---|---|
committer | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-06-09 03:13:06 -0400 |
commit | 8231d17032fdd535e58c5aead8eae3f6cbc048b5 (patch) | |
tree | 799df8c0f203b8369b602155d3233bc9661772db /src/grp-journal-remote/systemd-journal-upload | |
parent | c533334ea105130b806b2a89512b3ff131ffc3bc (diff) |
./move.sh
Diffstat (limited to 'src/grp-journal-remote/systemd-journal-upload')
5 files changed, 1432 insertions, 0 deletions
diff --git a/src/grp-journal-remote/systemd-journal-upload/Makefile b/src/grp-journal-remote/systemd-journal-upload/Makefile new file mode 100644 index 0000000000..4d8fba8fd0 --- /dev/null +++ b/src/grp-journal-remote/systemd-journal-upload/Makefile @@ -0,0 +1,54 @@ +# -*- Mode: makefile; indent-tabs-mode: t -*- +# +# This file is part of systemd. +# +# Copyright 2010-2012 Lennart Poettering +# Copyright 2010-2012 Kay Sievers +# Copyright 2013 Zbigniew Jędrzejewski-Szmek +# Copyright 2013 David Strauss +# Copyright 2016 Luke Shumaker +# +# systemd is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. +# +# systemd is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with systemd; If not, see <http://www.gnu.org/licenses/>. +include $(dir $(lastword $(MAKEFILE_LIST)))/../../../config.mk +include $(topsrcdir)/build-aux/Makefile.head.mk + +ifneq ($(HAVE_LIBCURL),) +libexec_PROGRAMS += \ + systemd-journal-upload + +systemd_journal_upload_SOURCES = \ + src/journal-remote/journal-upload.h \ + src/journal-remote/journal-upload.c \ + src/journal-remote/journal-upload-journal.c + +systemd_journal_upload_CFLAGS = \ + $(AM_CFLAGS) \ + $(LIBCURL_CFLAGS) + +systemd_journal_upload_LDADD = \ + libshared.la \ + $(LIBCURL_LIBS) + +nodist_systemunit_DATA += \ + units/systemd-journal-upload.service + +nodist_pkgsysconf_DATA += \ + src/journal-remote/journal-upload.conf +endif + +EXTRA_DIST += \ + units/systemd-journal-upload.service.in \ + src/journal-remote/journal-upload.conf.in + +include $(topsrcdir)/build-aux/Makefile.tail.mk diff --git a/src/grp-journal-remote/systemd-journal-upload/journal-upload-journal.c b/src/grp-journal-remote/systemd-journal-upload/journal-upload-journal.c new file mode 100644 index 0000000000..aef095c8c9 --- /dev/null +++ b/src/grp-journal-remote/systemd-journal-upload/journal-upload-journal.c @@ -0,0 +1,422 @@ +/*** + This file is part of systemd. + + Copyright 2014 Zbigniew Jędrzejewski-Szmek + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <curl/curl.h> +#include <stdbool.h> + +#include "alloc-util.h" +#include "journal-upload.h" +#include "log.h" +#include "utf8.h" +#include "util.h" +#include <systemd/sd-daemon.h> + +/** + * Write up to size bytes to buf. Return negative on error, and number of + * bytes written otherwise. The last case is a kind of an error too. + */ +static ssize_t write_entry(char *buf, size_t size, Uploader *u) { + int r; + size_t pos = 0; + + assert(size <= SSIZE_MAX); + + for (;;) { + + switch(u->entry_state) { + case ENTRY_CURSOR: { + u->current_cursor = mfree(u->current_cursor); + + r = sd_journal_get_cursor(u->journal, &u->current_cursor); + if (r < 0) + return log_error_errno(r, "Failed to get cursor: %m"); + + r = snprintf(buf + pos, size - pos, + "__CURSOR=%s\n", u->current_cursor); + if (pos + r > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (pos + r == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_REALTIME: { + usec_t realtime; + + r = sd_journal_get_realtime_usec(u->journal, &realtime); + if (r < 0) + return log_error_errno(r, "Failed to get realtime timestamp: %m"); + + r = snprintf(buf + pos, size - pos, + "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_MONOTONIC: { + usec_t monotonic; + sd_id128_t boot_id; + + r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); + if (r < 0) + return log_error_errno(r, "Failed to get monotonic timestamp: %m"); + + r = snprintf(buf + pos, size - pos, + "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_BOOT_ID: { + sd_id128_t boot_id; + char sid[33]; + + r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); + if (r < 0) + return log_error_errno(r, "Failed to get monotonic timestamp: %m"); + + r = snprintf(buf + pos, size - pos, + "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_NEW_FIELD: { + u->field_pos = 0; + + r = sd_journal_enumerate_data(u->journal, + &u->field_data, + &u->field_length); + if (r < 0) + return log_error_errno(r, "Failed to move to next field in entry: %m"); + else if (r == 0) { + u->entry_state = ENTRY_OUTRO; + continue; + } + + if (!utf8_is_printable_newline(u->field_data, + u->field_length, false)) { + u->entry_state = ENTRY_BINARY_FIELD_START; + continue; + } + + u->entry_state++; + } /* fall through */ + + case ENTRY_TEXT_FIELD: + case ENTRY_BINARY_FIELD: { + bool done; + size_t tocopy; + + done = size - pos > u->field_length - u->field_pos; + if (done) + tocopy = u->field_length - u->field_pos; + else + tocopy = size - pos; + + memcpy(buf + pos, + (char*) u->field_data + u->field_pos, + tocopy); + + if (done) { + buf[pos + tocopy] = '\n'; + pos += tocopy + 1; + u->entry_state = ENTRY_NEW_FIELD; + continue; + } else { + u->field_pos += tocopy; + return size; + } + } + + case ENTRY_BINARY_FIELD_START: { + const char *c; + size_t len; + + c = memchr(u->field_data, '=', u->field_length); + if (!c || c == u->field_data) { + log_error("Invalid field."); + return -EINVAL; + } + + len = c - (const char*)u->field_data; + + /* need space for label + '\n' */ + if (size - pos < len + 1) + return pos; + + memcpy(buf + pos, u->field_data, len); + buf[pos + len] = '\n'; + pos += len + 1; + + u->field_pos = len + 1; + u->entry_state++; + } /* fall through */ + + case ENTRY_BINARY_FIELD_SIZE: { + uint64_t le64; + + /* need space for uint64_t */ + if (size - pos < 8) + return pos; + + le64 = htole64(u->field_length - u->field_pos); + memcpy(buf + pos, &le64, 8); + pos += 8; + + u->entry_state++; + continue; + } + + case ENTRY_OUTRO: + /* need space for '\n' */ + if (size - pos < 1) + return pos; + + buf[pos++] = '\n'; + u->entry_state++; + u->entries_sent++; + + return pos; + + default: + assert_not_reached("WTF?"); + } + } + assert_not_reached("WTF?"); +} + +static inline void check_update_watchdog(Uploader *u) { + usec_t after; + usec_t elapsed_time; + + if (u->watchdog_usec <= 0) + return; + + after = now(CLOCK_MONOTONIC); + elapsed_time = usec_sub(after, u->watchdog_timestamp); + if (elapsed_time > u->watchdog_usec / 2) { + log_debug("Update watchdog timer"); + sd_notify(false, "WATCHDOG=1"); + u->watchdog_timestamp = after; + } +} + +static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + Uploader *u = userp; + int r; + sd_journal *j; + size_t filled = 0; + ssize_t w; + + assert(u); + assert(nmemb <= SSIZE_MAX / size); + + check_update_watchdog(u); + + j = u->journal; + + while (j && filled < size * nmemb) { + if (u->entry_state == ENTRY_DONE) { + r = sd_journal_next(j); + if (r < 0) { + log_error_errno(r, "Failed to move to next entry in journal: %m"); + return CURL_READFUNC_ABORT; + } else if (r == 0) { + if (u->input_event) + log_debug("No more entries, waiting for journal."); + else { + log_info("No more entries, closing journal."); + close_journal_input(u); + } + + u->uploading = false; + + break; + } + + u->entry_state = ENTRY_CURSOR; + } + + w = write_entry((char*)buf + filled, size * nmemb - filled, u); + if (w < 0) + return CURL_READFUNC_ABORT; + filled += w; + + if (filled == 0) { + log_error("Buffer space is too small to write entry."); + return CURL_READFUNC_ABORT; + } else if (u->entry_state != ENTRY_DONE) + /* This means that all available space was used up */ + break; + + log_debug("Entry %zu (%s) has been uploaded.", + u->entries_sent, u->current_cursor); + } + + return filled; +} + +void close_journal_input(Uploader *u) { + assert(u); + + if (u->journal) { + log_debug("Closing journal input."); + + sd_journal_close(u->journal); + u->journal = NULL; + } + u->timeout = 0; +} + +static int process_journal_input(Uploader *u, int skip) { + int r; + + if (u->uploading) + return 0; + + r = sd_journal_next_skip(u->journal, skip); + if (r < 0) + return log_error_errno(r, "Failed to skip to next entry: %m"); + else if (r < skip) + return 0; + + /* have data */ + u->entry_state = ENTRY_CURSOR; + return start_upload(u, journal_input_callback, u); +} + +int check_journal_input(Uploader *u) { + if (u->input_event) { + int r; + + r = sd_journal_process(u->journal); + if (r < 0) { + log_error_errno(r, "Failed to process journal: %m"); + close_journal_input(u); + return r; + } + + if (r == SD_JOURNAL_NOP) + return 0; + } + + return process_journal_input(u, 1); +} + +static int dispatch_journal_input(sd_event_source *event, + int fd, + uint32_t revents, + void *userp) { + Uploader *u = userp; + + assert(u); + + if (u->uploading) + return 0; + + log_debug("Detected journal input, checking for new data."); + return check_journal_input(u); +} + +int open_journal_for_upload(Uploader *u, + sd_journal *j, + const char *cursor, + bool after_cursor, + bool follow) { + int fd, r, events; + + u->journal = j; + + sd_journal_set_data_threshold(j, 0); + + if (follow) { + fd = sd_journal_get_fd(j); + if (fd < 0) + return log_error_errno(fd, "sd_journal_get_fd failed: %m"); + + events = sd_journal_get_events(j); + + r = sd_journal_reliable_fd(j); + assert(r >= 0); + if (r > 0) + u->timeout = -1; + else + u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; + + r = sd_event_add_io(u->events, &u->input_event, + fd, events, dispatch_journal_input, u); + if (r < 0) + return log_error_errno(r, "Failed to register input event: %m"); + + log_debug("Listening for journal events on fd:%d, timeout %d", + fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); + } else + log_debug("Not listening for journal events."); + + if (cursor) { + r = sd_journal_seek_cursor(j, cursor); + if (r < 0) + return log_error_errno(r, "Failed to seek to cursor %s: %m", + cursor); + } + + return process_journal_input(u, 1 + !!after_cursor); +} diff --git a/src/grp-journal-remote/systemd-journal-upload/journal-upload.c b/src/grp-journal-remote/systemd-journal-upload/journal-upload.c new file mode 100644 index 0000000000..e622f6c1e1 --- /dev/null +++ b/src/grp-journal-remote/systemd-journal-upload/journal-upload.c @@ -0,0 +1,880 @@ +/*** + This file is part of systemd. + + Copyright 2014 Zbigniew Jędrzejewski-Szmek + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <curl/curl.h> +#include <fcntl.h> +#include <getopt.h> +#include <stdio.h> +#include <sys/stat.h> + +#include <systemd/sd-daemon.h> + +#include "alloc-util.h" +#include "conf-parser.h" +#include "def.h" +#include "fd-util.h" +#include "fileio.h" +#include "formats-util.h" +#include "glob-util.h" +#include "journal-upload.h" +#include "log.h" +#include "mkdir.h" +#include "parse-util.h" +#include "sigbus.h" +#include "signal-util.h" +#include "string-util.h" +#include "util.h" + +#define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-upload.pem" +#define CERT_FILE CERTIFICATE_ROOT "/certs/journal-upload.pem" +#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem" +#define DEFAULT_PORT 19532 + +static const char* arg_url = NULL; +static const char *arg_key = NULL; +static const char *arg_cert = NULL; +static const char *arg_trust = NULL; +static const char *arg_directory = NULL; +static char **arg_file = NULL; +static const char *arg_cursor = NULL; +static bool arg_after_cursor = false; +static int arg_journal_type = 0; +static const char *arg_machine = NULL; +static bool arg_merge = false; +static int arg_follow = -1; +static const char *arg_save_state = NULL; + +static void close_fd_input(Uploader *u); + +#define SERVER_ANSWER_KEEP 2048 + +#define STATE_FILE "/var/lib/systemd/journal-upload/state" + +#define easy_setopt(curl, opt, value, level, cmd) \ + do { \ + code = curl_easy_setopt(curl, opt, value); \ + if (code) { \ + log_full(level, \ + "curl_easy_setopt " #opt " failed: %s", \ + curl_easy_strerror(code)); \ + cmd; \ + } \ + } while (0) + +static size_t output_callback(char *buf, + size_t size, + size_t nmemb, + void *userp) { + Uploader *u = userp; + + assert(u); + + log_debug("The server answers (%zu bytes): %.*s", + size*nmemb, (int)(size*nmemb), buf); + + if (nmemb && !u->answer) { + u->answer = strndup(buf, size*nmemb); + if (!u->answer) + log_warning_errno(ENOMEM, "Failed to store server answer (%zu bytes): %m", + size*nmemb); + } + + return size * nmemb; +} + +static int check_cursor_updating(Uploader *u) { + _cleanup_free_ char *temp_path = NULL; + _cleanup_fclose_ FILE *f = NULL; + int r; + + if (!u->state_file) + return 0; + + r = mkdir_parents(u->state_file, 0755); + if (r < 0) + return log_error_errno(r, "Cannot create parent directory of state file %s: %m", + u->state_file); + + r = fopen_temporary(u->state_file, &f, &temp_path); + if (r < 0) + return log_error_errno(r, "Cannot save state to %s: %m", + u->state_file); + unlink(temp_path); + + return 0; +} + +static int update_cursor_state(Uploader *u) { + _cleanup_free_ char *temp_path = NULL; + _cleanup_fclose_ FILE *f = NULL; + int r; + + if (!u->state_file || !u->last_cursor) + return 0; + + r = fopen_temporary(u->state_file, &f, &temp_path); + if (r < 0) + goto fail; + + fprintf(f, + "# This is private data. Do not parse.\n" + "LAST_CURSOR=%s\n", + u->last_cursor); + + r = fflush_and_check(f); + if (r < 0) + goto fail; + + if (rename(temp_path, u->state_file) < 0) { + r = -errno; + goto fail; + } + + return 0; + +fail: + if (temp_path) + (void) unlink(temp_path); + + (void) unlink(u->state_file); + + return log_error_errno(r, "Failed to save state %s: %m", u->state_file); +} + +static int load_cursor_state(Uploader *u) { + int r; + + if (!u->state_file) + return 0; + + r = parse_env_file(u->state_file, NEWLINE, + "LAST_CURSOR", &u->last_cursor, + NULL); + + if (r == -ENOENT) + log_debug("State file %s is not present.", u->state_file); + else if (r < 0) + return log_error_errno(r, "Failed to read state file %s: %m", + u->state_file); + else + log_debug("Last cursor was %s", u->last_cursor); + + return 0; +} + + + +int start_upload(Uploader *u, + size_t (*input_callback)(void *ptr, + size_t size, + size_t nmemb, + void *userdata), + void *data) { + CURLcode code; + + assert(u); + assert(input_callback); + + if (!u->header) { + struct curl_slist *h; + + h = curl_slist_append(NULL, "Content-Type: application/vnd.fdo.journal"); + if (!h) + return log_oom(); + + h = curl_slist_append(h, "Transfer-Encoding: chunked"); + if (!h) { + curl_slist_free_all(h); + return log_oom(); + } + + h = curl_slist_append(h, "Accept: text/plain"); + if (!h) { + curl_slist_free_all(h); + return log_oom(); + } + + u->header = h; + } + + if (!u->easy) { + CURL *curl; + + curl = curl_easy_init(); + if (!curl) { + log_error("Call to curl_easy_init failed."); + return -ENOSR; + } + + /* tell it to POST to the URL */ + easy_setopt(curl, CURLOPT_POST, 1L, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_ERRORBUFFER, u->error, + LOG_ERR, return -EXFULL); + + /* set where to write to */ + easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_WRITEDATA, data, + LOG_ERR, return -EXFULL); + + /* set where to read from */ + easy_setopt(curl, CURLOPT_READFUNCTION, input_callback, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_READDATA, data, + LOG_ERR, return -EXFULL); + + /* use our special own mime type and chunked transfer */ + easy_setopt(curl, CURLOPT_HTTPHEADER, u->header, + LOG_ERR, return -EXFULL); + + if (_unlikely_(log_get_max_level() >= LOG_DEBUG)) + /* enable verbose for easier tracing */ + easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, ); + + easy_setopt(curl, CURLOPT_USERAGENT, + "systemd-journal-upload " PACKAGE_STRING, + LOG_WARNING, ); + + if (arg_key || startswith(u->url, "https://")) { + easy_setopt(curl, CURLOPT_SSLKEY, arg_key ?: PRIV_KEY_FILE, + LOG_ERR, return -EXFULL); + easy_setopt(curl, CURLOPT_SSLCERT, arg_cert ?: CERT_FILE, + LOG_ERR, return -EXFULL); + } + + if (streq_ptr(arg_trust, "all")) + easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0, + LOG_ERR, return -EUCLEAN); + else if (arg_trust || startswith(u->url, "https://")) + easy_setopt(curl, CURLOPT_CAINFO, arg_trust ?: TRUST_FILE, + LOG_ERR, return -EXFULL); + + if (arg_key || arg_trust) + easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1, + LOG_WARNING, ); + + u->easy = curl; + } else { + /* truncate the potential old error message */ + u->error[0] = '\0'; + + free(u->answer); + u->answer = 0; + } + + /* upload to this place */ + code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url); + if (code) { + log_error("curl_easy_setopt CURLOPT_URL failed: %s", + curl_easy_strerror(code)); + return -EXFULL; + } + + u->uploading = true; + + return 0; +} + +static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + Uploader *u = userp; + + ssize_t r; + + assert(u); + assert(nmemb <= SSIZE_MAX / size); + + if (u->input < 0) + return 0; + + r = read(u->input, buf, size * nmemb); + log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, r); + + if (r > 0) + return r; + + u->uploading = false; + if (r == 0) { + log_debug("Reached EOF"); + close_fd_input(u); + return 0; + } else { + log_error_errno(errno, "Aborting transfer after read error on input: %m."); + return CURL_READFUNC_ABORT; + } +} + +static void close_fd_input(Uploader *u) { + assert(u); + + if (u->input >= 0) + close_nointr(u->input); + u->input = -1; + u->timeout = 0; +} + +static int dispatch_fd_input(sd_event_source *event, + int fd, + uint32_t revents, + void *userp) { + Uploader *u = userp; + + assert(u); + assert(fd >= 0); + + if (revents & EPOLLHUP) { + log_debug("Received HUP"); + close_fd_input(u); + return 0; + } + + if (!(revents & EPOLLIN)) { + log_warning("Unexpected poll event %"PRIu32".", revents); + return -EINVAL; + } + + if (u->uploading) { + log_warning("dispatch_fd_input called when uploading, ignoring."); + return 0; + } + + return start_upload(u, fd_input_callback, u); +} + +static int open_file_for_upload(Uploader *u, const char *filename) { + int fd, r = 0; + + if (streq(filename, "-")) + fd = STDIN_FILENO; + else { + fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY); + if (fd < 0) + return log_error_errno(errno, "Failed to open %s: %m", filename); + } + + u->input = fd; + + if (arg_follow) { + r = sd_event_add_io(u->events, &u->input_event, + fd, EPOLLIN, dispatch_fd_input, u); + if (r < 0) { + if (r != -EPERM || arg_follow > 0) + return log_error_errno(r, "Failed to register input event: %m"); + + /* Normal files should just be consumed without polling. */ + r = start_upload(u, fd_input_callback, u); + } + } + + return r; +} + +static int dispatch_sigterm(sd_event_source *event, + const struct signalfd_siginfo *si, + void *userdata) { + Uploader *u = userdata; + + assert(u); + + log_received_signal(LOG_INFO, si); + + close_fd_input(u); + close_journal_input(u); + + sd_event_exit(u->events, 0); + return 0; +} + +static int setup_signals(Uploader *u) { + int r; + + assert(u); + + assert_se(sigprocmask_many(SIG_SETMASK, NULL, SIGINT, SIGTERM, -1) >= 0); + + r = sd_event_add_signal(u->events, &u->sigterm_event, SIGTERM, dispatch_sigterm, u); + if (r < 0) + return r; + + r = sd_event_add_signal(u->events, &u->sigint_event, SIGINT, dispatch_sigterm, u); + if (r < 0) + return r; + + return 0; +} + +static int setup_uploader(Uploader *u, const char *url, const char *state_file) { + int r; + const char *host, *proto = ""; + + assert(u); + assert(url); + + memzero(u, sizeof(Uploader)); + u->input = -1; + + if (!(host = startswith(url, "http://")) && !(host = startswith(url, "https://"))) { + host = url; + proto = "https://"; + } + + if (strchr(host, ':')) + u->url = strjoin(proto, url, "/upload", NULL); + else { + char *t; + size_t x; + + t = strdupa(url); + x = strlen(t); + while (x > 0 && t[x - 1] == '/') + t[x - 1] = '\0'; + + u->url = strjoin(proto, t, ":" STRINGIFY(DEFAULT_PORT), "/upload", NULL); + } + if (!u->url) + return log_oom(); + + u->state_file = state_file; + + r = sd_event_default(&u->events); + if (r < 0) + return log_error_errno(r, "sd_event_default failed: %m"); + + r = setup_signals(u); + if (r < 0) + return log_error_errno(r, "Failed to set up signals: %m"); + + (void) sd_watchdog_enabled(false, &u->watchdog_usec); + + return load_cursor_state(u); +} + +static void destroy_uploader(Uploader *u) { + assert(u); + + curl_easy_cleanup(u->easy); + curl_slist_free_all(u->header); + free(u->answer); + + free(u->last_cursor); + free(u->current_cursor); + + free(u->url); + + u->input_event = sd_event_source_unref(u->input_event); + + close_fd_input(u); + close_journal_input(u); + + sd_event_source_unref(u->sigterm_event); + sd_event_source_unref(u->sigint_event); + sd_event_unref(u->events); +} + +static int perform_upload(Uploader *u) { + CURLcode code; + long status; + + assert(u); + + u->watchdog_timestamp = now(CLOCK_MONOTONIC); + code = curl_easy_perform(u->easy); + if (code) { + if (u->error[0]) + log_error("Upload to %s failed: %.*s", + u->url, (int) sizeof(u->error), u->error); + else + log_error("Upload to %s failed: %s", + u->url, curl_easy_strerror(code)); + return -EIO; + } + + code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status); + if (code) { + log_error("Failed to retrieve response code: %s", + curl_easy_strerror(code)); + return -EUCLEAN; + } + + if (status >= 300) { + log_error("Upload to %s failed with code %ld: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else if (status < 200) { + log_error("Upload to %s finished with unexpected code %ld: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else + log_debug("Upload finished successfully with code %ld: %s", + status, strna(u->answer)); + + free(u->last_cursor); + u->last_cursor = u->current_cursor; + u->current_cursor = NULL; + + return update_cursor_state(u); +} + +static int parse_config(void) { + const ConfigTableItem items[] = { + { "Upload", "URL", config_parse_string, 0, &arg_url }, + { "Upload", "ServerKeyFile", config_parse_path, 0, &arg_key }, + { "Upload", "ServerCertificateFile", config_parse_path, 0, &arg_cert }, + { "Upload", "TrustedCertificateFile", config_parse_path, 0, &arg_trust }, + {}}; + + return config_parse_many(PKGSYSCONFDIR "/journal-upload.conf", + CONF_PATHS_NULSTR("systemd/journal-upload.conf.d"), + "Upload\0", config_item_table_lookup, items, + false, NULL); +} + +static void help(void) { + printf("%s -u URL {FILE|-}...\n\n" + "Upload journal events to a remote server.\n\n" + " -h --help Show this help\n" + " --version Show package version\n" + " -u --url=URL Upload to this address (default port " + STRINGIFY(DEFAULT_PORT) ")\n" + " --key=FILENAME Specify key in PEM format (default:\n" + " \"" PRIV_KEY_FILE "\")\n" + " --cert=FILENAME Specify certificate in PEM format (default:\n" + " \"" CERT_FILE "\")\n" + " --trust=FILENAME|all Specify CA certificate or disable checking (default:\n" + " \"" TRUST_FILE "\")\n" + " --system Use the system journal\n" + " --user Use the user journal for the current user\n" + " -m --merge Use all available journals\n" + " -M --machine=CONTAINER Operate on local container\n" + " -D --directory=PATH Use journal files from directory\n" + " --file=PATH Use this journal file\n" + " --cursor=CURSOR Start at the specified cursor\n" + " --after-cursor=CURSOR Start after the specified cursor\n" + " --follow[=BOOL] Do [not] wait for input\n" + " --save-state[=FILE] Save uploaded cursors (default \n" + " " STATE_FILE ")\n" + " -h --help Show this help and exit\n" + " --version Print version string and exit\n" + , program_invocation_short_name); +} + +static int parse_argv(int argc, char *argv[]) { + enum { + ARG_VERSION = 0x100, + ARG_KEY, + ARG_CERT, + ARG_TRUST, + ARG_USER, + ARG_SYSTEM, + ARG_FILE, + ARG_CURSOR, + ARG_AFTER_CURSOR, + ARG_FOLLOW, + ARG_SAVE_STATE, + }; + + static const struct option options[] = { + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, ARG_VERSION }, + { "url", required_argument, NULL, 'u' }, + { "key", required_argument, NULL, ARG_KEY }, + { "cert", required_argument, NULL, ARG_CERT }, + { "trust", required_argument, NULL, ARG_TRUST }, + { "system", no_argument, NULL, ARG_SYSTEM }, + { "user", no_argument, NULL, ARG_USER }, + { "merge", no_argument, NULL, 'm' }, + { "machine", required_argument, NULL, 'M' }, + { "directory", required_argument, NULL, 'D' }, + { "file", required_argument, NULL, ARG_FILE }, + { "cursor", required_argument, NULL, ARG_CURSOR }, + { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR }, + { "follow", optional_argument, NULL, ARG_FOLLOW }, + { "save-state", optional_argument, NULL, ARG_SAVE_STATE }, + {} + }; + + int c, r; + + assert(argc >= 0); + assert(argv); + + opterr = 0; + + while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0) + switch(c) { + case 'h': + help(); + return 0 /* done */; + + case ARG_VERSION: + return version(); + + case 'u': + if (arg_url) { + log_error("cannot use more than one --url"); + return -EINVAL; + } + + arg_url = optarg; + break; + + case ARG_KEY: + if (arg_key) { + log_error("cannot use more than one --key"); + return -EINVAL; + } + + arg_key = optarg; + break; + + case ARG_CERT: + if (arg_cert) { + log_error("cannot use more than one --cert"); + return -EINVAL; + } + + arg_cert = optarg; + break; + + case ARG_TRUST: + if (arg_trust) { + log_error("cannot use more than one --trust"); + return -EINVAL; + } + + arg_trust = optarg; + break; + + case ARG_SYSTEM: + arg_journal_type |= SD_JOURNAL_SYSTEM; + break; + + case ARG_USER: + arg_journal_type |= SD_JOURNAL_CURRENT_USER; + break; + + case 'm': + arg_merge = true; + break; + + case 'M': + if (arg_machine) { + log_error("cannot use more than one --machine/-M"); + return -EINVAL; + } + + arg_machine = optarg; + break; + + case 'D': + if (arg_directory) { + log_error("cannot use more than one --directory/-D"); + return -EINVAL; + } + + arg_directory = optarg; + break; + + case ARG_FILE: + r = glob_extend(&arg_file, optarg); + if (r < 0) + return log_error_errno(r, "Failed to add paths: %m"); + break; + + case ARG_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + break; + + case ARG_AFTER_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + arg_after_cursor = true; + break; + + case ARG_FOLLOW: + if (optarg) { + r = parse_boolean(optarg); + if (r < 0) { + log_error("Failed to parse --follow= parameter."); + return -EINVAL; + } + + arg_follow = !!r; + } else + arg_follow = true; + + break; + + case ARG_SAVE_STATE: + arg_save_state = optarg ?: STATE_FILE; + break; + + case '?': + log_error("Unknown option %s.", argv[optind-1]); + return -EINVAL; + + case ':': + log_error("Missing argument to %s.", argv[optind-1]); + return -EINVAL; + + default: + assert_not_reached("Unhandled option code."); + } + + if (!arg_url) { + log_error("Required --url/-u option missing."); + return -EINVAL; + } + + if (!!arg_key != !!arg_cert) { + log_error("Options --key and --cert must be used together."); + return -EINVAL; + } + + if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) { + log_error("Input arguments make no sense with journal input."); + return -EINVAL; + } + + return 1; +} + +static int open_journal(sd_journal **j) { + int r; + + if (arg_directory) + r = sd_journal_open_directory(j, arg_directory, arg_journal_type); + else if (arg_file) + r = sd_journal_open_files(j, (const char**) arg_file, 0); + else if (arg_machine) + r = sd_journal_open_container(j, arg_machine, 0); + else + r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type); + if (r < 0) + log_error_errno(r, "Failed to open %s: %m", + arg_directory ? arg_directory : arg_file ? "files" : "journal"); + return r; +} + +int main(int argc, char **argv) { + Uploader u; + int r; + bool use_journal; + + log_show_color(true); + log_parse_environment(); + + r = parse_config(); + if (r < 0) + goto finish; + + r = parse_argv(argc, argv); + if (r <= 0) + goto finish; + + sigbus_install(); + + r = setup_uploader(&u, arg_url, arg_save_state); + if (r < 0) + goto cleanup; + + sd_event_set_watchdog(u.events, true); + + r = check_cursor_updating(&u); + if (r < 0) + goto cleanup; + + log_debug("%s running as pid "PID_FMT, + program_invocation_short_name, getpid()); + + use_journal = optind >= argc; + if (use_journal) { + sd_journal *j; + r = open_journal(&j); + if (r < 0) + goto finish; + r = open_journal_for_upload(&u, j, + arg_cursor ?: u.last_cursor, + arg_cursor ? arg_after_cursor : true, + !!arg_follow); + if (r < 0) + goto finish; + } + + sd_notify(false, + "READY=1\n" + "STATUS=Processing input..."); + + for (;;) { + r = sd_event_get_state(u.events); + if (r < 0) + break; + if (r == SD_EVENT_FINISHED) + break; + + if (use_journal) { + if (!u.journal) + break; + + r = check_journal_input(&u); + } else if (u.input < 0 && !use_journal) { + if (optind >= argc) + break; + + log_debug("Using %s as input.", argv[optind]); + r = open_file_for_upload(&u, argv[optind++]); + } + if (r < 0) + goto cleanup; + + if (u.uploading) { + r = perform_upload(&u); + if (r < 0) + break; + } + + r = sd_event_run(u.events, u.timeout); + if (r < 0) { + log_error_errno(r, "Failed to run event loop: %m"); + break; + } + } + +cleanup: + sd_notify(false, + "STOPPING=1\n" + "STATUS=Shutting down..."); + + destroy_uploader(&u); + +finish: + return r >= 0 ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/src/grp-journal-remote/systemd-journal-upload/journal-upload.conf.in b/src/grp-journal-remote/systemd-journal-upload/journal-upload.conf.in new file mode 100644 index 0000000000..c5670682e8 --- /dev/null +++ b/src/grp-journal-remote/systemd-journal-upload/journal-upload.conf.in @@ -0,0 +1,5 @@ +[Upload] +# URL= +# ServerKeyFile=@CERTIFICATEROOT@/private/journal-upload.pem +# ServerCertificateFile=@CERTIFICATEROOT@/certs/journal-upload.pem +# TrustedCertificateFile=@CERTIFICATEROOT@/ca/trusted.pem diff --git a/src/grp-journal-remote/systemd-journal-upload/journal-upload.h b/src/grp-journal-remote/systemd-journal-upload/journal-upload.h new file mode 100644 index 0000000000..4a521bf78f --- /dev/null +++ b/src/grp-journal-remote/systemd-journal-upload/journal-upload.h @@ -0,0 +1,71 @@ +#pragma once + +#include <inttypes.h> + +#include <systemd/sd-event.h> +#include <systemd/sd-journal.h> +#include "time-util.h" + +typedef enum { + ENTRY_CURSOR = 0, /* Nothing actually written yet. */ + ENTRY_REALTIME, + ENTRY_MONOTONIC, + ENTRY_BOOT_ID, + ENTRY_NEW_FIELD, /* In between fields. */ + ENTRY_TEXT_FIELD, /* In the middle of a text field. */ + ENTRY_BINARY_FIELD_START, /* Writing the name of a binary field. */ + ENTRY_BINARY_FIELD_SIZE, /* Writing the size of a binary field. */ + ENTRY_BINARY_FIELD, /* In the middle of a binary field. */ + ENTRY_OUTRO, /* Writing '\n' */ + ENTRY_DONE, /* Need to move to a new field. */ +} entry_state; + +typedef struct Uploader { + sd_event *events; + sd_event_source *sigint_event, *sigterm_event; + + char *url; + CURL *easy; + bool uploading; + char error[CURL_ERROR_SIZE]; + struct curl_slist *header; + char *answer; + + sd_event_source *input_event; + uint64_t timeout; + + /* fd stuff */ + int input; + + /* journal stuff */ + sd_journal* journal; + + entry_state entry_state; + const void *field_data; + size_t field_pos, field_length; + + /* general metrics */ + const char *state_file; + + size_t entries_sent; + char *last_cursor, *current_cursor; + usec_t watchdog_timestamp; + usec_t watchdog_usec; +} Uploader; + +#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC) + +int start_upload(Uploader *u, + size_t (*input_callback)(void *ptr, + size_t size, + size_t nmemb, + void *userdata), + void *data); + +int open_journal_for_upload(Uploader *u, + sd_journal *j, + const char *cursor, + bool after_cursor, + bool follow); +void close_journal_input(Uploader *u); +int check_journal_input(Uploader *u); |