diff options
author | Lennart Poettering <lennart@poettering.net> | 2011-12-17 00:56:34 +0100 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2011-12-17 00:56:34 +0100 |
commit | 7f3e62571a63ac90de6ac5eefeeb8d3e9aa6f49e (patch) | |
tree | cf79b929ef42916d07227b76504fc5990bfe93e6 /src/journal | |
parent | 1e2579fdeb7608719cb28da0b5061f48ba0efc34 (diff) |
journal: add native protocol to journald, and client side API to send journal messages
Diffstat (limited to 'src/journal')
-rw-r--r-- | src/journal/journal-send.c | 196 | ||||
-rw-r--r-- | src/journal/journald.c | 375 | ||||
-rw-r--r-- | src/journal/sd-journal.h | 12 | ||||
-rw-r--r-- | src/journal/test-journal.c | 1 |
4 files changed, 526 insertions, 58 deletions
diff --git a/src/journal/journal-send.c b/src/journal/journal-send.c new file mode 100644 index 0000000000..e2575a9805 --- /dev/null +++ b/src/journal/journal-send.c @@ -0,0 +1,196 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2011 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/socket.h> +#include <sys/un.h> +#include <errno.h> +#include <stddef.h> + +#include "sd-journal.h" +#include "util.h" + +/* We open a single fd, and we'll share it with the current process, + * all its threads, and all its subprocesses. This means we need to + * initialize it atomically, and need to operate on it atomically + * never assuming we are the only user */ + +static int journal_fd(void) { + int fd; + static int fd_plus_one = 0; + +retry: + if (fd_plus_one > 0) + return fd_plus_one - 1; + + fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (fd < 0) + return -errno; + + if (!__sync_bool_compare_and_swap(&fd_plus_one, 0, fd+1)) { + close_nointr_nofail(fd); + goto retry; + } + + return fd; +} + +int sd_journal_print(const char *format, ...) { + int r; + va_list ap; + + va_start(ap, format); + r = sd_journal_printv(format, ap); + va_end(ap); + + return r; +} + +int sd_journal_printv(const char *format, va_list ap) { + char buffer[8 + LINE_MAX]; + struct iovec iov; + + memcpy(buffer, "MESSAGE=", 8); + vsnprintf(buffer+8, sizeof(buffer) - 8, format, ap); + + char_array_0(buffer); + + zero(iov); + IOVEC_SET_STRING(iov, buffer); + + return sd_journal_sendv(&iov, 1); +} + +int sd_journal_send(const char *format, ...) { + int r, n = 0, i = 0, j; + va_list ap; + struct iovec *iov = NULL; + + va_start(ap, format); + while (format) { + struct iovec *c; + char *buffer; + + if (i >= n) { + n = MAX(i*2, 4); + c = realloc(iov, n * sizeof(struct iovec)); + if (!c) { + r = -ENOMEM; + goto fail; + } + + iov = c; + } + + if (vasprintf(&buffer, format, ap) < 0) { + r = -ENOMEM; + goto fail; + } + + IOVEC_SET_STRING(iov[i++], buffer); + + format = va_arg(ap, char *); + } + va_end(ap); + + r = sd_journal_sendv(iov, i); + +fail: + for (j = 0; j < i; j++) + free(iov[j].iov_base); + + free(iov); + + return r; +} + +int sd_journal_sendv(const struct iovec *iov, int n) { + int fd; + struct iovec *w; + uint64_t *l; + int i, j = 0; + struct msghdr mh; + struct sockaddr_un sa; + + if (!iov || n <= 0) + return -EINVAL; + + w = alloca(sizeof(struct iovec) * n * 5); + l = alloca(sizeof(uint64_t) * n); + + for (i = 0; i < n; i++) { + char *c, *nl; + + c = memchr(iov[i].iov_base, '=', iov[i].iov_len); + if (!c) + return -EINVAL; + + nl = memchr(iov[i].iov_base, '\n', iov[i].iov_len); + if (nl) { + if (nl < c) + return -EINVAL; + + /* Already includes a newline? Bummer, then + * let's write the variable name, then a + * newline, then the size (64bit LE), followed + * by the data and a final newline */ + + w[j].iov_base = iov[i].iov_base; + w[j].iov_len = c - (char*) iov[i].iov_base; + j++; + + IOVEC_SET_STRING(w[j++], "\n"); + + l[i] = htole64(iov[i].iov_len - (c - (char*) iov[i].iov_base) - 1); + w[j].iov_base = &l[i]; + w[j].iov_len = sizeof(uint64_t); + j++; + + w[j].iov_base = c + 1; + w[j].iov_len = iov[i].iov_len - (c - (char*) iov[i].iov_base) - 1; + j++; + + } else + /* Nothing special? Then just add the line and + * append a newline */ + w[j++] = iov[i]; + + IOVEC_SET_STRING(w[j++], "\n"); + } + + fd = journal_fd(); + if (fd < 0) + return fd; + + zero(sa); + sa.sun_family = AF_UNIX; + strncpy(sa.sun_path,"/run/systemd/journal", sizeof(sa.sun_path)); + + zero(mh); + mh.msg_name = &sa; + mh.msg_namelen = offsetof(struct sockaddr_un, sun_path) + strlen(sa.sun_path); + mh.msg_iov = w; + mh.msg_iovlen = j; + + if (sendmsg(fd, &mh, MSG_NOSIGNAL) < 0) + return -errno; + + return 0; +} diff --git a/src/journal/journald.c b/src/journal/journald.c index 89d8bee2a2..453495a964 100644 --- a/src/journal/journald.c +++ b/src/journal/journald.c @@ -27,6 +27,9 @@ #include <fcntl.h> #include <sys/acl.h> #include <acl/libacl.h> +#include <stddef.h> +#include <sys/ioctl.h> +#include <linux/sockios.h> #include "hashmap.h" #include "journal-file.h" @@ -36,15 +39,19 @@ #include "cgroup-util.h" typedef struct Server { - int syslog_fd; int epoll_fd; int signal_fd; + int syslog_fd; + int native_fd; JournalFile *runtime_journal; JournalFile *system_journal; Hashmap *user_journals; uint64_t seqnum; + + char *buffer; + size_t buffer_size; } Server; static void fix_perms(JournalFile *f, uid_t uid) { @@ -137,35 +144,27 @@ static JournalFile* find_journal(Server *s, uid_t uid) { return f; } -static void process_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) { - char *message = NULL, *pid = NULL, *uid = NULL, *gid = NULL, +static void dispatch_message(Server *s, struct iovec *iovec, unsigned n, unsigned m, struct ucred *ucred, struct timeval *tv) { + char *pid = NULL, *uid = NULL, *gid = NULL, *source_time = NULL, *boot_id = NULL, *machine_id = NULL, *comm = NULL, *cmdline = NULL, *hostname = NULL, *audit_session = NULL, *audit_loginuid = NULL, - *syslog_priority = NULL, *syslog_facility = NULL, *exe = NULL, *cgroup = NULL; - struct iovec iovec[17]; - unsigned n = 0; + char idbuf[33]; sd_id128_t id; int r; char *t; - int priority = LOG_USER | LOG_INFO; uid_t loginuid = 0, realuid = 0; JournalFile *f; - parse_syslog_priority((char**) &buf, &priority); - skip_syslog_date((char**) &buf); - - if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0) - IOVEC_SET_STRING(iovec[n++], syslog_priority); + assert(s); + assert(iovec || n == 0); - if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0) - IOVEC_SET_STRING(iovec[n++], syslog_facility); + if (n == 0) + return; - message = strappend("MESSAGE=", buf); - if (message) - IOVEC_SET_STRING(iovec[n++], message); + assert(n + 13 <= m); if (ucred) { uint32_t session; @@ -252,6 +251,8 @@ static void process_message(Server *s, const char *buf, struct ucred *ucred, str free(t); } + assert(n <= m); + f = find_journal(s, realuid == 0 ? 0 : loginuid); if (!f) log_warning("Dropping message, as we can't find a place to store the data."); @@ -262,7 +263,6 @@ static void process_message(Server *s, const char *buf, struct ucred *ucred, str log_error("Failed to write entry, ignoring: %s", strerror(-r)); } - free(message); free(pid); free(uid); free(gid); @@ -275,9 +275,148 @@ static void process_message(Server *s, const char *buf, struct ucred *ucred, str free(hostname); free(audit_session); free(audit_loginuid); + free(cgroup); + +} + +static void process_syslog_message(Server *s, const char *buf, struct ucred *ucred, struct timeval *tv) { + char *message = NULL, *syslog_priority = NULL, *syslog_facility = NULL; + struct iovec iovec[16]; + unsigned n = 0; + int priority = LOG_USER | LOG_INFO; + + assert(s); + assert(buf); + + parse_syslog_priority((char**) &buf, &priority); + skip_syslog_date((char**) &buf); + + if (asprintf(&syslog_priority, "PRIORITY=%i", priority & LOG_PRIMASK) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_priority); + + if (asprintf(&syslog_facility, "SYSLOG_FACILITY=%i", LOG_FAC(priority)) >= 0) + IOVEC_SET_STRING(iovec[n++], syslog_facility); + + message = strappend("MESSAGE=", buf); + if (message) + IOVEC_SET_STRING(iovec[n++], message); + + dispatch_message(s, iovec, n, ELEMENTSOF(iovec), ucred, tv); + + free(message); free(syslog_facility); free(syslog_priority); - free(cgroup); +} + +static void process_native_message(Server *s, const void *buffer, size_t buffer_size, struct ucred *ucred, struct timeval *tv) { + struct iovec *iovec = NULL; + unsigned n = 0, m = 0, j; + const char *p; + size_t remaining; + + assert(s); + assert(buffer || n == 0); + + p = buffer; + remaining = buffer_size; + + while (remaining > 0) { + const char *e, *q; + + e = memchr(p, '\n', remaining); + + if (!e) { + /* Trailing noise, let's ignore it, and flush what we collected */ + log_debug("Received message with trailing noise, ignoring."); + break; + } + + if (e == p) { + /* Entry separator */ + dispatch_message(s, iovec, n, m, ucred, tv); + n = 0; + + p++; + remaining--; + continue; + } + + if (*p == '.') { + /* Control command, ignore for now */ + remaining -= (e - p) + 1; + p = e + 1; + continue; + } + + /* A property follows */ + + if (n+13 >= m) { + struct iovec *c; + unsigned u; + + u = MAX((n+13U) * 2U, 4U); + c = realloc(iovec, u * sizeof(struct iovec)); + if (!c) { + log_error("Out of memory"); + break; + } + + iovec = c; + m = u; + } + + q = memchr(p, '=', e - p); + if (q) { + iovec[n].iov_base = (char*) p; + iovec[n].iov_len = e - p; + n++; + + remaining -= (e - p) + 1; + p = e + 1; + continue; + } else { + uint64_t l; + char *k; + + if (remaining < e - p + 1 + sizeof(uint64_t) + 1) { + log_debug("Failed to parse message, ignoring."); + break; + } + + memcpy(&l, e + 1, sizeof(uint64_t)); + l = le64toh(l); + + if (remaining < e - p + 1 + sizeof(uint64_t) + l + 1 || + e[1+sizeof(uint64_t)+l] != '\n') { + log_debug("Failed to parse message, ignoring."); + break; + } + + k = malloc((e - p) + 1 + l); + if (!k) { + log_error("Out of memory"); + break; + } + + memcpy(k, p, e - p); + k[e - p] = '='; + memcpy(k + (e - p) + 1, e + 1 + sizeof(uint64_t), l); + + iovec[n].iov_base = k; + iovec[n].iov_len = (e - p) + 1 + l; + n++; + + remaining -= (e - p) + 1 + sizeof(uint64_t) + l + 1; + p = e + 1 + sizeof(uint64_t) + l + 1; + } + } + + dispatch_message(s, iovec, n, m, ucred, tv); + + for (j = 0; j < n; j++) + if (iovec[j].iov_base < buffer || + (const uint8_t*) iovec[j].iov_base >= (const uint8_t*) buffer + buffer_size) + free(iovec[j].iov_base); } static int process_event(Server *s, struct epoll_event *ev) { @@ -309,9 +448,9 @@ static int process_event(Server *s, struct epoll_event *ev) { } - if (ev->data.fd == s->syslog_fd) { + if (ev->data.fd == s->native_fd || + ev->data.fd == s->syslog_fd) { for (;;) { - char buf[LINE_MAX+1]; struct msghdr msghdr; struct iovec iovec; struct ucred *ucred = NULL; @@ -323,11 +462,35 @@ static int process_event(Server *s, struct epoll_event *ev) { CMSG_SPACE(sizeof(struct timeval))]; } control; ssize_t n; - char *e; + int v; + + if (ioctl(ev->data.fd, SIOCINQ, &v) < 0) { + log_error("SIOCINQ failed: %m"); + return -errno; + } + + if (v <= 0) + return 1; + + if (s->buffer_size < (size_t) v) { + void *b; + size_t l; + + l = MAX(LINE_MAX + (size_t) v, s->buffer_size * 2); + b = realloc(s->buffer, l+1); + + if (!b) { + log_error("Couldn't increase buffer."); + return -ENOMEM; + } + + s->buffer_size = l; + s->buffer = b; + } zero(iovec); - iovec.iov_base = buf; - iovec.iov_len = sizeof(buf)-1; + iovec.iov_base = s->buffer; + iovec.iov_len = s->buffer_size; zero(control); zero(msghdr); @@ -358,13 +521,18 @@ static int process_event(Server *s, struct epoll_event *ev) { tv = (struct timeval*) CMSG_DATA(cmsg); } - e = memchr(buf, '\n', n); - if (e) - *e = 0; - else - buf[n] = 0; + if (ev->data.fd == s->syslog_fd) { + char *e; - process_message(s, strstrip(buf), ucred, tv); + e = memchr(s->buffer, '\n', n); + if (e) + *e = 0; + else + s->buffer[n] = 0; + + process_syslog_message(s, strstrip(s->buffer), ucred, tv); + } else + process_native_message(s, s->buffer, n, ucred, tv); } return 1; @@ -428,51 +596,73 @@ static int system_journal_open(Server *s) { return r; } -static int server_init(Server *s) { - int n, one, r; - struct epoll_event ev; - sigset_t mask; +static int open_syslog_socket(Server *s) { + union sockaddr_union sa; + int one, r; assert(s); - zero(*s); - s->syslog_fd = s->signal_fd = -1; + if (s->syslog_fd < 0) { - s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); - if (s->epoll_fd < 0) { - log_error("Failed to create epoll object: %m"); - return -errno; + s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (s->syslog_fd < 0) { + log_error("socket() failed: %m"); + return -errno; + } + + zero(sa); + sa.un.sun_family = AF_UNIX; + strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path)); + + unlink(sa.un.sun_path); + + r = bind(s->syslog_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); + if (r < 0) { + log_error("bind() failed: %m"); + return -errno; + } + + chmod(sa.un.sun_path, 0666); } - n = sd_listen_fds(true); - if (n < 0) { - log_error("Failed to read listening file descriptors from environment: %s", strerror(-n)); - return n; + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); + if (r < 0) { + log_error("SO_PASSCRED failed: %m"); + return -errno; } - if (n > 1) { - log_error("Too many file descriptors passed."); - return -EINVAL; + one = 1; + r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); + if (r < 0) { + log_error("SO_TIMESTAMP failed: %m"); + return -errno; } - if (n == 1) - s->syslog_fd = SD_LISTEN_FDS_START; - else { - union sockaddr_union sa; + return 0; +} - s->syslog_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); - if (s->syslog_fd < 0) { +static int open_native_socket(Server*s) { + union sockaddr_union sa; + int one, r; + + assert(s); + + if (s->native_fd < 0) { + + s->native_fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0); + if (s->native_fd < 0) { log_error("socket() failed: %m"); return -errno; } zero(sa); sa.un.sun_family = AF_UNIX; - strncpy(sa.un.sun_path, "/run/systemd/syslog", sizeof(sa.un.sun_path)); + strncpy(sa.un.sun_path, "/run/systemd/journal", sizeof(sa.un.sun_path)); unlink(sa.un.sun_path); - r = bind(s->syslog_fd, &sa.sa, sizeof(sa.un)); + r = bind(s->native_fd, &sa.sa, offsetof(union sockaddr_union, un.sun_path) + strlen(sa.un.sun_path)); if (r < 0) { log_error("bind() failed: %m"); return -errno; @@ -482,24 +672,90 @@ static int server_init(Server *s) { } one = 1; - r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); + r = setsockopt(s->native_fd, SOL_SOCKET, SO_PASSCRED, &one, sizeof(one)); if (r < 0) { log_error("SO_PASSCRED failed: %m"); return -errno; } one = 1; - r = setsockopt(s->syslog_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); + r = setsockopt(s->native_fd, SOL_SOCKET, SO_TIMESTAMP, &one, sizeof(one)); if (r < 0) { log_error("SO_TIMESTAMP failed: %m"); return -errno; } + return 0; +} + +static int server_init(Server *s) { + int n, r, fd; + struct epoll_event ev; + sigset_t mask; + + assert(s); + + zero(*s); + s->syslog_fd = s->native_fd = s->signal_fd = -1; + + s->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (s->epoll_fd < 0) { + log_error("Failed to create epoll object: %m"); + return -errno; + } + + n = sd_listen_fds(true); + if (n < 0) { + log_error("Failed to read listening file descriptors from environment: %s", strerror(-n)); + return n; + } + + for (fd = SD_LISTEN_FDS_START; fd < SD_LISTEN_FDS_START + n; fd++) { + + if (sd_is_socket_unix(fd, SOCK_DGRAM, -1, "/dev/log", 0) > 0) { + + if (s->syslog_fd >= 0) { + log_error("Too many /dev/log sockets passed."); + return -EINVAL; + } + + s->syslog_fd = fd; + + } else if (sd_is_socket(fd, AF_UNIX, SOCK_DGRAM, -1) > 0) { + + if (s->native_fd >= 0) { + log_error("Too many native sockets passed."); + return -EINVAL; + } + + s->native_fd = fd; + } else { + log_error("Unknown socket passed."); + return -EINVAL; + } + } + + r = open_syslog_socket(s); + if (r < 0) + return r; + zero(ev); ev.events = EPOLLIN; ev.data.fd = s->syslog_fd; if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->syslog_fd, &ev) < 0) { - log_error("Failed to add server fd to epoll object: %m"); + log_error("Failed to add syslog server fd to epoll object: %m"); + return -errno; + } + + r = open_native_socket(s); + if (r < 0) + return r; + + zero(ev); + ev.events = EPOLLIN; + ev.data.fd = s->native_fd; + if (epoll_ctl(s->epoll_fd, EPOLL_CTL_ADD, s->native_fd, &ev) < 0) { + log_error("Failed to add native server fd to epoll object: %m"); return -errno; } @@ -558,6 +814,9 @@ static void server_done(Server *s) { if (s->syslog_fd >= 0) close_nointr_nofail(s->syslog_fd); + + if (s->native_fd >= 0) + close_nointr_nofail(s->native_fd); } int main(int argc, char *argv[]) { diff --git a/src/journal/sd-journal.h b/src/journal/sd-journal.h index 8d7e314223..e42293ffe7 100644 --- a/src/journal/sd-journal.h +++ b/src/journal/sd-journal.h @@ -24,6 +24,8 @@ #include <inttypes.h> #include <sys/types.h> +#include <stdarg.h> +#include <sys/uio.h> #include "sd-id128.h" @@ -43,6 +45,16 @@ * - comm, argv can be manipulated, should it be _COMM=, _CMDLINE= or COMM=, CMDLINE=? */ +/* Write to daemon */ + +int sd_journal_print(const char *format, ...) __attribute__ ((format (printf, 1, 2))); +int sd_journal_printv(const char *format, va_list ap); + +int sd_journal_send(const char *format, ...) __attribute__((sentinel)); +int sd_journal_sendv(const struct iovec *iov, int n); + +/* Browse journal stream */ + typedef struct sd_journal sd_journal; int sd_journal_open(sd_journal **ret); diff --git a/src/journal/test-journal.c b/src/journal/test-journal.c index 45ced12b46..a9bd6cb2cf 100644 --- a/src/journal/test-journal.c +++ b/src/journal/test-journal.c @@ -22,6 +22,7 @@ #include <fcntl.h> #include <unistd.h> +#include "sd-journal.h" #include "journal-file.h" #include "log.h" |