summaryrefslogtreecommitdiff
path: root/src/journal-remote/journal-upload-journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal-remote/journal-upload-journal.c')
-rw-r--r--src/journal-remote/journal-upload-journal.c77
1 files changed, 57 insertions, 20 deletions
diff --git a/src/journal-remote/journal-upload-journal.c b/src/journal-remote/journal-upload-journal.c
index 5fd639a76a..8ce8e1895e 100644
--- a/src/journal-remote/journal-upload-journal.c
+++ b/src/journal-remote/journal-upload-journal.c
@@ -1,11 +1,31 @@
-#include <stdbool.h>
+/***
+ This file is part of systemd.
+
+ Copyright 2014 Zbigniew Jędrzejewski-Szmek
+
+ systemd is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 2.1 of the License, or
+ (at your option) any later version.
+
+ systemd is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
#include <curl/curl.h>
+#include <stdbool.h>
-#include "util.h"
+#include "alloc-util.h"
+#include "journal-upload.h"
#include "log.h"
#include "utf8.h"
-#include "journal-upload.h"
+#include "util.h"
+#include "sd-daemon.h"
/**
* Write up to size bytes to buf. Return negative on error, and number of
@@ -17,12 +37,11 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
assert(size <= SSIZE_MAX);
- while (true) {
+ for (;;) {
switch(u->entry_state) {
case ENTRY_CURSOR: {
- free(u->current_cursor);
- u->current_cursor = NULL;
+ u->current_cursor = mfree(u->current_cursor);
r = sd_journal_get_cursor(u->journal, &u->current_cursor);
if (r < 0)
@@ -34,7 +53,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
/* not enough space */
return pos;
- u->entry_state ++;
+ u->entry_state++;
if (pos + r == size) {
/* exactly one character short, but we don't need it */
@@ -58,7 +77,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
/* not enough space */
return pos;
- u->entry_state ++;
+ u->entry_state++;
if (r + pos == size) {
/* exactly one character short, but we don't need it */
@@ -83,7 +102,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
/* not enough space */
return pos;
- u->entry_state ++;
+ u->entry_state++;
if (r + pos == size) {
/* exactly one character short, but we don't need it */
@@ -108,7 +127,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
/* not enough space */
return pos;
- u->entry_state ++;
+ u->entry_state++;
if (r + pos == size) {
/* exactly one character short, but we don't need it */
@@ -138,7 +157,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
continue;
}
- u->entry_state ++;
+ u->entry_state++;
} /* fall through */
case ENTRY_TEXT_FIELD:
@@ -188,7 +207,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
pos += len + 1;
u->field_pos = len + 1;
- u->entry_state ++;
+ u->entry_state++;
} /* fall through */
case ENTRY_BINARY_FIELD_SIZE: {
@@ -202,7 +221,7 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
memcpy(buf + pos, &le64, 8);
pos += 8;
- u->entry_state ++;
+ u->entry_state++;
continue;
}
@@ -212,8 +231,8 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
return pos;
buf[pos++] = '\n';
- u->entry_state ++;
- u->entries_sent ++;
+ u->entry_state++;
+ u->entries_sent++;
return pos;
@@ -224,6 +243,22 @@ static ssize_t write_entry(char *buf, size_t size, Uploader *u) {
assert_not_reached("WTF?");
}
+static inline void check_update_watchdog(Uploader *u) {
+ usec_t after;
+ usec_t elapsed_time;
+
+ if (u->watchdog_usec <= 0)
+ return;
+
+ after = now(CLOCK_MONOTONIC);
+ elapsed_time = usec_sub(after, u->watchdog_timestamp);
+ if (elapsed_time > u->watchdog_usec / 2) {
+ log_debug("Update watchdog timer");
+ sd_notify(false, "WATCHDOG=1");
+ u->watchdog_timestamp = after;
+ }
+}
+
static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void *userp) {
Uploader *u = userp;
int r;
@@ -234,6 +269,8 @@ static size_t journal_input_callback(void *buf, size_t size, size_t nmemb, void
assert(u);
assert(nmemb <= SSIZE_MAX / size);
+ check_update_watchdog(u);
+
j = u->journal;
while (j && filled < size * nmemb) {
@@ -292,6 +329,9 @@ void close_journal_input(Uploader *u) {
static int process_journal_input(Uploader *u, int skip) {
int r;
+ if (u->uploading)
+ return 0;
+
r = sd_journal_next_skip(u->journal, skip);
if (r < 0)
return log_error_errno(r, "Failed to skip to next entry: %m");
@@ -329,10 +369,8 @@ static int dispatch_journal_input(sd_event_source *event,
assert(u);
- if (u->uploading) {
- log_warning("dispatch_journal_input called when uploading, ignoring.");
+ if (u->uploading)
return 0;
- }
log_debug("Detected journal input, checking for new data.");
return check_journal_input(u);
@@ -375,10 +413,9 @@ int open_journal_for_upload(Uploader *u,
if (cursor) {
r = sd_journal_seek_cursor(j, cursor);
- if (r < 0) {
+ if (r < 0)
return log_error_errno(r, "Failed to seek to cursor %s: %m",
cursor);
- }
}
return process_journal_input(u, 1 + !!after_cursor);