diff options
| author | Lennart Poettering <lennart@poettering.net> | 2011-12-29 15:00:57 +0100 | 
|---|---|---|
| committer | Lennart Poettering <lennart@poettering.net> | 2011-12-29 15:00:57 +0100 | 
| commit | cf244689e9d1ab50082c9ddd0f3c4d1eb982badc (patch) | |
| tree | 0b2b9438e6269b1e9b4aee155d4eda60f2a8d720 | |
| parent | de97b26ac5e29063632312ec1a20eb6318ca924c (diff) | |
journald: flush /run to /var as soon as it becomes available
| -rw-r--r-- | src/journal/journal-file.c | 93 | ||||
| -rw-r--r-- | src/journal/journal-file.h | 4 | ||||
| -rw-r--r-- | src/journal/journalctl.c | 2 | ||||
| -rw-r--r-- | src/journal/journald.c | 248 | ||||
| -rw-r--r-- | src/journal/sd-journal.c | 69 | ||||
| -rw-r--r-- | src/journal/sd-journal.h | 23 | 
6 files changed, 309 insertions, 130 deletions
| diff --git a/src/journal/journal-file.c b/src/journal/journal-file.c index 6c7718de31..190bfb996b 100644 --- a/src/journal/journal-file.c +++ b/src/journal/journal-file.c @@ -950,7 +950,7 @@ static int journal_file_append_entry_internal(          return 0;  } -static void journal_file_post_change(JournalFile *f) { +void journal_file_post_change(JournalFile *f) {          assert(f);          /* inotify() does not receive IN_MODIFY events from file @@ -989,9 +989,7 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st          if (ts->realtime < le64toh(f->header->tail_entry_realtime))                  return -EINVAL; -        items = new(EntryItem, n_iovec); -        if (!items) -                return -ENOMEM; +        items = alloca(sizeof(EntryItem) * n_iovec);          for (i = 0; i < n_iovec; i++) {                  uint64_t p; @@ -999,7 +997,7 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st                  r = journal_file_append_data(f, iovec[i].iov_base, iovec[i].iov_len, &o, &p);                  if (r < 0) -                        goto finish; +                        return r;                  xor_hash ^= le64toh(o->data.hash);                  items[i].object_offset = htole64(p); @@ -1010,9 +1008,6 @@ int journal_file_append_entry(JournalFile *f, const dual_timestamp *ts, const st          journal_file_post_change(f); -finish: -        free(items); -          return r;  } @@ -1999,3 +1994,85 @@ finish:          return r;  } + +int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset) { +        uint64_t i, n; +        uint64_t q, xor_hash = 0; +        int r; +        EntryItem *items; +        dual_timestamp ts; + +        assert(from); +        assert(to); +        assert(o); +        assert(p); + +        if (!to->writable) +                return -EPERM; + +        ts.monotonic = le64toh(o->entry.monotonic); +        ts.realtime = le64toh(o->entry.realtime); + +        if (to->tail_entry_monotonic_valid && +            ts.monotonic < le64toh(to->header->tail_entry_monotonic)) +                return -EINVAL; + +        if (ts.realtime < le64toh(to->header->tail_entry_realtime)) +                return -EINVAL; + +        n = journal_file_entry_n_items(o); +        items = alloca(sizeof(EntryItem) * n); + +        for (i = 0; i < n; i++) { +                uint64_t le_hash, l, h; +                size_t t; +                void *data; +                Object *u; + +                q = le64toh(o->entry.items[i].object_offset); +                le_hash = o->entry.items[i].hash; + +                r = journal_file_move_to_object(from, OBJECT_DATA, q, &o); +                if (r < 0) +                        return r; + +                if (le_hash != o->data.hash) +                        return -EBADMSG; + +                l = le64toh(o->object.size) - offsetof(Object, data.payload); +                t = (size_t) l; + +                /* We hit the limit on 32bit machines */ +                if ((uint64_t) t != l) +                        return -E2BIG; + +                if (o->object.flags & OBJECT_COMPRESSED) { +#ifdef HAVE_XZ +                        uint64_t rsize; + +                        if (!uncompress_blob(o->data.payload, l, &from->compress_buffer, &from->compress_buffer_size, &rsize)) +                                return -EBADMSG; + +                        data = from->compress_buffer; +                        l = rsize; +#else +                        return -EPROTONOSUPPORT; +#endif +                } else +                        data = o->data.payload; + +                r = journal_file_append_data(to, data, l, &u, &h); +                if (r < 0) +                        return r; + +                xor_hash ^= le64toh(u->data.hash); +                items[i].object_offset = htole64(h); +                items[i].hash = u->data.hash; + +                r = journal_file_move_to_object(from, OBJECT_ENTRY, p, &o); +                if (r < 0) +                        return r; +        } + +        return journal_file_append_entry_internal(to, &ts, xor_hash, items, n, seqnum, ret, offset); +} diff --git a/src/journal/journal-file.h b/src/journal/journal-file.h index 421dfa6766..ab2970ca00 100644 --- a/src/journal/journal-file.h +++ b/src/journal/journal-file.h @@ -113,10 +113,14 @@ int journal_file_move_to_entry_by_monotonic(JournalFile *f, sd_id128_t boot_id,  int journal_file_move_to_entry_by_seqnum_for_data(JournalFile *f, uint64_t data_offset, uint64_t seqnum, direction_t direction, Object **ret, uint64_t *offset);  int journal_file_move_to_entry_by_realtime_for_data(JournalFile *f, uint64_t data_offset, uint64_t realtime, direction_t direction, Object **ret, uint64_t *offset); +int journal_file_copy_entry(JournalFile *from, JournalFile *to, Object *o, uint64_t p, uint64_t *seqnum, Object **ret, uint64_t *offset); +  void journal_file_dump(JournalFile *f);  int journal_file_rotate(JournalFile **f);  int journal_directory_vacuum(const char *directory, uint64_t max_use, uint64_t min_free); +void journal_file_post_change(JournalFile *f); +  #endif diff --git a/src/journal/journalctl.c b/src/journal/journalctl.c index 5a1cb6e88a..da4f51021d 100644 --- a/src/journal/journalctl.c +++ b/src/journal/journalctl.c @@ -454,7 +454,7 @@ int main(int argc, char *argv[]) {          if (r <= 0)                  goto finish; -        r = sd_journal_open(&j); +        r = sd_journal_open(&j, 0);          if (r < 0) {                  log_error("Failed to open journal: %s", strerror(-r));                  goto finish; diff --git a/src/journal/journald.c b/src/journal/journald.c index e7231d96f7..1efe0420db 100644 --- a/src/journal/journald.c +++ b/src/journal/journald.c @@ -40,6 +40,8 @@  #include "cgroup-util.h"  #include "list.h"  #include "journal-rate-limit.h" +#include "sd-journal.h" +#include "journal-internal.h"  #define USER_JOURNALS_MAX 1024  #define STDOUT_STREAMS_MAX 4096 @@ -107,6 +109,8 @@ struct StdoutStream {          LIST_FIELDS(StdoutStream, stdout_stream);  }; +static int server_flush_to_var(Server *s); +  static uint64_t available_space(Server *s) {          char ids[33];          sd_id128_t machine; @@ -239,8 +243,12 @@ static JournalFile* find_journal(Server *s, uid_t uid) {          assert(s); -        /* We split up user logs only on /var, not on /run */ -        if (!s->system_journal) +        /* We split up user logs only on /var, not on /run. If the +         * runtime file is open, we write to it exclusively, in order +         * to guarantee proper order as soon as we flush /run to +         * /var and close the runtime file. */ + +        if (s->runtime_journal)                  return s->runtime_journal;          if (uid <= 0) @@ -486,6 +494,8 @@ static void dispatch_message_real(Server *s,          assert(n <= m); +        server_flush_to_var(s); +  retry:          f = find_journal(s, realuid == 0 ? 0 : loginuid);          if (!f) @@ -1088,6 +1098,170 @@ fail:          return r;  } +static int system_journal_open(Server *s) { +        int r; +        char *fn; +        sd_id128_t machine; +        char ids[33]; + +        r = sd_id128_get_machine(&machine); +        if (r < 0) +                return r; + +        sd_id128_to_string(machine, ids); + +        if (!s->system_journal) { + +                /* First try to create the machine path, but not the prefix */ +                fn = strappend("/var/log/journal/", ids); +                if (!fn) +                        return -ENOMEM; +                (void) mkdir(fn, 0755); +                free(fn); + +                /* The create the system journal file */ +                fn = join("/var/log/journal/", ids, "/system.journal", NULL); +                if (!fn) +                        return -ENOMEM; + +                r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); +                free(fn); + +                if (r >= 0) { +                        s->system_journal->metrics = s->metrics; +                        s->system_journal->compress = s->compress; + +                        fix_perms(s->system_journal, 0); +                } else if (r < 0) { + +                        if (r == -ENOENT) +                                r = 0; +                        else { +                                log_error("Failed to open system journal: %s", strerror(-r)); +                                return r; +                        } +                } +        } + +        if (!s->runtime_journal) { + +                fn = join("/run/log/journal/", ids, "/system.journal", NULL); +                if (!fn) +                        return -ENOMEM; + +                if (s->system_journal) { + +                        /* Try to open the runtime journal, but only +                         * if it already exists, so that we can flush +                         * it into the system journal */ + +                        r = journal_file_open(fn, O_RDWR, 0640, NULL, &s->runtime_journal); +                        free(fn); + +                        if (r < 0) { + +                                if (r == -ENOENT) +                                        r = 0; +                                else { +                                        log_error("Failed to open runtime journal: %s", strerror(-r)); +                                        return r; +                                } +                        } + +                } else { + +                        /* OK, we really need the runtime journal, so create +                         * it if necessary. */ + +                        (void) mkdir_parents(fn, 0755); +                        r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); +                        free(fn); + +                        if (r < 0) { +                                log_error("Failed to open runtime journal: %s", strerror(-r)); +                                return r; +                        } +                } + +                if (s->runtime_journal) { +                        s->runtime_journal->metrics = s->metrics; +                        s->runtime_journal->compress = s->compress; + +                        fix_perms(s->runtime_journal, 0); +                } +        } + +        return r; +} + +static int server_flush_to_var(Server *s) { +        char path[] = "/run/log/journal/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"; +        Object *o = NULL; +        int r; +        sd_id128_t machine; +        sd_journal *j; + +        assert(s); + +        system_journal_open(s); + +        if (!s->system_journal || !s->runtime_journal) +                return 0; + +        r = sd_id128_get_machine(&machine); +        if (r < 0) { +                log_error("Failed to get machine id: %s", strerror(-r)); +                return r; +        } + +        r = sd_journal_open(&j, SD_JOURNAL_RUNTIME_ONLY); +        if (r < 0) { +                log_error("Failed to read runtime journal: %s", strerror(-r)); +                return r; +        } + +        SD_JOURNAL_FOREACH(j) { +                JournalFile *f; + +                f = j->current_file; +                assert(f && f->current_offset > 0); + +                r = journal_file_move_to_object(f, OBJECT_ENTRY, f->current_offset, &o); +                if (r < 0) { +                        log_error("Can't read entry: %s", strerror(-r)); +                        goto finish; +                } + +                r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); +                if (r == -E2BIG) { +                        log_info("Allocation limit reached."); + +                        journal_file_post_change(s->system_journal); +                        server_vacuum(s); + +                        r = journal_file_copy_entry(f, s->system_journal, o, f->current_offset, NULL, NULL, NULL); +                } + +                if (r < 0) { +                        log_error("Can't write entry: %s", strerror(-r)); +                        goto finish; +                } +        } + +finish: +        journal_file_post_change(s->system_journal); + +        journal_file_close(s->runtime_journal); +        s->runtime_journal = NULL; + +        if (r >= 0) { +                sd_id128_to_string(machine, path + 17); +                rm_rf(path, false, true, false); +        } + +        return r; +} +  static int process_event(Server *s, struct epoll_event *ev) {          assert(s); @@ -1112,6 +1286,11 @@ static int process_event(Server *s, struct epoll_event *ev) {                          return -errno;                  } +                if (sfsi.ssi_signo == SIGUSR1) { +                        server_flush_to_var(s); +                        return 0; +                } +                  log_debug("Received SIG%s", signal_to_string(sfsi.ssi_signo));                  return 0; @@ -1247,66 +1426,6 @@ static int process_event(Server *s, struct epoll_event *ev) {          return 0;  } -static int system_journal_open(Server *s) { -        int r; -        char *fn; -        sd_id128_t machine; -        char ids[33]; - -        r = sd_id128_get_machine(&machine); -        if (r < 0) -                return r; - -        /* First try to create the machine path, but not the prefix */ -        fn = strappend("/var/log/journal/", sd_id128_to_string(machine, ids)); -        if (!fn) -                return -ENOMEM; -        (void) mkdir(fn, 0755); -        free(fn); - -        /* The create the system journal file */ -        fn = join("/var/log/journal/", ids, "/system.journal", NULL); -        if (!fn) -                return -ENOMEM; - -        r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->system_journal); -        free(fn); - -        if (r >= 0) { -                s->system_journal->metrics = s->metrics; -                s->system_journal->compress = s->compress; - -                fix_perms(s->system_journal, 0); -                return r; -        } - -        if (r < 0 && r != -ENOENT) { -                log_error("Failed to open system journal: %s", strerror(-r)); -                return r; -        } - -        /* /var didn't work, so try /run, but this time we -         * create the prefix too */ -        fn = join("/run/log/journal/", ids, "/system.journal", NULL); -        if (!fn) -                return -ENOMEM; - -        (void) mkdir_parents(fn, 0755); -        r = journal_file_open(fn, O_RDWR|O_CREAT, 0640, NULL, &s->runtime_journal); -        free(fn); - -        if (r < 0) { -                log_error("Failed to open runtime journal: %s", strerror(-r)); -                return r; -        } - -        s->runtime_journal->metrics = s->metrics; -        s->runtime_journal->compress = s->compress; - -        fix_perms(s->runtime_journal, 0); -        return r; -} -  static int open_syslog_socket(Server *s) {          union sockaddr_union sa;          int one, r; @@ -1470,7 +1589,7 @@ static int open_signalfd(Server *s) {          assert(s);          assert_se(sigemptyset(&mask) == 0); -        sigset_add_many(&mask, SIGINT, SIGTERM, -1); +        sigset_add_many(&mask, SIGINT, SIGTERM, SIGUSR1, -1);          assert_se(sigprocmask(SIG_SETMASK, &mask, NULL) == 0);          s->signal_fd = signalfd(-1, &mask, SFD_NONBLOCK|SFD_CLOEXEC); @@ -1652,6 +1771,9 @@ int main(int argc, char *argv[]) {                    "READY=1\n"                    "STATUS=Processing requests..."); +        server_vacuum(&server); +        server_flush_to_var(&server); +          for (;;) {                  struct epoll_event event; diff --git a/src/journal/sd-journal.c b/src/journal/sd-journal.c index 8fc8ec5885..38e58f5732 100644 --- a/src/journal/sd-journal.c +++ b/src/journal/sd-journal.c @@ -32,58 +32,10 @@  #include "list.h"  #include "lookup3.h"  #include "compress.h" +#include "journal-internal.h"  #define JOURNAL_FILES_MAX 1024 -typedef struct Match Match; - -struct Match { -        char *data; -        size_t size; -        uint64_t le_hash; - -        LIST_FIELDS(Match, matches); -}; - -typedef enum location_type { -        LOCATION_HEAD, -        LOCATION_TAIL, -        LOCATION_DISCRETE -} location_type_t; - -typedef struct Location { -        location_type_t type; - -        uint64_t seqnum; -        sd_id128_t seqnum_id; -        bool seqnum_set; - -        uint64_t realtime; -        bool realtime_set; - -        uint64_t monotonic; -        sd_id128_t boot_id; -        bool monotonic_set; - -        uint64_t xor_hash; -        bool xor_hash_set; -} Location; - -struct sd_journal { -        Hashmap *files; - -        Location current_location; -        JournalFile *current_file; -        uint64_t current_field; - -        int inotify_fd; -        Hashmap *inotify_wd_dirs; -        Hashmap *inotify_wd_roots; - -        LIST_HEAD(Match, matches); -        unsigned n_matches; -}; -  static void detach_location(sd_journal *j) {          Iterator i;          JournalFile *f; @@ -948,6 +900,10 @@ static int add_file(sd_journal *j, const char *prefix, const char *dir, const ch          assert(prefix);          assert(filename); +        if ((j->flags & SD_JOURNAL_SYSTEM_ONLY) && +            !startswith(filename, "system.journal")) +                return 0; +          if (dir)                  fn = join(prefix, "/", dir, "/", filename, NULL);          else @@ -1024,11 +980,18 @@ static int add_directory(sd_journal *j, const char *prefix, const char *dir) {          int r;          DIR *d;          int wd; +        sd_id128_t id, mid;          assert(j);          assert(prefix);          assert(dir); +        if ((j->flags & SD_JOURNAL_LOCAL_ONLY) && +            (sd_id128_from_string(dir, &id) < 0 || +             sd_id128_get_machine(&mid) < 0 || +             !sd_id128_equal(id, mid))) +            return 0; +          fn = join(prefix, "/", dir, NULL);          if (!fn)                  return -ENOMEM; @@ -1132,7 +1095,7 @@ static void remove_root_wd(sd_journal *j, int wd) {          }  } -int sd_journal_open(sd_journal **ret) { +int sd_journal_open(sd_journal **ret, int flags) {          sd_journal *j;          const char *p;          const char search_paths[] = @@ -1146,6 +1109,8 @@ int sd_journal_open(sd_journal **ret) {          if (!j)                  return -ENOMEM; +        j->flags = flags; +          j->inotify_fd = inotify_init1(IN_NONBLOCK|IN_CLOEXEC);          if (j->inotify_fd < 0) {                  r = -errno; @@ -1172,6 +1137,10 @@ int sd_journal_open(sd_journal **ret) {          NULSTR_FOREACH(p, search_paths) {                  DIR *d; +                if ((flags & SD_JOURNAL_RUNTIME_ONLY) && +                    !path_startswith(p, "/run")) +                        continue; +                  d = opendir(p);                  if (!d) {                          if (errno != ENOENT) diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h index 7f9f78598b..f6b1c955fb 100644 --- a/src/journal/sd-journal.h +++ b/src/journal/sd-journal.h @@ -31,19 +31,20 @@  /* TODO:   * - *   - check LE/BE conversion for 8bit, 16bit, 32bit values - *   - implement audit gateway + *   - OR of matches is borked...   *   - extend hash tables table as we go   *   - accelerate looking for "all hostnames" and suchlike. - *   - cryptographic hash - *   - OR of matches is borked... - *   - flush /run to /var   *   - hookup with systemctl + *   - handle incomplete header + *   *   - local deserializer - *   - think about manipulations of header   *   - http server - *   - handle incomplete header   *   - message catalog + * + *   - check LE/BE conversion for 8bit, 16bit, 32bit values + *   - cryptographic hash + *   - think about manipulations of header + *   - implement audit gateway   */  /* Write to daemon */ @@ -60,7 +61,13 @@ int sd_journal_stream_fd(const char *tag, int priority, int priority_prefix);  typedef struct sd_journal sd_journal; -int sd_journal_open(sd_journal **ret); +enum { +        SD_JOURNAL_LOCAL_ONLY = 1, +        SD_JOURNAL_RUNTIME_ONLY = 2, +        SD_JOURNAL_SYSTEM_ONLY = 4 +}; + +int sd_journal_open(sd_journal **ret, int flags);  void sd_journal_close(sd_journal *j);  int sd_journal_previous(sd_journal *j); | 
