diff options
Diffstat (limited to 'src/grp-journal/grp-remote/systemd-journal-upload')
6 files changed, 1668 insertions, 0 deletions
diff --git a/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload-journal.c b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload-journal.c new file mode 100644 index 0000000000..aef095c8c9 --- /dev/null +++ b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload-journal.c @@ -0,0 +1,422 @@ +/*** + This file is part of systemd. + + Copyright 2014 Zbigniew Jędrzejewski-Szmek + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <curl/curl.h> +#include <stdbool.h> + +#include "alloc-util.h" +#include "journal-upload.h" +#include "log.h" +#include "utf8.h" +#include "util.h" +#include <systemd/sd-daemon.h> + +/** + * Write up to size bytes to buf. Return negative on error, and number of + * bytes written otherwise. The last case is a kind of an error too. + */ +static ssize_t write_entry(char *buf, size_t size, Uploader *u) { + int r; + size_t pos = 0; + + assert(size <= SSIZE_MAX); + + for (;;) { + + switch(u->entry_state) { + case ENTRY_CURSOR: { + u->current_cursor = mfree(u->current_cursor); + + r = sd_journal_get_cursor(u->journal, &u->current_cursor); + if (r < 0) + return log_error_errno(r, "Failed to get cursor: %m"); + + r = snprintf(buf + pos, size - pos, + "__CURSOR=%s\n", u->current_cursor); + if (pos + r > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (pos + r == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_REALTIME: { + usec_t realtime; + + r = sd_journal_get_realtime_usec(u->journal, &realtime); + if (r < 0) + return log_error_errno(r, "Failed to get realtime timestamp: %m"); + + r = snprintf(buf + pos, size - pos, + "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_MONOTONIC: { + usec_t monotonic; + sd_id128_t boot_id; + + r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); + if (r < 0) + return log_error_errno(r, "Failed to get monotonic timestamp: %m"); + + r = snprintf(buf + pos, size - pos, + "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_BOOT_ID: { + sd_id128_t boot_id; + char sid[33]; + + r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); + if (r < 0) + return log_error_errno(r, "Failed to get monotonic timestamp: %m"); + + r = snprintf(buf + pos, size - pos, + "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_NEW_FIELD: { + u->field_pos = 0; + + r = sd_journal_enumerate_data(u->journal, + &u->field_data, + &u->field_length); + if (r < 0) + return log_error_errno(r, "Failed to move to next field in entry: %m"); + else if (r == 0) { + u->entry_state = ENTRY_OUTRO; + continue; + } + + if (!utf8_is_printable_newline(u->field_data, + u->field_length, false)) { + u->entry_state = ENTRY_BINARY_FIELD_START; + continue; + } + + u->entry_state++; + } /* fall through */ + + case ENTRY_TEXT_FIELD: + case ENTRY_BINARY_FIELD: { + bool done; + size_t tocopy; + + done = size - pos > u->field_length - u->field_pos; + if (done) + tocopy = u->field_length - u->field_pos; + else + tocopy = size - pos; + + memcpy(buf + pos, + (char*) u->field_data + u->field_pos, + tocopy); + + if (done) { + buf[pos + tocopy] = '\n'; + pos += tocopy + 1; + u->entry_state = ENTRY_NEW_FIELD; + continue; + } else { + u->field_pos += tocopy; + return size; + } + } + + case ENTRY_BINARY_FIELD_START: { + const char *c; + size_t len; + + c = memchr(u->field_data, '=', u->field_length); + if (!c || c == u->field_data) { + log_error("Invalid field."); + return -EINVAL; + } + + len = c - (const char*)u->field_data; + + /* need space for label + '\n' */ + if (size - pos < len + 1) + return pos; + + memcpy(buf + pos, u->field_data, len); + buf[pos + len] = '\n'; + pos += len + 1; + + u->field_pos = len + 1; + u->entry_state++; + } /* fall through */ + + case ENTRY_BINARY_FIELD_SIZE: { + uint64_t le64; + + /* need space for uint64_t */ + if (size - pos < 8) + return pos; + + le64 = htole64(u->field_length - u->field_pos); + memcpy(buf + pos, &le64, 8); + pos += 8; + + u->entry_state++; + continue; + } + + case ENTRY_OUTRO: + /* need space for '\n' */ + if (size - pos < 1) + return pos; + + buf[pos++] = '\n'; + u->entry_state++; + u->entries_sent++; + + return pos; + + default: + assert_not_reached("WTF?"); + } + } + assert_not_reached("WTF?"); +} + +static inline void check_update_watchdog(Uploader *u) { + usec_t after; + usec_t elapsed_time; + + if (u->watchdog_usec <= 0) + return; + + after = now(CLOCK_MONOTONIC); + elapsed_time = usec_sub(after, u->watchdog_timestamp); + if (elapsed_time > u->watchdog_usec / 2) { + log_debug("Update watchdog timer"); + sd_notify(false, "WATCHDOG=1"); + u->watchdog_timestamp = after; + } +} + +static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + Uploader *u = userp; + int r; + sd_journal *j; + size_t filled = 0; + ssize_t w; + + assert(u); + assert(nmemb <= SSIZE_MAX / size); + + check_update_watchdog(u); + + j = u->journal; + + while (j && filled < size * nmemb) { + if (u->entry_state == ENTRY_DONE) { + r = sd_journal_next(j); + if (r < 0) { + log_error_errno(r, "Failed to move to next entry in journal: %m"); + return CURL_READFUNC_ABORT; + } else if (r == 0) { + if (u->input_event) + log_debug("No more entries, waiting for journal."); + else { + log_info("No more entries, closing journal."); + close_journal_input(u); + } + + u->uploading = false; + + break; + } + + u->entry_state = ENTRY_CURSOR; + } + + w = write_entry((char*)buf + filled, size * nmemb - filled, u); + if (w < 0) + return CURL_READFUNC_ABORT; + filled += w; + + if (filled == 0) { + log_error("Buffer space is too small to write entry."); + return CURL_READFUNC_ABORT; + } else if (u->entry_state != ENTRY_DONE) + /* This means that all available space was used up */ + break; + + log_debug("Entry %zu (%s) has been uploaded.", + u->entries_sent, u->current_cursor); + } + + return filled; +} + +void close_journal_input(Uploader *u) { + assert(u); + + if (u->journal) { + log_debug("Closing journal input."); + + sd_journal_close(u->journal); + u->journal = NULL; + } + u->timeout = 0; +} + +static int process_journal_input(Uploader *u, int skip) { + int r; + + if (u->uploading) + return 0; + + r = sd_journal_next_skip(u->journal, skip); + if (r < 0) + return log_error_errno(r, "Failed to skip to next entry: %m"); + else if (r < skip) + return 0; + + /* have data */ + u->entry_state = ENTRY_CURSOR; + return start_upload(u, journal_input_callback, u); +} + +int check_journal_input(Uploader *u) { + if (u->input_event) { + int r; + + r = sd_journal_process(u->journal); + if (r < 0) { + log_error_errno(r, "Failed to process journal: %m"); + close_journal_input(u); + return r; + } + + if (r == SD_JOURNAL_NOP) + return 0; + } + + return process_journal_input(u, 1); +} + +static int dispatch_journal_input(sd_event_source *event, + int fd, + uint32_t revents, + void *userp) { + Uploader *u = userp; + + assert(u); + + if (u->uploading) + return 0; + + log_debug("Detected journal input, checking for new data."); + return check_journal_input(u); +} + +int open_journal_for_upload(Uploader *u, + sd_journal *j, + const char *cursor, + bool after_cursor, + bool follow) { + int fd, r, events; + + u->journal = j; + + sd_journal_set_data_threshold(j, 0); + + if (follow) { + fd = sd_journal_get_fd(j); + if (fd < 0) + return log_error_errno(fd, "sd_journal_get_fd failed: %m"); + + events = sd_journal_get_events(j); + + r = sd_journal_reliable_fd(j); + assert(r >= 0); + if (r > 0) + u->timeout = -1; + else + u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; + + r = sd_event_add_io(u->events, &u->input_event, + fd, events, dispatch_journal_input, u); + if (r < 0) + return log_error_errno(r, "Failed to register input event: %m"); + + log_debug("Listening for journal events on fd:%d, timeout %d", + fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); + } else + log_debug("Not listening for journal events."); + + if (cursor) { + r = sd_journal_seek_cursor(j, cursor); + if (r < 0) + return log_error_errno(r, "Failed to seek to cursor %s: %m", + cursor); + } + + return process_journal_input(u, 1 + !!after_cursor); +} diff --git a/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.c b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.c new file mode 100644 index 0000000000..e622f6c1e1 --- /dev/null +++ b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.c @@ -0,0 +1,880 @@ +/*** + This file is part of systemd. + + Copyright 2014 Zbigniew Jędrzejewski-Szmek + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <curl/curl.h> +#include <fcntl.h> +#include <getopt.h> +#include <stdio.h> +#include <sys/stat.h> + +#include <systemd/sd-daemon.h> + +#include "alloc-util.h" +#include "conf-parser.h" +#include "def.h" +#include "fd-util.h" +#include "fileio.h" +#include "formats-util.h" +#include "glob-util.h" +#include "journal-upload.h" +#include "log.h" +#include "mkdir.h" +#include "parse-util.h" +#include "sigbus.h" +#include "signal-util.h" +#include "string-util.h" +#include "util.h" + +#define PRIV_KEY_FILE CERTIFICATE_ROOT "/private/journal-upload.pem" +#define CERT_FILE CERTIFICATE_ROOT "/certs/journal-upload.pem" +#define TRUST_FILE CERTIFICATE_ROOT "/ca/trusted.pem" +#define DEFAULT_PORT 19532 + +static const char* arg_url = NULL; +static const char *arg_key = NULL; +static const char *arg_cert = NULL; +static const char *arg_trust = NULL; +static const char *arg_directory = NULL; +static char **arg_file = NULL; +static const char *arg_cursor = NULL; +static bool arg_after_cursor = false; +static int arg_journal_type = 0; +static const char *arg_machine = NULL; +static bool arg_merge = false; +static int arg_follow = -1; +static const char *arg_save_state = NULL; + +static void close_fd_input(Uploader *u); + +#define SERVER_ANSWER_KEEP 2048 + +#define STATE_FILE "/var/lib/systemd/journal-upload/state" + +#define easy_setopt(curl, opt, value, level, cmd) \ + do { \ + code = curl_easy_setopt(curl, opt, value); \ + if (code) { \ + log_full(level, \ + "curl_easy_setopt " #opt " failed: %s", \ + curl_easy_strerror(code)); \ + cmd; \ + } \ + } while (0) + +static size_t output_callback(char *buf, + size_t size, + size_t nmemb, + void *userp) { + Uploader *u = userp; + + assert(u); + + log_debug("The server answers (%zu bytes): %.*s", + size*nmemb, (int)(size*nmemb), buf); + + if (nmemb && !u->answer) { + u->answer = strndup(buf, size*nmemb); + if (!u->answer) + log_warning_errno(ENOMEM, "Failed to store server answer (%zu bytes): %m", + size*nmemb); + } + + return size * nmemb; +} + +static int check_cursor_updating(Uploader *u) { + _cleanup_free_ char *temp_path = NULL; + _cleanup_fclose_ FILE *f = NULL; + int r; + + if (!u->state_file) + return 0; + + r = mkdir_parents(u->state_file, 0755); + if (r < 0) + return log_error_errno(r, "Cannot create parent directory of state file %s: %m", + u->state_file); + + r = fopen_temporary(u->state_file, &f, &temp_path); + if (r < 0) + return log_error_errno(r, "Cannot save state to %s: %m", + u->state_file); + unlink(temp_path); + + return 0; +} + +static int update_cursor_state(Uploader *u) { + _cleanup_free_ char *temp_path = NULL; + _cleanup_fclose_ FILE *f = NULL; + int r; + + if (!u->state_file || !u->last_cursor) + return 0; + + r = fopen_temporary(u->state_file, &f, &temp_path); + if (r < 0) + goto fail; + + fprintf(f, + "# This is private data. Do not parse.\n" + "LAST_CURSOR=%s\n", + u->last_cursor); + + r = fflush_and_check(f); + if (r < 0) + goto fail; + + if (rename(temp_path, u->state_file) < 0) { + r = -errno; + goto fail; + } + + return 0; + +fail: + if (temp_path) + (void) unlink(temp_path); + + (void) unlink(u->state_file); + + return log_error_errno(r, "Failed to save state %s: %m", u->state_file); +} + +static int load_cursor_state(Uploader *u) { + int r; + + if (!u->state_file) + return 0; + + r = parse_env_file(u->state_file, NEWLINE, + "LAST_CURSOR", &u->last_cursor, + NULL); + + if (r == -ENOENT) + log_debug("State file %s is not present.", u->state_file); + else if (r < 0) + return log_error_errno(r, "Failed to read state file %s: %m", + u->state_file); + else + log_debug("Last cursor was %s", u->last_cursor); + + return 0; +} + + + +int start_upload(Uploader *u, + size_t (*input_callback)(void *ptr, + size_t size, + size_t nmemb, + void *userdata), + void *data) { + CURLcode code; + + assert(u); + assert(input_callback); + + if (!u->header) { + struct curl_slist *h; + + h = curl_slist_append(NULL, "Content-Type: application/vnd.fdo.journal"); + if (!h) + return log_oom(); + + h = curl_slist_append(h, "Transfer-Encoding: chunked"); + if (!h) { + curl_slist_free_all(h); + return log_oom(); + } + + h = curl_slist_append(h, "Accept: text/plain"); + if (!h) { + curl_slist_free_all(h); + return log_oom(); + } + + u->header = h; + } + + if (!u->easy) { + CURL *curl; + + curl = curl_easy_init(); + if (!curl) { + log_error("Call to curl_easy_init failed."); + return -ENOSR; + } + + /* tell it to POST to the URL */ + easy_setopt(curl, CURLOPT_POST, 1L, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_ERRORBUFFER, u->error, + LOG_ERR, return -EXFULL); + + /* set where to write to */ + easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_WRITEDATA, data, + LOG_ERR, return -EXFULL); + + /* set where to read from */ + easy_setopt(curl, CURLOPT_READFUNCTION, input_callback, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_READDATA, data, + LOG_ERR, return -EXFULL); + + /* use our special own mime type and chunked transfer */ + easy_setopt(curl, CURLOPT_HTTPHEADER, u->header, + LOG_ERR, return -EXFULL); + + if (_unlikely_(log_get_max_level() >= LOG_DEBUG)) + /* enable verbose for easier tracing */ + easy_setopt(curl, CURLOPT_VERBOSE, 1L, LOG_WARNING, ); + + easy_setopt(curl, CURLOPT_USERAGENT, + "systemd-journal-upload " PACKAGE_STRING, + LOG_WARNING, ); + + if (arg_key || startswith(u->url, "https://")) { + easy_setopt(curl, CURLOPT_SSLKEY, arg_key ?: PRIV_KEY_FILE, + LOG_ERR, return -EXFULL); + easy_setopt(curl, CURLOPT_SSLCERT, arg_cert ?: CERT_FILE, + LOG_ERR, return -EXFULL); + } + + if (streq_ptr(arg_trust, "all")) + easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0, + LOG_ERR, return -EUCLEAN); + else if (arg_trust || startswith(u->url, "https://")) + easy_setopt(curl, CURLOPT_CAINFO, arg_trust ?: TRUST_FILE, + LOG_ERR, return -EXFULL); + + if (arg_key || arg_trust) + easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1, + LOG_WARNING, ); + + u->easy = curl; + } else { + /* truncate the potential old error message */ + u->error[0] = '\0'; + + free(u->answer); + u->answer = 0; + } + + /* upload to this place */ + code = curl_easy_setopt(u->easy, CURLOPT_URL, u->url); + if (code) { + log_error("curl_easy_setopt CURLOPT_URL failed: %s", + curl_easy_strerror(code)); + return -EXFULL; + } + + u->uploading = true; + + return 0; +} + +static size_t fd_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + Uploader *u = userp; + + ssize_t r; + + assert(u); + assert(nmemb <= SSIZE_MAX / size); + + if (u->input < 0) + return 0; + + r = read(u->input, buf, size * nmemb); + log_debug("%s: allowed %zu, read %zd", __func__, size*nmemb, r); + + if (r > 0) + return r; + + u->uploading = false; + if (r == 0) { + log_debug("Reached EOF"); + close_fd_input(u); + return 0; + } else { + log_error_errno(errno, "Aborting transfer after read error on input: %m."); + return CURL_READFUNC_ABORT; + } +} + +static void close_fd_input(Uploader *u) { + assert(u); + + if (u->input >= 0) + close_nointr(u->input); + u->input = -1; + u->timeout = 0; +} + +static int dispatch_fd_input(sd_event_source *event, + int fd, + uint32_t revents, + void *userp) { + Uploader *u = userp; + + assert(u); + assert(fd >= 0); + + if (revents & EPOLLHUP) { + log_debug("Received HUP"); + close_fd_input(u); + return 0; + } + + if (!(revents & EPOLLIN)) { + log_warning("Unexpected poll event %"PRIu32".", revents); + return -EINVAL; + } + + if (u->uploading) { + log_warning("dispatch_fd_input called when uploading, ignoring."); + return 0; + } + + return start_upload(u, fd_input_callback, u); +} + +static int open_file_for_upload(Uploader *u, const char *filename) { + int fd, r = 0; + + if (streq(filename, "-")) + fd = STDIN_FILENO; + else { + fd = open(filename, O_RDONLY|O_CLOEXEC|O_NOCTTY); + if (fd < 0) + return log_error_errno(errno, "Failed to open %s: %m", filename); + } + + u->input = fd; + + if (arg_follow) { + r = sd_event_add_io(u->events, &u->input_event, + fd, EPOLLIN, dispatch_fd_input, u); + if (r < 0) { + if (r != -EPERM || arg_follow > 0) + return log_error_errno(r, "Failed to register input event: %m"); + + /* Normal files should just be consumed without polling. */ + r = start_upload(u, fd_input_callback, u); + } + } + + return r; +} + +static int dispatch_sigterm(sd_event_source *event, + const struct signalfd_siginfo *si, + void *userdata) { + Uploader *u = userdata; + + assert(u); + + log_received_signal(LOG_INFO, si); + + close_fd_input(u); + close_journal_input(u); + + sd_event_exit(u->events, 0); + return 0; +} + +static int setup_signals(Uploader *u) { + int r; + + assert(u); + + assert_se(sigprocmask_many(SIG_SETMASK, NULL, SIGINT, SIGTERM, -1) >= 0); + + r = sd_event_add_signal(u->events, &u->sigterm_event, SIGTERM, dispatch_sigterm, u); + if (r < 0) + return r; + + r = sd_event_add_signal(u->events, &u->sigint_event, SIGINT, dispatch_sigterm, u); + if (r < 0) + return r; + + return 0; +} + +static int setup_uploader(Uploader *u, const char *url, const char *state_file) { + int r; + const char *host, *proto = ""; + + assert(u); + assert(url); + + memzero(u, sizeof(Uploader)); + u->input = -1; + + if (!(host = startswith(url, "http://")) && !(host = startswith(url, "https://"))) { + host = url; + proto = "https://"; + } + + if (strchr(host, ':')) + u->url = strjoin(proto, url, "/upload", NULL); + else { + char *t; + size_t x; + + t = strdupa(url); + x = strlen(t); + while (x > 0 && t[x - 1] == '/') + t[x - 1] = '\0'; + + u->url = strjoin(proto, t, ":" STRINGIFY(DEFAULT_PORT), "/upload", NULL); + } + if (!u->url) + return log_oom(); + + u->state_file = state_file; + + r = sd_event_default(&u->events); + if (r < 0) + return log_error_errno(r, "sd_event_default failed: %m"); + + r = setup_signals(u); + if (r < 0) + return log_error_errno(r, "Failed to set up signals: %m"); + + (void) sd_watchdog_enabled(false, &u->watchdog_usec); + + return load_cursor_state(u); +} + +static void destroy_uploader(Uploader *u) { + assert(u); + + curl_easy_cleanup(u->easy); + curl_slist_free_all(u->header); + free(u->answer); + + free(u->last_cursor); + free(u->current_cursor); + + free(u->url); + + u->input_event = sd_event_source_unref(u->input_event); + + close_fd_input(u); + close_journal_input(u); + + sd_event_source_unref(u->sigterm_event); + sd_event_source_unref(u->sigint_event); + sd_event_unref(u->events); +} + +static int perform_upload(Uploader *u) { + CURLcode code; + long status; + + assert(u); + + u->watchdog_timestamp = now(CLOCK_MONOTONIC); + code = curl_easy_perform(u->easy); + if (code) { + if (u->error[0]) + log_error("Upload to %s failed: %.*s", + u->url, (int) sizeof(u->error), u->error); + else + log_error("Upload to %s failed: %s", + u->url, curl_easy_strerror(code)); + return -EIO; + } + + code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status); + if (code) { + log_error("Failed to retrieve response code: %s", + curl_easy_strerror(code)); + return -EUCLEAN; + } + + if (status >= 300) { + log_error("Upload to %s failed with code %ld: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else if (status < 200) { + log_error("Upload to %s finished with unexpected code %ld: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else + log_debug("Upload finished successfully with code %ld: %s", + status, strna(u->answer)); + + free(u->last_cursor); + u->last_cursor = u->current_cursor; + u->current_cursor = NULL; + + return update_cursor_state(u); +} + +static int parse_config(void) { + const ConfigTableItem items[] = { + { "Upload", "URL", config_parse_string, 0, &arg_url }, + { "Upload", "ServerKeyFile", config_parse_path, 0, &arg_key }, + { "Upload", "ServerCertificateFile", config_parse_path, 0, &arg_cert }, + { "Upload", "TrustedCertificateFile", config_parse_path, 0, &arg_trust }, + {}}; + + return config_parse_many(PKGSYSCONFDIR "/journal-upload.conf", + CONF_PATHS_NULSTR("systemd/journal-upload.conf.d"), + "Upload\0", config_item_table_lookup, items, + false, NULL); +} + +static void help(void) { + printf("%s -u URL {FILE|-}...\n\n" + "Upload journal events to a remote server.\n\n" + " -h --help Show this help\n" + " --version Show package version\n" + " -u --url=URL Upload to this address (default port " + STRINGIFY(DEFAULT_PORT) ")\n" + " --key=FILENAME Specify key in PEM format (default:\n" + " \"" PRIV_KEY_FILE "\")\n" + " --cert=FILENAME Specify certificate in PEM format (default:\n" + " \"" CERT_FILE "\")\n" + " --trust=FILENAME|all Specify CA certificate or disable checking (default:\n" + " \"" TRUST_FILE "\")\n" + " --system Use the system journal\n" + " --user Use the user journal for the current user\n" + " -m --merge Use all available journals\n" + " -M --machine=CONTAINER Operate on local container\n" + " -D --directory=PATH Use journal files from directory\n" + " --file=PATH Use this journal file\n" + " --cursor=CURSOR Start at the specified cursor\n" + " --after-cursor=CURSOR Start after the specified cursor\n" + " --follow[=BOOL] Do [not] wait for input\n" + " --save-state[=FILE] Save uploaded cursors (default \n" + " " STATE_FILE ")\n" + " -h --help Show this help and exit\n" + " --version Print version string and exit\n" + , program_invocation_short_name); +} + +static int parse_argv(int argc, char *argv[]) { + enum { + ARG_VERSION = 0x100, + ARG_KEY, + ARG_CERT, + ARG_TRUST, + ARG_USER, + ARG_SYSTEM, + ARG_FILE, + ARG_CURSOR, + ARG_AFTER_CURSOR, + ARG_FOLLOW, + ARG_SAVE_STATE, + }; + + static const struct option options[] = { + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, ARG_VERSION }, + { "url", required_argument, NULL, 'u' }, + { "key", required_argument, NULL, ARG_KEY }, + { "cert", required_argument, NULL, ARG_CERT }, + { "trust", required_argument, NULL, ARG_TRUST }, + { "system", no_argument, NULL, ARG_SYSTEM }, + { "user", no_argument, NULL, ARG_USER }, + { "merge", no_argument, NULL, 'm' }, + { "machine", required_argument, NULL, 'M' }, + { "directory", required_argument, NULL, 'D' }, + { "file", required_argument, NULL, ARG_FILE }, + { "cursor", required_argument, NULL, ARG_CURSOR }, + { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR }, + { "follow", optional_argument, NULL, ARG_FOLLOW }, + { "save-state", optional_argument, NULL, ARG_SAVE_STATE }, + {} + }; + + int c, r; + + assert(argc >= 0); + assert(argv); + + opterr = 0; + + while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0) + switch(c) { + case 'h': + help(); + return 0 /* done */; + + case ARG_VERSION: + return version(); + + case 'u': + if (arg_url) { + log_error("cannot use more than one --url"); + return -EINVAL; + } + + arg_url = optarg; + break; + + case ARG_KEY: + if (arg_key) { + log_error("cannot use more than one --key"); + return -EINVAL; + } + + arg_key = optarg; + break; + + case ARG_CERT: + if (arg_cert) { + log_error("cannot use more than one --cert"); + return -EINVAL; + } + + arg_cert = optarg; + break; + + case ARG_TRUST: + if (arg_trust) { + log_error("cannot use more than one --trust"); + return -EINVAL; + } + + arg_trust = optarg; + break; + + case ARG_SYSTEM: + arg_journal_type |= SD_JOURNAL_SYSTEM; + break; + + case ARG_USER: + arg_journal_type |= SD_JOURNAL_CURRENT_USER; + break; + + case 'm': + arg_merge = true; + break; + + case 'M': + if (arg_machine) { + log_error("cannot use more than one --machine/-M"); + return -EINVAL; + } + + arg_machine = optarg; + break; + + case 'D': + if (arg_directory) { + log_error("cannot use more than one --directory/-D"); + return -EINVAL; + } + + arg_directory = optarg; + break; + + case ARG_FILE: + r = glob_extend(&arg_file, optarg); + if (r < 0) + return log_error_errno(r, "Failed to add paths: %m"); + break; + + case ARG_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + break; + + case ARG_AFTER_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + arg_after_cursor = true; + break; + + case ARG_FOLLOW: + if (optarg) { + r = parse_boolean(optarg); + if (r < 0) { + log_error("Failed to parse --follow= parameter."); + return -EINVAL; + } + + arg_follow = !!r; + } else + arg_follow = true; + + break; + + case ARG_SAVE_STATE: + arg_save_state = optarg ?: STATE_FILE; + break; + + case '?': + log_error("Unknown option %s.", argv[optind-1]); + return -EINVAL; + + case ':': + log_error("Missing argument to %s.", argv[optind-1]); + return -EINVAL; + + default: + assert_not_reached("Unhandled option code."); + } + + if (!arg_url) { + log_error("Required --url/-u option missing."); + return -EINVAL; + } + + if (!!arg_key != !!arg_cert) { + log_error("Options --key and --cert must be used together."); + return -EINVAL; + } + + if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) { + log_error("Input arguments make no sense with journal input."); + return -EINVAL; + } + + return 1; +} + +static int open_journal(sd_journal **j) { + int r; + + if (arg_directory) + r = sd_journal_open_directory(j, arg_directory, arg_journal_type); + else if (arg_file) + r = sd_journal_open_files(j, (const char**) arg_file, 0); + else if (arg_machine) + r = sd_journal_open_container(j, arg_machine, 0); + else + r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type); + if (r < 0) + log_error_errno(r, "Failed to open %s: %m", + arg_directory ? arg_directory : arg_file ? "files" : "journal"); + return r; +} + +int main(int argc, char **argv) { + Uploader u; + int r; + bool use_journal; + + log_show_color(true); + log_parse_environment(); + + r = parse_config(); + if (r < 0) + goto finish; + + r = parse_argv(argc, argv); + if (r <= 0) + goto finish; + + sigbus_install(); + + r = setup_uploader(&u, arg_url, arg_save_state); + if (r < 0) + goto cleanup; + + sd_event_set_watchdog(u.events, true); + + r = check_cursor_updating(&u); + if (r < 0) + goto cleanup; + + log_debug("%s running as pid "PID_FMT, + program_invocation_short_name, getpid()); + + use_journal = optind >= argc; + if (use_journal) { + sd_journal *j; + r = open_journal(&j); + if (r < 0) + goto finish; + r = open_journal_for_upload(&u, j, + arg_cursor ?: u.last_cursor, + arg_cursor ? arg_after_cursor : true, + !!arg_follow); + if (r < 0) + goto finish; + } + + sd_notify(false, + "READY=1\n" + "STATUS=Processing input..."); + + for (;;) { + r = sd_event_get_state(u.events); + if (r < 0) + break; + if (r == SD_EVENT_FINISHED) + break; + + if (use_journal) { + if (!u.journal) + break; + + r = check_journal_input(&u); + } else if (u.input < 0 && !use_journal) { + if (optind >= argc) + break; + + log_debug("Using %s as input.", argv[optind]); + r = open_file_for_upload(&u, argv[optind++]); + } + if (r < 0) + goto cleanup; + + if (u.uploading) { + r = perform_upload(&u); + if (r < 0) + break; + } + + r = sd_event_run(u.events, u.timeout); + if (r < 0) { + log_error_errno(r, "Failed to run event loop: %m"); + break; + } + } + +cleanup: + sd_notify(false, + "STOPPING=1\n" + "STATUS=Shutting down..."); + + destroy_uploader(&u); + +finish: + return r >= 0 ? EXIT_SUCCESS : EXIT_FAILURE; +} diff --git a/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.conf.in b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.conf.in new file mode 100644 index 0000000000..c5670682e8 --- /dev/null +++ b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.conf.in @@ -0,0 +1,5 @@ +[Upload] +# URL= +# ServerKeyFile=@CERTIFICATEROOT@/private/journal-upload.pem +# ServerCertificateFile=@CERTIFICATEROOT@/certs/journal-upload.pem +# TrustedCertificateFile=@CERTIFICATEROOT@/ca/trusted.pem diff --git a/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.h b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.h new file mode 100644 index 0000000000..4a521bf78f --- /dev/null +++ b/src/grp-journal/grp-remote/systemd-journal-upload/journal-upload.h @@ -0,0 +1,71 @@ +#pragma once + +#include <inttypes.h> + +#include <systemd/sd-event.h> +#include <systemd/sd-journal.h> +#include "time-util.h" + +typedef enum { + ENTRY_CURSOR = 0, /* Nothing actually written yet. */ + ENTRY_REALTIME, + ENTRY_MONOTONIC, + ENTRY_BOOT_ID, + ENTRY_NEW_FIELD, /* In between fields. */ + ENTRY_TEXT_FIELD, /* In the middle of a text field. */ + ENTRY_BINARY_FIELD_START, /* Writing the name of a binary field. */ + ENTRY_BINARY_FIELD_SIZE, /* Writing the size of a binary field. */ + ENTRY_BINARY_FIELD, /* In the middle of a binary field. */ + ENTRY_OUTRO, /* Writing '\n' */ + ENTRY_DONE, /* Need to move to a new field. */ +} entry_state; + +typedef struct Uploader { + sd_event *events; + sd_event_source *sigint_event, *sigterm_event; + + char *url; + CURL *easy; + bool uploading; + char error[CURL_ERROR_SIZE]; + struct curl_slist *header; + char *answer; + + sd_event_source *input_event; + uint64_t timeout; + + /* fd stuff */ + int input; + + /* journal stuff */ + sd_journal* journal; + + entry_state entry_state; + const void *field_data; + size_t field_pos, field_length; + + /* general metrics */ + const char *state_file; + + size_t entries_sent; + char *last_cursor, *current_cursor; + usec_t watchdog_timestamp; + usec_t watchdog_usec; +} Uploader; + +#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC) + +int start_upload(Uploader *u, + size_t (*input_callback)(void *ptr, + size_t size, + size_t nmemb, + void *userdata), + void *data); + +int open_journal_for_upload(Uploader *u, + sd_journal *j, + const char *cursor, + bool after_cursor, + bool follow); +void close_journal_input(Uploader *u); +int check_journal_input(Uploader *u); diff --git a/src/grp-journal/grp-remote/systemd-journal-upload/systemd-journal-upload.service.in b/src/grp-journal/grp-remote/systemd-journal-upload/systemd-journal-upload.service.in new file mode 100644 index 0000000000..1f488ff425 --- /dev/null +++ b/src/grp-journal/grp-remote/systemd-journal-upload/systemd-journal-upload.service.in @@ -0,0 +1,27 @@ +# This file is part of systemd. +# +# systemd is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation; either version 2.1 of the License, or +# (at your option) any later version. + +[Unit] +Description=Journal Remote Upload Service +Documentation=man:systemd-journal-upload(8) +After=network.target + +[Service] +ExecStart=@rootlibexecdir@/systemd-journal-upload \ + --save-state +User=systemd-journal-upload +SupplementaryGroups=systemd-journal +PrivateTmp=yes +PrivateDevices=yes +WatchdogSec=3min + +# If there are many split up journal files we need a lot of fds to +# access them all and combine +LimitNOFILE=16384 + +[Install] +WantedBy=multi-user.target diff --git a/src/grp-journal/grp-remote/systemd-journal-upload/systemd-journal-upload.xml b/src/grp-journal/grp-remote/systemd-journal-upload/systemd-journal-upload.xml new file mode 100644 index 0000000000..f9723dea89 --- /dev/null +++ b/src/grp-journal/grp-remote/systemd-journal-upload/systemd-journal-upload.xml @@ -0,0 +1,263 @@ +<?xml version='1.0'?> <!--*- Mode: nxml; nxml-child-indent: 2; indent-tabs-mode: nil -*--> +<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook XML V4.2//EN" +"http://www.oasis-open.org/docbook/xml/4.2/docbookx.dtd"> + +<!-- + This file is part of systemd. + + Copyright 2014 Zbigniew Jędrzejewski-Szmek + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +--> + +<refentry id="systemd-journal-upload" conditional='HAVE_MICROHTTPD' + xmlns:xi="http://www.w3.org/2001/XInclude"> + + <refentryinfo> + <title>systemd-journal-upload</title> + <productname>systemd</productname> + + <authorgroup> + <author> + <contrib>Developer</contrib> + <firstname>Zbigniew</firstname> + <surname>Jędrzejewski-Szmek</surname> + <email>zbyszek@in.waw.pl</email> + </author> + </authorgroup> + </refentryinfo> + + <refmeta> + <refentrytitle>systemd-journal-upload</refentrytitle> + <manvolnum>8</manvolnum> + </refmeta> + + <refnamediv> + <refname>systemd-journal-upload</refname> + <refpurpose>Send journal messages over the network</refpurpose> + </refnamediv> + + <refsynopsisdiv> + <cmdsynopsis> + <command>systemd-journal-upload</command> + <arg choice="opt" rep="repeat">OPTIONS</arg> + <arg choice="opt" rep="norepeat">-u/--url=<replaceable>URL</replaceable></arg> + <arg choice="opt" rep="repeat">SOURCES</arg> + </cmdsynopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + + <para> + <command>systemd-journal-upload</command> will upload journal + entries to the URL specified with <option>--url</option>. Unless + limited by one of the options specified below, all journal + entries accessible to the user the program is running as will be + uploaded, and then the program will wait and send new entries + as they become available. + </para> + </refsect1> + + <refsect1> + <title>Options</title> + + <variablelist> + <varlistentry> + <term><option>-u</option></term> + <term><option>--url=<optional>https://</optional><replaceable>URL</replaceable></option></term> + <term><option>--url=<optional>http://</optional><replaceable>URL</replaceable></option></term> + + <listitem><para>Upload to the specified + address. <replaceable>URL</replaceable> may specify either + just the hostname or both the protocol and + hostname. <constant>https</constant> is the default. + </para></listitem> + </varlistentry> + + <varlistentry> + <term><option>--system</option></term> + <term><option>--user</option></term> + + <listitem><para>Limit uploaded entries to entries from system + services and the kernel, or to entries from services of + current user. This has the same meaning as + <option>--system</option> and <option>--user</option> options + for + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>. If + neither is specified, all accessible entries are uploaded. + </para></listitem> + </varlistentry> + + <varlistentry> + <term><option>-m</option></term> + <term><option>--merge</option></term> + + <listitem><para>Upload entries interleaved from all available + journals, including other machines. This has the same meaning + as <option>--merge</option> option for + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>.</para></listitem> + </varlistentry> + + <varlistentry> + <term><option>-D</option></term> + <term><option>--directory=<replaceable>DIR</replaceable></option></term> + + <listitem><para>Takes a directory path as argument. Upload + entries from the specified journal directory + <replaceable>DIR</replaceable> instead of the default runtime + and system journal paths. This has the same meaning as + <option>--directory</option> option for + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>. + </para></listitem> + </varlistentry> + + <varlistentry> + <term><option>--file=<replaceable>GLOB</replaceable></option></term> + + <listitem><para>Takes a file glob as an argument. Upload + entries from the specified journal files matching + <replaceable>GLOB</replaceable> instead of the default runtime + and system journal paths. May be specified multiple times, in + which case files will be suitably interleaved. This has the same meaning as + <option>--file</option> option for + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>. + </para></listitem> + </varlistentry> + + <varlistentry> + <term><option>--cursor=</option></term> + + <listitem><para>Upload entries from the location in the + journal specified by the passed cursor. This has the same + meaning as <option>--cursor</option> option for + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>.</para></listitem> + </varlistentry> + + <varlistentry> + <term><option>--after-cursor=</option></term> + + <listitem><para>Upload entries from the location in the + journal <emphasis>after</emphasis> the location specified by + the this cursor. This has the same meaning as + <option>--after-cursor</option> option for + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>. + </para></listitem> + </varlistentry> + + + <varlistentry> + <term><option>--save-state</option><optional>=<replaceable>PATH</replaceable></optional></term> + + <listitem><para>Upload entries from the location in the + journal <emphasis>after</emphasis> the location specified by + the cursor saved in file at <replaceable>PATH</replaceable> + (<filename>/var/lib/systemd/journal-upload/state</filename> by default). + After an entry is successfully uploaded, update this file + with the cursor of that entry. + </para></listitem> + </varlistentry> + + <xi:include href="standard-options.xml" xpointer="help" /> + <xi:include href="standard-options.xml" xpointer="version" /> + </variablelist> + </refsect1> + + <refsect1> + <title>Exit status</title> + + <para>On success, 0 is returned; otherwise, a non-zero + failure code is returned.</para> + </refsect1> + + <refsect1> + <title>Examples</title> + <example> + <title>Setting up certificates for authentication</title> + + <para>Certificates signed by a trusted authority are used to + verify that the server to which messages are uploaded is + legitimate, and vice versa, that the client is trusted.</para> + + <para>A suitable set of certificates can be generated with + <command>openssl</command>:</para> + + <programlisting>openssl req -newkey rsa:2048 -days 3650 -x509 -nodes \ + -out ca.pem -keyout ca.key -subj '/CN=Certificate authority/' + +cat >ca.conf <<EOF +[ ca ] +default_ca = this + +[ this ] +new_certs_dir = . +certificate = ca.pem +database = ./index +private_key = ca.key +serial = ./serial +default_days = 3650 +default_md = default +policy = policy_anything + +[ policy_anything ] +countryName = optional +stateOrProvinceName = optional +localityName = optional +organizationName = optional +organizationalUnitName = optional +commonName = supplied +emailAddress = optional +EOF + +touch index +echo 0001 >serial + +SERVER=server +CLIENT=client + +openssl req -newkey rsa:1024 -nodes -out $SERVER.csr -keyout $SERVER.key -subj "/CN=$SERVER/" +openssl ca -batch -config ca.conf -notext -in $SERVER.csr -out $SERVER.pem + +openssl req -newkey rsa:1024 -nodes -out $CLIENT.csr -keyout $CLIENT.key -subj "/CN=$CLIENT/" +openssl ca -batch -config ca.conf -notext -in $CLIENT.csr -out $CLIENT.pem +</programlisting> + + <para>Generated files <filename>ca.pem</filename>, + <filename>server.pem</filename>, and + <filename>server.key</filename> should be installed on server, + and <filename>ca.pem</filename>, + <filename>client.pem</filename>, and + <filename>client.key</filename> on the client. The location of + those files can be specified using + <varname>TrustedCertificateFile=</varname>, + <varname>ServerCertificateFile=</varname>, + <varname>ServerKeyFile=</varname>, in + <filename>/etc/systemd/journal-remote.conf</filename> and + <filename>/etc/systemd/journal-upload.conf</filename>, + respectively. The default locations can be queried by using + <command>systemd-journal-remote --help</command> and + <command>systemd-journal-upload --help</command>.</para> + </example> + </refsect1> + + <refsect1> + <title>See Also</title> + <para> + <citerefentry><refentrytitle>systemd-journal-remote</refentrytitle><manvolnum>8</manvolnum></citerefentry>, + <citerefentry><refentrytitle>journalctl</refentrytitle><manvolnum>1</manvolnum></citerefentry>, + <citerefentry><refentrytitle>systemd-journald.service</refentrytitle><manvolnum>8</manvolnum></citerefentry>, + <citerefentry><refentrytitle>systemd-journal-gatewayd.service</refentrytitle><manvolnum>8</manvolnum></citerefentry> + </para> + </refsect1> +</refentry> |