diff options
Diffstat (limited to 'src/journal')
26 files changed, 10443 insertions, 0 deletions
diff --git a/src/journal/.gitignore b/src/journal/.gitignore new file mode 100644 index 0000000000..d6a79460cd --- /dev/null +++ b/src/journal/.gitignore @@ -0,0 +1,2 @@ +/journald-gperf.c +/libsystemd-journal.pc diff --git a/src/journal/Makefile b/src/journal/Makefile new file mode 120000 index 0000000000..d0b0e8e008 --- /dev/null +++ b/src/journal/Makefile @@ -0,0 +1 @@ +../Makefile
\ No newline at end of file diff --git a/src/journal/cat.c b/src/journal/cat.c new file mode 100644 index 0000000000..f0a666642c --- /dev/null +++ b/src/journal/cat.c @@ -0,0 +1,182 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2012 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <stdio.h> +#include <getopt.h> +#include <assert.h> +#include <unistd.h> +#include <stdlib.h> +#include <errno.h> +#include <sys/fcntl.h> + +#include <systemd/sd-journal.h> + +#include "util.h" +#include "build.h" + +static char *arg_identifier = NULL; +static int arg_priority = LOG_INFO; +static bool arg_level_prefix = true; + +static int help(void) { + + printf("%s [OPTIONS...] {COMMAND} ...\n\n" + "Execute process with stdout/stderr connected to the journal.\n\n" + " -h --help Show this help\n" + " --version Show package version\n" + " -t --identifier=STRING Set syslog identifier\n" + " -p --priority=PRIORITY Set priority value (0..7)\n" + " --level-prefix=BOOL Control whether level prefix shall be parsed\n", + program_invocation_short_name); + + return 0; +} + +static int parse_argv(int argc, char *argv[]) { + + enum { + ARG_VERSION = 0x100, + ARG_LEVEL_PREFIX + }; + + static const struct option options[] = { + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, ARG_VERSION }, + { "identifier", required_argument, NULL, 't' }, + { "priority", required_argument, NULL, 'p' }, + { "level-prefix", required_argument, NULL, ARG_LEVEL_PREFIX }, + { NULL, 0, NULL, 0 } + }; + + int c; + + assert(argc >= 0); + assert(argv); + + while ((c = getopt_long(argc, argv, "+ht:p:", options, NULL)) >= 0) { + + switch (c) { + + case 'h': + help(); + return 0; + + case ARG_VERSION: + puts(PACKAGE_STRING); + puts(DISTRIBUTION); + puts(SYSTEMD_FEATURES); + return 0; + + case 't': + free(arg_identifier); + if (isempty(optarg)) + arg_identifier = NULL; + else { + arg_identifier = strdup(optarg); + if (!arg_identifier) { + log_error("Out of memory."); + return -ENOMEM; + } + } + break; + + case 'p': + arg_priority = log_level_from_string(optarg); + if (arg_priority < 0) { + log_error("Failed to parse priority value."); + return arg_priority; + } + break; + + case ARG_LEVEL_PREFIX: { + int k; + + k = parse_boolean(optarg); + if (k < 0) { + log_error("Failed to parse level prefix value."); + return k; + } + arg_level_prefix = k; + break; + } + + default: + log_error("Unknown option code %c", c); + return -EINVAL; + } + } + + return 1; +} + +int main(int argc, char *argv[]) { + int r, fd = -1, saved_stderr = -1; + + log_parse_environment(); + log_open(); + + r = parse_argv(argc, argv); + if (r <= 0) + goto finish; + + fd = sd_journal_stream_fd(arg_identifier, arg_priority, arg_level_prefix); + if (fd < 0) { + log_error("Failed to create stream fd: %s", strerror(-fd)); + r = fd; + goto finish; + } + + saved_stderr = fcntl(STDERR_FILENO, F_DUPFD_CLOEXEC, 3); + + if (dup3(fd, STDOUT_FILENO, 0) < 0 || + dup3(fd, STDERR_FILENO, 0) < 0) { + log_error("Failed to duplicate fd: %m"); + r = -errno; + goto finish; + } + + if (fd >= 3) + close_nointr_nofail(fd); + + fd = -1; + + if (argc <= optind) + execl("/bin/cat", "/bin/cat", NULL); + else + execvp(argv[optind], argv + optind); + + r = -errno; + + /* Let's try to restore a working stderr, so we can print the error message */ + if (saved_stderr >= 0) + dup3(saved_stderr, STDERR_FILENO, 0); + + log_error("Failed to execute process: %s", strerror(-r)); + +finish: + if (fd >= 0) + close_nointr_nofail(fd); + + if (saved_stderr >= 0) + close_nointr_nofail(saved_stderr); + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/src/journal/compress.c b/src/journal/compress.c new file mode 100644 index 0000000000..ff906581f0 --- /dev/null +++ b/src/journal/compress.c @@ -0,0 +1,208 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <assert.h> +#include <stdlib.h> +#include <string.h> +#include <lzma.h> + +#include "compress.h" + +bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_size) { + lzma_stream s = LZMA_STREAM_INIT; + lzma_ret ret; + bool b = false; + + assert(src); + assert(src_size > 0); + assert(dst); + assert(dst_size); + + /* Returns false if we couldn't compress the data or the + * compressed result is longer than the original */ + + ret = lzma_easy_encoder(&s, LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE); + if (ret != LZMA_OK) + return false; + + s.next_in = src; + s.avail_in = src_size; + s.next_out = dst; + s.avail_out = src_size; + + /* Does it fit? */ + if (lzma_code(&s, LZMA_FINISH) != LZMA_STREAM_END) + goto fail; + + /* Is it actually shorter? */ + if (s.avail_out == 0) + goto fail; + + *dst_size = src_size - s.avail_out; + b = true; + +fail: + lzma_end(&s); + + return b; +} + +bool uncompress_blob(const void *src, uint64_t src_size, + void **dst, uint64_t *dst_alloc_size, uint64_t* dst_size) { + + lzma_stream s = LZMA_STREAM_INIT; + lzma_ret ret; + bool b = false; + + assert(src); + assert(src_size > 0); + assert(dst); + assert(dst_alloc_size); + assert(dst_size); + assert(*dst_alloc_size == 0 || *dst); + + ret = lzma_stream_decoder(&s, UINT64_MAX, 0); + if (ret != LZMA_OK) + return false; + + if (*dst_alloc_size <= src_size) { + void *p; + + p = realloc(*dst, src_size*2); + if (!p) + return false; + + *dst = p; + *dst_alloc_size = src_size*2; + } + + s.next_in = src; + s.avail_in = src_size; + + s.next_out = *dst; + s.avail_out = *dst_alloc_size; + + for (;;) { + void *p; + + ret = lzma_code(&s, LZMA_FINISH); + + if (ret == LZMA_STREAM_END) + break; + + if (ret != LZMA_OK) + goto fail; + + p = realloc(*dst, *dst_alloc_size*2); + if (!p) + goto fail; + + s.next_out = (uint8_t*) p + ((uint8_t*) s.next_out - (uint8_t*) *dst); + s.avail_out += *dst_alloc_size; + + *dst = p; + *dst_alloc_size *= 2; + } + + *dst_size = *dst_alloc_size - s.avail_out; + b = true; + +fail: + lzma_end(&s); + + return b; +} + +bool uncompress_startswith(const void *src, uint64_t src_size, + void **buffer, uint64_t *buffer_size, + const void *prefix, uint64_t prefix_len, + uint8_t extra) { + + lzma_stream s = LZMA_STREAM_INIT; + lzma_ret ret; + bool b = false; + + /* Checks whether the uncompressed blob starts with the + * mentioned prefix. The byte extra needs to follow the + * prefix */ + + assert(src); + assert(src_size > 0); + assert(buffer); + assert(buffer_size); + assert(prefix); + assert(*buffer_size == 0 || *buffer); + + ret = lzma_stream_decoder(&s, UINT64_MAX, 0); + if (ret != LZMA_OK) + return false; + + if (*buffer_size <= prefix_len) { + void *p; + + p = realloc(*buffer, prefix_len*2); + if (!p) + return false; + + *buffer = p; + *buffer_size = prefix_len*2; + } + + s.next_in = src; + s.avail_in = src_size; + + s.next_out = *buffer; + s.avail_out = *buffer_size; + + for (;;) { + void *p; + + ret = lzma_code(&s, LZMA_FINISH); + + if (ret != LZMA_STREAM_END && ret != LZMA_OK) + goto fail; + + if ((*buffer_size - s.avail_out > prefix_len) && + memcmp(*buffer, prefix, prefix_len) == 0 && + ((const uint8_t*) *buffer)[prefix_len] == extra) + break; + + if (ret == LZMA_STREAM_END) + goto fail; + + p = realloc(*buffer, *buffer_size*2); + if (!p) + goto fail; + + s.next_out = (uint8_t*) p + ((uint8_t*) s.next_out - (uint8_t*) *buffer); + s.avail_out += *buffer_size; + + *buffer = p; + *buffer_size *= 2; + } + + b = true; + +fail: + lzma_end(&s); + + return b; +} diff --git a/src/journal/compress.h b/src/journal/compress.h new file mode 100644 index 0000000000..f187a6e00c --- /dev/null +++ b/src/journal/compress.h @@ -0,0 +1,38 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foocompresshfoo +#define foocompresshfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <stdbool.h> + +bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_size); + +bool uncompress_blob(const void *src, uint64_t src_size, + void **dst, uint64_t *dst_alloc_size, uint64_t* dst_size); + +bool uncompress_startswith(const void *src, uint64_t src_size, + void **buffer, uint64_t *buffer_size, + const void *prefix, uint64_t prefix_len, + uint8_t extra); + +#endif diff --git a/src/journal/coredump.c b/src/journal/coredump.c new file mode 100644 index 0000000000..7dea66e6fd --- /dev/null +++ b/src/journal/coredump.c @@ -0,0 +1,271 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2012 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <errno.h> +#include <unistd.h> +#include <stdio.h> +#include <sys/prctl.h> + +#include <systemd/sd-journal.h> +#include <systemd/sd-login.h> + +#include "log.h" +#include "util.h" +#include "special.h" + +#define COREDUMP_MAX (24*1024*1024) + +enum { + ARG_PID = 1, + ARG_UID, + ARG_GID, + ARG_SIGNAL, + ARG_TIMESTAMP, + ARG_COMM, + _ARG_MAX +}; + +static int divert_coredump(void) { + FILE *f; + int r; + + log_info("Detected coredump of the journal daemon itself, diverting coredump to /var/lib/systemd/coredump/."); + + mkdir_p("/var/lib/systemd/coredump", 0755); + + f = fopen("/var/lib/systemd/coredump/core.systemd-journald", "we"); + if (!f) { + log_error("Failed to create coredump file: %m"); + return -errno; + } + + for (;;) { + uint8_t buffer[4096]; + size_t l, q; + + l = fread(buffer, 1, sizeof(buffer), stdin); + if (l <= 0) { + if (ferror(f)) { + log_error("Failed to read coredump: %m"); + r = -errno; + goto finish; + } + + r = 0; + break; + } + + q = fwrite(buffer, 1, l, f); + if (q != l) { + log_error("Failed to write coredump: %m"); + r = -errno; + goto finish; + } + } + + fflush(f); + + if (ferror(f)) { + log_error("Failed to write coredump: %m"); + r = -errno; + } + +finish: + fclose(f); + return r; +} + +int main(int argc, char* argv[]) { + int r, j = 0; + char *p = NULL; + ssize_t n; + pid_t pid; + uid_t uid; + gid_t gid; + struct iovec iovec[14]; + char *core_pid = NULL, *core_uid = NULL, *core_gid = NULL, *core_signal = NULL, + *core_timestamp = NULL, *core_comm = NULL, *core_exe = NULL, *core_unit = NULL, + *core_session = NULL, *core_message = NULL, *core_cmdline = NULL, *t; + + prctl(PR_SET_DUMPABLE, 0); + + if (argc != _ARG_MAX) { + log_set_target(LOG_TARGET_JOURNAL_OR_KMSG); + log_open(); + + log_error("Invalid number of arguments passed from kernel."); + r = -EINVAL; + goto finish; + } + + r = parse_pid(argv[ARG_PID], &pid); + if (r < 0) { + log_set_target(LOG_TARGET_JOURNAL_OR_KMSG); + log_open(); + + log_error("Failed to parse PID."); + goto finish; + } + + if (sd_pid_get_unit(pid, &t) >= 0) { + + if (streq(t, SPECIAL_JOURNALD_SERVICE)) { + /* Make sure we don't make use of the journal, + * if it's the journal which is crashing */ + log_set_target(LOG_TARGET_KMSG); + log_open(); + + r = divert_coredump(); + goto finish; + } + + core_unit = strappend("COREDUMP_UNIT=", t); + free(t); + + if (core_unit) + IOVEC_SET_STRING(iovec[j++], core_unit); + } + + /* OK, now we know it's not the journal, hence make use of + * it */ + log_set_target(LOG_TARGET_JOURNAL_OR_KMSG); + log_open(); + + r = parse_uid(argv[ARG_UID], &uid); + if (r < 0) { + log_error("Failed to parse UID."); + goto finish; + } + + r = parse_gid(argv[ARG_GID], &gid); + if (r < 0) { + log_error("Failed to parse GID."); + goto finish; + } + + core_pid = strappend("COREDUMP_PID=", argv[ARG_PID]); + if (core_pid) + IOVEC_SET_STRING(iovec[j++], core_pid); + + core_uid = strappend("COREDUMP_UID=", argv[ARG_UID]); + if (core_uid) + IOVEC_SET_STRING(iovec[j++], core_uid); + + core_gid = strappend("COREDUMP_GID=", argv[ARG_GID]); + if (core_gid) + IOVEC_SET_STRING(iovec[j++], core_gid); + + core_signal = strappend("COREDUMP_SIGNAL=", argv[ARG_SIGNAL]); + if (core_signal) + IOVEC_SET_STRING(iovec[j++], core_signal); + + core_comm = strappend("COREDUMP_COMM=", argv[ARG_COMM]); + if (core_comm) + IOVEC_SET_STRING(iovec[j++], core_comm); + + if (sd_pid_get_session(pid, &t) >= 0) { + core_session = strappend("COREDUMP_SESSION=", t); + free(t); + + if (core_session) + IOVEC_SET_STRING(iovec[j++], core_session); + } + + if (get_process_exe(pid, &t) >= 0) { + core_exe = strappend("COREDUMP_EXE=", t); + free(t); + + if (core_exe) + IOVEC_SET_STRING(iovec[j++], core_exe); + } + + if (get_process_cmdline(pid, LINE_MAX, false, &t) >= 0) { + core_cmdline = strappend("COREDUMP_CMDLINE=", t); + free(t); + + if (core_cmdline) + IOVEC_SET_STRING(iovec[j++], core_cmdline); + } + + core_timestamp = join("COREDUMP_TIMESTAMP=", argv[ARG_TIMESTAMP], "000000", NULL); + if (core_timestamp) + IOVEC_SET_STRING(iovec[j++], core_timestamp); + + IOVEC_SET_STRING(iovec[j++], "MESSAGE_ID=fc2e22bc6ee647b6b90729ab34a250b1"); + IOVEC_SET_STRING(iovec[j++], "PRIORITY=2"); + + core_message = join("MESSAGE=Process ", argv[ARG_PID], " (", argv[ARG_COMM], ") dumped core.", NULL); + if (core_message) + IOVEC_SET_STRING(iovec[j++], core_message); + + /* Now, let's drop privileges to become the user who owns the + * segfaulted process and allocate the coredump memory under + * his uid. This also ensures that the credentials journald + * will see are the ones of the coredumping user, thus making + * sure the user himself gets access to the core dump. */ + + if (setresgid(gid, gid, gid) < 0 || + setresuid(uid, uid, uid) < 0) { + log_error("Failed to drop privileges: %m"); + r = -errno; + goto finish; + } + + p = malloc(9 + COREDUMP_MAX); + if (!p) { + log_error("Out of memory"); + r = -ENOMEM; + goto finish; + } + + memcpy(p, "COREDUMP=", 9); + + n = loop_read(STDIN_FILENO, p + 9, COREDUMP_MAX, false); + if (n < 0) { + log_error("Failed to read core dump data: %s", strerror(-n)); + r = (int) n; + goto finish; + } + + iovec[j].iov_base = p; + iovec[j].iov_len = 9 + n; + j++; + + r = sd_journal_sendv(iovec, j); + if (r < 0) + log_error("Failed to send coredump: %s", strerror(-r)); + +finish: + free(p); + free(core_pid); + free(core_uid); + free(core_gid); + free(core_signal); + free(core_timestamp); + free(core_comm); + free(core_exe); + free(core_cmdline); + free(core_unit); + free(core_session); + free(core_message); + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/src/journal/journal-def.h b/src/journal/journal-def.h new file mode 100644 index 0000000000..9cb8051082 --- /dev/null +++ b/src/journal/journal-def.h @@ -0,0 +1,165 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournaldefhfoo +#define foojournaldefhfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include "sparse-endian.h" + +#include <systemd/sd-id128.h> + +#include "macro.h" + +typedef struct Header Header; +typedef struct ObjectHeader ObjectHeader; +typedef union Object Object; +typedef struct DataObject DataObject; +typedef struct FieldObject FieldObject; +typedef struct EntryObject EntryObject; +typedef struct HashTableObject HashTableObject; +typedef struct EntryArrayObject EntryArrayObject; +typedef struct EntryItem EntryItem; +typedef struct HashItem HashItem; + +/* Object types */ +enum { + OBJECT_UNUSED, + OBJECT_DATA, + OBJECT_FIELD, + OBJECT_ENTRY, + OBJECT_DATA_HASH_TABLE, + OBJECT_FIELD_HASH_TABLE, + OBJECT_ENTRY_ARRAY, + _OBJECT_TYPE_MAX +}; + +/* Object flags */ +enum { + OBJECT_COMPRESSED = 1 +}; + +_packed_ struct ObjectHeader { + uint8_t type; + uint8_t flags; + uint8_t reserved[6]; + le64_t size; + uint8_t payload[]; +}; + +_packed_ struct DataObject { + ObjectHeader object; + le64_t hash; + le64_t next_hash_offset; + le64_t next_field_offset; + le64_t entry_offset; /* the first array entry we store inline */ + le64_t entry_array_offset; + le64_t n_entries; + uint8_t payload[]; +}; + +_packed_ struct FieldObject { + ObjectHeader object; + le64_t hash; + le64_t next_hash_offset; + le64_t head_data_offset; + le64_t tail_data_offset; + uint8_t payload[]; +}; + +_packed_ struct EntryItem { + le64_t object_offset; + le64_t hash; +}; + +_packed_ struct EntryObject { + ObjectHeader object; + le64_t seqnum; + le64_t realtime; + le64_t monotonic; + sd_id128_t boot_id; + le64_t xor_hash; + EntryItem items[]; +}; + +_packed_ struct HashItem { + le64_t head_hash_offset; + le64_t tail_hash_offset; +}; + +_packed_ struct HashTableObject { + ObjectHeader object; + HashItem items[]; +}; + +_packed_ struct EntryArrayObject { + ObjectHeader object; + le64_t next_entry_array_offset; + le64_t items[]; +}; + +union Object { + ObjectHeader object; + DataObject data; + FieldObject field; + EntryObject entry; + HashTableObject hash_table; + EntryArrayObject entry_array; +}; + +enum { + STATE_OFFLINE, + STATE_ONLINE, + STATE_ARCHIVED +}; + +/* Header flags */ +enum { + HEADER_INCOMPATIBLE_COMPRESSED = 1 +}; + +_packed_ struct Header { + uint8_t signature[8]; /* "LPKSHHRH" */ + uint32_t compatible_flags; + uint32_t incompatible_flags; + uint8_t state; + uint8_t reserved[7]; + sd_id128_t file_id; + sd_id128_t machine_id; + sd_id128_t boot_id; + sd_id128_t seqnum_id; + le64_t arena_offset; + le64_t arena_size; + le64_t data_hash_table_offset; /* for looking up data objects */ + le64_t data_hash_table_size; + le64_t field_hash_table_offset; /* for looking up field objects */ + le64_t field_hash_table_size; + le64_t tail_object_offset; + le64_t n_objects; + le64_t n_entries; + le64_t seqnum; + le64_t first_seqnum; + le64_t entry_array_offset; + le64_t head_entry_realtime; + le64_t tail_entry_realtime; + le64_t tail_entry_monotonic; +}; + +#endif diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c new file mode 100644 index 0000000000..be92c90447 --- /dev/null +++ b/src/journal/journal-file.c @@ -0,0 +1,2274 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/mman.h> +#include <errno.h> +#include <sys/uio.h> +#include <unistd.h> +#include <sys/statvfs.h> +#include <fcntl.h> +#include <stddef.h> + +#include "journal-def.h" +#include "journal-file.h" +#include "lookup3.h" +#include "compress.h" + +#define DEFAULT_DATA_HASH_TABLE_SIZE (2047ULL*16ULL) +#define DEFAULT_FIELD_HASH_TABLE_SIZE (2047ULL*16ULL) + +#define DEFAULT_WINDOW_SIZE (8ULL*1024ULL*1024ULL) + +#define COMPRESSION_SIZE_THRESHOLD (512ULL) + +/* This is the minimum journal file size */ +#define JOURNAL_FILE_SIZE_MIN (64ULL*1024ULL) /* 64 KiB */ + +/* These are the lower and upper bounds if we deduce the max_use value + * from the file system size */ +#define DEFAULT_MAX_USE_LOWER (1ULL*1024ULL*1024ULL) /* 1 MiB */ +#define DEFAULT_MAX_USE_UPPER (4ULL*1024ULL*1024ULL*1024ULL) /* 4 GiB */ + +/* This is the upper bound if we deduce max_size from max_use */ +#define DEFAULT_MAX_SIZE_UPPER (128ULL*1024ULL*1024ULL) /* 128 MiB */ + +/* This is the upper bound if we deduce the keep_free value from the + * file system size */ +#define DEFAULT_KEEP_FREE_UPPER (4ULL*1024ULL*1024ULL*1024ULL) /* 4 GiB */ + +/* This is the keep_free value when we can't determine the system + * size */ +#define DEFAULT_KEEP_FREE (1024ULL*1024ULL) /* 1 MB */ + +static const char signature[] = { 'L', 'P', 'K', 'S', 'H', 'H', 'R', 'H' }; + +#define ALIGN64(x) (((x) + 7ULL) & ~7ULL) + +void journal_file_close(JournalFile *f) { + int t; + + assert(f); + + if (f->header && f->writable) + f->header->state = STATE_OFFLINE; + + + for (t = 0; t < _WINDOW_MAX; t++) + if (f->windows[t].ptr) + munmap(f->windows[t].ptr, f->windows[t].size); + + if (f->fd >= 0) + close_nointr_nofail(f->fd); + + free(f->path); + +#ifdef HAVE_XZ + free(f->compress_buffer); +#endif + + free(f); +} + +static int journal_file_init_header(JournalFile *f, JournalFile *template) { + Header h; + ssize_t k; + int r; + + assert(f); + + zero(h); + memcpy(h.signature, signature, 8); + h.arena_offset = htole64(ALIGN64(sizeof(h))); + + r = sd_id128_randomize(&h.file_id); + if (r < 0) + return r; + + if (template) { + h.seqnum_id = template->header->seqnum_id; + h.seqnum = template->header->seqnum; + } else + h.seqnum_id = h.file_id; + + k = pwrite(f->fd, &h, sizeof(h), 0); + if (k < 0) + return -errno; + + if (k != sizeof(h)) + return -EIO; + + return 0; +} + +static int journal_file_refresh_header(JournalFile *f) { + int r; + sd_id128_t boot_id; + + assert(f); + + r = sd_id128_get_machine(&f->header->machine_id); + if (r < 0) + return r; + + r = sd_id128_get_boot(&boot_id); + if (r < 0) + return r; + + if (sd_id128_equal(boot_id, f->header->boot_id)) + f->tail_entry_monotonic_valid = true; + + f->header->boot_id = boot_id; + + f->header->state = STATE_ONLINE; + + __sync_synchronize(); + + return 0; +} + +static int journal_file_verify_header(JournalFile *f) { + assert(f); + + if (memcmp(f->header, signature, 8)) + return -EBADMSG; + +#ifdef HAVE_XZ + if ((le64toh(f->header->incompatible_flags) & ~HEADER_INCOMPATIBLE_COMPRESSED) != 0) + return -EPROTONOSUPPORT; +#else + if (f->header->incompatible_flags != 0) + return -EPROTONOSUPPORT; +#endif + + if ((uint64_t) f->last_stat.st_size < (le64toh(f->header->arena_offset) + le64toh(f->header->arena_size))) + return -ENODATA; + + if (f->writable) { + uint8_t state; + sd_id128_t machine_id; + int r; + + r = sd_id128_get_machine(&machine_id); + if (r < 0) + return r; + + if (!sd_id128_equal(machine_id, f->header->machine_id)) + return -EHOSTDOWN; + + state = f->header->state; + + if (state == STATE_ONLINE) + log_debug("Journal file %s is already online. Assuming unclean closing. Ignoring.", f->path); + else if (state == STATE_ARCHIVED) + return -ESHUTDOWN; + else if (state != STATE_OFFLINE) + log_debug("Journal file %s has unknown state %u. Ignoring.", f->path, state); + } + + return 0; +} + +static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size) { + uint64_t old_size, new_size; + + assert(f); + + /* We assume that this file is not sparse, and we know that + * for sure, since we always call posix_fallocate() + * ourselves */ + + old_size = + le64toh(f->header->arena_offset) + + le64toh(f->header->arena_size); + + new_size = PAGE_ALIGN(offset + size); + if (new_size < le64toh(f->header->arena_offset)) + new_size = le64toh(f->header->arena_offset); + + if (new_size <= old_size) + return 0; + + if (f->metrics.max_size > 0 && + new_size > f->metrics.max_size) + return -E2BIG; + + if (new_size > f->metrics.min_size && + f->metrics.keep_free > 0) { + struct statvfs svfs; + + if (fstatvfs(f->fd, &svfs) >= 0) { + uint64_t available; + + available = svfs.f_bfree * svfs.f_bsize; + + if (available >= f->metrics.keep_free) + available -= f->metrics.keep_free; + else + available = 0; + + if (new_size - old_size > available) + return -E2BIG; + } + } + + /* Note that the glibc fallocate() fallback is very + inefficient, hence we try to minimize the allocation area + as we can. */ + if (posix_fallocate(f->fd, old_size, new_size - old_size) < 0) + return -errno; + + if (fstat(f->fd, &f->last_stat) < 0) + return -errno; + + f->header->arena_size = htole64(new_size - le64toh(f->header->arena_offset)); + + return 0; +} + +static int journal_file_map( + JournalFile *f, + uint64_t offset, + uint64_t size, + void **_window, + uint64_t *_woffset, + uint64_t *_wsize, + void **ret) { + + uint64_t woffset, wsize; + void *window; + + assert(f); + assert(size > 0); + assert(ret); + + woffset = offset & ~((uint64_t) page_size() - 1ULL); + wsize = size + (offset - woffset); + wsize = PAGE_ALIGN(wsize); + + /* Avoid SIGBUS on invalid accesses */ + if (woffset + wsize > (uint64_t) PAGE_ALIGN(f->last_stat.st_size)) + return -EADDRNOTAVAIL; + + window = mmap(NULL, wsize, f->prot, MAP_SHARED, f->fd, woffset); + if (window == MAP_FAILED) + return -errno; + + if (_window) + *_window = window; + + if (_woffset) + *_woffset = woffset; + + if (_wsize) + *_wsize = wsize; + + *ret = (uint8_t*) window + (offset - woffset); + + return 0; +} + +static int journal_file_move_to(JournalFile *f, int wt, uint64_t offset, uint64_t size, void **ret) { + void *p = NULL; + uint64_t delta; + int r; + Window *w; + + assert(f); + assert(ret); + assert(wt >= 0); + assert(wt < _WINDOW_MAX); + + if (offset + size > (uint64_t) f->last_stat.st_size) { + /* Hmm, out of range? Let's refresh the fstat() data + * first, before we trust that check. */ + + if (fstat(f->fd, &f->last_stat) < 0 || + offset + size > (uint64_t) f->last_stat.st_size) + return -EADDRNOTAVAIL; + } + + w = f->windows + wt; + + if (_likely_(w->ptr && + w->offset <= offset && + w->offset + w->size >= offset + size)) { + + *ret = (uint8_t*) w->ptr + (offset - w->offset); + return 0; + } + + if (w->ptr) { + if (munmap(w->ptr, w->size) < 0) + return -errno; + + w->ptr = NULL; + w->size = w->offset = 0; + } + + if (size < DEFAULT_WINDOW_SIZE) { + /* If the default window size is larger then what was + * asked for extend the mapping a bit in the hope to + * minimize needed remappings later on. We add half + * the window space before and half behind the + * requested mapping */ + + delta = (DEFAULT_WINDOW_SIZE - size) / 2; + + if (delta > offset) + delta = offset; + + offset -= delta; + size = DEFAULT_WINDOW_SIZE; + } else + delta = 0; + + if (offset + size > (uint64_t) f->last_stat.st_size) + size = (uint64_t) f->last_stat.st_size - offset; + + if (size <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_map(f, + offset, size, + &w->ptr, &w->offset, &w->size, + &p); + + if (r < 0) + return r; + + *ret = (uint8_t*) p + delta; + return 0; +} + +static bool verify_hash(Object *o) { + uint64_t h1, h2; + + assert(o); + + if (o->object.type == OBJECT_DATA && !(o->object.flags & OBJECT_COMPRESSED)) { + h1 = le64toh(o->data.hash); + h2 = hash64(o->data.payload, le64toh(o->object.size) - offsetof(Object, data.payload)); + } else if (o->object.type == OBJECT_FIELD) { + h1 = le64toh(o->field.hash); + h2 = hash64(o->field.payload, le64toh(o->object.size) - offsetof(Object, field.payload)); + } else + return true; + + return h1 == h2; +} + +int journal_file_move_to_object(JournalFile *f, int type, uint64_t offset, Object **ret) { + int r; + void *t; + Object *o; + uint64_t s; + + assert(f); + assert(ret); + assert(type < _OBJECT_TYPE_MAX); + + r = journal_file_move_to(f, type >= 0 ? type : WINDOW_UNKNOWN, offset, sizeof(ObjectHeader), &t); + if (r < 0) + return r; + + o = (Object*) t; + s = le64toh(o->object.size); + + if (s < sizeof(ObjectHeader)) + return -EBADMSG; + + if (type >= 0 && o->object.type != type) + return -EBADMSG; + + if (s > sizeof(ObjectHeader)) { + r = journal_file_move_to(f, o->object.type, offset, s, &t); + if (r < 0) + return r; + + o = (Object*) t; + } + + if (!verify_hash(o)) + return -EBADMSG; + + *ret = o; + return 0; +} + +static uint64_t journal_file_seqnum(JournalFile *f, uint64_t *seqnum) { + uint64_t r; + + assert(f); + + r = le64toh(f->header->seqnum) + 1; + + if (seqnum) { + /* If an external seqnum counter was passed, we update + * both the local and the external one, and set it to + * the maximum of both */ + + if (*seqnum + 1 > r) + r = *seqnum + 1; + + *seqnum = r; + } + + f->header->seqnum = htole64(r); + + if (f->header->first_seqnum == 0) + f->header->first_seqnum = htole64(r); + + return r; +} + +static int journal_file_append_object(JournalFile *f, int type, uint64_t size, Object **ret, uint64_t *offset) { + int r; + uint64_t p; + Object *tail, *o; + void *t; + + assert(f); + assert(size >= sizeof(ObjectHeader)); + assert(offset); + assert(ret); + + p = le64toh(f->header->tail_object_offset); + if (p == 0) + p = le64toh(f->header->arena_offset); + else { + r = journal_file_move_to_object(f, -1, p, &tail); + if (r < 0) + return r; + + p += ALIGN64(le64toh(tail->object.size)); + } + + r = journal_file_allocate(f, p, size); + if (r < 0) + return r; + + r = journal_file_move_to(f, type, p, size, &t); + if (r < 0) + return r; + + o = (Object*) t; + + zero(o->object); + o->object.type = type; + o->object.size = htole64(size); + + f->header->tail_object_offset = htole64(p); + f->header->n_objects = htole64(le64toh(f->header->n_objects) + 1); + + *ret = o; + *offset = p; + + return 0; +} + +static int journal_file_setup_data_hash_table(JournalFile *f) { + uint64_t s, p; + Object *o; + int r; + + assert(f); + + s = DEFAULT_DATA_HASH_TABLE_SIZE; + r = journal_file_append_object(f, + OBJECT_DATA_HASH_TABLE, + offsetof(Object, hash_table.items) + s, + &o, &p); + if (r < 0) + return r; + + memset(o->hash_table.items, 0, s); + + f->header->data_hash_table_offset = htole64(p + offsetof(Object, hash_table.items)); + f->header->data_hash_table_size = htole64(s); + + return 0; +} + +static int journal_file_setup_field_hash_table(JournalFile *f) { + uint64_t s, p; + Object *o; + int r; + + assert(f); + + s = DEFAULT_FIELD_HASH_TABLE_SIZE; + r = journal_file_append_object(f, + OBJECT_FIELD_HASH_TABLE, + offsetof(Object, hash_table.items) + s, + &o, &p); + if (r < 0) + return r; + + memset(o->hash_table.items, 0, s); + + f->header->field_hash_table_offset = htole64(p + offsetof(Object, hash_table.items)); + f->header->field_hash_table_size = htole64(s); + + return 0; +} + +static int journal_file_map_data_hash_table(JournalFile *f) { + uint64_t s, p; + void *t; + int r; + + assert(f); + + p = le64toh(f->header->data_hash_table_offset); + s = le64toh(f->header->data_hash_table_size); + + r = journal_file_move_to(f, + WINDOW_DATA_HASH_TABLE, + p, s, + &t); + if (r < 0) + return r; + + f->data_hash_table = t; + return 0; +} + +static int journal_file_map_field_hash_table(JournalFile *f) { + uint64_t s, p; + void *t; + int r; + + assert(f); + + p = le64toh(f->header->field_hash_table_offset); + s = le64toh(f->header->field_hash_table_size); + + r = journal_file_move_to(f, + WINDOW_FIELD_HASH_TABLE, + p, s, + &t); + if (r < 0) + return r; + + f->field_hash_table = t; + return 0; +} + +static int journal_file_link_data(JournalFile *f, Object *o, uint64_t offset, uint64_t hash) { + uint64_t p, h; + int r; + + assert(f); + assert(o); + assert(offset > 0); + assert(o->object.type == OBJECT_DATA); + + /* This might alter the window we are looking at */ + + o->data.next_hash_offset = o->data.next_field_offset = 0; + o->data.entry_offset = o->data.entry_array_offset = 0; + o->data.n_entries = 0; + + h = hash % (le64toh(f->header->data_hash_table_size) / sizeof(HashItem)); + p = le64toh(f->data_hash_table[h].head_hash_offset); + if (p == 0) { + /* Only entry in the hash table is easy */ + f->data_hash_table[h].head_hash_offset = htole64(offset); + } else { + /* Move back to the previous data object, to patch in + * pointer */ + + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + o->data.next_hash_offset = htole64(offset); + } + + f->data_hash_table[h].tail_hash_offset = htole64(offset); + + return 0; +} + +int journal_file_find_data_object_with_hash( + JournalFile *f, + const void *data, uint64_t size, uint64_t hash, + Object **ret, uint64_t *offset) { + + uint64_t p, osize, h; + int r; + + assert(f); + assert(data || size == 0); + + osize = offsetof(Object, data.payload) + size; + + if (f->header->data_hash_table_size == 0) + return -EBADMSG; + + h = hash % (le64toh(f->header->data_hash_table_size) / sizeof(HashItem)); + p = le64toh(f->data_hash_table[h].head_hash_offset); + + while (p > 0) { + Object *o; + + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (le64toh(o->data.hash) != hash) + goto next; + + if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ + uint64_t l, rsize; + + l = le64toh(o->object.size); + if (l <= offsetof(Object, data.payload)) + return -EBADMSG; + + l -= offsetof(Object, data.payload); + + if (!uncompress_blob(o->data.payload, l, &f->compress_buffer, &f->compress_buffer_size, &rsize)) + return -EBADMSG; + + if (rsize == size && + memcmp(f->compress_buffer, data, size) == 0) { + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; + } +#else + return -EPROTONOSUPPORT; +#endif + + } else if (le64toh(o->object.size) == osize && + memcmp(o->data.payload, data, size) == 0) { + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; + } + + next: + p = le64toh(o->data.next_hash_offset); + } + + return 0; +} + +int journal_file_find_data_object( + JournalFile *f, + const void *data, uint64_t size, + Object **ret, uint64_t *offset) { + + uint64_t hash; + + assert(f); + assert(data || size == 0); + + hash = hash64(data, size); + + return journal_file_find_data_object_with_hash(f, + data, size, hash, + ret, offset); +} + +static int journal_file_append_data( + JournalFile *f, + const void *data, uint64_t size, + Object **ret, uint64_t *offset) { + + uint64_t hash, p; + uint64_t osize; + Object *o; + int r; + bool compressed = false; + + assert(f); + assert(data || size == 0); + + hash = hash64(data, size); + + r = journal_file_find_data_object_with_hash(f, data, size, hash, &o, &p); + if (r < 0) + return r; + else if (r > 0) { + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 0; + } + + osize = offsetof(Object, data.payload) + size; + r = journal_file_append_object(f, OBJECT_DATA, osize, &o, &p); + if (r < 0) + return r; + + o->data.hash = htole64(hash); + +#ifdef HAVE_XZ + if (f->compress && + size >= COMPRESSION_SIZE_THRESHOLD) { + uint64_t rsize; + + compressed = compress_blob(data, size, o->data.payload, &rsize); + + if (compressed) { + o->object.size = htole64(offsetof(Object, data.payload) + rsize); + o->object.flags |= OBJECT_COMPRESSED; + + f->header->incompatible_flags = htole32(le32toh(f->header->incompatible_flags) | HEADER_INCOMPATIBLE_COMPRESSED); + + log_debug("Compressed data object %lu -> %lu", (unsigned long) size, (unsigned long) rsize); + } + } +#endif + + if (!compressed) + memcpy(o->data.payload, data, size); + + r = journal_file_link_data(f, o, p, hash); + if (r < 0) + return r; + + /* The linking might have altered the window, so let's + * refresh our pointer */ + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 0; +} + +uint64_t journal_file_entry_n_items(Object *o) { + assert(o); + assert(o->object.type == OBJECT_ENTRY); + + return (le64toh(o->object.size) - offsetof(Object, entry.items)) / sizeof(EntryItem); +} + +static uint64_t journal_file_entry_array_n_items(Object *o) { + assert(o); + assert(o->object.type == OBJECT_ENTRY_ARRAY); + + return (le64toh(o->object.size) - offsetof(Object, entry_array.items)) / sizeof(uint64_t); +} + +static int link_entry_into_array(JournalFile *f, + le64_t *first, + le64_t *idx, + uint64_t p) { + int r; + uint64_t n = 0, ap = 0, q, i, a, hidx; + Object *o; + + assert(f); + assert(first); + assert(idx); + assert(p > 0); + + a = le64toh(*first); + i = hidx = le64toh(*idx); + while (a > 0) { + + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, a, &o); + if (r < 0) + return r; + + n = journal_file_entry_array_n_items(o); + if (i < n) { + o->entry_array.items[i] = htole64(p); + *idx = htole64(hidx + 1); + return 0; + } + + i -= n; + ap = a; + a = le64toh(o->entry_array.next_entry_array_offset); + } + + if (hidx > n) + n = (hidx+1) * 2; + else + n = n * 2; + + if (n < 4) + n = 4; + + r = journal_file_append_object(f, OBJECT_ENTRY_ARRAY, + offsetof(Object, entry_array.items) + n * sizeof(uint64_t), + &o, &q); + if (r < 0) + return r; + + o->entry_array.items[i] = htole64(p); + + if (ap == 0) + *first = htole64(q); + else { + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, ap, &o); + if (r < 0) + return r; + + o->entry_array.next_entry_array_offset = htole64(q); + } + + *idx = htole64(hidx + 1); + + return 0; +} + +static int link_entry_into_array_plus_one(JournalFile *f, + le64_t *extra, + le64_t *first, + le64_t *idx, + uint64_t p) { + + int r; + + assert(f); + assert(extra); + assert(first); + assert(idx); + assert(p > 0); + + if (*idx == 0) + *extra = htole64(p); + else { + le64_t i; + + i = htole64(le64toh(*idx) - 1); + r = link_entry_into_array(f, first, &i, p); + if (r < 0) + return r; + } + + *idx = htole64(le64toh(*idx) + 1); + return 0; +} + +static int journal_file_link_entry_item(JournalFile *f, Object *o, uint64_t offset, uint64_t i) { + uint64_t p; + int r; + assert(f); + assert(o); + assert(offset > 0); + + p = le64toh(o->entry.items[i].object_offset); + if (p == 0) + return -EINVAL; + + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + return link_entry_into_array_plus_one(f, + &o->data.entry_offset, + &o->data.entry_array_offset, + &o->data.n_entries, + offset); +} + +static int journal_file_link_entry(JournalFile *f, Object *o, uint64_t offset) { + uint64_t n, i; + int r; + + assert(f); + assert(o); + assert(offset > 0); + assert(o->object.type == OBJECT_ENTRY); + + __sync_synchronize(); + + /* Link up the entry itself */ + r = link_entry_into_array(f, + &f->header->entry_array_offset, + &f->header->n_entries, + offset); + if (r < 0) + return r; + + /* log_debug("=> %s seqnr=%lu n_entries=%lu", f->path, (unsigned long) o->entry.seqnum, (unsigned long) f->header->n_entries); */ + + if (f->header->head_entry_realtime == 0) + f->header->head_entry_realtime = o->entry.realtime; + + f->header->tail_entry_realtime = o->entry.realtime; + f->header->tail_entry_monotonic = o->entry.monotonic; + + f->tail_entry_monotonic_valid = true; + + /* Link up the items */ + n = journal_file_entry_n_items(o); + for (i = 0; i < n; i++) { + r = journal_file_link_entry_item(f, o, offset, i); + if (r < 0) + return r; + } + + return 0; +} + +static int journal_file_append_entry_internal( + JournalFile *f, + const dual_timestamp *ts, + uint64_t xor_hash, + const EntryItem items[], unsigned n_items, + uint64_t *seqnum, + Object **ret, uint64_t *offset) { + uint64_t np; + uint64_t osize; + Object *o; + int r; + + assert(f); + assert(items || n_items == 0); + assert(ts); + + osize = offsetof(Object, entry.items) + (n_items * sizeof(EntryItem)); + + r = journal_file_append_object(f, OBJECT_ENTRY, osize, &o, &np); + if (r < 0) + return r; + + o->entry.seqnum = htole64(journal_file_seqnum(f, seqnum)); + memcpy(o->entry.items, items, n_items * sizeof(EntryItem)); + o->entry.realtime = htole64(ts->realtime); + o->entry.monotonic = htole64(ts->monotonic); + o->entry.xor_hash = htole64(xor_hash); + o->entry.boot_id = f->header->boot_id; + + r = journal_file_link_entry(f, o, np); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = np; + + return 0; +} + +void journal_file_post_change(JournalFile *f) { + assert(f); + + /* inotify() does not receive IN_MODIFY events from file + * accesses done via mmap(). After each access we hence + * trigger IN_MODIFY by truncating the journal file to its + * current size which triggers IN_MODIFY. */ + + __sync_synchronize(); + + if (ftruncate(f->fd, f->last_stat.st_size) < 0) + log_error("Failed to to truncate file to its own size: %m"); +} + +int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const struct iovec iovec[], unsigned n_iovec, uint64_t *seqnum, Object **ret, uint64_t *offset) { + unsigned i; + EntryItem *items; + int r; + uint64_t xor_hash = 0; + struct dual_timestamp _ts; + + assert(f); + assert(iovec || n_iovec == 0); + + if (!f->writable) + return -EPERM; + + if (!ts) { + dual_timestamp_get(&_ts); + ts = &_ts; + } + + if (f->tail_entry_monotonic_valid && + ts->monotonic < le64toh(f->header->tail_entry_monotonic)) + return -EINVAL; + + items = alloca(sizeof(EntryItem) * n_iovec); + + for (i = 0; i < n_iovec; i++) { + uint64_t p; + Object *o; + + r = journal_file_append_data(f, iovec[i].iov_base, iovec[i].iov_len, &o, &p); + if (r < 0) + return r; + + xor_hash ^= le64toh(o->data.hash); + items[i].object_offset = htole64(p); + items[i].hash = o->data.hash; + } + + r = journal_file_append_entry_internal(f, ts, xor_hash, items, n_iovec, seqnum, ret, offset); + + journal_file_post_change(f); + + return r; +} + +static int generic_array_get(JournalFile *f, + uint64_t first, + uint64_t i, + Object **ret, uint64_t *offset) { + + Object *o; + uint64_t p = 0, a; + int r; + + assert(f); + + a = first; + while (a > 0) { + uint64_t n; + + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, a, &o); + if (r < 0) + return r; + + n = journal_file_entry_array_n_items(o); + if (i < n) { + p = le64toh(o->entry_array.items[i]); + break; + } + + i -= n; + a = le64toh(o->entry_array.next_entry_array_offset); + } + + if (a <= 0 || p <= 0) + return 0; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; +} + +static int generic_array_get_plus_one(JournalFile *f, + uint64_t extra, + uint64_t first, + uint64_t i, + Object **ret, uint64_t *offset) { + + Object *o; + + assert(f); + + if (i == 0) { + int r; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, extra, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = extra; + + return 1; + } + + return generic_array_get(f, first, i-1, ret, offset); +} + +enum { + TEST_FOUND, + TEST_LEFT, + TEST_RIGHT +}; + +static int generic_array_bisect(JournalFile *f, + uint64_t first, + uint64_t n, + uint64_t needle, + int (*test_object)(JournalFile *f, uint64_t p, uint64_t needle), + direction_t direction, + Object **ret, + uint64_t *offset, + uint64_t *idx) { + + uint64_t a, p, t = 0, i = 0, last_p = 0; + bool subtract_one = false; + Object *o, *array = NULL; + int r; + + assert(f); + assert(test_object); + + a = first; + while (a > 0) { + uint64_t left, right, k, lp; + + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, a, &array); + if (r < 0) + return r; + + k = journal_file_entry_array_n_items(array); + right = MIN(k, n); + if (right <= 0) + return 0; + + i = right - 1; + lp = p = le64toh(array->entry_array.items[i]); + if (p <= 0) + return -EBADMSG; + + r = test_object(f, p, needle); + if (r < 0) + return r; + + if (r == TEST_FOUND) + r = direction == DIRECTION_DOWN ? TEST_RIGHT : TEST_LEFT; + + if (r == TEST_RIGHT) { + left = 0; + right -= 1; + for (;;) { + if (left == right) { + if (direction == DIRECTION_UP) + subtract_one = true; + + i = left; + goto found; + } + + assert(left < right); + + i = (left + right) / 2; + p = le64toh(array->entry_array.items[i]); + if (p <= 0) + return -EBADMSG; + + r = test_object(f, p, needle); + if (r < 0) + return r; + + if (r == TEST_FOUND) + r = direction == DIRECTION_DOWN ? TEST_RIGHT : TEST_LEFT; + + if (r == TEST_RIGHT) + right = i; + else + left = i + 1; + } + } + + if (k > n) + return 0; + + last_p = lp; + + n -= k; + t += k; + a = le64toh(array->entry_array.next_entry_array_offset); + } + + return 0; + +found: + if (subtract_one && t == 0 && i == 0) + return 0; + + if (subtract_one && i == 0) + p = last_p; + else if (subtract_one) + p = le64toh(array->entry_array.items[i-1]); + else + p = le64toh(array->entry_array.items[i]); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + if (idx) + *idx = t + i - (subtract_one ? 1 : 0); + + return 1; +} + +static int generic_array_bisect_plus_one(JournalFile *f, + uint64_t extra, + uint64_t first, + uint64_t n, + uint64_t needle, + int (*test_object)(JournalFile *f, uint64_t p, uint64_t needle), + direction_t direction, + Object **ret, + uint64_t *offset, + uint64_t *idx) { + + int r; + + assert(f); + assert(test_object); + + if (n <= 0) + return 0; + + /* This bisects the array in object 'first', but first checks + * an extra */ + r = test_object(f, extra, needle); + if (r < 0) + return r; + else if (r == TEST_FOUND) { + Object *o; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, extra, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = extra; + + if (idx) + *idx = 0; + + return 1; + } else if (r == TEST_RIGHT) + return 0; + + r = generic_array_bisect(f, first, n-1, needle, test_object, direction, ret, offset, idx); + + if (r > 0) + (*idx) ++; + + return r; +} + +static int test_object_seqnum(JournalFile *f, uint64_t p, uint64_t needle) { + Object *o; + int r; + + assert(f); + assert(p > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (le64toh(o->entry.seqnum) == needle) + return TEST_FOUND; + else if (le64toh(o->entry.seqnum) < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_move_to_entry_by_seqnum( + JournalFile *f, + uint64_t seqnum, + direction_t direction, + Object **ret, + uint64_t *offset) { + + return generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + seqnum, + test_object_seqnum, + direction, + ret, offset, NULL); +} + +static int test_object_realtime(JournalFile *f, uint64_t p, uint64_t needle) { + Object *o; + int r; + + assert(f); + assert(p > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (le64toh(o->entry.realtime) == needle) + return TEST_FOUND; + else if (le64toh(o->entry.realtime) < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_move_to_entry_by_realtime( + JournalFile *f, + uint64_t realtime, + direction_t direction, + Object **ret, + uint64_t *offset) { + + return generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + realtime, + test_object_realtime, + direction, + ret, offset, NULL); +} + +static int test_object_monotonic(JournalFile *f, uint64_t p, uint64_t needle) { + Object *o; + int r; + + assert(f); + assert(p > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (le64toh(o->entry.monotonic) == needle) + return TEST_FOUND; + else if (le64toh(o->entry.monotonic) < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_move_to_entry_by_monotonic( + JournalFile *f, + sd_id128_t boot_id, + uint64_t monotonic, + direction_t direction, + Object **ret, + uint64_t *offset) { + + char t[8+32+1] = "_BOOT_ID="; + Object *o; + int r; + + sd_id128_to_string(boot_id, t + 8); + + r = journal_file_find_data_object(f, t, strlen(t), &o, NULL); + if (r < 0) + return r; + else if (r == 0) + return -ENOENT; + + return generic_array_bisect_plus_one(f, + le64toh(o->data.entry_offset), + le64toh(o->data.entry_array_offset), + le64toh(o->data.n_entries), + monotonic, + test_object_monotonic, + direction, + ret, offset, NULL); +} + +static int test_object_offset(JournalFile *f, uint64_t p, uint64_t needle) { + assert(f); + assert(p > 0); + + if (p == needle) + return TEST_FOUND; + else if (p < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_next_entry( + JournalFile *f, + Object *o, uint64_t p, + direction_t direction, + Object **ret, uint64_t *offset) { + + uint64_t i, n; + int r; + + assert(f); + assert(p > 0 || !o); + + n = le64toh(f->header->n_entries); + if (n <= 0) + return 0; + + if (!o) + i = direction == DIRECTION_DOWN ? 0 : n - 1; + else { + if (o->object.type != OBJECT_ENTRY) + return -EINVAL; + + r = generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + p, + test_object_offset, + DIRECTION_DOWN, + NULL, NULL, + &i); + if (r <= 0) + return r; + + if (direction == DIRECTION_DOWN) { + if (i >= n - 1) + return 0; + + i++; + } else { + if (i <= 0) + return 0; + + i--; + } + } + + /* And jump to it */ + return generic_array_get(f, + le64toh(f->header->entry_array_offset), + i, + ret, offset); +} + +int journal_file_skip_entry( + JournalFile *f, + Object *o, uint64_t p, + int64_t skip, + Object **ret, uint64_t *offset) { + + uint64_t i, n; + int r; + + assert(f); + assert(o); + assert(p > 0); + + if (o->object.type != OBJECT_ENTRY) + return -EINVAL; + + r = generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + p, + test_object_offset, + DIRECTION_DOWN, + NULL, NULL, + &i); + if (r <= 0) + return r; + + /* Calculate new index */ + if (skip < 0) { + if ((uint64_t) -skip >= i) + i = 0; + else + i = i - (uint64_t) -skip; + } else + i += (uint64_t) skip; + + n = le64toh(f->header->n_entries); + if (n <= 0) + return -EBADMSG; + + if (i >= n) + i = n-1; + + return generic_array_get(f, + le64toh(f->header->entry_array_offset), + i, + ret, offset); +} + +int journal_file_next_entry_for_data( + JournalFile *f, + Object *o, uint64_t p, + uint64_t data_offset, + direction_t direction, + Object **ret, uint64_t *offset) { + + uint64_t n, i; + int r; + Object *d; + + assert(f); + assert(p > 0 || !o); + + r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d); + if (r < 0) + return r; + + n = le64toh(d->data.n_entries); + if (n <= 0) + return n; + + if (!o) + i = direction == DIRECTION_DOWN ? 0 : n - 1; + else { + if (o->object.type != OBJECT_ENTRY) + return -EINVAL; + + r = generic_array_bisect_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + le64toh(d->data.n_entries), + p, + test_object_offset, + DIRECTION_DOWN, + NULL, NULL, + &i); + + if (r <= 0) + return r; + + if (direction == DIRECTION_DOWN) { + if (i >= n - 1) + return 0; + + i++; + } else { + if (i <= 0) + return 0; + + i--; + } + + } + + return generic_array_get_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + i, + ret, offset); +} + +int journal_file_move_to_entry_by_seqnum_for_data( + JournalFile *f, + uint64_t data_offset, + uint64_t seqnum, + direction_t direction, + Object **ret, uint64_t *offset) { + + Object *d; + int r; + + r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d); + if (r <= 0) + return r; + + return generic_array_bisect_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + le64toh(d->data.n_entries), + seqnum, + test_object_seqnum, + direction, + ret, offset, NULL); +} + +int journal_file_move_to_entry_by_realtime_for_data( + JournalFile *f, + uint64_t data_offset, + uint64_t realtime, + direction_t direction, + Object **ret, uint64_t *offset) { + + Object *d; + int r; + + r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d); + if (r <= 0) + return r; + + return generic_array_bisect_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + le64toh(d->data.n_entries), + realtime, + test_object_realtime, + direction, + ret, offset, NULL); +} + +void journal_file_dump(JournalFile *f) { + char a[33], b[33], c[33]; + Object *o; + int r; + uint64_t p; + + assert(f); + + printf("File Path: %s\n" + "File ID: %s\n" + "Machine ID: %s\n" + "Boot ID: %s\n" + "Arena size: %llu\n" + "Objects: %lu\n" + "Entries: %lu\n", + f->path, + sd_id128_to_string(f->header->file_id, a), + sd_id128_to_string(f->header->machine_id, b), + sd_id128_to_string(f->header->boot_id, c), + (unsigned long long) le64toh(f->header->arena_size), + (unsigned long) le64toh(f->header->n_objects), + (unsigned long) le64toh(f->header->n_entries)); + + p = le64toh(f->header->arena_offset); + while (p != 0) { + r = journal_file_move_to_object(f, -1, p, &o); + if (r < 0) + goto fail; + + switch (o->object.type) { + + case OBJECT_UNUSED: + printf("Type: OBJECT_UNUSED\n"); + break; + + case OBJECT_DATA: + printf("Type: OBJECT_DATA\n"); + break; + + case OBJECT_ENTRY: + printf("Type: OBJECT_ENTRY %llu %llu %llu\n", + (unsigned long long) le64toh(o->entry.seqnum), + (unsigned long long) le64toh(o->entry.monotonic), + (unsigned long long) le64toh(o->entry.realtime)); + break; + + case OBJECT_FIELD_HASH_TABLE: + printf("Type: OBJECT_FIELD_HASH_TABLE\n"); + break; + + case OBJECT_DATA_HASH_TABLE: + printf("Type: OBJECT_DATA_HASH_TABLE\n"); + break; + + case OBJECT_ENTRY_ARRAY: + printf("Type: OBJECT_ENTRY_ARRAY\n"); + break; + } + + if (o->object.flags & OBJECT_COMPRESSED) + printf("Flags: COMPRESSED\n"); + + if (p == le64toh(f->header->tail_object_offset)) + p = 0; + else + p = p + ALIGN64(le64toh(o->object.size)); + } + + return; +fail: + log_error("File corrupt"); +} + +int journal_file_open( + const char *fname, + int flags, + mode_t mode, + JournalFile *template, + JournalFile **ret) { + + JournalFile *f; + int r; + bool newly_created = false; + + assert(fname); + + if ((flags & O_ACCMODE) != O_RDONLY && + (flags & O_ACCMODE) != O_RDWR) + return -EINVAL; + + if (!endswith(fname, ".journal")) + return -EINVAL; + + f = new0(JournalFile, 1); + if (!f) + return -ENOMEM; + + f->fd = -1; + f->flags = flags; + f->mode = mode; + f->writable = (flags & O_ACCMODE) != O_RDONLY; + f->prot = prot_from_flags(flags); + + if (template) { + f->metrics = template->metrics; + f->compress = template->compress; + } + + f->path = strdup(fname); + if (!f->path) { + r = -ENOMEM; + goto fail; + } + + f->fd = open(f->path, f->flags|O_CLOEXEC, f->mode); + if (f->fd < 0) { + r = -errno; + goto fail; + } + + if (fstat(f->fd, &f->last_stat) < 0) { + r = -errno; + goto fail; + } + + if (f->last_stat.st_size == 0 && f->writable) { + newly_created = true; + + r = journal_file_init_header(f, template); + if (r < 0) + goto fail; + + if (fstat(f->fd, &f->last_stat) < 0) { + r = -errno; + goto fail; + } + } + + if (f->last_stat.st_size < (off_t) sizeof(Header)) { + r = -EIO; + goto fail; + } + + f->header = mmap(NULL, PAGE_ALIGN(sizeof(Header)), prot_from_flags(flags), MAP_SHARED, f->fd, 0); + if (f->header == MAP_FAILED) { + f->header = NULL; + r = -errno; + goto fail; + } + + if (!newly_created) { + r = journal_file_verify_header(f); + if (r < 0) + goto fail; + } + + if (f->writable) { + r = journal_file_refresh_header(f); + if (r < 0) + goto fail; + } + + if (newly_created) { + + r = journal_file_setup_field_hash_table(f); + if (r < 0) + goto fail; + + r = journal_file_setup_data_hash_table(f); + if (r < 0) + goto fail; + } + + r = journal_file_map_field_hash_table(f); + if (r < 0) + goto fail; + + r = journal_file_map_data_hash_table(f); + if (r < 0) + goto fail; + + if (ret) + *ret = f; + + return 0; + +fail: + journal_file_close(f); + + return r; +} + +int journal_file_rotate(JournalFile **f) { + char *p; + size_t l; + JournalFile *old_file, *new_file = NULL; + int r; + + assert(f); + assert(*f); + + old_file = *f; + + if (!old_file->writable) + return -EINVAL; + + if (!endswith(old_file->path, ".journal")) + return -EINVAL; + + l = strlen(old_file->path); + + p = new(char, l + 1 + 32 + 1 + 16 + 1 + 16 + 1); + if (!p) + return -ENOMEM; + + memcpy(p, old_file->path, l - 8); + p[l-8] = '@'; + sd_id128_to_string(old_file->header->seqnum_id, p + l - 8 + 1); + snprintf(p + l - 8 + 1 + 32, 1 + 16 + 1 + 16 + 8 + 1, + "-%016llx-%016llx.journal", + (unsigned long long) le64toh((*f)->header->seqnum), + (unsigned long long) le64toh((*f)->header->tail_entry_realtime)); + + r = rename(old_file->path, p); + free(p); + + if (r < 0) + return -errno; + + old_file->header->state = STATE_ARCHIVED; + + r = journal_file_open(old_file->path, old_file->flags, old_file->mode, old_file, &new_file); + journal_file_close(old_file); + + *f = new_file; + return r; +} + +int journal_file_open_reliably( + const char *fname, + int flags, + mode_t mode, + JournalFile *template, + JournalFile **ret) { + + int r; + size_t l; + char *p; + + r = journal_file_open(fname, flags, mode, template, ret); + if (r != -EBADMSG && /* corrupted */ + r != -ENODATA && /* truncated */ + r != -EHOSTDOWN && /* other machine */ + r != -EPROTONOSUPPORT) /* incompatible feature */ + return r; + + if ((flags & O_ACCMODE) == O_RDONLY) + return r; + + if (!(flags & O_CREAT)) + return r; + + /* The file is corrupted. Rotate it away and try it again (but only once) */ + + l = strlen(fname); + if (asprintf(&p, "%.*s@%016llx-%016llx.journal~", + (int) (l-8), fname, + (unsigned long long) now(CLOCK_REALTIME), + random_ull()) < 0) + return -ENOMEM; + + r = rename(fname, p); + free(p); + if (r < 0) + return -errno; + + log_warning("File %s corrupted, renaming and replacing.", fname); + + return journal_file_open(fname, flags, mode, template, ret); +} + +struct vacuum_info { + off_t usage; + char *filename; + + uint64_t realtime; + sd_id128_t seqnum_id; + uint64_t seqnum; + + bool have_seqnum; +}; + +static int vacuum_compare(const void *_a, const void *_b) { + const struct vacuum_info *a, *b; + + a = _a; + b = _b; + + if (a->have_seqnum && b->have_seqnum && + sd_id128_equal(a->seqnum_id, b->seqnum_id)) { + if (a->seqnum < b->seqnum) + return -1; + else if (a->seqnum > b->seqnum) + return 1; + else + return 0; + } + + if (a->realtime < b->realtime) + return -1; + else if (a->realtime > b->realtime) + return 1; + else if (a->have_seqnum && b->have_seqnum) + return memcmp(&a->seqnum_id, &b->seqnum_id, 16); + else + return strcmp(a->filename, b->filename); +} + +int journal_directory_vacuum(const char *directory, uint64_t max_use, uint64_t min_free) { + DIR *d; + int r = 0; + struct vacuum_info *list = NULL; + unsigned n_list = 0, n_allocated = 0, i; + uint64_t sum = 0; + + assert(directory); + + if (max_use <= 0) + return 0; + + d = opendir(directory); + if (!d) + return -errno; + + for (;;) { + int k; + struct dirent buf, *de; + size_t q; + struct stat st; + char *p; + unsigned long long seqnum, realtime; + sd_id128_t seqnum_id; + bool have_seqnum; + + k = readdir_r(d, &buf, &de); + if (k != 0) { + r = -k; + goto finish; + } + + if (!de) + break; + + if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) + continue; + + if (!S_ISREG(st.st_mode)) + continue; + + q = strlen(de->d_name); + + if (endswith(de->d_name, ".journal")) { + + /* Vacuum archived files */ + + if (q < 1 + 32 + 1 + 16 + 1 + 16 + 8) + continue; + + if (de->d_name[q-8-16-1] != '-' || + de->d_name[q-8-16-1-16-1] != '-' || + de->d_name[q-8-16-1-16-1-32-1] != '@') + continue; + + p = strdup(de->d_name); + if (!p) { + r = -ENOMEM; + goto finish; + } + + de->d_name[q-8-16-1-16-1] = 0; + if (sd_id128_from_string(de->d_name + q-8-16-1-16-1-32, &seqnum_id) < 0) { + free(p); + continue; + } + + if (sscanf(de->d_name + q-8-16-1-16, "%16llx-%16llx.journal", &seqnum, &realtime) != 2) { + free(p); + continue; + } + + have_seqnum = true; + + } else if (endswith(de->d_name, ".journal~")) { + unsigned long long tmp; + + /* Vacuum corrupted files */ + + if (q < 1 + 16 + 1 + 16 + 8 + 1) + continue; + + if (de->d_name[q-1-8-16-1] != '-' || + de->d_name[q-1-8-16-1-16-1] != '@') + continue; + + p = strdup(de->d_name); + if (!p) { + r = -ENOMEM; + goto finish; + } + + if (sscanf(de->d_name + q-1-8-16-1-16, "%16llx-%16llx.journal~", &realtime, &tmp) != 2) { + free(p); + continue; + } + + have_seqnum = false; + } else + continue; + + if (n_list >= n_allocated) { + struct vacuum_info *j; + + n_allocated = MAX(n_allocated * 2U, 8U); + j = realloc(list, n_allocated * sizeof(struct vacuum_info)); + if (!j) { + free(p); + r = -ENOMEM; + goto finish; + } + + list = j; + } + + list[n_list].filename = p; + list[n_list].usage = 512UL * (uint64_t) st.st_blocks; + list[n_list].seqnum = seqnum; + list[n_list].realtime = realtime; + list[n_list].seqnum_id = seqnum_id; + list[n_list].have_seqnum = have_seqnum; + + sum += list[n_list].usage; + + n_list ++; + } + + qsort(list, n_list, sizeof(struct vacuum_info), vacuum_compare); + + for(i = 0; i < n_list; i++) { + struct statvfs ss; + + if (fstatvfs(dirfd(d), &ss) < 0) { + r = -errno; + goto finish; + } + + if (sum <= max_use && + (uint64_t) ss.f_bavail * (uint64_t) ss.f_bsize >= min_free) + break; + + if (unlinkat(dirfd(d), list[i].filename, 0) >= 0) { + log_info("Deleted archived journal %s/%s.", directory, list[i].filename); + sum -= list[i].usage; + } else if (errno != ENOENT) + log_warning("Failed to delete %s/%s: %m", directory, list[i].filename); + } + +finish: + for (i = 0; i < n_list; i++) + free(list[i].filename); + + free(list); + + if (d) + closedir(d); + + return r; +} + +int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset) { + uint64_t i, n; + uint64_t q, xor_hash = 0; + int r; + EntryItem *items; + dual_timestamp ts; + + assert(from); + assert(to); + assert(o); + assert(p); + + if (!to->writable) + return -EPERM; + + ts.monotonic = le64toh(o->entry.monotonic); + ts.realtime = le64toh(o->entry.realtime); + + if (to->tail_entry_monotonic_valid && + ts.monotonic < le64toh(to->header->tail_entry_monotonic)) + return -EINVAL; + + if (ts.realtime < le64toh(to->header->tail_entry_realtime)) + return -EINVAL; + + n = journal_file_entry_n_items(o); + items = alloca(sizeof(EntryItem) * n); + + for (i = 0; i < n; i++) { + uint64_t l, h; + le64_t le_hash; + size_t t; + void *data; + Object *u; + + q = le64toh(o->entry.items[i].object_offset); + le_hash = o->entry.items[i].hash; + + r = journal_file_move_to_object(from, OBJECT_DATA, q, &o); + if (r < 0) + return r; + + if (le_hash != o->data.hash) + return -EBADMSG; + + l = le64toh(o->object.size) - offsetof(Object, data.payload); + t = (size_t) l; + + /* We hit the limit on 32bit machines */ + if ((uint64_t) t != l) + return -E2BIG; + + if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ + uint64_t rsize; + + if (!uncompress_blob(o->data.payload, l, &from->compress_buffer, &from->compress_buffer_size, &rsize)) + return -EBADMSG; + + data = from->compress_buffer; + l = rsize; +#else + return -EPROTONOSUPPORT; +#endif + } else + data = o->data.payload; + + r = journal_file_append_data(to, data, l, &u, &h); + if (r < 0) + return r; + + xor_hash ^= le64toh(u->data.hash); + items[i].object_offset = htole64(h); + items[i].hash = u->data.hash; + + r = journal_file_move_to_object(from, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + } + + return journal_file_append_entry_internal(to, &ts, xor_hash, items, n, seqnum, ret, offset); +} + +void journal_default_metrics(JournalMetrics *m, int fd) { + uint64_t fs_size = 0; + struct statvfs ss; + char a[FORMAT_BYTES_MAX], b[FORMAT_BYTES_MAX], c[FORMAT_BYTES_MAX], d[FORMAT_BYTES_MAX]; + + assert(m); + assert(fd >= 0); + + if (fstatvfs(fd, &ss) >= 0) + fs_size = ss.f_frsize * ss.f_blocks; + + if (m->max_use == (uint64_t) -1) { + + if (fs_size > 0) { + m->max_use = PAGE_ALIGN(fs_size / 10); /* 10% of file system size */ + + if (m->max_use > DEFAULT_MAX_USE_UPPER) + m->max_use = DEFAULT_MAX_USE_UPPER; + + if (m->max_use < DEFAULT_MAX_USE_LOWER) + m->max_use = DEFAULT_MAX_USE_LOWER; + } else + m->max_use = DEFAULT_MAX_USE_LOWER; + } else { + m->max_use = PAGE_ALIGN(m->max_use); + + if (m->max_use < JOURNAL_FILE_SIZE_MIN*2) + m->max_use = JOURNAL_FILE_SIZE_MIN*2; + } + + if (m->max_size == (uint64_t) -1) { + m->max_size = PAGE_ALIGN(m->max_use / 8); /* 8 chunks */ + + if (m->max_size > DEFAULT_MAX_SIZE_UPPER) + m->max_size = DEFAULT_MAX_SIZE_UPPER; + } else + m->max_size = PAGE_ALIGN(m->max_size); + + if (m->max_size < JOURNAL_FILE_SIZE_MIN) + m->max_size = JOURNAL_FILE_SIZE_MIN; + + if (m->max_size*2 > m->max_use) + m->max_use = m->max_size*2; + + if (m->min_size == (uint64_t) -1) + m->min_size = JOURNAL_FILE_SIZE_MIN; + else { + m->min_size = PAGE_ALIGN(m->min_size); + + if (m->min_size < JOURNAL_FILE_SIZE_MIN) + m->min_size = JOURNAL_FILE_SIZE_MIN; + + if (m->min_size > m->max_size) + m->max_size = m->min_size; + } + + if (m->keep_free == (uint64_t) -1) { + + if (fs_size > 0) { + m->keep_free = PAGE_ALIGN(fs_size / 20); /* 5% of file system size */ + + if (m->keep_free > DEFAULT_KEEP_FREE_UPPER) + m->keep_free = DEFAULT_KEEP_FREE_UPPER; + + } else + m->keep_free = DEFAULT_KEEP_FREE; + } + + log_info("Fixed max_use=%s max_size=%s min_size=%s keep_free=%s", + format_bytes(a, sizeof(a), m->max_use), + format_bytes(b, sizeof(b), m->max_size), + format_bytes(c, sizeof(c), m->min_size), + format_bytes(d, sizeof(d), m->keep_free)); +} diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h new file mode 100644 index 0000000000..57d66cafc5 --- /dev/null +++ b/src/journal/journal-file.h @@ -0,0 +1,128 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalfilehfoo +#define foojournalfilehfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> + +#include <systemd/sd-id128.h> + +#include "sparse-endian.h" +#include "journal-def.h" +#include "util.h" + +typedef struct Window { + void *ptr; + uint64_t offset; + uint64_t size; +} Window; + +enum { + WINDOW_UNKNOWN = OBJECT_UNUSED, + WINDOW_DATA = OBJECT_DATA, + WINDOW_ENTRY = OBJECT_ENTRY, + WINDOW_DATA_HASH_TABLE = OBJECT_DATA_HASH_TABLE, + WINDOW_FIELD_HASH_TABLE = OBJECT_FIELD_HASH_TABLE, + WINDOW_ENTRY_ARRAY = OBJECT_ENTRY_ARRAY, + WINDOW_HEADER, + _WINDOW_MAX +}; + +typedef struct JournalMetrics { + uint64_t max_use; + uint64_t max_size; + uint64_t min_size; + uint64_t keep_free; +} JournalMetrics; + +typedef struct JournalFile { + int fd; + char *path; + struct stat last_stat; + mode_t mode; + int flags; + int prot; + bool writable; + bool tail_entry_monotonic_valid; + + Header *header; + HashItem *data_hash_table; + HashItem *field_hash_table; + + Window windows[_WINDOW_MAX]; + + uint64_t current_offset; + + JournalMetrics metrics; + + bool compress; + +#ifdef HAVE_XZ + void *compress_buffer; + uint64_t compress_buffer_size; +#endif +} JournalFile; + +typedef enum direction { + DIRECTION_UP, + DIRECTION_DOWN +} direction_t; + +int journal_file_open(const char *fname, int flags, mode_t mode, JournalFile *template, JournalFile **ret); +void journal_file_close(JournalFile *j); + +int journal_file_open_reliably(const char *fname, int flags, mode_t mode, JournalFile *template, JournalFile **ret); + +int journal_file_move_to_object(JournalFile *f, int type, uint64_t offset, Object **ret); + +uint64_t journal_file_entry_n_items(Object *o); + +int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const struct iovec iovec[], unsigned n_iovec, uint64_t *seqno, Object **ret, uint64_t *offset); + +int journal_file_find_data_object(JournalFile *f, const void *data, uint64_t size, Object **ret, uint64_t *offset); +int journal_file_find_data_object_with_hash(JournalFile *f, const void *data, uint64_t size, uint64_t hash, Object **ret, uint64_t *offset); + +int journal_file_next_entry(JournalFile *f, Object *o, uint64_t p, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_skip_entry(JournalFile *f, Object *o, uint64_t p, int64_t skip, Object **ret, uint64_t *offset); + +int journal_file_next_entry_for_data(JournalFile *f, Object *o, uint64_t p, uint64_t data_offset, direction_t direction, Object **ret, uint64_t *offset); + +int journal_file_move_to_entry_by_seqnum(JournalFile *f, uint64_t seqnum, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_move_to_entry_by_realtime(JournalFile *f, uint64_t realtime, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_move_to_entry_by_monotonic(JournalFile *f, sd_id128_t boot_id, uint64_t monotonic, direction_t direction, Object **ret, uint64_t *offset); + +int journal_file_move_to_entry_by_seqnum_for_data(JournalFile *f, uint64_t data_offset, uint64_t seqnum, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_move_to_entry_by_realtime_for_data(JournalFile *f, uint64_t data_offset, uint64_t realtime, direction_t direction, Object **ret, uint64_t *offset); + +int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset); + +void journal_file_dump(JournalFile *f); + +int journal_file_rotate(JournalFile **f); + +int journal_directory_vacuum(const char *directory, uint64_t max_use, uint64_t min_free); + +void journal_file_post_change(JournalFile *f); + +void journal_default_metrics(JournalMetrics *m, int fd); + +#endif diff --git a/src/journal/journal-internal.h b/src/journal/journal-internal.h new file mode 100644 index 0000000000..17f1d317c7 --- /dev/null +++ b/src/journal/journal-internal.h @@ -0,0 +1,84 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalinternalhfoo +#define foojournalinternalhfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/types.h> +#include <inttypes.h> +#include <stdbool.h> + +#include <systemd/sd-id128.h> + +#include "list.h" + +typedef struct Match Match; + +struct Match { + char *data; + size_t size; + le64_t le_hash; + + LIST_FIELDS(Match, matches); +}; + +typedef enum location_type { + LOCATION_HEAD, + LOCATION_TAIL, + LOCATION_DISCRETE +} location_type_t; + +typedef struct Location { + location_type_t type; + + uint64_t seqnum; + sd_id128_t seqnum_id; + bool seqnum_set; + + uint64_t realtime; + bool realtime_set; + + uint64_t monotonic; + sd_id128_t boot_id; + bool monotonic_set; + + uint64_t xor_hash; + bool xor_hash_set; +} Location; + +struct sd_journal { + int flags; + + Hashmap *files; + + Location current_location; + JournalFile *current_file; + uint64_t current_field; + + int inotify_fd; + Hashmap *inotify_wd_dirs; + Hashmap *inotify_wd_roots; + + LIST_HEAD(Match, matches); + unsigned n_matches; +}; + +#endif diff --git a/src/journal/journal-rate-limit.c b/src/journal/journal-rate-limit.c new file mode 100644 index 0000000000..243ff2a378 --- /dev/null +++ b/src/journal/journal-rate-limit.c @@ -0,0 +1,275 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <string.h> +#include <errno.h> + +#include "journal-rate-limit.h" +#include "list.h" +#include "util.h" +#include "hashmap.h" + +#define POOLS_MAX 5 +#define BUCKETS_MAX 127 +#define GROUPS_MAX 2047 + +static const int priority_map[] = { + [LOG_EMERG] = 0, + [LOG_ALERT] = 0, + [LOG_CRIT] = 0, + [LOG_ERR] = 1, + [LOG_WARNING] = 2, + [LOG_NOTICE] = 3, + [LOG_INFO] = 3, + [LOG_DEBUG] = 4 +}; + +typedef struct JournalRateLimitPool JournalRateLimitPool; +typedef struct JournalRateLimitGroup JournalRateLimitGroup; + +struct JournalRateLimitPool { + usec_t begin; + unsigned num; + unsigned suppressed; +}; + +struct JournalRateLimitGroup { + JournalRateLimit *parent; + + char *id; + JournalRateLimitPool pools[POOLS_MAX]; + unsigned hash; + + LIST_FIELDS(JournalRateLimitGroup, bucket); + LIST_FIELDS(JournalRateLimitGroup, lru); +}; + +struct JournalRateLimit { + usec_t interval; + unsigned burst; + + JournalRateLimitGroup* buckets[BUCKETS_MAX]; + JournalRateLimitGroup *lru, *lru_tail; + + unsigned n_groups; +}; + +JournalRateLimit *journal_rate_limit_new(usec_t interval, unsigned burst) { + JournalRateLimit *r; + + assert(interval > 0 || burst == 0); + + r = new0(JournalRateLimit, 1); + if (!r) + return NULL; + + r->interval = interval; + r->burst = burst; + + return r; +} + +static void journal_rate_limit_group_free(JournalRateLimitGroup *g) { + assert(g); + + if (g->parent) { + assert(g->parent->n_groups > 0); + + if (g->parent->lru_tail == g) + g->parent->lru_tail = g->lru_prev; + + LIST_REMOVE(JournalRateLimitGroup, lru, g->parent->lru, g); + LIST_REMOVE(JournalRateLimitGroup, bucket, g->parent->buckets[g->hash % BUCKETS_MAX], g); + + g->parent->n_groups --; + } + + free(g->id); + free(g); +} + +void journal_rate_limit_free(JournalRateLimit *r) { + assert(r); + + while (r->lru) + journal_rate_limit_group_free(r->lru); + + free(r); +} + +static bool journal_rate_limit_group_expired(JournalRateLimitGroup *g, usec_t ts) { + unsigned i; + + assert(g); + + for (i = 0; i < POOLS_MAX; i++) + if (g->pools[i].begin + g->parent->interval >= ts) + return false; + + return true; +} + +static void journal_rate_limit_vacuum(JournalRateLimit *r, usec_t ts) { + assert(r); + + /* Makes room for at least one new item, but drop all + * expored items too. */ + + while (r->n_groups >= GROUPS_MAX || + (r->lru_tail && journal_rate_limit_group_expired(r->lru_tail, ts))) + journal_rate_limit_group_free(r->lru_tail); +} + +static JournalRateLimitGroup* journal_rate_limit_group_new(JournalRateLimit *r, const char *id, usec_t ts) { + JournalRateLimitGroup *g; + + assert(r); + assert(id); + + g = new0(JournalRateLimitGroup, 1); + if (!g) + return NULL; + + g->id = strdup(id); + if (!g->id) + goto fail; + + g->hash = string_hash_func(g->id); + + journal_rate_limit_vacuum(r, ts); + + LIST_PREPEND(JournalRateLimitGroup, bucket, r->buckets[g->hash % BUCKETS_MAX], g); + LIST_PREPEND(JournalRateLimitGroup, lru, r->lru, g); + if (!g->lru_next) + r->lru_tail = g; + r->n_groups ++; + + g->parent = r; + return g; + +fail: + journal_rate_limit_group_free(g); + return NULL; +} + +static uint64_t u64log2(uint64_t n) { + unsigned r; + + if (n <= 1) + return 0; + + r = 0; + for (;;) { + n = n >> 1; + if (!n) + return r; + r++; + } +} + +static unsigned burst_modulate(unsigned burst, uint64_t available) { + unsigned k; + + /* Modulates the burst rate a bit with the amount of available + * disk space */ + + k = u64log2(available); + + /* 1MB */ + if (k <= 20) + return burst; + + burst = (burst * (k-20)) / 4; + + /* + * Example: + * + * <= 1MB = rate * 1 + * 16MB = rate * 2 + * 256MB = rate * 3 + * 4GB = rate * 4 + * 64GB = rate * 5 + * 1TB = rate * 6 + */ + + return burst; +} + +int journal_rate_limit_test(JournalRateLimit *r, const char *id, int priority, uint64_t available) { + unsigned h; + JournalRateLimitGroup *g; + JournalRateLimitPool *p; + unsigned burst; + usec_t ts; + + assert(id); + + if (!r) + return 1; + + if (r->interval == 0 || r->burst == 0) + return 1; + + burst = burst_modulate(r->burst, available); + + ts = now(CLOCK_MONOTONIC); + + h = string_hash_func(id); + g = r->buckets[h % BUCKETS_MAX]; + + LIST_FOREACH(bucket, g, g) + if (streq(g->id, id)) + break; + + if (!g) { + g = journal_rate_limit_group_new(r, id, ts); + if (!g) + return -ENOMEM; + } + + p = &g->pools[priority_map[priority]]; + + if (p->begin <= 0) { + p->suppressed = 0; + p->num = 1; + p->begin = ts; + return 1; + } + + if (p->begin + r->interval < ts) { + unsigned s; + + s = p->suppressed; + p->suppressed = 0; + p->num = 1; + p->begin = ts; + + return 1 + s; + } + + if (p->num <= burst) { + p->num++; + return 1; + } + + p->suppressed++; + return 0; +} diff --git a/src/journal/journal-rate-limit.h b/src/journal/journal-rate-limit.h new file mode 100644 index 0000000000..2bbdd5f9fe --- /dev/null +++ b/src/journal/journal-rate-limit.h @@ -0,0 +1,34 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalratelimithfoo +#define foojournalratelimithfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include "macro.h" +#include "util.h" + +typedef struct JournalRateLimit JournalRateLimit; + +JournalRateLimit *journal_rate_limit_new(usec_t interval, unsigned burst); +void journal_rate_limit_free(JournalRateLimit *r); +int journal_rate_limit_test(JournalRateLimit *r, const char *id, int priority, uint64_t available); + +#endif diff --git a/src/journal/journal-send.c b/src/journal/journal-send.c new file mode 100644 index 0000000000..32e94af91f --- /dev/null +++ b/src/journal/journal-send.c @@ -0,0 +1,516 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> +#include <stddef.h> +#include <unistd.h> +#include <fcntl.h> + +#define SD_JOURNAL_SUPPRESS_LOCATION + +#include "sd-journal.h" +#include "util.h" +#include "socket-util.h" + +#define SNDBUF_SIZE (8*1024*1024) + +/* We open a single fd, and we'll share it with the current process, + * all its threads, and all its subprocesses. This means we need to + * initialize it atomically, and need to operate on it atomically + * never assuming we are the only user */ + +static int journal_fd(void) { + int fd; + static int fd_plus_one = 0; + +retry: + if (fd_plus_one > 0) + return fd_plus_one - 1; + + fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (fd < 0) + return -errno; + + fd_inc_sndbuf(fd, SNDBUF_SIZE); + + if (!__sync_bool_compare_and_swap(&fd_plus_one, 0, fd+1)) { + close_nointr_nofail(fd); + goto retry; + } + + return fd; +} + +_public_ int sd_journal_print(int priority, const char *format, ...) { + int r; + va_list ap; + + va_start(ap, format); + r = sd_journal_printv(priority, format, ap); + va_end(ap); + + return r; +} + +_public_ int sd_journal_printv(int priority, const char *format, va_list ap) { + char buffer[8 + LINE_MAX], p[11]; + struct iovec iov[2]; + + if (priority < 0 || priority > 7) + return -EINVAL; + + if (!format) + return -EINVAL; + + snprintf(p, sizeof(p), "PRIORITY=%i", priority & LOG_PRIMASK); + char_array_0(p); + + memcpy(buffer, "MESSAGE=", 8); + vsnprintf(buffer+8, sizeof(buffer) - 8, format, ap); + char_array_0(buffer); + + zero(iov); + IOVEC_SET_STRING(iov[0], buffer); + IOVEC_SET_STRING(iov[1], p); + + return sd_journal_sendv(iov, 2); +} + +static int fill_iovec_sprintf(const char *format, va_list ap, int extra, struct iovec **_iov) { + int r, n = 0, i, j; + struct iovec *iov = NULL; + int saved_errno; + + assert(_iov); + saved_errno = errno; + + if (extra > 0) { + n = MAX(extra * 2, extra + 4); + iov = malloc0(n * sizeof(struct iovec)); + if (!iov) { + r = -ENOMEM; + goto fail; + } + + i = extra; + } else + i = 0; + + while (format) { + struct iovec *c; + char *buffer; + + if (i >= n) { + n = MAX(i*2, 4); + c = realloc(iov, n * sizeof(struct iovec)); + if (!c) { + r = -ENOMEM; + goto fail; + } + + iov = c; + } + + if (vasprintf(&buffer, format, ap) < 0) { + r = -ENOMEM; + goto fail; + } + + IOVEC_SET_STRING(iov[i++], buffer); + + format = va_arg(ap, char *); + } + + *_iov = iov; + + errno = saved_errno; + return i; + +fail: + for (j = 0; j < i; j++) + free(iov[j].iov_base); + + free(iov); + + errno = saved_errno; + return r; +} + +_public_ int sd_journal_send(const char *format, ...) { + int r, i, j; + va_list ap; + struct iovec *iov = NULL; + + va_start(ap, format); + i = fill_iovec_sprintf(format, ap, 0, &iov); + va_end(ap); + + if (_unlikely_(i < 0)) { + r = i; + goto finish; + } + + r = sd_journal_sendv(iov, i); + +finish: + for (j = 0; j < i; j++) + free(iov[j].iov_base); + + free(iov); + + return r; +} + +_public_ int sd_journal_sendv(const struct iovec *iov, int n) { + int fd, buffer_fd; + struct iovec *w; + uint64_t *l; + int r, i, j = 0; + struct msghdr mh; + struct sockaddr_un sa; + ssize_t k; + int saved_errno; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(int))]; + } control; + struct cmsghdr *cmsg; + /* We use /dev/shm instead of /tmp here, since we want this to + * be a tmpfs, and one that is available from early boot on + * and where unprivileged users can create files. */ + char path[] = "/dev/shm/journal.XXXXXX"; + + if (_unlikely_(!iov)) + return -EINVAL; + + if (_unlikely_(n <= 0)) + return -EINVAL; + + saved_errno = errno; + + w = alloca(sizeof(struct iovec) * n * 5); + l = alloca(sizeof(uint64_t) * n); + + for (i = 0; i < n; i++) { + char *c, *nl; + + if (_unlikely_(!iov[i].iov_base || iov[i].iov_len <= 1)) { + r = -EINVAL; + goto finish; + } + + c = memchr(iov[i].iov_base, '=', iov[i].iov_len); + if (_unlikely_(!c || c == iov[i].iov_base)) { + r = -EINVAL; + goto finish; + } + + nl = memchr(iov[i].iov_base, '\n', iov[i].iov_len); + if (nl) { + if (_unlikely_(nl < c)) { + r = -EINVAL; + goto finish; + } + + /* Already includes a newline? Bummer, then + * let's write the variable name, then a + * newline, then the size (64bit LE), followed + * by the data and a final newline */ + + w[j].iov_base = iov[i].iov_base; + w[j].iov_len = c - (char*) iov[i].iov_base; + j++; + + IOVEC_SET_STRING(w[j++], "\n"); + + l[i] = htole64(iov[i].iov_len - (c - (char*) iov[i].iov_base) - 1); + w[j].iov_base = &l[i]; + w[j].iov_len = sizeof(uint64_t); + j++; + + w[j].iov_base = c + 1; + w[j].iov_len = iov[i].iov_len - (c - (char*) iov[i].iov_base) - 1; + j++; + + } else + /* Nothing special? Then just add the line and + * append a newline */ + w[j++] = iov[i]; + + IOVEC_SET_STRING(w[j++], "\n"); + } + + fd = journal_fd(); + if (_unlikely_(fd < 0)) { + r = fd; + goto finish; + } + + zero(sa); + sa.sun_family = AF_UNIX; + strncpy(sa.sun_path, "/run/systemd/journal/socket", sizeof(sa.sun_path)); + + zero(mh); + mh.msg_name = &sa; + mh.msg_namelen = offsetof(struct sockaddr_un, sun_path) + strlen(sa.sun_path); + mh.msg_iov = w; + mh.msg_iovlen = j; + + k = sendmsg(fd, &mh, MSG_NOSIGNAL); + if (k >= 0) { + r = 0; + goto finish; + } + + if (errno != EMSGSIZE && errno != ENOBUFS) { + r = -errno; + goto finish; + } + + /* Message doesn't fit... Let's dump the data in a temporary + * file and just pass a file descriptor of it to the other + * side */ + + buffer_fd = mkostemp(path, O_CLOEXEC|O_RDWR); + if (buffer_fd < 0) { + r = -errno; + goto finish; + } + + if (unlink(path) < 0) { + close_nointr_nofail(buffer_fd); + r = -errno; + goto finish; + } + + n = writev(buffer_fd, w, j); + if (n < 0) { + close_nointr_nofail(buffer_fd); + r = -errno; + goto finish; + } + + mh.msg_iov = NULL; + mh.msg_iovlen = 0; + + zero(control); + mh.msg_control = &control; + mh.msg_controllen = sizeof(control); + + cmsg = CMSG_FIRSTHDR(&mh); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + memcpy(CMSG_DATA(cmsg), &buffer_fd, sizeof(int)); + + mh.msg_controllen = cmsg->cmsg_len; + + k = sendmsg(fd, &mh, MSG_NOSIGNAL); + close_nointr_nofail(buffer_fd); + + if (k < 0) { + r = -errno; + goto finish; + } + + r = 0; + +finish: + errno = saved_errno; + + return r; +} + +_public_ int sd_journal_stream_fd(const char *identifier, int priority, int level_prefix) { + union sockaddr_union sa; + int fd; + char *header; + size_t l; + ssize_t r; + + if (priority < 0 || priority > 7) + return -EINVAL; + + fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (fd < 0) + return -errno; + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/journal/stdout", sizeof(sa.un.sun_path)); + + r = connect(fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + close_nointr_nofail(fd); + return -errno; + } + + if (shutdown(fd, SHUT_RD) < 0) { + close_nointr_nofail(fd); + return -errno; + } + + fd_inc_sndbuf(fd, SNDBUF_SIZE); + + if (!identifier) + identifier = ""; + + l = strlen(identifier); + header = alloca(l + 1 + 2 + 2 + 2 + 2 + 2); + + memcpy(header, identifier, l); + header[l++] = '\n'; + header[l++] = '0' + priority; + header[l++] = '\n'; + header[l++] = '0' + !!level_prefix; + header[l++] = '\n'; + header[l++] = '0'; + header[l++] = '\n'; + header[l++] = '0'; + header[l++] = '\n'; + header[l++] = '0'; + header[l++] = '\n'; + + r = loop_write(fd, header, l, false); + if (r < 0) { + close_nointr_nofail(fd); + return (int) r; + } + + if ((size_t) r != l) { + close_nointr_nofail(fd); + return -errno; + } + + return fd; +} + +_public_ int sd_journal_print_with_location(int priority, const char *file, const char *line, const char *func, const char *format, ...) { + int r; + va_list ap; + + va_start(ap, format); + r = sd_journal_printv_with_location(priority, file, line, func, format, ap); + va_end(ap); + + return r; +} + +_public_ int sd_journal_printv_with_location(int priority, const char *file, const char *line, const char *func, const char *format, va_list ap) { + char buffer[8 + LINE_MAX], p[11]; + struct iovec iov[5]; + char *f; + size_t fl; + + if (priority < 0 || priority > 7) + return -EINVAL; + + if (_unlikely_(!format)) + return -EINVAL; + + snprintf(p, sizeof(p), "PRIORITY=%i", priority & LOG_PRIMASK); + char_array_0(p); + + memcpy(buffer, "MESSAGE=", 8); + vsnprintf(buffer+8, sizeof(buffer) - 8, format, ap); + char_array_0(buffer); + + /* func is initialized from __func__ which is not a macro, but + * a static const char[], hence cannot easily be prefixed with + * CODE_FUNC=, hence let's do it manually here. */ + fl = strlen(func); + f = alloca(fl + 10); + memcpy(f, "CODE_FUNC=", 10); + memcpy(f + 10, func, fl + 1); + + zero(iov); + IOVEC_SET_STRING(iov[0], buffer); + IOVEC_SET_STRING(iov[1], p); + IOVEC_SET_STRING(iov[2], file); + IOVEC_SET_STRING(iov[3], line); + IOVEC_SET_STRING(iov[4], f); + + return sd_journal_sendv(iov, ELEMENTSOF(iov)); +} + +_public_ int sd_journal_send_with_location(const char *file, const char *line, const char *func, const char *format, ...) { + int r, i, j; + va_list ap; + struct iovec *iov = NULL; + char *f; + size_t fl; + + va_start(ap, format); + i = fill_iovec_sprintf(format, ap, 3, &iov); + va_end(ap); + + if (_unlikely_(i < 0)) { + r = i; + goto finish; + } + + fl = strlen(func); + f = alloca(fl + 10); + memcpy(f, "CODE_FUNC=", 10); + memcpy(f + 10, func, fl + 1); + + IOVEC_SET_STRING(iov[0], file); + IOVEC_SET_STRING(iov[1], line); + IOVEC_SET_STRING(iov[2], f); + + r = sd_journal_sendv(iov, i); + +finish: + for (j = 3; j < i; j++) + free(iov[j].iov_base); + + free(iov); + + return r; +} + +_public_ int sd_journal_sendv_with_location(const char *file, const char *line, const char *func, const struct iovec *iov, int n) { + struct iovec *niov; + char *f; + size_t fl; + + if (_unlikely_(!iov)) + return -EINVAL; + + if (_unlikely_(n <= 0)) + return -EINVAL; + + niov = alloca(sizeof(struct iovec) * (n + 3)); + memcpy(niov, iov, sizeof(struct iovec) * n); + + fl = strlen(func); + f = alloca(fl + 10); + memcpy(f, "CODE_FUNC=", 10); + memcpy(f + 10, func, fl + 1); + + IOVEC_SET_STRING(niov[n++], file); + IOVEC_SET_STRING(niov[n++], line); + IOVEC_SET_STRING(niov[n++], f); + + return sd_journal_sendv(niov, n); +} diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c new file mode 100644 index 0000000000..01dceca237 --- /dev/null +++ b/src/journal/journalctl.c @@ -0,0 +1,327 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <fcntl.h> +#include <errno.h> +#include <stddef.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/poll.h> +#include <time.h> +#include <getopt.h> + +#include <systemd/sd-journal.h> + +#include "log.h" +#include "util.h" +#include "build.h" +#include "pager.h" +#include "logs-show.h" + +static OutputMode arg_output = OUTPUT_SHORT; +static bool arg_follow = false; +static bool arg_show_all = false; +static bool arg_no_pager = false; +static int arg_lines = -1; +static bool arg_no_tail = false; +static bool arg_new_id128 = false; +static bool arg_quiet = false; +static bool arg_local = false; + +static int help(void) { + + printf("%s [OPTIONS...] {COMMAND} ...\n\n" + "Send control commands to or query the journal.\n\n" + " -h --help Show this help\n" + " --version Show package version\n" + " --no-pager Do not pipe output into a pager\n" + " -a --all Show all fields, including long and unprintable\n" + " -f --follow Follow journal\n" + " -n --lines=INTEGER Journal entries to show\n" + " --no-tail Show all lines, even in follow mode\n" + " -o --output=STRING Change journal output mode (short, short-monotonic,\n" + " verbose, export, json, cat)\n" + " -q --quiet Don't show privilege warning\n" + " --new-id128 Generate a new 128 Bit id\n" + " -l --local Only local entries\n", + program_invocation_short_name); + + return 0; +} + +static int parse_argv(int argc, char *argv[]) { + + enum { + ARG_VERSION = 0x100, + ARG_NO_PAGER, + ARG_NO_TAIL, + ARG_NEW_ID128 + }; + + static const struct option options[] = { + { "help", no_argument, NULL, 'h' }, + { "version" , no_argument, NULL, ARG_VERSION }, + { "no-pager", no_argument, NULL, ARG_NO_PAGER }, + { "follow", no_argument, NULL, 'f' }, + { "output", required_argument, NULL, 'o' }, + { "all", no_argument, NULL, 'a' }, + { "lines", required_argument, NULL, 'n' }, + { "no-tail", no_argument, NULL, ARG_NO_TAIL }, + { "new-id128", no_argument, NULL, ARG_NEW_ID128 }, + { "quiet", no_argument, NULL, 'q' }, + { "local", no_argument, NULL, 'l' }, + { NULL, 0, NULL, 0 } + }; + + int c, r; + + assert(argc >= 0); + assert(argv); + + while ((c = getopt_long(argc, argv, "hfo:an:ql", options, NULL)) >= 0) { + + switch (c) { + + case 'h': + help(); + return 0; + + case ARG_VERSION: + puts(PACKAGE_STRING); + puts(DISTRIBUTION); + puts(SYSTEMD_FEATURES); + return 0; + + case ARG_NO_PAGER: + arg_no_pager = true; + break; + + case 'f': + arg_follow = true; + break; + + case 'o': + arg_output = output_mode_from_string(optarg); + if (arg_output < 0) { + log_error("Unknown output '%s'.", optarg); + return -EINVAL; + } + + break; + + case 'a': + arg_show_all = true; + break; + + case 'n': + r = safe_atoi(optarg, &arg_lines); + if (r < 0 || arg_lines < 0) { + log_error("Failed to parse lines '%s'", optarg); + return -EINVAL; + } + break; + + case ARG_NO_TAIL: + arg_no_tail = true; + break; + + case ARG_NEW_ID128: + arg_new_id128 = true; + break; + + case 'q': + arg_quiet = true; + break; + + case 'l': + arg_local = true; + break; + + case '?': + return -EINVAL; + + default: + log_error("Unknown option code %c", c); + return -EINVAL; + } + } + + if (arg_follow && !arg_no_tail && arg_lines < 0) + arg_lines = 10; + + return 1; +} + +static int generate_new_id128(void) { + sd_id128_t id; + int r; + unsigned i; + + r = sd_id128_randomize(&id); + if (r < 0) { + log_error("Failed to generate ID: %s", strerror(-r)); + return r; + } + + printf("As string:\n" + SD_ID128_FORMAT_STR "\n\n" + "As UUID:\n" + "%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x\n\n" + "As macro:\n" + "#define MESSAGE_XYZ SD_ID128_MAKE(", + SD_ID128_FORMAT_VAL(id), + SD_ID128_FORMAT_VAL(id)); + + for (i = 0; i < 16; i++) + printf("%02x%s", id.bytes[i], i != 15 ? "," : ""); + + fputs(")\n", stdout); + + return 0; +} + +int main(int argc, char *argv[]) { + int r, i, fd; + sd_journal *j = NULL; + unsigned line = 0; + bool need_seek = false; + + log_parse_environment(); + log_open(); + + r = parse_argv(argc, argv); + if (r <= 0) + goto finish; + + if (arg_new_id128) { + r = generate_new_id128(); + goto finish; + } + +#ifdef HAVE_ACL + if (!arg_quiet && geteuid() != 0 && in_group("adm") <= 0) + log_warning("Showing user generated messages only. Users in the group 'adm' can see all messages. Pass -q to turn this message off."); +#endif + + r = sd_journal_open(&j, arg_local ? SD_JOURNAL_LOCAL_ONLY : 0); + if (r < 0) { + log_error("Failed to open journal: %s", strerror(-r)); + goto finish; + } + + for (i = optind; i < argc; i++) { + r = sd_journal_add_match(j, argv[i], strlen(argv[i])); + if (r < 0) { + log_error("Failed to add match: %s", strerror(-r)); + goto finish; + } + } + + fd = sd_journal_get_fd(j); + if (fd < 0) { + log_error("Failed to get wakeup fd: %s", strerror(-fd)); + goto finish; + } + + if (arg_lines >= 0) { + r = sd_journal_seek_tail(j); + if (r < 0) { + log_error("Failed to seek to tail: %s", strerror(-r)); + goto finish; + } + + r = sd_journal_previous_skip(j, arg_lines); + } else { + r = sd_journal_seek_head(j); + if (r < 0) { + log_error("Failed to seek to head: %s", strerror(-r)); + goto finish; + } + + r = sd_journal_next(j); + } + + if (r < 0) { + log_error("Failed to iterate through journal: %s", strerror(-r)); + goto finish; + } + + if (!arg_no_pager && !arg_follow) { + columns(); + pager_open(); + } + + if (arg_output == OUTPUT_JSON) { + fputc('[', stdout); + fflush(stdout); + } + + for (;;) { + for (;;) { + if (need_seek) { + r = sd_journal_next(j); + if (r < 0) { + log_error("Failed to iterate through journal: %s", strerror(-r)); + goto finish; + } + } + + if (r == 0) + break; + + line ++; + + r = output_journal(j, arg_output, line, 0, arg_show_all); + if (r < 0) + goto finish; + + need_seek = true; + } + + if (!arg_follow) + break; + + r = fd_wait_for_event(fd, POLLIN, (usec_t) -1); + if (r < 0) { + log_error("Couldn't wait for event: %s", strerror(-r)); + goto finish; + } + + r = sd_journal_process(j); + if (r < 0) { + log_error("Failed to process: %s", strerror(-r)); + goto finish; + } + } + + if (arg_output == OUTPUT_JSON) + fputs("\n]\n", stdout); + +finish: + if (j) + sd_journal_close(j); + + pager_close(); + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/src/journal/journald-gperf.gperf b/src/journal/journald-gperf.gperf new file mode 100644 index 0000000000..a56f6d966e --- /dev/null +++ b/src/journal/journald-gperf.gperf @@ -0,0 +1,31 @@ +%{ +#include <stddef.h> +#include "conf-parser.h" +#include "journald.h" +%} +struct ConfigPerfItem; +%null_strings +%language=ANSI-C +%define slot-name section_and_lvalue +%define hash-function-name journald_gperf_hash +%define lookup-function-name journald_gperf_lookup +%readonly-tables +%omit-struct-type +%struct-type +%includes +%% +Journal.RateLimitInterval, config_parse_usec, 0, offsetof(Server, rate_limit_interval) +Journal.RateLimitBurst, config_parse_unsigned, 0, offsetof(Server, rate_limit_burst) +Journal.Compress, config_parse_bool, 0, offsetof(Server, compress) +Journal.SystemMaxUse, config_parse_bytes_off, 0, offsetof(Server, system_metrics.max_use) +Journal.SystemMaxFileSize, config_parse_bytes_off, 0, offsetof(Server, system_metrics.max_size) +Journal.SystemMinFileSize, config_parse_bytes_off, 0, offsetof(Server, system_metrics.min_size) +Journal.SystemKeepFree, config_parse_bytes_off, 0, offsetof(Server, system_metrics.keep_free) +Journal.RuntimeMaxUse, config_parse_bytes_off, 0, offsetof(Server, runtime_metrics.max_use) +Journal.RuntimeMaxFileSize, config_parse_bytes_off, 0, offsetof(Server, runtime_metrics.max_size) +Journal.RuntimeMinFileSize, config_parse_bytes_off, 0, offsetof(Server, runtime_metrics.min_size) +Journal.RuntimeKeepFree, config_parse_bytes_off, 0, offsetof(Server, runtime_metrics.keep_free) +Journal.ForwardToSyslog, config_parse_bool, 0, offsetof(Server, forward_to_syslog) +Journal.ForwardToKMsg, config_parse_bool, 0, offsetof(Server, forward_to_kmsg) +Journal.ForwardToConsole, config_parse_bool, 0, offsetof(Server, forward_to_console) +Journal.ImportKernel, config_parse_bool, 0, offsetof(Server, import_proc_kmsg) diff --git a/src/journal/journald.c b/src/journal/journald.c new file mode 100644 index 0000000000..555d74f049 --- /dev/null +++ b/src/journal/journald.c @@ -0,0 +1,2821 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/epoll.h> +#include <sys/socket.h> +#include <errno.h> +#include <sys/signalfd.h> +#include <unistd.h> +#include <fcntl.h> +#include <stddef.h> +#include <sys/ioctl.h> +#include <linux/sockios.h> +#include <sys/statvfs.h> + +#include <systemd/sd-journal.h> +#include <systemd/sd-login.h> +#include <systemd/sd-messages.h> +#include <systemd/sd-daemon.h> + +#include "hashmap.h" +#include "journal-file.h" +#include "socket-util.h" +#include "cgroup-util.h" +#include "list.h" +#include "journal-rate-limit.h" +#include "journal-internal.h" +#include "conf-parser.h" +#include "journald.h" +#include "virt.h" +#include "missing.h" + +#ifdef HAVE_ACL +#include <sys/acl.h> +#include <acl/libacl.h> +#include "acl-util.h" +#endif + +#ifdef HAVE_SELINUX +#include <selinux/selinux.h> +#endif + +#define USER_JOURNALS_MAX 1024 +#define STDOUT_STREAMS_MAX 4096 + +#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC) +#define DEFAULT_RATE_LIMIT_BURST 200 + +#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC) + +#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC) + +#define N_IOVEC_META_FIELDS 17 + +#define ENTRY_SIZE_MAX (1024*1024*32) + +typedef enum StdoutStreamState { + STDOUT_STREAM_IDENTIFIER, + STDOUT_STREAM_PRIORITY, + STDOUT_STREAM_LEVEL_PREFIX, + STDOUT_STREAM_FORWARD_TO_SYSLOG, + STDOUT_STREAM_FORWARD_TO_KMSG, + STDOUT_STREAM_FORWARD_TO_CONSOLE, + STDOUT_STREAM_RUNNING +} StdoutStreamState; + +struct StdoutStream { + Server *server; + StdoutStreamState state; + + int fd; + + struct ucred ucred; +#ifdef HAVE_SELINUX + security_context_t security_context; +#endif + + char *identifier; + int priority; + bool level_prefix:1; + bool forward_to_syslog:1; + bool forward_to_kmsg:1; + bool forward_to_console:1; + + char buffer[LINE_MAX+1]; + size_t length; + + LIST_FIELDS(StdoutStream, stdout_stream); +}; + +static int server_flush_to_var(Server *s); + +static uint64_t available_space(Server *s) { + char ids[33], *p; + const char *f; + sd_id128_t machine; + struct statvfs ss; + uint64_t sum = 0, avail = 0, ss_avail = 0; + int r; + DIR *d; + usec_t ts; + JournalMetrics *m; + + ts = now(CLOCK_MONOTONIC); + + if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts) + return s->cached_available_space; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return 0; + + if (s->system_journal) { + f = "/var/log/journal/"; + m = &s->system_metrics; + } else { + f = "/run/log/journal/"; + m = &s->runtime_metrics; + } + + assert(m); + + p = strappend(f, sd_id128_to_string(machine, ids)); + if (!p) + return 0; + + d = opendir(p); + free(p); + + if (!d) + return 0; + + if (fstatvfs(dirfd(d), &ss) < 0) + goto finish; + + for (;;) { + struct stat st; + struct dirent buf, *de; + + r = readdir_r(d, &buf, &de); + if (r != 0) + break; + + if (!de) + break; + + if (!endswith(de->d_name, ".journal") && + !endswith(de->d_name, ".journal~")) + continue; + + if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) + continue; + + if (!S_ISREG(st.st_mode)) + continue; + + sum += (uint64_t) st.st_blocks * 512UL; + } + + avail = sum >= m->max_use ? 0 : m->max_use - sum; + + ss_avail = ss.f_bsize * ss.f_bavail; + + ss_avail = ss_avail < m->keep_free ? 0 : ss_avail - m->keep_free; + + if (ss_avail < avail) + avail = ss_avail; + + s->cached_available_space = avail; + s->cached_available_space_timestamp = ts; + +finish: + closedir(d); + + return avail; +} + +static void server_read_file_gid(Server *s) { + const char *adm = "adm"; + int r; + + assert(s); + + if (s->file_gid_valid) + return; + + r = get_group_creds(&adm, &s->file_gid); + if (r < 0) + log_warning("Failed to resolve 'adm' group: %s", strerror(-r)); + + /* if we couldn't read the gid, then it will be 0, but that's + * fine and we shouldn't try to resolve the group again, so + * let's just pretend it worked right-away. */ + s->file_gid_valid = true; +} + +static void server_fix_perms(Server *s, JournalFile *f, uid_t uid) { + int r; +#ifdef HAVE_ACL + acl_t acl; + acl_entry_t entry; + acl_permset_t permset; +#endif + + assert(f); + + server_read_file_gid(s); + + r = fchmod_and_fchown(f->fd, 0640, 0, s->file_gid); + if (r < 0) + log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r)); + +#ifdef HAVE_ACL + if (uid <= 0) + return; + + acl = acl_get_fd(f->fd); + if (!acl) { + log_warning("Failed to read ACL on %s, ignoring: %m", f->path); + return; + } + + r = acl_find_uid(acl, uid, &entry); + if (r <= 0) { + + if (acl_create_entry(&acl, &entry) < 0 || + acl_set_tag_type(entry, ACL_USER) < 0 || + acl_set_qualifier(entry, &uid) < 0) { + log_warning("Failed to patch ACL on %s, ignoring: %m", f->path); + goto finish; + } + } + + if (acl_get_permset(entry, &permset) < 0 || + acl_add_perm(permset, ACL_READ) < 0 || + acl_calc_mask(&acl) < 0) { + log_warning("Failed to patch ACL on %s, ignoring: %m", f->path); + goto finish; + } + + if (acl_set_fd(f->fd, acl) < 0) + log_warning("Failed to set ACL on %s, ignoring: %m", f->path); + +finish: + acl_free(acl); +#endif +} + +static JournalFile* find_journal(Server *s, uid_t uid) { + char *p; + int r; + JournalFile *f; + char ids[33]; + sd_id128_t machine; + + assert(s); + + /* We split up user logs only on /var, not on /run. If the + * runtime file is open, we write to it exclusively, in order + * to guarantee proper order as soon as we flush /run to + * /var and close the runtime file. */ + + if (s->runtime_journal) + return s->runtime_journal; + + if (uid <= 0) + return s->system_journal; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return s->system_journal; + + f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid)); + if (f) + return f; + + if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0) + return s->system_journal; + + while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) { + /* Too many open? Then let's close one */ + f = hashmap_steal_first(s->user_journals); + assert(f); + journal_file_close(f); + } + + r = journal_file_open_reliably(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f); + free(p); + + if (r < 0) + return s->system_journal; + + server_fix_perms(s, f, uid); + + r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f); + if (r < 0) { + journal_file_close(f); + return s->system_journal; + } + + return f; +} + +static void server_rotate(Server *s) { + JournalFile *f; + void *k; + Iterator i; + int r; + + log_info("Rotating..."); + + if (s->runtime_journal) { + r = journal_file_rotate(&s->runtime_journal); + if (r < 0) + log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r)); + else + server_fix_perms(s, s->runtime_journal, 0); + } + + if (s->system_journal) { + r = journal_file_rotate(&s->system_journal); + if (r < 0) + log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r)); + else + server_fix_perms(s, s->system_journal, 0); + } + + HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) { + r = journal_file_rotate(&f); + if (r < 0) + log_error("Failed to rotate %s: %s", f->path, strerror(-r)); + else { + hashmap_replace(s->user_journals, k, f); + server_fix_perms(s, s->system_journal, PTR_TO_UINT32(k)); + } + } +} + +static void server_vacuum(Server *s) { + char *p; + char ids[33]; + sd_id128_t machine; + int r; + + log_info("Vacuuming..."); + + r = sd_id128_get_machine(&machine); + if (r < 0) { + log_error("Failed to get machine ID: %s", strerror(-r)); + return; + } + + sd_id128_to_string(machine, ids); + + if (s->system_journal) { + if (asprintf(&p, "/var/log/journal/%s", ids) < 0) { + log_error("Out of memory."); + return; + } + + r = journal_directory_vacuum(p, s->system_metrics.max_use, s->system_metrics.keep_free); + if (r < 0 && r != -ENOENT) + log_error("Failed to vacuum %s: %s", p, strerror(-r)); + free(p); + } + + + if (s->runtime_journal) { + if (asprintf(&p, "/run/log/journal/%s", ids) < 0) { + log_error("Out of memory."); + return; + } + + r = journal_directory_vacuum(p, s->runtime_metrics.max_use, s->runtime_metrics.keep_free); + if (r < 0 && r != -ENOENT) + log_error("Failed to vacuum %s: %s", p, strerror(-r)); + free(p); + } + + s->cached_available_space_timestamp = 0; +} + +static char *shortened_cgroup_path(pid_t pid) { + int r; + char *process_path, *init_path, *path; + + assert(pid > 0); + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path); + if (r < 0) + return NULL; + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path); + if (r < 0) { + free(process_path); + return NULL; + } + + if (endswith(init_path, "/system")) + init_path[strlen(init_path) - 7] = 0; + else if (streq(init_path, "/")) + init_path[0] = 0; + + if (startswith(process_path, init_path)) { + char *p; + + p = strdup(process_path + strlen(init_path)); + if (!p) { + free(process_path); + free(init_path); + return NULL; + } + path = p; + } else { + path = process_path; + process_path = NULL; + } + + free(process_path); + free(init_path); + + return path; +} + +static void dispatch_message_real( + Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv, + const char *label, size_t label_len) { + + char *pid = NULL, *uid = NULL, *gid = NULL, + *source_time = NULL, *boot_id = NULL, *machine_id = NULL, + *comm = NULL, *cmdline = NULL, *hostname = NULL, + *audit_session = NULL, *audit_loginuid = NULL, + *exe = NULL, *cgroup = NULL, *session = NULL, + *owner_uid = NULL, *unit = NULL, *selinux_context = NULL; + + char idbuf[33]; + sd_id128_t id; + int r; + char *t; + uid_t loginuid = 0, realuid = 0; + JournalFile *f; + bool vacuumed = false; + + assert(s); + assert(iovec); + assert(n > 0); + assert(n + N_IOVEC_META_FIELDS <= m); + + if (ucred) { + uint32_t audit; + uid_t owner; + + realuid = ucred->uid; + + if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0) + IOVEC_SET_STRING(iovec[n++], pid); + + if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0) + IOVEC_SET_STRING(iovec[n++], uid); + + if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0) + IOVEC_SET_STRING(iovec[n++], gid); + + r = get_process_comm(ucred->pid, &t); + if (r >= 0) { + comm = strappend("_COMM=", t); + free(t); + + if (comm) + IOVEC_SET_STRING(iovec[n++], comm); + } + + r = get_process_exe(ucred->pid, &t); + if (r >= 0) { + exe = strappend("_EXE=", t); + free(t); + + if (exe) + IOVEC_SET_STRING(iovec[n++], exe); + } + + r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t); + if (r >= 0) { + cmdline = strappend("_CMDLINE=", t); + free(t); + + if (cmdline) + IOVEC_SET_STRING(iovec[n++], cmdline); + } + + r = audit_session_from_pid(ucred->pid, &audit); + if (r >= 0) + if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) audit) >= 0) + IOVEC_SET_STRING(iovec[n++], audit_session); + + r = audit_loginuid_from_pid(ucred->pid, &loginuid); + if (r >= 0) + if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0) + IOVEC_SET_STRING(iovec[n++], audit_loginuid); + + t = shortened_cgroup_path(ucred->pid); + if (t) { + cgroup = strappend("_SYSTEMD_CGROUP=", t); + free(t); + + if (cgroup) + IOVEC_SET_STRING(iovec[n++], cgroup); + } + + if (sd_pid_get_session(ucred->pid, &t) >= 0) { + session = strappend("_SYSTEMD_SESSION=", t); + free(t); + + if (session) + IOVEC_SET_STRING(iovec[n++], session); + } + + if (sd_pid_get_unit(ucred->pid, &t) >= 0) { + unit = strappend("_SYSTEMD_UNIT=", t); + free(t); + + if (unit) + IOVEC_SET_STRING(iovec[n++], unit); + } + + if (sd_pid_get_owner_uid(ucred->uid, &owner) >= 0) + if (asprintf(&owner_uid, "_SYSTEMD_OWNER_UID=%lu", (unsigned long) owner) >= 0) + IOVEC_SET_STRING(iovec[n++], owner_uid); + +#ifdef HAVE_SELINUX + if (label) { + selinux_context = malloc(sizeof("_SELINUX_CONTEXT=") + label_len); + if (selinux_context) { + memcpy(selinux_context, "_SELINUX_CONTEXT=", sizeof("_SELINUX_CONTEXT=")-1); + memcpy(selinux_context+sizeof("_SELINUX_CONTEXT=")-1, label, label_len); + selinux_context[sizeof("_SELINUX_CONTEXT=")-1+label_len] = 0; + IOVEC_SET_STRING(iovec[n++], selinux_context); + } + } else { + security_context_t con; + + if (getpidcon(ucred->pid, &con) >= 0) { + selinux_context = strappend("_SELINUX_CONTEXT=", con); + if (selinux_context) + IOVEC_SET_STRING(iovec[n++], selinux_context); + + freecon(con); + } + } +#endif + } + + if (tv) { + if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu", + (unsigned long long) timeval_load(tv)) >= 0) + IOVEC_SET_STRING(iovec[n++], source_time); + } + + /* Note that strictly speaking storing the boot id here is + * redundant since the entry includes this in-line + * anyway. However, we need this indexed, too. */ + r = sd_id128_get_boot(&id); + if (r >= 0) + if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0) + IOVEC_SET_STRING(iovec[n++], boot_id); + + r = sd_id128_get_machine(&id); + if (r >= 0) + if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0) + IOVEC_SET_STRING(iovec[n++], machine_id); + + t = gethostname_malloc(); + if (t) { + hostname = strappend("_HOSTNAME=", t); + free(t); + if (hostname) + IOVEC_SET_STRING(iovec[n++], hostname); + } + + assert(n <= m); + + server_flush_to_var(s); + +retry: + f = find_journal(s, realuid == 0 ? 0 : loginuid); + if (!f) + log_warning("Dropping message, as we can't find a place to store the data."); + else { + r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL); + + if ((r == -E2BIG || /* hit limit */ + r == -EFBIG || /* hit fs limit */ + r == -EDQUOT || /* quota hit */ + r == -ENOSPC || /* disk full */ + r == -EBADMSG || /* corrupted */ + r == -ENODATA || /* truncated */ + r == -EHOSTDOWN || /* other machine */ + r == -EPROTONOSUPPORT) && /* unsupported feature */ + !vacuumed) { + + if (r == -E2BIG) + log_info("Allocation limit reached, rotating."); + else + log_warning("Journal file corrupted, rotating."); + + server_rotate(s); + server_vacuum(s); + vacuumed = true; + + log_info("Retrying write."); + goto retry; + } + + if (r < 0) + log_error("Failed to write entry, ignoring: %s", strerror(-r)); + } + + free(pid); + free(uid); + free(gid); + free(comm); + free(exe); + free(cmdline); + free(source_time); + free(boot_id); + free(machine_id); + free(hostname); + free(audit_session); + free(audit_loginuid); + free(cgroup); + free(session); + free(owner_uid); + free(unit); + free(selinux_context); +} + +static void driver_message(Server *s, sd_id128_t message_id, const char *format, ...) { + char mid[11 + 32 + 1]; + char buffer[16 + LINE_MAX + 1]; + struct iovec iovec[N_IOVEC_META_FIELDS + 4]; + int n = 0; + va_list ap; + struct ucred ucred; + + assert(s); + assert(format); + + IOVEC_SET_STRING(iovec[n++], "PRIORITY=5"); + IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=driver"); + + memcpy(buffer, "MESSAGE=", 8); + va_start(ap, format); + vsnprintf(buffer + 8, sizeof(buffer) - 8, format, ap); + va_end(ap); + char_array_0(buffer); + IOVEC_SET_STRING(iovec[n++], buffer); + + snprintf(mid, sizeof(mid), "MESSAGE_ID=" SD_ID128_FORMAT_STR, SD_ID128_FORMAT_VAL(message_id)); + char_array_0(mid); + IOVEC_SET_STRING(iovec[n++], mid); + + zero(ucred); + ucred.pid = getpid(); + ucred.uid = getuid(); + ucred.gid = getgid(); + + dispatch_message_real(s, iovec, n, ELEMENTSOF(iovec), &ucred, NULL, NULL, 0); +} + +static void dispatch_message(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv, + const char *label, size_t label_len, + int priority) { + int rl; + char *path = NULL, *c; + + assert(s); + assert(iovec || n == 0); + + if (n == 0) + return; + + if (!ucred) + goto finish; + + path = shortened_cgroup_path(ucred->pid); + if (!path) + goto finish; + + /* example: /user/lennart/3/foobar + * /system/dbus.service/foobar + * + * So let's cut of everything past the third /, since that is + * wher user directories start */ + + c = strchr(path, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) + *c = 0; + } + } + + rl = journal_rate_limit_test(s->rate_limit, path, priority & LOG_PRIMASK, available_space(s)); + + if (rl == 0) { + free(path); + return; + } + + /* Write a suppression message if we suppressed something */ + if (rl > 1) + driver_message(s, SD_MESSAGE_JOURNAL_DROPPED, "Suppressed %u messages from %s", rl - 1, path); + + free(path); + +finish: + dispatch_message_real(s, iovec, n, m, ucred, tv, label, label_len); +} + +static void forward_syslog_iovec(Server *s, const struct iovec *iovec, unsigned n_iovec, struct ucred *ucred, struct timeval *tv) { + struct msghdr msghdr; + struct cmsghdr *cmsg; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(struct ucred))]; + } control; + union sockaddr_union sa; + + assert(s); + assert(iovec); + assert(n_iovec > 0); + + zero(msghdr); + msghdr.msg_iov = (struct iovec*) iovec; + msghdr.msg_iovlen = n_iovec; + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/journal/syslog", sizeof(sa.un.sun_path)); + msghdr.msg_name = &sa; + msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path); + + if (ucred) { + zero(control); + msghdr.msg_control = &control; + msghdr.msg_controllen = sizeof(control); + + cmsg = CMSG_FIRSTHDR(&msghdr); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_CREDENTIALS; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred)); + memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred)); + msghdr.msg_controllen = cmsg->cmsg_len; + } + + /* Forward the syslog message we received via /dev/log to + * /run/systemd/syslog. Unfortunately we currently can't set + * the SO_TIMESTAMP auxiliary data, and hence we don't. */ + + if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0) + return; + + /* The socket is full? I guess the syslog implementation is + * too slow, and we shouldn't wait for that... */ + if (errno == EAGAIN) + return; + + if (ucred && errno == ESRCH) { + struct ucred u; + + /* Hmm, presumably the sender process vanished + * by now, so let's fix it as good as we + * can, and retry */ + + u = *ucred; + u.pid = getpid(); + memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred)); + + if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0) + return; + + if (errno == EAGAIN) + return; + } + + log_debug("Failed to forward syslog message: %m"); +} + +static void forward_syslog_raw(Server *s, const char *buffer, struct ucred *ucred, struct timeval *tv) { + struct iovec iovec; + + assert(s); + assert(buffer); + + IOVEC_SET_STRING(iovec, buffer); + forward_syslog_iovec(s, &iovec, 1, ucred, tv); +} + +static void forward_syslog(Server *s, int priority, const char *identifier, const char *message, struct ucred *ucred, struct timeval *tv) { + struct iovec iovec[5]; + char header_priority[6], header_time[64], header_pid[16]; + int n = 0; + time_t t; + struct tm *tm; + char *ident_buf = NULL; + + assert(s); + assert(priority >= 0); + assert(priority <= 999); + assert(message); + + /* First: priority field */ + snprintf(header_priority, sizeof(header_priority), "<%i>", priority); + char_array_0(header_priority); + IOVEC_SET_STRING(iovec[n++], header_priority); + + /* Second: timestamp */ + t = tv ? tv->tv_sec : ((time_t) (now(CLOCK_REALTIME) / USEC_PER_SEC)); + tm = localtime(&t); + if (!tm) + return; + if (strftime(header_time, sizeof(header_time), "%h %e %T ", tm) <= 0) + return; + IOVEC_SET_STRING(iovec[n++], header_time); + + /* Third: identifier and PID */ + if (ucred) { + if (!identifier) { + get_process_comm(ucred->pid, &ident_buf); + identifier = ident_buf; + } + + snprintf(header_pid, sizeof(header_pid), "[%lu]: ", (unsigned long) ucred->pid); + char_array_0(header_pid); + + if (identifier) + IOVEC_SET_STRING(iovec[n++], identifier); + + IOVEC_SET_STRING(iovec[n++], header_pid); + } else if (identifier) { + IOVEC_SET_STRING(iovec[n++], identifier); + IOVEC_SET_STRING(iovec[n++], ": "); + } + + /* Fourth: message */ + IOVEC_SET_STRING(iovec[n++], message); + + forward_syslog_iovec(s, iovec, n, ucred, tv); + + free(ident_buf); +} + +static int fixup_priority(int priority) { + + if ((priority & LOG_FACMASK) == 0) + return (priority & LOG_PRIMASK) | LOG_USER; + + return priority; +} + +static void forward_kmsg(Server *s, int priority, const char *identifier, const char *message, struct ucred *ucred) { + struct iovec iovec[5]; + char header_priority[6], header_pid[16]; + int n = 0; + char *ident_buf = NULL; + int fd; + + assert(s); + assert(priority >= 0); + assert(priority <= 999); + assert(message); + + /* Never allow messages with kernel facility to be written to + * kmsg, regardless where the data comes from. */ + priority = fixup_priority(priority); + + /* First: priority field */ + snprintf(header_priority, sizeof(header_priority), "<%i>", priority); + char_array_0(header_priority); + IOVEC_SET_STRING(iovec[n++], header_priority); + + /* Second: identifier and PID */ + if (ucred) { + if (!identifier) { + get_process_comm(ucred->pid, &ident_buf); + identifier = ident_buf; + } + + snprintf(header_pid, sizeof(header_pid), "[%lu]: ", (unsigned long) ucred->pid); + char_array_0(header_pid); + + if (identifier) + IOVEC_SET_STRING(iovec[n++], identifier); + + IOVEC_SET_STRING(iovec[n++], header_pid); + } else if (identifier) { + IOVEC_SET_STRING(iovec[n++], identifier); + IOVEC_SET_STRING(iovec[n++], ": "); + } + + /* Fourth: message */ + IOVEC_SET_STRING(iovec[n++], message); + IOVEC_SET_STRING(iovec[n++], "\n"); + + fd = open("/dev/kmsg", O_WRONLY|O_NOCTTY|O_CLOEXEC); + if (fd < 0) { + log_debug("Failed to open /dev/kmsg for logging: %s", strerror(errno)); + goto finish; + } + + if (writev(fd, iovec, n) < 0) + log_debug("Failed to write to /dev/kmsg for logging: %s", strerror(errno)); + + close_nointr_nofail(fd); + +finish: + free(ident_buf); +} + +static void forward_console(Server *s, const char *identifier, const char *message, struct ucred *ucred) { + struct iovec iovec[4]; + char header_pid[16]; + int n = 0, fd; + char *ident_buf = NULL; + + assert(s); + assert(message); + + /* First: identifier and PID */ + if (ucred) { + if (!identifier) { + get_process_comm(ucred->pid, &ident_buf); + identifier = ident_buf; + } + + snprintf(header_pid, sizeof(header_pid), "[%lu]: ", (unsigned long) ucred->pid); + char_array_0(header_pid); + + if (identifier) + IOVEC_SET_STRING(iovec[n++], identifier); + + IOVEC_SET_STRING(iovec[n++], header_pid); + } else if (identifier) { + IOVEC_SET_STRING(iovec[n++], identifier); + IOVEC_SET_STRING(iovec[n++], ": "); + } + + /* Third: message */ + IOVEC_SET_STRING(iovec[n++], message); + IOVEC_SET_STRING(iovec[n++], "\n"); + + fd = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC); + if (fd < 0) { + log_debug("Failed to open /dev/console for logging: %s", strerror(errno)); + goto finish; + } + + if (writev(fd, iovec, n) < 0) + log_debug("Failed to write to /dev/console for logging: %s", strerror(errno)); + + close_nointr_nofail(fd); + +finish: + free(ident_buf); +} + +static void read_identifier(const char **buf, char **identifier, char **pid) { + const char *p; + char *t; + size_t l, e; + + assert(buf); + assert(identifier); + assert(pid); + + p = *buf; + + p += strspn(p, WHITESPACE); + l = strcspn(p, WHITESPACE); + + if (l <= 0 || + p[l-1] != ':') + return; + + e = l; + l--; + + if (p[l-1] == ']') { + size_t k = l-1; + + for (;;) { + + if (p[k] == '[') { + t = strndup(p+k+1, l-k-2); + if (t) + *pid = t; + + l = k; + break; + } + + if (k == 0) + break; + + k--; + } + } + + t = strndup(p, l); + if (t) + *identifier = t; + + *buf = p + e; + *buf += strspn(*buf, WHITESPACE); +} + +static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv, const char *label, size_t label_len) { + char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL, *syslog_identifier = NULL, *syslog_pid = NULL; + struct iovec iovec[N_IOVEC_META_FIELDS + 6]; + unsigned n = 0; + int priority = LOG_USER | LOG_INFO; + char *identifier = NULL, *pid = NULL; + + assert(s); + assert(buf); + + if (s->forward_to_syslog) + forward_syslog_raw(s, buf, ucred, tv); + + parse_syslog_priority((char**) &buf, &priority); + skip_syslog_date((char**) &buf); + read_identifier(&buf, &identifier, &pid); + + if (s->forward_to_kmsg) + forward_kmsg(s, priority, identifier, buf, ucred); + + if (s->forward_to_console) + forward_console(s, identifier, buf, ucred); + + IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=syslog"); + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + if (priority & LOG_FACMASK) + if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_facility); + + if (identifier) { + syslog_identifier = strappend("SYSLOG_IDENTIFIER=", identifier); + if (syslog_identifier) + IOVEC_SET_STRING(iovec[n++], syslog_identifier); + } + + if (pid) { + syslog_pid = strappend("SYSLOG_PID=", pid); + if (syslog_pid) + IOVEC_SET_STRING(iovec[n++], syslog_pid); + } + + message = strappend("MESSAGE=", buf); + if (message) + IOVEC_SET_STRING(iovec[n++], message); + + dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, label, label_len, priority); + + free(message); + free(identifier); + free(pid); + free(syslog_priority); + free(syslog_facility); + free(syslog_identifier); +} + +static bool valid_user_field(const char *p, size_t l) { + const char *a; + + /* We kinda enforce POSIX syntax recommendations for + environment variables here, but make a couple of additional + requirements. + + http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */ + + /* No empty field names */ + if (l <= 0) + return false; + + /* Don't allow names longer than 64 chars */ + if (l > 64) + return false; + + /* Variables starting with an underscore are protected */ + if (p[0] == '_') + return false; + + /* Don't allow digits as first character */ + if (p[0] >= '0' && p[0] <= '9') + return false; + + /* Only allow A-Z0-9 and '_' */ + for (a = p; a < p + l; a++) + if (!((*a >= 'A' && *a <= 'Z') || + (*a >= '0' && *a <= '9') || + *a == '_')) + return false; + + return true; +} + +static void process_native_message( + Server *s, + const void *buffer, size_t buffer_size, + struct ucred *ucred, + struct timeval *tv, + const char *label, size_t label_len) { + + struct iovec *iovec = NULL; + unsigned n = 0, m = 0, j, tn = (unsigned) -1; + const char *p; + size_t remaining; + int priority = LOG_INFO; + char *identifier = NULL, *message = NULL; + + assert(s); + assert(buffer || n == 0); + + p = buffer; + remaining = buffer_size; + + while (remaining > 0) { + const char *e, *q; + + e = memchr(p, '\n', remaining); + + if (!e) { + /* Trailing noise, let's ignore it, and flush what we collected */ + log_debug("Received message with trailing noise, ignoring."); + break; + } + + if (e == p) { + /* Entry separator */ + dispatch_message(s, iovec, n, m, ucred, tv, label, label_len, priority); + n = 0; + priority = LOG_INFO; + + p++; + remaining--; + continue; + } + + if (*p == '.' || *p == '#') { + /* Ignore control commands for now, and + * comments too. */ + remaining -= (e - p) + 1; + p = e + 1; + continue; + } + + /* A property follows */ + + if (n+N_IOVEC_META_FIELDS >= m) { + struct iovec *c; + unsigned u; + + u = MAX((n+N_IOVEC_META_FIELDS+1) * 2U, 4U); + c = realloc(iovec, u * sizeof(struct iovec)); + if (!c) { + log_error("Out of memory"); + break; + } + + iovec = c; + m = u; + } + + q = memchr(p, '=', e - p); + if (q) { + if (valid_user_field(p, q - p)) { + size_t l; + + l = e - p; + + /* If the field name starts with an + * underscore, skip the variable, + * since that indidates a trusted + * field */ + iovec[n].iov_base = (char*) p; + iovec[n].iov_len = l; + n++; + + /* We need to determine the priority + * of this entry for the rate limiting + * logic */ + if (l == 10 && + memcmp(p, "PRIORITY=", 9) == 0 && + p[9] >= '0' && p[9] <= '9') + priority = (priority & LOG_FACMASK) | (p[9] - '0'); + + else if (l == 17 && + memcmp(p, "SYSLOG_FACILITY=", 16) == 0 && + p[16] >= '0' && p[16] <= '9') + priority = (priority & LOG_PRIMASK) | ((p[16] - '0') << 3); + + else if (l == 18 && + memcmp(p, "SYSLOG_FACILITY=", 16) == 0 && + p[16] >= '0' && p[16] <= '9' && + p[17] >= '0' && p[17] <= '9') + priority = (priority & LOG_PRIMASK) | (((p[16] - '0')*10 + (p[17] - '0')) << 3); + + else if (l >= 12 && + memcmp(p, "SYSLOG_IDENTIFIER=", 11) == 0) { + char *t; + + t = strndup(p + 11, l - 11); + if (t) { + free(identifier); + identifier = t; + } + } else if (l >= 8 && + memcmp(p, "MESSAGE=", 8) == 0) { + char *t; + + t = strndup(p + 8, l - 8); + if (t) { + free(message); + message = t; + } + } + } + + remaining -= (e - p) + 1; + p = e + 1; + continue; + } else { + le64_t l_le; + uint64_t l; + char *k; + + if (remaining < e - p + 1 + sizeof(uint64_t) + 1) { + log_debug("Failed to parse message, ignoring."); + break; + } + + memcpy(&l_le, e + 1, sizeof(uint64_t)); + l = le64toh(l_le); + + if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 || + e[1+sizeof(uint64_t)+l] != '\n') { + log_debug("Failed to parse message, ignoring."); + break; + } + + k = malloc((e - p) + 1 + l); + if (!k) { + log_error("Out of memory"); + break; + } + + memcpy(k, p, e - p); + k[e - p] = '='; + memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l); + + if (valid_user_field(p, e - p)) { + iovec[n].iov_base = k; + iovec[n].iov_len = (e - p) + 1 + l; + n++; + } else + free(k); + + remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1; + p = e + 1 + sizeof(uint64_t) + l + 1; + } + } + + if (n <= 0) + goto finish; + + tn = n++; + IOVEC_SET_STRING(iovec[tn], "_TRANSPORT=journal"); + + if (message) { + if (s->forward_to_syslog) + forward_syslog(s, priority, identifier, message, ucred, tv); + + if (s->forward_to_kmsg) + forward_kmsg(s, priority, identifier, message, ucred); + + if (s->forward_to_console) + forward_console(s, identifier, message, ucred); + } + + dispatch_message(s, iovec, n, m, ucred, tv, label, label_len, priority); + +finish: + for (j = 0; j < n; j++) { + if (j == tn) + continue; + + if (iovec[j].iov_base < buffer || + (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size) + free(iovec[j].iov_base); + } + + free(iovec); + free(identifier); + free(message); +} + +static void process_native_file( + Server *s, + int fd, + struct ucred *ucred, + struct timeval *tv, + const char *label, size_t label_len) { + + struct stat st; + void *p; + ssize_t n; + + assert(s); + assert(fd >= 0); + + /* Data is in the passed file, since it didn't fit in a + * datagram. We can't map the file here, since clients might + * then truncate it and trigger a SIGBUS for us. So let's + * stupidly read it */ + + if (fstat(fd, &st) < 0) { + log_error("Failed to stat passed file, ignoring: %m"); + return; + } + + if (!S_ISREG(st.st_mode)) { + log_error("File passed is not regular. Ignoring."); + return; + } + + if (st.st_size <= 0) + return; + + if (st.st_size > ENTRY_SIZE_MAX) { + log_error("File passed too large. Ignoring."); + return; + } + + p = malloc(st.st_size); + if (!p) { + log_error("Out of memory"); + return; + } + + n = pread(fd, p, st.st_size, 0); + if (n < 0) + log_error("Failed to read file, ignoring: %s", strerror(-n)); + else if (n > 0) + process_native_message(s, p, n, ucred, tv, label, label_len); + + free(p); +} + +static int stdout_stream_log(StdoutStream *s, const char *p) { + struct iovec iovec[N_IOVEC_META_FIELDS + 5]; + char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL, *syslog_identifier = NULL; + unsigned n = 0; + int priority; + char *label = NULL; + size_t label_len = 0; + + assert(s); + assert(p); + + if (isempty(p)) + return 0; + + priority = s->priority; + + if (s->level_prefix) + parse_syslog_priority((char**) &p, &priority); + + if (s->forward_to_syslog || s->server->forward_to_syslog) + forward_syslog(s->server, fixup_priority(priority), s->identifier, p, &s->ucred, NULL); + + if (s->forward_to_kmsg || s->server->forward_to_kmsg) + forward_kmsg(s->server, priority, s->identifier, p, &s->ucred); + + if (s->forward_to_console || s->server->forward_to_console) + forward_console(s->server, s->identifier, p, &s->ucred); + + IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=stdout"); + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + if (priority & LOG_FACMASK) + if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_facility); + + if (s->identifier) { + syslog_identifier = strappend("SYSLOG_IDENTIFIER=", s->identifier); + if (syslog_identifier) + IOVEC_SET_STRING(iovec[n++], syslog_identifier); + } + + message = strappend("MESSAGE=", p); + if (message) + IOVEC_SET_STRING(iovec[n++], message); + +#ifdef HAVE_SELINUX + if (s->security_context) { + label = (char*) s->security_context; + label_len = strlen((char*) s->security_context); + } +#endif + + dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, label, label_len, priority); + + free(message); + free(syslog_priority); + free(syslog_facility); + free(syslog_identifier); + + return 0; +} + +static int stdout_stream_line(StdoutStream *s, char *p) { + int r; + + assert(s); + assert(p); + + p = strstrip(p); + + switch (s->state) { + + case STDOUT_STREAM_IDENTIFIER: + if (isempty(p)) + s->identifier = NULL; + else { + s->identifier = strdup(p); + if (!s->identifier) { + log_error("Out of memory"); + return -ENOMEM; + } + } + + s->state = STDOUT_STREAM_PRIORITY; + return 0; + + case STDOUT_STREAM_PRIORITY: + r = safe_atoi(p, &s->priority); + if (r < 0 || s->priority <= 0 || s->priority >= 999) { + log_warning("Failed to parse log priority line."); + return -EINVAL; + } + + s->state = STDOUT_STREAM_LEVEL_PREFIX; + return 0; + + case STDOUT_STREAM_LEVEL_PREFIX: + r = parse_boolean(p); + if (r < 0) { + log_warning("Failed to parse level prefix line."); + return -EINVAL; + } + + s->level_prefix = !!r; + s->state = STDOUT_STREAM_FORWARD_TO_SYSLOG; + return 0; + + case STDOUT_STREAM_FORWARD_TO_SYSLOG: + r = parse_boolean(p); + if (r < 0) { + log_warning("Failed to parse forward to syslog line."); + return -EINVAL; + } + + s->forward_to_syslog = !!r; + s->state = STDOUT_STREAM_FORWARD_TO_KMSG; + return 0; + + case STDOUT_STREAM_FORWARD_TO_KMSG: + r = parse_boolean(p); + if (r < 0) { + log_warning("Failed to parse copy to kmsg line."); + return -EINVAL; + } + + s->forward_to_kmsg = !!r; + s->state = STDOUT_STREAM_FORWARD_TO_CONSOLE; + return 0; + + case STDOUT_STREAM_FORWARD_TO_CONSOLE: + r = parse_boolean(p); + if (r < 0) { + log_warning("Failed to parse copy to console line."); + return -EINVAL; + } + + s->forward_to_console = !!r; + s->state = STDOUT_STREAM_RUNNING; + return 0; + + case STDOUT_STREAM_RUNNING: + return stdout_stream_log(s, p); + } + + assert_not_reached("Unknown stream state"); +} + +static int stdout_stream_scan(StdoutStream *s, bool force_flush) { + char *p; + size_t remaining; + int r; + + assert(s); + + p = s->buffer; + remaining = s->length; + for (;;) { + char *end; + size_t skip; + + end = memchr(p, '\n', remaining); + if (end) + skip = end - p + 1; + else if (remaining >= sizeof(s->buffer) - 1) { + end = p + sizeof(s->buffer) - 1; + skip = remaining; + } else + break; + + *end = 0; + + r = stdout_stream_line(s, p); + if (r < 0) + return r; + + remaining -= skip; + p += skip; + } + + if (force_flush && remaining > 0) { + p[remaining] = 0; + r = stdout_stream_line(s, p); + if (r < 0) + return r; + + p += remaining; + remaining = 0; + } + + if (p > s->buffer) { + memmove(s->buffer, p, remaining); + s->length = remaining; + } + + return 0; +} + +static int stdout_stream_process(StdoutStream *s) { + ssize_t l; + int r; + + assert(s); + + l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length); + if (l < 0) { + + if (errno == EAGAIN) + return 0; + + log_warning("Failed to read from stream: %m"); + return -errno; + } + + if (l == 0) { + r = stdout_stream_scan(s, true); + if (r < 0) + return r; + + return 0; + } + + s->length += l; + r = stdout_stream_scan(s, false); + if (r < 0) + return r; + + return 1; + +} + +static void stdout_stream_free(StdoutStream *s) { + assert(s); + + if (s->server) { + assert(s->server->n_stdout_streams > 0); + s->server->n_stdout_streams --; + LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s); + } + + if (s->fd >= 0) { + if (s->server) + epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL); + + close_nointr_nofail(s->fd); + } + +#ifdef HAVE_SELINUX + if (s->security_context) + freecon(s->security_context); +#endif + + free(s->identifier); + free(s); +} + +static int stdout_stream_new(Server *s) { + StdoutStream *stream; + int fd, r; + socklen_t len; + struct epoll_event ev; + + assert(s); + + fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); + if (fd < 0) { + if (errno == EAGAIN) + return 0; + + log_error("Failed to accept stdout connection: %m"); + return -errno; + } + + if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) { + log_warning("Too many stdout streams, refusing connection."); + close_nointr_nofail(fd); + return 0; + } + + stream = new0(StdoutStream, 1); + if (!stream) { + log_error("Out of memory."); + close_nointr_nofail(fd); + return -ENOMEM; + } + + stream->fd = fd; + + len = sizeof(stream->ucred); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) { + log_error("Failed to determine peer credentials: %m"); + r = -errno; + goto fail; + } + +#ifdef HAVE_SELINUX + if (getpeercon(fd, &stream->security_context) < 0) + log_error("Failed to determine peer security context."); +#endif + + if (shutdown(fd, SHUT_WR) < 0) { + log_error("Failed to shutdown writing side of socket: %m"); + r = -errno; + goto fail; + } + + zero(ev); + ev.data.ptr = stream; + ev.events = EPOLLIN; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + log_error("Failed to add stream to event loop: %m"); + r = -errno; + goto fail; + } + + stream->server = s; + LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream); + s->n_stdout_streams ++; + + return 0; + +fail: + stdout_stream_free(stream); + return r; +} + +static int parse_kernel_timestamp(char **_p, usec_t *t) { + usec_t r; + int k, i; + char *p; + + assert(_p); + assert(*_p); + assert(t); + + p = *_p; + + if (strlen(p) < 14 || p[0] != '[' || p[13] != ']' || p[6] != '.') + return 0; + + r = 0; + + for (i = 1; i <= 5; i++) { + r *= 10; + + if (p[i] == ' ') + continue; + + k = undecchar(p[i]); + if (k < 0) + return 0; + + r += k; + } + + for (i = 7; i <= 12; i++) { + r *= 10; + + k = undecchar(p[i]); + if (k < 0) + return 0; + + r += k; + } + + *t = r; + *_p += 14; + *_p += strspn(*_p, WHITESPACE); + + return 1; +} + +static void proc_kmsg_line(Server *s, const char *p) { + struct iovec iovec[N_IOVEC_META_FIELDS + 7]; + char *message = NULL, *syslog_priority = NULL, *syslog_pid = NULL, *syslog_facility = NULL, *syslog_identifier = NULL, *source_time = NULL; + int priority = LOG_KERN | LOG_INFO; + unsigned n = 0; + usec_t usec; + char *identifier = NULL, *pid = NULL; + + assert(s); + assert(p); + + if (isempty(p)) + return; + + parse_syslog_priority((char **) &p, &priority); + + if (s->forward_to_kmsg && (priority & LOG_FACMASK) != LOG_KERN) + return; + + if (parse_kernel_timestamp((char **) &p, &usec) > 0) { + if (asprintf(&source_time, "_SOURCE_MONOTONIC_TIMESTAMP=%llu", + (unsigned long long) usec) >= 0) + IOVEC_SET_STRING(iovec[n++], source_time); + } + + IOVEC_SET_STRING(iovec[n++], "_TRANSPORT=kernel"); + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + if ((priority & LOG_FACMASK) == LOG_KERN) { + + if (s->forward_to_syslog) + forward_syslog(s, priority, "kernel", p, NULL, NULL); + + IOVEC_SET_STRING(iovec[n++], "SYSLOG_IDENTIFIER=kernel"); + } else { + read_identifier(&p, &identifier, &pid); + + if (s->forward_to_syslog) + forward_syslog(s, priority, identifier, p, NULL, NULL); + + if (identifier) { + syslog_identifier = strappend("SYSLOG_IDENTIFIER=", identifier); + if (syslog_identifier) + IOVEC_SET_STRING(iovec[n++], syslog_identifier); + } + + if (pid) { + syslog_pid = strappend("SYSLOG_PID=", pid); + if (syslog_pid) + IOVEC_SET_STRING(iovec[n++], syslog_pid); + } + + if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_facility); + } + + message = strappend("MESSAGE=", p); + if (message) + IOVEC_SET_STRING(iovec[n++], message); + + dispatch_message(s, iovec, n, ELEMENTSOF(iovec), NULL, NULL, NULL, 0, priority); + + free(message); + free(syslog_priority); + free(syslog_identifier); + free(syslog_pid); + free(syslog_facility); + free(source_time); + free(identifier); + free(pid); +} + +static void proc_kmsg_scan(Server *s) { + char *p; + size_t remaining; + + assert(s); + + p = s->proc_kmsg_buffer; + remaining = s->proc_kmsg_length; + for (;;) { + char *end; + size_t skip; + + end = memchr(p, '\n', remaining); + if (end) + skip = end - p + 1; + else if (remaining >= sizeof(s->proc_kmsg_buffer) - 1) { + end = p + sizeof(s->proc_kmsg_buffer) - 1; + skip = remaining; + } else + break; + + *end = 0; + + proc_kmsg_line(s, p); + + remaining -= skip; + p += skip; + } + + if (p > s->proc_kmsg_buffer) { + memmove(s->proc_kmsg_buffer, p, remaining); + s->proc_kmsg_length = remaining; + } +} + +static int system_journal_open(Server *s) { + int r; + char *fn; + sd_id128_t machine; + char ids[33]; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return r; + + sd_id128_to_string(machine, ids); + + if (!s->system_journal) { + + /* First try to create the machine path, but not the prefix */ + fn = strappend("/var/log/journal/", ids); + if (!fn) + return -ENOMEM; + (void) mkdir(fn, 0755); + free(fn); + + /* The create the system journal file */ + fn = join("/var/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + r = journal_file_open_reliably(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); + free(fn); + + if (r >= 0) { + journal_default_metrics(&s->system_metrics, s->system_journal->fd); + + s->system_journal->metrics = s->system_metrics; + s->system_journal->compress = s->compress; + + server_fix_perms(s, s->system_journal, 0); + } else if (r < 0) { + + if (r != -ENOENT && r != -EROFS) + log_warning("Failed to open system journal: %s", strerror(-r)); + + r = 0; + } + } + + if (!s->runtime_journal) { + + fn = join("/run/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + if (s->system_journal) { + + /* Try to open the runtime journal, but only + * if it already exists, so that we can flush + * it into the system journal */ + + r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + if (r != -ENOENT) + log_warning("Failed to open runtime journal: %s", strerror(-r)); + + r = 0; + } + + } else { + + /* OK, we really need the runtime journal, so create + * it if necessary. */ + + (void) mkdir_parents(fn, 0755); + r = journal_file_open_reliably(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + if (s->runtime_journal) { + journal_default_metrics(&s->runtime_metrics, s->runtime_journal->fd); + + s->runtime_journal->metrics = s->runtime_metrics; + s->runtime_journal->compress = s->compress; + + server_fix_perms(s, s->runtime_journal, 0); + } + } + + return r; +} + +static int server_flush_to_var(Server *s) { + char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; + Object *o = NULL; + int r; + sd_id128_t machine; + sd_journal *j; + usec_t ts; + + assert(s); + + if (!s->runtime_journal) + return 0; + + ts = now(CLOCK_MONOTONIC); + if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts) + return 0; + + s->var_available_timestamp = ts; + + system_journal_open(s); + + if (!s->system_journal) + return 0; + + log_info("Flushing to /var..."); + + r = sd_id128_get_machine(&machine); + if (r < 0) { + log_error("Failed to get machine id: %s", strerror(-r)); + return r; + } + + r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY); + if (r < 0) { + log_error("Failed to read runtime journal: %s", strerror(-r)); + return r; + } + + SD_JOURNAL_FOREACH(j) { + JournalFile *f; + + f = j->current_file; + assert(f && f->current_offset > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) { + log_error("Can't read entry: %s", strerror(-r)); + goto finish; + } + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + if (r == -E2BIG) { + log_info("Allocation limit reached."); + + journal_file_post_change(s->system_journal); + server_rotate(s); + server_vacuum(s); + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + } + + if (r < 0) { + log_error("Can't write entry: %s", strerror(-r)); + goto finish; + } + } + +finish: + journal_file_post_change(s->system_journal); + + journal_file_close(s->runtime_journal); + s->runtime_journal = NULL; + + if (r >= 0) { + sd_id128_to_string(machine, path + 17); + rm_rf(path, false, true, false); + } + + return r; +} + +static int server_read_proc_kmsg(Server *s) { + ssize_t l; + assert(s); + assert(s->proc_kmsg_fd >= 0); + + l = read(s->proc_kmsg_fd, s->proc_kmsg_buffer + s->proc_kmsg_length, sizeof(s->proc_kmsg_buffer) - 1 - s->proc_kmsg_length); + if (l < 0) { + + if (errno == EAGAIN || errno == EINTR) + return 0; + + log_error("Failed to read from kernel: %m"); + return -errno; + } + + s->proc_kmsg_length += l; + + proc_kmsg_scan(s); + return 1; +} + +static int server_flush_proc_kmsg(Server *s) { + int r; + + assert(s); + + if (s->proc_kmsg_fd < 0) + return 0; + + log_info("Flushing /proc/kmsg..."); + + for (;;) { + r = server_read_proc_kmsg(s); + if (r < 0) + return r; + + if (r == 0) + break; + } + + return 0; +} + +static int process_event(Server *s, struct epoll_event *ev) { + assert(s); + + if (ev->data.fd == s->signal_fd) { + struct signalfd_siginfo sfsi; + ssize_t n; + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + n = read(s->signal_fd, &sfsi, sizeof(sfsi)); + if (n != sizeof(sfsi)) { + + if (n >= 0) + return -EIO; + + if (errno == EINTR || errno == EAGAIN) + return 1; + + return -errno; + } + + if (sfsi.ssi_signo == SIGUSR1) { + server_flush_to_var(s); + return 0; + } + + log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo)); + return 0; + + } else if (ev->data.fd == s->proc_kmsg_fd) { + int r; + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + r = server_read_proc_kmsg(s); + if (r < 0) + return r; + + return 1; + + } else if (ev->data.fd == s->native_fd || + ev->data.fd == s->syslog_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + for (;;) { + struct msghdr msghdr; + struct iovec iovec; + struct ucred *ucred = NULL; + struct timeval *tv = NULL; + struct cmsghdr *cmsg; + char *label = NULL; + size_t label_len = 0; + union { + struct cmsghdr cmsghdr; + + /* We use NAME_MAX space for the + * SELinux label here. The kernel + * currently enforces no limit, but + * according to suggestions from the + * SELinux people this will change and + * it will probably be identical to + * NAME_MAX. For now we use that, but + * this should be updated one day when + * the final limit is known.*/ + uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) + + CMSG_SPACE(sizeof(struct timeval)) + + CMSG_SPACE(sizeof(int)) + /* fd */ + CMSG_SPACE(NAME_MAX)]; /* selinux label */ + } control; + ssize_t n; + int v; + int *fds = NULL; + unsigned n_fds = 0; + + if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) { + log_error("SIOCINQ failed: %m"); + return -errno; + } + + if (s->buffer_size < (size_t) v) { + void *b; + size_t l; + + l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2); + b = realloc(s->buffer, l+1); + + if (!b) { + log_error("Couldn't increase buffer."); + return -ENOMEM; + } + + s->buffer_size = l; + s->buffer = b; + } + + zero(iovec); + iovec.iov_base = s->buffer; + iovec.iov_len = s->buffer_size; + + zero(control); + zero(msghdr); + msghdr.msg_iov = &iovec; + msghdr.msg_iovlen = 1; + msghdr.msg_control = &control; + msghdr.msg_controllen = sizeof(control); + + n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT|MSG_CMSG_CLOEXEC); + if (n < 0) { + + if (errno == EINTR || errno == EAGAIN) + return 1; + + log_error("recvmsg() failed: %m"); + return -errno; + } + + for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) { + + if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_CREDENTIALS && + cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) + ucred = (struct ucred*) CMSG_DATA(cmsg); + else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_SECURITY) { + label = (char*) CMSG_DATA(cmsg); + label_len = cmsg->cmsg_len - CMSG_LEN(0); + } else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SO_TIMESTAMP && + cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval))) + tv = (struct timeval*) CMSG_DATA(cmsg); + else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_RIGHTS) { + fds = (int*) CMSG_DATA(cmsg); + n_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int); + } + } + + if (ev->data.fd == s->syslog_fd) { + char *e; + + if (n > 0 && n_fds == 0) { + e = memchr(s->buffer, '\n', n); + if (e) + *e = 0; + else + s->buffer[n] = 0; + + process_syslog_message(s, strstrip(s->buffer), ucred, tv, label, label_len); + } else if (n_fds > 0) + log_warning("Got file descriptors via syslog socket. Ignoring."); + + } else { + if (n > 0 && n_fds == 0) + process_native_message(s, s->buffer, n, ucred, tv, label, label_len); + else if (n == 0 && n_fds == 1) + process_native_file(s, fds[0], ucred, tv, label, label_len); + else if (n_fds > 0) + log_warning("Got too many file descriptors via native socket. Ignoring."); + } + + close_many(fds, n_fds); + } + + return 1; + + } else if (ev->data.fd == s->stdout_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + stdout_stream_new(s); + return 1; + + } else { + StdoutStream *stream; + + if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + /* If it is none of the well-known fds, it must be an + * stdout stream fd. Note that this is a bit ugly here + * (since we rely that none of the well-known fds + * could be interpreted as pointer), but nonetheless + * safe, since the well-known fds would never get an + * fd > 4096, i.e. beyond the first memory page */ + + stream = ev->data.ptr; + + if (stdout_stream_process(stream) <= 0) + stdout_stream_free(stream); + + return 1; + } + + log_error("Unknown event."); + return 0; +} + +static int open_syslog_socket(Server *s) { + union sockaddr_union sa; + int one, r; + struct epoll_event ev; + + assert(s); + + if (s->syslog_fd < 0) { + + s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (s->syslog_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + } else + fd_nonblock(s->syslog_fd, 1); + + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); + if (r < 0) { + log_error("SO_PASSCRED failed: %m"); + return -errno; + } + +#ifdef HAVE_SELINUX + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one)); + if (r < 0) + log_warning("SO_PASSSEC failed: %m"); +#endif + + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); + if (r < 0) { + log_error("SO_TIMESTAMP failed: %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->syslog_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { + log_error("Failed to add syslog server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_native_socket(Server*s) { + union sockaddr_union sa; + int one, r; + struct epoll_event ev; + + assert(s); + + if (s->native_fd < 0) { + + s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (s->native_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/journal/socket", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + } else + fd_nonblock(s->native_fd, 1); + + one = 1; + r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); + if (r < 0) { + log_error("SO_PASSCRED failed: %m"); + return -errno; + } + +#ifdef HAVE_SELINUX + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSSEC, &one, sizeof(one)); + if (r < 0) + log_warning("SO_PASSSEC failed: %m"); +#endif + + one = 1; + r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); + if (r < 0) { + log_error("SO_TIMESTAMP failed: %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->native_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { + log_error("Failed to add native server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_stdout_socket(Server *s) { + union sockaddr_union sa; + int r; + struct epoll_event ev; + + assert(s); + + if (s->stdout_fd < 0) { + + s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0); + if (s->stdout_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/journal/stdout", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + + if (listen(s->stdout_fd, SOMAXCONN) < 0) { + log_error("liste() failed: %m"); + return -errno; + } + } else + fd_nonblock(s->stdout_fd, 1); + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->stdout_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) { + log_error("Failed to add stdout server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_proc_kmsg(Server *s) { + struct epoll_event ev; + + assert(s); + + if (!s->import_proc_kmsg) + return 0; + + + s->proc_kmsg_fd = open("/proc/kmsg", O_CLOEXEC|O_NONBLOCK|O_NOCTTY); + if (s->proc_kmsg_fd < 0) { + log_warning("Failed to open /proc/kmsg, ignoring: %m"); + return 0; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->proc_kmsg_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->proc_kmsg_fd, &ev) < 0) { + log_error("Failed to add /proc/kmsg fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_signalfd(Server *s) { + sigset_t mask; + struct epoll_event ev; + + assert(s); + + assert_se(sigemptyset(&mask) == 0); + sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1); + assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); + + s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); + if (s->signal_fd < 0) { + log_error("signalfd(): %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->signal_fd; + + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) { + log_error("epoll_ctl(): %m"); + return -errno; + } + + return 0; +} + +static int server_parse_proc_cmdline(Server *s) { + char *line, *w, *state; + int r; + size_t l; + + if (detect_container(NULL) > 0) + return 0; + + r = read_one_line_file("/proc/cmdline", &line); + if (r < 0) { + log_warning("Failed to read /proc/cmdline, ignoring: %s", strerror(-r)); + return 0; + } + + FOREACH_WORD_QUOTED(w, l, line, state) { + char *word; + + word = strndup(w, l); + if (!word) { + r = -ENOMEM; + goto finish; + } + + if (startswith(word, "systemd_journald.forward_to_syslog=")) { + r = parse_boolean(word + 35); + if (r < 0) + log_warning("Failed to parse forward to syslog switch %s. Ignoring.", word + 35); + else + s->forward_to_syslog = r; + } else if (startswith(word, "systemd_journald.forward_to_kmsg=")) { + r = parse_boolean(word + 33); + if (r < 0) + log_warning("Failed to parse forward to kmsg switch %s. Ignoring.", word + 33); + else + s->forward_to_kmsg = r; + } else if (startswith(word, "systemd_journald.forward_to_console=")) { + r = parse_boolean(word + 36); + if (r < 0) + log_warning("Failed to parse forward to console switch %s. Ignoring.", word + 36); + else + s->forward_to_console = r; + } + + free(word); + } + + r = 0; + +finish: + free(line); + return r; +} + +static int server_parse_config_file(Server *s) { + FILE *f; + const char *fn; + int r; + + assert(s); + + fn = "/etc/systemd/journald.conf"; + f = fopen(fn, "re"); + if (!f) { + if (errno == ENOENT) + return 0; + + log_warning("Failed to open configuration file %s: %m", fn); + return -errno; + } + + r = config_parse(fn, f, "Journal\0", config_item_perf_lookup, (void*) journald_gperf_lookup, false, s); + if (r < 0) + log_warning("Failed to parse configuration file: %s", strerror(-r)); + + fclose(f); + + return r; +} + +static int server_init(Server *s) { + int n, r, fd; + + assert(s); + + zero(*s); + s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = s->proc_kmsg_fd = -1; + s->compress = true; + + s->rate_limit_interval = DEFAULT_RATE_LIMIT_INTERVAL; + s->rate_limit_burst = DEFAULT_RATE_LIMIT_BURST; + + s->forward_to_syslog = true; + s->import_proc_kmsg = true; + + memset(&s->system_metrics, 0xFF, sizeof(s->system_metrics)); + memset(&s->runtime_metrics, 0xFF, sizeof(s->runtime_metrics)); + + server_parse_config_file(s); + server_parse_proc_cmdline(s); + + s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func); + if (!s->user_journals) { + log_error("Out of memory."); + return -ENOMEM; + } + + s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (s->epoll_fd < 0) { + log_error("Failed to create epoll object: %m"); + return -errno; + } + + n = sd_listen_fds(true); + if (n < 0) { + log_error("Failed to read listening file descriptors from environment: %s", strerror(-n)); + return n; + } + + for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { + + if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/journal/socket", 0) > 0) { + + if (s->native_fd >= 0) { + log_error("Too many native sockets passed."); + return -EINVAL; + } + + s->native_fd = fd; + + } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/journal/stdout", 0) > 0) { + + if (s->stdout_fd >= 0) { + log_error("Too many stdout sockets passed."); + return -EINVAL; + } + + s->stdout_fd = fd; + + } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + + if (s->syslog_fd >= 0) { + log_error("Too many /dev/log sockets passed."); + return -EINVAL; + } + + s->syslog_fd = fd; + + } else { + log_error("Unknown socket passed."); + return -EINVAL; + } + } + + r = open_syslog_socket(s); + if (r < 0) + return r; + + r = open_native_socket(s); + if (r < 0) + return r; + + r = open_stdout_socket(s); + if (r < 0) + return r; + + r = open_proc_kmsg(s); + if (r < 0) + return r; + + r = open_signalfd(s); + if (r < 0) + return r; + + s->rate_limit = journal_rate_limit_new(s->rate_limit_interval, s->rate_limit_burst); + if (!s->rate_limit) + return -ENOMEM; + + r = system_journal_open(s); + if (r < 0) + return r; + + return 0; +} + +static void server_done(Server *s) { + JournalFile *f; + assert(s); + + while (s->stdout_streams) + stdout_stream_free(s->stdout_streams); + + if (s->system_journal) + journal_file_close(s->system_journal); + + if (s->runtime_journal) + journal_file_close(s->runtime_journal); + + while ((f = hashmap_steal_first(s->user_journals))) + journal_file_close(f); + + hashmap_free(s->user_journals); + + if (s->epoll_fd >= 0) + close_nointr_nofail(s->epoll_fd); + + if (s->signal_fd >= 0) + close_nointr_nofail(s->signal_fd); + + if (s->syslog_fd >= 0) + close_nointr_nofail(s->syslog_fd); + + if (s->native_fd >= 0) + close_nointr_nofail(s->native_fd); + + if (s->stdout_fd >= 0) + close_nointr_nofail(s->stdout_fd); + + if (s->proc_kmsg_fd >= 0) + close_nointr_nofail(s->proc_kmsg_fd); + + if (s->rate_limit) + journal_rate_limit_free(s->rate_limit); + + free(s->buffer); +} + +int main(int argc, char *argv[]) { + Server server; + int r; + + /* if (getppid() != 1) { */ + /* log_error("This program should be invoked by init only."); */ + /* return EXIT_FAILURE; */ + /* } */ + + if (argc > 1) { + log_error("This program does not take arguments."); + return EXIT_FAILURE; + } + + log_set_target(LOG_TARGET_CONSOLE); + log_set_facility(LOG_SYSLOG); + log_parse_environment(); + log_open(); + + umask(0022); + + r = server_init(&server); + if (r < 0) + goto finish; + + server_vacuum(&server); + server_flush_to_var(&server); + server_flush_proc_kmsg(&server); + + log_debug("systemd-journald running as pid %lu", (unsigned long) getpid()); + driver_message(&server, SD_MESSAGE_JOURNAL_START, "Journal started"); + + sd_notify(false, + "READY=1\n" + "STATUS=Processing requests..."); + + for (;;) { + struct epoll_event event; + + r = epoll_wait(server.epoll_fd, &event, 1, -1); + if (r < 0) { + + if (errno == EINTR) + continue; + + log_error("epoll_wait() failed: %m"); + r = -errno; + goto finish; + } else if (r == 0) + break; + + r = process_event(&server, &event); + if (r < 0) + goto finish; + else if (r == 0) + break; + } + + log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid()); + driver_message(&server, SD_MESSAGE_JOURNAL_STOP, "Journal stopped"); + +finish: + sd_notify(false, + "STATUS=Shutting down..."); + + server_done(&server); + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/src/journal/journald.conf b/src/journal/journald.conf new file mode 100644 index 0000000000..710b0aa941 --- /dev/null +++ b/src/journal/journald.conf @@ -0,0 +1,25 @@ +# This file is part of systemd. +# +# systemd is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# See system-journald.conf(5) for details + +[Journal] +#Compress=yes +#RateLimitInterval=10s +#RateLimitBurst=200 +#SystemMaxUse= +#SystemKeepFree= +#SystemMaxFileSize= +#SystemMinFileSize= +#RuntimeMaxUse= +#RuntimeKeepFree= +#RuntimeMaxFileSize= +#RuntimeMinFileSize= +#ForwardToSyslog=yes +#ForwardToKMsg=no +#ForwardToConsole=no +#ImportKernel=yes diff --git a/src/journal/journald.h b/src/journal/journald.h new file mode 100644 index 0000000000..6160991ed8 --- /dev/null +++ b/src/journal/journald.h @@ -0,0 +1,86 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournaldhfoo +#define foojournaldhfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <sys/types.h> +#include <stdbool.h> + +#include "journal-file.h" +#include "hashmap.h" +#include "util.h" +#include "journal-rate-limit.h" +#include "list.h" + +typedef struct StdoutStream StdoutStream; + +typedef struct Server { + int epoll_fd; + int signal_fd; + int syslog_fd; + int native_fd; + int stdout_fd; + int proc_kmsg_fd; + + JournalFile *runtime_journal; + JournalFile *system_journal; + Hashmap *user_journals; + + uint64_t seqnum; + + char *buffer; + size_t buffer_size; + + JournalRateLimit *rate_limit; + usec_t rate_limit_interval; + unsigned rate_limit_burst; + + JournalMetrics runtime_metrics; + JournalMetrics system_metrics; + + bool compress; + + bool forward_to_kmsg; + bool forward_to_syslog; + bool forward_to_console; + + bool import_proc_kmsg; + char proc_kmsg_buffer[LINE_MAX+1]; + size_t proc_kmsg_length; + + uint64_t cached_available_space; + usec_t cached_available_space_timestamp; + + uint64_t var_available_timestamp; + + gid_t file_gid; + bool file_gid_valid; + + LIST_HEAD(StdoutStream, stdout_streams); + unsigned n_stdout_streams; +} Server; + +/* gperf lookup function */ +const struct ConfigPerfItem* journald_gperf_lookup(const char *key, unsigned length); + +#endif diff --git a/src/journal/libsystemd-journal.pc.in b/src/journal/libsystemd-journal.pc.in new file mode 100644 index 0000000000..13cc8208df --- /dev/null +++ b/src/journal/libsystemd-journal.pc.in @@ -0,0 +1,19 @@ +# This file is part of systemd. +# +# systemd is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +prefix=@prefix@ +exec_prefix=@exec_prefix@ +libdir=@libdir@ +includedir=@includedir@ + +Name: systemd +Description: systemd Journal Utility Library +URL: @PACKAGE_URL@ +Version: @PACKAGE_VERSION@ +Requires: libsystemd-id128 = @PACKAGE_VERSION@ +Libs: -L${libdir} -lsystemd-journal +Cflags: -I${includedir} diff --git a/src/journal/libsystemd-journal.sym b/src/journal/libsystemd-journal.sym new file mode 100644 index 0000000000..cd434aea26 --- /dev/null +++ b/src/journal/libsystemd-journal.sym @@ -0,0 +1,53 @@ +/*** + This file is part of systemd. + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. +***/ + +/* Original symbols from systemd v38 */ + +LIBSYSTEMD_JOURNAL_38 { +global: + sd_journal_print; + sd_journal_printv; + sd_journal_send; + sd_journal_sendv; + sd_journal_stream_fd; + sd_journal_open; + sd_journal_close; + sd_journal_previous; + sd_journal_next; + sd_journal_previous_skip; + sd_journal_next_skip; + sd_journal_get_realtime_usec; + sd_journal_get_monotonic_usec; + sd_journal_get_data; + sd_journal_enumerate_data; + sd_journal_restart_data; + sd_journal_add_match; + sd_journal_flush_matches; + sd_journal_seek_head; + sd_journal_seek_tail; + sd_journal_seek_monotonic_usec; + sd_journal_seek_realtime_usec; + sd_journal_seek_cursor; + sd_journal_get_cursor; + sd_journal_query_unique; + sd_journal_enumerate_unique; + sd_journal_restart_unique; + sd_journal_get_fd; + sd_journal_process; +local: + *; +}; + +LIBSYSTEMD_JOURNAL_45 { +global: + sd_journal_print_with_location; + sd_journal_printv_with_location; + sd_journal_send_with_location; + sd_journal_sendv_with_location; +} LIBSYSTEMD_JOURNAL_38; diff --git a/src/journal/lookup3.c b/src/journal/lookup3.c new file mode 100644 index 0000000000..b90093a5e2 --- /dev/null +++ b/src/journal/lookup3.c @@ -0,0 +1,1003 @@ +/* Slightly modified by Lennart Poettering, to avoid name clashes, and + * unexport a few functions. */ + +#include "lookup3.h" + +/* +------------------------------------------------------------------------------- +lookup3.c, by Bob Jenkins, May 2006, Public Domain. + +These are functions for producing 32-bit hashes for hash table lookup. +hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final() +are externally useful functions. Routines to test the hash are included +if SELF_TEST is defined. You can use this free for any purpose. It's in +the public domain. It has no warranty. + +You probably want to use hashlittle(). hashlittle() and hashbig() +hash byte arrays. hashlittle() is is faster than hashbig() on +little-endian machines. Intel and AMD are little-endian machines. +On second thought, you probably want hashlittle2(), which is identical to +hashlittle() except it returns two 32-bit hashes for the price of one. +You could implement hashbig2() if you wanted but I haven't bothered here. + +If you want to find a hash of, say, exactly 7 integers, do + a = i1; b = i2; c = i3; + mix(a,b,c); + a += i4; b += i5; c += i6; + mix(a,b,c); + a += i7; + final(a,b,c); +then use c as the hash value. If you have a variable length array of +4-byte integers to hash, use hashword(). If you have a byte array (like +a character string), use hashlittle(). If you have several byte arrays, or +a mix of things, see the comments above hashlittle(). + +Why is this so big? I read 12 bytes at a time into 3 4-byte integers, +then mix those integers. This is fast (you can do a lot more thorough +mixing with 12*3 instructions on 3 integers than you can with 3 instructions +on 1 byte), but shoehorning those bytes into integers efficiently is messy. +------------------------------------------------------------------------------- +*/ +/* #define SELF_TEST 1 */ + +#include <stdio.h> /* defines printf for tests */ +#include <time.h> /* defines time_t for timings in the test */ +#include <stdint.h> /* defines uint32_t etc */ +#include <sys/param.h> /* attempt to define endianness */ +#ifdef linux +# include <endian.h> /* attempt to define endianness */ +#endif + +/* + * My best guess at if you are big-endian or little-endian. This may + * need adjustment. + */ +#if (defined(__BYTE_ORDER) && defined(__LITTLE_ENDIAN) && \ + __BYTE_ORDER == __LITTLE_ENDIAN) || \ + (defined(i386) || defined(__i386__) || defined(__i486__) || \ + defined(__i586__) || defined(__i686__) || defined(vax) || defined(MIPSEL)) +# define HASH_LITTLE_ENDIAN 1 +# define HASH_BIG_ENDIAN 0 +#elif (defined(__BYTE_ORDER) && defined(__BIG_ENDIAN) && \ + __BYTE_ORDER == __BIG_ENDIAN) || \ + (defined(sparc) || defined(POWERPC) || defined(mc68000) || defined(sel)) +# define HASH_LITTLE_ENDIAN 0 +# define HASH_BIG_ENDIAN 1 +#else +# define HASH_LITTLE_ENDIAN 0 +# define HASH_BIG_ENDIAN 0 +#endif + +#define hashsize(n) ((uint32_t)1<<(n)) +#define hashmask(n) (hashsize(n)-1) +#define rot(x,k) (((x)<<(k)) | ((x)>>(32-(k)))) + +/* +------------------------------------------------------------------------------- +mix -- mix 3 32-bit values reversibly. + +This is reversible, so any information in (a,b,c) before mix() is +still in (a,b,c) after mix(). + +If four pairs of (a,b,c) inputs are run through mix(), or through +mix() in reverse, there are at least 32 bits of the output that +are sometimes the same for one pair and different for another pair. +This was tested for: +* pairs that differed by one bit, by two bits, in any combination + of top bits of (a,b,c), or in any combination of bottom bits of + (a,b,c). +* "differ" is defined as +, -, ^, or ~^. For + and -, I transformed + the output delta to a Gray code (a^(a>>1)) so a string of 1's (as + is commonly produced by subtraction) look like a single 1-bit + difference. +* the base values were pseudorandom, all zero but one bit set, or + all zero plus a counter that starts at zero. + +Some k values for my "a-=c; a^=rot(c,k); c+=b;" arrangement that +satisfy this are + 4 6 8 16 19 4 + 9 15 3 18 27 15 + 14 9 3 7 17 3 +Well, "9 15 3 18 27 15" didn't quite get 32 bits diffing +for "differ" defined as + with a one-bit base and a two-bit delta. I +used http://burtleburtle.net/bob/hash/avalanche.html to choose +the operations, constants, and arrangements of the variables. + +This does not achieve avalanche. There are input bits of (a,b,c) +that fail to affect some output bits of (a,b,c), especially of a. The +most thoroughly mixed value is c, but it doesn't really even achieve +avalanche in c. + +This allows some parallelism. Read-after-writes are good at doubling +the number of bits affected, so the goal of mixing pulls in the opposite +direction as the goal of parallelism. I did what I could. Rotates +seem to cost as much as shifts on every machine I could lay my hands +on, and rotates are much kinder to the top and bottom bits, so I used +rotates. +------------------------------------------------------------------------------- +*/ +#define mix(a,b,c) \ +{ \ + a -= c; a ^= rot(c, 4); c += b; \ + b -= a; b ^= rot(a, 6); a += c; \ + c -= b; c ^= rot(b, 8); b += a; \ + a -= c; a ^= rot(c,16); c += b; \ + b -= a; b ^= rot(a,19); a += c; \ + c -= b; c ^= rot(b, 4); b += a; \ +} + +/* +------------------------------------------------------------------------------- +final -- final mixing of 3 32-bit values (a,b,c) into c + +Pairs of (a,b,c) values differing in only a few bits will usually +produce values of c that look totally different. This was tested for +* pairs that differed by one bit, by two bits, in any combination + of top bits of (a,b,c), or in any combination of bottom bits of + (a,b,c). +* "differ" is defined as +, -, ^, or ~^. For + and -, I transformed + the output delta to a Gray code (a^(a>>1)) so a string of 1's (as + is commonly produced by subtraction) look like a single 1-bit + difference. +* the base values were pseudorandom, all zero but one bit set, or + all zero plus a counter that starts at zero. + +These constants passed: + 14 11 25 16 4 14 24 + 12 14 25 16 4 14 24 +and these came close: + 4 8 15 26 3 22 24 + 10 8 15 26 3 22 24 + 11 8 15 26 3 22 24 +------------------------------------------------------------------------------- +*/ +#define final(a,b,c) \ +{ \ + c ^= b; c -= rot(b,14); \ + a ^= c; a -= rot(c,11); \ + b ^= a; b -= rot(a,25); \ + c ^= b; c -= rot(b,16); \ + a ^= c; a -= rot(c,4); \ + b ^= a; b -= rot(a,14); \ + c ^= b; c -= rot(b,24); \ +} + +/* +-------------------------------------------------------------------- + This works on all machines. To be useful, it requires + -- that the key be an array of uint32_t's, and + -- that the length be the number of uint32_t's in the key + + The function hashword() is identical to hashlittle() on little-endian + machines, and identical to hashbig() on big-endian machines, + except that the length has to be measured in uint32_ts rather than in + bytes. hashlittle() is more complicated than hashword() only because + hashlittle() has to dance around fitting the key bytes into registers. +-------------------------------------------------------------------- +*/ +uint32_t jenkins_hashword( +const uint32_t *k, /* the key, an array of uint32_t values */ +size_t length, /* the length of the key, in uint32_ts */ +uint32_t initval) /* the previous hash, or an arbitrary value */ +{ + uint32_t a,b,c; + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + (((uint32_t)length)<<2) + initval; + + /*------------------------------------------------- handle most of the key */ + while (length > 3) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 3; + k += 3; + } + + /*------------------------------------------- handle the last 3 uint32_t's */ + switch(length) /* all the case statements fall through */ + { + case 3 : c+=k[2]; + case 2 : b+=k[1]; + case 1 : a+=k[0]; + final(a,b,c); + case 0: /* case 0: nothing left to add */ + break; + } + /*------------------------------------------------------ report the result */ + return c; +} + + +/* +-------------------------------------------------------------------- +hashword2() -- same as hashword(), but take two seeds and return two +32-bit values. pc and pb must both be nonnull, and *pc and *pb must +both be initialized with seeds. If you pass in (*pb)==0, the output +(*pc) will be the same as the return value from hashword(). +-------------------------------------------------------------------- +*/ +void jenkins_hashword2 ( +const uint32_t *k, /* the key, an array of uint32_t values */ +size_t length, /* the length of the key, in uint32_ts */ +uint32_t *pc, /* IN: seed OUT: primary hash value */ +uint32_t *pb) /* IN: more seed OUT: secondary hash value */ +{ + uint32_t a,b,c; + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)(length<<2)) + *pc; + c += *pb; + + /*------------------------------------------------- handle most of the key */ + while (length > 3) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 3; + k += 3; + } + + /*------------------------------------------- handle the last 3 uint32_t's */ + switch(length) /* all the case statements fall through */ + { + case 3 : c+=k[2]; + case 2 : b+=k[1]; + case 1 : a+=k[0]; + final(a,b,c); + case 0: /* case 0: nothing left to add */ + break; + } + /*------------------------------------------------------ report the result */ + *pc=c; *pb=b; +} + + +/* +------------------------------------------------------------------------------- +hashlittle() -- hash a variable-length key into a 32-bit value + k : the key (the unaligned variable-length array of bytes) + length : the length of the key, counting by bytes + initval : can be any 4-byte value +Returns a 32-bit value. Every bit of the key affects every bit of +the return value. Two keys differing by one or two bits will have +totally different hash values. + +The best hash table sizes are powers of 2. There is no need to do +mod a prime (mod is sooo slow!). If you need less than 32 bits, +use a bitmask. For example, if you need only 10 bits, do + h = (h & hashmask(10)); +In which case, the hash table should have hashsize(10) elements. + +If you are hashing n strings (uint8_t **)k, do it like this: + for (i=0, h=0; i<n; ++i) h = hashlittle( k[i], len[i], h); + +By Bob Jenkins, 2006. bob_jenkins@burtleburtle.net. You may use this +code any way you wish, private, educational, or commercial. It's free. + +Use for hash table lookup, or anything where one collision in 2^^32 is +acceptable. Do NOT use for cryptographic purposes. +------------------------------------------------------------------------------- +*/ + +uint32_t jenkins_hashlittle( const void *key, size_t length, uint32_t initval) +{ + uint32_t a,b,c; /* internal state */ + union { const void *ptr; size_t i; } u; /* needed for Mac Powerbook G4 */ + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)length) + initval; + + u.ptr = key; + if (HASH_LITTLE_ENDIAN && ((u.i & 0x3) == 0)) { + const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */ + + /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 12; + k += 3; + } + + /*----------------------------- handle the last (probably partial) block */ + /* + * "k[2]&0xffffff" actually reads beyond the end of the string, but + * then masks off the part it's not allowed to read. Because the + * string is aligned, the masked-off tail is in the same word as the + * rest of the string. Every machine with memory protection I've seen + * does it on word boundaries, so is OK with this. But VALGRIND will + * still catch it and complain. The masking trick does make the hash + * noticably faster for short strings (like English words). + */ +#ifndef VALGRIND + + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=k[2]&0xffffff; b+=k[1]; a+=k[0]; break; + case 10: c+=k[2]&0xffff; b+=k[1]; a+=k[0]; break; + case 9 : c+=k[2]&0xff; b+=k[1]; a+=k[0]; break; + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=k[1]&0xffffff; a+=k[0]; break; + case 6 : b+=k[1]&0xffff; a+=k[0]; break; + case 5 : b+=k[1]&0xff; a+=k[0]; break; + case 4 : a+=k[0]; break; + case 3 : a+=k[0]&0xffffff; break; + case 2 : a+=k[0]&0xffff; break; + case 1 : a+=k[0]&0xff; break; + case 0 : return c; /* zero length strings require no mixing */ + } + +#else /* make valgrind happy */ + + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=((uint32_t)k8[9])<<8; /* fall through */ + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=((uint32_t)k8[5])<<8; /* fall through */ + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]; break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=((uint32_t)k8[1])<<8; /* fall through */ + case 1 : a+=k8[0]; break; + case 0 : return c; + } + +#endif /* !valgrind */ + + } else if (HASH_LITTLE_ENDIAN && ((u.i & 0x1) == 0)) { + const uint16_t *k = (const uint16_t *)key; /* read 16-bit chunks */ + const uint8_t *k8; + + /*--------------- all but last block: aligned reads and different mixing */ + while (length > 12) + { + a += k[0] + (((uint32_t)k[1])<<16); + b += k[2] + (((uint32_t)k[3])<<16); + c += k[4] + (((uint32_t)k[5])<<16); + mix(a,b,c); + length -= 12; + k += 6; + } + + /*----------------------------- handle the last (probably partial) block */ + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[4]+(((uint32_t)k[5])<<16); + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=k[4]; + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=k[2]; + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=k[0]; + break; + case 1 : a+=k8[0]; + break; + case 0 : return c; /* zero length requires no mixing */ + } + + } else { /* need to read the key one byte at a time */ + const uint8_t *k = (const uint8_t *)key; + + /*--------------- all but the last block: affect some 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + a += ((uint32_t)k[1])<<8; + a += ((uint32_t)k[2])<<16; + a += ((uint32_t)k[3])<<24; + b += k[4]; + b += ((uint32_t)k[5])<<8; + b += ((uint32_t)k[6])<<16; + b += ((uint32_t)k[7])<<24; + c += k[8]; + c += ((uint32_t)k[9])<<8; + c += ((uint32_t)k[10])<<16; + c += ((uint32_t)k[11])<<24; + mix(a,b,c); + length -= 12; + k += 12; + } + + /*-------------------------------- last block: affect all 32 bits of (c) */ + switch(length) /* all the case statements fall through */ + { + case 12: c+=((uint32_t)k[11])<<24; + case 11: c+=((uint32_t)k[10])<<16; + case 10: c+=((uint32_t)k[9])<<8; + case 9 : c+=k[8]; + case 8 : b+=((uint32_t)k[7])<<24; + case 7 : b+=((uint32_t)k[6])<<16; + case 6 : b+=((uint32_t)k[5])<<8; + case 5 : b+=k[4]; + case 4 : a+=((uint32_t)k[3])<<24; + case 3 : a+=((uint32_t)k[2])<<16; + case 2 : a+=((uint32_t)k[1])<<8; + case 1 : a+=k[0]; + break; + case 0 : return c; + } + } + + final(a,b,c); + return c; +} + + +/* + * hashlittle2: return 2 32-bit hash values + * + * This is identical to hashlittle(), except it returns two 32-bit hash + * values instead of just one. This is good enough for hash table + * lookup with 2^^64 buckets, or if you want a second hash if you're not + * happy with the first, or if you want a probably-unique 64-bit ID for + * the key. *pc is better mixed than *pb, so use *pc first. If you want + * a 64-bit value do something like "*pc + (((uint64_t)*pb)<<32)". + */ +void jenkins_hashlittle2( + const void *key, /* the key to hash */ + size_t length, /* length of the key */ + uint32_t *pc, /* IN: primary initval, OUT: primary hash */ + uint32_t *pb) /* IN: secondary initval, OUT: secondary hash */ +{ + uint32_t a,b,c; /* internal state */ + union { const void *ptr; size_t i; } u; /* needed for Mac Powerbook G4 */ + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)length) + *pc; + c += *pb; + + u.ptr = key; + if (HASH_LITTLE_ENDIAN && ((u.i & 0x3) == 0)) { + const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */ + + /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 12; + k += 3; + } + + /*----------------------------- handle the last (probably partial) block */ + /* + * "k[2]&0xffffff" actually reads beyond the end of the string, but + * then masks off the part it's not allowed to read. Because the + * string is aligned, the masked-off tail is in the same word as the + * rest of the string. Every machine with memory protection I've seen + * does it on word boundaries, so is OK with this. But VALGRIND will + * still catch it and complain. The masking trick does make the hash + * noticably faster for short strings (like English words). + */ +#ifndef VALGRIND + + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=k[2]&0xffffff; b+=k[1]; a+=k[0]; break; + case 10: c+=k[2]&0xffff; b+=k[1]; a+=k[0]; break; + case 9 : c+=k[2]&0xff; b+=k[1]; a+=k[0]; break; + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=k[1]&0xffffff; a+=k[0]; break; + case 6 : b+=k[1]&0xffff; a+=k[0]; break; + case 5 : b+=k[1]&0xff; a+=k[0]; break; + case 4 : a+=k[0]; break; + case 3 : a+=k[0]&0xffffff; break; + case 2 : a+=k[0]&0xffff; break; + case 1 : a+=k[0]&0xff; break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + +#else /* make valgrind happy */ + + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=((uint32_t)k8[9])<<8; /* fall through */ + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=((uint32_t)k8[5])<<8; /* fall through */ + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]; break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=((uint32_t)k8[1])<<8; /* fall through */ + case 1 : a+=k8[0]; break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + +#endif /* !valgrind */ + + } else if (HASH_LITTLE_ENDIAN && ((u.i & 0x1) == 0)) { + const uint16_t *k = (const uint16_t *)key; /* read 16-bit chunks */ + const uint8_t *k8; + + /*--------------- all but last block: aligned reads and different mixing */ + while (length > 12) + { + a += k[0] + (((uint32_t)k[1])<<16); + b += k[2] + (((uint32_t)k[3])<<16); + c += k[4] + (((uint32_t)k[5])<<16); + mix(a,b,c); + length -= 12; + k += 6; + } + + /*----------------------------- handle the last (probably partial) block */ + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[4]+(((uint32_t)k[5])<<16); + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=k[4]; + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=k[2]; + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=k[0]; + break; + case 1 : a+=k8[0]; + break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + + } else { /* need to read the key one byte at a time */ + const uint8_t *k = (const uint8_t *)key; + + /*--------------- all but the last block: affect some 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + a += ((uint32_t)k[1])<<8; + a += ((uint32_t)k[2])<<16; + a += ((uint32_t)k[3])<<24; + b += k[4]; + b += ((uint32_t)k[5])<<8; + b += ((uint32_t)k[6])<<16; + b += ((uint32_t)k[7])<<24; + c += k[8]; + c += ((uint32_t)k[9])<<8; + c += ((uint32_t)k[10])<<16; + c += ((uint32_t)k[11])<<24; + mix(a,b,c); + length -= 12; + k += 12; + } + + /*-------------------------------- last block: affect all 32 bits of (c) */ + switch(length) /* all the case statements fall through */ + { + case 12: c+=((uint32_t)k[11])<<24; + case 11: c+=((uint32_t)k[10])<<16; + case 10: c+=((uint32_t)k[9])<<8; + case 9 : c+=k[8]; + case 8 : b+=((uint32_t)k[7])<<24; + case 7 : b+=((uint32_t)k[6])<<16; + case 6 : b+=((uint32_t)k[5])<<8; + case 5 : b+=k[4]; + case 4 : a+=((uint32_t)k[3])<<24; + case 3 : a+=((uint32_t)k[2])<<16; + case 2 : a+=((uint32_t)k[1])<<8; + case 1 : a+=k[0]; + break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + } + + final(a,b,c); + *pc=c; *pb=b; +} + + + +/* + * hashbig(): + * This is the same as hashword() on big-endian machines. It is different + * from hashlittle() on all machines. hashbig() takes advantage of + * big-endian byte ordering. + */ +uint32_t jenkins_hashbig( const void *key, size_t length, uint32_t initval) +{ + uint32_t a,b,c; + union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */ + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)length) + initval; + + u.ptr = key; + if (HASH_BIG_ENDIAN && ((u.i & 0x3) == 0)) { + const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */ + + /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 12; + k += 3; + } + + /*----------------------------- handle the last (probably partial) block */ + /* + * "k[2]<<8" actually reads beyond the end of the string, but + * then shifts out the part it's not allowed to read. Because the + * string is aligned, the illegal read is in the same word as the + * rest of the string. Every machine with memory protection I've seen + * does it on word boundaries, so is OK with this. But VALGRIND will + * still catch it and complain. The masking trick does make the hash + * noticably faster for short strings (like English words). + */ +#ifndef VALGRIND + + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=k[2]&0xffffff00; b+=k[1]; a+=k[0]; break; + case 10: c+=k[2]&0xffff0000; b+=k[1]; a+=k[0]; break; + case 9 : c+=k[2]&0xff000000; b+=k[1]; a+=k[0]; break; + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=k[1]&0xffffff00; a+=k[0]; break; + case 6 : b+=k[1]&0xffff0000; a+=k[0]; break; + case 5 : b+=k[1]&0xff000000; a+=k[0]; break; + case 4 : a+=k[0]; break; + case 3 : a+=k[0]&0xffffff00; break; + case 2 : a+=k[0]&0xffff0000; break; + case 1 : a+=k[0]&0xff000000; break; + case 0 : return c; /* zero length strings require no mixing */ + } + +#else /* make valgrind happy */ + + k8 = (const uint8_t *)k; + switch(length) /* all the case statements fall through */ + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=((uint32_t)k8[10])<<8; /* fall through */ + case 10: c+=((uint32_t)k8[9])<<16; /* fall through */ + case 9 : c+=((uint32_t)k8[8])<<24; /* fall through */ + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=((uint32_t)k8[6])<<8; /* fall through */ + case 6 : b+=((uint32_t)k8[5])<<16; /* fall through */ + case 5 : b+=((uint32_t)k8[4])<<24; /* fall through */ + case 4 : a+=k[0]; break; + case 3 : a+=((uint32_t)k8[2])<<8; /* fall through */ + case 2 : a+=((uint32_t)k8[1])<<16; /* fall through */ + case 1 : a+=((uint32_t)k8[0])<<24; break; + case 0 : return c; + } + +#endif /* !VALGRIND */ + + } else { /* need to read the key one byte at a time */ + const uint8_t *k = (const uint8_t *)key; + + /*--------------- all but the last block: affect some 32 bits of (a,b,c) */ + while (length > 12) + { + a += ((uint32_t)k[0])<<24; + a += ((uint32_t)k[1])<<16; + a += ((uint32_t)k[2])<<8; + a += ((uint32_t)k[3]); + b += ((uint32_t)k[4])<<24; + b += ((uint32_t)k[5])<<16; + b += ((uint32_t)k[6])<<8; + b += ((uint32_t)k[7]); + c += ((uint32_t)k[8])<<24; + c += ((uint32_t)k[9])<<16; + c += ((uint32_t)k[10])<<8; + c += ((uint32_t)k[11]); + mix(a,b,c); + length -= 12; + k += 12; + } + + /*-------------------------------- last block: affect all 32 bits of (c) */ + switch(length) /* all the case statements fall through */ + { + case 12: c+=k[11]; + case 11: c+=((uint32_t)k[10])<<8; + case 10: c+=((uint32_t)k[9])<<16; + case 9 : c+=((uint32_t)k[8])<<24; + case 8 : b+=k[7]; + case 7 : b+=((uint32_t)k[6])<<8; + case 6 : b+=((uint32_t)k[5])<<16; + case 5 : b+=((uint32_t)k[4])<<24; + case 4 : a+=k[3]; + case 3 : a+=((uint32_t)k[2])<<8; + case 2 : a+=((uint32_t)k[1])<<16; + case 1 : a+=((uint32_t)k[0])<<24; + break; + case 0 : return c; + } + } + + final(a,b,c); + return c; +} + + +#ifdef SELF_TEST + +/* used for timings */ +void driver1() +{ + uint8_t buf[256]; + uint32_t i; + uint32_t h=0; + time_t a,z; + + time(&a); + for (i=0; i<256; ++i) buf[i] = 'x'; + for (i=0; i<1; ++i) + { + h = hashlittle(&buf[0],1,h); + } + time(&z); + if (z-a > 0) printf("time %d %.8x\n", z-a, h); +} + +/* check that every input bit changes every output bit half the time */ +#define HASHSTATE 1 +#define HASHLEN 1 +#define MAXPAIR 60 +#define MAXLEN 70 +void driver2() +{ + uint8_t qa[MAXLEN+1], qb[MAXLEN+2], *a = &qa[0], *b = &qb[1]; + uint32_t c[HASHSTATE], d[HASHSTATE], i=0, j=0, k, l, m=0, z; + uint32_t e[HASHSTATE],f[HASHSTATE],g[HASHSTATE],h[HASHSTATE]; + uint32_t x[HASHSTATE],y[HASHSTATE]; + uint32_t hlen; + + printf("No more than %d trials should ever be needed \n",MAXPAIR/2); + for (hlen=0; hlen < MAXLEN; ++hlen) + { + z=0; + for (i=0; i<hlen; ++i) /*----------------------- for each input byte, */ + { + for (j=0; j<8; ++j) /*------------------------ for each input bit, */ + { + for (m=1; m<8; ++m) /*------------ for serveral possible initvals, */ + { + for (l=0; l<HASHSTATE; ++l) + e[l]=f[l]=g[l]=h[l]=x[l]=y[l]=~((uint32_t)0); + + /*---- check that every output bit is affected by that input bit */ + for (k=0; k<MAXPAIR; k+=2) + { + uint32_t finished=1; + /* keys have one bit different */ + for (l=0; l<hlen+1; ++l) {a[l] = b[l] = (uint8_t)0;} + /* have a and b be two keys differing in only one bit */ + a[i] ^= (k<<j); + a[i] ^= (k>>(8-j)); + c[0] = hashlittle(a, hlen, m); + b[i] ^= ((k+1)<<j); + b[i] ^= ((k+1)>>(8-j)); + d[0] = hashlittle(b, hlen, m); + /* check every bit is 1, 0, set, and not set at least once */ + for (l=0; l<HASHSTATE; ++l) + { + e[l] &= (c[l]^d[l]); + f[l] &= ~(c[l]^d[l]); + g[l] &= c[l]; + h[l] &= ~c[l]; + x[l] &= d[l]; + y[l] &= ~d[l]; + if (e[l]|f[l]|g[l]|h[l]|x[l]|y[l]) finished=0; + } + if (finished) break; + } + if (k>z) z=k; + if (k==MAXPAIR) + { + printf("Some bit didn't change: "); + printf("%.8x %.8x %.8x %.8x %.8x %.8x ", + e[0],f[0],g[0],h[0],x[0],y[0]); + printf("i %d j %d m %d len %d\n", i, j, m, hlen); + } + if (z==MAXPAIR) goto done; + } + } + } + done: + if (z < MAXPAIR) + { + printf("Mix success %2d bytes %2d initvals ",i,m); + printf("required %d trials\n", z/2); + } + } + printf("\n"); +} + +/* Check for reading beyond the end of the buffer and alignment problems */ +void driver3() +{ + uint8_t buf[MAXLEN+20], *b; + uint32_t len; + uint8_t q[] = "This is the time for all good men to come to the aid of their country..."; + uint32_t h; + uint8_t qq[] = "xThis is the time for all good men to come to the aid of their country..."; + uint32_t i; + uint8_t qqq[] = "xxThis is the time for all good men to come to the aid of their country..."; + uint32_t j; + uint8_t qqqq[] = "xxxThis is the time for all good men to come to the aid of their country..."; + uint32_t ref,x,y; + uint8_t *p; + + printf("Endianness. These lines should all be the same (for values filled in):\n"); + printf("%.8x %.8x %.8x\n", + hashword((const uint32_t *)q, (sizeof(q)-1)/4, 13), + hashword((const uint32_t *)q, (sizeof(q)-5)/4, 13), + hashword((const uint32_t *)q, (sizeof(q)-9)/4, 13)); + p = q; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + p = &qq[1]; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + p = &qqq[2]; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + p = &qqqq[3]; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + printf("\n"); + + /* check that hashlittle2 and hashlittle produce the same results */ + i=47; j=0; + hashlittle2(q, sizeof(q), &i, &j); + if (hashlittle(q, sizeof(q), 47) != i) + printf("hashlittle2 and hashlittle mismatch\n"); + + /* check that hashword2 and hashword produce the same results */ + len = 0xdeadbeef; + i=47, j=0; + hashword2(&len, 1, &i, &j); + if (hashword(&len, 1, 47) != i) + printf("hashword2 and hashword mismatch %x %x\n", + i, hashword(&len, 1, 47)); + + /* check hashlittle doesn't read before or after the ends of the string */ + for (h=0, b=buf+1; h<8; ++h, ++b) + { + for (i=0; i<MAXLEN; ++i) + { + len = i; + for (j=0; j<i; ++j) *(b+j)=0; + + /* these should all be equal */ + ref = hashlittle(b, len, (uint32_t)1); + *(b+i)=(uint8_t)~0; + *(b-1)=(uint8_t)~0; + x = hashlittle(b, len, (uint32_t)1); + y = hashlittle(b, len, (uint32_t)1); + if ((ref != x) || (ref != y)) + { + printf("alignment error: %.8x %.8x %.8x %d %d\n",ref,x,y, + h, i); + } + } + } +} + +/* check for problems with nulls */ + void driver4() +{ + uint8_t buf[1]; + uint32_t h,i,state[HASHSTATE]; + + + buf[0] = ~0; + for (i=0; i<HASHSTATE; ++i) state[i] = 1; + printf("These should all be different\n"); + for (i=0, h=0; i<8; ++i) + { + h = hashlittle(buf, 0, h); + printf("%2ld 0-byte strings, hash is %.8x\n", i, h); + } +} + +void driver5() +{ + uint32_t b,c; + b=0, c=0, hashlittle2("", 0, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* deadbeef deadbeef */ + b=0xdeadbeef, c=0, hashlittle2("", 0, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* bd5b7dde deadbeef */ + b=0xdeadbeef, c=0xdeadbeef, hashlittle2("", 0, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* 9c093ccd bd5b7dde */ + b=0, c=0, hashlittle2("Four score and seven years ago", 30, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* 17770551 ce7226e6 */ + b=1, c=0, hashlittle2("Four score and seven years ago", 30, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* e3607cae bd371de4 */ + b=0, c=1, hashlittle2("Four score and seven years ago", 30, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* cd628161 6cbea4b3 */ + c = hashlittle("Four score and seven years ago", 30, 0); + printf("hash is %.8lx\n", c); /* 17770551 */ + c = hashlittle("Four score and seven years ago", 30, 1); + printf("hash is %.8lx\n", c); /* cd628161 */ +} + + +int main() +{ + driver1(); /* test that the key is hashed: used for timings */ + driver2(); /* test that whole key is hashed thoroughly */ + driver3(); /* test that nothing but the key is hashed */ + driver4(); /* test hashing multiple buffers (all buffers are null) */ + driver5(); /* test the hash against known vectors */ + return 1; +} + +#endif /* SELF_TEST */ diff --git a/src/journal/lookup3.h b/src/journal/lookup3.h new file mode 100644 index 0000000000..31cc2f57b0 --- /dev/null +++ b/src/journal/lookup3.h @@ -0,0 +1,25 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foolookup3hfoo +#define foolookup3hfoo + +#include <inttypes.h> +#include <sys/types.h> + +uint32_t jenkins_hashword(const uint32_t *k, size_t length, uint32_t initval); +void jenkins_hashword2(const uint32_t *k, size_t length, uint32_t *pc, uint32_t *pb); + +uint32_t jenkins_hashlittle(const void *key, size_t length, uint32_t initval); +void jenkins_hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *pb); + +uint32_t jenkins_hashbig(const void *key, size_t length, uint32_t initval); + +static inline uint64_t hash64(const void *data, size_t length) { + uint32_t a = 0, b = 0; + + jenkins_hashlittle2(data, length, &a, &b); + + return ((uint64_t) a << 32ULL) | (uint64_t) b; +} + +#endif diff --git a/src/journal/sd-journal.c b/src/journal/sd-journal.c new file mode 100644 index 0000000000..92ba57822b --- /dev/null +++ b/src/journal/sd-journal.c @@ -0,0 +1,1636 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <errno.h> +#include <fcntl.h> +#include <stddef.h> +#include <unistd.h> +#include <sys/inotify.h> + +#include "sd-journal.h" +#include "journal-def.h" +#include "journal-file.h" +#include "hashmap.h" +#include "list.h" +#include "lookup3.h" +#include "compress.h" +#include "journal-internal.h" + +#define JOURNAL_FILES_MAX 1024 + +static void detach_location(sd_journal *j) { + Iterator i; + JournalFile *f; + + assert(j); + + j->current_file = NULL; + j->current_field = 0; + + HASHMAP_FOREACH(f, j->files, i) + f->current_offset = 0; +} + +static void reset_location(sd_journal *j) { + assert(j); + + detach_location(j); + zero(j->current_location); +} + +static void init_location(Location *l, JournalFile *f, Object *o) { + assert(l); + assert(f); + assert(o->object.type == OBJECT_ENTRY); + + l->type = LOCATION_DISCRETE; + l->seqnum = le64toh(o->entry.seqnum); + l->seqnum_id = f->header->seqnum_id; + l->realtime = le64toh(o->entry.realtime); + l->monotonic = le64toh(o->entry.monotonic); + l->boot_id = o->entry.boot_id; + l->xor_hash = le64toh(o->entry.xor_hash); + + l->seqnum_set = l->realtime_set = l->monotonic_set = l->xor_hash_set = true; +} + +static void set_location(sd_journal *j, JournalFile *f, Object *o, uint64_t offset) { + assert(j); + assert(f); + assert(o); + + init_location(&j->current_location, f, o); + + j->current_file = f; + j->current_field = 0; + + f->current_offset = offset; +} + +static int same_field(const void *_a, size_t s, const void *_b, size_t t) { + const uint8_t *a = _a, *b = _b; + size_t j; + bool a_good = false, b_good = false, different = false; + + for (j = 0; j < s && j < t; j++) { + + if (a[j] == '=') + a_good = true; + if (b[j] == '=') + b_good = true; + if (a[j] != b[j]) + different = true; + + if (a_good && b_good) + return different ? 0 : 1; + } + + return -EINVAL; +} + +_public_ int sd_journal_add_match(sd_journal *j, const void *data, size_t size) { + Match *m, *after = NULL; + le64_t le_hash; + + if (!j) + return -EINVAL; + if (!data) + return -EINVAL; + if (size <= 0) + return -EINVAL; + + le_hash = htole64(hash64(data, size)); + + LIST_FOREACH(matches, m, j->matches) { + int r; + + if (m->le_hash == le_hash && + m->size == size && + memcmp(m->data, data, size) == 0) + return 0; + + r = same_field(data, size, m->data, m->size); + if (r < 0) + return r; + else if (r > 0) + after = m; + } + + m = new0(Match, 1); + if (!m) + return -ENOMEM; + + m->size = size; + + m->data = malloc(m->size); + if (!m->data) { + free(m); + return -ENOMEM; + } + + memcpy(m->data, data, size); + m->le_hash = le_hash; + + /* Matches for the same fields we order adjacent to each + * other */ + LIST_INSERT_AFTER(Match, matches, j->matches, after, m); + j->n_matches ++; + + detach_location(j); + + return 0; +} + +_public_ void sd_journal_flush_matches(sd_journal *j) { + if (!j) + return; + + while (j->matches) { + Match *m = j->matches; + + LIST_REMOVE(Match, matches, j->matches, m); + free(m->data); + free(m); + } + + j->n_matches = 0; + + detach_location(j); +} + +static int compare_order(JournalFile *af, Object *ao, + JournalFile *bf, Object *bo) { + + uint64_t a, b; + + assert(af); + assert(ao); + assert(bf); + assert(bo); + + /* We operate on two different files here, hence we can access + * two objects at the same time, which we normally can't. + * + * If contents and timestamps match, these entries are + * identical, even if the seqnum does not match */ + + if (sd_id128_equal(ao->entry.boot_id, bo->entry.boot_id) && + ao->entry.monotonic == bo->entry.monotonic && + ao->entry.realtime == bo->entry.realtime && + ao->entry.xor_hash == bo->entry.xor_hash) + return 0; + + if (sd_id128_equal(af->header->seqnum_id, bf->header->seqnum_id)) { + + /* If this is from the same seqnum source, compare + * seqnums */ + a = le64toh(ao->entry.seqnum); + b = le64toh(bo->entry.seqnum); + + if (a < b) + return -1; + if (a > b) + return 1; + + /* Wow! This is weird, different data but the same + * seqnums? Something is borked, but let's make the + * best of it and compare by time. */ + } + + if (sd_id128_equal(ao->entry.boot_id, bo->entry.boot_id)) { + + /* If the boot id matches compare monotonic time */ + a = le64toh(ao->entry.monotonic); + b = le64toh(bo->entry.monotonic); + + if (a < b) + return -1; + if (a > b) + return 1; + } + + /* Otherwise compare UTC time */ + a = le64toh(ao->entry.realtime); + b = le64toh(ao->entry.realtime); + + if (a < b) + return -1; + if (a > b) + return 1; + + /* Finally, compare by contents */ + a = le64toh(ao->entry.xor_hash); + b = le64toh(ao->entry.xor_hash); + + if (a < b) + return -1; + if (a > b) + return 1; + + return 0; +} + +static int compare_with_location(JournalFile *af, Object *ao, Location *l) { + uint64_t a; + + assert(af); + assert(ao); + assert(l); + assert(l->type == LOCATION_DISCRETE); + + if (l->monotonic_set && + sd_id128_equal(ao->entry.boot_id, l->boot_id) && + l->realtime_set && + le64toh(ao->entry.realtime) == l->realtime && + l->xor_hash_set && + le64toh(ao->entry.xor_hash) == l->xor_hash) + return 0; + + if (l->seqnum_set && + sd_id128_equal(af->header->seqnum_id, l->seqnum_id)) { + + a = le64toh(ao->entry.seqnum); + + if (a < l->seqnum) + return -1; + if (a > l->seqnum) + return 1; + } + + if (l->monotonic_set && + sd_id128_equal(ao->entry.boot_id, l->boot_id)) { + + a = le64toh(ao->entry.monotonic); + + if (a < l->monotonic) + return -1; + if (a > l->monotonic) + return 1; + } + + if (l->realtime_set) { + + a = le64toh(ao->entry.realtime); + + if (a < l->realtime) + return -1; + if (a > l->realtime) + return 1; + } + + if (l->xor_hash_set) { + a = le64toh(ao->entry.xor_hash); + + if (a < l->xor_hash) + return -1; + if (a > l->xor_hash) + return 1; + } + + return 0; +} + +static int find_location(sd_journal *j, JournalFile *f, direction_t direction, Object **ret, uint64_t *offset) { + Object *o = NULL; + uint64_t p = 0; + int r; + + assert(j); + + if (!j->matches) { + /* No matches is simple */ + + if (j->current_location.type == LOCATION_HEAD) + r = journal_file_next_entry(f, NULL, 0, DIRECTION_DOWN, &o, &p); + else if (j->current_location.type == LOCATION_TAIL) + r = journal_file_next_entry(f, NULL, 0, DIRECTION_UP, &o, &p); + else if (j->current_location.seqnum_set && + sd_id128_equal(j->current_location.seqnum_id, f->header->seqnum_id)) + r = journal_file_move_to_entry_by_seqnum(f, j->current_location.seqnum, direction, &o, &p); + else if (j->current_location.monotonic_set) { + r = journal_file_move_to_entry_by_monotonic(f, j->current_location.boot_id, j->current_location.monotonic, direction, &o, &p); + + if (r == -ENOENT) { + /* boot id unknown in this file */ + if (j->current_location.realtime_set) + r = journal_file_move_to_entry_by_realtime(f, j->current_location.realtime, direction, &o, &p); + else + r = journal_file_next_entry(f, NULL, 0, direction, &o, &p); + } + } else if (j->current_location.realtime_set) + r = journal_file_move_to_entry_by_realtime(f, j->current_location.realtime, direction, &o, &p); + else + r = journal_file_next_entry(f, NULL, 0, direction, &o, &p); + + if (r <= 0) + return r; + + } else { + Match *m, *term_match = NULL; + Object *to = NULL; + uint64_t tp = 0; + + /* We have matches, first, let's jump to the monotonic + * position if we have any, since it implies a + * match. */ + + if (j->current_location.type == LOCATION_DISCRETE && + j->current_location.monotonic_set) { + + r = journal_file_move_to_entry_by_monotonic(f, j->current_location.boot_id, j->current_location.monotonic, direction, &o, &p); + if (r <= 0) + return r == -ENOENT ? 0 : r; + } + + LIST_FOREACH(matches, m, j->matches) { + Object *c, *d; + uint64_t cp, dp; + + r = journal_file_find_data_object_with_hash(f, m->data, m->size, le64toh(m->le_hash), &d, &dp); + if (r <= 0) + return r; + + if (j->current_location.type == LOCATION_HEAD) + r = journal_file_next_entry_for_data(f, NULL, 0, dp, DIRECTION_DOWN, &c, &cp); + else if (j->current_location.type == LOCATION_TAIL) + r = journal_file_next_entry_for_data(f, NULL, 0, dp, DIRECTION_UP, &c, &cp); + else if (j->current_location.seqnum_set && + sd_id128_equal(j->current_location.seqnum_id, f->header->seqnum_id)) + r = journal_file_move_to_entry_by_seqnum_for_data(f, dp, j->current_location.seqnum, direction, &c, &cp); + else if (j->current_location.realtime_set) + r = journal_file_move_to_entry_by_realtime_for_data(f, dp, j->current_location.realtime, direction, &c, &cp); + else + r = journal_file_next_entry_for_data(f, NULL, 0, dp, direction, &c, &cp); + + if (r < 0) + return r; + + if (!term_match) { + term_match = m; + + if (r > 0) { + to = c; + tp = cp; + } + } else if (same_field(term_match->data, term_match->size, m->data, m->size)) { + + /* Same field as previous match... */ + if (r > 0) { + + /* Find the earliest of the OR matches */ + + if (!to || + (direction == DIRECTION_DOWN && cp < tp) || + (direction == DIRECTION_UP && cp > tp)) { + to = c; + tp = cp; + } + + } + + } else { + + /* Previous term is finished, did anything match? */ + if (!to) + return 0; + + /* Find the last of the AND matches */ + if (!o || + (direction == DIRECTION_DOWN && tp > p) || + (direction == DIRECTION_UP && tp < p)) { + o = to; + p = tp; + } + + term_match = m; + + if (r > 0) { + to = c; + tp = cp; + } else { + to = NULL; + tp = 0; + } + } + } + + /* Last term is finished, did anything match? */ + if (!to) + return 0; + + if (!o || + (direction == DIRECTION_DOWN && tp > p) || + (direction == DIRECTION_UP && tp < p)) { + o = to; + p = tp; + } + + if (!o) + return 0; + } + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; +} + +static int next_with_matches(sd_journal *j, JournalFile *f, direction_t direction, Object **ret, uint64_t *offset) { + int r; + uint64_t cp; + Object *c; + + assert(j); + assert(f); + assert(ret); + assert(offset); + + c = *ret; + cp = *offset; + + if (!j->matches) { + /* No matches is easy */ + + r = journal_file_next_entry(f, c, cp, direction, &c, &cp); + if (r <= 0) + return r; + + if (ret) + *ret = c; + if (offset) + *offset = cp; + return 1; + } + + /* So there are matches we have to adhere to, let's find the + * first entry that matches all of them */ + + for (;;) { + uint64_t np, n; + bool found, term_result = false; + Match *m, *term_match = NULL; + Object *npo = NULL; + + n = journal_file_entry_n_items(c); + + /* Make sure we don't match the entry we are starting + * from. */ + found = cp != *offset; + + np = 0; + LIST_FOREACH(matches, m, j->matches) { + uint64_t q, k; + Object *qo = NULL; + + /* Let's check if this is the beginning of a + * new term, i.e. has a different field prefix + * as the preceeding match. */ + if (!term_match) { + term_match = m; + term_result = false; + } else if (!same_field(term_match->data, term_match->size, m->data, m->size)) { + if (!term_result) + found = false; + + term_match = m; + term_result = false; + } + + for (k = 0; k < n; k++) + if (c->entry.items[k].hash == m->le_hash) + break; + + if (k >= n) { + /* Hmm, didn't find any field that + * matched this rule, so ignore this + * match. Go on with next match */ + continue; + } + + term_result = true; + + /* Hmm, so, this field matched, let's remember + * where we'd have to try next, in case the other + * matches are not OK */ + + r = journal_file_next_entry_for_data(f, c, cp, le64toh(c->entry.items[k].object_offset), direction, &qo, &q); + /* This pointer is invalidated if the window was + * remapped. May need to re-fetch it later */ + c = NULL; + if (r < 0) + return r; + + if (r > 0) { + + if (direction == DIRECTION_DOWN) { + if (q > np) { + np = q; + npo = qo; + } + } else { + if (np == 0 || q < np) { + np = q; + npo = qo; + } + } + } + } + + /* Check the last term */ + if (term_match && !term_result) + found = false; + + /* Did this entry match against all matches? */ + if (found) { + if (ret) { + if (c == NULL) { + /* Re-fetch the entry */ + r = journal_file_move_to_object(f, OBJECT_ENTRY, cp, &c); + if (r < 0) + return r; + } + *ret = c; + } + if (offset) + *offset = cp; + return 1; + } + + /* Did we find a subsequent entry? */ + if (np == 0) + return 0; + + /* Hmm, ok, this entry only matched partially, so + * let's try another one */ + cp = np; + c = npo; + } +} + +static int next_beyond_location(sd_journal *j, JournalFile *f, direction_t direction, Object **ret, uint64_t *offset) { + Object *c; + uint64_t cp; + int compare_value, r; + + assert(j); + assert(f); + + if (f->current_offset > 0) { + cp = f->current_offset; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, cp, &c); + if (r < 0) + return r; + + r = next_with_matches(j, f, direction, &c, &cp); + if (r <= 0) + return r; + + compare_value = 1; + } else { + r = find_location(j, f, direction, &c, &cp); + if (r <= 0) + return r; + + compare_value = 0; + } + + for (;;) { + bool found; + + if (j->current_location.type == LOCATION_DISCRETE) { + int k; + + k = compare_with_location(f, c, &j->current_location); + if (direction == DIRECTION_DOWN) + found = k >= compare_value; + else + found = k <= -compare_value; + } else + found = true; + + if (found) { + if (ret) + *ret = c; + if (offset) + *offset = cp; + return 1; + } + + r = next_with_matches(j, f, direction, &c, &cp); + if (r <= 0) + return r; + } +} + +static int real_journal_next(sd_journal *j, direction_t direction) { + JournalFile *f, *new_current = NULL; + Iterator i; + int r; + uint64_t new_offset = 0; + Object *new_entry = NULL; + + if (!j) + return -EINVAL; + + HASHMAP_FOREACH(f, j->files, i) { + Object *o; + uint64_t p; + bool found; + + r = next_beyond_location(j, f, direction, &o, &p); + if (r < 0) + return r; + else if (r == 0) + continue; + + if (!new_current) + found = true; + else { + int k; + + k = compare_order(f, o, new_current, new_entry); + + if (direction == DIRECTION_DOWN) + found = k < 0; + else + found = k > 0; + } + + if (found) { + new_current = f; + new_entry = o; + new_offset = p; + } + } + + if (!new_current) + return 0; + + set_location(j, new_current, new_entry, new_offset); + + return 1; +} + +_public_ int sd_journal_next(sd_journal *j) { + return real_journal_next(j, DIRECTION_DOWN); +} + +_public_ int sd_journal_previous(sd_journal *j) { + return real_journal_next(j, DIRECTION_UP); +} + +static int real_journal_next_skip(sd_journal *j, direction_t direction, uint64_t skip) { + int c = 0, r; + + if (!j) + return -EINVAL; + + if (skip == 0) { + /* If this is not a discrete skip, then at least + * resolve the current location */ + if (j->current_location.type != LOCATION_DISCRETE) + return real_journal_next(j, direction); + + return 0; + } + + do { + r = real_journal_next(j, direction); + if (r < 0) + return r; + + if (r == 0) + return c; + + skip--; + c++; + } while (skip > 0); + + return c; +} + +_public_ int sd_journal_next_skip(sd_journal *j, uint64_t skip) { + return real_journal_next_skip(j, DIRECTION_DOWN, skip); +} + +_public_ int sd_journal_previous_skip(sd_journal *j, uint64_t skip) { + return real_journal_next_skip(j, DIRECTION_UP, skip); +} + +_public_ int sd_journal_get_cursor(sd_journal *j, char **cursor) { + Object *o; + int r; + char bid[33], sid[33]; + + if (!j) + return -EINVAL; + if (!cursor) + return -EINVAL; + + if (!j->current_file || j->current_file->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(j->current_file, OBJECT_ENTRY, j->current_file->current_offset, &o); + if (r < 0) + return r; + + sd_id128_to_string(j->current_file->header->seqnum_id, sid); + sd_id128_to_string(o->entry.boot_id, bid); + + if (asprintf(cursor, + "s=%s;i=%llx;b=%s;m=%llx;t=%llx;x=%llx;p=%s", + sid, (unsigned long long) le64toh(o->entry.seqnum), + bid, (unsigned long long) le64toh(o->entry.monotonic), + (unsigned long long) le64toh(o->entry.realtime), + (unsigned long long) le64toh(o->entry.xor_hash), + file_name_from_path(j->current_file->path)) < 0) + return -ENOMEM; + + return 1; +} + +_public_ int sd_journal_seek_cursor(sd_journal *j, const char *cursor) { + char *w; + size_t l; + char *state; + unsigned long long seqnum, monotonic, realtime, xor_hash; + bool + seqnum_id_set = false, + seqnum_set = false, + boot_id_set = false, + monotonic_set = false, + realtime_set = false, + xor_hash_set = false; + sd_id128_t seqnum_id, boot_id; + + if (!j) + return -EINVAL; + if (!cursor) + return -EINVAL; + + FOREACH_WORD_SEPARATOR(w, l, cursor, ";", state) { + char *item; + int k = 0; + + if (l < 2 || w[1] != '=') + return -EINVAL; + + item = strndup(w, l); + if (!item) + return -ENOMEM; + + switch (w[0]) { + + case 's': + seqnum_id_set = true; + k = sd_id128_from_string(w+2, &seqnum_id); + break; + + case 'i': + seqnum_set = true; + if (sscanf(w+2, "%llx", &seqnum) != 1) + k = -EINVAL; + break; + + case 'b': + boot_id_set = true; + k = sd_id128_from_string(w+2, &boot_id); + break; + + case 'm': + monotonic_set = true; + if (sscanf(w+2, "%llx", &monotonic) != 1) + k = -EINVAL; + break; + + case 't': + realtime_set = true; + if (sscanf(w+2, "%llx", &realtime) != 1) + k = -EINVAL; + break; + + case 'x': + xor_hash_set = true; + if (sscanf(w+2, "%llx", &xor_hash) != 1) + k = -EINVAL; + break; + } + + free(item); + + if (k < 0) + return k; + } + + if ((!seqnum_set || !seqnum_id_set) && + (!monotonic_set || !boot_id_set) && + !realtime_set) + return -EINVAL; + + reset_location(j); + + j->current_location.type = LOCATION_DISCRETE; + + if (realtime_set) { + j->current_location.realtime = (uint64_t) realtime; + j->current_location.realtime_set = true; + } + + if (seqnum_set && seqnum_id_set) { + j->current_location.seqnum = (uint64_t) seqnum; + j->current_location.seqnum_id = seqnum_id; + j->current_location.seqnum_set = true; + } + + if (monotonic_set && boot_id_set) { + j->current_location.monotonic = (uint64_t) monotonic; + j->current_location.boot_id = boot_id; + j->current_location.monotonic_set = true; + } + + if (xor_hash_set) { + j->current_location.xor_hash = (uint64_t) xor_hash; + j->current_location.xor_hash_set = true; + } + + return 0; +} + +_public_ int sd_journal_seek_monotonic_usec(sd_journal *j, sd_id128_t boot_id, uint64_t usec) { + if (!j) + return -EINVAL; + + reset_location(j); + j->current_location.type = LOCATION_DISCRETE; + j->current_location.boot_id = boot_id; + j->current_location.monotonic = usec; + j->current_location.monotonic_set = true; + + return 0; +} + +_public_ int sd_journal_seek_realtime_usec(sd_journal *j, uint64_t usec) { + if (!j) + return -EINVAL; + + reset_location(j); + j->current_location.type = LOCATION_DISCRETE; + j->current_location.realtime = usec; + j->current_location.realtime_set = true; + + return 0; +} + +_public_ int sd_journal_seek_head(sd_journal *j) { + if (!j) + return -EINVAL; + + reset_location(j); + j->current_location.type = LOCATION_HEAD; + + return 0; +} + +_public_ int sd_journal_seek_tail(sd_journal *j) { + if (!j) + return -EINVAL; + + reset_location(j); + j->current_location.type = LOCATION_TAIL; + + return 0; +} + +static int add_file(sd_journal *j, const char *prefix, const char *dir, const char *filename) { + char *fn; + int r; + JournalFile *f; + + assert(j); + assert(prefix); + assert(filename); + + if ((j->flags & SD_JOURNAL_SYSTEM_ONLY) && + !startswith(filename, "system.journal")) + return 0; + + if (dir) + fn = join(prefix, "/", dir, "/", filename, NULL); + else + fn = join(prefix, "/", filename, NULL); + + if (!fn) + return -ENOMEM; + + if (hashmap_get(j->files, fn)) { + free(fn); + return 0; + } + + if (hashmap_size(j->files) >= JOURNAL_FILES_MAX) { + log_debug("Too many open journal files, not adding %s, ignoring.", fn); + free(fn); + return 0; + } + + r = journal_file_open(fn, O_RDONLY, 0, NULL, &f); + free(fn); + + if (r < 0) { + if (errno == ENOENT) + return 0; + + return r; + } + + /* journal_file_dump(f); */ + + r = hashmap_put(j->files, f->path, f); + if (r < 0) { + journal_file_close(f); + return r; + } + + log_debug("File %s got added.", f->path); + + return 0; +} + +static int remove_file(sd_journal *j, const char *prefix, const char *dir, const char *filename) { + char *fn; + JournalFile *f; + + assert(j); + assert(prefix); + assert(filename); + + if (dir) + fn = join(prefix, "/", dir, "/", filename, NULL); + else + fn = join(prefix, "/", filename, NULL); + + if (!fn) + return -ENOMEM; + + f = hashmap_get(j->files, fn); + free(fn); + + if (!f) + return 0; + + hashmap_remove(j->files, f->path); + journal_file_close(f); + + log_debug("File %s got removed.", f->path); + return 0; +} + +static int add_directory(sd_journal *j, const char *prefix, const char *dir) { + char *fn; + int r; + DIR *d; + int wd; + sd_id128_t id, mid; + + assert(j); + assert(prefix); + assert(dir); + + if ((j->flags & SD_JOURNAL_LOCAL_ONLY) && + (sd_id128_from_string(dir, &id) < 0 || + sd_id128_get_machine(&mid) < 0 || + !sd_id128_equal(id, mid))) + return 0; + + fn = join(prefix, "/", dir, NULL); + if (!fn) + return -ENOMEM; + + d = opendir(fn); + + if (!d) { + free(fn); + if (errno == ENOENT) + return 0; + + return -errno; + } + + wd = inotify_add_watch(j->inotify_fd, fn, + IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB|IN_DELETE| + IN_DELETE_SELF|IN_MOVE_SELF|IN_UNMOUNT| + IN_DONT_FOLLOW|IN_ONLYDIR); + if (wd > 0) { + if (hashmap_put(j->inotify_wd_dirs, INT_TO_PTR(wd), fn) < 0) + inotify_rm_watch(j->inotify_fd, wd); + else + fn = NULL; + } + + free(fn); + + for (;;) { + struct dirent buf, *de; + + r = readdir_r(d, &buf, &de); + if (r != 0 || !de) + break; + + if (!dirent_is_file_with_suffix(de, ".journal")) + continue; + + r = add_file(j, prefix, dir, de->d_name); + if (r < 0) + log_debug("Failed to add file %s/%s/%s: %s", prefix, dir, de->d_name, strerror(-r)); + } + + closedir(d); + + log_debug("Directory %s/%s got added.", prefix, dir); + + return 0; +} + +static void remove_directory_wd(sd_journal *j, int wd) { + char *p; + + assert(j); + assert(wd > 0); + + if (j->inotify_fd >= 0) + inotify_rm_watch(j->inotify_fd, wd); + + p = hashmap_remove(j->inotify_wd_dirs, INT_TO_PTR(wd)); + + if (p) { + log_debug("Directory %s got removed.", p); + free(p); + } +} + +static void add_root_wd(sd_journal *j, const char *p) { + int wd; + char *k; + + assert(j); + assert(p); + + wd = inotify_add_watch(j->inotify_fd, p, + IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB|IN_DELETE| + IN_DONT_FOLLOW|IN_ONLYDIR); + if (wd <= 0) + return; + + k = strdup(p); + if (!k || hashmap_put(j->inotify_wd_roots, INT_TO_PTR(wd), k) < 0) { + inotify_rm_watch(j->inotify_fd, wd); + free(k); + } +} + +static void remove_root_wd(sd_journal *j, int wd) { + char *p; + + assert(j); + assert(wd > 0); + + if (j->inotify_fd >= 0) + inotify_rm_watch(j->inotify_fd, wd); + + p = hashmap_remove(j->inotify_wd_roots, INT_TO_PTR(wd)); + + if (p) { + log_debug("Root %s got removed.", p); + free(p); + } +} + +_public_ int sd_journal_open(sd_journal **ret, int flags) { + sd_journal *j; + const char *p; + const char search_paths[] = + "/run/log/journal\0" + "/var/log/journal\0"; + int r; + + if (!ret) + return -EINVAL; + + if (flags & ~(SD_JOURNAL_LOCAL_ONLY| + SD_JOURNAL_RUNTIME_ONLY| + SD_JOURNAL_SYSTEM_ONLY)) + return -EINVAL; + + j = new0(sd_journal, 1); + if (!j) + return -ENOMEM; + + j->flags = flags; + + j->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); + if (j->inotify_fd < 0) { + r = -errno; + goto fail; + } + + j->files = hashmap_new(string_hash_func, string_compare_func); + if (!j->files) { + r = -ENOMEM; + goto fail; + } + + j->inotify_wd_dirs = hashmap_new(trivial_hash_func, trivial_compare_func); + j->inotify_wd_roots = hashmap_new(trivial_hash_func, trivial_compare_func); + + if (!j->inotify_wd_dirs || !j->inotify_wd_roots) { + r = -ENOMEM; + goto fail; + } + + /* We ignore most errors here, since the idea is to only open + * what's actually accessible, and ignore the rest. */ + + NULSTR_FOREACH(p, search_paths) { + DIR *d; + + if ((flags & SD_JOURNAL_RUNTIME_ONLY) && + !path_startswith(p, "/run")) + continue; + + d = opendir(p); + if (!d) { + if (errno != ENOENT) + log_debug("Failed to open %s: %m", p); + continue; + } + + add_root_wd(j, p); + + for (;;) { + struct dirent buf, *de; + sd_id128_t id; + + r = readdir_r(d, &buf, &de); + if (r != 0 || !de) + break; + + if (dirent_is_file_with_suffix(de, ".journal")) { + r = add_file(j, p, NULL, de->d_name); + if (r < 0) + log_debug("Failed to add file %s/%s: %s", p, de->d_name, strerror(-r)); + + } else if ((de->d_type == DT_DIR || de->d_type == DT_UNKNOWN) && + sd_id128_from_string(de->d_name, &id) >= 0) { + + r = add_directory(j, p, de->d_name); + if (r < 0) + log_debug("Failed to add directory %s/%s: %s", p, de->d_name, strerror(-r)); + } + } + + closedir(d); + } + + *ret = j; + return 0; + +fail: + sd_journal_close(j); + + return r; +}; + +_public_ void sd_journal_close(sd_journal *j) { + if (!j) + return; + + if (j->inotify_wd_dirs) { + void *k; + + while ((k = hashmap_first_key(j->inotify_wd_dirs))) + remove_directory_wd(j, PTR_TO_INT(k)); + + hashmap_free(j->inotify_wd_dirs); + } + + if (j->inotify_wd_roots) { + void *k; + + while ((k = hashmap_first_key(j->inotify_wd_roots))) + remove_root_wd(j, PTR_TO_INT(k)); + + hashmap_free(j->inotify_wd_roots); + } + + if (j->files) { + JournalFile *f; + + while ((f = hashmap_steal_first(j->files))) + journal_file_close(f); + + hashmap_free(j->files); + } + + sd_journal_flush_matches(j); + + if (j->inotify_fd >= 0) + close_nointr_nofail(j->inotify_fd); + + free(j); +} + +_public_ int sd_journal_get_realtime_usec(sd_journal *j, uint64_t *ret) { + Object *o; + JournalFile *f; + int r; + + if (!j) + return -EINVAL; + if (!ret) + return -EINVAL; + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + *ret = le64toh(o->entry.realtime); + return 0; +} + +_public_ int sd_journal_get_monotonic_usec(sd_journal *j, uint64_t *ret, sd_id128_t *ret_boot_id) { + Object *o; + JournalFile *f; + int r; + sd_id128_t id; + + if (!j) + return -EINVAL; + if (!ret) + return -EINVAL; + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + if (ret_boot_id) + *ret_boot_id = o->entry.boot_id; + else { + r = sd_id128_get_boot(&id); + if (r < 0) + return r; + + if (!sd_id128_equal(id, o->entry.boot_id)) + return -ESTALE; + } + + *ret = le64toh(o->entry.monotonic); + return 0; +} + +_public_ int sd_journal_get_data(sd_journal *j, const char *field, const void **data, size_t *size) { + JournalFile *f; + uint64_t i, n; + size_t field_length; + int r; + Object *o; + + if (!j) + return -EINVAL; + if (!field) + return -EINVAL; + if (!data) + return -EINVAL; + if (!size) + return -EINVAL; + + if (isempty(field) || strchr(field, '=')) + return -EINVAL; + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + field_length = strlen(field); + + n = journal_file_entry_n_items(o); + for (i = 0; i < n; i++) { + uint64_t p, l; + le64_t le_hash; + size_t t; + + p = le64toh(o->entry.items[i].object_offset); + le_hash = o->entry.items[i].hash; + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (le_hash != o->data.hash) + return -EBADMSG; + + l = le64toh(o->object.size) - offsetof(Object, data.payload); + + if (o->object.flags & OBJECT_COMPRESSED) { + +#ifdef HAVE_XZ + if (uncompress_startswith(o->data.payload, l, + &f->compress_buffer, &f->compress_buffer_size, + field, field_length, '=')) { + + uint64_t rsize; + + if (!uncompress_blob(o->data.payload, l, + &f->compress_buffer, &f->compress_buffer_size, &rsize)) + return -EBADMSG; + + *data = f->compress_buffer; + *size = (size_t) rsize; + + return 0; + } +#else + return -EPROTONOSUPPORT; +#endif + + } else if (l >= field_length+1 && + memcmp(o->data.payload, field, field_length) == 0 && + o->data.payload[field_length] == '=') { + + t = (size_t) l; + + if ((uint64_t) t != l) + return -E2BIG; + + *data = o->data.payload; + *size = t; + + return 0; + } + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + } + + return -ENOENT; +} + +_public_ int sd_journal_enumerate_data(sd_journal *j, const void **data, size_t *size) { + JournalFile *f; + uint64_t p, l, n; + le64_t le_hash; + int r; + Object *o; + size_t t; + + if (!j) + return -EINVAL; + if (!data) + return -EINVAL; + if (!size) + return -EINVAL; + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + n = journal_file_entry_n_items(o); + if (j->current_field >= n) + return 0; + + p = le64toh(o->entry.items[j->current_field].object_offset); + le_hash = o->entry.items[j->current_field].hash; + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (le_hash != o->data.hash) + return -EBADMSG; + + l = le64toh(o->object.size) - offsetof(Object, data.payload); + t = (size_t) l; + + /* We can't read objects larger than 4G on a 32bit machine */ + if ((uint64_t) t != l) + return -E2BIG; + + if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ + uint64_t rsize; + + if (!uncompress_blob(o->data.payload, l, &f->compress_buffer, &f->compress_buffer_size, &rsize)) + return -EBADMSG; + + *data = f->compress_buffer; + *size = (size_t) rsize; +#else + return -EPROTONOSUPPORT; +#endif + } else { + *data = o->data.payload; + *size = t; + } + + j->current_field ++; + + return 1; +} + +_public_ void sd_journal_restart_data(sd_journal *j) { + if (!j) + return; + + j->current_field = 0; +} + +_public_ int sd_journal_get_fd(sd_journal *j) { + if (!j) + return -EINVAL; + + return j->inotify_fd; +} + +static void process_inotify_event(sd_journal *j, struct inotify_event *e) { + char *p; + int r; + + assert(j); + assert(e); + + /* Is this a subdirectory we watch? */ + p = hashmap_get(j->inotify_wd_dirs, INT_TO_PTR(e->wd)); + if (p) { + + if (!(e->mask & IN_ISDIR) && e->len > 0 && endswith(e->name, ".journal")) { + + /* Event for a journal file */ + + if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) { + r = add_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to add file %s/%s: %s", p, e->name, strerror(-r)); + } else if (e->mask & (IN_DELETE|IN_UNMOUNT)) { + + r = remove_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to remove file %s/%s: %s", p, e->name, strerror(-r)); + } + + } else if (e->len == 0) { + + /* Event for the directory itself */ + + if (e->mask & (IN_DELETE_SELF|IN_MOVE_SELF|IN_UNMOUNT)) + remove_directory_wd(j, e->wd); + } + + return; + } + + /* Must be the root directory then? */ + p = hashmap_get(j->inotify_wd_roots, INT_TO_PTR(e->wd)); + if (p) { + sd_id128_t id; + + if (!(e->mask & IN_ISDIR) && e->len > 0 && endswith(e->name, ".journal")) { + + /* Event for a journal file */ + + if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) { + r = add_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to add file %s/%s: %s", p, e->name, strerror(-r)); + } else if (e->mask & (IN_DELETE|IN_UNMOUNT)) { + + r = remove_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to remove file %s/%s: %s", p, e->name, strerror(-r)); + } + + } else if ((e->mask & IN_ISDIR) && e->len > 0 && sd_id128_from_string(e->name, &id) >= 0) { + + /* Event for subdirectory */ + + if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) { + + r = add_directory(j, p, e->name); + if (r < 0) + log_debug("Failed to add directory %s/%s: %s", p, e->name, strerror(-r)); + } + } + + return; + } + + if (e->mask & IN_IGNORED) + return; + + log_warning("Unknown inotify event."); +} + +_public_ int sd_journal_process(sd_journal *j) { + uint8_t buffer[sizeof(struct inotify_event) + FILENAME_MAX]; + + if (!j) + return -EINVAL; + + for (;;) { + struct inotify_event *e; + ssize_t l; + + l = read(j->inotify_fd, buffer, sizeof(buffer)); + if (l < 0) { + if (errno == EINTR || errno == EAGAIN) + return 0; + + return -errno; + } + + e = (struct inotify_event*) buffer; + while (l > 0) { + size_t step; + + process_inotify_event(j, e); + + step = sizeof(struct inotify_event) + e->len; + assert(step <= (size_t) l); + + e = (struct inotify_event*) ((uint8_t*) e + step); + l -= step; + } + } +} + +/* _public_ int sd_journal_query_unique(sd_journal *j, const char *field) { */ +/* if (!j) */ +/* return -EINVAL; */ +/* if (!field) */ +/* return -EINVAL; */ + +/* return -ENOTSUP; */ +/* } */ + +/* _public_ int sd_journal_enumerate_unique(sd_journal *j, const void **data, size_t *l) { */ +/* if (!j) */ +/* return -EINVAL; */ +/* if (!data) */ +/* return -EINVAL; */ +/* if (!l) */ +/* return -EINVAL; */ + +/* return -ENOTSUP; */ +/* } */ + +/* _public_ void sd_journal_restart_unique(sd_journal *j) { */ +/* if (!j) */ +/* return; */ +/* } */ diff --git a/src/journal/sparse-endian.h b/src/journal/sparse-endian.h new file mode 100644 index 0000000000..eb4dbf3615 --- /dev/null +++ b/src/journal/sparse-endian.h @@ -0,0 +1,87 @@ +/* Copyright (c) 2012 Josh Triplett <josh@joshtriplett.org> + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ +#ifndef SPARSE_ENDIAN_H +#define SPARSE_ENDIAN_H + +#include <endian.h> +#include <stdint.h> + +#ifdef __CHECKER__ +#define __bitwise __attribute__((bitwise)) +#define __force __attribute__((force)) +#else +#define __bitwise +#define __force +#endif + +typedef uint16_t __bitwise le16_t; +typedef uint16_t __bitwise be16_t; +typedef uint32_t __bitwise le32_t; +typedef uint32_t __bitwise be32_t; +typedef uint64_t __bitwise le64_t; +typedef uint64_t __bitwise be64_t; + +#undef htobe16 +#undef htole16 +#undef be16toh +#undef le16toh +#undef htobe32 +#undef htole32 +#undef be32toh +#undef le32toh +#undef htobe64 +#undef htole64 +#undef be64toh +#undef le64toh + +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define bswap_16_on_le(x) __bswap_16(x) +#define bswap_32_on_le(x) __bswap_32(x) +#define bswap_64_on_le(x) __bswap_64(x) +#define bswap_16_on_be(x) (x) +#define bswap_32_on_be(x) (x) +#define bswap_64_on_be(x) (x) +#elif __BYTE_ORDER == __BIG_ENDIAN +#define bswap_16_on_le(x) (x) +#define bswap_32_on_le(x) (x) +#define bswap_64_on_le(x) (x) +#define bswap_16_on_be(x) __bswap_16(x) +#define bswap_32_on_be(x) __bswap_32(x) +#define bswap_64_on_be(x) __bswap_64(x) +#endif + +static inline le16_t htole16(uint16_t value) { return (le16_t __force) bswap_16_on_be(value); } +static inline le32_t htole32(uint32_t value) { return (le32_t __force) bswap_32_on_be(value); } +static inline le64_t htole64(uint64_t value) { return (le64_t __force) bswap_64_on_be(value); } + +static inline be16_t htobe16(uint16_t value) { return (be16_t __force) bswap_16_on_le(value); } +static inline be32_t htobe32(uint32_t value) { return (be32_t __force) bswap_32_on_le(value); } +static inline be64_t htobe64(uint64_t value) { return (be64_t __force) bswap_64_on_le(value); } + +static inline uint16_t le16toh(le16_t value) { return bswap_16_on_be((uint16_t __force)value); } +static inline uint32_t le32toh(le32_t value) { return bswap_32_on_be((uint32_t __force)value); } +static inline uint64_t le64toh(le64_t value) { return bswap_64_on_be((uint64_t __force)value); } + +static inline uint16_t be16toh(be16_t value) { return bswap_16_on_le((uint16_t __force)value); } +static inline uint32_t be32toh(be32_t value) { return bswap_32_on_le((uint32_t __force)value); } +static inline uint64_t be64toh(be64_t value) { return bswap_64_on_le((uint64_t __force)value); } + +#endif /* SPARSE_ENDIAN_H */ diff --git a/src/journal/test-journal-send.c b/src/journal/test-journal-send.c new file mode 100644 index 0000000000..2f9987dcde --- /dev/null +++ b/src/journal/test-journal-send.c @@ -0,0 +1,32 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <systemd/sd-journal.h> + +int main(int argc, char *argv[]) { + sd_journal_print(LOG_INFO, "piepapo"); + + sd_journal_send("MESSAGE=foobar", + "VALUE=%i", 7, + NULL); + + return 0; +} diff --git a/src/journal/test-journal.c b/src/journal/test-journal.c new file mode 100644 index 0000000000..a023509b70 --- /dev/null +++ b/src/journal/test-journal.c @@ -0,0 +1,120 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <fcntl.h> +#include <unistd.h> + +#include <systemd/sd-journal.h> + +#include "journal-file.h" +#include "log.h" + +int main(int argc, char *argv[]) { + dual_timestamp ts; + JournalFile *f; + struct iovec iovec; + static const char test[] = "test", test2[] = "test2"; + Object *o; + uint64_t p; + + log_set_max_level(LOG_DEBUG); + + unlink("test.journal"); + + assert_se(journal_file_open("test.journal", O_RDWR|O_CREAT, 0666, NULL, &f) == 0); + + dual_timestamp_get(&ts); + + iovec.iov_base = (void*) test; + iovec.iov_len = strlen(test); + assert_se(journal_file_append_entry(f, &ts, &iovec, 1, NULL, NULL, NULL) == 0); + + iovec.iov_base = (void*) test2; + iovec.iov_len = strlen(test2); + assert_se(journal_file_append_entry(f, &ts, &iovec, 1, NULL, NULL, NULL) == 0); + + iovec.iov_base = (void*) test; + iovec.iov_len = strlen(test); + assert_se(journal_file_append_entry(f, &ts, &iovec, 1, NULL, NULL, NULL) == 0); + + journal_file_dump(f); + + assert(journal_file_next_entry(f, NULL, 0, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_next_entry(f, o, p, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_next_entry(f, o, p, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_next_entry(f, o, p, DIRECTION_DOWN, &o, &p) == 0); + + assert(journal_file_next_entry(f, NULL, 0, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_skip_entry(f, o, p, 2, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_skip_entry(f, o, p, -2, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_skip_entry(f, o, p, -2, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_find_data_object(f, test, strlen(test), NULL, &p) == 1); + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_UP, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_find_data_object(f, test2, strlen(test2), NULL, &p) == 1); + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_UP, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_find_data_object(f, "quux", 4, NULL, &p) == 0); + + assert(journal_file_move_to_entry_by_seqnum(f, 1, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_move_to_entry_by_seqnum(f, 3, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_move_to_entry_by_seqnum(f, 2, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_move_to_entry_by_seqnum(f, 10, DIRECTION_DOWN, &o, NULL) == 0); + + journal_file_rotate(&f); + journal_file_rotate(&f); + + journal_file_close(f); + + journal_directory_vacuum(".", 3000000, 0); + + log_error("Exiting..."); + + return 0; +} |