diff options
author | Lennart Poettering <lennart@poettering.net> | 2011-12-31 00:59:37 +0100 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2011-12-31 00:59:37 +0100 |
commit | dc1ecd78e9f046880d10ddb45cf9b06df1084b10 (patch) | |
tree | 696491346a7fa69f8620a99cb7c0d2d87f2da7cf /src/journal | |
parent | 69b1c67463de64fb32ba75f586f0c9d2716f3e02 (diff) | |
parent | 74ef2d16ada74db3059d825ce8d24ea74946bf8f (diff) |
Merge branch 'journal'
Diffstat (limited to 'src/journal')
l--------- | src/journal/Makefile | 1 | ||||
-rw-r--r-- | src/journal/compress.c | 208 | ||||
-rw-r--r-- | src/journal/compress.h | 38 | ||||
-rw-r--r-- | src/journal/journal-def.h | 164 | ||||
-rw-r--r-- | src/journal/journal-file.c | 2078 | ||||
-rw-r--r-- | src/journal/journal-file.h | 127 | ||||
-rw-r--r-- | src/journal/journal-internal.h | 83 | ||||
-rw-r--r-- | src/journal/journal-rate-limit.c | 275 | ||||
-rw-r--r-- | src/journal/journal-rate-limit.h | 34 | ||||
-rw-r--r-- | src/journal/journal-send.c | 259 | ||||
-rw-r--r-- | src/journal/journalctl.c | 544 | ||||
-rw-r--r-- | src/journal/journald.c | 1909 | ||||
-rw-r--r-- | src/journal/lookup3.c | 1003 | ||||
-rw-r--r-- | src/journal/lookup3.h | 25 | ||||
-rw-r--r-- | src/journal/sd-journal.c | 1573 | ||||
-rw-r--r-- | src/journal/sd-journal.h | 125 | ||||
-rw-r--r-- | src/journal/test-journal.c | 119 |
17 files changed, 8565 insertions, 0 deletions
diff --git a/src/journal/Makefile b/src/journal/Makefile new file mode 120000 index 0000000000..d0b0e8e008 --- /dev/null +++ b/src/journal/Makefile @@ -0,0 +1 @@ +../Makefile
\ No newline at end of file diff --git a/src/journal/compress.c b/src/journal/compress.c new file mode 100644 index 0000000000..ff906581f0 --- /dev/null +++ b/src/journal/compress.c @@ -0,0 +1,208 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <assert.h> +#include <stdlib.h> +#include <string.h> +#include <lzma.h> + +#include "compress.h" + +bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_size) { + lzma_stream s = LZMA_STREAM_INIT; + lzma_ret ret; + bool b = false; + + assert(src); + assert(src_size > 0); + assert(dst); + assert(dst_size); + + /* Returns false if we couldn't compress the data or the + * compressed result is longer than the original */ + + ret = lzma_easy_encoder(&s, LZMA_PRESET_DEFAULT, LZMA_CHECK_NONE); + if (ret != LZMA_OK) + return false; + + s.next_in = src; + s.avail_in = src_size; + s.next_out = dst; + s.avail_out = src_size; + + /* Does it fit? */ + if (lzma_code(&s, LZMA_FINISH) != LZMA_STREAM_END) + goto fail; + + /* Is it actually shorter? */ + if (s.avail_out == 0) + goto fail; + + *dst_size = src_size - s.avail_out; + b = true; + +fail: + lzma_end(&s); + + return b; +} + +bool uncompress_blob(const void *src, uint64_t src_size, + void **dst, uint64_t *dst_alloc_size, uint64_t* dst_size) { + + lzma_stream s = LZMA_STREAM_INIT; + lzma_ret ret; + bool b = false; + + assert(src); + assert(src_size > 0); + assert(dst); + assert(dst_alloc_size); + assert(dst_size); + assert(*dst_alloc_size == 0 || *dst); + + ret = lzma_stream_decoder(&s, UINT64_MAX, 0); + if (ret != LZMA_OK) + return false; + + if (*dst_alloc_size <= src_size) { + void *p; + + p = realloc(*dst, src_size*2); + if (!p) + return false; + + *dst = p; + *dst_alloc_size = src_size*2; + } + + s.next_in = src; + s.avail_in = src_size; + + s.next_out = *dst; + s.avail_out = *dst_alloc_size; + + for (;;) { + void *p; + + ret = lzma_code(&s, LZMA_FINISH); + + if (ret == LZMA_STREAM_END) + break; + + if (ret != LZMA_OK) + goto fail; + + p = realloc(*dst, *dst_alloc_size*2); + if (!p) + goto fail; + + s.next_out = (uint8_t*) p + ((uint8_t*) s.next_out - (uint8_t*) *dst); + s.avail_out += *dst_alloc_size; + + *dst = p; + *dst_alloc_size *= 2; + } + + *dst_size = *dst_alloc_size - s.avail_out; + b = true; + +fail: + lzma_end(&s); + + return b; +} + +bool uncompress_startswith(const void *src, uint64_t src_size, + void **buffer, uint64_t *buffer_size, + const void *prefix, uint64_t prefix_len, + uint8_t extra) { + + lzma_stream s = LZMA_STREAM_INIT; + lzma_ret ret; + bool b = false; + + /* Checks whether the uncompressed blob starts with the + * mentioned prefix. The byte extra needs to follow the + * prefix */ + + assert(src); + assert(src_size > 0); + assert(buffer); + assert(buffer_size); + assert(prefix); + assert(*buffer_size == 0 || *buffer); + + ret = lzma_stream_decoder(&s, UINT64_MAX, 0); + if (ret != LZMA_OK) + return false; + + if (*buffer_size <= prefix_len) { + void *p; + + p = realloc(*buffer, prefix_len*2); + if (!p) + return false; + + *buffer = p; + *buffer_size = prefix_len*2; + } + + s.next_in = src; + s.avail_in = src_size; + + s.next_out = *buffer; + s.avail_out = *buffer_size; + + for (;;) { + void *p; + + ret = lzma_code(&s, LZMA_FINISH); + + if (ret != LZMA_STREAM_END && ret != LZMA_OK) + goto fail; + + if ((*buffer_size - s.avail_out > prefix_len) && + memcmp(*buffer, prefix, prefix_len) == 0 && + ((const uint8_t*) *buffer)[prefix_len] == extra) + break; + + if (ret == LZMA_STREAM_END) + goto fail; + + p = realloc(*buffer, *buffer_size*2); + if (!p) + goto fail; + + s.next_out = (uint8_t*) p + ((uint8_t*) s.next_out - (uint8_t*) *buffer); + s.avail_out += *buffer_size; + + *buffer = p; + *buffer_size *= 2; + } + + b = true; + +fail: + lzma_end(&s); + + return b; +} diff --git a/src/journal/compress.h b/src/journal/compress.h new file mode 100644 index 0000000000..f187a6e00c --- /dev/null +++ b/src/journal/compress.h @@ -0,0 +1,38 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foocompresshfoo +#define foocompresshfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <stdbool.h> + +bool compress_blob(const void *src, uint64_t src_size, void *dst, uint64_t *dst_size); + +bool uncompress_blob(const void *src, uint64_t src_size, + void **dst, uint64_t *dst_alloc_size, uint64_t* dst_size); + +bool uncompress_startswith(const void *src, uint64_t src_size, + void **buffer, uint64_t *buffer_size, + const void *prefix, uint64_t prefix_len, + uint8_t extra); + +#endif diff --git a/src/journal/journal-def.h b/src/journal/journal-def.h new file mode 100644 index 0000000000..ef0cb6dae6 --- /dev/null +++ b/src/journal/journal-def.h @@ -0,0 +1,164 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournaldefhfoo +#define foojournaldefhfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> + +#include "macro.h" +#include "sd-id128.h" + +typedef struct Header Header; +typedef struct ObjectHeader ObjectHeader; +typedef union Object Object; +typedef struct DataObject DataObject; +typedef struct FieldObject FieldObject; +typedef struct EntryObject EntryObject; +typedef struct HashTableObject HashTableObject; +typedef struct EntryArrayObject EntryArrayObject; +typedef struct EntryItem EntryItem; +typedef struct HashItem HashItem; + +/* Object types */ +enum { + OBJECT_UNUSED, + OBJECT_DATA, + OBJECT_FIELD, + OBJECT_ENTRY, + OBJECT_DATA_HASH_TABLE, + OBJECT_FIELD_HASH_TABLE, + OBJECT_ENTRY_ARRAY, + _OBJECT_TYPE_MAX +}; + +/* Object flags */ +enum { + OBJECT_COMPRESSED = 1 +}; + +_packed_ struct ObjectHeader { + uint8_t type; + uint8_t flags; + uint8_t reserved[6]; + uint64_t size; + uint8_t payload[]; +}; + +_packed_ struct DataObject { + ObjectHeader object; + uint64_t hash; + uint64_t next_hash_offset; + uint64_t next_field_offset; + uint64_t entry_offset; /* the first array entry we store inline */ + uint64_t entry_array_offset; + uint64_t n_entries; + uint8_t payload[]; +}; + +_packed_ struct FieldObject { + ObjectHeader object; + uint64_t hash; + uint64_t next_hash_offset; + uint64_t head_data_offset; + uint64_t tail_data_offset; + uint8_t payload[]; +}; + +_packed_ struct EntryItem { + uint64_t object_offset; + uint64_t hash; +}; + +_packed_ struct EntryObject { + ObjectHeader object; + uint64_t seqnum; + uint64_t realtime; + uint64_t monotonic; + sd_id128_t boot_id; + uint64_t xor_hash; + EntryItem items[]; +}; + +_packed_ struct HashItem { + uint64_t head_hash_offset; + uint64_t tail_hash_offset; +}; + +_packed_ struct HashTableObject { + ObjectHeader object; + HashItem items[]; +}; + +_packed_ struct EntryArrayObject { + ObjectHeader object; + uint64_t next_entry_array_offset; + uint64_t items[]; +}; + +union Object { + ObjectHeader object; + DataObject data; + FieldObject field; + EntryObject entry; + HashTableObject hash_table; + EntryArrayObject entry_array; +}; + +enum { + STATE_OFFLINE, + STATE_ONLINE, + STATE_ARCHIVED +}; + +/* Header flags */ +enum { + HEADER_INCOMPATIBLE_COMPRESSED = 1 +}; + +_packed_ struct Header { + uint8_t signature[8]; /* "LPKSHHRH" */ + uint32_t compatible_flags; + uint32_t incompatible_flags; + uint8_t state; + uint8_t reserved[7]; + sd_id128_t file_id; + sd_id128_t machine_id; + sd_id128_t boot_id; + sd_id128_t seqnum_id; + uint64_t arena_offset; + uint64_t arena_size; + uint64_t data_hash_table_offset; /* for looking up data objects */ + uint64_t data_hash_table_size; + uint64_t field_hash_table_offset; /* for looking up field objects */ + uint64_t field_hash_table_size; + uint64_t tail_object_offset; + uint64_t n_objects; + uint64_t n_entries; + uint64_t seqnum; + uint64_t first_seqnum; + uint64_t entry_array_offset; + uint64_t head_entry_realtime; + uint64_t tail_entry_realtime; + uint64_t tail_entry_monotonic; +}; + +#endif diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c new file mode 100644 index 0000000000..190bfb996b --- /dev/null +++ b/src/journal/journal-file.c @@ -0,0 +1,2078 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/mman.h> +#include <errno.h> +#include <sys/uio.h> +#include <unistd.h> +#include <sys/statvfs.h> +#include <fcntl.h> +#include <stddef.h> + +#include "journal-def.h" +#include "journal-file.h" +#include "lookup3.h" +#include "compress.h" + +#define DEFAULT_DATA_HASH_TABLE_SIZE (2047ULL*16ULL) +#define DEFAULT_FIELD_HASH_TABLE_SIZE (2047ULL*16ULL) + +#define DEFAULT_WINDOW_SIZE (128ULL*1024ULL*1024ULL) + +#define COMPRESSION_SIZE_THRESHOLD (64ULL) + +static const char signature[] = { 'L', 'P', 'K', 'S', 'H', 'H', 'R', 'H' }; + +#define ALIGN64(x) (((x) + 7ULL) & ~7ULL) + +void journal_file_close(JournalFile *f) { + int t; + + assert(f); + + if (f->header && f->writable) + f->header->state = STATE_OFFLINE; + + + for (t = 0; t < _WINDOW_MAX; t++) + if (f->windows[t].ptr) + munmap(f->windows[t].ptr, f->windows[t].size); + + if (f->fd >= 0) + close_nointr_nofail(f->fd); + + free(f->path); + +#ifdef HAVE_XZ + free(f->compress_buffer); +#endif + + free(f); +} + +static int journal_file_init_header(JournalFile *f, JournalFile *template) { + Header h; + ssize_t k; + int r; + + assert(f); + + zero(h); + memcpy(h.signature, signature, 8); + h.arena_offset = htole64(ALIGN64(sizeof(h))); + + r = sd_id128_randomize(&h.file_id); + if (r < 0) + return r; + + if (template) { + h.seqnum_id = template->header->seqnum_id; + h.seqnum = template->header->seqnum; + } else + h.seqnum_id = h.file_id; + + k = pwrite(f->fd, &h, sizeof(h), 0); + if (k < 0) + return -errno; + + if (k != sizeof(h)) + return -EIO; + + return 0; +} + +static int journal_file_refresh_header(JournalFile *f) { + int r; + sd_id128_t boot_id; + + assert(f); + + r = sd_id128_get_machine(&f->header->machine_id); + if (r < 0) + return r; + + r = sd_id128_get_boot(&boot_id); + if (r < 0) + return r; + + if (sd_id128_equal(boot_id, f->header->boot_id)) + f->tail_entry_monotonic_valid = true; + + f->header->boot_id = boot_id; + + f->header->state = STATE_ONLINE; + return 0; +} + +static int journal_file_verify_header(JournalFile *f) { + assert(f); + + if (memcmp(f->header, signature, 8)) + return -EBADMSG; + +#ifdef HAVE_XZ + if ((le64toh(f->header->incompatible_flags) & ~HEADER_INCOMPATIBLE_COMPRESSED) != 0) + return -EPROTONOSUPPORT; +#else + if (f->header->incompatible_flags != 0) + return -EPROTONOSUPPORT; +#endif + + if ((uint64_t) f->last_stat.st_size < (le64toh(f->header->arena_offset) + le64toh(f->header->arena_size))) + return -ENODATA; + + if (f->writable) { + uint32_t state; + sd_id128_t machine_id; + int r; + + r = sd_id128_get_machine(&machine_id); + if (r < 0) + return r; + + if (!sd_id128_equal(machine_id, f->header->machine_id)) + return -EHOSTDOWN; + + state = f->header->state; + + if (state == STATE_ONLINE) + log_debug("Journal file %s is already online. Assuming unclean closing. Ignoring.", f->path); + else if (state == STATE_ARCHIVED) + return -ESHUTDOWN; + else if (state != STATE_OFFLINE) + log_debug("Journal file %s has unknown state %u. Ignoring.", f->path, state); + } + + return 0; +} + +static int journal_file_allocate(JournalFile *f, uint64_t offset, uint64_t size) { + uint64_t old_size, new_size; + + assert(f); + + /* We assume that this file is not sparse, and we know that + * for sure, since we always call posix_fallocate() + * ourselves */ + + old_size = + le64toh(f->header->arena_offset) + + le64toh(f->header->arena_size); + + new_size = PAGE_ALIGN(offset + size); + if (new_size < le64toh(f->header->arena_offset)) + new_size = le64toh(f->header->arena_offset); + + if (new_size <= old_size) + return 0; + + if (f->metrics.max_size > 0 && + new_size > f->metrics.max_size) + return -E2BIG; + + if (new_size > f->metrics.min_size && + f->metrics.keep_free > 0) { + struct statvfs svfs; + + if (fstatvfs(f->fd, &svfs) >= 0) { + uint64_t available; + + available = svfs.f_bfree * svfs.f_bsize; + + if (available >= f->metrics.keep_free) + available -= f->metrics.keep_free; + else + available = 0; + + if (new_size - old_size > available) + return -E2BIG; + } + } + + /* Note that the glibc fallocate() fallback is very + inefficient, hence we try to minimize the allocation area + as we can. */ + if (posix_fallocate(f->fd, old_size, new_size - old_size) < 0) + return -errno; + + if (fstat(f->fd, &f->last_stat) < 0) + return -errno; + + f->header->arena_size = new_size - htole64(f->header->arena_offset); + + return 0; +} + +static int journal_file_map( + JournalFile *f, + uint64_t offset, + uint64_t size, + void **_window, + uint64_t *_woffset, + uint64_t *_wsize, + void **ret) { + + uint64_t woffset, wsize; + void *window; + + assert(f); + assert(size > 0); + assert(ret); + + woffset = offset & ~((uint64_t) page_size() - 1ULL); + wsize = size + (offset - woffset); + wsize = PAGE_ALIGN(wsize); + + /* Avoid SIGBUS on invalid accesses */ + if (woffset + wsize > (uint64_t) PAGE_ALIGN(f->last_stat.st_size)) + return -EADDRNOTAVAIL; + + window = mmap(NULL, wsize, f->prot, MAP_SHARED, f->fd, woffset); + if (window == MAP_FAILED) + return -errno; + + if (_window) + *_window = window; + + if (_woffset) + *_woffset = woffset; + + if (_wsize) + *_wsize = wsize; + + *ret = (uint8_t*) window + (offset - woffset); + + return 0; +} + +static int journal_file_move_to(JournalFile *f, int wt, uint64_t offset, uint64_t size, void **ret) { + void *p; + uint64_t delta; + int r; + Window *w; + + assert(f); + assert(ret); + assert(wt >= 0); + assert(wt < _WINDOW_MAX); + + w = f->windows + wt; + + if (_likely_(w->ptr && + w->offset <= offset && + w->offset + w->size >= offset + size)) { + + *ret = (uint8_t*) w->ptr + (offset - w->offset); + return 0; + } + + if (w->ptr) { + if (munmap(w->ptr, w->size) < 0) + return -errno; + + w->ptr = NULL; + w->size = w->offset = 0; + } + + if (size < DEFAULT_WINDOW_SIZE) { + /* If the default window size is larger then what was + * asked for extend the mapping a bit in the hope to + * minimize needed remappings later on. We add half + * the window space before and half behind the + * requested mapping */ + + delta = PAGE_ALIGN((DEFAULT_WINDOW_SIZE - size) / 2); + + if (offset < delta) + delta = offset; + + offset -= delta; + size += (DEFAULT_WINDOW_SIZE - delta); + } else + delta = 0; + + if (offset > (uint64_t) f->last_stat.st_size) + return -EADDRNOTAVAIL; + + if (offset + size > (uint64_t) f->last_stat.st_size) + size = PAGE_ALIGN((uint64_t) f->last_stat.st_size - offset); + + if (size <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_map(f, + offset, size, + &w->ptr, &w->offset, &w->size, + &p); + + if (r < 0) + return r; + + *ret = (uint8_t*) p + delta; + return 0; +} + +static bool verify_hash(Object *o) { + uint64_t h1, h2; + + assert(o); + + if (o->object.type == OBJECT_DATA && !(o->object.flags & OBJECT_COMPRESSED)) { + h1 = le64toh(o->data.hash); + h2 = hash64(o->data.payload, le64toh(o->object.size) - offsetof(Object, data.payload)); + } else if (o->object.type == OBJECT_FIELD) { + h1 = le64toh(o->field.hash); + h2 = hash64(o->field.payload, le64toh(o->object.size) - offsetof(Object, field.payload)); + } else + return true; + + return h1 == h2; +} + +int journal_file_move_to_object(JournalFile *f, int type, uint64_t offset, Object **ret) { + int r; + void *t; + Object *o; + uint64_t s; + + assert(f); + assert(ret); + assert(type < _OBJECT_TYPE_MAX); + + r = journal_file_move_to(f, type >= 0 ? type : WINDOW_UNKNOWN, offset, sizeof(ObjectHeader), &t); + if (r < 0) + return r; + + o = (Object*) t; + s = le64toh(o->object.size); + + if (s < sizeof(ObjectHeader)) + return -EBADMSG; + + if (type >= 0 && o->object.type != type) + return -EBADMSG; + + if (s > sizeof(ObjectHeader)) { + r = journal_file_move_to(f, o->object.type, offset, s, &t); + if (r < 0) + return r; + + o = (Object*) t; + } + + if (!verify_hash(o)) + return -EBADMSG; + + *ret = o; + return 0; +} + +static uint64_t journal_file_seqnum(JournalFile *f, uint64_t *seqnum) { + uint64_t r; + + assert(f); + + r = le64toh(f->header->seqnum) + 1; + + if (seqnum) { + /* If an external seqnum counter was passed, we update + * both the local and the external one, and set it to + * the maximum of both */ + + if (*seqnum + 1 > r) + r = *seqnum + 1; + + *seqnum = r; + } + + f->header->seqnum = htole64(r); + + if (f->header->first_seqnum == 0) + f->header->first_seqnum = htole64(r); + + return r; +} + +static int journal_file_append_object(JournalFile *f, int type, uint64_t size, Object **ret, uint64_t *offset) { + int r; + uint64_t p; + Object *tail, *o; + void *t; + + assert(f); + assert(size >= sizeof(ObjectHeader)); + assert(offset); + assert(ret); + + p = le64toh(f->header->tail_object_offset); + if (p == 0) + p = le64toh(f->header->arena_offset); + else { + r = journal_file_move_to_object(f, -1, p, &tail); + if (r < 0) + return r; + + p += ALIGN64(le64toh(tail->object.size)); + } + + r = journal_file_allocate(f, p, size); + if (r < 0) + return r; + + r = journal_file_move_to(f, type, p, size, &t); + if (r < 0) + return r; + + o = (Object*) t; + + zero(o->object); + o->object.type = type; + o->object.size = htole64(size); + + f->header->tail_object_offset = htole64(p); + f->header->n_objects = htole64(le64toh(f->header->n_objects) + 1); + + *ret = o; + *offset = p; + + return 0; +} + +static int journal_file_setup_data_hash_table(JournalFile *f) { + uint64_t s, p; + Object *o; + int r; + + assert(f); + + s = DEFAULT_DATA_HASH_TABLE_SIZE; + r = journal_file_append_object(f, + OBJECT_DATA_HASH_TABLE, + offsetof(Object, hash_table.items) + s, + &o, &p); + if (r < 0) + return r; + + memset(o->hash_table.items, 0, s); + + f->header->data_hash_table_offset = htole64(p + offsetof(Object, hash_table.items)); + f->header->data_hash_table_size = htole64(s); + + return 0; +} + +static int journal_file_setup_field_hash_table(JournalFile *f) { + uint64_t s, p; + Object *o; + int r; + + assert(f); + + s = DEFAULT_FIELD_HASH_TABLE_SIZE; + r = journal_file_append_object(f, + OBJECT_FIELD_HASH_TABLE, + offsetof(Object, hash_table.items) + s, + &o, &p); + if (r < 0) + return r; + + memset(o->hash_table.items, 0, s); + + f->header->field_hash_table_offset = htole64(p + offsetof(Object, hash_table.items)); + f->header->field_hash_table_size = htole64(s); + + return 0; +} + +static int journal_file_map_data_hash_table(JournalFile *f) { + uint64_t s, p; + void *t; + int r; + + assert(f); + + p = le64toh(f->header->data_hash_table_offset); + s = le64toh(f->header->data_hash_table_size); + + r = journal_file_move_to(f, + WINDOW_DATA_HASH_TABLE, + p, s, + &t); + if (r < 0) + return r; + + f->data_hash_table = t; + return 0; +} + +static int journal_file_map_field_hash_table(JournalFile *f) { + uint64_t s, p; + void *t; + int r; + + assert(f); + + p = le64toh(f->header->field_hash_table_offset); + s = le64toh(f->header->field_hash_table_size); + + r = journal_file_move_to(f, + WINDOW_FIELD_HASH_TABLE, + p, s, + &t); + if (r < 0) + return r; + + f->field_hash_table = t; + return 0; +} + +static int journal_file_link_data(JournalFile *f, Object *o, uint64_t offset, uint64_t hash) { + uint64_t p, h; + int r; + + assert(f); + assert(o); + assert(offset > 0); + assert(o->object.type == OBJECT_DATA); + + o->data.next_hash_offset = o->data.next_field_offset = 0; + o->data.entry_offset = o->data.entry_array_offset = 0; + o->data.n_entries = 0; + + h = hash % (le64toh(f->header->data_hash_table_size) / sizeof(HashItem)); + p = le64toh(f->data_hash_table[h].head_hash_offset); + if (p == 0) { + /* Only entry in the hash table is easy */ + f->data_hash_table[h].head_hash_offset = htole64(offset); + } else { + /* Temporarily move back to the previous data object, + * to patch in pointer */ + + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + o->data.next_hash_offset = htole64(offset); + + r = journal_file_move_to_object(f, OBJECT_DATA, offset, &o); + if (r < 0) + return r; + } + + f->data_hash_table[h].tail_hash_offset = htole64(offset); + + return 0; +} + +int journal_file_find_data_object_with_hash( + JournalFile *f, + const void *data, uint64_t size, uint64_t hash, + Object **ret, uint64_t *offset) { + uint64_t p, osize, h; + int r; + + assert(f); + assert(data || size == 0); + + osize = offsetof(Object, data.payload) + size; + + if (f->header->data_hash_table_size == 0) + return -EBADMSG; + + h = hash % (le64toh(f->header->data_hash_table_size) / sizeof(HashItem)); + p = le64toh(f->data_hash_table[h].head_hash_offset); + + while (p > 0) { + Object *o; + + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (le64toh(o->data.hash) != hash) + goto next; + + if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ + uint64_t l, rsize; + + l = le64toh(o->object.size); + if (l <= offsetof(Object, data.payload)) + return -EBADMSG; + + l -= offsetof(Object, data.payload); + + if (!uncompress_blob(o->data.payload, l, &f->compress_buffer, &f->compress_buffer_size, &rsize)) + return -EBADMSG; + + if (rsize == size && + memcmp(f->compress_buffer, data, size) == 0) { + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; + } +#else + return -EPROTONOSUPPORT; +#endif + + } else if (le64toh(o->object.size) == osize && + memcmp(o->data.payload, data, size) == 0) { + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; + } + + next: + p = le64toh(o->data.next_hash_offset); + } + + return 0; +} + +int journal_file_find_data_object( + JournalFile *f, + const void *data, uint64_t size, + Object **ret, uint64_t *offset) { + + uint64_t hash; + + assert(f); + assert(data || size == 0); + + hash = hash64(data, size); + + return journal_file_find_data_object_with_hash(f, + data, size, hash, + ret, offset); +} + +static int journal_file_append_data(JournalFile *f, const void *data, uint64_t size, Object **ret, uint64_t *offset) { + uint64_t hash, p; + uint64_t osize; + Object *o; + int r; + bool compressed = false; + + assert(f); + assert(data || size == 0); + + hash = hash64(data, size); + + r = journal_file_find_data_object_with_hash(f, data, size, hash, &o, &p); + if (r < 0) + return r; + else if (r > 0) { + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 0; + } + + osize = offsetof(Object, data.payload) + size; + r = journal_file_append_object(f, OBJECT_DATA, osize, &o, &p); + if (r < 0) + return r; + + o->data.hash = htole64(hash); + +#ifdef HAVE_XZ + if (f->compress && + size >= COMPRESSION_SIZE_THRESHOLD) { + uint64_t rsize; + + compressed = compress_blob(data, size, o->data.payload, &rsize); + + if (compressed) { + o->object.size = htole64(offsetof(Object, data.payload) + rsize); + o->object.flags |= OBJECT_COMPRESSED; + + f->header->incompatible_flags = htole32(le32toh(f->header->incompatible_flags) | HEADER_INCOMPATIBLE_COMPRESSED); + + log_debug("Compressed data object %lu -> %lu", (unsigned long) size, (unsigned long) rsize); + } + } +#endif + + if (!compressed) + memcpy(o->data.payload, data, size); + + r = journal_file_link_data(f, o, p, hash); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 0; +} + +uint64_t journal_file_entry_n_items(Object *o) { + assert(o); + assert(o->object.type == htole64(OBJECT_ENTRY)); + + return (le64toh(o->object.size) - offsetof(Object, entry.items)) / sizeof(EntryItem); +} + +static uint64_t journal_file_entry_array_n_items(Object *o) { + assert(o); + assert(o->object.type == htole64(OBJECT_ENTRY_ARRAY)); + + return (le64toh(o->object.size) - offsetof(Object, entry_array.items)) / sizeof(uint64_t); +} + +static int link_entry_into_array(JournalFile *f, + uint64_t *first, + uint64_t *idx, + uint64_t p) { + int r; + uint64_t n = 0, ap = 0, q, i, a, hidx; + Object *o; + + assert(f); + assert(first); + assert(idx); + assert(p > 0); + + a = le64toh(*first); + i = hidx = le64toh(*idx); + while (a > 0) { + + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, a, &o); + if (r < 0) + return r; + + n = journal_file_entry_array_n_items(o); + if (i < n) { + o->entry_array.items[i] = htole64(p); + *idx = htole64(hidx + 1); + return 0; + } + + i -= n; + ap = a; + a = le64toh(o->entry_array.next_entry_array_offset); + } + + if (hidx > n) + n = (hidx+1) * 2; + else + n = n * 2; + + if (n < 4) + n = 4; + + r = journal_file_append_object(f, OBJECT_ENTRY_ARRAY, + offsetof(Object, entry_array.items) + n * sizeof(uint64_t), + &o, &q); + if (r < 0) + return r; + + o->entry_array.items[i] = htole64(p); + + if (ap == 0) + *first = q; + else { + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, ap, &o); + if (r < 0) + return r; + + o->entry_array.next_entry_array_offset = htole64(q); + } + + *idx = htole64(hidx + 1); + + return 0; +} + +static int link_entry_into_array_plus_one(JournalFile *f, + uint64_t *extra, + uint64_t *first, + uint64_t *idx, + uint64_t p) { + + int r; + + assert(f); + assert(extra); + assert(first); + assert(idx); + assert(p > 0); + + if (*idx == 0) + *extra = htole64(p); + else { + uint64_t i; + + i = le64toh(*idx) - 1; + r = link_entry_into_array(f, first, &i, p); + if (r < 0) + return r; + } + + *idx = htole64(le64toh(*idx) + 1); + return 0; +} + +static int journal_file_link_entry_item(JournalFile *f, Object *o, uint64_t offset, uint64_t i) { + uint64_t p; + int r; + assert(f); + assert(o); + assert(offset > 0); + + p = le64toh(o->entry.items[i].object_offset); + if (p == 0) + return -EINVAL; + + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + return link_entry_into_array_plus_one(f, + &o->data.entry_offset, + &o->data.entry_array_offset, + &o->data.n_entries, + offset); +} + +static int journal_file_link_entry(JournalFile *f, Object *o, uint64_t offset) { + uint64_t n, i; + int r; + + assert(f); + assert(o); + assert(offset > 0); + assert(o->object.type == OBJECT_ENTRY); + + /* Link up the entry itself */ + r = link_entry_into_array(f, + &f->header->entry_array_offset, + &f->header->n_entries, + offset); + if (r < 0) + return r; + + log_error("=> %s seqnr=%lu n_entries=%lu", f->path, (unsigned long) o->entry.seqnum, (unsigned long) f->header->n_entries); + + if (f->header->head_entry_realtime == 0) + f->header->head_entry_realtime = o->entry.realtime; + + f->header->tail_entry_realtime = o->entry.realtime; + f->header->tail_entry_monotonic = o->entry.monotonic; + + f->tail_entry_monotonic_valid = true; + + /* Link up the items */ + n = journal_file_entry_n_items(o); + for (i = 0; i < n; i++) { + r = journal_file_link_entry_item(f, o, offset, i); + if (r < 0) + return r; + } + + return 0; +} + +static int journal_file_append_entry_internal( + JournalFile *f, + const dual_timestamp *ts, + uint64_t xor_hash, + const EntryItem items[], unsigned n_items, + uint64_t *seqnum, + Object **ret, uint64_t *offset) { + uint64_t np; + uint64_t osize; + Object *o; + int r; + + assert(f); + assert(items || n_items == 0); + assert(ts); + + osize = offsetof(Object, entry.items) + (n_items * sizeof(EntryItem)); + + r = journal_file_append_object(f, OBJECT_ENTRY, osize, &o, &np); + if (r < 0) + return r; + + o->entry.seqnum = htole64(journal_file_seqnum(f, seqnum)); + memcpy(o->entry.items, items, n_items * sizeof(EntryItem)); + o->entry.realtime = htole64(ts->realtime); + o->entry.monotonic = htole64(ts->monotonic); + o->entry.xor_hash = htole64(xor_hash); + o->entry.boot_id = f->header->boot_id; + + r = journal_file_link_entry(f, o, np); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = np; + + return 0; +} + +void journal_file_post_change(JournalFile *f) { + assert(f); + + /* inotify() does not receive IN_MODIFY events from file + * accesses done via mmap(). After each access we hence + * trigger IN_MODIFY by truncating the journal file to its + * current size which triggers IN_MODIFY. */ + + __sync_synchronize(); + + if (ftruncate(f->fd, f->last_stat.st_size) < 0) + log_error("Failed to to truncate file to its own size: %m"); +} + +int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const struct iovec iovec[], unsigned n_iovec, uint64_t *seqnum, Object **ret, uint64_t *offset) { + unsigned i; + EntryItem *items; + int r; + uint64_t xor_hash = 0; + struct dual_timestamp _ts; + + assert(f); + assert(iovec || n_iovec == 0); + + if (!f->writable) + return -EPERM; + + if (!ts) { + dual_timestamp_get(&_ts); + ts = &_ts; + } + + if (f->tail_entry_monotonic_valid && + ts->monotonic < le64toh(f->header->tail_entry_monotonic)) + return -EINVAL; + + if (ts->realtime < le64toh(f->header->tail_entry_realtime)) + return -EINVAL; + + items = alloca(sizeof(EntryItem) * n_iovec); + + for (i = 0; i < n_iovec; i++) { + uint64_t p; + Object *o; + + r = journal_file_append_data(f, iovec[i].iov_base, iovec[i].iov_len, &o, &p); + if (r < 0) + return r; + + xor_hash ^= le64toh(o->data.hash); + items[i].object_offset = htole64(p); + items[i].hash = o->data.hash; + } + + r = journal_file_append_entry_internal(f, ts, xor_hash, items, n_iovec, seqnum, ret, offset); + + journal_file_post_change(f); + + return r; +} + +static int generic_array_get(JournalFile *f, + uint64_t first, + uint64_t i, + Object **ret, uint64_t *offset) { + + Object *o; + uint64_t p, a; + int r; + + assert(f); + + a = first; + while (a > 0) { + uint64_t n; + + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, a, &o); + if (r < 0) + return r; + + n = journal_file_entry_array_n_items(o); + if (i < n) { + p = le64toh(o->entry_array.items[i]); + break; + } + + i -= n; + a = le64toh(o->entry_array.next_entry_array_offset); + } + + if (a <= 0 || p <= 0) + return 0; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; +} + +static int generic_array_get_plus_one(JournalFile *f, + uint64_t extra, + uint64_t first, + uint64_t i, + Object **ret, uint64_t *offset) { + + Object *o; + + assert(f); + + if (i == 0) { + int r; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, extra, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = extra; + + return 1; + } + + return generic_array_get(f, first, i-1, ret, offset); +} + +enum { + TEST_FOUND, + TEST_LEFT, + TEST_RIGHT +}; + +static int generic_array_bisect(JournalFile *f, + uint64_t first, + uint64_t n, + uint64_t needle, + int (*test_object)(JournalFile *f, uint64_t p, uint64_t needle), + direction_t direction, + Object **ret, + uint64_t *offset, + uint64_t *idx) { + + uint64_t a, p, t = 0, i = 0, last_p = 0; + bool subtract_one = false; + Object *o, *array = NULL; + int r; + + assert(f); + assert(test_object); + + a = first; + while (a > 0) { + uint64_t left, right, k, lp; + + r = journal_file_move_to_object(f, OBJECT_ENTRY_ARRAY, a, &array); + if (r < 0) + return r; + + k = journal_file_entry_array_n_items(array); + right = MIN(k, n); + if (right <= 0) + return 0; + + i = right - 1; + lp = p = le64toh(array->entry_array.items[i]); + if (p <= 0) + return -EBADMSG; + + r = test_object(f, p, needle); + if (r < 0) + return r; + + if (r == TEST_FOUND) + r = direction == DIRECTION_DOWN ? TEST_RIGHT : TEST_LEFT; + + if (r == TEST_RIGHT) { + left = 0; + right -= 1; + for (;;) { + if (left == right) { + if (direction == DIRECTION_UP) + subtract_one = true; + + i = left; + goto found; + } + + assert(left < right); + + i = (left + right) / 2; + p = le64toh(array->entry_array.items[i]); + if (p <= 0) + return -EBADMSG; + + r = test_object(f, p, needle); + if (r < 0) + return r; + + if (r == TEST_FOUND) + r = direction == DIRECTION_DOWN ? TEST_RIGHT : TEST_LEFT; + + if (r == TEST_RIGHT) + right = i; + else + left = i + 1; + } + } + + if (k > n) + return 0; + + last_p = lp; + + n -= k; + t += k; + a = le64toh(array->entry_array.next_entry_array_offset); + } + + return 0; + +found: + if (subtract_one && t == 0 && i == 0) + return 0; + + if (subtract_one && i == 0) + p = last_p; + else if (subtract_one) + p = le64toh(array->entry_array.items[i-1]); + else + p = le64toh(array->entry_array.items[i]); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + if (idx) + *idx = t + i - (subtract_one ? 1 : 0); + + return 1; +} + +static int generic_array_bisect_plus_one(JournalFile *f, + uint64_t extra, + uint64_t first, + uint64_t n, + uint64_t needle, + int (*test_object)(JournalFile *f, uint64_t p, uint64_t needle), + direction_t direction, + Object **ret, + uint64_t *offset, + uint64_t *idx) { + + int r; + + assert(f); + assert(test_object); + + if (n <= 0) + return 0; + + /* This bisects the array in object 'first', but first checks + * an extra */ + r = test_object(f, extra, needle); + if (r < 0) + return r; + else if (r == TEST_FOUND) { + Object *o; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, extra, &o); + if (r < 0) + return r; + + if (ret) + *ret = o; + + if (offset) + *offset = extra; + + if (idx) + *idx = 0; + + return 1; + } else if (r == TEST_RIGHT) + return 0; + + r = generic_array_bisect(f, first, n-1, needle, test_object, direction, ret, offset, idx); + + if (r > 0) + (*idx) ++; + + return r; +} + +static int test_object_seqnum(JournalFile *f, uint64_t p, uint64_t needle) { + Object *o; + int r; + + assert(f); + assert(p > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (le64toh(o->entry.seqnum) == needle) + return TEST_FOUND; + else if (le64toh(o->entry.seqnum) < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_move_to_entry_by_seqnum( + JournalFile *f, + uint64_t seqnum, + direction_t direction, + Object **ret, + uint64_t *offset) { + + return generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + seqnum, + test_object_seqnum, + direction, + ret, offset, NULL); +} + +static int test_object_realtime(JournalFile *f, uint64_t p, uint64_t needle) { + Object *o; + int r; + + assert(f); + assert(p > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (le64toh(o->entry.realtime) == needle) + return TEST_FOUND; + else if (le64toh(o->entry.realtime) < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_move_to_entry_by_realtime( + JournalFile *f, + uint64_t realtime, + direction_t direction, + Object **ret, + uint64_t *offset) { + + return generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + realtime, + test_object_realtime, + direction, + ret, offset, NULL); +} + +static int test_object_monotonic(JournalFile *f, uint64_t p, uint64_t needle) { + Object *o; + int r; + + assert(f); + assert(p > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + + if (le64toh(o->entry.monotonic) == needle) + return TEST_FOUND; + else if (le64toh(o->entry.monotonic) < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_move_to_entry_by_monotonic( + JournalFile *f, + sd_id128_t boot_id, + uint64_t monotonic, + direction_t direction, + Object **ret, + uint64_t *offset) { + + char t[8+32+1] = "_BOOT_ID="; + Object *o; + int r; + + sd_id128_to_string(boot_id, t + 8); + + r = journal_file_find_data_object(f, t, strlen(t), &o, NULL); + if (r < 0) + return r; + else if (r == 0) + return -ENOENT; + + return generic_array_bisect_plus_one(f, + le64toh(o->data.entry_offset), + le64toh(o->data.entry_array_offset), + le64toh(o->data.n_entries), + monotonic, + test_object_monotonic, + direction, + ret, offset, NULL); +} + +static int test_object_offset(JournalFile *f, uint64_t p, uint64_t needle) { + assert(f); + assert(p > 0); + + if (p == needle) + return TEST_FOUND; + else if (p < needle) + return TEST_LEFT; + else + return TEST_RIGHT; +} + +int journal_file_next_entry( + JournalFile *f, + Object *o, uint64_t p, + direction_t direction, + Object **ret, uint64_t *offset) { + + uint64_t i, n; + int r; + + assert(f); + assert(p > 0 || !o); + + n = le64toh(f->header->n_entries); + if (n <= 0) + return 0; + + if (!o) + i = direction == DIRECTION_DOWN ? 0 : n - 1; + else { + if (o->object.type != OBJECT_ENTRY) + return -EINVAL; + + r = generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + p, + test_object_offset, + DIRECTION_DOWN, + NULL, NULL, + &i); + if (r <= 0) + return r; + + if (direction == DIRECTION_DOWN) { + if (i >= n - 1) + return 0; + + i++; + } else { + if (i <= 0) + return 0; + + i--; + } + } + + /* And jump to it */ + return generic_array_get(f, + le64toh(f->header->entry_array_offset), + i, + ret, offset); +} + +int journal_file_skip_entry( + JournalFile *f, + Object *o, uint64_t p, + int64_t skip, + Object **ret, uint64_t *offset) { + + uint64_t i, n; + int r; + + assert(f); + assert(o); + assert(p > 0); + + if (o->object.type != OBJECT_ENTRY) + return -EINVAL; + + r = generic_array_bisect(f, + le64toh(f->header->entry_array_offset), + le64toh(f->header->n_entries), + p, + test_object_offset, + DIRECTION_DOWN, + NULL, NULL, + &i); + if (r <= 0) + return r; + + /* Calculate new index */ + if (skip < 0) { + if ((uint64_t) -skip >= i) + i = 0; + else + i = i - (uint64_t) -skip; + } else + i += (uint64_t) skip; + + n = le64toh(f->header->n_entries); + if (n <= 0) + return -EBADMSG; + + if (i >= n) + i = n-1; + + return generic_array_get(f, + le64toh(f->header->entry_array_offset), + i, + ret, offset); +} + +int journal_file_next_entry_for_data( + JournalFile *f, + Object *o, uint64_t p, + uint64_t data_offset, + direction_t direction, + Object **ret, uint64_t *offset) { + + uint64_t n, i; + int r; + Object *d; + + assert(f); + assert(p > 0 || !o); + + r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d); + if (r < 0) + return r; + + n = le64toh(d->data.n_entries); + if (n <= 0) + return n; + + if (!o) + i = direction == DIRECTION_DOWN ? 0 : n - 1; + else { + if (o->object.type != OBJECT_ENTRY) + return -EINVAL; + + r = generic_array_bisect_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + le64toh(d->data.n_entries), + p, + test_object_offset, + DIRECTION_DOWN, + NULL, NULL, + &i); + + if (r <= 0) + return r; + + if (direction == DIRECTION_DOWN) { + if (i >= n - 1) + return 0; + + i++; + } else { + if (i <= 0) + return 0; + + i--; + } + + } + + return generic_array_get_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + i, + ret, offset); +} + +int journal_file_move_to_entry_by_seqnum_for_data( + JournalFile *f, + uint64_t data_offset, + uint64_t seqnum, + direction_t direction, + Object **ret, uint64_t *offset) { + + Object *d; + int r; + + r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d); + if (r <= 0) + return r; + + return generic_array_bisect_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + le64toh(d->data.n_entries), + seqnum, + test_object_seqnum, + direction, + ret, offset, NULL); +} + +int journal_file_move_to_entry_by_realtime_for_data( + JournalFile *f, + uint64_t data_offset, + uint64_t realtime, + direction_t direction, + Object **ret, uint64_t *offset) { + + Object *d; + int r; + + r = journal_file_move_to_object(f, OBJECT_DATA, data_offset, &d); + if (r <= 0) + return r; + + return generic_array_bisect_plus_one(f, + le64toh(d->data.entry_offset), + le64toh(d->data.entry_array_offset), + le64toh(d->data.n_entries), + realtime, + test_object_realtime, + direction, + ret, offset, NULL); +} + +void journal_file_dump(JournalFile *f) { + char a[33], b[33], c[33]; + Object *o; + int r; + uint64_t p; + + assert(f); + + printf("File Path: %s\n" + "File ID: %s\n" + "Machine ID: %s\n" + "Boot ID: %s\n" + "Arena size: %llu\n" + "Objects: %lu\n" + "Entries: %lu\n", + f->path, + sd_id128_to_string(f->header->file_id, a), + sd_id128_to_string(f->header->machine_id, b), + sd_id128_to_string(f->header->boot_id, c), + (unsigned long long) le64toh(f->header->arena_size), + (unsigned long) le64toh(f->header->n_objects), + (unsigned long) le64toh(f->header->n_entries)); + + p = le64toh(f->header->arena_offset); + while (p != 0) { + r = journal_file_move_to_object(f, -1, p, &o); + if (r < 0) + goto fail; + + switch (o->object.type) { + + case OBJECT_UNUSED: + printf("Type: OBJECT_UNUSED\n"); + break; + + case OBJECT_DATA: + printf("Type: OBJECT_DATA\n"); + break; + + case OBJECT_ENTRY: + printf("Type: OBJECT_ENTRY %llu %llu %llu\n", + (unsigned long long) le64toh(o->entry.seqnum), + (unsigned long long) le64toh(o->entry.monotonic), + (unsigned long long) le64toh(o->entry.realtime)); + break; + + case OBJECT_FIELD_HASH_TABLE: + printf("Type: OBJECT_FIELD_HASH_TABLE\n"); + break; + + case OBJECT_DATA_HASH_TABLE: + printf("Type: OBJECT_DATA_HASH_TABLE\n"); + break; + + case OBJECT_ENTRY_ARRAY: + printf("Type: OBJECT_ENTRY_ARRAY\n"); + break; + } + + if (o->object.flags & OBJECT_COMPRESSED) + printf("Flags: COMPRESSED\n"); + + if (p == le64toh(f->header->tail_object_offset)) + p = 0; + else + p = p + ALIGN64(le64toh(o->object.size)); + } + + return; +fail: + log_error("File corrupt"); +} + +int journal_file_open( + const char *fname, + int flags, + mode_t mode, + JournalFile *template, + JournalFile **ret) { + + JournalFile *f; + int r; + bool newly_created = false; + + assert(fname); + + if ((flags & O_ACCMODE) != O_RDONLY && + (flags & O_ACCMODE) != O_RDWR) + return -EINVAL; + + f = new0(JournalFile, 1); + if (!f) + return -ENOMEM; + + f->fd = -1; + f->flags = flags; + f->mode = mode; + f->writable = (flags & O_ACCMODE) != O_RDONLY; + f->prot = prot_from_flags(flags); + + f->metrics.max_size = DEFAULT_MAX_SIZE; + f->metrics.min_size = DEFAULT_MIN_SIZE; + f->metrics.keep_free = DEFAULT_KEEP_FREE; + + f->path = strdup(fname); + if (!f->path) { + r = -ENOMEM; + goto fail; + } + + f->fd = open(f->path, f->flags|O_CLOEXEC, f->mode); + if (f->fd < 0) { + r = -errno; + goto fail; + } + + if (fstat(f->fd, &f->last_stat) < 0) { + r = -errno; + goto fail; + } + + if (f->last_stat.st_size == 0 && f->writable) { + newly_created = true; + + r = journal_file_init_header(f, template); + if (r < 0) + goto fail; + + if (fstat(f->fd, &f->last_stat) < 0) { + r = -errno; + goto fail; + } + } + + if (f->last_stat.st_size < (off_t) sizeof(Header)) { + r = -EIO; + goto fail; + } + + f->header = mmap(NULL, PAGE_ALIGN(sizeof(Header)), prot_from_flags(flags), MAP_SHARED, f->fd, 0); + if (f->header == MAP_FAILED) { + f->header = NULL; + r = -errno; + goto fail; + } + + if (!newly_created) { + r = journal_file_verify_header(f); + if (r < 0) + goto fail; + } + + if (f->writable) { + r = journal_file_refresh_header(f); + if (r < 0) + goto fail; + } + + if (newly_created) { + + r = journal_file_setup_field_hash_table(f); + if (r < 0) + goto fail; + + r = journal_file_setup_data_hash_table(f); + if (r < 0) + goto fail; + } + + r = journal_file_map_field_hash_table(f); + if (r < 0) + goto fail; + + r = journal_file_map_data_hash_table(f); + if (r < 0) + goto fail; + + if (ret) + *ret = f; + + return 0; + +fail: + journal_file_close(f); + + return r; +} + +int journal_file_rotate(JournalFile **f) { + char *p; + size_t l; + JournalFile *old_file, *new_file = NULL; + int r; + + assert(f); + assert(*f); + + old_file = *f; + + if (!old_file->writable) + return -EINVAL; + + if (!endswith(old_file->path, ".journal")) + return -EINVAL; + + l = strlen(old_file->path); + + p = new(char, l + 1 + 16 + 1 + 32 + 1 + 16 + 1); + if (!p) + return -ENOMEM; + + memcpy(p, old_file->path, l - 8); + p[l-8] = '@'; + sd_id128_to_string(old_file->header->seqnum_id, p + l - 8 + 1); + snprintf(p + l - 8 + 1 + 32, 1 + 16 + 1 + 16 + 8 + 1, + "-%016llx-%016llx.journal", + (unsigned long long) le64toh((*f)->header->seqnum), + (unsigned long long) le64toh((*f)->header->tail_entry_realtime)); + + r = rename(old_file->path, p); + free(p); + + if (r < 0) + return -errno; + + old_file->header->state = le32toh(STATE_ARCHIVED); + + r = journal_file_open(old_file->path, old_file->flags, old_file->mode, old_file, &new_file); + journal_file_close(old_file); + + *f = new_file; + return r; +} + +struct vacuum_info { + off_t usage; + char *filename; + + uint64_t realtime; + sd_id128_t seqnum_id; + uint64_t seqnum; +}; + +static int vacuum_compare(const void *_a, const void *_b) { + const struct vacuum_info *a, *b; + + a = _a; + b = _b; + + if (sd_id128_equal(a->seqnum_id, b->seqnum_id)) { + if (a->seqnum < b->seqnum) + return -1; + else if (a->seqnum > b->seqnum) + return 1; + else + return 0; + } + + if (a->realtime < b->realtime) + return -1; + else if (a->realtime > b->realtime) + return 1; + else + return memcmp(&a->seqnum_id, &b->seqnum_id, 16); +} + +int journal_directory_vacuum(const char *directory, uint64_t max_use, uint64_t min_free) { + DIR *d; + int r = 0; + struct vacuum_info *list = NULL; + unsigned n_list = 0, n_allocated = 0, i; + uint64_t sum = 0; + + assert(directory); + + if (max_use <= 0) + max_use = DEFAULT_MAX_USE; + + d = opendir(directory); + if (!d) + return -errno; + + for (;;) { + int k; + struct dirent buf, *de; + size_t q; + struct stat st; + char *p; + unsigned long long seqnum, realtime; + sd_id128_t seqnum_id; + + k = readdir_r(d, &buf, &de); + if (k != 0) { + r = -k; + goto finish; + } + + if (!de) + break; + + if (!dirent_is_file_with_suffix(de, ".journal")) + continue; + + q = strlen(de->d_name); + + if (q < 1 + 32 + 1 + 16 + 1 + 16 + 8) + continue; + + if (de->d_name[q-8-16-1] != '-' || + de->d_name[q-8-16-1-16-1] != '-' || + de->d_name[q-8-16-1-16-1-32-1] != '@') + continue; + + if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) + continue; + + if (!S_ISREG(st.st_mode)) + continue; + + p = strdup(de->d_name); + if (!p) { + r = -ENOMEM; + goto finish; + } + + de->d_name[q-8-16-1-16-1] = 0; + if (sd_id128_from_string(de->d_name + q-8-16-1-16-1-32, &seqnum_id) < 0) { + free(p); + continue; + } + + if (sscanf(de->d_name + q-8-16-1-16, "%16llx-%16llx.journal", &seqnum, &realtime) != 2) { + free(p); + continue; + } + + if (n_list >= n_allocated) { + struct vacuum_info *j; + + n_allocated = MAX(n_allocated * 2U, 8U); + j = realloc(list, n_allocated * sizeof(struct vacuum_info)); + if (!j) { + free(p); + r = -ENOMEM; + goto finish; + } + + list = j; + } + + list[n_list].filename = p; + list[n_list].usage = (uint64_t) st.st_blksize * (uint64_t) st.st_blocks; + list[n_list].seqnum = seqnum; + list[n_list].realtime = realtime; + list[n_list].seqnum_id = seqnum_id; + + sum += list[n_list].usage; + + n_list ++; + } + + qsort(list, n_list, sizeof(struct vacuum_info), vacuum_compare); + + for(i = 0; i < n_list; i++) { + struct statvfs ss; + + if (fstatvfs(dirfd(d), &ss) < 0) { + r = -errno; + goto finish; + } + + if (sum <= max_use && + (uint64_t) ss.f_bavail * (uint64_t) ss.f_bsize >= min_free) + break; + + if (unlinkat(dirfd(d), list[i].filename, 0) >= 0) { + log_debug("Deleted archived journal %s/%s.", directory, list[i].filename); + sum -= list[i].usage; + } else if (errno != ENOENT) + log_warning("Failed to delete %s/%s: %m", directory, list[i].filename); + } + +finish: + for (i = 0; i < n_list; i++) + free(list[i].filename); + + free(list); + + if (d) + closedir(d); + + return r; +} + +int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset) { + uint64_t i, n; + uint64_t q, xor_hash = 0; + int r; + EntryItem *items; + dual_timestamp ts; + + assert(from); + assert(to); + assert(o); + assert(p); + + if (!to->writable) + return -EPERM; + + ts.monotonic = le64toh(o->entry.monotonic); + ts.realtime = le64toh(o->entry.realtime); + + if (to->tail_entry_monotonic_valid && + ts.monotonic < le64toh(to->header->tail_entry_monotonic)) + return -EINVAL; + + if (ts.realtime < le64toh(to->header->tail_entry_realtime)) + return -EINVAL; + + n = journal_file_entry_n_items(o); + items = alloca(sizeof(EntryItem) * n); + + for (i = 0; i < n; i++) { + uint64_t le_hash, l, h; + size_t t; + void *data; + Object *u; + + q = le64toh(o->entry.items[i].object_offset); + le_hash = o->entry.items[i].hash; + + r = journal_file_move_to_object(from, OBJECT_DATA, q, &o); + if (r < 0) + return r; + + if (le_hash != o->data.hash) + return -EBADMSG; + + l = le64toh(o->object.size) - offsetof(Object, data.payload); + t = (size_t) l; + + /* We hit the limit on 32bit machines */ + if ((uint64_t) t != l) + return -E2BIG; + + if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ + uint64_t rsize; + + if (!uncompress_blob(o->data.payload, l, &from->compress_buffer, &from->compress_buffer_size, &rsize)) + return -EBADMSG; + + data = from->compress_buffer; + l = rsize; +#else + return -EPROTONOSUPPORT; +#endif + } else + data = o->data.payload; + + r = journal_file_append_data(to, data, l, &u, &h); + if (r < 0) + return r; + + xor_hash ^= le64toh(u->data.hash); + items[i].object_offset = htole64(h); + items[i].hash = u->data.hash; + + r = journal_file_move_to_object(from, OBJECT_ENTRY, p, &o); + if (r < 0) + return r; + } + + return journal_file_append_entry_internal(to, &ts, xor_hash, items, n, seqnum, ret, offset); +} diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h new file mode 100644 index 0000000000..acc55272a0 --- /dev/null +++ b/src/journal/journal-file.h @@ -0,0 +1,127 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalfilehfoo +#define foojournalfilehfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> + +#include "journal-def.h" +#include "util.h" +#include "sd-id128.h" + +#define DEFAULT_MAX_SIZE (128ULL*1024ULL*1024ULL) +#define DEFAULT_MIN_SIZE (256ULL*1024ULL) +#define DEFAULT_KEEP_FREE (1ULL*1024ULL*1024ULL) +#define DEFAULT_MAX_USE (16ULL*1024ULL*1024ULL*16ULL) + +typedef struct Window { + void *ptr; + uint64_t offset; + uint64_t size; +} Window; + +enum { + WINDOW_UNKNOWN = OBJECT_UNUSED, + WINDOW_DATA = OBJECT_DATA, + WINDOW_ENTRY = OBJECT_ENTRY, + WINDOW_DATA_HASH_TABLE = OBJECT_DATA_HASH_TABLE, + WINDOW_FIELD_HASH_TABLE = OBJECT_FIELD_HASH_TABLE, + WINDOW_ENTRY_ARRAY = OBJECT_ENTRY_ARRAY, + WINDOW_HEADER, + _WINDOW_MAX +}; + +typedef struct JournalMetrics { + uint64_t max_size; + uint64_t min_size; + uint64_t keep_free; + uint64_t max_use; +} JournalMetrics; + +typedef struct JournalFile { + int fd; + char *path; + struct stat last_stat; + mode_t mode; + int flags; + int prot; + bool writable; + bool tail_entry_monotonic_valid; + + Header *header; + HashItem *data_hash_table; + HashItem *field_hash_table; + + Window windows[_WINDOW_MAX]; + + uint64_t current_offset; + + JournalMetrics metrics; + + bool compress; + +#ifdef HAVE_XZ + void *compress_buffer; + size_t compress_buffer_size; +#endif +} JournalFile; + +typedef enum direction { + DIRECTION_UP, + DIRECTION_DOWN +} direction_t; + +int journal_file_open(const char *fname, int flags, mode_t mode, JournalFile *template, JournalFile **ret); +void journal_file_close(JournalFile *j); + +int journal_file_move_to_object(JournalFile *f, int type, uint64_t offset, Object **ret); + +uint64_t journal_file_entry_n_items(Object *o); + +int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const struct iovec iovec[], unsigned n_iovec, uint64_t *seqno, Object **ret, uint64_t *offset); + +int journal_file_find_data_object(JournalFile *f, const void *data, uint64_t size, Object **ret, uint64_t *offset); +int journal_file_find_data_object_with_hash(JournalFile *f, const void *data, uint64_t size, uint64_t hash, Object **ret, uint64_t *offset); + +int journal_file_next_entry(JournalFile *f, Object *o, uint64_t p, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_skip_entry(JournalFile *f, Object *o, uint64_t p, int64_t skip, Object **ret, uint64_t *offset); + +int journal_file_next_entry_for_data(JournalFile *f, Object *o, uint64_t p, uint64_t data_offset, direction_t direction, Object **ret, uint64_t *offset); + +int journal_file_move_to_entry_by_seqnum(JournalFile *f, uint64_t seqnum, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_move_to_entry_by_realtime(JournalFile *f, uint64_t realtime, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_move_to_entry_by_monotonic(JournalFile *f, sd_id128_t boot_id, uint64_t monotonic, direction_t direction, Object **ret, uint64_t *offset); + +int journal_file_move_to_entry_by_seqnum_for_data(JournalFile *f, uint64_t data_offset, uint64_t seqnum, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_move_to_entry_by_realtime_for_data(JournalFile *f, uint64_t data_offset, uint64_t realtime, direction_t direction, Object **ret, uint64_t *offset); + +int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset); + +void journal_file_dump(JournalFile *f); + +int journal_file_rotate(JournalFile **f); + +int journal_directory_vacuum(const char *directory, uint64_t max_use, uint64_t min_free); + +void journal_file_post_change(JournalFile *f); + +#endif diff --git a/src/journal/journal-internal.h b/src/journal/journal-internal.h new file mode 100644 index 0000000000..1b64666da3 --- /dev/null +++ b/src/journal/journal-internal.h @@ -0,0 +1,83 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalinternalhfoo +#define foojournalinternalhfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/types.h> +#include <inttypes.h> +#include <stdbool.h> + +#include "list.h" +#include "sd-id128.h" + +typedef struct Match Match; + +struct Match { + char *data; + size_t size; + uint64_t le_hash; + + LIST_FIELDS(Match, matches); +}; + +typedef enum location_type { + LOCATION_HEAD, + LOCATION_TAIL, + LOCATION_DISCRETE +} location_type_t; + +typedef struct Location { + location_type_t type; + + uint64_t seqnum; + sd_id128_t seqnum_id; + bool seqnum_set; + + uint64_t realtime; + bool realtime_set; + + uint64_t monotonic; + sd_id128_t boot_id; + bool monotonic_set; + + uint64_t xor_hash; + bool xor_hash_set; +} Location; + +struct sd_journal { + int flags; + + Hashmap *files; + + Location current_location; + JournalFile *current_file; + uint64_t current_field; + + int inotify_fd; + Hashmap *inotify_wd_dirs; + Hashmap *inotify_wd_roots; + + LIST_HEAD(Match, matches); + unsigned n_matches; +}; + +#endif diff --git a/src/journal/journal-rate-limit.c b/src/journal/journal-rate-limit.c new file mode 100644 index 0000000000..243ff2a378 --- /dev/null +++ b/src/journal/journal-rate-limit.c @@ -0,0 +1,275 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <string.h> +#include <errno.h> + +#include "journal-rate-limit.h" +#include "list.h" +#include "util.h" +#include "hashmap.h" + +#define POOLS_MAX 5 +#define BUCKETS_MAX 127 +#define GROUPS_MAX 2047 + +static const int priority_map[] = { + [LOG_EMERG] = 0, + [LOG_ALERT] = 0, + [LOG_CRIT] = 0, + [LOG_ERR] = 1, + [LOG_WARNING] = 2, + [LOG_NOTICE] = 3, + [LOG_INFO] = 3, + [LOG_DEBUG] = 4 +}; + +typedef struct JournalRateLimitPool JournalRateLimitPool; +typedef struct JournalRateLimitGroup JournalRateLimitGroup; + +struct JournalRateLimitPool { + usec_t begin; + unsigned num; + unsigned suppressed; +}; + +struct JournalRateLimitGroup { + JournalRateLimit *parent; + + char *id; + JournalRateLimitPool pools[POOLS_MAX]; + unsigned hash; + + LIST_FIELDS(JournalRateLimitGroup, bucket); + LIST_FIELDS(JournalRateLimitGroup, lru); +}; + +struct JournalRateLimit { + usec_t interval; + unsigned burst; + + JournalRateLimitGroup* buckets[BUCKETS_MAX]; + JournalRateLimitGroup *lru, *lru_tail; + + unsigned n_groups; +}; + +JournalRateLimit *journal_rate_limit_new(usec_t interval, unsigned burst) { + JournalRateLimit *r; + + assert(interval > 0 || burst == 0); + + r = new0(JournalRateLimit, 1); + if (!r) + return NULL; + + r->interval = interval; + r->burst = burst; + + return r; +} + +static void journal_rate_limit_group_free(JournalRateLimitGroup *g) { + assert(g); + + if (g->parent) { + assert(g->parent->n_groups > 0); + + if (g->parent->lru_tail == g) + g->parent->lru_tail = g->lru_prev; + + LIST_REMOVE(JournalRateLimitGroup, lru, g->parent->lru, g); + LIST_REMOVE(JournalRateLimitGroup, bucket, g->parent->buckets[g->hash % BUCKETS_MAX], g); + + g->parent->n_groups --; + } + + free(g->id); + free(g); +} + +void journal_rate_limit_free(JournalRateLimit *r) { + assert(r); + + while (r->lru) + journal_rate_limit_group_free(r->lru); + + free(r); +} + +static bool journal_rate_limit_group_expired(JournalRateLimitGroup *g, usec_t ts) { + unsigned i; + + assert(g); + + for (i = 0; i < POOLS_MAX; i++) + if (g->pools[i].begin + g->parent->interval >= ts) + return false; + + return true; +} + +static void journal_rate_limit_vacuum(JournalRateLimit *r, usec_t ts) { + assert(r); + + /* Makes room for at least one new item, but drop all + * expored items too. */ + + while (r->n_groups >= GROUPS_MAX || + (r->lru_tail && journal_rate_limit_group_expired(r->lru_tail, ts))) + journal_rate_limit_group_free(r->lru_tail); +} + +static JournalRateLimitGroup* journal_rate_limit_group_new(JournalRateLimit *r, const char *id, usec_t ts) { + JournalRateLimitGroup *g; + + assert(r); + assert(id); + + g = new0(JournalRateLimitGroup, 1); + if (!g) + return NULL; + + g->id = strdup(id); + if (!g->id) + goto fail; + + g->hash = string_hash_func(g->id); + + journal_rate_limit_vacuum(r, ts); + + LIST_PREPEND(JournalRateLimitGroup, bucket, r->buckets[g->hash % BUCKETS_MAX], g); + LIST_PREPEND(JournalRateLimitGroup, lru, r->lru, g); + if (!g->lru_next) + r->lru_tail = g; + r->n_groups ++; + + g->parent = r; + return g; + +fail: + journal_rate_limit_group_free(g); + return NULL; +} + +static uint64_t u64log2(uint64_t n) { + unsigned r; + + if (n <= 1) + return 0; + + r = 0; + for (;;) { + n = n >> 1; + if (!n) + return r; + r++; + } +} + +static unsigned burst_modulate(unsigned burst, uint64_t available) { + unsigned k; + + /* Modulates the burst rate a bit with the amount of available + * disk space */ + + k = u64log2(available); + + /* 1MB */ + if (k <= 20) + return burst; + + burst = (burst * (k-20)) / 4; + + /* + * Example: + * + * <= 1MB = rate * 1 + * 16MB = rate * 2 + * 256MB = rate * 3 + * 4GB = rate * 4 + * 64GB = rate * 5 + * 1TB = rate * 6 + */ + + return burst; +} + +int journal_rate_limit_test(JournalRateLimit *r, const char *id, int priority, uint64_t available) { + unsigned h; + JournalRateLimitGroup *g; + JournalRateLimitPool *p; + unsigned burst; + usec_t ts; + + assert(id); + + if (!r) + return 1; + + if (r->interval == 0 || r->burst == 0) + return 1; + + burst = burst_modulate(r->burst, available); + + ts = now(CLOCK_MONOTONIC); + + h = string_hash_func(id); + g = r->buckets[h % BUCKETS_MAX]; + + LIST_FOREACH(bucket, g, g) + if (streq(g->id, id)) + break; + + if (!g) { + g = journal_rate_limit_group_new(r, id, ts); + if (!g) + return -ENOMEM; + } + + p = &g->pools[priority_map[priority]]; + + if (p->begin <= 0) { + p->suppressed = 0; + p->num = 1; + p->begin = ts; + return 1; + } + + if (p->begin + r->interval < ts) { + unsigned s; + + s = p->suppressed; + p->suppressed = 0; + p->num = 1; + p->begin = ts; + + return 1 + s; + } + + if (p->num <= burst) { + p->num++; + return 1; + } + + p->suppressed++; + return 0; +} diff --git a/src/journal/journal-rate-limit.h b/src/journal/journal-rate-limit.h new file mode 100644 index 0000000000..2bbdd5f9fe --- /dev/null +++ b/src/journal/journal-rate-limit.h @@ -0,0 +1,34 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalratelimithfoo +#define foojournalratelimithfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include "macro.h" +#include "util.h" + +typedef struct JournalRateLimit JournalRateLimit; + +JournalRateLimit *journal_rate_limit_new(usec_t interval, unsigned burst); +void journal_rate_limit_free(JournalRateLimit *r); +int journal_rate_limit_test(JournalRateLimit *r, const char *id, int priority, uint64_t available); + +#endif diff --git a/src/journal/journal-send.c b/src/journal/journal-send.c new file mode 100644 index 0000000000..cc3cd8c303 --- /dev/null +++ b/src/journal/journal-send.c @@ -0,0 +1,259 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> +#include <stddef.h> + +#include "sd-journal.h" +#include "util.h" +#include "socket-util.h" + +/* We open a single fd, and we'll share it with the current process, + * all its threads, and all its subprocesses. This means we need to + * initialize it atomically, and need to operate on it atomically + * never assuming we are the only user */ + +static int journal_fd(void) { + int fd; + static int fd_plus_one = 0; + +retry: + if (fd_plus_one > 0) + return fd_plus_one - 1; + + fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (fd < 0) + return -errno; + + if (!__sync_bool_compare_and_swap(&fd_plus_one, 0, fd+1)) { + close_nointr_nofail(fd); + goto retry; + } + + return fd; +} + +int sd_journal_print(int priority, const char *format, ...) { + int r; + va_list ap; + + va_start(ap, format); + r = sd_journal_printv(priority, format, ap); + va_end(ap); + + return r; +} + +int sd_journal_printv(int priority, const char *format, va_list ap) { + char buffer[8 + LINE_MAX], p[11]; + struct iovec iov[2]; + + if (priority < 0 || priority > 7) + return -EINVAL; + + if (!format) + return -EINVAL; + + snprintf(p, sizeof(p), "PRIORITY=%i", priority & LOG_PRIMASK); + char_array_0(p); + + memcpy(buffer, "MESSAGE=", 8); + vsnprintf(buffer+8, sizeof(buffer) - 8, format, ap); + char_array_0(buffer); + + zero(iov); + IOVEC_SET_STRING(iov[0], buffer); + IOVEC_SET_STRING(iov[1], p); + + return sd_journal_sendv(iov, 2); +} + +int sd_journal_send(const char *format, ...) { + int r, n = 0, i = 0, j; + va_list ap; + struct iovec *iov = NULL; + + va_start(ap, format); + while (format) { + struct iovec *c; + char *buffer; + + if (i >= n) { + n = MAX(i*2, 4); + c = realloc(iov, n * sizeof(struct iovec)); + if (!c) { + r = -ENOMEM; + goto fail; + } + + iov = c; + } + + if (vasprintf(&buffer, format, ap) < 0) { + r = -ENOMEM; + goto fail; + } + + IOVEC_SET_STRING(iov[i++], buffer); + + format = va_arg(ap, char *); + } + va_end(ap); + + r = sd_journal_sendv(iov, i); + +fail: + for (j = 0; j < i; j++) + free(iov[j].iov_base); + + free(iov); + + return r; +} + +int sd_journal_sendv(const struct iovec *iov, int n) { + int fd; + struct iovec *w; + uint64_t *l; + int i, j = 0; + struct msghdr mh; + struct sockaddr_un sa; + + if (!iov || n <= 0) + return -EINVAL; + + w = alloca(sizeof(struct iovec) * n * 5); + l = alloca(sizeof(uint64_t) * n); + + for (i = 0; i < n; i++) { + char *c, *nl; + + c = memchr(iov[i].iov_base, '=', iov[i].iov_len); + if (!c) + return -EINVAL; + + nl = memchr(iov[i].iov_base, '\n', iov[i].iov_len); + if (nl) { + if (nl < c) + return -EINVAL; + + /* Already includes a newline? Bummer, then + * let's write the variable name, then a + * newline, then the size (64bit LE), followed + * by the data and a final newline */ + + w[j].iov_base = iov[i].iov_base; + w[j].iov_len = c - (char*) iov[i].iov_base; + j++; + + IOVEC_SET_STRING(w[j++], "\n"); + + l[i] = htole64(iov[i].iov_len - (c - (char*) iov[i].iov_base) - 1); + w[j].iov_base = &l[i]; + w[j].iov_len = sizeof(uint64_t); + j++; + + w[j].iov_base = c + 1; + w[j].iov_len = iov[i].iov_len - (c - (char*) iov[i].iov_base) - 1; + j++; + + } else + /* Nothing special? Then just add the line and + * append a newline */ + w[j++] = iov[i]; + + IOVEC_SET_STRING(w[j++], "\n"); + } + + fd = journal_fd(); + if (fd < 0) + return fd; + + zero(sa); + sa.sun_family = AF_UNIX; + strncpy(sa.sun_path,"/run/systemd/journal", sizeof(sa.sun_path)); + + zero(mh); + mh.msg_name = &sa; + mh.msg_namelen = offsetof(struct sockaddr_un, sun_path) + strlen(sa.sun_path); + mh.msg_iov = w; + mh.msg_iovlen = j; + + if (sendmsg(fd, &mh, MSG_NOSIGNAL) < 0) + return -errno; + + return 0; +} + +int sd_journal_stream_fd(const char *tag, int priority, int priority_prefix) { + union sockaddr_union sa; + int fd; + char *header; + size_t l; + ssize_t r; + + if (priority < 0 || priority > 7) + return -EINVAL; + + fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (fd < 0) + return -errno; + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path)); + + r = connect(fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + close_nointr_nofail(fd); + return -errno; + } + + if (!tag) + tag = ""; + + l = strlen(tag); + header = alloca(l + 1 + 2 + 2 + 2); + + memcpy(header, tag, l); + header[l++] = '\n'; + header[l++] = '0' + priority; + header[l++] = '\n'; + header[l++] = '0' + !!priority_prefix; + header[l++] = '\n'; + header[l++] = '0'; + header[l++] = '\n'; + + r = loop_write(fd, header, l, false); + if (r < 0) { + close_nointr_nofail(fd); + return (int) r; + } + + if ((size_t) r != l) { + close_nointr_nofail(fd); + return -errno; + } + + return fd; +} diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c new file mode 100644 index 0000000000..701518244c --- /dev/null +++ b/src/journal/journalctl.c @@ -0,0 +1,544 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <fcntl.h> +#include <errno.h> +#include <stddef.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> +#include <stdlib.h> +#include <sys/poll.h> +#include <time.h> +#include <getopt.h> + +#include "sd-journal.h" +#include "log.h" +#include "util.h" +#include "build.h" +#include "pager.h" + +#define PRINT_THRESHOLD 128 + +static enum { + OUTPUT_SHORT, + OUTPUT_VERBOSE, + OUTPUT_EXPORT, + OUTPUT_JSON, + _OUTPUT_MAX +} arg_output = OUTPUT_SHORT; + +static bool arg_follow = false; +static bool arg_show_all = false; +static bool arg_no_pager = false; + +static bool contains_unprintable(const void *p, size_t l) { + const char *j; + + for (j = p; j < (const char *) p + l; j++) + if (*j < ' ' || *j >= 127) + return true; + + return false; +} + +static int output_short(sd_journal *j, unsigned line) { + int r; + uint64_t realtime; + time_t t; + struct tm tm; + char buf[64]; + const void *data; + size_t length; + size_t n = 0; + + assert(j); + + r = sd_journal_get_realtime_usec(j, &realtime); + if (r < 0) { + log_error("Failed to get realtime: %s", strerror(-r)); + return r; + } + + t = (time_t) (realtime / USEC_PER_SEC); + if (strftime(buf, sizeof(buf), "%b %d %H:%M:%S", localtime_r(&t, &tm)) <= 0) { + log_error("Failed to format time."); + return -EINVAL; + } + + fputs(buf, stdout); + n += strlen(buf); + + if (sd_journal_get_data(j, "_HOSTNAME", &data, &length) >= 0 && + (arg_show_all || (!contains_unprintable(data, length) && + length < PRINT_THRESHOLD))) { + printf(" %.*s", (int) length - 10, ((const char*) data) + 10); + n += length - 10 + 1; + } + + if (sd_journal_get_data(j, "MESSAGE", &data, &length) >= 0) { + if (arg_show_all) + printf(" %.*s", (int) length - 8, ((const char*) data) + 8); + else if (contains_unprintable(data, length)) + fputs(" [blob data]", stdout); + else if (length - 8 + n < columns()) + printf(" %.*s", (int) length - 8, ((const char*) data) + 8); + else if (n < columns()) { + char *e; + + e = ellipsize_mem((const char *) data + 8, length - 8, columns() - n - 2, 90); + + if (!e) + printf(" %.*s", (int) length - 8, ((const char*) data) + 8); + else + printf(" %s", e); + + free(e); + } + } + + fputc('\n', stdout); + + return 0; +} + +static int output_verbose(sd_journal *j, unsigned line) { + const void *data; + size_t length; + char *cursor; + uint64_t realtime; + char ts[FORMAT_TIMESTAMP_MAX]; + int r; + + assert(j); + + r = sd_journal_get_realtime_usec(j, &realtime); + if (r < 0) { + log_error("Failed to get realtime timestamp: %s", strerror(-r)); + return r; + } + + r = sd_journal_get_cursor(j, &cursor); + if (r < 0) { + log_error("Failed to get cursor: %s", strerror(-r)); + return r; + } + + printf("%s [%s]\n", + format_timestamp(ts, sizeof(ts), realtime), + cursor); + + free(cursor); + + SD_JOURNAL_FOREACH_DATA(j, data, length) { + if (!arg_show_all && (length > PRINT_THRESHOLD || + contains_unprintable(data, length))) { + const char *c; + + c = memchr(data, '=', length); + if (!c) { + log_error("Invalid field."); + return -EINVAL; + } + + printf("\t%.*s=[blob data]\n", + (int) (c - (const char*) data), + (const char*) data); + } else + printf("\t%.*s\n", (int) length, (const char*) data); + } + + return 0; +} + +static int output_export(sd_journal *j, unsigned line) { + sd_id128_t boot_id; + char sid[33]; + int r; + usec_t realtime, monotonic; + char *cursor; + const void *data; + size_t length; + + assert(j); + + r = sd_journal_get_realtime_usec(j, &realtime); + if (r < 0) { + log_error("Failed to get realtime timestamp: %s", strerror(-r)); + return r; + } + + r = sd_journal_get_monotonic_usec(j, &monotonic, &boot_id); + if (r < 0) { + log_error("Failed to get monotonic timestamp: %s", strerror(-r)); + return r; + } + + r = sd_journal_get_cursor(j, &cursor); + if (r < 0) { + log_error("Failed to get cursor: %s", strerror(-r)); + return r; + } + + printf(".cursor=%s\n" + ".realtime=%llu\n" + ".monotonic=%llu\n" + ".boot_id=%s\n", + cursor, + (unsigned long long) realtime, + (unsigned long long) monotonic, + sd_id128_to_string(boot_id, sid)); + + free(cursor); + + SD_JOURNAL_FOREACH_DATA(j, data, length) { + + if (contains_unprintable(data, length)) { + const char *c; + uint64_t le64; + + c = memchr(data, '=', length); + if (!c) { + log_error("Invalid field."); + return -EINVAL; + } + + fwrite(data, c - (const char*) data, 1, stdout); + fputc('\n', stdout); + le64 = htole64(length - (c - (const char*) data) - 1); + fwrite(&le64, sizeof(le64), 1, stdout); + fwrite(c + 1, length - (c - (const char*) data) - 1, 1, stdout); + } else + fwrite(data, length, 1, stdout); + + fputc('\n', stdout); + } + + fputc('\n', stdout); + + return 0; +} + +static void json_escape(const char* p, size_t l) { + + if (contains_unprintable(p, l)) { + bool not_first = false; + + fputs("[ ", stdout); + + while (l > 0) { + if (not_first) + printf(", %u", (uint8_t) *p); + else { + not_first = true; + printf("%u", (uint8_t) *p); + } + + p++; + l--; + } + + fputs(" ]", stdout); + } else { + fputc('\"', stdout); + + while (l > 0) { + if (*p == '"' || *p == '\\') { + fputc('\\', stdout); + fputc(*p, stdout); + } else + fputc(*p, stdout); + + p++; + l--; + } + + fputc('\"', stdout); + } +} + +static int output_json(sd_journal *j, unsigned line) { + uint64_t realtime, monotonic; + char *cursor; + const void *data; + size_t length; + sd_id128_t boot_id; + char sid[33]; + int r; + + assert(j); + + r = sd_journal_get_realtime_usec(j, &realtime); + if (r < 0) { + log_error("Failed to get realtime timestamp: %s", strerror(-r)); + return r; + } + + r = sd_journal_get_monotonic_usec(j, &monotonic, &boot_id); + if (r < 0) { + log_error("Failed to get monotonic timestamp: %s", strerror(-r)); + return r; + } + + r = sd_journal_get_cursor(j, &cursor); + if (r < 0) { + log_error("Failed to get cursor: %s", strerror(-r)); + return r; + } + + if (line == 1) + fputc('\n', stdout); + else + fputs(",\n", stdout); + + printf("{\n" + "\t\".cursor\" : \"%s\",\n" + "\t\".realtime\" : %llu,\n" + "\t\".monotonic\" : %llu,\n" + "\t\".boot_id\" : \"%s\"", + cursor, + (unsigned long long) realtime, + (unsigned long long) monotonic, + sd_id128_to_string(boot_id, sid)); + + free(cursor); + + SD_JOURNAL_FOREACH_DATA(j, data, length) { + const char *c; + + c = memchr(data, '=', length); + if (!c) { + log_error("Invalid field."); + return -EINVAL; + } + + fputs(",\n\t", stdout); + json_escape(data, c - (const char*) data); + fputs(" : ", stdout); + json_escape(c + 1, length - (c - (const char*) data) - 1); + } + + fputs("\n}", stdout); + fflush(stdout); + + return 0; +} + +static int (*output_funcs[_OUTPUT_MAX])(sd_journal*j, unsigned line) = { + [OUTPUT_SHORT] = output_short, + [OUTPUT_VERBOSE] = output_verbose, + [OUTPUT_EXPORT] = output_export, + [OUTPUT_JSON] = output_json +}; + +static int help(void) { + + printf("%s [OPTIONS...] {COMMAND} ...\n\n" + "Send control commands to or query the login manager.\n\n" + " -h --help Show this help\n" + " --version Show package version\n" + " --no-pager Do not pipe output into a pager\n" + " -a --all Show all properties, including long and unprintable\n" + " -f --follow Follow journal\n" + " -o --output=STRING Change output mode (short, verbose, export, json)\n", + program_invocation_short_name); + + return 0; +} + +static int parse_argv(int argc, char *argv[]) { + + enum { + ARG_VERSION = 0x100, + ARG_NO_PAGER + }; + + static const struct option options[] = { + { "help", no_argument, NULL, 'h' }, + { "version" , no_argument, NULL, ARG_VERSION }, + { "no-pager", no_argument, NULL, ARG_NO_PAGER }, + { "follow", no_argument, NULL, 'f' }, + { "output", required_argument, NULL, 'o' }, + { "all", no_argument, NULL, 'a' }, + { NULL, 0, NULL, 0 } + }; + + int c; + + assert(argc >= 0); + assert(argv); + + while ((c = getopt_long(argc, argv, "hfo:a", options, NULL)) >= 0) { + + switch (c) { + + case 'h': + help(); + return 0; + + case ARG_VERSION: + puts(PACKAGE_STRING); + puts(DISTRIBUTION); + puts(SYSTEMD_FEATURES); + return 0; + + case ARG_NO_PAGER: + arg_no_pager = true; + break; + + case 'f': + arg_follow = true; + break; + + case 'o': + if (streq(optarg, "short")) + arg_output = OUTPUT_SHORT; + else if (streq(optarg, "verbose")) + arg_output = OUTPUT_VERBOSE; + else if (streq(optarg, "export")) + arg_output = OUTPUT_EXPORT; + else if (streq(optarg, "json")) + arg_output = OUTPUT_JSON; + else { + log_error("Unknown output '%s'.", optarg); + return -EINVAL; + } + break; + + case 'a': + arg_show_all = true; + break; + + case '?': + return -EINVAL; + + default: + log_error("Unknown option code %c", c); + return -EINVAL; + } + } + + return 1; +} + +int main(int argc, char *argv[]) { + int r, i, fd; + sd_journal *j = NULL; + unsigned line = 0; + + log_parse_environment(); + log_open(); + + r = parse_argv(argc, argv); + if (r <= 0) + goto finish; + + r = sd_journal_open(&j, 0); + if (r < 0) { + log_error("Failed to open journal: %s", strerror(-r)); + goto finish; + } + + for (i = optind; i < argc; i++) { + r = sd_journal_add_match(j, argv[i], strlen(argv[i])); + if (r < 0) { + log_error("Failed to add match: %s", strerror(-r)); + goto finish; + } + } + + fd = sd_journal_get_fd(j); + if (fd < 0) { + log_error("Failed to get wakeup fd: %s", strerror(-fd)); + goto finish; + } + + r = sd_journal_seek_head(j); + if (r < 0) { + log_error("Failed to seek to head: %s", strerror(-r)); + goto finish; + } + + if (!arg_no_pager && !arg_follow) { + columns(); + pager_open(); + } + + if (arg_output == OUTPUT_JSON) { + fputc('[', stdout); + fflush(stdout); + } + + for (;;) { + struct pollfd pollfd; + + for (;;) { + r = sd_journal_next(j); + + if (r < 0) { + log_error("Failed to iterate through journal: %s", strerror(-r)); + goto finish; + } + + if (r == 0) + break; + + line ++; + + r = output_funcs[arg_output](j, line); + if (r < 0) + goto finish; + } + + if (!arg_follow) + break; + + zero(pollfd); + pollfd.fd = fd; + pollfd.events = POLLIN; + + if (poll(&pollfd, 1, -1) < 0) { + if (errno == EINTR) + break; + + log_error("poll(): %m"); + r = -errno; + goto finish; + } + + r = sd_journal_process(j); + if (r < 0) { + log_error("Failed to process: %s", strerror(-r)); + goto finish; + } + } + + if (arg_output == OUTPUT_JSON) + fputs("\n]\n", stdout); + +finish: + if (j) + sd_journal_close(j); + + pager_close(); + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/src/journal/journald.c b/src/journal/journald.c new file mode 100644 index 0000000000..78ccb4e05a --- /dev/null +++ b/src/journal/journald.c @@ -0,0 +1,1909 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/epoll.h> +#include <sys/socket.h> +#include <errno.h> +#include <sys/signalfd.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/acl.h> +#include <acl/libacl.h> +#include <stddef.h> +#include <sys/ioctl.h> +#include <linux/sockios.h> +#include <sys/statvfs.h> + +#include "hashmap.h" +#include "journal-file.h" +#include "sd-daemon.h" +#include "socket-util.h" +#include "acl-util.h" +#include "cgroup-util.h" +#include "list.h" +#include "journal-rate-limit.h" +#include "sd-journal.h" +#include "journal-internal.h" + +#define USER_JOURNALS_MAX 1024 +#define STDOUT_STREAMS_MAX 4096 + +#define DEFAULT_RATE_LIMIT_INTERVAL (10*USEC_PER_SEC) +#define DEFAULT_RATE_LIMIT_BURST 200 + +#define RECHECK_AVAILABLE_SPACE_USEC (30*USEC_PER_SEC) + +#define RECHECK_VAR_AVAILABLE_USEC (30*USEC_PER_SEC) + +#define SYSLOG_TIMEOUT_USEC (5*USEC_PER_SEC) + +typedef struct StdoutStream StdoutStream; + +typedef struct Server { + int epoll_fd; + int signal_fd; + int syslog_fd; + int native_fd; + int stdout_fd; + + JournalFile *runtime_journal; + JournalFile *system_journal; + Hashmap *user_journals; + + uint64_t seqnum; + + char *buffer; + size_t buffer_size; + + JournalRateLimit *rate_limit; + + JournalMetrics metrics; + bool compress; + + uint64_t cached_available_space; + usec_t cached_available_space_timestamp; + + uint64_t var_available_timestamp; + + LIST_HEAD(StdoutStream, stdout_streams); + unsigned n_stdout_streams; +} Server; + +typedef enum StdoutStreamState { + STDOUT_STREAM_TAG, + STDOUT_STREAM_PRIORITY, + STDOUT_STREAM_PRIORITY_PREFIX, + STDOUT_STREAM_TEE_CONSOLE, + STDOUT_STREAM_RUNNING +} StdoutStreamState; + +struct StdoutStream { + Server *server; + StdoutStreamState state; + + int fd; + + struct ucred ucred; + + char *tag; + int priority; + bool priority_prefix:1; + bool tee_console:1; + + char buffer[LINE_MAX+1]; + size_t length; + + LIST_FIELDS(StdoutStream, stdout_stream); +}; + +static int server_flush_to_var(Server *s); + +static uint64_t available_space(Server *s) { + char ids[33]; + sd_id128_t machine; + char *p; + const char *f; + struct statvfs ss; + uint64_t sum = 0, avail = 0, ss_avail = 0; + int r; + DIR *d; + usec_t ts = now(CLOCK_MONOTONIC); + + if (s->cached_available_space_timestamp + RECHECK_AVAILABLE_SPACE_USEC > ts) + return s->cached_available_space; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return 0; + + if (s->system_journal) + f = "/var/log/journal/"; + else + f = "/run/log/journal/"; + + p = strappend(f, sd_id128_to_string(machine, ids)); + if (!p) + return 0; + + d = opendir(p); + free(p); + + if (!d) + return 0; + + if (fstatvfs(dirfd(d), &ss) < 0) + goto finish; + + for (;;) { + struct stat st; + struct dirent buf, *de; + int k; + + k = readdir_r(d, &buf, &de); + if (k != 0) { + r = -k; + goto finish; + } + + if (!de) + break; + + if (!dirent_is_file_with_suffix(de, ".journal")) + continue; + + if (fstatat(dirfd(d), de->d_name, &st, AT_SYMLINK_NOFOLLOW) < 0) + continue; + + sum += (uint64_t) st.st_blocks * (uint64_t) st.st_blksize; + } + + avail = sum >= s->metrics.max_use ? 0 : s->metrics.max_use - sum; + + ss_avail = ss.f_bsize * ss.f_bavail; + + ss_avail = ss_avail < s->metrics.keep_free ? 0 : ss_avail - s->metrics.keep_free; + + if (ss_avail < avail) + avail = ss_avail; + + s->cached_available_space = avail; + s->cached_available_space_timestamp = ts; + +finish: + closedir(d); + + return avail; +} + +static void fix_perms(JournalFile *f, uid_t uid) { + acl_t acl; + acl_entry_t entry; + acl_permset_t permset; + int r; + + assert(f); + + r = fchmod_and_fchown(f->fd, 0640, 0, 0); + if (r < 0) + log_warning("Failed to fix access mode/rights on %s, ignoring: %s", f->path, strerror(-r)); + + if (uid <= 0) + return; + + acl = acl_get_fd(f->fd); + if (!acl) { + log_warning("Failed to read ACL on %s, ignoring: %m", f->path); + return; + } + + r = acl_find_uid(acl, uid, &entry); + if (r <= 0) { + + if (acl_create_entry(&acl, &entry) < 0 || + acl_set_tag_type(entry, ACL_USER) < 0 || + acl_set_qualifier(entry, &uid) < 0) { + log_warning("Failed to patch ACL on %s, ignoring: %m", f->path); + goto finish; + } + } + + if (acl_get_permset(entry, &permset) < 0 || + acl_add_perm(permset, ACL_READ) < 0 || + acl_calc_mask(&acl) < 0) { + log_warning("Failed to patch ACL on %s, ignoring: %m", f->path); + goto finish; + } + + if (acl_set_fd(f->fd, acl) < 0) + log_warning("Failed to set ACL on %s, ignoring: %m", f->path); + +finish: + acl_free(acl); +} + +static JournalFile* find_journal(Server *s, uid_t uid) { + char *p; + int r; + JournalFile *f; + char ids[33]; + sd_id128_t machine; + + assert(s); + + /* We split up user logs only on /var, not on /run. If the + * runtime file is open, we write to it exclusively, in order + * to guarantee proper order as soon as we flush /run to + * /var and close the runtime file. */ + + if (s->runtime_journal) + return s->runtime_journal; + + if (uid <= 0) + return s->system_journal; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return s->system_journal; + + f = hashmap_get(s->user_journals, UINT32_TO_PTR(uid)); + if (f) + return f; + + if (asprintf(&p, "/var/log/journal/%s/user-%lu.journal", sd_id128_to_string(machine, ids), (unsigned long) uid) < 0) + return s->system_journal; + + while (hashmap_size(s->user_journals) >= USER_JOURNALS_MAX) { + /* Too many open? Then let's close one */ + f = hashmap_steal_first(s->user_journals); + assert(f); + journal_file_close(f); + } + + r = journal_file_open(p, O_RDWR|O_CREAT, 0640, s->system_journal, &f); + free(p); + + if (r < 0) + return s->system_journal; + + fix_perms(f, uid); + f->metrics = s->metrics; + f->compress = s->compress; + + r = hashmap_put(s->user_journals, UINT32_TO_PTR(uid), f); + if (r < 0) { + journal_file_close(f); + return s->system_journal; + } + + return f; +} + +static void server_vacuum(Server *s) { + Iterator i; + void *k; + char *p; + char ids[33]; + sd_id128_t machine; + int r; + JournalFile *f; + + log_info("Rotating..."); + + if (s->runtime_journal) { + r = journal_file_rotate(&s->runtime_journal); + if (r < 0) + log_error("Failed to rotate %s: %s", s->runtime_journal->path, strerror(-r)); + } + + if (s->system_journal) { + r = journal_file_rotate(&s->system_journal); + if (r < 0) + log_error("Failed to rotate %s: %s", s->system_journal->path, strerror(-r)); + } + + HASHMAP_FOREACH_KEY(f, k, s->user_journals, i) { + r = journal_file_rotate(&f); + if (r < 0) + log_error("Failed to rotate %s: %s", f->path, strerror(-r)); + else + hashmap_replace(s->user_journals, k, f); + } + + log_info("Vacuuming..."); + + r = sd_id128_get_machine(&machine); + if (r < 0) { + log_error("Failed to get machine ID: %s", strerror(-r)); + return; + } + + if (asprintf(&p, "/var/log/journal/%s", sd_id128_to_string(machine, ids)) < 0) { + log_error("Out of memory."); + return; + } + + r = journal_directory_vacuum(p, s->metrics.max_use, s->metrics.keep_free); + if (r < 0 && r != -ENOENT) + log_error("Failed to vacuum %s: %s", p, strerror(-r)); + free(p); + + if (asprintf(&p, "/run/log/journal/%s", ids) < 0) { + log_error("Out of memory."); + return; + } + + r = journal_directory_vacuum(p, s->metrics.max_use, s->metrics.keep_free); + if (r < 0 && r != -ENOENT) + log_error("Failed to vacuum %s: %s", p, strerror(-r)); + free(p); + + s->cached_available_space_timestamp = 0; +} + +static char *shortened_cgroup_path(pid_t pid) { + int r; + char *process_path, *init_path, *path; + + assert(pid > 0); + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, pid, &process_path); + if (r < 0) + return NULL; + + r = cg_get_by_pid(SYSTEMD_CGROUP_CONTROLLER, 1, &init_path); + if (r < 0) { + free(process_path); + return NULL; + } + + if (streq(init_path, "/")) + init_path[0] = 0; + + if (startswith(process_path, init_path)) { + char *p; + + p = strdup(process_path + strlen(init_path)); + if (!p) { + free(process_path); + free(init_path); + return NULL; + } + path = p; + } else { + path = process_path; + process_path = NULL; + } + + free(process_path); + free(init_path); + + return path; +} + +static void dispatch_message_real(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv) { + + char *pid = NULL, *uid = NULL, *gid = NULL, + *source_time = NULL, *boot_id = NULL, *machine_id = NULL, + *comm = NULL, *cmdline = NULL, *hostname = NULL, + *audit_session = NULL, *audit_loginuid = NULL, + *exe = NULL, *cgroup = NULL; + + char idbuf[33]; + sd_id128_t id; + int r; + char *t; + uid_t loginuid = 0, realuid = 0; + JournalFile *f; + bool vacuumed = false; + + assert(s); + assert(iovec); + assert(n > 0); + assert(n + 13 <= m); + + if (ucred) { + uint32_t session; + char *path; + + realuid = ucred->uid; + + if (asprintf(&pid, "_PID=%lu", (unsigned long) ucred->pid) >= 0) + IOVEC_SET_STRING(iovec[n++], pid); + + if (asprintf(&uid, "_UID=%lu", (unsigned long) ucred->uid) >= 0) + IOVEC_SET_STRING(iovec[n++], uid); + + if (asprintf(&gid, "_GID=%lu", (unsigned long) ucred->gid) >= 0) + IOVEC_SET_STRING(iovec[n++], gid); + + r = get_process_comm(ucred->pid, &t); + if (r >= 0) { + comm = strappend("_COMM=", t); + if (comm) + IOVEC_SET_STRING(iovec[n++], comm); + free(t); + } + + r = get_process_exe(ucred->pid, &t); + if (r >= 0) { + exe = strappend("_EXE=", t); + if (comm) + IOVEC_SET_STRING(iovec[n++], exe); + free(t); + } + + r = get_process_cmdline(ucred->pid, LINE_MAX, false, &t); + if (r >= 0) { + cmdline = strappend("_CMDLINE=", t); + if (cmdline) + IOVEC_SET_STRING(iovec[n++], cmdline); + free(t); + } + + r = audit_session_from_pid(ucred->pid, &session); + if (r >= 0) + if (asprintf(&audit_session, "_AUDIT_SESSION=%lu", (unsigned long) session) >= 0) + IOVEC_SET_STRING(iovec[n++], audit_session); + + r = audit_loginuid_from_pid(ucred->pid, &loginuid); + if (r >= 0) + if (asprintf(&audit_loginuid, "_AUDIT_LOGINUID=%lu", (unsigned long) loginuid) >= 0) + IOVEC_SET_STRING(iovec[n++], audit_loginuid); + + path = shortened_cgroup_path(ucred->pid); + if (path) { + cgroup = strappend("_SYSTEMD_CGROUP=", path); + if (cgroup) + IOVEC_SET_STRING(iovec[n++], cgroup); + + free(path); + } + } + + if (tv) { + if (asprintf(&source_time, "_SOURCE_REALTIME_TIMESTAMP=%llu", + (unsigned long long) timeval_load(tv)) >= 0) + IOVEC_SET_STRING(iovec[n++], source_time); + } + + /* Note that strictly speaking storing the boot id here is + * redundant since the entry includes this in-line + * anyway. However, we need this indexed, too. */ + r = sd_id128_get_boot(&id); + if (r >= 0) + if (asprintf(&boot_id, "_BOOT_ID=%s", sd_id128_to_string(id, idbuf)) >= 0) + IOVEC_SET_STRING(iovec[n++], boot_id); + + r = sd_id128_get_machine(&id); + if (r >= 0) + if (asprintf(&machine_id, "_MACHINE_ID=%s", sd_id128_to_string(id, idbuf)) >= 0) + IOVEC_SET_STRING(iovec[n++], machine_id); + + t = gethostname_malloc(); + if (t) { + hostname = strappend("_HOSTNAME=", t); + if (hostname) + IOVEC_SET_STRING(iovec[n++], hostname); + free(t); + } + + assert(n <= m); + + server_flush_to_var(s); + +retry: + f = find_journal(s, realuid == 0 ? 0 : loginuid); + if (!f) + log_warning("Dropping message, as we can't find a place to store the data."); + else { + r = journal_file_append_entry(f, NULL, iovec, n, &s->seqnum, NULL, NULL); + + if (r == -E2BIG && !vacuumed) { + log_info("Allocation limit reached."); + + server_vacuum(s); + vacuumed = true; + + log_info("Retrying write."); + goto retry; + } + + if (r < 0) + log_error("Failed to write entry, ignoring: %s", strerror(-r)); + } + + free(pid); + free(uid); + free(gid); + free(comm); + free(exe); + free(cmdline); + free(source_time); + free(boot_id); + free(machine_id); + free(hostname); + free(audit_session); + free(audit_loginuid); + free(cgroup); +} + +static void dispatch_message(Server *s, + struct iovec *iovec, unsigned n, unsigned m, + struct ucred *ucred, + struct timeval *tv, + int priority) { + int rl; + char *path = NULL, *c; + + assert(s); + assert(iovec || n == 0); + + if (n == 0) + return; + + if (!ucred) + goto finish; + + path = shortened_cgroup_path(ucred->pid); + if (!path) + goto finish; + + /* example: /user/lennart/3/foobar + * /system/dbus.service/foobar + * + * So let's cut of everything past the third /, since that is + * wher user directories start */ + + c = strchr(path, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) { + c = strchr(c+1, '/'); + if (c) + *c = 0; + } + } + + rl = journal_rate_limit_test(s->rate_limit, path, priority, available_space(s)); + + if (rl == 0) { + free(path); + return; + } + + if (rl > 1) { + int j = 0; + char suppress_message[LINE_MAX]; + struct iovec suppress_iovec[15]; + + /* Write a suppression message if we suppressed something */ + + snprintf(suppress_message, sizeof(suppress_message), "MESSAGE=Suppressed %u messages from %s", rl - 1, path); + char_array_0(suppress_message); + + IOVEC_SET_STRING(suppress_iovec[j++], "PRIORITY=5"); + IOVEC_SET_STRING(suppress_iovec[j++], suppress_message); + + dispatch_message_real(s, suppress_iovec, j, ELEMENTSOF(suppress_iovec), NULL, NULL); + } + + free(path); + +finish: + dispatch_message_real(s, iovec, n, m, ucred, tv); +} + +static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) { + char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL; + struct iovec iovec[16]; + unsigned n = 0; + int priority = LOG_USER | LOG_INFO; + + assert(s); + assert(buf); + + parse_syslog_priority((char**) &buf, &priority); + skip_syslog_date((char**) &buf); + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_facility); + + message = strappend("MESSAGE=", buf); + if (message) + IOVEC_SET_STRING(iovec[n++], message); + + dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv, priority & LOG_PRIMASK); + + free(message); + free(syslog_facility); + free(syslog_priority); +} + +static bool valid_user_field(const char *p, size_t l) { + const char *a; + + /* We kinda enforce POSIX syntax recommendations for + environment variables here, but make a couple of additional + requirements. + + http://pubs.opengroup.org/onlinepubs/000095399/basedefs/xbd_chap08.html */ + + /* No empty field names */ + if (l <= 0) + return false; + + /* Don't allow names longer than 64 chars */ + if (l > 64) + return false; + + /* Variables starting with an underscore are protected */ + if (p[0] == '_') + return false; + + /* Don't allow digits as first character */ + if (p[0] >= '0' && p[0] <= '9') + return false; + + /* Only allow A-Z0-9 and '_' */ + for (a = p; a < p + l; a++) + if (!((*a >= 'A' && *a <= 'Z') || + (*a >= '0' && *a <= '9') || + *a == '_')) + return false; + + return true; +} + +static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) { + struct iovec *iovec = NULL; + unsigned n = 0, m = 0, j; + const char *p; + size_t remaining; + int priority = LOG_INFO; + + assert(s); + assert(buffer || n == 0); + + p = buffer; + remaining = buffer_size; + + while (remaining > 0) { + const char *e, *q; + + e = memchr(p, '\n', remaining); + + if (!e) { + /* Trailing noise, let's ignore it, and flush what we collected */ + log_debug("Received message with trailing noise, ignoring."); + break; + } + + if (e == p) { + /* Entry separator */ + dispatch_message(s, iovec, n, m, ucred, tv, priority); + n = 0; + priority = LOG_INFO; + + p++; + remaining--; + continue; + } + + if (*p == '.' || *p == '#') { + /* Ignore control commands for now, and + * comments too. */ + remaining -= (e - p) + 1; + p = e + 1; + continue; + } + + /* A property follows */ + + if (n+13 >= m) { + struct iovec *c; + unsigned u; + + u = MAX((n+13U) * 2U, 4U); + c = realloc(iovec, u * sizeof(struct iovec)); + if (!c) { + log_error("Out of memory"); + break; + } + + iovec = c; + m = u; + } + + q = memchr(p, '=', e - p); + if (q) { + if (valid_user_field(p, q - p)) { + /* If the field name starts with an + * underscore, skip the variable, + * since that indidates a trusted + * field */ + iovec[n].iov_base = (char*) p; + iovec[n].iov_len = e - p; + n++; + + /* We need to determine the priority + * of this entry for the rate limiting + * logic */ + if (e - p == 10 && + memcmp(p, "PRIORITY=", 10) == 0 && + p[10] >= '0' && + p[10] <= '9') + priority = p[10] - '0'; + } + + remaining -= (e - p) + 1; + p = e + 1; + continue; + } else { + uint64_t l; + char *k; + + if (remaining < e - p + 1 + sizeof(uint64_t) + 1) { + log_debug("Failed to parse message, ignoring."); + break; + } + + memcpy(&l, e + 1, sizeof(uint64_t)); + l = le64toh(l); + + if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 || + e[1+sizeof(uint64_t)+l] != '\n') { + log_debug("Failed to parse message, ignoring."); + break; + } + + k = malloc((e - p) + 1 + l); + if (!k) { + log_error("Out of memory"); + break; + } + + memcpy(k, p, e - p); + k[e - p] = '='; + memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l); + + if (valid_user_field(p, e - p)) { + iovec[n].iov_base = k; + iovec[n].iov_len = (e - p) + 1 + l; + n++; + } else + free(k); + + remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1; + p = e + 1 + sizeof(uint64_t) + l + 1; + } + } + + dispatch_message(s, iovec, n, m, ucred, tv, priority); + + for (j = 0; j < n; j++) + if (iovec[j].iov_base < buffer || + (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size) + free(iovec[j].iov_base); +} + +static int stdout_stream_log(StdoutStream *s, const char *p, size_t l) { + struct iovec iovec[15]; + char *message = NULL, *syslog_priority = NULL; + unsigned n = 0; + size_t tag_len; + int priority; + + assert(s); + assert(p); + + priority = s->priority; + + if (s->priority_prefix && + l > 3 && + p[0] == '<' && + p[1] >= '0' && p[1] <= '7' && + p[2] == '>') { + + priority = p[1] - '0'; + p += 3; + l -= 3; + } + + if (l <= 0) + return 0; + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + tag_len = s->tag ? strlen(s->tag) + 2: 0; + message = malloc(8 + tag_len + l); + if (message) { + memcpy(message, "MESSAGE=", 8); + + if (s->tag) { + memcpy(message+8, s->tag, tag_len-2); + memcpy(message+8+tag_len-2, ": ", 2); + } + + memcpy(message+8+tag_len, p, l); + iovec[n].iov_base = message; + iovec[n].iov_len = 8+tag_len+l; + n++; + } + + dispatch_message(s->server, iovec, n, ELEMENTSOF(iovec), &s->ucred, NULL, priority); + + if (s->tee_console) { + int console; + + console = open_terminal("/dev/console", O_WRONLY|O_NOCTTY|O_CLOEXEC); + if (console >= 0) { + n = 0; + if (s->tag) { + IOVEC_SET_STRING(iovec[n++], s->tag); + IOVEC_SET_STRING(iovec[n++], ": "); + } + + iovec[n].iov_base = (void*) p; + iovec[n].iov_len = l; + n++; + + IOVEC_SET_STRING(iovec[n++], (char*) "\n"); + + writev(console, iovec, n); + } + } + + free(message); + free(syslog_priority); + + return 0; +} + +static int stdout_stream_line(StdoutStream *s, const char *p, size_t l) { + assert(s); + assert(p); + + while (l > 0 && strchr(WHITESPACE, *p)) { + l--; + p++; + } + + while (l > 0 && strchr(WHITESPACE, *(p+l-1))) + l--; + + switch (s->state) { + + case STDOUT_STREAM_TAG: + + if (l > 0) { + s->tag = strndup(p, l); + if (!s->tag) { + log_error("Out of memory"); + return -EINVAL; + } + } + + s->state = STDOUT_STREAM_PRIORITY; + return 0; + + case STDOUT_STREAM_PRIORITY: + if (l != 1 || *p < '0' || *p > '7') { + log_warning("Failed to parse log priority line."); + return -EINVAL; + } + + s->priority = *p - '0'; + s->state = STDOUT_STREAM_PRIORITY_PREFIX; + return 0; + + case STDOUT_STREAM_PRIORITY_PREFIX: + if (l != 1 || *p < '0' || *p > '1') { + log_warning("Failed to parse priority prefix line."); + return -EINVAL; + } + + s->priority_prefix = *p - '0'; + s->state = STDOUT_STREAM_TEE_CONSOLE; + return 0; + + case STDOUT_STREAM_TEE_CONSOLE: + if (l != 1 || *p < '0' || *p > '1') { + log_warning("Failed to parse tee to console line."); + return -EINVAL; + } + + s->tee_console = *p - '0'; + s->state = STDOUT_STREAM_RUNNING; + return 0; + + case STDOUT_STREAM_RUNNING: + return stdout_stream_log(s, p, l); + } + + assert_not_reached("Unknown stream state"); +} + +static int stdout_stream_scan(StdoutStream *s, bool force_flush) { + char *p; + size_t remaining; + int r; + + assert(s); + + p = s->buffer; + remaining = s->length; + for (;;) { + char *end; + size_t skip; + + end = memchr(p, '\n', remaining); + if (!end) { + if (remaining >= LINE_MAX) { + end = p + LINE_MAX; + skip = LINE_MAX; + } else + break; + } else + skip = end - p + 1; + + r = stdout_stream_line(s, p, end - p); + if (r < 0) + return r; + + remaining -= skip; + p += skip; + } + + if (force_flush && remaining > 0) { + r = stdout_stream_line(s, p, remaining); + if (r < 0) + return r; + + p += remaining; + remaining = 0; + } + + if (p > s->buffer) { + memmove(s->buffer, p, remaining); + s->length = remaining; + } + + return 0; +} + +static int stdout_stream_process(StdoutStream *s) { + ssize_t l; + int r; + + assert(s); + + l = read(s->fd, s->buffer+s->length, sizeof(s->buffer)-1-s->length); + if (l < 0) { + + if (errno == EAGAIN) + return 0; + + log_warning("Failed to read from stream: %m"); + return -errno; + } + + if (l == 0) { + r = stdout_stream_scan(s, true); + if (r < 0) + return r; + + return 0; + } + + s->length += l; + r = stdout_stream_scan(s, false); + if (r < 0) + return r; + + return 1; + +} + +static void stdout_stream_free(StdoutStream *s) { + assert(s); + + if (s->server) { + assert(s->server->n_stdout_streams > 0); + s->server->n_stdout_streams --; + LIST_REMOVE(StdoutStream, stdout_stream, s->server->stdout_streams, s); + } + + if (s->fd >= 0) { + if (s->server) + epoll_ctl(s->server->epoll_fd, EPOLL_CTL_DEL, s->fd, NULL); + + close_nointr_nofail(s->fd); + } + + free(s->tag); + free(s); +} + +static int stdout_stream_new(Server *s) { + StdoutStream *stream; + int fd, r; + socklen_t len; + struct epoll_event ev; + + assert(s); + + fd = accept4(s->stdout_fd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC); + if (fd < 0) { + if (errno == EAGAIN) + return 0; + + log_error("Failed to accept stdout connection: %m"); + return -errno; + } + + if (s->n_stdout_streams >= STDOUT_STREAMS_MAX) { + log_warning("Too many stdout streams, refusing connection."); + close_nointr_nofail(fd); + return 0; + } + + stream = new0(StdoutStream, 1); + if (!stream) { + log_error("Out of memory."); + close_nointr_nofail(fd); + return -ENOMEM; + } + + stream->fd = fd; + + len = sizeof(stream->ucred); + if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &stream->ucred, &len) < 0) { + log_error("Failed to determine peer credentials: %m"); + r = -errno; + goto fail; + } + + if (shutdown(fd, SHUT_WR) < 0) { + log_error("Failed to shutdown writing side of socket: %m"); + r = -errno; + goto fail; + } + + zero(ev); + ev.data.ptr = stream; + ev.events = EPOLLIN; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + log_error("Failed to add stream to event loop: %m"); + r = -errno; + goto fail; + } + + stream->server = s; + LIST_PREPEND(StdoutStream, stdout_stream, s->stdout_streams, stream); + s->n_stdout_streams ++; + + return 0; + +fail: + stdout_stream_free(stream); + return r; +} + +static int system_journal_open(Server *s) { + int r; + char *fn; + sd_id128_t machine; + char ids[33]; + + r = sd_id128_get_machine(&machine); + if (r < 0) + return r; + + sd_id128_to_string(machine, ids); + + if (!s->system_journal) { + + /* First try to create the machine path, but not the prefix */ + fn = strappend("/var/log/journal/", ids); + if (!fn) + return -ENOMEM; + (void) mkdir(fn, 0755); + free(fn); + + /* The create the system journal file */ + fn = join("/var/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); + free(fn); + + if (r >= 0) { + s->system_journal->metrics = s->metrics; + s->system_journal->compress = s->compress; + + fix_perms(s->system_journal, 0); + } else if (r < 0) { + + if (r == -ENOENT) + r = 0; + else { + log_error("Failed to open system journal: %s", strerror(-r)); + return r; + } + } + } + + if (!s->runtime_journal) { + + fn = join("/run/log/journal/", ids, "/system.journal", NULL); + if (!fn) + return -ENOMEM; + + if (s->system_journal) { + + /* Try to open the runtime journal, but only + * if it already exists, so that we can flush + * it into the system journal */ + + r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + + if (r == -ENOENT) + r = 0; + else { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + } else { + + /* OK, we really need the runtime journal, so create + * it if necessary. */ + + (void) mkdir_parents(fn, 0755); + r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); + free(fn); + + if (r < 0) { + log_error("Failed to open runtime journal: %s", strerror(-r)); + return r; + } + } + + if (s->runtime_journal) { + s->runtime_journal->metrics = s->metrics; + s->runtime_journal->compress = s->compress; + + fix_perms(s->runtime_journal, 0); + } + } + + return r; +} + +static int server_flush_to_var(Server *s) { + char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; + Object *o = NULL; + int r; + sd_id128_t machine; + sd_journal *j; + usec_t ts; + + assert(s); + + if (!s->runtime_journal) + return 0; + + ts = now(CLOCK_MONOTONIC); + if (s->var_available_timestamp + RECHECK_VAR_AVAILABLE_USEC > ts) + return 0; + + s->var_available_timestamp = ts; + + system_journal_open(s); + + if (!s->system_journal) + return 0; + + r = sd_id128_get_machine(&machine); + if (r < 0) { + log_error("Failed to get machine id: %s", strerror(-r)); + return r; + } + + r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY); + if (r < 0) { + log_error("Failed to read runtime journal: %s", strerror(-r)); + return r; + } + + SD_JOURNAL_FOREACH(j) { + JournalFile *f; + + f = j->current_file; + assert(f && f->current_offset > 0); + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) { + log_error("Can't read entry: %s", strerror(-r)); + goto finish; + } + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + if (r == -E2BIG) { + log_info("Allocation limit reached."); + + journal_file_post_change(s->system_journal); + server_vacuum(s); + + r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); + } + + if (r < 0) { + log_error("Can't write entry: %s", strerror(-r)); + goto finish; + } + } + +finish: + journal_file_post_change(s->system_journal); + + journal_file_close(s->runtime_journal); + s->runtime_journal = NULL; + + if (r >= 0) { + sd_id128_to_string(machine, path + 17); + rm_rf(path, false, true, false); + } + + return r; +} + +static void forward_syslog(Server *s, const void *buffer, size_t length, struct ucred *ucred, struct timeval *tv) { + struct msghdr msghdr; + struct iovec iovec; + struct cmsghdr *cmsg; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) + + CMSG_SPACE(sizeof(struct timeval))]; + } control; + union sockaddr_union sa; + + assert(s); + + zero(msghdr); + + zero(iovec); + iovec.iov_base = (void*) buffer; + iovec.iov_len = length; + msghdr.msg_iov = &iovec; + msghdr.msg_iovlen = 1; + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path)); + msghdr.msg_name = &sa; + msghdr.msg_namelen = offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path); + + zero(control); + msghdr.msg_control = &control; + msghdr.msg_controllen = sizeof(control); + + cmsg = CMSG_FIRSTHDR(&msghdr); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_CREDENTIALS; + cmsg->cmsg_len = CMSG_LEN(sizeof(struct ucred)); + memcpy(CMSG_DATA(cmsg), ucred, sizeof(struct ucred)); + msghdr.msg_controllen = cmsg->cmsg_len; + + /* Forward the syslog message we received via /dev/log to + * /run/systemd/syslog. Unfortunately we currently can't set + * the SO_TIMESTAMP auxiliary data, and hence we don't. */ + + if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0) + return; + + if (errno == ESRCH) { + struct ucred u; + + /* Hmm, presumably the sender process vanished + * by now, so let's fix it as good as we + * can, and retry */ + + u = *ucred; + u.pid = getpid(); + memcpy(CMSG_DATA(cmsg), &u, sizeof(struct ucred)); + + if (sendmsg(s->syslog_fd, &msghdr, MSG_NOSIGNAL) >= 0) + return; + } + + log_debug("Failed to forward syslog message: %m"); +} + +static int process_event(Server *s, struct epoll_event *ev) { + assert(s); + + if (ev->data.fd == s->signal_fd) { + struct signalfd_siginfo sfsi; + ssize_t n; + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + n = read(s->signal_fd, &sfsi, sizeof(sfsi)); + if (n != sizeof(sfsi)) { + + if (n >= 0) + return -EIO; + + if (errno == EINTR || errno == EAGAIN) + return 0; + + return -errno; + } + + if (sfsi.ssi_signo == SIGUSR1) { + server_flush_to_var(s); + return 0; + } + + log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo)); + return 0; + + } else if (ev->data.fd == s->native_fd || + ev->data.fd == s->syslog_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + for (;;) { + struct msghdr msghdr; + struct iovec iovec; + struct ucred *ucred = NULL; + struct timeval *tv = NULL; + struct cmsghdr *cmsg; + union { + struct cmsghdr cmsghdr; + uint8_t buf[CMSG_SPACE(sizeof(struct ucred)) + + CMSG_SPACE(sizeof(struct timeval))]; + } control; + ssize_t n; + int v; + + if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) { + log_error("SIOCINQ failed: %m"); + return -errno; + } + + if (v <= 0) + return 1; + + if (s->buffer_size < (size_t) v) { + void *b; + size_t l; + + l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2); + b = realloc(s->buffer, l+1); + + if (!b) { + log_error("Couldn't increase buffer."); + return -ENOMEM; + } + + s->buffer_size = l; + s->buffer = b; + } + + zero(iovec); + iovec.iov_base = s->buffer; + iovec.iov_len = s->buffer_size; + + zero(control); + zero(msghdr); + msghdr.msg_iov = &iovec; + msghdr.msg_iovlen = 1; + msghdr.msg_control = &control; + msghdr.msg_controllen = sizeof(control); + + n = recvmsg(ev->data.fd, &msghdr, MSG_DONTWAIT); + if (n < 0) { + + if (errno == EINTR || errno == EAGAIN) + return 1; + + log_error("recvmsg() failed: %m"); + return -errno; + } + + for (cmsg = CMSG_FIRSTHDR(&msghdr); cmsg; cmsg = CMSG_NXTHDR(&msghdr, cmsg)) { + + if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SCM_CREDENTIALS && + cmsg->cmsg_len == CMSG_LEN(sizeof(struct ucred))) + ucred = (struct ucred*) CMSG_DATA(cmsg); + else if (cmsg->cmsg_level == SOL_SOCKET && + cmsg->cmsg_type == SO_TIMESTAMP && + cmsg->cmsg_len == CMSG_LEN(sizeof(struct timeval))) + tv = (struct timeval*) CMSG_DATA(cmsg); + } + + if (ev->data.fd == s->syslog_fd) { + char *e; + + e = memchr(s->buffer, '\n', n); + if (e) + *e = 0; + else + s->buffer[n] = 0; + + forward_syslog(s, s->buffer, n, ucred, tv); + process_syslog_message(s, strstrip(s->buffer), ucred, tv); + } else + process_native_message(s, s->buffer, n, ucred, tv); + } + + return 1; + + } else if (ev->data.fd == s->stdout_fd) { + + if (ev->events != EPOLLIN) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + stdout_stream_new(s); + return 1; + + } else { + StdoutStream *stream; + + if ((ev->events|EPOLLIN|EPOLLHUP) != (EPOLLIN|EPOLLHUP)) { + log_info("Got invalid event from epoll."); + return -EIO; + } + + /* If it is none of the well-known fds, it must be an + * stdout stream fd. Note that this is a bit ugly here + * (since we rely that none of the well-known fds + * could be interpreted as pointer), but nonetheless + * safe, since the well-known fds would never get an + * fd > 4096, i.e. beyond the first memory page */ + + stream = ev->data.ptr; + + if (stdout_stream_process(stream) <= 0) + stdout_stream_free(stream); + + return 1; + } + + log_error("Unknown event."); + return 0; +} + +static int open_syslog_socket(Server *s) { + union sockaddr_union sa; + int one, r; + struct epoll_event ev; + struct timeval tv; + + assert(s); + + if (s->syslog_fd < 0) { + + s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (s->syslog_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/dev/log", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + } + + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); + if (r < 0) { + log_error("SO_PASSCRED failed: %m"); + return -errno; + } + + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); + if (r < 0) { + log_error("SO_TIMESTAMP failed: %m"); + return -errno; + } + + /* Since we use the same socket for forwarding this to some + * other syslog implementation, make sure we don't hang + * forever */ + timeval_store(&tv, SYSLOG_TIMEOUT_USEC); + if (setsockopt(s->syslog_fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) < 0) { + log_error("SO_SNDTIMEO failed: %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->syslog_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { + log_error("Failed to add syslog server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_native_socket(Server*s) { + union sockaddr_union sa; + int one, r; + struct epoll_event ev; + + assert(s); + + if (s->native_fd < 0) { + + s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (s->native_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + } + + one = 1; + r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); + if (r < 0) { + log_error("SO_PASSCRED failed: %m"); + return -errno; + } + + one = 1; + r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); + if (r < 0) { + log_error("SO_TIMESTAMP failed: %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->native_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { + log_error("Failed to add native server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_stdout_socket(Server *s) { + union sockaddr_union sa; + int r; + struct epoll_event ev; + + assert(s); + + if (s->stdout_fd < 0) { + + s->stdout_fd = socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0); + if (s->stdout_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/stdout", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->stdout_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); + + if (listen(s->stdout_fd, SOMAXCONN) < 0) { + log_error("liste() failed: %m"); + return -errno; + } + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->stdout_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->stdout_fd, &ev) < 0) { + log_error("Failed to add stdout server fd to epoll object: %m"); + return -errno; + } + + return 0; +} + +static int open_signalfd(Server *s) { + sigset_t mask; + struct epoll_event ev; + + assert(s); + + assert_se(sigemptyset(&mask) == 0); + sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1); + assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0); + + s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); + if (s->signal_fd < 0) { + log_error("signalfd(): %m"); + return -errno; + } + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->signal_fd; + + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->signal_fd, &ev) < 0) { + log_error("epoll_ctl(): %m"); + return -errno; + } + + return 0; +} + +static int server_init(Server *s) { + int n, r, fd; + + assert(s); + + zero(*s); + s->syslog_fd = s->native_fd = s->stdout_fd = s->signal_fd = s->epoll_fd = -1; + s->metrics.max_size = DEFAULT_MAX_SIZE; + s->metrics.min_size = DEFAULT_MIN_SIZE; + s->metrics.keep_free = DEFAULT_KEEP_FREE; + s->metrics.max_use = DEFAULT_MAX_USE; + s->compress = true; + + s->user_journals = hashmap_new(trivial_hash_func, trivial_compare_func); + if (!s->user_journals) { + log_error("Out of memory."); + return -ENOMEM; + } + + s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (s->epoll_fd < 0) { + log_error("Failed to create epoll object: %m"); + return -errno; + } + + n = sd_listen_fds(true); + if (n < 0) { + log_error("Failed to read listening file descriptors from environment: %s", strerror(-n)); + return n; + } + + for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { + + if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/run/systemd/native", 0) > 0) { + + if (s->native_fd >= 0) { + log_error("Too many native sockets passed."); + return -EINVAL; + } + + s->native_fd = fd; + + } else if (sd_is_socket_unix(fd, SOCK_STREAM, 1, "/run/systemd/stdout", 0) > 0) { + + if (s->stdout_fd >= 0) { + log_error("Too many stdout sockets passed."); + return -EINVAL; + } + + s->stdout_fd = fd; + + } else if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + + if (s->syslog_fd >= 0) { + log_error("Too many /dev/log sockets passed."); + return -EINVAL; + } + + s->syslog_fd = fd; + + } else { + log_error("Unknown socket passed."); + return -EINVAL; + } + } + + r = open_syslog_socket(s); + if (r < 0) + return r; + + r = open_native_socket(s); + if (r < 0) + return r; + + r = open_stdout_socket(s); + if (r < 0) + return r; + + r = system_journal_open(s); + if (r < 0) + return r; + + r = open_signalfd(s); + if (r < 0) + return r; + + s->rate_limit = journal_rate_limit_new(DEFAULT_RATE_LIMIT_INTERVAL, DEFAULT_RATE_LIMIT_BURST); + if (!s->rate_limit) + return -ENOMEM; + + return 0; +} + +static void server_done(Server *s) { + JournalFile *f; + assert(s); + + while (s->stdout_streams) + stdout_stream_free(s->stdout_streams); + + if (s->system_journal) + journal_file_close(s->system_journal); + + if (s->runtime_journal) + journal_file_close(s->runtime_journal); + + while ((f = hashmap_steal_first(s->user_journals))) + journal_file_close(f); + + hashmap_free(s->user_journals); + + if (s->epoll_fd >= 0) + close_nointr_nofail(s->epoll_fd); + + if (s->signal_fd >= 0) + close_nointr_nofail(s->signal_fd); + + if (s->syslog_fd >= 0) + close_nointr_nofail(s->syslog_fd); + + if (s->native_fd >= 0) + close_nointr_nofail(s->native_fd); + + if (s->stdout_fd >= 0) + close_nointr_nofail(s->stdout_fd); + + if (s->rate_limit) + journal_rate_limit_free(s->rate_limit); + + free(s->buffer); +} + +int main(int argc, char *argv[]) { + Server server; + int r; + + /* if (getppid() != 1) { */ + /* log_error("This program should be invoked by init only."); */ + /* return EXIT_FAILURE; */ + /* } */ + + if (argc > 1) { + log_error("This program does not take arguments."); + return EXIT_FAILURE; + } + + log_set_target(LOG_TARGET_CONSOLE); + log_parse_environment(); + log_open(); + + umask(0022); + + r = server_init(&server); + if (r < 0) + goto finish; + + log_debug("systemd-journald running as pid %lu", (unsigned long) getpid()); + + sd_notify(false, + "READY=1\n" + "STATUS=Processing requests..."); + + server_vacuum(&server); + server_flush_to_var(&server); + + for (;;) { + struct epoll_event event; + + r = epoll_wait(server.epoll_fd, &event, 1, -1); + if (r < 0) { + + if (errno == EINTR) + continue; + + log_error("epoll_wait() failed: %m"); + r = -errno; + goto finish; + } else if (r == 0) + break; + + r = process_event(&server, &event); + if (r < 0) + goto finish; + else if (r == 0) + break; + } + + log_debug("systemd-journald stopped as pid %lu", (unsigned long) getpid()); + +finish: + sd_notify(false, + "STATUS=Shutting down..."); + + server_done(&server); + + return r < 0 ? EXIT_FAILURE : EXIT_SUCCESS; +} diff --git a/src/journal/lookup3.c b/src/journal/lookup3.c new file mode 100644 index 0000000000..b90093a5e2 --- /dev/null +++ b/src/journal/lookup3.c @@ -0,0 +1,1003 @@ +/* Slightly modified by Lennart Poettering, to avoid name clashes, and + * unexport a few functions. */ + +#include "lookup3.h" + +/* +------------------------------------------------------------------------------- +lookup3.c, by Bob Jenkins, May 2006, Public Domain. + +These are functions for producing 32-bit hashes for hash table lookup. +hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final() +are externally useful functions. Routines to test the hash are included +if SELF_TEST is defined. You can use this free for any purpose. It's in +the public domain. It has no warranty. + +You probably want to use hashlittle(). hashlittle() and hashbig() +hash byte arrays. hashlittle() is is faster than hashbig() on +little-endian machines. Intel and AMD are little-endian machines. +On second thought, you probably want hashlittle2(), which is identical to +hashlittle() except it returns two 32-bit hashes for the price of one. +You could implement hashbig2() if you wanted but I haven't bothered here. + +If you want to find a hash of, say, exactly 7 integers, do + a = i1; b = i2; c = i3; + mix(a,b,c); + a += i4; b += i5; c += i6; + mix(a,b,c); + a += i7; + final(a,b,c); +then use c as the hash value. If you have a variable length array of +4-byte integers to hash, use hashword(). If you have a byte array (like +a character string), use hashlittle(). If you have several byte arrays, or +a mix of things, see the comments above hashlittle(). + +Why is this so big? I read 12 bytes at a time into 3 4-byte integers, +then mix those integers. This is fast (you can do a lot more thorough +mixing with 12*3 instructions on 3 integers than you can with 3 instructions +on 1 byte), but shoehorning those bytes into integers efficiently is messy. +------------------------------------------------------------------------------- +*/ +/* #define SELF_TEST 1 */ + +#include <stdio.h> /* defines printf for tests */ +#include <time.h> /* defines time_t for timings in the test */ +#include <stdint.h> /* defines uint32_t etc */ +#include <sys/param.h> /* attempt to define endianness */ +#ifdef linux +# include <endian.h> /* attempt to define endianness */ +#endif + +/* + * My best guess at if you are big-endian or little-endian. This may + * need adjustment. + */ +#if (defined(__BYTE_ORDER) && defined(__LITTLE_ENDIAN) && \ + __BYTE_ORDER == __LITTLE_ENDIAN) || \ + (defined(i386) || defined(__i386__) || defined(__i486__) || \ + defined(__i586__) || defined(__i686__) || defined(vax) || defined(MIPSEL)) +# define HASH_LITTLE_ENDIAN 1 +# define HASH_BIG_ENDIAN 0 +#elif (defined(__BYTE_ORDER) && defined(__BIG_ENDIAN) && \ + __BYTE_ORDER == __BIG_ENDIAN) || \ + (defined(sparc) || defined(POWERPC) || defined(mc68000) || defined(sel)) +# define HASH_LITTLE_ENDIAN 0 +# define HASH_BIG_ENDIAN 1 +#else +# define HASH_LITTLE_ENDIAN 0 +# define HASH_BIG_ENDIAN 0 +#endif + +#define hashsize(n) ((uint32_t)1<<(n)) +#define hashmask(n) (hashsize(n)-1) +#define rot(x,k) (((x)<<(k)) | ((x)>>(32-(k)))) + +/* +------------------------------------------------------------------------------- +mix -- mix 3 32-bit values reversibly. + +This is reversible, so any information in (a,b,c) before mix() is +still in (a,b,c) after mix(). + +If four pairs of (a,b,c) inputs are run through mix(), or through +mix() in reverse, there are at least 32 bits of the output that +are sometimes the same for one pair and different for another pair. +This was tested for: +* pairs that differed by one bit, by two bits, in any combination + of top bits of (a,b,c), or in any combination of bottom bits of + (a,b,c). +* "differ" is defined as +, -, ^, or ~^. For + and -, I transformed + the output delta to a Gray code (a^(a>>1)) so a string of 1's (as + is commonly produced by subtraction) look like a single 1-bit + difference. +* the base values were pseudorandom, all zero but one bit set, or + all zero plus a counter that starts at zero. + +Some k values for my "a-=c; a^=rot(c,k); c+=b;" arrangement that +satisfy this are + 4 6 8 16 19 4 + 9 15 3 18 27 15 + 14 9 3 7 17 3 +Well, "9 15 3 18 27 15" didn't quite get 32 bits diffing +for "differ" defined as + with a one-bit base and a two-bit delta. I +used http://burtleburtle.net/bob/hash/avalanche.html to choose +the operations, constants, and arrangements of the variables. + +This does not achieve avalanche. There are input bits of (a,b,c) +that fail to affect some output bits of (a,b,c), especially of a. The +most thoroughly mixed value is c, but it doesn't really even achieve +avalanche in c. + +This allows some parallelism. Read-after-writes are good at doubling +the number of bits affected, so the goal of mixing pulls in the opposite +direction as the goal of parallelism. I did what I could. Rotates +seem to cost as much as shifts on every machine I could lay my hands +on, and rotates are much kinder to the top and bottom bits, so I used +rotates. +------------------------------------------------------------------------------- +*/ +#define mix(a,b,c) \ +{ \ + a -= c; a ^= rot(c, 4); c += b; \ + b -= a; b ^= rot(a, 6); a += c; \ + c -= b; c ^= rot(b, 8); b += a; \ + a -= c; a ^= rot(c,16); c += b; \ + b -= a; b ^= rot(a,19); a += c; \ + c -= b; c ^= rot(b, 4); b += a; \ +} + +/* +------------------------------------------------------------------------------- +final -- final mixing of 3 32-bit values (a,b,c) into c + +Pairs of (a,b,c) values differing in only a few bits will usually +produce values of c that look totally different. This was tested for +* pairs that differed by one bit, by two bits, in any combination + of top bits of (a,b,c), or in any combination of bottom bits of + (a,b,c). +* "differ" is defined as +, -, ^, or ~^. For + and -, I transformed + the output delta to a Gray code (a^(a>>1)) so a string of 1's (as + is commonly produced by subtraction) look like a single 1-bit + difference. +* the base values were pseudorandom, all zero but one bit set, or + all zero plus a counter that starts at zero. + +These constants passed: + 14 11 25 16 4 14 24 + 12 14 25 16 4 14 24 +and these came close: + 4 8 15 26 3 22 24 + 10 8 15 26 3 22 24 + 11 8 15 26 3 22 24 +------------------------------------------------------------------------------- +*/ +#define final(a,b,c) \ +{ \ + c ^= b; c -= rot(b,14); \ + a ^= c; a -= rot(c,11); \ + b ^= a; b -= rot(a,25); \ + c ^= b; c -= rot(b,16); \ + a ^= c; a -= rot(c,4); \ + b ^= a; b -= rot(a,14); \ + c ^= b; c -= rot(b,24); \ +} + +/* +-------------------------------------------------------------------- + This works on all machines. To be useful, it requires + -- that the key be an array of uint32_t's, and + -- that the length be the number of uint32_t's in the key + + The function hashword() is identical to hashlittle() on little-endian + machines, and identical to hashbig() on big-endian machines, + except that the length has to be measured in uint32_ts rather than in + bytes. hashlittle() is more complicated than hashword() only because + hashlittle() has to dance around fitting the key bytes into registers. +-------------------------------------------------------------------- +*/ +uint32_t jenkins_hashword( +const uint32_t *k, /* the key, an array of uint32_t values */ +size_t length, /* the length of the key, in uint32_ts */ +uint32_t initval) /* the previous hash, or an arbitrary value */ +{ + uint32_t a,b,c; + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + (((uint32_t)length)<<2) + initval; + + /*------------------------------------------------- handle most of the key */ + while (length > 3) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 3; + k += 3; + } + + /*------------------------------------------- handle the last 3 uint32_t's */ + switch(length) /* all the case statements fall through */ + { + case 3 : c+=k[2]; + case 2 : b+=k[1]; + case 1 : a+=k[0]; + final(a,b,c); + case 0: /* case 0: nothing left to add */ + break; + } + /*------------------------------------------------------ report the result */ + return c; +} + + +/* +-------------------------------------------------------------------- +hashword2() -- same as hashword(), but take two seeds and return two +32-bit values. pc and pb must both be nonnull, and *pc and *pb must +both be initialized with seeds. If you pass in (*pb)==0, the output +(*pc) will be the same as the return value from hashword(). +-------------------------------------------------------------------- +*/ +void jenkins_hashword2 ( +const uint32_t *k, /* the key, an array of uint32_t values */ +size_t length, /* the length of the key, in uint32_ts */ +uint32_t *pc, /* IN: seed OUT: primary hash value */ +uint32_t *pb) /* IN: more seed OUT: secondary hash value */ +{ + uint32_t a,b,c; + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)(length<<2)) + *pc; + c += *pb; + + /*------------------------------------------------- handle most of the key */ + while (length > 3) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 3; + k += 3; + } + + /*------------------------------------------- handle the last 3 uint32_t's */ + switch(length) /* all the case statements fall through */ + { + case 3 : c+=k[2]; + case 2 : b+=k[1]; + case 1 : a+=k[0]; + final(a,b,c); + case 0: /* case 0: nothing left to add */ + break; + } + /*------------------------------------------------------ report the result */ + *pc=c; *pb=b; +} + + +/* +------------------------------------------------------------------------------- +hashlittle() -- hash a variable-length key into a 32-bit value + k : the key (the unaligned variable-length array of bytes) + length : the length of the key, counting by bytes + initval : can be any 4-byte value +Returns a 32-bit value. Every bit of the key affects every bit of +the return value. Two keys differing by one or two bits will have +totally different hash values. + +The best hash table sizes are powers of 2. There is no need to do +mod a prime (mod is sooo slow!). If you need less than 32 bits, +use a bitmask. For example, if you need only 10 bits, do + h = (h & hashmask(10)); +In which case, the hash table should have hashsize(10) elements. + +If you are hashing n strings (uint8_t **)k, do it like this: + for (i=0, h=0; i<n; ++i) h = hashlittle( k[i], len[i], h); + +By Bob Jenkins, 2006. bob_jenkins@burtleburtle.net. You may use this +code any way you wish, private, educational, or commercial. It's free. + +Use for hash table lookup, or anything where one collision in 2^^32 is +acceptable. Do NOT use for cryptographic purposes. +------------------------------------------------------------------------------- +*/ + +uint32_t jenkins_hashlittle( const void *key, size_t length, uint32_t initval) +{ + uint32_t a,b,c; /* internal state */ + union { const void *ptr; size_t i; } u; /* needed for Mac Powerbook G4 */ + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)length) + initval; + + u.ptr = key; + if (HASH_LITTLE_ENDIAN && ((u.i & 0x3) == 0)) { + const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */ + + /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 12; + k += 3; + } + + /*----------------------------- handle the last (probably partial) block */ + /* + * "k[2]&0xffffff" actually reads beyond the end of the string, but + * then masks off the part it's not allowed to read. Because the + * string is aligned, the masked-off tail is in the same word as the + * rest of the string. Every machine with memory protection I've seen + * does it on word boundaries, so is OK with this. But VALGRIND will + * still catch it and complain. The masking trick does make the hash + * noticably faster for short strings (like English words). + */ +#ifndef VALGRIND + + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=k[2]&0xffffff; b+=k[1]; a+=k[0]; break; + case 10: c+=k[2]&0xffff; b+=k[1]; a+=k[0]; break; + case 9 : c+=k[2]&0xff; b+=k[1]; a+=k[0]; break; + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=k[1]&0xffffff; a+=k[0]; break; + case 6 : b+=k[1]&0xffff; a+=k[0]; break; + case 5 : b+=k[1]&0xff; a+=k[0]; break; + case 4 : a+=k[0]; break; + case 3 : a+=k[0]&0xffffff; break; + case 2 : a+=k[0]&0xffff; break; + case 1 : a+=k[0]&0xff; break; + case 0 : return c; /* zero length strings require no mixing */ + } + +#else /* make valgrind happy */ + + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=((uint32_t)k8[9])<<8; /* fall through */ + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=((uint32_t)k8[5])<<8; /* fall through */ + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]; break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=((uint32_t)k8[1])<<8; /* fall through */ + case 1 : a+=k8[0]; break; + case 0 : return c; + } + +#endif /* !valgrind */ + + } else if (HASH_LITTLE_ENDIAN && ((u.i & 0x1) == 0)) { + const uint16_t *k = (const uint16_t *)key; /* read 16-bit chunks */ + const uint8_t *k8; + + /*--------------- all but last block: aligned reads and different mixing */ + while (length > 12) + { + a += k[0] + (((uint32_t)k[1])<<16); + b += k[2] + (((uint32_t)k[3])<<16); + c += k[4] + (((uint32_t)k[5])<<16); + mix(a,b,c); + length -= 12; + k += 6; + } + + /*----------------------------- handle the last (probably partial) block */ + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[4]+(((uint32_t)k[5])<<16); + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=k[4]; + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=k[2]; + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=k[0]; + break; + case 1 : a+=k8[0]; + break; + case 0 : return c; /* zero length requires no mixing */ + } + + } else { /* need to read the key one byte at a time */ + const uint8_t *k = (const uint8_t *)key; + + /*--------------- all but the last block: affect some 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + a += ((uint32_t)k[1])<<8; + a += ((uint32_t)k[2])<<16; + a += ((uint32_t)k[3])<<24; + b += k[4]; + b += ((uint32_t)k[5])<<8; + b += ((uint32_t)k[6])<<16; + b += ((uint32_t)k[7])<<24; + c += k[8]; + c += ((uint32_t)k[9])<<8; + c += ((uint32_t)k[10])<<16; + c += ((uint32_t)k[11])<<24; + mix(a,b,c); + length -= 12; + k += 12; + } + + /*-------------------------------- last block: affect all 32 bits of (c) */ + switch(length) /* all the case statements fall through */ + { + case 12: c+=((uint32_t)k[11])<<24; + case 11: c+=((uint32_t)k[10])<<16; + case 10: c+=((uint32_t)k[9])<<8; + case 9 : c+=k[8]; + case 8 : b+=((uint32_t)k[7])<<24; + case 7 : b+=((uint32_t)k[6])<<16; + case 6 : b+=((uint32_t)k[5])<<8; + case 5 : b+=k[4]; + case 4 : a+=((uint32_t)k[3])<<24; + case 3 : a+=((uint32_t)k[2])<<16; + case 2 : a+=((uint32_t)k[1])<<8; + case 1 : a+=k[0]; + break; + case 0 : return c; + } + } + + final(a,b,c); + return c; +} + + +/* + * hashlittle2: return 2 32-bit hash values + * + * This is identical to hashlittle(), except it returns two 32-bit hash + * values instead of just one. This is good enough for hash table + * lookup with 2^^64 buckets, or if you want a second hash if you're not + * happy with the first, or if you want a probably-unique 64-bit ID for + * the key. *pc is better mixed than *pb, so use *pc first. If you want + * a 64-bit value do something like "*pc + (((uint64_t)*pb)<<32)". + */ +void jenkins_hashlittle2( + const void *key, /* the key to hash */ + size_t length, /* length of the key */ + uint32_t *pc, /* IN: primary initval, OUT: primary hash */ + uint32_t *pb) /* IN: secondary initval, OUT: secondary hash */ +{ + uint32_t a,b,c; /* internal state */ + union { const void *ptr; size_t i; } u; /* needed for Mac Powerbook G4 */ + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)length) + *pc; + c += *pb; + + u.ptr = key; + if (HASH_LITTLE_ENDIAN && ((u.i & 0x3) == 0)) { + const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */ + + /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 12; + k += 3; + } + + /*----------------------------- handle the last (probably partial) block */ + /* + * "k[2]&0xffffff" actually reads beyond the end of the string, but + * then masks off the part it's not allowed to read. Because the + * string is aligned, the masked-off tail is in the same word as the + * rest of the string. Every machine with memory protection I've seen + * does it on word boundaries, so is OK with this. But VALGRIND will + * still catch it and complain. The masking trick does make the hash + * noticably faster for short strings (like English words). + */ +#ifndef VALGRIND + + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=k[2]&0xffffff; b+=k[1]; a+=k[0]; break; + case 10: c+=k[2]&0xffff; b+=k[1]; a+=k[0]; break; + case 9 : c+=k[2]&0xff; b+=k[1]; a+=k[0]; break; + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=k[1]&0xffffff; a+=k[0]; break; + case 6 : b+=k[1]&0xffff; a+=k[0]; break; + case 5 : b+=k[1]&0xff; a+=k[0]; break; + case 4 : a+=k[0]; break; + case 3 : a+=k[0]&0xffffff; break; + case 2 : a+=k[0]&0xffff; break; + case 1 : a+=k[0]&0xff; break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + +#else /* make valgrind happy */ + + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=((uint32_t)k8[9])<<8; /* fall through */ + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=((uint32_t)k8[5])<<8; /* fall through */ + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]; break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=((uint32_t)k8[1])<<8; /* fall through */ + case 1 : a+=k8[0]; break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + +#endif /* !valgrind */ + + } else if (HASH_LITTLE_ENDIAN && ((u.i & 0x1) == 0)) { + const uint16_t *k = (const uint16_t *)key; /* read 16-bit chunks */ + const uint8_t *k8; + + /*--------------- all but last block: aligned reads and different mixing */ + while (length > 12) + { + a += k[0] + (((uint32_t)k[1])<<16); + b += k[2] + (((uint32_t)k[3])<<16); + c += k[4] + (((uint32_t)k[5])<<16); + mix(a,b,c); + length -= 12; + k += 6; + } + + /*----------------------------- handle the last (probably partial) block */ + k8 = (const uint8_t *)k; + switch(length) + { + case 12: c+=k[4]+(((uint32_t)k[5])<<16); + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 11: c+=((uint32_t)k8[10])<<16; /* fall through */ + case 10: c+=k[4]; + b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 9 : c+=k8[8]; /* fall through */ + case 8 : b+=k[2]+(((uint32_t)k[3])<<16); + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 7 : b+=((uint32_t)k8[6])<<16; /* fall through */ + case 6 : b+=k[2]; + a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 5 : b+=k8[4]; /* fall through */ + case 4 : a+=k[0]+(((uint32_t)k[1])<<16); + break; + case 3 : a+=((uint32_t)k8[2])<<16; /* fall through */ + case 2 : a+=k[0]; + break; + case 1 : a+=k8[0]; + break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + + } else { /* need to read the key one byte at a time */ + const uint8_t *k = (const uint8_t *)key; + + /*--------------- all but the last block: affect some 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + a += ((uint32_t)k[1])<<8; + a += ((uint32_t)k[2])<<16; + a += ((uint32_t)k[3])<<24; + b += k[4]; + b += ((uint32_t)k[5])<<8; + b += ((uint32_t)k[6])<<16; + b += ((uint32_t)k[7])<<24; + c += k[8]; + c += ((uint32_t)k[9])<<8; + c += ((uint32_t)k[10])<<16; + c += ((uint32_t)k[11])<<24; + mix(a,b,c); + length -= 12; + k += 12; + } + + /*-------------------------------- last block: affect all 32 bits of (c) */ + switch(length) /* all the case statements fall through */ + { + case 12: c+=((uint32_t)k[11])<<24; + case 11: c+=((uint32_t)k[10])<<16; + case 10: c+=((uint32_t)k[9])<<8; + case 9 : c+=k[8]; + case 8 : b+=((uint32_t)k[7])<<24; + case 7 : b+=((uint32_t)k[6])<<16; + case 6 : b+=((uint32_t)k[5])<<8; + case 5 : b+=k[4]; + case 4 : a+=((uint32_t)k[3])<<24; + case 3 : a+=((uint32_t)k[2])<<16; + case 2 : a+=((uint32_t)k[1])<<8; + case 1 : a+=k[0]; + break; + case 0 : *pc=c; *pb=b; return; /* zero length strings require no mixing */ + } + } + + final(a,b,c); + *pc=c; *pb=b; +} + + + +/* + * hashbig(): + * This is the same as hashword() on big-endian machines. It is different + * from hashlittle() on all machines. hashbig() takes advantage of + * big-endian byte ordering. + */ +uint32_t jenkins_hashbig( const void *key, size_t length, uint32_t initval) +{ + uint32_t a,b,c; + union { const void *ptr; size_t i; } u; /* to cast key to (size_t) happily */ + + /* Set up the internal state */ + a = b = c = 0xdeadbeef + ((uint32_t)length) + initval; + + u.ptr = key; + if (HASH_BIG_ENDIAN && ((u.i & 0x3) == 0)) { + const uint32_t *k = (const uint32_t *)key; /* read 32-bit chunks */ + + /*------ all but last block: aligned reads and affect 32 bits of (a,b,c) */ + while (length > 12) + { + a += k[0]; + b += k[1]; + c += k[2]; + mix(a,b,c); + length -= 12; + k += 3; + } + + /*----------------------------- handle the last (probably partial) block */ + /* + * "k[2]<<8" actually reads beyond the end of the string, but + * then shifts out the part it's not allowed to read. Because the + * string is aligned, the illegal read is in the same word as the + * rest of the string. Every machine with memory protection I've seen + * does it on word boundaries, so is OK with this. But VALGRIND will + * still catch it and complain. The masking trick does make the hash + * noticably faster for short strings (like English words). + */ +#ifndef VALGRIND + + switch(length) + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=k[2]&0xffffff00; b+=k[1]; a+=k[0]; break; + case 10: c+=k[2]&0xffff0000; b+=k[1]; a+=k[0]; break; + case 9 : c+=k[2]&0xff000000; b+=k[1]; a+=k[0]; break; + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=k[1]&0xffffff00; a+=k[0]; break; + case 6 : b+=k[1]&0xffff0000; a+=k[0]; break; + case 5 : b+=k[1]&0xff000000; a+=k[0]; break; + case 4 : a+=k[0]; break; + case 3 : a+=k[0]&0xffffff00; break; + case 2 : a+=k[0]&0xffff0000; break; + case 1 : a+=k[0]&0xff000000; break; + case 0 : return c; /* zero length strings require no mixing */ + } + +#else /* make valgrind happy */ + + k8 = (const uint8_t *)k; + switch(length) /* all the case statements fall through */ + { + case 12: c+=k[2]; b+=k[1]; a+=k[0]; break; + case 11: c+=((uint32_t)k8[10])<<8; /* fall through */ + case 10: c+=((uint32_t)k8[9])<<16; /* fall through */ + case 9 : c+=((uint32_t)k8[8])<<24; /* fall through */ + case 8 : b+=k[1]; a+=k[0]; break; + case 7 : b+=((uint32_t)k8[6])<<8; /* fall through */ + case 6 : b+=((uint32_t)k8[5])<<16; /* fall through */ + case 5 : b+=((uint32_t)k8[4])<<24; /* fall through */ + case 4 : a+=k[0]; break; + case 3 : a+=((uint32_t)k8[2])<<8; /* fall through */ + case 2 : a+=((uint32_t)k8[1])<<16; /* fall through */ + case 1 : a+=((uint32_t)k8[0])<<24; break; + case 0 : return c; + } + +#endif /* !VALGRIND */ + + } else { /* need to read the key one byte at a time */ + const uint8_t *k = (const uint8_t *)key; + + /*--------------- all but the last block: affect some 32 bits of (a,b,c) */ + while (length > 12) + { + a += ((uint32_t)k[0])<<24; + a += ((uint32_t)k[1])<<16; + a += ((uint32_t)k[2])<<8; + a += ((uint32_t)k[3]); + b += ((uint32_t)k[4])<<24; + b += ((uint32_t)k[5])<<16; + b += ((uint32_t)k[6])<<8; + b += ((uint32_t)k[7]); + c += ((uint32_t)k[8])<<24; + c += ((uint32_t)k[9])<<16; + c += ((uint32_t)k[10])<<8; + c += ((uint32_t)k[11]); + mix(a,b,c); + length -= 12; + k += 12; + } + + /*-------------------------------- last block: affect all 32 bits of (c) */ + switch(length) /* all the case statements fall through */ + { + case 12: c+=k[11]; + case 11: c+=((uint32_t)k[10])<<8; + case 10: c+=((uint32_t)k[9])<<16; + case 9 : c+=((uint32_t)k[8])<<24; + case 8 : b+=k[7]; + case 7 : b+=((uint32_t)k[6])<<8; + case 6 : b+=((uint32_t)k[5])<<16; + case 5 : b+=((uint32_t)k[4])<<24; + case 4 : a+=k[3]; + case 3 : a+=((uint32_t)k[2])<<8; + case 2 : a+=((uint32_t)k[1])<<16; + case 1 : a+=((uint32_t)k[0])<<24; + break; + case 0 : return c; + } + } + + final(a,b,c); + return c; +} + + +#ifdef SELF_TEST + +/* used for timings */ +void driver1() +{ + uint8_t buf[256]; + uint32_t i; + uint32_t h=0; + time_t a,z; + + time(&a); + for (i=0; i<256; ++i) buf[i] = 'x'; + for (i=0; i<1; ++i) + { + h = hashlittle(&buf[0],1,h); + } + time(&z); + if (z-a > 0) printf("time %d %.8x\n", z-a, h); +} + +/* check that every input bit changes every output bit half the time */ +#define HASHSTATE 1 +#define HASHLEN 1 +#define MAXPAIR 60 +#define MAXLEN 70 +void driver2() +{ + uint8_t qa[MAXLEN+1], qb[MAXLEN+2], *a = &qa[0], *b = &qb[1]; + uint32_t c[HASHSTATE], d[HASHSTATE], i=0, j=0, k, l, m=0, z; + uint32_t e[HASHSTATE],f[HASHSTATE],g[HASHSTATE],h[HASHSTATE]; + uint32_t x[HASHSTATE],y[HASHSTATE]; + uint32_t hlen; + + printf("No more than %d trials should ever be needed \n",MAXPAIR/2); + for (hlen=0; hlen < MAXLEN; ++hlen) + { + z=0; + for (i=0; i<hlen; ++i) /*----------------------- for each input byte, */ + { + for (j=0; j<8; ++j) /*------------------------ for each input bit, */ + { + for (m=1; m<8; ++m) /*------------ for serveral possible initvals, */ + { + for (l=0; l<HASHSTATE; ++l) + e[l]=f[l]=g[l]=h[l]=x[l]=y[l]=~((uint32_t)0); + + /*---- check that every output bit is affected by that input bit */ + for (k=0; k<MAXPAIR; k+=2) + { + uint32_t finished=1; + /* keys have one bit different */ + for (l=0; l<hlen+1; ++l) {a[l] = b[l] = (uint8_t)0;} + /* have a and b be two keys differing in only one bit */ + a[i] ^= (k<<j); + a[i] ^= (k>>(8-j)); + c[0] = hashlittle(a, hlen, m); + b[i] ^= ((k+1)<<j); + b[i] ^= ((k+1)>>(8-j)); + d[0] = hashlittle(b, hlen, m); + /* check every bit is 1, 0, set, and not set at least once */ + for (l=0; l<HASHSTATE; ++l) + { + e[l] &= (c[l]^d[l]); + f[l] &= ~(c[l]^d[l]); + g[l] &= c[l]; + h[l] &= ~c[l]; + x[l] &= d[l]; + y[l] &= ~d[l]; + if (e[l]|f[l]|g[l]|h[l]|x[l]|y[l]) finished=0; + } + if (finished) break; + } + if (k>z) z=k; + if (k==MAXPAIR) + { + printf("Some bit didn't change: "); + printf("%.8x %.8x %.8x %.8x %.8x %.8x ", + e[0],f[0],g[0],h[0],x[0],y[0]); + printf("i %d j %d m %d len %d\n", i, j, m, hlen); + } + if (z==MAXPAIR) goto done; + } + } + } + done: + if (z < MAXPAIR) + { + printf("Mix success %2d bytes %2d initvals ",i,m); + printf("required %d trials\n", z/2); + } + } + printf("\n"); +} + +/* Check for reading beyond the end of the buffer and alignment problems */ +void driver3() +{ + uint8_t buf[MAXLEN+20], *b; + uint32_t len; + uint8_t q[] = "This is the time for all good men to come to the aid of their country..."; + uint32_t h; + uint8_t qq[] = "xThis is the time for all good men to come to the aid of their country..."; + uint32_t i; + uint8_t qqq[] = "xxThis is the time for all good men to come to the aid of their country..."; + uint32_t j; + uint8_t qqqq[] = "xxxThis is the time for all good men to come to the aid of their country..."; + uint32_t ref,x,y; + uint8_t *p; + + printf("Endianness. These lines should all be the same (for values filled in):\n"); + printf("%.8x %.8x %.8x\n", + hashword((const uint32_t *)q, (sizeof(q)-1)/4, 13), + hashword((const uint32_t *)q, (sizeof(q)-5)/4, 13), + hashword((const uint32_t *)q, (sizeof(q)-9)/4, 13)); + p = q; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + p = &qq[1]; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + p = &qqq[2]; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + p = &qqqq[3]; + printf("%.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x %.8x\n", + hashlittle(p, sizeof(q)-1, 13), hashlittle(p, sizeof(q)-2, 13), + hashlittle(p, sizeof(q)-3, 13), hashlittle(p, sizeof(q)-4, 13), + hashlittle(p, sizeof(q)-5, 13), hashlittle(p, sizeof(q)-6, 13), + hashlittle(p, sizeof(q)-7, 13), hashlittle(p, sizeof(q)-8, 13), + hashlittle(p, sizeof(q)-9, 13), hashlittle(p, sizeof(q)-10, 13), + hashlittle(p, sizeof(q)-11, 13), hashlittle(p, sizeof(q)-12, 13)); + printf("\n"); + + /* check that hashlittle2 and hashlittle produce the same results */ + i=47; j=0; + hashlittle2(q, sizeof(q), &i, &j); + if (hashlittle(q, sizeof(q), 47) != i) + printf("hashlittle2 and hashlittle mismatch\n"); + + /* check that hashword2 and hashword produce the same results */ + len = 0xdeadbeef; + i=47, j=0; + hashword2(&len, 1, &i, &j); + if (hashword(&len, 1, 47) != i) + printf("hashword2 and hashword mismatch %x %x\n", + i, hashword(&len, 1, 47)); + + /* check hashlittle doesn't read before or after the ends of the string */ + for (h=0, b=buf+1; h<8; ++h, ++b) + { + for (i=0; i<MAXLEN; ++i) + { + len = i; + for (j=0; j<i; ++j) *(b+j)=0; + + /* these should all be equal */ + ref = hashlittle(b, len, (uint32_t)1); + *(b+i)=(uint8_t)~0; + *(b-1)=(uint8_t)~0; + x = hashlittle(b, len, (uint32_t)1); + y = hashlittle(b, len, (uint32_t)1); + if ((ref != x) || (ref != y)) + { + printf("alignment error: %.8x %.8x %.8x %d %d\n",ref,x,y, + h, i); + } + } + } +} + +/* check for problems with nulls */ + void driver4() +{ + uint8_t buf[1]; + uint32_t h,i,state[HASHSTATE]; + + + buf[0] = ~0; + for (i=0; i<HASHSTATE; ++i) state[i] = 1; + printf("These should all be different\n"); + for (i=0, h=0; i<8; ++i) + { + h = hashlittle(buf, 0, h); + printf("%2ld 0-byte strings, hash is %.8x\n", i, h); + } +} + +void driver5() +{ + uint32_t b,c; + b=0, c=0, hashlittle2("", 0, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* deadbeef deadbeef */ + b=0xdeadbeef, c=0, hashlittle2("", 0, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* bd5b7dde deadbeef */ + b=0xdeadbeef, c=0xdeadbeef, hashlittle2("", 0, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* 9c093ccd bd5b7dde */ + b=0, c=0, hashlittle2("Four score and seven years ago", 30, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* 17770551 ce7226e6 */ + b=1, c=0, hashlittle2("Four score and seven years ago", 30, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* e3607cae bd371de4 */ + b=0, c=1, hashlittle2("Four score and seven years ago", 30, &c, &b); + printf("hash is %.8lx %.8lx\n", c, b); /* cd628161 6cbea4b3 */ + c = hashlittle("Four score and seven years ago", 30, 0); + printf("hash is %.8lx\n", c); /* 17770551 */ + c = hashlittle("Four score and seven years ago", 30, 1); + printf("hash is %.8lx\n", c); /* cd628161 */ +} + + +int main() +{ + driver1(); /* test that the key is hashed: used for timings */ + driver2(); /* test that whole key is hashed thoroughly */ + driver3(); /* test that nothing but the key is hashed */ + driver4(); /* test hashing multiple buffers (all buffers are null) */ + driver5(); /* test the hash against known vectors */ + return 1; +} + +#endif /* SELF_TEST */ diff --git a/src/journal/lookup3.h b/src/journal/lookup3.h new file mode 100644 index 0000000000..31cc2f57b0 --- /dev/null +++ b/src/journal/lookup3.h @@ -0,0 +1,25 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foolookup3hfoo +#define foolookup3hfoo + +#include <inttypes.h> +#include <sys/types.h> + +uint32_t jenkins_hashword(const uint32_t *k, size_t length, uint32_t initval); +void jenkins_hashword2(const uint32_t *k, size_t length, uint32_t *pc, uint32_t *pb); + +uint32_t jenkins_hashlittle(const void *key, size_t length, uint32_t initval); +void jenkins_hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *pb); + +uint32_t jenkins_hashbig(const void *key, size_t length, uint32_t initval); + +static inline uint64_t hash64(const void *data, size_t length) { + uint32_t a = 0, b = 0; + + jenkins_hashlittle2(data, length, &a, &b); + + return ((uint64_t) a << 32ULL) | (uint64_t) b; +} + +#endif diff --git a/src/journal/sd-journal.c b/src/journal/sd-journal.c new file mode 100644 index 0000000000..38e58f5732 --- /dev/null +++ b/src/journal/sd-journal.c @@ -0,0 +1,1573 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <errno.h> +#include <fcntl.h> +#include <stddef.h> +#include <unistd.h> +#include <sys/inotify.h> + +#include "sd-journal.h" +#include "journal-def.h" +#include "journal-file.h" +#include "hashmap.h" +#include "list.h" +#include "lookup3.h" +#include "compress.h" +#include "journal-internal.h" + +#define JOURNAL_FILES_MAX 1024 + +static void detach_location(sd_journal *j) { + Iterator i; + JournalFile *f; + + assert(j); + + j->current_file = NULL; + j->current_field = 0; + + HASHMAP_FOREACH(f, j->files, i) + f->current_offset = 0; +} + +static void reset_location(sd_journal *j) { + assert(j); + + detach_location(j); + zero(j->current_location); +} + +static void init_location(Location *l, JournalFile *f, Object *o) { + assert(l); + assert(f); + assert(o->object.type == OBJECT_ENTRY); + + l->type = LOCATION_DISCRETE; + l->seqnum = le64toh(o->entry.seqnum); + l->seqnum_id = f->header->seqnum_id; + l->realtime = le64toh(o->entry.realtime); + l->monotonic = le64toh(o->entry.monotonic); + l->boot_id = le64toh(o->entry.boot_id); + l->xor_hash = le64toh(o->entry.xor_hash); + + l->seqnum_set = l->realtime_set = l->monotonic_set = l->xor_hash_set = true; +} + +static void set_location(sd_journal *j, JournalFile *f, Object *o, uint64_t offset) { + assert(j); + assert(f); + assert(o); + + init_location(&j->current_location, f, o); + + j->current_file = f; + j->current_field = 0; + + f->current_offset = offset; +} + +static int same_field(const void *_a, size_t s, const void *_b, size_t t) { + const uint8_t *a = _a, *b = _b; + size_t j; + bool a_good = false, b_good = false, different = false; + + for (j = 0; j < s && j < t; j++) { + + if (a[j] == '=') + a_good = true; + if (b[j] == '=') + b_good = true; + if (a[j] != b[j]) + different = true; + + if (a_good && b_good) + return different ? 0 : 1; + } + + return -EINVAL; +} + +int sd_journal_add_match(sd_journal *j, const void *data, size_t size) { + Match *m, *after = NULL; + uint64_t le_hash; + + assert(j); + + if (size <= 0) + return -EINVAL; + + assert(data); + + le_hash = htole64(hash64(data, size)); + + LIST_FOREACH(matches, m, j->matches) { + int r; + + if (m->le_hash == le_hash && + m->size == size && + memcmp(m->data, data, size) == 0) + return 0; + + r = same_field(data, size, m->data, m->size); + if (r < 0) + return r; + else if (r > 0) + after = m; + } + + m = new0(Match, 1); + if (!m) + return -ENOMEM; + + m->size = size; + + m->data = malloc(m->size); + if (!m->data) { + free(m); + return -ENOMEM; + } + + memcpy(m->data, data, size); + m->le_hash = le_hash; + + /* Matches for the same fields we order adjacent to each + * other */ + LIST_INSERT_AFTER(Match, matches, j->matches, after, m); + j->n_matches ++; + + detach_location(j); + + return 0; +} + +void sd_journal_flush_matches(sd_journal *j) { + assert(j); + + while (j->matches) { + Match *m = j->matches; + + LIST_REMOVE(Match, matches, j->matches, m); + free(m->data); + free(m); + } + + j->n_matches = 0; + + detach_location(j); +} + +static int compare_order(JournalFile *af, Object *ao, + JournalFile *bf, Object *bo) { + + uint64_t a, b; + + assert(af); + assert(ao); + assert(bf); + assert(bo); + + /* We operate on two different files here, hence we can access + * two objects at the same time, which we normally can't. + * + * If contents and timestamps match, these entries are + * identical, even if the seqnum does not match */ + + if (sd_id128_equal(ao->entry.boot_id, bo->entry.boot_id) && + ao->entry.monotonic == bo->entry.monotonic && + ao->entry.realtime == bo->entry.realtime && + ao->entry.xor_hash == bo->entry.xor_hash) + return 0; + + if (sd_id128_equal(af->header->seqnum_id, bf->header->seqnum_id)) { + + /* If this is from the same seqnum source, compare + * seqnums */ + a = le64toh(ao->entry.seqnum); + b = le64toh(bo->entry.seqnum); + + if (a < b) + return -1; + if (a > b) + return 1; + + /* Wow! This is weird, different data but the same + * seqnums? Something is borked, but let's make the + * best of it and compare by time. */ + } + + if (sd_id128_equal(ao->entry.boot_id, bo->entry.boot_id)) { + + /* If the boot id matches compare monotonic time */ + a = le64toh(ao->entry.monotonic); + b = le64toh(bo->entry.monotonic); + + if (a < b) + return -1; + if (a > b) + return 1; + } + + /* Otherwise compare UTC time */ + a = le64toh(ao->entry.realtime); + b = le64toh(ao->entry.realtime); + + if (a < b) + return -1; + if (a > b) + return 1; + + /* Finally, compare by contents */ + a = le64toh(ao->entry.xor_hash); + b = le64toh(ao->entry.xor_hash); + + if (a < b) + return -1; + if (a > b) + return 1; + + return 0; +} + +static int compare_with_location(JournalFile *af, Object *ao, Location *l) { + uint64_t a; + + assert(af); + assert(ao); + assert(l); + assert(l->type == LOCATION_DISCRETE); + + if (l->monotonic_set && + sd_id128_equal(ao->entry.boot_id, l->boot_id) && + l->realtime_set && + le64toh(ao->entry.realtime) == l->realtime && + l->xor_hash_set && + le64toh(ao->entry.xor_hash) == l->xor_hash) + return 0; + + if (l->seqnum_set && + sd_id128_equal(af->header->seqnum_id, l->seqnum_id)) { + + a = le64toh(ao->entry.seqnum); + + if (a < l->seqnum) + return -1; + if (a > l->seqnum) + return 1; + } + + if (l->monotonic_set && + sd_id128_equal(ao->entry.boot_id, l->boot_id)) { + + a = le64toh(ao->entry.monotonic); + + if (a < l->monotonic) + return -1; + if (a > l->monotonic) + return 1; + } + + if (l->realtime_set) { + + a = le64toh(ao->entry.realtime); + + if (a < l->realtime) + return -1; + if (a > l->realtime) + return 1; + } + + if (l->xor_hash_set) { + a = le64toh(ao->entry.xor_hash); + + if (a < l->xor_hash) + return -1; + if (a > l->xor_hash) + return 1; + } + + return 0; +} + +static int find_location(sd_journal *j, JournalFile *f, direction_t direction, Object **ret, uint64_t *offset) { + Object *o = NULL; + uint64_t p = 0; + int r; + + assert(j); + + if (!j->matches) { + /* No matches is simple */ + + if (j->current_location.type == LOCATION_HEAD) + r = journal_file_next_entry(f, NULL, 0, DIRECTION_DOWN, &o, &p); + else if (j->current_location.type == LOCATION_TAIL) + r = journal_file_next_entry(f, NULL, 0, DIRECTION_UP, &o, &p); + else if (j->current_location.seqnum_set && + sd_id128_equal(j->current_location.seqnum_id, f->header->seqnum_id)) + r = journal_file_move_to_entry_by_seqnum(f, j->current_location.seqnum, direction, &o, &p); + else if (j->current_location.monotonic_set) + r = journal_file_move_to_entry_by_monotonic(f, j->current_location.boot_id, j->current_location.monotonic, direction, &o, &p); + else if (j->current_location.realtime_set) + r = journal_file_move_to_entry_by_realtime(f, j->current_location.realtime, direction, &o, &p); + else + r = journal_file_next_entry(f, NULL, 0, direction, &o, &p); + + if (r <= 0) + return r; + + } else { + Match *m, *term_match = NULL; + Object *to = NULL; + uint64_t tp = 0; + + /* We have matches, first, let's jump to the monotonic + * position if we have any, since it implies a + * match. */ + + if (j->current_location.type == LOCATION_DISCRETE && + j->current_location.monotonic_set) { + + r = journal_file_move_to_entry_by_monotonic(f, j->current_location.boot_id, j->current_location.monotonic, direction, &o, &p); + if (r <= 0) + return r; + } + + LIST_FOREACH(matches, m, j->matches) { + Object *c, *d; + uint64_t cp, dp; + + r = journal_file_find_data_object_with_hash(f, m->data, m->size, m->le_hash, &d, &dp); + if (r <= 0) + return r; + + if (j->current_location.type == LOCATION_HEAD) + r = journal_file_next_entry_for_data(f, NULL, 0, dp, DIRECTION_DOWN, &c, &cp); + else if (j->current_location.type == LOCATION_TAIL) + r = journal_file_next_entry_for_data(f, NULL, 0, dp, DIRECTION_UP, &c, &cp); + else if (j->current_location.seqnum_set && + sd_id128_equal(j->current_location.seqnum_id, f->header->seqnum_id)) + r = journal_file_move_to_entry_by_seqnum_for_data(f, dp, j->current_location.seqnum, direction, &c, &cp); + else if (j->current_location.realtime_set) + r = journal_file_move_to_entry_by_realtime_for_data(f, dp, j->current_location.realtime, direction, &c, &cp); + else + r = journal_file_next_entry_for_data(f, NULL, 0, dp, direction, &c, &cp); + + if (!term_match) { + term_match = m; + + if (r > 0) { + to = c; + tp = cp; + } + } else if (same_field(term_match->data, term_match->size, m->data, m->size)) { + + /* Same field as previous match... */ + if (r > 0) { + + /* Find the earliest of the OR matches */ + + if (!to || + (direction == DIRECTION_DOWN && cp < tp) || + (direction == DIRECTION_UP && cp > tp)) { + to = c; + tp = cp; + } + + } + + } else { + + /* Previous term is finished, did anything match? */ + if (!to) + return 0; + + /* Find the last of the AND matches */ + if (!o || + (direction == DIRECTION_DOWN && tp > p) || + (direction == DIRECTION_UP && tp < p)) { + o = to; + p = tp; + } + + term_match = m; + + if (r > 0) { + to = c; + tp = cp; + } else { + to = NULL; + tp = 0; + } + } + } + + /* Last term is finished, did anything match? */ + if (!to) + return 0; + + if (!o || + (direction == DIRECTION_DOWN && tp > p) || + (direction == DIRECTION_UP && tp < p)) { + o = to; + p = tp; + } + + if (!o) + return 0; + } + + if (ret) + *ret = o; + + if (offset) + *offset = p; + + return 1; +} + +static int next_with_matches(sd_journal *j, JournalFile *f, direction_t direction, Object **ret, uint64_t *offset) { + int r; + uint64_t cp; + Object *c; + + assert(j); + assert(f); + assert(ret); + assert(offset); + + c = *ret; + cp = *offset; + + if (!j->matches) { + /* No matches is easy */ + + r = journal_file_next_entry(f, c, cp, direction, &c, &cp); + if (r <= 0) + return r; + + if (ret) + *ret = c; + if (offset) + *offset = cp; + return 1; + } + + /* So there are matches we have to adhere to, let's find the + * first entry that matches all of them */ + + for (;;) { + uint64_t np, n; + bool found, term_result = false; + Match *m, *term_match = NULL; + Object *npo = NULL; + + n = journal_file_entry_n_items(c); + + /* Make sure we don't match the entry we are starting + * from. */ + found = cp > *offset; + + np = 0; + LIST_FOREACH(matches, m, j->matches) { + uint64_t q, k; + Object *qo = NULL; + + /* Let's check if this is the beginning of a + * new term, i.e. has a different field prefix + * as the preceeding match. */ + if (!term_match) { + term_match = m; + term_result = false; + } else if (!same_field(term_match->data, term_match->size, m->data, m->size)) { + if (!term_result) + found = false; + + term_match = m; + term_result = false; + } + + for (k = 0; k < n; k++) + if (c->entry.items[k].hash == m->le_hash) + break; + + if (k >= n) { + /* Hmm, didn't find any field that + * matched this rule, so ignore this + * match. Go on with next match */ + continue; + } + + term_result = true; + + /* Hmm, so, this field matched, let's remember + * where we'd have to try next, in case the other + * matches are not OK */ + + r = journal_file_next_entry_for_data(f, c, cp, le64toh(c->entry.items[k].object_offset), direction, &qo, &q); + if (r > 0) { + + if (direction == DIRECTION_DOWN) { + if (q > np) { + np = q; + npo = qo; + } + } else { + if (np == 0 || q < np) { + np = q; + npo = qo; + } + } + } + } + + /* Check the last term */ + if (term_match && !term_result) + found = false; + + /* Did this entry match against all matches? */ + if (found) { + if (ret) + *ret = c; + if (offset) + *offset = cp; + return 1; + } + + /* Did we find a subsequent entry? */ + if (np == 0) + return 0; + + /* Hmm, ok, this entry only matched partially, so + * let's try another one */ + cp = np; + c = npo; + } +} + +static int next_beyond_location(sd_journal *j, JournalFile *f, direction_t direction, Object **ret, uint64_t *offset) { + Object *c; + uint64_t cp; + int compare_value, r; + + assert(j); + assert(f); + + if (f->current_offset > 0) { + cp = f->current_offset; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, cp, &c); + if (r < 0) + return r; + + r = next_with_matches(j, f, direction, &c, &cp); + if (r <= 0) + return r; + + compare_value = 1; + } else { + r = find_location(j, f, direction, &c, &cp); + if (r <= 0) + return r; + + compare_value = 0; + } + + for (;;) { + bool found; + + if (j->current_location.type == LOCATION_DISCRETE) { + int k; + + k = compare_with_location(f, c, &j->current_location); + if (direction == DIRECTION_DOWN) + found = k >= compare_value; + else + found = k <= -compare_value; + } else + found = true; + + if (found) { + if (ret) + *ret = c; + if (offset) + *offset = cp; + return 1; + } + + r = next_with_matches(j, f, direction, &c, &cp); + if (r <= 0) + return r; + } +} + +static int real_journal_next(sd_journal *j, direction_t direction) { + JournalFile *f, *new_current = NULL; + Iterator i; + int r; + uint64_t new_offset = 0; + Object *new_entry = NULL; + + assert(j); + + HASHMAP_FOREACH(f, j->files, i) { + Object *o; + uint64_t p; + bool found; + + r = next_beyond_location(j, f, direction, &o, &p); + if (r < 0) + return r; + else if (r == 0) + continue; + + if (!new_current) + found = true; + else { + int k; + + k = compare_order(f, o, new_current, new_entry); + + if (direction == DIRECTION_DOWN) + found = k < 0; + else + found = k > 0; + } + + if (found) { + new_current = f; + new_entry = o; + new_offset = p; + } + } + + if (!new_current) + return 0; + + set_location(j, new_current, new_entry, new_offset); + + return 1; +} + +int sd_journal_next(sd_journal *j) { + return real_journal_next(j, DIRECTION_DOWN); +} + +int sd_journal_previous(sd_journal *j) { + return real_journal_next(j, DIRECTION_UP); +} + +int sd_journal_next_skip(sd_journal *j, uint64_t skip) { + int c = 0, r; + + assert(j); + + while (skip > 0) { + r = sd_journal_next(j); + if (r < 0) + return r; + + if (r == 0) + return c; + + skip--; + c++; + } + + return c; +} + +int sd_journal_previous_skip(sd_journal *j, uint64_t skip) { + int c = 0, r; + + assert(j); + + while (skip > 0) { + r = sd_journal_previous(j); + if (r < 0) + return r; + + if (r == 0) + return c; + + skip--; + c++; + } + + return 1; +} + +int sd_journal_get_cursor(sd_journal *j, char **cursor) { + Object *o; + int r; + char bid[33], sid[33]; + + assert(j); + assert(cursor); + + if (!j->current_file || j->current_file->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(j->current_file, OBJECT_ENTRY, j->current_file->current_offset, &o); + if (r < 0) + return r; + + sd_id128_to_string(j->current_file->header->seqnum_id, sid); + sd_id128_to_string(o->entry.boot_id, bid); + + if (asprintf(cursor, + "s=%s;i=%llx;b=%s;m=%llx;t=%llx;x=%llx;p=%s", + sid, (unsigned long long) le64toh(o->entry.seqnum), + bid, (unsigned long long) le64toh(o->entry.monotonic), + (unsigned long long) le64toh(o->entry.realtime), + (unsigned long long) le64toh(o->entry.xor_hash), + file_name_from_path(j->current_file->path)) < 0) + return -ENOMEM; + + return 1; +} + +int sd_journal_seek_cursor(sd_journal *j, const char *cursor) { + char *w; + size_t l; + char *state; + unsigned long long seqnum, monotonic, realtime, xor_hash; + bool + seqnum_id_set = false, + seqnum_set = false, + boot_id_set = false, + monotonic_set = false, + realtime_set = false, + xor_hash_set = false; + sd_id128_t seqnum_id, boot_id; + + assert(j); + assert(cursor); + + FOREACH_WORD_SEPARATOR(w, l, cursor, ";", state) { + char *item; + int k = 0; + + if (l < 2 || w[1] != '=') + return -EINVAL; + + item = strndup(w, l); + if (!item) + return -ENOMEM; + + switch (w[0]) { + + case 's': + seqnum_id_set = true; + k = sd_id128_from_string(w+2, &seqnum_id); + break; + + case 'i': + seqnum_set = true; + if (sscanf(w+2, "%llx", &seqnum) != 1) + k = -EINVAL; + break; + + case 'b': + boot_id_set = true; + k = sd_id128_from_string(w+2, &boot_id); + break; + + case 'm': + monotonic_set = true; + if (sscanf(w+2, "%llx", &monotonic) != 1) + k = -EINVAL; + break; + + case 't': + realtime_set = true; + if (sscanf(w+2, "%llx", &realtime) != 1) + k = -EINVAL; + break; + + case 'x': + xor_hash_set = true; + if (sscanf(w+2, "%llx", &xor_hash) != 1) + k = -EINVAL; + break; + } + + free(item); + + if (k < 0) + return k; + } + + if ((!seqnum_set || !seqnum_id_set) && + (!monotonic_set || !boot_id_set) && + !realtime_set) + return -EINVAL; + + reset_location(j); + + j->current_location.type = LOCATION_DISCRETE; + + if (realtime_set) { + j->current_location.realtime = (uint64_t) realtime; + j->current_location.realtime_set = true; + } + + if (seqnum_set && seqnum_id_set) { + j->current_location.seqnum = (uint64_t) seqnum; + j->current_location.seqnum_id = seqnum_id; + j->current_location.seqnum_set = true; + } + + if (monotonic_set && boot_id_set) { + j->current_location.monotonic = (uint64_t) monotonic; + j->current_location.boot_id = boot_id; + j->current_location.monotonic_set = true; + } + + if (xor_hash_set) { + j->current_location.xor_hash = (uint64_t) xor_hash; + j->current_location.xor_hash_set = true; + } + + return 0; +} + +int sd_journal_seek_monotonic_usec(sd_journal *j, sd_id128_t boot_id, uint64_t usec) { + assert(j); + + reset_location(j); + j->current_location.type = LOCATION_DISCRETE; + j->current_location.boot_id = boot_id; + j->current_location.monotonic = usec; + j->current_location.monotonic_set = true; + + return 0; +} + +int sd_journal_seek_realtime_usec(sd_journal *j, uint64_t usec) { + assert(j); + + reset_location(j); + j->current_location.type = LOCATION_DISCRETE; + j->current_location.realtime = usec; + j->current_location.realtime_set = true; + + return 0; +} + +int sd_journal_seek_head(sd_journal *j) { + assert(j); + + reset_location(j); + j->current_location.type = LOCATION_HEAD; + + return 0; +} + +int sd_journal_seek_tail(sd_journal *j) { + assert(j); + + reset_location(j); + j->current_location.type = LOCATION_TAIL; + + return 0; +} + +static int add_file(sd_journal *j, const char *prefix, const char *dir, const char *filename) { + char *fn; + int r; + JournalFile *f; + + assert(j); + assert(prefix); + assert(filename); + + if ((j->flags & SD_JOURNAL_SYSTEM_ONLY) && + !startswith(filename, "system.journal")) + return 0; + + if (dir) + fn = join(prefix, "/", dir, "/", filename, NULL); + else + fn = join(prefix, "/", filename, NULL); + + if (!fn) + return -ENOMEM; + + if (hashmap_get(j->files, fn)) { + free(fn); + return 0; + } + + if (hashmap_size(j->files) >= JOURNAL_FILES_MAX) { + log_debug("Too many open journal files, not adding %s, ignoring.", fn); + free(fn); + return 0; + } + + r = journal_file_open(fn, O_RDONLY, 0, NULL, &f); + free(fn); + + if (r < 0) { + if (errno == ENOENT) + return 0; + + return r; + } + + /* journal_file_dump(f); */ + + r = hashmap_put(j->files, f->path, f); + if (r < 0) { + journal_file_close(f); + return r; + } + + log_debug("File %s got added.", f->path); + + return 0; +} + +static int remove_file(sd_journal *j, const char *prefix, const char *dir, const char *filename) { + char *fn; + JournalFile *f; + + assert(j); + assert(prefix); + assert(filename); + + if (dir) + fn = join(prefix, "/", dir, "/", filename, NULL); + else + fn = join(prefix, "/", filename, NULL); + + if (!fn) + return -ENOMEM; + + f = hashmap_get(j->files, fn); + free(fn); + + if (!f) + return 0; + + hashmap_remove(j->files, f->path); + journal_file_close(f); + + log_debug("File %s got removed.", f->path); + return 0; +} + +static int add_directory(sd_journal *j, const char *prefix, const char *dir) { + char *fn; + int r; + DIR *d; + int wd; + sd_id128_t id, mid; + + assert(j); + assert(prefix); + assert(dir); + + if ((j->flags & SD_JOURNAL_LOCAL_ONLY) && + (sd_id128_from_string(dir, &id) < 0 || + sd_id128_get_machine(&mid) < 0 || + !sd_id128_equal(id, mid))) + return 0; + + fn = join(prefix, "/", dir, NULL); + if (!fn) + return -ENOMEM; + + d = opendir(fn); + + if (!d) { + free(fn); + if (errno == ENOENT) + return 0; + + return -errno; + } + + wd = inotify_add_watch(j->inotify_fd, fn, + IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB|IN_DELETE| + IN_DELETE_SELF|IN_MOVE_SELF|IN_UNMOUNT| + IN_DONT_FOLLOW|IN_ONLYDIR); + if (wd > 0) { + if (hashmap_put(j->inotify_wd_dirs, INT_TO_PTR(wd), fn) < 0) + inotify_rm_watch(j->inotify_fd, wd); + else + fn = NULL; + } + + free(fn); + + for (;;) { + struct dirent buf, *de; + + r = readdir_r(d, &buf, &de); + if (r != 0 || !de) + break; + + if (!dirent_is_file_with_suffix(de, ".journal")) + continue; + + r = add_file(j, prefix, dir, de->d_name); + if (r < 0) + log_debug("Failed to add file %s/%s/%s: %s", prefix, dir, de->d_name, strerror(-r)); + } + + closedir(d); + + log_debug("Directory %s/%s got added.", prefix, dir); + + return 0; +} + +static void remove_directory_wd(sd_journal *j, int wd) { + char *p; + + assert(j); + assert(wd > 0); + + if (j->inotify_fd >= 0) + inotify_rm_watch(j->inotify_fd, wd); + + p = hashmap_remove(j->inotify_wd_dirs, INT_TO_PTR(wd)); + + if (p) { + log_debug("Directory %s got removed.", p); + free(p); + } +} + +static void add_root_wd(sd_journal *j, const char *p) { + int wd; + char *k; + + assert(j); + assert(p); + + wd = inotify_add_watch(j->inotify_fd, p, + IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB|IN_DELETE| + IN_DONT_FOLLOW|IN_ONLYDIR); + if (wd <= 0) + return; + + k = strdup(p); + if (!k || hashmap_put(j->inotify_wd_roots, INT_TO_PTR(wd), k) < 0) { + inotify_rm_watch(j->inotify_fd, wd); + free(k); + } +} + +static void remove_root_wd(sd_journal *j, int wd) { + char *p; + + assert(j); + assert(wd > 0); + + if (j->inotify_fd >= 0) + inotify_rm_watch(j->inotify_fd, wd); + + p = hashmap_remove(j->inotify_wd_roots, INT_TO_PTR(wd)); + + if (p) { + log_debug("Root %s got removed.", p); + free(p); + } +} + +int sd_journal_open(sd_journal **ret, int flags) { + sd_journal *j; + const char *p; + const char search_paths[] = + "/run/log/journal\0" + "/var/log/journal\0"; + int r; + + assert(ret); + + j = new0(sd_journal, 1); + if (!j) + return -ENOMEM; + + j->flags = flags; + + j->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC); + if (j->inotify_fd < 0) { + r = -errno; + goto fail; + } + + j->files = hashmap_new(string_hash_func, string_compare_func); + if (!j->files) { + r = -ENOMEM; + goto fail; + } + + j->inotify_wd_dirs = hashmap_new(trivial_hash_func, trivial_compare_func); + j->inotify_wd_roots = hashmap_new(trivial_hash_func, trivial_compare_func); + + if (!j->inotify_wd_dirs || !j->inotify_wd_roots) { + r = -ENOMEM; + goto fail; + } + + /* We ignore most errors here, since the idea is to only open + * what's actually accessible, and ignore the rest. */ + + NULSTR_FOREACH(p, search_paths) { + DIR *d; + + if ((flags & SD_JOURNAL_RUNTIME_ONLY) && + !path_startswith(p, "/run")) + continue; + + d = opendir(p); + if (!d) { + if (errno != ENOENT) + log_debug("Failed to open %s: %m", p); + continue; + } + + add_root_wd(j, p); + + for (;;) { + struct dirent buf, *de; + sd_id128_t id; + + r = readdir_r(d, &buf, &de); + if (r != 0 || !de) + break; + + if (dirent_is_file_with_suffix(de, ".journal")) { + r = add_file(j, p, NULL, de->d_name); + if (r < 0) + log_debug("Failed to add file %s/%s: %s", p, de->d_name, strerror(-r)); + + } else if ((de->d_type == DT_DIR || de->d_type == DT_UNKNOWN) && + sd_id128_from_string(de->d_name, &id) >= 0) { + + r = add_directory(j, p, de->d_name); + if (r < 0) + log_debug("Failed to add directory %s/%s: %s", p, de->d_name, strerror(-r)); + } + } + + closedir(d); + } + + *ret = j; + return 0; + +fail: + sd_journal_close(j); + + return r; +}; + +void sd_journal_close(sd_journal *j) { + assert(j); + + if (j->inotify_wd_dirs) { + void *k; + + while ((k = hashmap_first_key(j->inotify_wd_dirs))) + remove_directory_wd(j, PTR_TO_INT(k)); + + hashmap_free(j->inotify_wd_dirs); + } + + if (j->inotify_wd_roots) { + void *k; + + while ((k = hashmap_first_key(j->inotify_wd_roots))) + remove_root_wd(j, PTR_TO_INT(k)); + + hashmap_free(j->inotify_wd_roots); + } + + if (j->files) { + JournalFile *f; + + while ((f = hashmap_steal_first(j->files))) + journal_file_close(f); + + hashmap_free(j->files); + } + + sd_journal_flush_matches(j); + + if (j->inotify_fd >= 0) + close_nointr_nofail(j->inotify_fd); + + free(j); +} + +int sd_journal_get_realtime_usec(sd_journal *j, uint64_t *ret) { + Object *o; + JournalFile *f; + int r; + + assert(j); + assert(ret); + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + *ret = le64toh(o->entry.realtime); + return 0; +} + +int sd_journal_get_monotonic_usec(sd_journal *j, uint64_t *ret, sd_id128_t *ret_boot_id) { + Object *o; + JournalFile *f; + int r; + sd_id128_t id; + + assert(j); + assert(ret); + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + if (ret_boot_id) + *ret_boot_id = o->entry.boot_id; + else { + r = sd_id128_get_boot(&id); + if (r < 0) + return r; + + if (!sd_id128_equal(id, o->entry.boot_id)) + return -ENOENT; + } + + *ret = le64toh(o->entry.monotonic); + return 0; +} + +int sd_journal_get_data(sd_journal *j, const char *field, const void **data, size_t *size) { + JournalFile *f; + uint64_t i, n; + size_t field_length; + int r; + Object *o; + + assert(j); + assert(field); + assert(data); + assert(size); + + if (isempty(field) || strchr(field, '=')) + return -EINVAL; + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + field_length = strlen(field); + + n = journal_file_entry_n_items(o); + for (i = 0; i < n; i++) { + uint64_t p, l, le_hash; + size_t t; + + p = le64toh(o->entry.items[i].object_offset); + le_hash = o->entry.items[i].hash; + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (le_hash != o->data.hash) + return -EBADMSG; + + l = le64toh(o->object.size) - offsetof(Object, data.payload); + + if (o->object.flags & OBJECT_COMPRESSED) { + +#ifdef HAVE_XZ + if (uncompress_startswith(o->data.payload, l, + &f->compress_buffer, &f->compress_buffer_size, + field, field_length, '=')) { + + uint64_t rsize; + + if (!uncompress_blob(o->data.payload, l, + &f->compress_buffer, &f->compress_buffer_size, &rsize)) + return -EBADMSG; + + *data = f->compress_buffer; + *size = (size_t) rsize; + + return 0; + } +#else + return -EPROTONOSUPPORT; +#endif + + } else if (l >= field_length+1 && + memcmp(o->data.payload, field, field_length) == 0 && + o->data.payload[field_length] == '=') { + + t = (size_t) l; + + if ((uint64_t) t != l) + return -E2BIG; + + *data = o->data.payload; + *size = t; + + return 0; + } + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + } + + return -ENOENT; +} + +int sd_journal_enumerate_data(sd_journal *j, const void **data, size_t *size) { + JournalFile *f; + uint64_t p, l, n, le_hash; + int r; + Object *o; + size_t t; + + assert(j); + assert(data); + assert(size); + + f = j->current_file; + if (!f) + return -EADDRNOTAVAIL; + + if (f->current_offset <= 0) + return -EADDRNOTAVAIL; + + r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); + if (r < 0) + return r; + + n = journal_file_entry_n_items(o); + if (j->current_field >= n) + return 0; + + p = le64toh(o->entry.items[j->current_field].object_offset); + le_hash = o->entry.items[j->current_field].hash; + r = journal_file_move_to_object(f, OBJECT_DATA, p, &o); + if (r < 0) + return r; + + if (le_hash != o->data.hash) + return -EBADMSG; + + l = le64toh(o->object.size) - offsetof(Object, data.payload); + t = (size_t) l; + + /* We can't read objects larger than 4G on a 32bit machine */ + if ((uint64_t) t != l) + return -E2BIG; + + if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ + uint64_t rsize; + + if (!uncompress_blob(o->data.payload, l, &f->compress_buffer, &f->compress_buffer_size, &rsize)) + return -EBADMSG; + + *data = f->compress_buffer; + *size = (size_t) rsize; +#else + return -EPROTONOSUPPORT; +#endif + } else { + *data = o->data.payload; + *size = t; + } + + j->current_field ++; + + return 1; +} + +void sd_journal_restart_data(sd_journal *j) { + assert(j); + + j->current_field = 0; +} + +int sd_journal_get_fd(sd_journal *j) { + assert(j); + + return j->inotify_fd; +} + +static void process_inotify_event(sd_journal *j, struct inotify_event *e) { + char *p; + int r; + + assert(j); + assert(e); + + /* Is this a subdirectory we watch? */ + p = hashmap_get(j->inotify_wd_dirs, INT_TO_PTR(e->wd)); + if (p) { + + if (!(e->mask & IN_ISDIR) && e->len > 0 && endswith(e->name, ".journal")) { + + /* Event for a journal file */ + + if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) { + r = add_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to add file %s/%s: %s", p, e->name, strerror(-r)); + } else if (e->mask & (IN_DELETE|IN_UNMOUNT)) { + + r = remove_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to remove file %s/%s: %s", p, e->name, strerror(-r)); + } + + } else if (e->len == 0) { + + /* Event for the directory itself */ + + if (e->mask & (IN_DELETE_SELF|IN_MOVE_SELF|IN_UNMOUNT)) + remove_directory_wd(j, e->wd); + } + + return; + } + + /* Must be the root directory then? */ + p = hashmap_get(j->inotify_wd_roots, INT_TO_PTR(e->wd)); + if (p) { + sd_id128_t id; + + if (!(e->mask & IN_ISDIR) && e->len > 0 && endswith(e->name, ".journal")) { + + /* Event for a journal file */ + + if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) { + r = add_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to add file %s/%s: %s", p, e->name, strerror(-r)); + } else if (e->mask & (IN_DELETE|IN_UNMOUNT)) { + + r = remove_file(j, p, NULL, e->name); + if (r < 0) + log_debug("Failed to remove file %s/%s: %s", p, e->name, strerror(-r)); + } + + } else if ((e->mask & IN_ISDIR) && e->len > 0 && sd_id128_from_string(e->name, &id) >= 0) { + + /* Event for subdirectory */ + + if (e->mask & (IN_CREATE|IN_MOVED_TO|IN_MODIFY|IN_ATTRIB)) { + + r = add_directory(j, p, e->name); + if (r < 0) + log_debug("Failed to add directory %s/%s: %s", p, e->name, strerror(-r)); + } + } + + return; + } + + if (e->mask & IN_IGNORED) + return; + + log_warning("Unknown inotify event."); +} + +int sd_journal_process(sd_journal *j) { + uint8_t buffer[sizeof(struct inotify_event) + FILENAME_MAX]; + + assert(j); + + for (;;) { + struct inotify_event *e; + ssize_t l; + + l = read(j->inotify_fd, buffer, sizeof(buffer)); + if (l < 0) { + if (errno == EINTR || errno == EAGAIN) + return 0; + + return -errno; + } + + e = (struct inotify_event*) buffer; + while (l > 0) { + size_t step; + + process_inotify_event(j, e); + + step = sizeof(struct inotify_event) + e->len; + assert(step <= (size_t) l); + + e = (struct inotify_event*) ((uint8_t*) e + step); + l -= step; + } + } +} + +int sd_journal_query_unique(sd_journal *j, const char *field) { + assert(j); + assert(field); + + return -ENOTSUP; +} + +int sd_journal_enumerate_unique(sd_journal *j, const void **data, size_t *l) { + assert(j); + + return -ENOTSUP; +} + +void sd_journal_restart_unique(sd_journal *j) { + assert(j); +} diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h new file mode 100644 index 0000000000..97f9f0fa13 --- /dev/null +++ b/src/journal/sd-journal.h @@ -0,0 +1,125 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foojournalhfoo +#define foojournalhfoo + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <inttypes.h> +#include <sys/types.h> +#include <stdarg.h> +#include <sys/uio.h> + +#include "sd-id128.h" + +/* TODO: + * + * - OR of matches is borked... + * - extend hash tables table as we go + * - accelerate looking for "all hostnames" and suchlike. + * - hookup with systemctl + * - handle incomplete header + * - write unit files + * + * - local deserializer + * - http server + * - message catalog + * + * - check LE/BE conversion for 8bit, 16bit, 32bit values + * - cryptographic hash + * - think about manipulations of header + * - implement audit gateway + */ + +/* Write to daemon */ + +int sd_journal_print(int piority, const char *format, ...) __attribute__ ((format (printf, 2, 3))); +int sd_journal_printv(int priority, const char *format, va_list ap); + +int sd_journal_send(const char *format, ...) __attribute__((sentinel)); +int sd_journal_sendv(const struct iovec *iov, int n); + +int sd_journal_stream_fd(const char *tag, int priority, int priority_prefix); + +/* Browse journal stream */ + +typedef struct sd_journal sd_journal; + +enum { + SD_JOURNAL_LOCAL_ONLY = 1, + SD_JOURNAL_RUNTIME_ONLY = 2, + SD_JOURNAL_SYSTEM_ONLY = 4 +}; + +int sd_journal_open(sd_journal **ret, int flags); +void sd_journal_close(sd_journal *j); + +int sd_journal_previous(sd_journal *j); +int sd_journal_next(sd_journal *j); + +int sd_journal_previous_skip(sd_journal *j, uint64_t skip); +int sd_journal_next_skip(sd_journal *j, uint64_t skip); + +int sd_journal_get_realtime_usec(sd_journal *j, uint64_t *ret); +int sd_journal_get_monotonic_usec(sd_journal *j, uint64_t *ret, sd_id128_t *ret_boot_id); +int sd_journal_get_data(sd_journal *j, const char *field, const void **data, size_t *l); +int sd_journal_enumerate_data(sd_journal *j, const void **data, size_t *l); +void sd_journal_restart_data(sd_journal *j); + +int sd_journal_add_match(sd_journal *j, const void *data, size_t size); +void sd_journal_flush_matches(sd_journal *j); + +int sd_journal_seek_head(sd_journal *j); +int sd_journal_seek_tail(sd_journal *j); +int sd_journal_seek_monotonic_usec(sd_journal *j, sd_id128_t boot_id, uint64_t usec); +int sd_journal_seek_realtime_usec(sd_journal *j, uint64_t usec); +int sd_journal_seek_cursor(sd_journal *j, const char *cursor); + +int sd_journal_get_cursor(sd_journal *j, char **cursor); + +int sd_journal_query_unique(sd_journal *j, const char *field); /* missing */ +int sd_journal_enumerate_unique(sd_journal *j, const void **data, size_t *l); /* missing */ +void sd_journal_restart_unique(sd_journal *j); /* missing */ + +enum { + SD_JOURNAL_NOP, + SD_JOURNAL_APPEND, + SD_JOURNAL_INVALIDATE_ADD, + SD_JOURNAL_INVALIDATE_REMOVE +}; + +int sd_journal_get_fd(sd_journal *j); +int sd_journal_process(sd_journal *j); + +#define SD_JOURNAL_FOREACH(j) \ + if (sd_journal_seek_head(j) >= 0) \ + while (sd_journal_next(j) > 0) + +#define SD_JOURNAL_FOREACH_BACKWARDS(j) \ + if (sd_journal_seek_tail(j) >= 0) \ + while (sd_journal_previous(j) > 0) + +#define SD_JOURNAL_FOREACH_DATA(j, data, l) \ + for (sd_journal_restart_data(j); sd_journal_enumerate_data((j), &(data), &(l)) > 0; ) + +#define SD_JOURNAL_FOREACH_UNIQUE(j, data, l) \ + for (sd_journal_restart_unique(j); sd_journal_enumerate_data((j), &(data), &(l)) > 0; ) + +#endif diff --git a/src/journal/test-journal.c b/src/journal/test-journal.c new file mode 100644 index 0000000000..3d429bea90 --- /dev/null +++ b/src/journal/test-journal.c @@ -0,0 +1,119 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <fcntl.h> +#include <unistd.h> + +#include "sd-journal.h" +#include "journal-file.h" +#include "log.h" + +int main(int argc, char *argv[]) { + dual_timestamp ts; + JournalFile *f; + struct iovec iovec; + static const char test[] = "test", test2[] = "test2"; + Object *o; + uint64_t p; + + log_set_max_level(LOG_DEBUG); + + unlink("test.journal"); + + assert_se(journal_file_open("test.journal", O_RDWR|O_CREAT, 0666, NULL, &f) == 0); + + dual_timestamp_get(&ts); + + iovec.iov_base = (void*) test; + iovec.iov_len = strlen(test); + assert_se(journal_file_append_entry(f, &ts, &iovec, 1, NULL, NULL, NULL) == 0); + + iovec.iov_base = (void*) test2; + iovec.iov_len = strlen(test2); + assert_se(journal_file_append_entry(f, &ts, &iovec, 1, NULL, NULL, NULL) == 0); + + iovec.iov_base = (void*) test; + iovec.iov_len = strlen(test); + assert_se(journal_file_append_entry(f, &ts, &iovec, 1, NULL, NULL, NULL) == 0); + + journal_file_dump(f); + + assert(journal_file_next_entry(f, NULL, 0, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_next_entry(f, o, p, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_next_entry(f, o, p, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_next_entry(f, o, p, DIRECTION_DOWN, &o, &p) == 0); + + assert(journal_file_next_entry(f, NULL, 0, DIRECTION_DOWN, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_skip_entry(f, o, p, 2, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_skip_entry(f, o, p, -2, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_skip_entry(f, o, p, -2, &o, &p) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_find_data_object(f, test, strlen(test), NULL, &p) == 1); + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_UP, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_find_data_object(f, test2, strlen(test2), NULL, &p) == 1); + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_UP, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_next_entry_for_data(f, NULL, 0, p, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_find_data_object(f, "quux", 4, NULL, &p) == 0); + + assert(journal_file_move_to_entry_by_seqnum(f, 1, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 1); + + assert(journal_file_move_to_entry_by_seqnum(f, 3, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 3); + + assert(journal_file_move_to_entry_by_seqnum(f, 2, DIRECTION_DOWN, &o, NULL) == 1); + assert(le64toh(o->entry.seqnum) == 2); + + assert(journal_file_move_to_entry_by_seqnum(f, 10, DIRECTION_DOWN, &o, NULL) == 0); + + journal_file_rotate(&f); + journal_file_rotate(&f); + + journal_file_close(f); + + journal_directory_vacuum(".", 3000000, 0); + + log_error("Exiting..."); + + return 0; +} |