summaryrefslogtreecommitdiff
path: root/src/journal
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/compress.c116
-rw-r--r--src/journal/compress.h13
-rw-r--r--src/journal/coredump.c2
-rw-r--r--src/journal/journal-file.c98
-rw-r--r--src/journal/journal-file.h5
-rw-r--r--src/journal/journalctl.c2
-rw-r--r--src/journal/journald-native.c4
-rw-r--r--src/journal/journald-server.c45
-rw-r--r--src/journal/journald-stream.c2
-rw-r--r--src/journal/journald-syslog.c10
-rw-r--r--src/journal/sd-journal.c8
-rw-r--r--src/journal/test-compress-benchmark.c7
-rw-r--r--src/journal/test-compress.c121
13 files changed, 286 insertions, 147 deletions
diff --git a/src/journal/compress.c b/src/journal/compress.c
index 1a3d2cdd80..78935fee74 100644
--- a/src/journal/compress.c
+++ b/src/journal/compress.c
@@ -58,7 +58,8 @@ static const char* const object_compressed_table[_OBJECT_COMPRESSED_MAX] = {
DEFINE_STRING_TABLE_LOOKUP(object_compressed, int);
-int compress_blob_xz(const void *src, uint64_t src_size, void *dst, size_t *dst_size) {
+int compress_blob_xz(const void *src, uint64_t src_size,
+ void *dst, size_t dst_alloc_size, size_t *dst_size) {
#ifdef HAVE_XZ
static const lzma_options_lzma opt = {
1u << 20u, NULL, 0, LZMA_LC_DEFAULT, LZMA_LP_DEFAULT,
@@ -74,6 +75,7 @@ int compress_blob_xz(const void *src, uint64_t src_size, void *dst, size_t *dst_
assert(src);
assert(src_size > 0);
assert(dst);
+ assert(dst_alloc_size > 0);
assert(dst_size);
/* Returns < 0 if we couldn't compress the data or the
@@ -83,7 +85,7 @@ int compress_blob_xz(const void *src, uint64_t src_size, void *dst, size_t *dst_
return -ENOBUFS;
ret = lzma_stream_buffer_encode((lzma_filter*) filters, LZMA_CHECK_NONE, NULL,
- src, src_size, dst, &out_pos, src_size - 1);
+ src, src_size, dst, &out_pos, dst_alloc_size);
if (ret != LZMA_OK)
return -ENOBUFS;
@@ -94,13 +96,15 @@ int compress_blob_xz(const void *src, uint64_t src_size, void *dst, size_t *dst_
#endif
}
-int compress_blob_lz4(const void *src, uint64_t src_size, void *dst, size_t *dst_size) {
+int compress_blob_lz4(const void *src, uint64_t src_size,
+ void *dst, size_t dst_alloc_size, size_t *dst_size) {
#ifdef HAVE_LZ4
int r;
assert(src);
assert(src_size > 0);
assert(dst);
+ assert(dst_alloc_size > 0);
assert(dst_size);
/* Returns < 0 if we couldn't compress the data or the
@@ -109,7 +113,7 @@ int compress_blob_lz4(const void *src, uint64_t src_size, void *dst, size_t *dst
if (src_size < 9)
return -ENOBUFS;
- r = LZ4_compress_limitedOutput(src, dst + 8, src_size, src_size - 8 - 1);
+ r = LZ4_compress_limitedOutput(src, dst + 8, src_size, (int) dst_alloc_size - 8);
if (r <= 0)
return -ENOBUFS;
@@ -306,6 +310,7 @@ int decompress_startswith_lz4(const void *src, uint64_t src_size,
* prefix */
int r;
+ size_t size;
assert(src);
assert(src_size > 0);
@@ -322,10 +327,18 @@ int decompress_startswith_lz4(const void *src, uint64_t src_size,
r = LZ4_decompress_safe_partial(src + 8, *buffer, src_size - 8,
prefix_len + 1, *buffer_size);
+ if (r >= 0)
+ size = (unsigned) r;
+ else {
+ /* lz4 always tries to decode full "sequence", so in
+ * pathological cases might need to decompress the
+ * full field. */
+ r = decompress_blob_lz4(src, src_size, buffer, buffer_size, &size, 0);
+ if (r < 0)
+ return r;
+ }
- if (r < 0)
- return -EBADMSG;
- if ((unsigned) r >= prefix_len + 1)
+ if (size >= prefix_len + 1)
return memcmp(*buffer, prefix, prefix_len) == 0 &&
((const uint8_t*) *buffer)[prefix_len] == extra;
else
@@ -438,7 +451,7 @@ int compress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {
_cleanup_(LZ4F_freeCompressionContextp) LZ4F_compressionContext_t ctx = NULL;
_cleanup_free_ char *buf = NULL;
char *src = NULL;
- size_t size, n, total_in = 0, total_out = 0, offset = 0, frame_size;
+ size_t size, n, total_in = 0, total_out, offset = 0, frame_size;
struct stat st;
int r;
static const LZ4F_compressOptions_t options = {
@@ -461,7 +474,7 @@ int compress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {
if (!buf)
return -ENOMEM;
- n = offset = LZ4F_compressBegin(ctx, buf, size, &preferences);
+ n = offset = total_out = LZ4F_compressBegin(ctx, buf, size, &preferences);
if (LZ4F_isError(n))
return -EINVAL;
@@ -599,80 +612,8 @@ int decompress_stream_xz(int fdf, int fdt, uint64_t max_bytes) {
#endif
}
+int decompress_stream_lz4(int in, int out, uint64_t max_bytes) {
#ifdef HAVE_LZ4
-static int decompress_stream_lz4_v1(int fdf, int fdt, uint64_t max_bytes) {
-
- _cleanup_free_ char *buf = NULL, *out = NULL;
- size_t buf_size = 0;
- LZ4_streamDecode_t lz4_data = {};
- le32_t header;
- size_t total_in = sizeof(header), total_out = 0;
-
- assert(fdf >= 0);
- assert(fdt >= 0);
-
- out = malloc(4*LZ4_BUFSIZE);
- if (!out)
- return -ENOMEM;
-
- for (;;) {
- ssize_t m;
- int r;
-
- r = loop_read_exact(fdf, &header, sizeof(header), false);
- if (r < 0)
- return r;
-
- m = le32toh(header);
- if (m == 0)
- break;
-
- /* We refuse to use a bigger decompression buffer than
- * the one used for compression by 4 times. This means
- * that compression buffer size can be enlarged 4
- * times. This can be changed, but old binaries might
- * not accept buffers compressed by newer binaries then.
- */
- if (m > LZ4_COMPRESSBOUND(LZ4_BUFSIZE * 4)) {
- log_debug("Compressed stream block too big: %zd bytes", m);
- return -ENOBUFS;
- }
-
- total_in += sizeof(header) + m;
-
- if (!GREEDY_REALLOC(buf, buf_size, m))
- return -ENOMEM;
-
- r = loop_read_exact(fdf, buf, m, false);
- if (r < 0)
- return r;
-
- r = LZ4_decompress_safe_continue(&lz4_data, buf, out, m, 4*LZ4_BUFSIZE);
- if (r <= 0) {
- log_debug("LZ4 decompression failed (legacy format).");
- return -EBADMSG;
- }
-
- total_out += r;
-
- if (max_bytes != (uint64_t) -1 && (uint64_t) total_out > max_bytes) {
- log_debug("Decompressed stream longer than %" PRIu64 " bytes", max_bytes);
- return -EFBIG;
- }
-
- r = loop_write(fdt, out, r, false);
- if (r < 0)
- return r;
- }
-
- log_debug("LZ4 decompression finished (legacy format, %zu -> %zu bytes, %.1f%%)",
- total_in, total_out,
- (double) total_out / total_in * 100);
-
- return 0;
-}
-
-static int decompress_stream_lz4_v2(int in, int out, uint64_t max_bytes) {
size_t c;
_cleanup_(LZ4F_freeDecompressionContextp) LZ4F_decompressionContext_t ctx = NULL;
_cleanup_free_ char *buf = NULL;
@@ -726,17 +667,6 @@ static int decompress_stream_lz4_v2(int in, int out, uint64_t max_bytes) {
cleanup:
munmap(src, st.st_size);
return r;
-}
-#endif
-
-int decompress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {
-#ifdef HAVE_LZ4
- int r;
-
- r = decompress_stream_lz4_v2(fdf, fdt, max_bytes);
- if (r == -EBADMSG)
- r = decompress_stream_lz4_v1(fdf, fdt, max_bytes);
- return r;
#else
log_debug("Cannot decompress file. Compiled without LZ4 support.");
return -EPROTONOSUPPORT;
diff --git a/src/journal/compress.h b/src/journal/compress.h
index 9a065eb763..758598730a 100644
--- a/src/journal/compress.h
+++ b/src/journal/compress.h
@@ -28,17 +28,20 @@
const char* object_compressed_to_string(int compression);
int object_compressed_from_string(const char *compression);
-int compress_blob_xz(const void *src, uint64_t src_size, void *dst, size_t *dst_size);
-int compress_blob_lz4(const void *src, uint64_t src_size, void *dst, size_t *dst_size);
+int compress_blob_xz(const void *src, uint64_t src_size,
+ void *dst, size_t dst_alloc_size, size_t *dst_size);
+int compress_blob_lz4(const void *src, uint64_t src_size,
+ void *dst, size_t dst_alloc_size, size_t *dst_size);
-static inline int compress_blob(const void *src, uint64_t src_size, void *dst, size_t *dst_size) {
+static inline int compress_blob(const void *src, uint64_t src_size,
+ void *dst, size_t dst_alloc_size, size_t *dst_size) {
int r;
#ifdef HAVE_LZ4
- r = compress_blob_lz4(src, src_size, dst, dst_size);
+ r = compress_blob_lz4(src, src_size, dst, dst_alloc_size, dst_size);
if (r == 0)
return OBJECT_COMPRESSED_LZ4;
#else
- r = compress_blob_xz(src, src_size, dst, dst_size);
+ r = compress_blob_xz(src, src_size, dst, dst_alloc_size, dst_size);
if (r == 0)
return OBJECT_COMPRESSED_XZ;
#endif
diff --git a/src/journal/coredump.c b/src/journal/coredump.c
index 7a260c32e0..7df59fe29b 100644
--- a/src/journal/coredump.c
+++ b/src/journal/coredump.c
@@ -532,7 +532,7 @@ static int compose_open_fds(pid_t pid, char **open_fds) {
errno = 0;
stream = safe_fclose(stream);
- if (errno != 0)
+ if (errno > 0)
return -errno;
*open_fds = buffer;
diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c
index 6f09301521..929ad0aa7c 100644
--- a/src/journal/journal-file.c
+++ b/src/journal/journal-file.c
@@ -39,6 +39,7 @@
#include "lookup3.h"
#include "parse-util.h"
#include "random-util.h"
+#include "sd-event.h"
#include "string-util.h"
#include "xattr-util.h"
@@ -149,6 +150,17 @@ JournalFile* journal_file_close(JournalFile *f) {
journal_file_append_tag(f);
#endif
+ if (f->post_change_timer) {
+ int enabled;
+
+ if (sd_event_source_get_enabled(f->post_change_timer, &enabled) >= 0)
+ if (enabled == SD_EVENT_ONESHOT)
+ journal_file_post_change(f);
+
+ sd_event_source_set_enabled(f->post_change_timer, SD_EVENT_OFF);
+ sd_event_source_unref(f->post_change_timer);
+ }
+
journal_file_set_offline(f);
if (f->mmap && f->fd >= 0)
@@ -1084,7 +1096,7 @@ static int journal_file_append_data(
if (JOURNAL_FILE_COMPRESS(f) && size >= COMPRESSION_SIZE_THRESHOLD) {
size_t rsize = 0;
- compression = compress_blob(data, size, o->data.payload, &rsize);
+ compression = compress_blob(data, size, o->data.payload, size - 1, &rsize);
if (compression >= 0) {
o->object.size = htole64(offsetof(Object, data.payload) + rsize);
@@ -1400,6 +1412,77 @@ void journal_file_post_change(JournalFile *f) {
log_error_errno(errno, "Failed to truncate file to its own size: %m");
}
+static int post_change_thunk(sd_event_source *timer, uint64_t usec, void *userdata) {
+ assert(userdata);
+
+ journal_file_post_change(userdata);
+
+ return 1;
+}
+
+static void schedule_post_change(JournalFile *f) {
+ sd_event_source *timer;
+ int enabled, r;
+ uint64_t now;
+
+ assert(f);
+ assert(f->post_change_timer);
+
+ timer = f->post_change_timer;
+
+ r = sd_event_source_get_enabled(timer, &enabled);
+ if (r < 0) {
+ log_error_errno(-r, "Failed to get ftruncate timer state: %m");
+ return;
+ }
+
+ if (enabled == SD_EVENT_ONESHOT)
+ return;
+
+ r = sd_event_now(sd_event_source_get_event(timer), CLOCK_MONOTONIC, &now);
+ if (r < 0) {
+ log_error_errno(-r, "Failed to get clock's now for scheduling ftruncate: %m");
+ return;
+ }
+
+ r = sd_event_source_set_time(timer, now+f->post_change_timer_period);
+ if (r < 0) {
+ log_error_errno(-r, "Failed to set time for scheduling ftruncate: %m");
+ return;
+ }
+
+ r = sd_event_source_set_enabled(timer, SD_EVENT_ONESHOT);
+ if (r < 0) {
+ log_error_errno(-r, "Failed to enable scheduled ftruncate: %m");
+ return;
+ }
+}
+
+/* Enable coalesced change posting in a timer on the provided sd_event instance */
+int journal_file_enable_post_change_timer(JournalFile *f, sd_event *e, usec_t t) {
+ _cleanup_(sd_event_source_unrefp) sd_event_source *timer = NULL;
+ int r;
+
+ assert(f);
+ assert_return(!f->post_change_timer, -EINVAL);
+ assert(e);
+ assert(t);
+
+ r = sd_event_add_time(e, &timer, CLOCK_MONOTONIC, 0, 0, post_change_thunk, f);
+ if (r < 0)
+ return r;
+
+ r = sd_event_source_set_enabled(timer, SD_EVENT_OFF);
+ if (r < 0)
+ return r;
+
+ f->post_change_timer = timer;
+ timer = NULL;
+ f->post_change_timer_period = t;
+
+ return r;
+}
+
static int entry_item_cmp(const void *_a, const void *_b) {
const EntryItem *a = _a, *b = _b;
@@ -1465,7 +1548,10 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st
if (mmap_cache_got_sigbus(f->mmap, f->fd))
r = -EIO;
- journal_file_post_change(f);
+ if (f->post_change_timer)
+ schedule_post_change(f);
+ else
+ journal_file_post_change(f);
return r;
}
@@ -2767,6 +2853,14 @@ int journal_file_open(
goto fail;
}
+ if (template && template->post_change_timer) {
+ sd_event *e = sd_event_source_get_event(template->post_change_timer);
+
+ r = journal_file_enable_post_change_timer(f, e, template->post_change_timer_period);
+ if (r < 0)
+ goto fail;
+ }
+
*ret = f;
return 0;
diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h
index 46c1f3278e..7970ebe738 100644
--- a/src/journal/journal-file.h
+++ b/src/journal/journal-file.h
@@ -33,6 +33,7 @@
#include "journal-def.h"
#include "macro.h"
#include "mmap-cache.h"
+#include "sd-event.h"
#include "sparse-endian.h"
typedef struct JournalMetrics {
@@ -101,6 +102,9 @@ typedef struct JournalFile {
JournalMetrics metrics;
MMapCache *mmap;
+ sd_event_source *post_change_timer;
+ usec_t post_change_timer_period;
+
OrderedHashmap *chain_cache;
#if defined(HAVE_XZ) || defined(HAVE_LZ4)
@@ -224,6 +228,7 @@ void journal_file_print_header(JournalFile *f);
int journal_file_rotate(JournalFile **f, bool compress, bool seal);
void journal_file_post_change(JournalFile *f);
+int journal_file_enable_post_change_timer(JournalFile *f, sd_event *e, usec_t t);
void journal_reset_metrics(JournalMetrics *m);
void journal_default_metrics(JournalMetrics *m, int fd);
diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c
index d009b2e93b..db11421e7a 100644
--- a/src/journal/journalctl.c
+++ b/src/journal/journalctl.c
@@ -2336,7 +2336,7 @@ int main(int argc, char *argv[]) {
flags =
arg_all * OUTPUT_SHOW_ALL |
arg_full * OUTPUT_FULL_WIDTH |
- on_tty() * OUTPUT_COLOR |
+ colors_enabled() * OUTPUT_COLOR |
arg_catalog * OUTPUT_CATALOG |
arg_utc * OUTPUT_UTC;
diff --git a/src/journal/journald-native.c b/src/journal/journald-native.c
index 371df5b37f..f80a6ebfe5 100644
--- a/src/journal/journald-native.c
+++ b/src/journal/journald-native.c
@@ -495,5 +495,9 @@ int server_open_native_socket(Server*s) {
if (r < 0)
return log_error_errno(r, "Failed to add native server fd to event loop: %m");
+ r = sd_event_source_set_priority(s->native_event_source, SD_EVENT_PRIORITY_NORMAL+5);
+ if (r < 0)
+ return log_error_errno(r, "Failed to adjust native event source priority: %m");
+
return 0;
}
diff --git a/src/journal/journald-server.c b/src/journal/journald-server.c
index a8a9b72080..c3add87ed1 100644
--- a/src/journal/journald-server.c
+++ b/src/journal/journald-server.c
@@ -82,6 +82,9 @@
#define NOTIFY_SNDBUF_SIZE (8*1024*1024)
+/* The period to insert between posting changes for coalescing */
+#define POST_CHANGE_TIMER_INTERVAL_USEC (250*USEC_PER_MSEC)
+
static int determine_space_for(
Server *s,
JournalMetrics *metrics,
@@ -220,6 +223,38 @@ static void server_add_acls(JournalFile *f, uid_t uid) {
#endif
}
+static int open_journal(
+ Server *s,
+ bool reliably,
+ const char *fname,
+ int flags,
+ bool seal,
+ JournalMetrics *metrics,
+ JournalFile *template,
+ JournalFile **ret) {
+ int r;
+
+ assert(s);
+ assert(fname);
+ assert(ret);
+
+ if (reliably)
+ r = journal_file_open_reliably(fname, flags, 0640, s->compress, seal, metrics, s->mmap, template, ret);
+ else
+ r = journal_file_open(fname, flags, 0640, s->compress, seal, metrics, s->mmap, template, ret);
+
+ if (r < 0)
+ return r;
+
+ r = journal_file_enable_post_change_timer(*ret, s->event, POST_CHANGE_TIMER_INTERVAL_USEC);
+ if (r < 0) {
+ *ret = journal_file_close(*ret);
+ return r;
+ }
+
+ return r;
+}
+
static JournalFile* find_journal(Server *s, uid_t uid) {
_cleanup_free_ char *p = NULL;
int r;
@@ -258,7 +293,7 @@ static JournalFile* find_journal(Server *s, uid_t uid) {
journal_file_close(f);
}
- r = journal_file_open_reliably(p, O_RDWR|O_CREAT, 0640, s->compress, s->seal, &s->system_metrics, s->mmap, NULL, &f);
+ r = open_journal(s, true, p, O_RDWR|O_CREAT, s->seal, &s->system_metrics, NULL, &f);
if (r < 0)
return s->system_journal;
@@ -930,7 +965,7 @@ static int system_journal_open(Server *s, bool flush_requested) {
(void) mkdir(fn, 0755);
fn = strjoina(fn, "/system.journal");
- r = journal_file_open_reliably(fn, O_RDWR|O_CREAT, 0640, s->compress, s->seal, &s->system_metrics, s->mmap, NULL, &s->system_journal);
+ r = open_journal(s, true, fn, O_RDWR|O_CREAT, s->seal, &s->system_metrics, NULL, &s->system_journal);
if (r >= 0) {
server_add_acls(s->system_journal, 0);
(void) determine_space_for(s, &s->system_metrics, "/var/log/journal/", "System journal", true, true, NULL, NULL);
@@ -953,7 +988,7 @@ static int system_journal_open(Server *s, bool flush_requested) {
* if it already exists, so that we can flush
* it into the system journal */
- r = journal_file_open(fn, O_RDWR, 0640, s->compress, false, &s->runtime_metrics, s->mmap, NULL, &s->runtime_journal);
+ r = open_journal(s, false, fn, O_RDWR, false, &s->runtime_metrics, NULL, &s->runtime_journal);
if (r < 0) {
if (r != -ENOENT)
log_warning_errno(r, "Failed to open runtime journal: %m");
@@ -970,7 +1005,7 @@ static int system_journal_open(Server *s, bool flush_requested) {
(void) mkdir("/run/log/journal", 0755);
(void) mkdir_parents(fn, 0750);
- r = journal_file_open_reliably(fn, O_RDWR|O_CREAT, 0640, s->compress, false, &s->runtime_metrics, s->mmap, NULL, &s->runtime_journal);
+ r = open_journal(s, true, fn, O_RDWR|O_CREAT, false, &s->runtime_metrics, NULL, &s->runtime_journal);
if (r < 0)
return log_error_errno(r, "Failed to open runtime journal: %m");
}
@@ -1330,7 +1365,7 @@ static int server_parse_proc_cmdline(Server *s) {
p = line;
for(;;) {
- _cleanup_free_ char *word;
+ _cleanup_free_ char *word = NULL;
r = extract_first_word(&p, &word, NULL, 0);
if (r < 0)
diff --git a/src/journal/journald-stream.c b/src/journal/journald-stream.c
index 131fcdac42..90884b6929 100644
--- a/src/journal/journald-stream.c
+++ b/src/journal/journald-stream.c
@@ -733,7 +733,7 @@ int server_open_stdout_socket(Server *s) {
if (r < 0)
return log_error_errno(r, "Failed to add stdout server fd to event source: %m");
- r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+10);
+ r = sd_event_source_set_priority(s->stdout_event_source, SD_EVENT_PRIORITY_NORMAL+5);
if (r < 0)
return log_error_errno(r, "Failed to adjust priority of stdout server event source: %m");
diff --git a/src/journal/journald-syslog.c b/src/journal/journald-syslog.c
index cfc50d889b..0be73088e2 100644
--- a/src/journal/journald-syslog.c
+++ b/src/journal/journald-syslog.c
@@ -326,7 +326,7 @@ void server_process_syslog_message(
size_t label_len) {
char syslog_priority[sizeof("PRIORITY=") + DECIMAL_STR_MAX(int)],
- syslog_facility[sizeof("SYSLOG_FACILITY") + DECIMAL_STR_MAX(int)];
+ syslog_facility[sizeof("SYSLOG_FACILITY=") + DECIMAL_STR_MAX(int)];
const char *message = NULL, *syslog_identifier = NULL, *syslog_pid = NULL;
struct iovec iovec[N_IOVEC_META_FIELDS + 6];
unsigned n = 0;
@@ -357,11 +357,11 @@ void server_process_syslog_message(
IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=syslog");
- sprintf(syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK);
+ xsprintf(syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK);
IOVEC_SET_STRING(iovec[n++], syslog_priority);
if (priority & LOG_FACMASK) {
- sprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
+ xsprintf(syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority));
IOVEC_SET_STRING(iovec[n++], syslog_facility);
}
@@ -430,6 +430,10 @@ int server_open_syslog_socket(Server *s) {
if (r < 0)
return log_error_errno(r, "Failed to add syslog server fd to event loop: %m");
+ r = sd_event_source_set_priority(s->syslog_event_source, SD_EVENT_PRIORITY_NORMAL+5);
+ if (r < 0)
+ return log_error_errno(r, "Failed to adjust syslog event source priority: %m");
+
return 0;
}
diff --git a/src/journal/sd-journal.c b/src/journal/sd-journal.c
index 5cde7f17f7..cd5160154a 100644
--- a/src/journal/sd-journal.c
+++ b/src/journal/sd-journal.c
@@ -1940,10 +1940,14 @@ _public_ int sd_journal_get_data(sd_journal *j, const char *field, const void **
compression = o->object.flags & OBJECT_COMPRESSION_MASK;
if (compression) {
#if defined(HAVE_XZ) || defined(HAVE_LZ4)
- if (decompress_startswith(compression,
+ r = decompress_startswith(compression,
o->data.payload, l,
&f->compress_buffer, &f->compress_buffer_size,
- field, field_length, '=')) {
+ field, field_length, '=');
+ if (r < 0)
+ log_debug_errno(r, "Cannot decompress %s object of length %zu at offset "OFSfmt": %m",
+ object_compressed_to_string(compression), l, p);
+ else if (r > 0) {
size_t rsize;
diff --git a/src/journal/test-compress-benchmark.c b/src/journal/test-compress-benchmark.c
index 93ea9c6318..baed0d82a4 100644
--- a/src/journal/test-compress-benchmark.c
+++ b/src/journal/test-compress-benchmark.c
@@ -27,7 +27,8 @@
#include "string-util.h"
#include "util.h"
-typedef int (compress_t)(const void *src, uint64_t src_size, void *dst, size_t *dst_size);
+typedef int (compress_t)(const void *src, uint64_t src_size, void *dst,
+ size_t dst_alloc_size, size_t *dst_size);
typedef int (decompress_t)(const void *src, uint64_t src_size,
void **dst, size_t *dst_alloc_size, size_t* dst_size, size_t dst_max);
@@ -111,8 +112,8 @@ static void test_compress_decompress(const char* label, const char* type,
memzero(buf, MIN(size + 1000, MAX_SIZE));
- r = compress(text, size, buf, &j);
- /* assume compression must be successful except for small inputs */
+ r = compress(text, size, buf, size, &j);
+ /* assume compression must be successful except for small or random inputs */
assert_se(r == 0 || (size < 2048 && r == -ENOBUFS) || streq(type, "random"));
/* check for overwrites */
diff --git a/src/journal/test-compress.c b/src/journal/test-compress.c
index b9d90a8988..68c9a4d76c 100644
--- a/src/journal/test-compress.c
+++ b/src/journal/test-compress.c
@@ -17,6 +17,10 @@
along with systemd; If not, see <http://www.gnu.org/licenses/>.
***/
+#ifdef HAVE_LZ4
+#include <lz4.h>
+#endif
+
#include "alloc-util.h"
#include "compress.h"
#include "fd-util.h"
@@ -38,7 +42,7 @@
#endif
typedef int (compress_blob_t)(const void *src, uint64_t src_size,
- void *dst, size_t *dst_size);
+ void *dst, size_t dst_alloc_size, size_t *dst_size);
typedef int (decompress_blob_t)(const void *src, uint64_t src_size,
void **dst, size_t *dst_alloc_size,
size_t* dst_size, size_t dst_max);
@@ -57,15 +61,14 @@ static void test_compress_decompress(int compression,
size_t data_len,
bool may_fail) {
char compressed[512];
- size_t csize = 512;
- size_t usize = 0;
+ size_t csize, usize = 0;
_cleanup_free_ char *decompressed = NULL;
int r;
log_info("/* testing %s %s blob compression/decompression */",
object_compressed_to_string(compression), data);
- r = compress(data, data_len, compressed, &csize);
+ r = compress(data, data_len, compressed, sizeof(compressed), &csize);
if (r == -ENOBUFS) {
log_info_errno(r, "compression failed: %m");
assert_se(may_fail);
@@ -101,43 +104,45 @@ static void test_decompress_startswith(int compression,
size_t data_len,
bool may_fail) {
- char compressed[512];
- size_t csize = 512;
- size_t usize = 0;
- _cleanup_free_ char *decompressed = NULL;
+ char *compressed;
+ _cleanup_free_ char *compressed1 = NULL, *compressed2 = NULL, *decompressed = NULL;
+ size_t csize, usize = 0, len;
int r;
- log_info("/* testing decompress_startswith with %s on %s text*/",
+ log_info("/* testing decompress_startswith with %s on %.20s text*/",
object_compressed_to_string(compression), data);
- r = compress(data, data_len, compressed, &csize);
+#define BUFSIZE_1 512
+#define BUFSIZE_2 20000
+
+ compressed = compressed1 = malloc(BUFSIZE_1);
+ assert_se(compressed1);
+ r = compress(data, data_len, compressed, BUFSIZE_1, &csize);
if (r == -ENOBUFS) {
log_info_errno(r, "compression failed: %m");
assert_se(may_fail);
- return;
+
+ compressed = compressed2 = malloc(BUFSIZE_2);
+ assert_se(compressed2);
+ r = compress(data, data_len, compressed, BUFSIZE_2, &csize);
+ assert(r == 0);
}
assert_se(r == 0);
- assert_se(decompress_sw(compressed,
- csize,
- (void **) &decompressed,
- &usize,
- data, strlen(data), '\0') > 0);
- assert_se(decompress_sw(compressed,
- csize,
- (void **) &decompressed,
- &usize,
- data, strlen(data), 'w') == 0);
- assert_se(decompress_sw(compressed,
- csize,
- (void **) &decompressed,
- &usize,
- "barbarbar", 9, ' ') == 0);
- assert_se(decompress_sw(compressed,
- csize,
- (void **) &decompressed,
- &usize,
- data, strlen(data), '\0') > 0);
+ len = strlen(data);
+
+ r = decompress_sw(compressed, csize, (void **) &decompressed, &usize, data, len, '\0');
+ assert_se(r > 0);
+ r = decompress_sw(compressed, csize, (void **) &decompressed, &usize, data, len, 'w');
+ assert_se(r == 0);
+ r = decompress_sw(compressed, csize, (void **) &decompressed, &usize, "barbarbar", 9, ' ');
+ assert_se(r == 0);
+ r = decompress_sw(compressed, csize, (void **) &decompressed, &usize, data, len - 1, data[len-1]);
+ assert_se(r > 0);
+ r = decompress_sw(compressed, csize, (void **) &decompressed, &usize, data, len - 1, 'w');
+ assert_se(r == 0);
+ r = decompress_sw(compressed, csize, (void **) &decompressed, &usize, data, len, '\0');
+ assert_se(r > 0);
}
static void test_compress_stream(int compression,
@@ -199,6 +204,44 @@ static void test_compress_stream(int compression,
assert_se(unlink(pattern2) == 0);
}
+#ifdef HAVE_LZ4
+static void test_lz4_decompress_partial(void) {
+ char buf[20000];
+ size_t buf_size = sizeof(buf), compressed;
+ int r;
+ _cleanup_free_ char *huge = NULL;
+
+#define HUGE_SIZE (4096*1024)
+ huge = malloc(HUGE_SIZE);
+ memset(huge, 'x', HUGE_SIZE);
+ memcpy(huge, "HUGE=", 5);
+
+ r = LZ4_compress_limitedOutput(huge, buf, HUGE_SIZE, buf_size);
+ assert_se(r >= 0);
+ compressed = r;
+ log_info("Compressed %i → %zu", HUGE_SIZE, compressed);
+
+ r = LZ4_decompress_safe(buf, huge, r, HUGE_SIZE);
+ assert_se(r >= 0);
+ log_info("Decompressed → %i", r);
+
+ r = LZ4_decompress_safe_partial(buf, huge,
+ compressed,
+ 12, HUGE_SIZE);
+ assert_se(r >= 0);
+ log_info("Decompressed partial %i/%i → %i", 12, HUGE_SIZE, r);
+
+ /* We expect this to fail, because that's how current lz4 works. If this
+ * call succeeds, then lz4 has been fixed, and we need to change our code.
+ */
+ r = LZ4_decompress_safe_partial(buf, huge,
+ compressed,
+ 12, HUGE_SIZE-1);
+ assert_se(r < 0);
+ log_info("Decompressed partial %i/%i → %i", 12, HUGE_SIZE-1, r);
+}
+#endif
+
int main(int argc, char *argv[]) {
const char text[] =
"text\0foofoofoofoo AAAA aaaaaaaaa ghost busters barbarbar FFF"
@@ -206,6 +249,11 @@ int main(int argc, char *argv[]) {
char data[512] = "random\0";
+ char huge[4096*1024];
+ memset(huge, 'x', sizeof(huge));
+ memcpy(huge, "HUGE=", 5);
+ char_array_0(huge);
+
log_set_max_level(LOG_DEBUG);
random_bytes(data + 7, sizeof(data) - 7);
@@ -215,12 +263,17 @@ int main(int argc, char *argv[]) {
text, sizeof(text), false);
test_compress_decompress(OBJECT_COMPRESSED_XZ, compress_blob_xz, decompress_blob_xz,
data, sizeof(data), true);
+
test_decompress_startswith(OBJECT_COMPRESSED_XZ,
compress_blob_xz, decompress_startswith_xz,
text, sizeof(text), false);
test_decompress_startswith(OBJECT_COMPRESSED_XZ,
compress_blob_xz, decompress_startswith_xz,
data, sizeof(data), true);
+ test_decompress_startswith(OBJECT_COMPRESSED_XZ,
+ compress_blob_xz, decompress_startswith_xz,
+ huge, sizeof(huge), true);
+
test_compress_stream(OBJECT_COMPRESSED_XZ, "xzcat",
compress_stream_xz, decompress_stream_xz, argv[0]);
#else
@@ -232,15 +285,21 @@ int main(int argc, char *argv[]) {
text, sizeof(text), false);
test_compress_decompress(OBJECT_COMPRESSED_LZ4, compress_blob_lz4, decompress_blob_lz4,
data, sizeof(data), true);
+
test_decompress_startswith(OBJECT_COMPRESSED_LZ4,
compress_blob_lz4, decompress_startswith_lz4,
text, sizeof(text), false);
test_decompress_startswith(OBJECT_COMPRESSED_LZ4,
compress_blob_lz4, decompress_startswith_lz4,
data, sizeof(data), true);
+ test_decompress_startswith(OBJECT_COMPRESSED_LZ4,
+ compress_blob_lz4, decompress_startswith_lz4,
+ huge, sizeof(huge), true);
test_compress_stream(OBJECT_COMPRESSED_LZ4, "lz4cat",
compress_stream_lz4, decompress_stream_lz4, argv[0]);
+
+ test_lz4_decompress_partial();
#else
log_info("/* LZ4 test skipped */");
#endif