diff options
author | Lennart Poettering <lennart@poettering.net> | 2013-10-10 04:40:28 +0200 |
---|---|---|
committer | Lennart Poettering <lennart@poettering.net> | 2013-10-10 04:44:48 +0200 |
commit | fd38203a2a7bfbdc6cb5fd4dc54378e70f7d6778 (patch) | |
tree | 4a4c8a464f489825f94e5edd757408bda16fc9ee /src/libsystemd-bus | |
parent | 2b98f75a63e6022bf74a7d678c47faa5208c794f (diff) |
bus: add minimal event loop API
So far we tried to use epoll directly wherever we needed an event loop.
However, that has various shortcomings, such as the inability to handle
larger amounts of timers (since each timerfd costs one fd, which is a
very limited resource, usually bounded to 1024), and inability to do
priorisation between multiple queued events.
Let's add a minimal event loop API around epoll that is suitable for
implementation of our own daemons and maybe one day can become public
API for those who desire it.
This loop is part of libsystemd-bus, but may be used independently of
it.
Diffstat (limited to 'src/libsystemd-bus')
-rw-r--r-- | src/libsystemd-bus/sd-event.c | 1440 | ||||
-rw-r--r-- | src/libsystemd-bus/test-event.c | 197 |
2 files changed, 1637 insertions, 0 deletions
diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c new file mode 100644 index 0000000000..069e4fb820 --- /dev/null +++ b/src/libsystemd-bus/sd-event.c @@ -0,0 +1,1440 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2013 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include <sys/epoll.h> +#include <sys/timerfd.h> +#include <sys/wait.h> + +#include "macro.h" +#include "refcnt.h" +#include "prioq.h" +#include "hashmap.h" +#include "util.h" +#include "time-util.h" + +#include "sd-event.h" + +#define EPOLL_QUEUE_MAX 64 + +typedef enum EventSourceType { + SOURCE_IO, + SOURCE_MONOTONIC, + SOURCE_REALTIME, + SOURCE_SIGNAL, + SOURCE_CHILD, + SOURCE_DEFER +} EventSourceType; + +struct sd_event_source { + RefCount n_ref; + + sd_event *event; + void *userdata; + sd_prepare_handler_t prepare; + + EventSourceType type:4; + sd_event_mute_t mute:3; + bool pending:1; + + int priority; + unsigned pending_index; + unsigned prepare_index; + unsigned pending_iteration; + unsigned prepare_iteration; + + union { + struct { + sd_io_handler_t callback; + int fd; + uint32_t events; + uint32_t revents; + bool registered:1; + } io; + struct { + sd_time_handler_t callback; + usec_t next; + unsigned prioq_index; + } time; + struct { + sd_signal_handler_t callback; + struct signalfd_siginfo siginfo; + int sig; + } signal; + struct { + sd_child_handler_t callback; + siginfo_t siginfo; + pid_t pid; + int options; + } child; + struct { + sd_defer_handler_t callback; + } defer; + }; +}; + +struct sd_event { + RefCount n_ref; + + int epoll_fd; + int signal_fd; + int realtime_fd; + int monotonic_fd; + + Prioq *pending; + Prioq *prepare; + Prioq *monotonic; + Prioq *realtime; + + sigset_t sigset; + sd_event_source **signal_sources; + + Hashmap *child_sources; + unsigned n_unmuted_child_sources; + + unsigned iteration; + unsigned processed_children; + + usec_t realtime_next, monotonic_next; + + bool quit; +}; + +static int pending_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->pending); + assert(y->pending); + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Lower priority values first */ + if (x->priority < y->priority) + return -1; + if (x->priority > y->priority) + return 1; + + /* Older entries first */ + if (x->pending_iteration < y->pending_iteration) + return -1; + if (x->pending_iteration > y->pending_iteration) + return 1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (y > x) + return 1; + + return 0; +} + +static int prepare_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->prepare); + assert(y->prepare); + + /* Move most recently prepared ones last, so that we can stop + * preparing as soon as we hit one that has already been + * prepared in the current iteration */ + if (x->prepare_iteration < y->prepare_iteration) + return -1; + if (x->prepare_iteration > y->prepare_iteration) + return 1; + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Lower priority values first */ + if (x->priority < y->priority) + return -1; + if (x->priority > y->priority) + return 1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (y > x) + return 1; + + return 0; +} + +static int time_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME); + assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME); + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Move the pending ones to the end */ + if (!x->pending && y->pending) + return -1; + if (x->pending && !y->pending) + return 1; + + /* Order by time */ + if (x->time.next < y->time.next) + return -1; + if (x->time.next > y->time.next) + return -1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (y > x) + return 1; + + return 0; +} + +static void event_free(sd_event *e) { + assert(e); + + if (e->epoll_fd >= 0) + close_nointr_nofail(e->epoll_fd); + + if (e->signal_fd >= 0) + close_nointr_nofail(e->signal_fd); + + if (e->realtime_fd >= 0) + close_nointr_nofail(e->realtime_fd); + + if (e->monotonic_fd >= 0) + close_nointr_nofail(e->monotonic_fd); + + prioq_free(e->pending); + prioq_free(e->prepare); + prioq_free(e->monotonic); + prioq_free(e->realtime); + + free(e->signal_sources); + + hashmap_free(e->child_sources); + free(e); +} + +int sd_event_new(sd_event** ret) { + sd_event *e; + int r; + + if (!ret) + return -EINVAL; + + e = new0(sd_event, 1); + if (!e) + return -ENOMEM; + + e->n_ref = REFCNT_INIT; + e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1; + e->realtime_next = e->monotonic_next = (usec_t) -1; + + assert_se(sigemptyset(&e->sigset) == 0); + + e->pending = prioq_new(pending_prioq_compare); + if (!e->pending) { + r = -ENOMEM; + goto fail; + } + + e->epoll_fd = epoll_create1(EPOLL_CLOEXEC); + if (e->epoll_fd < 0) { + r = -errno; + goto fail; + } + + *ret = e; + return 0; + +fail: + event_free(e); + return r; +} + +sd_event* sd_event_ref(sd_event *e) { + if (!e) + return NULL; + + assert_se(REFCNT_INC(e->n_ref) >= 2); + + return e; +} + +sd_event* sd_event_unref(sd_event *e) { + if (!e) + return NULL; + + if (REFCNT_DEC(e->n_ref) <= 0) + event_free(e); + + return NULL; +} + +static int source_io_unregister(sd_event_source *s) { + int r; + + assert(s); + assert(s->type == SOURCE_IO); + + if (!s->io.registered) + return 0; + + r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL); + if (r < 0) + return -errno; + + s->io.registered = false; + return 0; +} + +static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) { + struct epoll_event ev = {}; + int r; + + assert(s); + assert(s->type == SOURCE_IO); + assert(m != SD_EVENT_MUTED); + + ev.events = events; + ev.data.ptr = s; + + if (m == SD_EVENT_ONESHOT) + ev.events |= EPOLLONESHOT; + + if (s->io.registered) + r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev); + else + r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev); + + if (r < 0) + return -errno; + + s->io.registered = true; + + return 0; +} + +static void source_free(sd_event_source *s) { + assert(s); + + if (s->event) { + switch (s->type) { + + case SOURCE_IO: + if (s->io.fd >= 0) + source_io_unregister(s); + + break; + + case SOURCE_MONOTONIC: + prioq_remove(s->event->monotonic, s, &s->time.prioq_index); + break; + + case SOURCE_REALTIME: + prioq_remove(s->event->realtime, s, &s->time.prioq_index); + break; + + case SOURCE_SIGNAL: + if (s->signal.sig > 0) { + if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) + assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0); + + if (s->event->signal_sources) + s->event->signal_sources[s->signal.sig] = NULL; + } + + break; + + case SOURCE_CHILD: + if (s->child.pid > 0) { + if (s->mute != SD_EVENT_MUTED) { + assert(s->event->n_unmuted_child_sources > 0); + s->event->n_unmuted_child_sources--; + } + + if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) + assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0); + + hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid)); + } + + break; + } + + if (s->pending) + prioq_remove(s->event->pending, s, &s->pending_index); + + if (s->prepare) + prioq_remove(s->event->prepare, s, &s->prepare_index); + + sd_event_unref(s->event); + } + + free(s); +} + +static int source_set_pending(sd_event_source *s, bool b) { + int r; + + assert(s); + + if (s->pending == b) + return 0; + + s->pending = b; + + if (b) { + s->pending_iteration = s->event->iteration; + + r = prioq_put(s->event->pending, s, &s->pending_index); + if (r < 0) { + s->pending = false; + return r; + } + } else + assert_se(prioq_remove(s->event->pending, s, &s->pending_index)); + + return 0; +} + +static sd_event_source *source_new(sd_event *e, EventSourceType type) { + sd_event_source *s; + + assert(e); + + s = new0(sd_event_source, 1); + if (!s) + return NULL; + + s->n_ref = REFCNT_INIT; + s->event = sd_event_ref(e); + s->type = type; + s->mute = SD_EVENT_UNMUTED; + s->pending_index = s->prepare_index = PRIOQ_IDX_NULL; + + return s; +} + +int sd_event_add_io( + sd_event *e, + int fd, + uint32_t events, + sd_io_handler_t callback, + void *userdata, + sd_event_source **ret) { + + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (fd < 0) + return -EINVAL; + if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + s = source_new(e, SOURCE_IO); + if (!s) + return -ENOMEM; + + s->io.fd = fd; + s->io.events = events; + s->io.callback = callback; + s->userdata = userdata; + + r = source_io_register(s, s->mute, events); + if (r < 0) { + source_free(s); + return -errno; + } + + *ret = s; + return 0; +} + +static int event_setup_timer_fd( + sd_event *e, + EventSourceType type, + int *timer_fd, + clockid_t id) { + + struct epoll_event ev = {}; + int r, fd; + + assert(e); + assert(timer_fd); + + if (_likely_(*timer_fd >= 0)) + return 0; + + fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC); + if (fd < 0) + return -errno; + + ev.events = EPOLLIN; + ev.data.ptr = INT_TO_PTR(type); + + r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev); + if (r < 0) { + close_nointr_nofail(fd); + return -errno; + } + + *timer_fd = fd; + return 0; +} + +static int event_add_time_internal( + sd_event *e, + EventSourceType type, + int *timer_fd, + clockid_t id, + Prioq **prioq, + uint64_t usec, + sd_time_handler_t callback, + void *userdata, + sd_event_source **ret) { + + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + assert(timer_fd); + assert(prioq); + + if (!*prioq) { + *prioq = prioq_new(time_prioq_compare); + if (!*prioq) + return -ENOMEM; + } + + if (*timer_fd < 0) { + r = event_setup_timer_fd(e, type, timer_fd, id); + if (r < 0) + return r; + } + + s = source_new(e, type); + if (!s) + return -ENOMEM; + + s->time.next = usec; + s->time.callback = callback; + s->time.prioq_index = PRIOQ_IDX_NULL; + s->userdata = userdata; + + r = prioq_put(*prioq, s, &s->time.prioq_index); + if (r < 0) { + source_free(s); + return r; + } + + *ret = s; + return 0; +} + +int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { + return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret); +} + +int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { + return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret); +} + +static int event_update_signal_fd(sd_event *e) { + struct epoll_event ev = {}; + bool add_to_epoll; + int r; + + assert(e); + + add_to_epoll = e->signal_fd < 0; + + r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC); + if (r < 0) + return -errno; + + e->signal_fd = r; + + if (!add_to_epoll) + return 0; + + ev.events = EPOLLIN; + ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL); + + r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev); + if (r < 0) { + close_nointr_nofail(e->signal_fd); + e->signal_fd = -1; + + return -errno; + } + + return 0; +} + +int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (sig <= 0) + return -EINVAL; + if (sig >= _NSIG) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + if (!e->signal_sources) { + e->signal_sources = new0(sd_event_source*, _NSIG); + if (!e->signal_sources) + return -ENOMEM; + } else if (e->signal_sources[sig]) + return -EBUSY; + + s = source_new(e, SOURCE_SIGNAL); + if (!s) + return -ENOMEM; + + s->signal.sig = sig; + s->signal.callback = callback; + s->userdata = userdata; + + e->signal_sources[sig] = s; + assert_se(sigaddset(&e->sigset, sig) == 0); + + if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) { + r = event_update_signal_fd(e); + if (r < 0) { + source_free(s); + return r; + } + } + + *ret = s; + return 0; +} + +int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (pid <= 1) + return -EINVAL; + if (options & ~(WEXITED|WSTOPPED|WCONTINUED)) + return -EINVAL; + if (!callback) + return -EINVAL; + if (!ret) + return -EINVAL; + + r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func); + if (r < 0) + return r; + + if (hashmap_contains(e->child_sources, INT_TO_PTR(pid))) + return -EBUSY; + + s = source_new(e, SOURCE_CHILD); + if (!s) + return -ENOMEM; + + s->child.pid = pid; + s->child.options = options; + s->child.callback = callback; + s->userdata = userdata; + + r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s); + if (r < 0) { + source_free(s); + return r; + } + + e->n_unmuted_child_sources ++; + + assert_se(sigaddset(&e->sigset, SIGCHLD) == 0); + + if (!e->signal_sources || !e->signal_sources[SIGCHLD]) { + r = event_update_signal_fd(e); + if (r < 0) { + source_free(s); + return -errno; + } + } + + *ret = s; + return 0; +} + +int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + if (!e) + return -EINVAL; + if (!ret) + return -EINVAL; + + s = source_new(e, SOURCE_DEFER); + if (!s) + return -ENOMEM; + + s->defer.callback = callback; + s->userdata = userdata; + + r = source_set_pending(s, true); + if (r < 0) { + source_free(s); + return r; + } + + *ret = s; + return 0; +} + +sd_event_source* sd_event_source_ref(sd_event_source *s) { + if (!s) + return NULL; + + assert_se(REFCNT_INC(s->n_ref) >= 2); + + return s; +} + +sd_event_source* sd_event_source_unref(sd_event_source *s) { + if (!s) + return NULL; + + if (REFCNT_DEC(s->n_ref) <= 0) + source_free(s); + + return NULL; +} + +int sd_event_source_get_pending(sd_event_source *s) { + if (!s) + return -EINVAL; + + return s->pending; +} + +int sd_event_source_get_io_fd(sd_event_source *s) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_IO) + return -EDOM; + + return s->io.fd; +} + +int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_IO) + return -EDOM; + if (!events) + return -EINVAL; + + *events = s->io.events; + return 0; +} + +int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) { + int r; + + if (!s) + return -EINVAL; + if (!s->type != SOURCE_IO) + return -EDOM; + if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) + return -EINVAL; + + if (s->io.events == events) + return 0; + + if (s->mute != SD_EVENT_MUTED) { + r = source_io_register(s, s->io.events, events); + if (r < 0) + return r; + } + + s->io.events = events; + + return 0; +} + +int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_IO) + return -EDOM; + if (!revents) + return -EINVAL; + if (!s->pending) + return -ENODATA; + + *revents = s->io.revents; + return 0; +} + +int sd_event_source_get_signal(sd_event_source *s) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_SIGNAL) + return -EDOM; + + return s->signal.sig; +} + +int sd_event_source_get_priority(sd_event_source *s, int *priority) { + if (!s) + return -EINVAL; + + return s->priority; +} + +int sd_event_source_set_priority(sd_event_source *s, int priority) { + if (!s) + return -EINVAL; + + if (s->priority == priority) + return 0; + + s->priority = priority; + + if (s->pending) + assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0); + + if (s->prepare) + assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0); + + return 0; +} + +int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) { + if (!s) + return -EINVAL; + if (!m) + return -EINVAL; + + *m = s->mute; + return 0; +} + +int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { + int r; + + if (!s) + return -EINVAL; + if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT) + return -EINVAL; + + if (s->mute == m) + return 0; + + if (m == SD_EVENT_MUTED) { + + switch (s->type) { + + case SOURCE_IO: + r = source_io_unregister(s); + if (r < 0) + return r; + + s->mute = m; + break; + + case SOURCE_MONOTONIC: + s->mute = m; + prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + break; + + case SOURCE_REALTIME: + s->mute = m; + prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + break; + + case SOURCE_SIGNAL: + s->mute = m; + if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) { + assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0); + event_update_signal_fd(s->event); + } + + break; + + case SOURCE_CHILD: + s->mute = m; + + assert(s->event->n_unmuted_child_sources > 0); + s->event->n_unmuted_child_sources--; + + if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) { + assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0); + event_update_signal_fd(s->event); + } + + break; + + case SOURCE_DEFER: + s->mute = m; + break; + } + + } else { + switch (s->type) { + + case SOURCE_IO: + r = source_io_register(s, m, s->io.events); + if (r < 0) + return r; + + s->mute = m; + break; + + case SOURCE_MONOTONIC: + s->mute = m; + prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + break; + + case SOURCE_REALTIME: + s->mute = m; + prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + break; + + case SOURCE_SIGNAL: + s->mute = m; + + if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) { + assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0); + event_update_signal_fd(s->event); + } + break; + + case SOURCE_CHILD: + s->mute = m; + + if (s->mute == SD_EVENT_MUTED) { + s->event->n_unmuted_child_sources++; + + if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) { + assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0); + event_update_signal_fd(s->event); + } + } + break; + + case SOURCE_DEFER: + s->mute = m; + break; + } + } + + if (s->pending) + prioq_reshuffle(s->event->pending, s, &s->pending_index); + + if (s->prepare) + prioq_reshuffle(s->event->prepare, s, &s->prepare_index); + + return 0; +} + +int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) { + if (!s) + return -EINVAL; + if (!usec) + return -EINVAL; + if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC) + return -EDOM; + + *usec = s->time.next; + return 0; +} + +int sd_event_source_set_time(sd_event_source *s, uint64_t usec) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC) + return -EDOM; + + if (s->time.next == usec) + return 0; + + s->time.next = usec; + + if (s->type == SOURCE_REALTIME) + prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + else + prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + + return 0; +} + +int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) { + int r; + + if (!s) + return -EINVAL; + + if (s->prepare == callback) + return 0; + + if (callback && s->prepare) { + s->prepare = callback; + return 0; + } + + r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare); + if (r < 0) + return r; + + s->prepare = callback; + + if (callback) { + r = prioq_put(s->event->prepare, s, &s->prepare_index); + if (r < 0) + return r; + } else + prioq_remove(s->event->prepare, s, &s->prepare_index); + + return 0; +} + +void* sd_event_source_get_userdata(sd_event_source *s) { + if (!s) + return NULL; + + return s->userdata; +} + +static int event_arm_timer( + sd_event *e, + int timer_fd, + Prioq *prioq, + usec_t *next) { + + struct itimerspec its = {}; + sd_event_source *s; + int r; + + assert_se(e); + assert_se(next); + + s = prioq_peek(prioq); + if (!s || s->mute == SD_EVENT_MUTED) + return 0; + + if (*next == s->time.next) + return 0; + + assert_se(timer_fd >= 0); + + if (s->time.next == 0) { + /* We don' want to disarm here, just mean some time looooong ago. */ + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = 1; + } else + timespec_store(&its.it_value, s->time.next); + + r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL); + if (r < 0) + return r; + + *next = s->time.next; + return 0; +} + +static int process_io(sd_event *e, sd_event_source *s, uint32_t events) { + assert(e); + assert(s); + assert(s->type == SOURCE_IO); + + s->io.revents = events; + + /* + If this is a oneshot event source, then we added it to the + epoll with EPOLLONESHOT, hence we know it's not registered + anymore. We can save a syscall here... + */ + + if (s->mute == SD_EVENT_ONESHOT) + s->io.registered = false; + + return source_set_pending(s, true); +} + +static int flush_timer(sd_event *e, int fd, uint32_t events) { + uint64_t x; + ssize_t ss; + + assert(e); + + if (events != EPOLLIN) + return -EIO; + + ss = read(fd, &x, sizeof(x)); + if (ss < 0) { + if (errno == EAGAIN || errno == EINTR) + return 0; + + return -errno; + } + + if (ss != sizeof(x)) + return -EIO; + + return 0; +} + +static int process_timer(sd_event *e, usec_t n, Prioq *prioq) { + sd_event_source *s; + int r; + + assert(e); + + for (;;) { + s = prioq_peek(prioq); + if (!s || + s->time.next > n || + s->mute == SD_EVENT_MUTED || + s->pending) + break; + + r = source_set_pending(s, true); + if (r < 0) + return r; + + r = prioq_reshuffle(prioq, s, &s->time.prioq_index); + if (r < 0) + return r; + } + + return 0; +} + +static int process_child(sd_event *e) { + sd_event_source *s; + Iterator i; + int r; + + assert(e); + + /* + So, this is ugly. We iteratively invoke waitid() with P_PID + + WNOHANG for each PID we wait for, instead of using + P_ALL. This is because we only want to get child + information of very specific child processes, and not all + of them. We might not have processed the SIGCHLD even of a + previous invocation and we don't want to maintain a + unbounded *per-child* event queue, hence we really don't + want anything flushed out of the kernel's queue that we + don't care about. Since this is O(n) this means that if you + have a lot of processes you probably want to handle SIGCHLD + yourself. + */ + + HASHMAP_FOREACH(s, e->child_sources, i) { + assert(s->type == SOURCE_CHILD); + + if (s->pending) + continue; + + if (s->mute == SD_EVENT_MUTED) + continue; + + zero(s->child.siginfo); + r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options); + if (r < 0) + return -errno; + + if (s->child.siginfo.si_pid != 0) { + r = source_set_pending(s, true); + if (r < 0) + return r; + } + } + + e->processed_children = e->iteration; + return 0; +} + +static int process_signal(sd_event *e, uint32_t events) { + struct signalfd_siginfo si; + bool read_one = false; + ssize_t ss; + int r; + + if (events != EPOLLIN) + return -EIO; + + for (;;) { + sd_event_source *s; + + ss = read(e->signal_fd, &si, sizeof(si)); + if (ss < 0) { + if (errno == EAGAIN || errno == EINTR) + return read_one; + + return -errno; + } + + if (ss != sizeof(si)) + return -EIO; + + read_one = true; + + if (si.ssi_signo == SIGCHLD) { + r = process_child(e); + if (r < 0) + return r; + if (r > 0 || !e->signal_sources[si.ssi_signo]) + continue; + } else { + s = e->signal_sources[si.ssi_signo]; + if (!s) + return -EIO; + } + + s->signal.siginfo = si; + r = source_set_pending(s, true); + if (r < 0) + return r; + } + + + return 0; +} + +static int source_dispatch(sd_event_source *s) { + int r; + + assert(s); + assert(s->pending); + + r = source_set_pending(s, false); + if (r < 0) + return r; + + if (s->mute == SD_EVENT_ONESHOT) { + r = sd_event_source_set_mute(s, SD_EVENT_MUTED); + if (r < 0) + return r; + } + + switch (s->type) { + + case SOURCE_IO: + r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata); + break; + + case SOURCE_MONOTONIC: + r = s->time.callback(s, s->time.next, s->userdata); + break; + + case SOURCE_REALTIME: + r = s->time.callback(s, s->time.next, s->userdata); + break; + + case SOURCE_SIGNAL: + r = s->signal.callback(s, &s->signal.siginfo, s->userdata); + break; + + case SOURCE_CHILD: + r = s->child.callback(s, &s->child.siginfo, s->userdata); + break; + + case SOURCE_DEFER: + r = s->defer.callback(s, s->userdata); + break; + } + + return r; +} + +static int event_prepare(sd_event *e) { + int r; + + assert(e); + + for (;;) { + sd_event_source *s; + + s = prioq_peek(e->prepare); + if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED) + break; + + s->prepare_iteration = e->iteration; + r = prioq_reshuffle(e->prepare, s, &s->prepare_index); + if (r < 0) + return r; + + assert(s->prepare); + r = s->prepare(s, s->userdata); + if (r < 0) + return r; + + } + + return 0; +} + +int sd_event_run(sd_event *e, uint64_t timeout) { + struct epoll_event ev_queue[EPOLL_QUEUE_MAX]; + sd_event_source *p; + int r, i, m; + dual_timestamp n; + + if (!e) + return -EINVAL; + if (e->quit) + return -ESTALE; + + e->iteration++; + + r = event_prepare(e); + if (r < 0) + return r; + + r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next); + if (r < 0) + return r; + + r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next); + if (r < 0) + return r; + + if (e->iteration == 1 && !hashmap_isempty(e->child_sources)) + /* On the first iteration, there might be already some + * zombies for us to care for, hence, don't wait */ + timeout = 0; + else { + p = prioq_peek(e->pending); + if (p && p->mute != SD_EVENT_MUTED) + timeout = 0; + } + + m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC)); + if (m < 0) + return m; + + dual_timestamp_get(&n); + + for (i = 0; i < m; i++) { + + if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC)) + r = flush_timer(e, e->monotonic_fd, ev_queue[i].events); + else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME)) + r = flush_timer(e, e->realtime_fd, ev_queue[i].events); + else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL)) + r = process_signal(e, ev_queue[i].events); + else + r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events); + + if (r < 0) + return r; + } + + r = process_timer(e, n.monotonic, e->monotonic); + if (r < 0) + return r; + + r = process_timer(e, n.realtime, e->realtime); + if (r < 0) + return r; + + if (e->iteration == 1 && e->processed_children != 1) { + /* On the first iteration, make sure we really process + * all children which might already be zombies. */ + r = process_child(e); + if (r < 0) + return r; + } + + p = prioq_peek(e->pending); + if (!p || p->mute == SD_EVENT_MUTED) + return 0; + + return source_dispatch(p); +} + +int sd_event_loop(sd_event *e) { + int r; + + if (!e) + return -EINVAL; + + while (!e->quit) { + r = sd_event_run(e, (uint64_t) -1); + if (r < 0) + return r; + } + + return 0; +} + +int sd_event_quit(sd_event *e) { + if (!e) + return EINVAL; + + return e->quit; +} + +int sd_event_request_quit(sd_event *e) { + if (!e) + return -EINVAL; + + e->quit = true; + return 0; +} + +sd_event *sd_event_get(sd_event_source *s) { + if (!s) + return NULL; + + return s->event; +} diff --git a/src/libsystemd-bus/test-event.c b/src/libsystemd-bus/test-event.c new file mode 100644 index 0000000000..8dd47fe8a9 --- /dev/null +++ b/src/libsystemd-bus/test-event.c @@ -0,0 +1,197 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2013 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include "sd-event.h" +#include "log.h" +#include "util.h" + +static int prepare_handler(sd_event_source *s, void *userdata) { + log_info("preparing %c", PTR_TO_INT(userdata)); + return 1; +} + +static bool got_a, got_b, got_c; + +static int io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) { + + log_info("got IO on %c", PTR_TO_INT(userdata)); + + if (userdata == INT_TO_PTR('a')) { + assert_se(sd_event_source_set_mute(s, SD_EVENT_MUTED) >= 0); + assert_se(!got_a); + got_a = true; + } else if (userdata == INT_TO_PTR('b')) { + assert_se(!got_b); + got_b = true; + } else + assert_not_reached("Yuck!"); + + return 1; +} + +static int child_handler(sd_event_source *s, const siginfo_t *si, void *userdata) { + + assert(s); + assert(si); + + log_info("got child on %c", PTR_TO_INT(userdata)); + + assert(userdata == INT_TO_PTR('f')); + + assert_se(sd_event_request_quit(sd_event_get(s)) >= 0); + sd_event_source_unref(s); + + return 1; +} + +static int signal_handler(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) { + sd_event_source *p; + sigset_t ss; + pid_t pid; + + assert(s); + assert(si); + + log_info("got signal on %c", PTR_TO_INT(userdata)); + + assert(userdata == INT_TO_PTR('e')); + + assert_se(sigemptyset(&ss) >= 0); + assert_se(sigaddset(&ss, SIGCHLD) >= 0); + assert_se(sigprocmask(SIG_BLOCK, &ss, NULL) >= 0); + + pid = fork(); + assert_se(pid >= 0); + + if (pid == 0) + _exit(0); + + assert_se(sd_event_add_child(sd_event_get(s), pid, WEXITED, child_handler, INT_TO_PTR('f'), &p) >= 0); + assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0); + + sd_event_source_unref(s); + + return 1; +} + +static int defer_handler(sd_event_source *s, void *userdata) { + sd_event_source *p; + sigset_t ss; + + assert(s); + + log_info("got defer on %c", PTR_TO_INT(userdata)); + + assert(userdata == INT_TO_PTR('d')); + + assert_se(sigemptyset(&ss) >= 0); + assert_se(sigaddset(&ss, SIGUSR1) >= 0); + assert_se(sigprocmask(SIG_BLOCK, &ss, NULL) >= 0); + assert_se(sd_event_add_signal(sd_event_get(s), SIGUSR1, signal_handler, INT_TO_PTR('e'), &p) >= 0); + assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0); + raise(SIGUSR1); + + sd_event_source_unref(s); + + return 1; +} + +static bool do_quit = false; + +static int time_handler(sd_event_source *s, uint64_t usec, void *userdata) { + log_info("got timer on %c", PTR_TO_INT(userdata)); + + if (userdata == INT_TO_PTR('c')) { + + if (do_quit) { + sd_event_source *p; + + assert_se(sd_event_add_defer(sd_event_get(s), defer_handler, INT_TO_PTR('d'), &p) >= 0); + assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0); + } else { + assert(!got_c); + got_c = true; + } + } else + assert_not_reached("Huh?"); + + return 2; +} + +int main(int argc, char *argv[]) { + sd_event *e = NULL; + sd_event_source *x = NULL, *y = NULL, *z = NULL; + static const char ch = 'x'; + int a[2] = { -1, -1 }, b[2] = { -1, -1}; + + assert_se(pipe(a) >= 0); + assert_se(pipe(b) >= 0); + + assert_se(sd_event_new(&e) >= 0); + + got_a = false, got_b = false, got_c = false; + + assert_se(sd_event_add_io(e, a[0], EPOLLIN, io_handler, INT_TO_PTR('a'), &x) >= 0); + assert_se(sd_event_add_io(e, b[0], EPOLLIN, io_handler, INT_TO_PTR('b'), &y) >= 0); + assert_se(sd_event_add_monotonic(e, 0, time_handler, INT_TO_PTR('c'), &z) >= 0); + + assert_se(sd_event_source_set_priority(x, 99) >= 0); + assert_se(sd_event_source_set_mute(y, SD_EVENT_ONESHOT) >= 0); + assert_se(sd_event_source_set_prepare(x, prepare_handler) >= 0); + assert_se(sd_event_source_set_priority(z, 50) >= 0); + assert_se(sd_event_source_set_mute(z, SD_EVENT_ONESHOT) >= 0); + assert_se(sd_event_source_set_prepare(z, prepare_handler) >= 0); + + assert_se(write(a[1], &ch, 1) >= 0); + assert_se(write(b[1], &ch, 1) >= 0); + + assert_se(!got_a && !got_b && !got_c); + + assert_se(sd_event_run(e, (uint64_t) -1) >= 1); + + assert_se(!got_a && got_b && !got_c); + + assert_se(sd_event_run(e, (uint64_t) -1) >= 1); + + assert_se(!got_a && got_b && got_c); + + assert_se(sd_event_run(e, (uint64_t) -1) >= 1); + + assert_se(got_a && got_b && got_c); + + sd_event_source_unref(x); + sd_event_source_unref(y); + + do_quit = true; + assert_se(sd_event_source_set_time(z, now(CLOCK_MONOTONIC) + 200 * USEC_PER_MSEC) >= 0); + assert_se(sd_event_source_set_mute(z, SD_EVENT_ONESHOT) >= 0); + + assert_se(sd_event_loop(e) >= 0); + + sd_event_source_unref(z); + + sd_event_unref(e); + + close_pipe(a); + close_pipe(b); + + return 0; +} |