diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | Makefile.am | 17 | ||||
-rw-r--r-- | src/libsystemd-bus/sd-event.c | 1440 | ||||
-rw-r--r-- | src/libsystemd-bus/test-event.c | 197 | ||||
-rw-r--r-- | src/shared/prioq.h | 2 | ||||
-rw-r--r-- | src/systemd/sd-event.h | 97 | ||||
-rw-r--r-- | src/systemd/sd-journal.h | 3 |
7 files changed, 1753 insertions, 4 deletions
diff --git a/.gitignore b/.gitignore index 786a0beb2d..2a2ed998b8 100644 --- a/.gitignore +++ b/.gitignore @@ -107,6 +107,7 @@ /test-device-nodes /test-engine /test-env-replace +/test-event /test-fileio /test-hashmap /test-hostname diff --git a/Makefile.am b/Makefile.am index 1458697c05..be152e9b8d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1936,8 +1936,9 @@ EXTRA_DIST += \ libsystemd_bus_la_SOURCES = \ src/systemd/sd-bus.h \ src/systemd/sd-bus-protocol.h \ - src/systemd/sd-memfd.h \ src/systemd/sd-bus-vtable.h \ + src/systemd/sd-memfd.h \ + src/systemd/sd-event.h \ src/libsystemd-bus/sd-bus.c \ src/libsystemd-bus/bus-control.c \ src/libsystemd-bus/bus-control.h \ @@ -1962,7 +1963,8 @@ libsystemd_bus_la_SOURCES = \ src/libsystemd-bus/bus-introspect.c \ src/libsystemd-bus/bus-introspect.h \ src/libsystemd-bus/kdbus.h \ - src/libsystemd-bus/sd-memfd.c + src/libsystemd-bus/sd-memfd.c \ + src/libsystemd-bus/sd-event.c libsystemd_bus_la_LIBADD = \ libsystemd-id128-internal.la \ @@ -1988,7 +1990,8 @@ tests += \ test-bus-memfd \ test-bus-zero-copy \ test-bus-introspect \ - test-bus-objects + test-bus-objects \ + test-event noinst_PROGRAMS += \ busctl @@ -2124,6 +2127,14 @@ test_bus_introspect_LDADD = \ libsystemd-shared.la \ libsystemd-bus.la +test_event_SOURCES = \ + src/libsystemd-bus/test-event.c + +test_event_LDADD = \ + libsystemd-shared.la \ + libsystemd-bus.la \ + libsystemd-id128-internal.la + busctl_SOURCES = \ src/libsystemd-bus/busctl.c diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c new file mode 100644 index 0000000000..069e4fb820 --- /dev/null +++ b/src/libsystemd-bus/sd-event.c @@ -0,0 +1,1440 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2013 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/epoll.h> +#include <sys/timerfd.h> +#include <sys/wait.h> + +#include "macro.h" +#include "refcnt.h" +#include "prioq.h" +#include "hashmap.h" +#include "util.h" +#include "time-util.h" + +#include "sd-event.h" + +#define EPOLL_QUEUE_MAX 64 + +typedef enum EventSourceType { + SOURCE_IO, + SOURCE_MONOTONIC, + SOURCE_REALTIME, + SOURCE_SIGNAL, + SOURCE_CHILD, + SOURCE_DEFER +} EventSourceType; + +struct sd_event_source { + RefCount n_ref; + + sd_event *event; + void *userdata; + sd_prepare_handler_t prepare; + + EventSourceType type:4; + sd_event_mute_t mute:3; + bool pending:1; + + int priority; + unsigned pending_index; + unsigned prepare_index; + unsigned pending_iteration; + unsigned prepare_iteration; + + union { + struct { + sd_io_handler_t callback; + int fd; + uint32_t events; + uint32_t revents; + bool registered:1; + } io; + struct { + sd_time_handler_t callback; + usec_t next; + unsigned prioq_index; + } time; + struct { + sd_signal_handler_t callback; + struct signalfd_siginfo siginfo; + int sig; + } signal; + struct { + sd_child_handler_t callback; + siginfo_t siginfo; + pid_t pid; + int options; + } child; + struct { + sd_defer_handler_t callback; + } defer; + }; +}; + +struct sd_event { + RefCount n_ref; + + int epoll_fd; + int signal_fd; + int realtime_fd; + int monotonic_fd; + + Prioq *pending; + Prioq *prepare; + Prioq *monotonic; + Prioq *realtime; + + sigset_t sigset; + sd_event_source **signal_sources; + + Hashmap *child_sources; + unsigned n_unmuted_child_sources; + + unsigned iteration; + unsigned processed_children; + + usec_t realtime_next, monotonic_next; + + bool quit; +}; + +static int pending_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->pending); + assert(y->pending); + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Lower priority values first */ + if (x->priority < y->priority) + return -1; + if (x->priority > y->priority) + return 1; + + /* Older entries first */ + if (x->pending_iteration < y->pending_iteration) + return -1; + if (x->pending_iteration > y->pending_iteration) + return 1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (y > x) + return 1; + + return 0; +} + +static int prepare_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->prepare); + assert(y->prepare); + + /* Move most recently prepared ones last, so that we can stop + * preparing as soon as we hit one that has already been + * prepared in the current iteration */ + if (x->prepare_iteration < y->prepare_iteration) + return -1; + if (x->prepare_iteration > y->prepare_iteration) + return 1; + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Lower priority values first */ + if (x->priority < y->priority) + return -1; + if (x->priority > y->priority) + return 1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (y > x) + return 1; + + return 0; +} + +static int time_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME); + assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME); + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Move the pending ones to the end */ + if (!x->pending && y->pending) + return -1; + if (x->pending && !y->pending) + return 1; + + /* Order by time */ + if (x->time.next < y->time.next) + return -1; + if (x->time.next > y->time.next) + return -1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (y > x) + return 1; + + return 0; +} + +static void event_free(sd_event *e) { + assert(e); + + if (e->epoll_fd >= 0) + close_nointr_nofail(e->epoll_fd); + + if (e->signal_fd >= 0) + close_nointr_nofail(e->signal_fd); + + if (e->realtime_fd >= 0) + close_nointr_nofail(e->realtime_fd); + + if (e->monotonic_fd >= 0) + close_nointr_nofail(e->monotonic_fd); + + prioq_free(e->pending); + prioq_free(e->prepare); + prioq_free(e->monotonic); + prioq_free(e->realtime); + + free(e->signal_sources); + + hashmap_free(e->child_sources); + free(e); +} + +int sd_event_new(sd_event** ret) { + sd_event *e; + int r; + + if (!ret) + return -EINVAL; + + e = new0(sd_event, 1); + if (!e) + return -ENOMEM; + + e->n_ref = REFCNT_INIT; + e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1; + e->realtime_next = e->monotonic_next = (usec_t) -1; + + assert_se(sigemptyset(&e->sigset) == 0); + + e->pending = prioq_new(pending_prioq_compare); + if (!e->pending) { + r = -ENOMEM; + goto fail; + } + + e->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (e->epoll_fd < 0) { + r = -errno; + goto fail; + } + + *ret = e; + return 0; + +fail: + event_free(e); + return r; +} + +sd_event* sd_event_ref(sd_event *e) { + if (!e) + return NULL; + + assert_se(REFCNT_INC(e->n_ref) >= 2); + + return e; +} + +sd_event* sd_event_unref(sd_event *e) { + if (!e) + return NULL; + + if (REFCNT_DEC(e->n_ref) <= 0) + event_free(e); + + return NULL; +} + +static int source_io_unregister(sd_event_source *s) { + int r; + + assert(s); + assert(s->type == SOURCE_IO); + + if (!s->io.registered) + return 0; + + r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL); + if (r < 0) + return -errno; + + s->io.registered = false; + return 0; +} + +static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) { + struct epoll_event ev = {}; + int r; + + assert(s); + assert(s->type == SOURCE_IO); + assert(m != SD_EVENT_MUTED); + + ev.events = events; + ev.data.ptr = s; + + if (m == SD_EVENT_ONESHOT) + ev.events |= EPOLLONESHOT; + + if (s->io.registered) + r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev); + else + r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev); + + if (r < 0) + return -errno; + + s->io.registered = true; + + return 0; +} + +static void source_free(sd_event_source *s) { + assert(s); + + if (s->event) { + switch (s->type) { + + case SOURCE_IO: + if (s->io.fd >= 0) + source_io_unregister(s); + + break; + + case SOURCE_MONOTONIC: + prioq_remove(s->event->monotonic, s, &s->time.prioq_index); + break; + + case SOURCE_REALTIME: + prioq_remove(s->event->realtime, s, &s->time.prioq_index); + break; + + case SOURCE_SIGNAL: + if (s->signal.sig > 0) { + if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) + assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0); + + if (s->event->signal_sources) + s->event->signal_sources[s->signal.sig] = NULL; + } + + break; + + case SOURCE_CHILD: + if (s->child.pid > 0) { + if (s->mute != SD_EVENT_MUTED) { + assert(s->event->n_unmuted_child_sources > 0); + s->event->n_unmuted_child_sources--; + } + + if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) + assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0); + + hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid)); + } + + break; + } + + if (s->pending) + prioq_remove(s->event->pending, s, &s->pending_index); + + if (s->prepare) + prioq_remove(s->event->prepare, s, &s->prepare_index); + + sd_event_unref(s->event); + } + + free(s); +} + +static int source_set_pending(sd_event_source *s, bool b) { + int r; + + assert(s); + + if (s->pending == b) + return 0; + + s->pending = b; + + if (b) { + s->pending_iteration = s->event->iteration; + + r = prioq_put(s->event->pending, s, &s->pending_index); + if (r < 0) { + s->pending = false; + return r; + } + } else + assert_se(prioq_remove(s->event->pending, s, &s->pending_index)); + + return 0; +} + +static sd_event_source *source_new(sd_event *e, EventSourceType type) { + sd_event_source *s; + + assert(e); + + s = new0(sd_event_source, 1); + if (!s) + return NULL; + + s->n_ref = REFCNT_INIT; + s->event = sd_event_ref(e); + s->type = type; + s->mute = SD_EVENT_UNMUTED; + s->pending_index = s->prepare_index = PRIOQ_IDX_NULL; + + return s; +} + +int sd_event_add_io( + sd_event *e, + int fd, + uint32_t events, + sd_io_handler_t callback, + void *userdata, + sd_event_source **ret) { + + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (fd < 0) + return -EINVAL; + if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + s = source_new(e, SOURCE_IO); + if (!s) + return -ENOMEM; + + s->io.fd = fd; + s->io.events = events; + s->io.callback = callback; + s->userdata = userdata; + + r = source_io_register(s, s->mute, events); + if (r < 0) { + source_free(s); + return -errno; + } + + *ret = s; + return 0; +} + +static int event_setup_timer_fd( + sd_event *e, + EventSourceType type, + int *timer_fd, + clockid_t id) { + + struct epoll_event ev = {}; + int r, fd; + + assert(e); + assert(timer_fd); + + if (_likely_(*timer_fd >= 0)) + return 0; + + fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC); + if (fd < 0) + return -errno; + + ev.events = EPOLLIN; + ev.data.ptr = INT_TO_PTR(type); + + r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev); + if (r < 0) { + close_nointr_nofail(fd); + return -errno; + } + + *timer_fd = fd; + return 0; +} + +static int event_add_time_internal( + sd_event *e, + EventSourceType type, + int *timer_fd, + clockid_t id, + Prioq **prioq, + uint64_t usec, + sd_time_handler_t callback, + void *userdata, + sd_event_source **ret) { + + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + assert(timer_fd); + assert(prioq); + + if (!*prioq) { + *prioq = prioq_new(time_prioq_compare); + if (!*prioq) + return -ENOMEM; + } + + if (*timer_fd < 0) { + r = event_setup_timer_fd(e, type, timer_fd, id); + if (r < 0) + return r; + } + + s = source_new(e, type); + if (!s) + return -ENOMEM; + + s->time.next = usec; + s->time.callback = callback; + s->time.prioq_index = PRIOQ_IDX_NULL; + s->userdata = userdata; + + r = prioq_put(*prioq, s, &s->time.prioq_index); + if (r < 0) { + source_free(s); + return r; + } + + *ret = s; + return 0; +} + +int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { + return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret); +} + +int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { + return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret); +} + +static int event_update_signal_fd(sd_event *e) { + struct epoll_event ev = {}; + bool add_to_epoll; + int r; + + assert(e); + + add_to_epoll = e->signal_fd < 0; + + r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC); + if (r < 0) + return -errno; + + e->signal_fd = r; + + if (!add_to_epoll) + return 0; + + ev.events = EPOLLIN; + ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL); + + r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev); + if (r < 0) { + close_nointr_nofail(e->signal_fd); + e->signal_fd = -1; + + return -errno; + } + + return 0; +} + +int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (sig <= 0) + return -EINVAL; + if (sig >= _NSIG) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + if (!e->signal_sources) { + e->signal_sources = new0(sd_event_source*, _NSIG); + if (!e->signal_sources) + return -ENOMEM; + } else if (e->signal_sources[sig]) + return -EBUSY; + + s = source_new(e, SOURCE_SIGNAL); + if (!s) + return -ENOMEM; + + s->signal.sig = sig; + s->signal.callback = callback; + s->userdata = userdata; + + e->signal_sources[sig] = s; + assert_se(sigaddset(&e->sigset, sig) == 0); + + if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) { + r = event_update_signal_fd(e); + if (r < 0) { + source_free(s); + return r; + } + } + + *ret = s; + return 0; +} + +int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (pid <= 1) + return -EINVAL; + if (options & ~(WEXITED|WSTOPPED|WCONTINUED)) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func); + if (r < 0) + return r; + + if (hashmap_contains(e->child_sources, INT_TO_PTR(pid))) + return -EBUSY; + + s = source_new(e, SOURCE_CHILD); + if (!s) + return -ENOMEM; + + s->child.pid = pid; + s->child.options = options; + s->child.callback = callback; + s->userdata = userdata; + + r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s); + if (r < 0) { + source_free(s); + return r; + } + + e->n_unmuted_child_sources ++; + + assert_se(sigaddset(&e->sigset, SIGCHLD) == 0); + + if (!e->signal_sources || !e->signal_sources[SIGCHLD]) { + r = event_update_signal_fd(e); + if (r < 0) { + source_free(s); + return -errno; + } + } + + *ret = s; + return 0; +} + +int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (!ret) + return -EINVAL; + + s = source_new(e, SOURCE_DEFER); + if (!s) + return -ENOMEM; + + s->defer.callback = callback; + s->userdata = userdata; + + r = source_set_pending(s, true); + if (r < 0) { + source_free(s); + return r; + } + + *ret = s; + return 0; +} + +sd_event_source* sd_event_source_ref(sd_event_source *s) { + if (!s) + return NULL; + + assert_se(REFCNT_INC(s->n_ref) >= 2); + + return s; +} + +sd_event_source* sd_event_source_unref(sd_event_source *s) { + if (!s) + return NULL; + + if (REFCNT_DEC(s->n_ref) <= 0) + source_free(s); + + return NULL; +} + +int sd_event_source_get_pending(sd_event_source *s) { + if (!s) + return -EINVAL; + + return s->pending; +} + +int sd_event_source_get_io_fd(sd_event_source *s) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_IO) + return -EDOM; + + return s->io.fd; +} + +int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_IO) + return -EDOM; + if (!events) + return -EINVAL; + + *events = s->io.events; + return 0; +} + +int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) { + int r; + + if (!s) + return -EINVAL; + if (!s->type != SOURCE_IO) + return -EDOM; + if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) + return -EINVAL; + + if (s->io.events == events) + return 0; + + if (s->mute != SD_EVENT_MUTED) { + r = source_io_register(s, s->io.events, events); + if (r < 0) + return r; + } + + s->io.events = events; + + return 0; +} + +int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_IO) + return -EDOM; + if (!revents) + return -EINVAL; + if (!s->pending) + return -ENODATA; + + *revents = s->io.revents; + return 0; +} + +int sd_event_source_get_signal(sd_event_source *s) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_SIGNAL) + return -EDOM; + + return s->signal.sig; +} + +int sd_event_source_get_priority(sd_event_source *s, int *priority) { + if (!s) + return -EINVAL; + + return s->priority; +} + +int sd_event_source_set_priority(sd_event_source *s, int priority) { + if (!s) + return -EINVAL; + + if (s->priority == priority) + return 0; + + s->priority = priority; + + if (s->pending) + assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0); + + if (s->prepare) + assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0); + + return 0; +} + +int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) { + if (!s) + return -EINVAL; + if (!m) + return -EINVAL; + + *m = s->mute; + return 0; +} + +int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { + int r; + + if (!s) + return -EINVAL; + if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT) + return -EINVAL; + + if (s->mute == m) + return 0; + + if (m == SD_EVENT_MUTED) { + + switch (s->type) { + + case SOURCE_IO: + r = source_io_unregister(s); + if (r < 0) + return r; + + s->mute = m; + break; + + case SOURCE_MONOTONIC: + s->mute = m; + prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + break; + + case SOURCE_REALTIME: + s->mute = m; + prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + break; + + case SOURCE_SIGNAL: + s->mute = m; + if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) { + assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0); + event_update_signal_fd(s->event); + } + + break; + + case SOURCE_CHILD: + s->mute = m; + + assert(s->event->n_unmuted_child_sources > 0); + s->event->n_unmuted_child_sources--; + + if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) { + assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0); + event_update_signal_fd(s->event); + } + + break; + + case SOURCE_DEFER: + s->mute = m; + break; + } + + } else { + switch (s->type) { + + case SOURCE_IO: + r = source_io_register(s, m, s->io.events); + if (r < 0) + return r; + + s->mute = m; + break; + + case SOURCE_MONOTONIC: + s->mute = m; + prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + break; + + case SOURCE_REALTIME: + s->mute = m; + prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + break; + + case SOURCE_SIGNAL: + s->mute = m; + + if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) { + assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0); + event_update_signal_fd(s->event); + } + break; + + case SOURCE_CHILD: + s->mute = m; + + if (s->mute == SD_EVENT_MUTED) { + s->event->n_unmuted_child_sources++; + + if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) { + assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0); + event_update_signal_fd(s->event); + } + } + break; + + case SOURCE_DEFER: + s->mute = m; + break; + } + } + + if (s->pending) + prioq_reshuffle(s->event->pending, s, &s->pending_index); + + if (s->prepare) + prioq_reshuffle(s->event->prepare, s, &s->prepare_index); + + return 0; +} + +int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) { + if (!s) + return -EINVAL; + if (!usec) + return -EINVAL; + if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC) + return -EDOM; + + *usec = s->time.next; + return 0; +} + +int sd_event_source_set_time(sd_event_source *s, uint64_t usec) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC) + return -EDOM; + + if (s->time.next == usec) + return 0; + + s->time.next = usec; + + if (s->type == SOURCE_REALTIME) + prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + else + prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + + return 0; +} + +int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) { + int r; + + if (!s) + return -EINVAL; + + if (s->prepare == callback) + return 0; + + if (callback && s->prepare) { + s->prepare = callback; + return 0; + } + + r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare); + if (r < 0) + return r; + + s->prepare = callback; + + if (callback) { + r = prioq_put(s->event->prepare, s, &s->prepare_index); + if (r < 0) + return r; + } else + prioq_remove(s->event->prepare, s, &s->prepare_index); + + return 0; +} + +void* sd_event_source_get_userdata(sd_event_source *s) { + if (!s) + return NULL; + + return s->userdata; +} + +static int event_arm_timer( + sd_event *e, + int timer_fd, + Prioq *prioq, + usec_t *next) { + + struct itimerspec its = {}; + sd_event_source *s; + int r; + + assert_se(e); + assert_se(next); + + s = prioq_peek(prioq); + if (!s || s->mute == SD_EVENT_MUTED) + return 0; + + if (*next == s->time.next) + return 0; + + assert_se(timer_fd >= 0); + + if (s->time.next == 0) { + /* We don' want to disarm here, just mean some time looooong ago. */ + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = 1; + } else + timespec_store(&its.it_value, s->time.next); + + r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL); + if (r < 0) + return r; + + *next = s->time.next; + return 0; +} + +static int process_io(sd_event *e, sd_event_source *s, uint32_t events) { + assert(e); + assert(s); + assert(s->type == SOURCE_IO); + + s->io.revents = events; + + /* + If this is a oneshot event source, then we added it to the + epoll with EPOLLONESHOT, hence we know it's not registered + anymore. We can save a syscall here... + */ + + if (s->mute == SD_EVENT_ONESHOT) + s->io.registered = false; + + return source_set_pending(s, true); +} + +static int flush_timer(sd_event *e, int fd, uint32_t events) { + uint64_t x; + ssize_t ss; + + assert(e); + + if (events != EPOLLIN) + return -EIO; + + ss = read(fd, &x, sizeof(x)); + if (ss < 0) { + if (errno == EAGAIN || errno == EINTR) + return 0; + + return -errno; + } + + if (ss != sizeof(x)) + return -EIO; + + return 0; +} + +static int process_timer(sd_event *e, usec_t n, Prioq *prioq) { + sd_event_source *s; + int r; + + assert(e); + + for (;;) { + s = prioq_peek(prioq); + if (!s || + s->time.next > n || + s->mute == SD_EVENT_MUTED || + s->pending) + break; + + r = source_set_pending(s, true); + if (r < 0) + return r; + + r = prioq_reshuffle(prioq, s, &s->time.prioq_index); + if (r < 0) + return r; + } + + return 0; +} + +static int process_child(sd_event *e) { + sd_event_source *s; + Iterator i; + int r; + + assert(e); + + /* + So, this is ugly. We iteratively invoke waitid() with P_PID + + WNOHANG for each PID we wait for, instead of using + P_ALL. This is because we only want to get child + information of very specific child processes, and not all + of them. We might not have processed the SIGCHLD even of a + previous invocation and we don't want to maintain a + unbounded *per-child* event queue, hence we really don't + want anything flushed out of the kernel's queue that we + don't care about. Since this is O(n) this means that if you + have a lot of processes you probably want to handle SIGCHLD + yourself. + */ + + HASHMAP_FOREACH(s, e->child_sources, i) { + assert(s->type == SOURCE_CHILD); + + if (s->pending) + continue; + + if (s->mute == SD_EVENT_MUTED) + continue; + + zero(s->child.siginfo); + r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options); + if (r < 0) + return -errno; + + if (s->child.siginfo.si_pid != 0) { + r = source_set_pending(s, true); + if (r < 0) + return r; + } + } + + e->processed_children = e->iteration; + return 0; +} + +static int process_signal(sd_event *e, uint32_t events) { + struct signalfd_siginfo si; + bool read_one = false; + ssize_t ss; + int r; + + if (events != EPOLLIN) + return -EIO; + + for (;;) { + sd_event_source *s; + + ss = read(e->signal_fd, &si, sizeof(si)); + if (ss < 0) { + if (errno == EAGAIN || errno == EINTR) + return read_one; + + return -errno; + } + + if (ss != sizeof(si)) + return -EIO; + + read_one = true; + + if (si.ssi_signo == SIGCHLD) { + r = process_child(e); + if (r < 0) + return r; + if (r > 0 || !e->signal_sources[si.ssi_signo]) + continue; + } else { + s = e->signal_sources[si.ssi_signo]; + if (!s) + return -EIO; + } + + s->signal.siginfo = si; + r = source_set_pending(s, true); + if (r < 0) + return r; + } + + + return 0; +} + +static int source_dispatch(sd_event_source *s) { + int r; + + assert(s); + assert(s->pending); + + r = source_set_pending(s, false); + if (r < 0) + return r; + + if (s->mute == SD_EVENT_ONESHOT) { + r = sd_event_source_set_mute(s, SD_EVENT_MUTED); + if (r < 0) + return r; + } + + switch (s->type) { + + case SOURCE_IO: + r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata); + break; + + case SOURCE_MONOTONIC: + r = s->time.callback(s, s->time.next, s->userdata); + break; + + case SOURCE_REALTIME: + r = s->time.callback(s, s->time.next, s->userdata); + break; + + case SOURCE_SIGNAL: + r = s->signal.callback(s, &s->signal.siginfo, s->userdata); + break; + + case SOURCE_CHILD: + r = s->child.callback(s, &s->child.siginfo, s->userdata); + break; + + case SOURCE_DEFER: + r = s->defer.callback(s, s->userdata); + break; + } + + return r; +} + +static int event_prepare(sd_event *e) { + int r; + + assert(e); + + for (;;) { + sd_event_source *s; + + s = prioq_peek(e->prepare); + if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED) + break; + + s->prepare_iteration = e->iteration; + r = prioq_reshuffle(e->prepare, s, &s->prepare_index); + if (r < 0) + return r; + + assert(s->prepare); + r = s->prepare(s, s->userdata); + if (r < 0) + return r; + + } + + return 0; +} + +int sd_event_run(sd_event *e, uint64_t timeout) { + struct epoll_event ev_queue[EPOLL_QUEUE_MAX]; + sd_event_source *p; + int r, i, m; + dual_timestamp n; + + if (!e) + return -EINVAL; + if (e->quit) + return -ESTALE; + + e->iteration++; + + r = event_prepare(e); + if (r < 0) + return r; + + r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next); + if (r < 0) + return r; + + r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next); + if (r < 0) + return r; + + if (e->iteration == 1 && !hashmap_isempty(e->child_sources)) + /* On the first iteration, there might be already some + * zombies for us to care for, hence, don't wait */ + timeout = 0; + else { + p = prioq_peek(e->pending); + if (p && p->mute != SD_EVENT_MUTED) + timeout = 0; + } + + m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC)); + if (m < 0) + return m; + + dual_timestamp_get(&n); + + for (i = 0; i < m; i++) { + + if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC)) + r = flush_timer(e, e->monotonic_fd, ev_queue[i].events); + else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME)) + r = flush_timer(e, e->realtime_fd, ev_queue[i].events); + else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL)) + r = process_signal(e, ev_queue[i].events); + else + r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events); + + if (r < 0) + return r; + } + + r = process_timer(e, n.monotonic, e->monotonic); + if (r < 0) + return r; + + r = process_timer(e, n.realtime, e->realtime); + if (r < 0) + return r; + + if (e->iteration == 1 && e->processed_children != 1) { + /* On the first iteration, make sure we really process + * all children which might already be zombies. */ + r = process_child(e); + if (r < 0) + return r; + } + + p = prioq_peek(e->pending); + if (!p || p->mute == SD_EVENT_MUTED) + return 0; + + return source_dispatch(p); +} + +int sd_event_loop(sd_event *e) { + int r; + + if (!e) + return -EINVAL; + + while (!e->quit) { + r = sd_event_run(e, (uint64_t) -1); + if (r < 0) + return r; + } + + return 0; +} + +int sd_event_quit(sd_event *e) { + if (!e) + return EINVAL; + + return e->quit; +} + +int sd_event_request_quit(sd_event *e) { + if (!e) + return -EINVAL; + + e->quit = true; + return 0; +} + +sd_event *sd_event_get(sd_event_source *s) { + if (!s) + return NULL; + + return s->event; +} diff --git a/src/libsystemd-bus/test-event.c b/src/libsystemd-bus/test-event.c new file mode 100644 index 0000000000..8dd47fe8a9 --- /dev/null +++ b/src/libsystemd-bus/test-event.c @@ -0,0 +1,197 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2013 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include "sd-event.h" +#include "log.h" +#include "util.h" + +static int prepare_handler(sd_event_source *s, void *userdata) { + log_info("preparing %c", PTR_TO_INT(userdata)); + return 1; +} + +static bool got_a, got_b, got_c; + +static int io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) { + + log_info("got IO on %c", PTR_TO_INT(userdata)); + + if (userdata == INT_TO_PTR('a')) { + assert_se(sd_event_source_set_mute(s, SD_EVENT_MUTED) >= 0); + assert_se(!got_a); + got_a = true; + } else if (userdata == INT_TO_PTR('b')) { + assert_se(!got_b); + got_b = true; + } else + assert_not_reached("Yuck!"); + + return 1; +} + +static int child_handler(sd_event_source *s, const siginfo_t *si, void *userdata) { + + assert(s); + assert(si); + + log_info("got child on %c", PTR_TO_INT(userdata)); + + assert(userdata == INT_TO_PTR('f')); + + assert_se(sd_event_request_quit(sd_event_get(s)) >= 0); + sd_event_source_unref(s); + + return 1; +} + +static int signal_handler(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) { + sd_event_source *p; + sigset_t ss; + pid_t pid; + + assert(s); + assert(si); + + log_info("got signal on %c", PTR_TO_INT(userdata)); + + assert(userdata == INT_TO_PTR('e')); + + assert_se(sigemptyset(&ss) >= 0); + assert_se(sigaddset(&ss, SIGCHLD) >= 0); + assert_se(sigprocmask(SIG_BLOCK, &ss, NULL) >= 0); + + pid = fork(); + assert_se(pid >= 0); + + if (pid == 0) + _exit(0); + + assert_se(sd_event_add_child(sd_event_get(s), pid, WEXITED, child_handler, INT_TO_PTR('f'), &p) >= 0); + assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0); + + sd_event_source_unref(s); + + return 1; +} + +static int defer_handler(sd_event_source *s, void *userdata) { + sd_event_source *p; + sigset_t ss; + + assert(s); + + log_info("got defer on %c", PTR_TO_INT(userdata)); + + assert(userdata == INT_TO_PTR('d')); + + assert_se(sigemptyset(&ss) >= 0); + assert_se(sigaddset(&ss, SIGUSR1) >= 0); + assert_se(sigprocmask(SIG_BLOCK, &ss, NULL) >= 0); + assert_se(sd_event_add_signal(sd_event_get(s), SIGUSR1, signal_handler, INT_TO_PTR('e'), &p) >= 0); + assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0); + raise(SIGUSR1); + + sd_event_source_unref(s); + + return 1; +} + +static bool do_quit = false; + +static int time_handler(sd_event_source *s, uint64_t usec, void *userdata) { + log_info("got timer on %c", PTR_TO_INT(userdata)); + + if (userdata == INT_TO_PTR('c')) { + + if (do_quit) { + sd_event_source *p; + + assert_se(sd_event_add_defer(sd_event_get(s), defer_handler, INT_TO_PTR('d'), &p) >= 0); + assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0); + } else { + assert(!got_c); + got_c = true; + } + } else + assert_not_reached("Huh?"); + + return 2; +} + +int main(int argc, char *argv[]) { + sd_event *e = NULL; + sd_event_source *x = NULL, *y = NULL, *z = NULL; + static const char ch = 'x'; + int a[2] = { -1, -1 }, b[2] = { -1, -1}; + + assert_se(pipe(a) >= 0); + assert_se(pipe(b) >= 0); + + assert_se(sd_event_new(&e) >= 0); + + got_a = false, got_b = false, got_c = false; + + assert_se(sd_event_add_io(e, a[0], EPOLLIN, io_handler, INT_TO_PTR('a'), &x) >= 0); + assert_se(sd_event_add_io(e, b[0], EPOLLIN, io_handler, INT_TO_PTR('b'), &y) >= 0); + assert_se(sd_event_add_monotonic(e, 0, time_handler, INT_TO_PTR('c'), &z) >= 0); + + assert_se(sd_event_source_set_priority(x, 99) >= 0); + assert_se(sd_event_source_set_mute(y, SD_EVENT_ONESHOT) >= 0); + assert_se(sd_event_source_set_prepare(x, prepare_handler) >= 0); + assert_se(sd_event_source_set_priority(z, 50) >= 0); + assert_se(sd_event_source_set_mute(z, SD_EVENT_ONESHOT) >= 0); + assert_se(sd_event_source_set_prepare(z, prepare_handler) >= 0); + + assert_se(write(a[1], &ch, 1) >= 0); + assert_se(write(b[1], &ch, 1) >= 0); + + assert_se(!got_a && !got_b && !got_c); + + assert_se(sd_event_run(e, (uint64_t) -1) >= 1); + + assert_se(!got_a && got_b && !got_c); + + assert_se(sd_event_run(e, (uint64_t) -1) >= 1); + + assert_se(!got_a && got_b && got_c); + + assert_se(sd_event_run(e, (uint64_t) -1) >= 1); + + assert_se(got_a && got_b && got_c); + + sd_event_source_unref(x); + sd_event_source_unref(y); + + do_quit = true; + assert_se(sd_event_source_set_time(z, now(CLOCK_MONOTONIC) + 200 * USEC_PER_MSEC) >= 0); + assert_se(sd_event_source_set_mute(z, SD_EVENT_ONESHOT) >= 0); + + assert_se(sd_event_loop(e) >= 0); + + sd_event_source_unref(z); + + sd_event_unref(e); + + close_pipe(a); + close_pipe(b); + + return 0; +} diff --git a/src/shared/prioq.h b/src/shared/prioq.h index 4a206a3e59..d836b36cd9 100644 --- a/src/shared/prioq.h +++ b/src/shared/prioq.h @@ -25,6 +25,8 @@ typedef struct Prioq Prioq; +#define PRIOQ_IDX_NULL ((unsigned) -1) + Prioq *prioq_new(compare_func_t compare); void prioq_free(Prioq *q); int prioq_ensure_allocated(Prioq **q, compare_func_t compare_func); diff --git a/src/systemd/sd-event.h b/src/systemd/sd-event.h new file mode 100644 index 0000000000..bec41cff6d --- /dev/null +++ b/src/systemd/sd-event.h @@ -0,0 +1,97 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +#ifndef foosdeventhfoo +#define foosdeventhfoo + +/*** + This file is part of systemd. + + Copyright 2013 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/types.h> +#include <sys/signalfd.h> +#include <sys/epoll.h> +#include <inttypes.h> +#include <signal.h> + +/* + Why is this better than pure epoll? + + - Supports event source priorisation + - Scales better with a large number of time events, since it doesn't require one timerfd each + - Handles signals and child PIDs + + TODO: + + - Detect forks and return ECHILD + - Timer events with accuracy for coalescing time events +*/ + +typedef struct sd_event sd_event; +typedef struct sd_event_source sd_event_source; + +typedef enum sd_event_mute { + SD_EVENT_MUTED = 0, + SD_EVENT_UNMUTED = 1, + SD_EVENT_ONESHOT = -1 +} sd_event_mute_t; + +typedef int (*sd_io_handler_t)(sd_event_source *s, int fd, uint32_t revents, void *userdata); +typedef int (*sd_time_handler_t)(sd_event_source *s, uint64_t usec, void *userdata); +typedef int (*sd_signal_handler_t)(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata); +typedef int (*sd_child_handler_t)(sd_event_source *s, const siginfo_t *si, void *userdata); +typedef int (*sd_defer_handler_t)(sd_event_source *s, void *userdata); +typedef int (*sd_prepare_handler_t)(sd_event_source *s, void *userdata); + +int sd_event_new(sd_event **e); +sd_event* sd_event_ref(sd_event *e); +sd_event* sd_event_unref(sd_event *e); + +int sd_event_add_io(sd_event *e, int fd, uint32_t events, sd_io_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **s); + +int sd_event_run(sd_event *e, uint64_t timeout); +int sd_event_loop(sd_event *e); + +int sd_event_quit(sd_event *e); +int sd_event_request_quit(sd_event *e); + +sd_event *sd_event_get(sd_event_source *s); + +sd_event_source* sd_event_source_ref(sd_event_source *s); +sd_event_source* sd_event_source_unref(sd_event_source *s); + +int sd_event_source_get_pending(sd_event_source *s); +int sd_event_source_get_io_fd(sd_event_source *s); +int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events); +int sd_event_source_set_io_events(sd_event_source *s, uint32_t events); +int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents); +int sd_event_source_get_signal(sd_event_source *s); +int sd_event_source_get_priority(sd_event_source *s, int *priority); +int sd_event_source_set_priority(sd_event_source *s, int priority); +int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m); +int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m); +int sd_event_source_get_time(sd_event_source *s, uint64_t *usec); +int sd_event_source_set_time(sd_event_source *s, uint64_t usec); +int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback); +void* sd_event_source_get_userdata(sd_event_source *s); + +#endif diff --git a/src/systemd/sd-journal.h b/src/systemd/sd-journal.h index 72ea328b28..887fd3007b 100644 --- a/src/systemd/sd-journal.h +++ b/src/systemd/sd-journal.h @@ -87,8 +87,9 @@ enum { SD_JOURNAL_LOCAL_ONLY = 1, SD_JOURNAL_RUNTIME_ONLY = 2, SD_JOURNAL_SYSTEM = 4, - SD_JOURNAL_SYSTEM_ONLY = SD_JOURNAL_SYSTEM, /* deprecated */ SD_JOURNAL_CURRENT_USER = 8, + + SD_JOURNAL_SYSTEM_ONLY = SD_JOURNAL_SYSTEM, /* deprecated name */ }; /* Wakeup event types */ |