diff options
Diffstat (limited to 'src/journal-remote/journal-upload-journal.c')
-rw-r--r-- | src/journal-remote/journal-upload-journal.c | 77 |
1 files changed, 57 insertions, 20 deletions
diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c index 5fd639a76a..8ce8e1895e 100644 --- a/src/journal-remote/journal-upload-journal.c +++ b/src/journal-remote/journal-upload-journal.c @@ -1,11 +1,31 @@ -#include <stdbool.h> +/*** + This file is part of systemd. + + Copyright 2014 Zbigniew Jędrzejewski-Szmek + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ #include <curl/curl.h> +#include <stdbool.h> -#include "util.h" +#include "alloc-util.h" +#include "journal-upload.h" #include "log.h" #include "utf8.h" -#include "journal-upload.h" +#include "util.h" +#include "sd-daemon.h" /** * Write up to size bytes to buf. Return negative on error, and number of @@ -17,12 +37,11 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { assert(size <= SSIZE_MAX); - while (true) { + for (;;) { switch(u->entry_state) { case ENTRY_CURSOR: { - free(u->current_cursor); - u->current_cursor = NULL; + u->current_cursor = mfree(u->current_cursor); r = sd_journal_get_cursor(u->journal, &u->current_cursor); if (r < 0) @@ -34,7 +53,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { /* not enough space */ return pos; - u->entry_state ++; + u->entry_state++; if (pos + r == size) { /* exactly one character short, but we don't need it */ @@ -58,7 +77,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { /* not enough space */ return pos; - u->entry_state ++; + u->entry_state++; if (r + pos == size) { /* exactly one character short, but we don't need it */ @@ -83,7 +102,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { /* not enough space */ return pos; - u->entry_state ++; + u->entry_state++; if (r + pos == size) { /* exactly one character short, but we don't need it */ @@ -108,7 +127,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { /* not enough space */ return pos; - u->entry_state ++; + u->entry_state++; if (r + pos == size) { /* exactly one character short, but we don't need it */ @@ -138,7 +157,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { continue; } - u->entry_state ++; + u->entry_state++; } /* fall through */ case ENTRY_TEXT_FIELD: @@ -188,7 +207,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { pos += len + 1; u->field_pos = len + 1; - u->entry_state ++; + u->entry_state++; } /* fall through */ case ENTRY_BINARY_FIELD_SIZE: { @@ -202,7 +221,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { memcpy(buf + pos, &le64, 8); pos += 8; - u->entry_state ++; + u->entry_state++; continue; } @@ -212,8 +231,8 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { return pos; buf[pos++] = '\n'; - u->entry_state ++; - u->entries_sent ++; + u->entry_state++; + u->entries_sent++; return pos; @@ -224,6 +243,22 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) { assert_not_reached("WTF?"); } +static inline void check_update_watchdog(Uploader *u) { + usec_t after; + usec_t elapsed_time; + + if (u->watchdog_usec <= 0) + return; + + after = now(CLOCK_MONOTONIC); + elapsed_time = usec_sub(after, u->watchdog_timestamp); + if (elapsed_time > u->watchdog_usec / 2) { + log_debug("Update watchdog timer"); + sd_notify(false, "WATCHDOG=1"); + u->watchdog_timestamp = after; + } +} + static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) { Uploader *u = userp; int r; @@ -234,6 +269,8 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void assert(u); assert(nmemb <= SSIZE_MAX / size); + check_update_watchdog(u); + j = u->journal; while (j && filled < size * nmemb) { @@ -292,6 +329,9 @@ void close_journal_input(Uploader *u) { static int process_journal_input(Uploader *u, int skip) { int r; + if (u->uploading) + return 0; + r = sd_journal_next_skip(u->journal, skip); if (r < 0) return log_error_errno(r, "Failed to skip to next entry: %m"); @@ -329,10 +369,8 @@ static int dispatch_journal_input(sd_event_source *event, assert(u); - if (u->uploading) { - log_warning("dispatch_journal_input called when uploading, ignoring."); + if (u->uploading) return 0; - } log_debug("Detected journal input, checking for new data."); return check_journal_input(u); @@ -375,10 +413,9 @@ int open_journal_for_upload(Uploader *u, if (cursor) { r = sd_journal_seek_cursor(j, cursor); - if (r < 0) { + if (r < 0) return log_error_errno(r, "Failed to seek to cursor %s: %m", cursor); - } } return process_journal_input(u, 1 + !!after_cursor); |