diff options
Diffstat (limited to 'src/journal-remote')
-rw-r--r-- | src/journal-remote/journal-remote-write.c | 4 | ||||
-rw-r--r-- | src/journal-remote/journal-remote.c | 2 | ||||
-rw-r--r-- | src/journal-remote/journal-upload-journal.c | 402 | ||||
-rw-r--r-- | src/journal-remote/journal-upload.c | 269 | ||||
-rw-r--r-- | src/journal-remote/journal-upload.h | 42 |
5 files changed, 685 insertions, 34 deletions
diff --git a/src/journal-remote/journal-remote-write.c b/src/journal-remote/journal-remote-write.c index 4d142bdc97..449636cd8c 100644 --- a/src/journal-remote/journal-remote-write.c +++ b/src/journal-remote/journal-remote-write.c @@ -81,8 +81,10 @@ int writer_init(Writer *s) { } int writer_close(Writer *s) { - if (s->journal) + if (s->journal) { journal_file_close(s->journal); + log_debug("Journal has been closed."); + } if (s->mmap) mmap_cache_unref(s->mmap); return 0; diff --git a/src/journal-remote/journal-remote.c b/src/journal-remote/journal-remote.c index 437e0b05d1..906759b434 100644 --- a/src/journal-remote/journal-remote.c +++ b/src/journal-remote/journal-remote.c @@ -628,8 +628,6 @@ static int dispatch_http_event(sd_event_source *event, assert(d); - log_info("%s", __func__); - r = MHD_run(d->daemon); if (r == MHD_NO) { log_error("MHD_run failed!"); diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c new file mode 100644 index 0000000000..a3be1bf8e5 --- /dev/null +++ b/src/journal-remote/journal-upload-journal.c @@ -0,0 +1,402 @@ +#include <stdbool.h> + +#include <curl/curl.h> + +#include "util.h" +#include "log.h" +#include "utf8.h" +#include "journal-upload.h" + +/** + * Write up to size bytes to buf. Return negative on error, and number of + * bytes written otherwise. The last case is a kind of an error too. + */ +static ssize_t write_entry(char *buf, size_t size, Uploader *u) { + int r; + size_t pos = 0; + + assert(size <= SSIZE_MAX); + + while (true) { + + switch(u->entry_state) { + case ENTRY_CURSOR: { + free(u->last_cursor); + u->last_cursor = NULL; + + r = sd_journal_get_cursor(u->journal, &u->last_cursor); + if (r < 0) { + log_error("Failed to get cursor: %s", strerror(-r)); + return r; + } + + r = snprintf(buf + pos, size - pos, + "__CURSOR=%s\n", u->last_cursor); + if (pos + r > size) + /* not enough space */ + return pos; + + u->entry_state ++; + + if (pos + r == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_REALTIME: { + usec_t realtime; + + r = sd_journal_get_realtime_usec(u->journal, &realtime); + if (r < 0) { + log_error("Failed to get realtime timestamp: %s", strerror(-r)); + return r; + } + + r = snprintf(buf + pos, size - pos, + "__REALTIME_TIMESTAMP="USEC_FMT"\n", realtime); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state ++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_MONOTONIC: { + usec_t monotonic; + sd_id128_t boot_id; + + r = sd_journal_get_monotonic_usec(u->journal, &monotonic, &boot_id); + if (r < 0) { + log_error("Failed to get monotonic timestamp: %s", strerror(-r)); + return r; + } + + r = snprintf(buf + pos, size - pos, + "__MONOTONIC_TIMESTAMP="USEC_FMT"\n", monotonic); + if (r + pos > size) + /* not enough space */ + return pos; + + u->entry_state ++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_BOOT_ID: { + sd_id128_t boot_id; + char sid[33]; + + r = sd_journal_get_monotonic_usec(u->journal, NULL, &boot_id); + if (r < 0) { + log_error("Failed to get monotonic timestamp: %s", strerror(-r)); + return r; + } + + r = snprintf(buf + pos, size - pos, + "_BOOT_ID=%s\n", sd_id128_to_string(boot_id, sid)); + if (r + pos> size) + /* not enough space */ + return pos; + + u->entry_state ++; + + if (r + pos == size) { + /* exactly one character short, but we don't need it */ + buf[size - 1] = '\n'; + return size; + } + + pos += r; + } /* fall through */ + + case ENTRY_NEW_FIELD: { + u->field_pos = 0; + + r = sd_journal_enumerate_data(u->journal, + &u->field_data, + &u->field_length); + if (r < 0) { + log_error("Failed to move to next field in entry: %s", + strerror(-r)); + return r; + } else if (r == 0) { + u->entry_state = ENTRY_OUTRO; + continue; + } + + if (!utf8_is_printable_newline(u->field_data, + u->field_length, false)) { + u->entry_state = ENTRY_BINARY_FIELD_START; + continue; + } + + u->entry_state ++; + } /* fall through */ + + case ENTRY_TEXT_FIELD: + case ENTRY_BINARY_FIELD: { + bool done; + size_t tocopy; + + done = size - pos > u->field_length - u->field_pos; + if (done) + tocopy = u->field_length - u->field_pos; + else + tocopy = size - pos; + + memcpy(buf + pos, + (char*) u->field_data + u->field_pos, + tocopy); + + if (done) { + buf[pos + tocopy] = '\n'; + pos += tocopy + 1; + u->entry_state = ENTRY_NEW_FIELD; + continue; + } else { + u->field_pos += tocopy; + return size; + } + } + + case ENTRY_BINARY_FIELD_START: { + const char *c; + size_t len; + + c = memchr(u->field_data, '=', u->field_length); + if (!c || c == u->field_data) { + log_error("Invalid field."); + return -EINVAL; + } + + len = c - (const char*)u->field_data; + + /* need space for label + '\n' */ + if (size - pos < len + 1) + return pos; + + memcpy(buf + pos, u->field_data, len); + buf[pos + len] = '\n'; + pos += len + 1; + + u->field_pos = len + 1; + u->entry_state ++; + } /* fall through */ + + case ENTRY_BINARY_FIELD_SIZE: { + uint64_t le64; + + /* need space for uint64_t */ + if (size - pos < 8) + return pos; + + le64 = htole64(u->field_length - u->field_pos); + memcpy(buf + pos, &le64, 8); + pos += 8; + + u->entry_state ++; + continue; + } + + case ENTRY_OUTRO: + /* need space for '\n' */ + if (size - pos < 1) + return pos; + + buf[pos++] = '\n'; + u->entry_state ++; + u->entries_sent ++; + + return pos; + + default: + assert_not_reached("WTF?"); + } + } + assert_not_reached("WTF?"); +} + +static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { + Uploader *u = userp; + int r; + sd_journal *j; + size_t filled = 0; + ssize_t w; + + assert(u); + assert(nmemb <= SSIZE_MAX / size); + + j = u->journal; + + while (j && filled < size * nmemb) { + if (u->entry_state == ENTRY_DONE) { + r = sd_journal_next(j); + if (r < 0) { + log_error("Failed to move to next entry in journal: %s", + strerror(-r)); + return CURL_READFUNC_ABORT; + } else if (r == 0) { + if (u->input_event) + log_debug("No more entries, waiting for journal."); + else { + log_info("No more entries, closing journal."); + close_journal_input(u); + } + + u->uploading = false; + + break; + } + + u->entry_state = ENTRY_CURSOR; + } + + w = write_entry((char*)buf + filled, size * nmemb - filled, u); + if (w < 0) + return CURL_READFUNC_ABORT; + filled += w; + + if (filled == 0) { + log_error("Buffer space is too small to write entry."); + return CURL_READFUNC_ABORT; + } else if (u->entry_state != ENTRY_DONE) + /* This means that all available space was used up */ + break; + + log_debug("Entry %zu (%s) has been uploaded.", + u->entries_sent, u->last_cursor); + } + + return filled; +} + +void close_journal_input(Uploader *u) { + assert(u); + + if (u->journal) { + log_debug("Closing journal input."); + + sd_journal_close(u->journal); + u->journal = NULL; + } + u->timeout = 0; +} + +static int process_journal_input(Uploader *u, int skip) { + int r; + + r = sd_journal_next_skip(u->journal, skip); + if (r < 0) { + log_error("Failed to skip to next entry: %s", strerror(-r)); + return r; + } else if (r < skip) + return 0; + + /* have data */ + u->entry_state = ENTRY_CURSOR; + return start_upload(u, journal_input_callback, u); +} + +int check_journal_input(Uploader *u) { + if (u->input_event) { + int r; + + r = sd_journal_process(u->journal); + if (r < 0) { + log_error("Failed to process journal: %s", strerror(-r)); + close_journal_input(u); + return r; + } + + if (r == SD_JOURNAL_NOP) + return 0; + } + + return process_journal_input(u, 1); +} + +static int dispatch_journal_input(sd_event_source *event, + int fd, + uint32_t revents, + void *userp) { + Uploader *u = userp; + + assert(u); + + if (u->uploading) { + log_warning("dispatch_journal_input called when uploading, ignoring."); + return 0; + } + + log_debug("Detected journal input, checking for new data."); + return check_journal_input(u); +} + +int open_journal_for_upload(Uploader *u, + sd_journal *j, + const char *cursor, + bool after_cursor, + bool follow) { + int fd, r, events; + + u->journal = j; + + sd_journal_set_data_threshold(j, 0); + + if (follow) { + fd = sd_journal_get_fd(j); + if (fd < 0) { + log_error("sd_journal_get_fd failed: %s", strerror(-fd)); + return fd; + } + + events = sd_journal_get_events(j); + + r = sd_journal_reliable_fd(j); + assert(r >= 0); + if (r > 0) + u->timeout = -1; + else + u->timeout = JOURNAL_UPLOAD_POLL_TIMEOUT; + + r = sd_event_add_io(u->events, &u->input_event, + fd, events, dispatch_journal_input, u); + if (r < 0) { + log_error("Failed to register input event: %s", strerror(-r)); + return r; + } + + log_debug("Listening for journal events on fd:%d, timeout %d", + fd, u->timeout == (uint64_t) -1 ? -1 : (int) u->timeout); + } else + log_debug("Not listening for journal events."); + + if (cursor) { + r = sd_journal_seek_cursor(j, cursor); + if (r < 0) { + log_error("Failed to seek to cursor %s: %s", + cursor, strerror(-r)); + return r; + } + } + + return process_journal_input(u, 1 + !!after_cursor); +} diff --git a/src/journal-remote/journal-upload.c b/src/journal-remote/journal-upload.c index 538ba8b650..0cab031985 100644 --- a/src/journal-remote/journal-upload.c +++ b/src/journal-remote/journal-upload.c @@ -40,6 +40,17 @@ static const char *arg_key = NULL; static const char *arg_cert = NULL; static const char *arg_trust = NULL; +static const char *arg_directory = NULL; +static char **arg_file = NULL; +static const char *arg_cursor = NULL; +static bool arg_after_cursor = false; +static int arg_journal_type = 0; +static const char *arg_machine = NULL; +static bool arg_merge = false; +static int arg_follow = -1; + +#define SERVER_ANSWER_KEEP 2048 + #define easy_setopt(curl, opt, value, level, cmd) \ { \ code = curl_easy_setopt(curl, opt, value); \ @@ -51,6 +62,27 @@ static const char *arg_trust = NULL; } \ } +static size_t output_callback(char *buf, + size_t size, + size_t nmemb, + void *userp) { + Uploader *u = userp; + + assert(u); + + log_debug("The server answers (%zu bytes): %.*s", + size*nmemb, (int)(size*nmemb), buf); + + if (nmemb && !u->answer) { + u->answer = strndup(buf, size*nmemb); + if (!u->answer) + log_warning("Failed to store server answer (%zu bytes): %s", + size*nmemb, strerror(ENOMEM)); + } + + return size * nmemb; +} + int start_upload(Uploader *u, size_t (*input_callback)(void *ptr, size_t size, @@ -97,6 +129,16 @@ int start_upload(Uploader *u, easy_setopt(curl, CURLOPT_POST, 1L, LOG_ERR, return -EXFULL); + easy_setopt(curl, CURLOPT_ERRORBUFFER, &u->error, + LOG_ERR, return -EXFULL); + + /* set where to write to */ + easy_setopt(curl, CURLOPT_WRITEFUNCTION, output_callback, + LOG_ERR, return -EXFULL); + + easy_setopt(curl, CURLOPT_WRITEDATA, data, + LOG_ERR, return -EXFULL); + /* set where to read from */ easy_setopt(curl, CURLOPT_READFUNCTION, input_callback, LOG_ERR, return -EXFULL); @@ -133,6 +175,12 @@ int start_upload(Uploader *u, LOG_WARNING, ); u->easy = curl; + } else { + /* truncate the potential old error message */ + u->error[0] = '\0'; + + free(u->answer); + u->answer = 0; } /* upload to this place */ @@ -182,6 +230,7 @@ static void close_fd_input(Uploader *u) { if (u->input >= 0) close_nointr(u->input); u->input = -1; + u->timeout = 0; } static int dispatch_fd_input(sd_event_source *event, @@ -217,17 +266,20 @@ static int open_file_for_upload(Uploader *u, const char *filename) { u->input = fd; - r = sd_event_add_io(u->events, &u->input_event, - fd, EPOLLIN, dispatch_fd_input, u); - if (r < 0) { - if (r != -EPERM) { - log_error("Failed to register input event: %s", strerror(-r)); - return r; - } + if (arg_follow) { + r = sd_event_add_io(u->events, &u->input_event, + fd, EPOLLIN, dispatch_fd_input, u); + if (r < 0) { + if (r != -EPERM || arg_follow > 0) { + log_error("Failed to register input event: %s", strerror(-r)); + return r; + } - /* Normal files should just be consumed without polling. */ - r = start_upload(u, fd_input_callback, u); + /* Normal files should just be consumed without polling. */ + r = start_upload(u, fd_input_callback, u); + } } + return r; } @@ -256,14 +308,54 @@ static void destroy_uploader(Uploader *u) { curl_easy_cleanup(u->easy); curl_slist_free_all(u->header); + free(u->answer); + + free(u->last_cursor); u->input_event = sd_event_source_unref(u->input_event); close_fd_input(u); + close_journal_input(u); sd_event_unref(u->events); } +static int perform_upload(Uploader *u) { + CURLcode code; + long status; + + assert(u); + + code = curl_easy_perform(u->easy); + if (code) { + log_error("Upload to %s failed: %.*s", + u->url, + u->error[0] ? (int) sizeof(u->error) : INT_MAX, + u->error[0] ? u->error : curl_easy_strerror(code)); + return -EIO; + } + + code = curl_easy_getinfo(u->easy, CURLINFO_RESPONSE_CODE, &status); + if (code) { + log_error("Failed to retrieve response code: %s", + curl_easy_strerror(code)); + return -EUCLEAN; + } + + if (status >= 300) { + log_error("Upload to %s failed with code %lu: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else if (status < 200) { + log_error("Upload to %s finished with unexpected code %lu: %s", + u->url, status, strna(u->answer)); + return -EIO; + } else + log_debug("Upload finished successfully with code %lu: %s", + status, strna(u->answer)); + return 0; +} + static void help(void) { printf("%s -u URL {FILE|-}...\n\n" "Upload journal events to a remote server.\n\n" @@ -272,6 +364,15 @@ static void help(void) { " --key=FILENAME Specify key in PEM format\n" " --cert=FILENAME Specify certificate in PEM format\n" " --trust=FILENAME Specify CA certificate in PEM format\n" + " --system Use the system journal\n" + " --user Use the user journal for the current user\n" + " -m --merge Use all available journals\n" + " -M --machine=CONTAINER Operate on local container\n" + " -D --directory=PATH Use journal files from directory\n" + " --file=PATH Use this journal file\n" + " --cursor=CURSOR Start at the specified cursor\n" + " --after-cursor=CURSOR Start after the specified cursor\n" + " --[no-]follow Do [not] wait for input\n" " -h --help Show this help and exit\n" " --version Print version string and exit\n" , program_invocation_short_name); @@ -283,6 +384,13 @@ static int parse_argv(int argc, char *argv[]) { ARG_KEY, ARG_CERT, ARG_TRUST, + ARG_USER, + ARG_SYSTEM, + ARG_FILE, + ARG_CURSOR, + ARG_AFTER_CURSOR, + ARG_FOLLOW, + ARG_NO_FOLLOW, }; static const struct option options[] = { @@ -292,17 +400,27 @@ static int parse_argv(int argc, char *argv[]) { { "key", required_argument, NULL, ARG_KEY }, { "cert", required_argument, NULL, ARG_CERT }, { "trust", required_argument, NULL, ARG_TRUST }, + { "system", no_argument, NULL, ARG_SYSTEM }, + { "user", no_argument, NULL, ARG_USER }, + { "merge", no_argument, NULL, 'm' }, + { "machine", required_argument, NULL, 'M' }, + { "directory", required_argument, NULL, 'D' }, + { "file", required_argument, NULL, ARG_FILE }, + { "cursor", required_argument, NULL, ARG_CURSOR }, + { "after-cursor", required_argument, NULL, ARG_AFTER_CURSOR }, + { "follow", no_argument, NULL, ARG_FOLLOW }, + { "no-follow", no_argument, NULL, ARG_NO_FOLLOW }, {} }; - int c; + int c, r; assert(argc >= 0); assert(argv); opterr = 0; - while ((c = getopt_long(argc, argv, "hu:", options, NULL)) >= 0) + while ((c = getopt_long(argc, argv, "hu:mM:D:", options, NULL)) >= 0) switch(c) { case 'h': help(); @@ -349,6 +467,71 @@ static int parse_argv(int argc, char *argv[]) { arg_trust = optarg; break; + case ARG_SYSTEM: + arg_journal_type |= SD_JOURNAL_SYSTEM; + break; + + case ARG_USER: + arg_journal_type |= SD_JOURNAL_CURRENT_USER; + break; + + case 'm': + arg_merge = true; + break; + + case 'M': + if (arg_machine) { + log_error("cannot use more than one --machine/-M"); + return -EINVAL; + } + + arg_machine = optarg; + break; + + case 'D': + if (arg_directory) { + log_error("cannot use more than one --directory/-D"); + return -EINVAL; + } + + arg_directory = optarg; + break; + + case ARG_FILE: + r = glob_extend(&arg_file, optarg); + if (r < 0) { + log_error("Failed to add paths: %s", strerror(-r)); + return r; + }; + break; + + case ARG_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + break; + + case ARG_AFTER_CURSOR: + if (arg_cursor) { + log_error("cannot use more than one --cursor/--after-cursor"); + return -EINVAL; + } + + arg_cursor = optarg; + arg_after_cursor = true; + break; + + case ARG_FOLLOW: + arg_follow = true; + break; + + case ARG_NO_FOLLOW: + arg_follow = false; + break; + case '?': log_error("Unknown option %s.", argv[optind-1]); return -EINVAL; @@ -371,18 +554,36 @@ static int parse_argv(int argc, char *argv[]) { return -EINVAL; } - if (optind >= argc) { - log_error("Input argument missing."); + if (optind < argc && (arg_directory || arg_file || arg_machine || arg_journal_type)) { + log_error("Input arguments make no sense with journal input."); return -EINVAL; } return 1; } +static int open_journal(sd_journal **j) { + int r; + + if (arg_directory) + r = sd_journal_open_directory(j, arg_directory, arg_journal_type); + else if (arg_file) + r = sd_journal_open_files(j, (const char**) arg_file, 0); + else if (arg_machine) + r = sd_journal_open_container(j, arg_machine, 0); + else + r = sd_journal_open(j, !arg_merge*SD_JOURNAL_LOCAL_ONLY + arg_journal_type); + if (r < 0) + log_error("Failed to open %s: %s", + arg_directory ? arg_directory : arg_file ? "files" : "journal", + strerror(-r)); + return r; +} int main(int argc, char **argv) { Uploader u; int r; + bool use_journal; log_show_color(true); log_parse_environment(); @@ -397,22 +598,39 @@ int main(int argc, char **argv) { log_debug("%s running as pid "PID_FMT, program_invocation_short_name, getpid()); + + use_journal = optind >= argc; + if (use_journal) { + sd_journal *j; + r = open_journal(&j); + if (r < 0) + goto finish; + r = open_journal_for_upload(&u, j, + arg_cursor, arg_after_cursor, + !!arg_follow); + if (r < 0) + goto finish; + } + sd_notify(false, "READY=1\n" "STATUS=Processing input..."); while (true) { - if (u.input < 0) { + if (use_journal) { + if (!u.journal) + break; + + r = check_journal_input(&u); + } else if (u.input < 0 && !use_journal) { if (optind >= argc) break; log_debug("Using %s as input.", argv[optind]); - r = open_file_for_upload(&u, argv[optind++]); - if (r < 0) - goto cleanup; - } + if (r < 0) + goto cleanup; r = sd_event_get_state(u.events); if (r < 0) @@ -421,21 +639,12 @@ int main(int argc, char **argv) { break; if (u.uploading) { - CURLcode code; - - assert(u.easy); - - code = curl_easy_perform(u.easy); - if (code) { - log_error("Upload to %s failed: %s", - u.url, curl_easy_strerror(code)); - r = -EIO; + r = perform_upload(&u); + if (r < 0) break; - } else - log_debug("Upload finished successfully."); } - r = sd_event_run(u.events, u.input >= 0 ? -1 : 0); + r = sd_event_run(u.events, u.timeout); if (r < 0) { log_error("Failed to run event loop: %s", strerror(-r)); break; diff --git a/src/journal-remote/journal-upload.h b/src/journal-remote/journal-upload.h index 68d85be6bc..f94d9ac69e 100644 --- a/src/journal-remote/journal-upload.h +++ b/src/journal-remote/journal-upload.h @@ -2,24 +2,64 @@ #include <inttypes.h> +#include "sd-journal.h" #include "sd-event.h" +typedef enum { + ENTRY_CURSOR = 0, /* Nothing actually written yet. */ + ENTRY_REALTIME, + ENTRY_MONOTONIC, + ENTRY_BOOT_ID, + ENTRY_NEW_FIELD, /* In between fields. */ + ENTRY_TEXT_FIELD, /* In the middle of a text field. */ + ENTRY_BINARY_FIELD_START, /* Writing the name of a binary field. */ + ENTRY_BINARY_FIELD_SIZE, /* Writing the size of a binary field. */ + ENTRY_BINARY_FIELD, /* In the middle of a binary field. */ + ENTRY_OUTRO, /* Writing '\n' */ + ENTRY_DONE, /* Need to move to a new field. */ +} entry_state; + typedef struct Uploader { sd_event *events; const char *url; CURL *easy; bool uploading; + char error[CURL_ERROR_SIZE]; struct curl_slist *header; + char *answer; + + sd_event_source *input_event; + uint64_t timeout; + /* fd stuff */ int input; - sd_event_source *input_event; + /* journal stuff */ + sd_journal* journal; + + entry_state entry_state; + const void *field_data; + size_t field_pos, field_length; + + /* general metrics */ + size_t entries_sent; + char *last_cursor; } Uploader; +#define JOURNAL_UPLOAD_POLL_TIMEOUT (10 * USEC_PER_SEC) + int start_upload(Uploader *u, size_t (*input_callback)(void *ptr, size_t size, size_t nmemb, void *userdata), void *data); + +int open_journal_for_upload(Uploader *u, + sd_journal *j, + const char *cursor, + bool after_cursor, + bool follow); +void close_journal_input(Uploader *u); +int check_journal_input(Uploader *u); |