summaryrefslogtreecommitdiff
path: root/src/libsystemd-bus
diff options
context:
space:
mode:
Diffstat (limited to 'src/libsystemd-bus')
-rw-r--r--src/libsystemd-bus/sd-event.c1440
-rw-r--r--src/libsystemd-bus/test-event.c197
2 files changed, 1637 insertions, 0 deletions
diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c
new file mode 100644
index 0000000000..069e4fb820
--- /dev/null
+++ b/src/libsystemd-bus/sd-event.c
@@ -0,0 +1,1440 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+ This file is part of systemd.
+
+ Copyright 2013 Lennart Poettering
+
+ systemd is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 2.1 of the License, or
+ (at your option) any later version.
+
+ systemd is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include <sys/epoll.h>
+#include <sys/timerfd.h>
+#include <sys/wait.h>
+
+#include "macro.h"
+#include "refcnt.h"
+#include "prioq.h"
+#include "hashmap.h"
+#include "util.h"
+#include "time-util.h"
+
+#include "sd-event.h"
+
+#define EPOLL_QUEUE_MAX 64
+
+typedef enum EventSourceType {
+ SOURCE_IO,
+ SOURCE_MONOTONIC,
+ SOURCE_REALTIME,
+ SOURCE_SIGNAL,
+ SOURCE_CHILD,
+ SOURCE_DEFER
+} EventSourceType;
+
+struct sd_event_source {
+ RefCount n_ref;
+
+ sd_event *event;
+ void *userdata;
+ sd_prepare_handler_t prepare;
+
+ EventSourceType type:4;
+ sd_event_mute_t mute:3;
+ bool pending:1;
+
+ int priority;
+ unsigned pending_index;
+ unsigned prepare_index;
+ unsigned pending_iteration;
+ unsigned prepare_iteration;
+
+ union {
+ struct {
+ sd_io_handler_t callback;
+ int fd;
+ uint32_t events;
+ uint32_t revents;
+ bool registered:1;
+ } io;
+ struct {
+ sd_time_handler_t callback;
+ usec_t next;
+ unsigned prioq_index;
+ } time;
+ struct {
+ sd_signal_handler_t callback;
+ struct signalfd_siginfo siginfo;
+ int sig;
+ } signal;
+ struct {
+ sd_child_handler_t callback;
+ siginfo_t siginfo;
+ pid_t pid;
+ int options;
+ } child;
+ struct {
+ sd_defer_handler_t callback;
+ } defer;
+ };
+};
+
+struct sd_event {
+ RefCount n_ref;
+
+ int epoll_fd;
+ int signal_fd;
+ int realtime_fd;
+ int monotonic_fd;
+
+ Prioq *pending;
+ Prioq *prepare;
+ Prioq *monotonic;
+ Prioq *realtime;
+
+ sigset_t sigset;
+ sd_event_source **signal_sources;
+
+ Hashmap *child_sources;
+ unsigned n_unmuted_child_sources;
+
+ unsigned iteration;
+ unsigned processed_children;
+
+ usec_t realtime_next, monotonic_next;
+
+ bool quit;
+};
+
+static int pending_prioq_compare(const void *a, const void *b) {
+ const sd_event_source *x = a, *y = b;
+
+ assert(x->pending);
+ assert(y->pending);
+
+ /* Unmuted ones first */
+ if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
+ return -1;
+ if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
+ return 1;
+
+ /* Lower priority values first */
+ if (x->priority < y->priority)
+ return -1;
+ if (x->priority > y->priority)
+ return 1;
+
+ /* Older entries first */
+ if (x->pending_iteration < y->pending_iteration)
+ return -1;
+ if (x->pending_iteration > y->pending_iteration)
+ return 1;
+
+ /* Stability for the rest */
+ if (x < y)
+ return -1;
+ if (y > x)
+ return 1;
+
+ return 0;
+}
+
+static int prepare_prioq_compare(const void *a, const void *b) {
+ const sd_event_source *x = a, *y = b;
+
+ assert(x->prepare);
+ assert(y->prepare);
+
+ /* Move most recently prepared ones last, so that we can stop
+ * preparing as soon as we hit one that has already been
+ * prepared in the current iteration */
+ if (x->prepare_iteration < y->prepare_iteration)
+ return -1;
+ if (x->prepare_iteration > y->prepare_iteration)
+ return 1;
+
+ /* Unmuted ones first */
+ if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
+ return -1;
+ if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
+ return 1;
+
+ /* Lower priority values first */
+ if (x->priority < y->priority)
+ return -1;
+ if (x->priority > y->priority)
+ return 1;
+
+ /* Stability for the rest */
+ if (x < y)
+ return -1;
+ if (y > x)
+ return 1;
+
+ return 0;
+}
+
+static int time_prioq_compare(const void *a, const void *b) {
+ const sd_event_source *x = a, *y = b;
+
+ assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME);
+ assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME);
+
+ /* Unmuted ones first */
+ if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED)
+ return -1;
+ if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED)
+ return 1;
+
+ /* Move the pending ones to the end */
+ if (!x->pending && y->pending)
+ return -1;
+ if (x->pending && !y->pending)
+ return 1;
+
+ /* Order by time */
+ if (x->time.next < y->time.next)
+ return -1;
+ if (x->time.next > y->time.next)
+ return -1;
+
+ /* Stability for the rest */
+ if (x < y)
+ return -1;
+ if (y > x)
+ return 1;
+
+ return 0;
+}
+
+static void event_free(sd_event *e) {
+ assert(e);
+
+ if (e->epoll_fd >= 0)
+ close_nointr_nofail(e->epoll_fd);
+
+ if (e->signal_fd >= 0)
+ close_nointr_nofail(e->signal_fd);
+
+ if (e->realtime_fd >= 0)
+ close_nointr_nofail(e->realtime_fd);
+
+ if (e->monotonic_fd >= 0)
+ close_nointr_nofail(e->monotonic_fd);
+
+ prioq_free(e->pending);
+ prioq_free(e->prepare);
+ prioq_free(e->monotonic);
+ prioq_free(e->realtime);
+
+ free(e->signal_sources);
+
+ hashmap_free(e->child_sources);
+ free(e);
+}
+
+int sd_event_new(sd_event** ret) {
+ sd_event *e;
+ int r;
+
+ if (!ret)
+ return -EINVAL;
+
+ e = new0(sd_event, 1);
+ if (!e)
+ return -ENOMEM;
+
+ e->n_ref = REFCNT_INIT;
+ e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1;
+ e->realtime_next = e->monotonic_next = (usec_t) -1;
+
+ assert_se(sigemptyset(&e->sigset) == 0);
+
+ e->pending = prioq_new(pending_prioq_compare);
+ if (!e->pending) {
+ r = -ENOMEM;
+ goto fail;
+ }
+
+ e->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+ if (e->epoll_fd < 0) {
+ r = -errno;
+ goto fail;
+ }
+
+ *ret = e;
+ return 0;
+
+fail:
+ event_free(e);
+ return r;
+}
+
+sd_event* sd_event_ref(sd_event *e) {
+ if (!e)
+ return NULL;
+
+ assert_se(REFCNT_INC(e->n_ref) >= 2);
+
+ return e;
+}
+
+sd_event* sd_event_unref(sd_event *e) {
+ if (!e)
+ return NULL;
+
+ if (REFCNT_DEC(e->n_ref) <= 0)
+ event_free(e);
+
+ return NULL;
+}
+
+static int source_io_unregister(sd_event_source *s) {
+ int r;
+
+ assert(s);
+ assert(s->type == SOURCE_IO);
+
+ if (!s->io.registered)
+ return 0;
+
+ r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_DEL, s->io.fd, NULL);
+ if (r < 0)
+ return -errno;
+
+ s->io.registered = false;
+ return 0;
+}
+
+static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) {
+ struct epoll_event ev = {};
+ int r;
+
+ assert(s);
+ assert(s->type == SOURCE_IO);
+ assert(m != SD_EVENT_MUTED);
+
+ ev.events = events;
+ ev.data.ptr = s;
+
+ if (m == SD_EVENT_ONESHOT)
+ ev.events |= EPOLLONESHOT;
+
+ if (s->io.registered)
+ r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_MOD, s->io.fd, &ev);
+ else
+ r = epoll_ctl(s->event->epoll_fd, EPOLL_CTL_ADD, s->io.fd, &ev);
+
+ if (r < 0)
+ return -errno;
+
+ s->io.registered = true;
+
+ return 0;
+}
+
+static void source_free(sd_event_source *s) {
+ assert(s);
+
+ if (s->event) {
+ switch (s->type) {
+
+ case SOURCE_IO:
+ if (s->io.fd >= 0)
+ source_io_unregister(s);
+
+ break;
+
+ case SOURCE_MONOTONIC:
+ prioq_remove(s->event->monotonic, s, &s->time.prioq_index);
+ break;
+
+ case SOURCE_REALTIME:
+ prioq_remove(s->event->realtime, s, &s->time.prioq_index);
+ break;
+
+ case SOURCE_SIGNAL:
+ if (s->signal.sig > 0) {
+ if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0)
+ assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
+
+ if (s->event->signal_sources)
+ s->event->signal_sources[s->signal.sig] = NULL;
+ }
+
+ break;
+
+ case SOURCE_CHILD:
+ if (s->child.pid > 0) {
+ if (s->mute != SD_EVENT_MUTED) {
+ assert(s->event->n_unmuted_child_sources > 0);
+ s->event->n_unmuted_child_sources--;
+ }
+
+ if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD])
+ assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
+
+ hashmap_remove(s->event->child_sources, INT_TO_PTR(s->child.pid));
+ }
+
+ break;
+ }
+
+ if (s->pending)
+ prioq_remove(s->event->pending, s, &s->pending_index);
+
+ if (s->prepare)
+ prioq_remove(s->event->prepare, s, &s->prepare_index);
+
+ sd_event_unref(s->event);
+ }
+
+ free(s);
+}
+
+static int source_set_pending(sd_event_source *s, bool b) {
+ int r;
+
+ assert(s);
+
+ if (s->pending == b)
+ return 0;
+
+ s->pending = b;
+
+ if (b) {
+ s->pending_iteration = s->event->iteration;
+
+ r = prioq_put(s->event->pending, s, &s->pending_index);
+ if (r < 0) {
+ s->pending = false;
+ return r;
+ }
+ } else
+ assert_se(prioq_remove(s->event->pending, s, &s->pending_index));
+
+ return 0;
+}
+
+static sd_event_source *source_new(sd_event *e, EventSourceType type) {
+ sd_event_source *s;
+
+ assert(e);
+
+ s = new0(sd_event_source, 1);
+ if (!s)
+ return NULL;
+
+ s->n_ref = REFCNT_INIT;
+ s->event = sd_event_ref(e);
+ s->type = type;
+ s->mute = SD_EVENT_UNMUTED;
+ s->pending_index = s->prepare_index = PRIOQ_IDX_NULL;
+
+ return s;
+}
+
+int sd_event_add_io(
+ sd_event *e,
+ int fd,
+ uint32_t events,
+ sd_io_handler_t callback,
+ void *userdata,
+ sd_event_source **ret) {
+
+ sd_event_source *s;
+ int r;
+
+ if (!e)
+ return -EINVAL;
+ if (fd < 0)
+ return -EINVAL;
+ if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
+ return -EINVAL;
+ if (!callback)
+ return -EINVAL;
+ if (!ret)
+ return -EINVAL;
+
+ s = source_new(e, SOURCE_IO);
+ if (!s)
+ return -ENOMEM;
+
+ s->io.fd = fd;
+ s->io.events = events;
+ s->io.callback = callback;
+ s->userdata = userdata;
+
+ r = source_io_register(s, s->mute, events);
+ if (r < 0) {
+ source_free(s);
+ return -errno;
+ }
+
+ *ret = s;
+ return 0;
+}
+
+static int event_setup_timer_fd(
+ sd_event *e,
+ EventSourceType type,
+ int *timer_fd,
+ clockid_t id) {
+
+ struct epoll_event ev = {};
+ int r, fd;
+
+ assert(e);
+ assert(timer_fd);
+
+ if (_likely_(*timer_fd >= 0))
+ return 0;
+
+ fd = timerfd_create(id, TFD_NONBLOCK|TFD_CLOEXEC);
+ if (fd < 0)
+ return -errno;
+
+ ev.events = EPOLLIN;
+ ev.data.ptr = INT_TO_PTR(type);
+
+ r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+ if (r < 0) {
+ close_nointr_nofail(fd);
+ return -errno;
+ }
+
+ *timer_fd = fd;
+ return 0;
+}
+
+static int event_add_time_internal(
+ sd_event *e,
+ EventSourceType type,
+ int *timer_fd,
+ clockid_t id,
+ Prioq **prioq,
+ uint64_t usec,
+ sd_time_handler_t callback,
+ void *userdata,
+ sd_event_source **ret) {
+
+ sd_event_source *s;
+ int r;
+
+ if (!e)
+ return -EINVAL;
+ if (!callback)
+ return -EINVAL;
+ if (!ret)
+ return -EINVAL;
+
+ assert(timer_fd);
+ assert(prioq);
+
+ if (!*prioq) {
+ *prioq = prioq_new(time_prioq_compare);
+ if (!*prioq)
+ return -ENOMEM;
+ }
+
+ if (*timer_fd < 0) {
+ r = event_setup_timer_fd(e, type, timer_fd, id);
+ if (r < 0)
+ return r;
+ }
+
+ s = source_new(e, type);
+ if (!s)
+ return -ENOMEM;
+
+ s->time.next = usec;
+ s->time.callback = callback;
+ s->time.prioq_index = PRIOQ_IDX_NULL;
+ s->userdata = userdata;
+
+ r = prioq_put(*prioq, s, &s->time.prioq_index);
+ if (r < 0) {
+ source_free(s);
+ return r;
+ }
+
+ *ret = s;
+ return 0;
+}
+
+int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
+ return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret);
+}
+
+int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) {
+ return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret);
+}
+
+static int event_update_signal_fd(sd_event *e) {
+ struct epoll_event ev = {};
+ bool add_to_epoll;
+ int r;
+
+ assert(e);
+
+ add_to_epoll = e->signal_fd < 0;
+
+ r = signalfd(e->signal_fd, &e->sigset, SFD_NONBLOCK|SFD_CLOEXEC);
+ if (r < 0)
+ return -errno;
+
+ e->signal_fd = r;
+
+ if (!add_to_epoll)
+ return 0;
+
+ ev.events = EPOLLIN;
+ ev.data.ptr = INT_TO_PTR(SOURCE_SIGNAL);
+
+ r = epoll_ctl(e->epoll_fd, EPOLL_CTL_ADD, e->signal_fd, &ev);
+ if (r < 0) {
+ close_nointr_nofail(e->signal_fd);
+ e->signal_fd = -1;
+
+ return -errno;
+ }
+
+ return 0;
+}
+
+int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **ret) {
+ sd_event_source *s;
+ int r;
+
+ if (!e)
+ return -EINVAL;
+ if (sig <= 0)
+ return -EINVAL;
+ if (sig >= _NSIG)
+ return -EINVAL;
+ if (!callback)
+ return -EINVAL;
+ if (!ret)
+ return -EINVAL;
+
+ if (!e->signal_sources) {
+ e->signal_sources = new0(sd_event_source*, _NSIG);
+ if (!e->signal_sources)
+ return -ENOMEM;
+ } else if (e->signal_sources[sig])
+ return -EBUSY;
+
+ s = source_new(e, SOURCE_SIGNAL);
+ if (!s)
+ return -ENOMEM;
+
+ s->signal.sig = sig;
+ s->signal.callback = callback;
+ s->userdata = userdata;
+
+ e->signal_sources[sig] = s;
+ assert_se(sigaddset(&e->sigset, sig) == 0);
+
+ if (sig != SIGCHLD || e->n_unmuted_child_sources == 0) {
+ r = event_update_signal_fd(e);
+ if (r < 0) {
+ source_free(s);
+ return r;
+ }
+ }
+
+ *ret = s;
+ return 0;
+}
+
+int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **ret) {
+ sd_event_source *s;
+ int r;
+
+ if (!e)
+ return -EINVAL;
+ if (pid <= 1)
+ return -EINVAL;
+ if (options & ~(WEXITED|WSTOPPED|WCONTINUED))
+ return -EINVAL;
+ if (!callback)
+ return -EINVAL;
+ if (!ret)
+ return -EINVAL;
+
+ r = hashmap_ensure_allocated(&e->child_sources, trivial_hash_func, trivial_compare_func);
+ if (r < 0)
+ return r;
+
+ if (hashmap_contains(e->child_sources, INT_TO_PTR(pid)))
+ return -EBUSY;
+
+ s = source_new(e, SOURCE_CHILD);
+ if (!s)
+ return -ENOMEM;
+
+ s->child.pid = pid;
+ s->child.options = options;
+ s->child.callback = callback;
+ s->userdata = userdata;
+
+ r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s);
+ if (r < 0) {
+ source_free(s);
+ return r;
+ }
+
+ e->n_unmuted_child_sources ++;
+
+ assert_se(sigaddset(&e->sigset, SIGCHLD) == 0);
+
+ if (!e->signal_sources || !e->signal_sources[SIGCHLD]) {
+ r = event_update_signal_fd(e);
+ if (r < 0) {
+ source_free(s);
+ return -errno;
+ }
+ }
+
+ *ret = s;
+ return 0;
+}
+
+int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **ret) {
+ sd_event_source *s;
+ int r;
+
+ if (!e)
+ return -EINVAL;
+ if (!ret)
+ return -EINVAL;
+
+ s = source_new(e, SOURCE_DEFER);
+ if (!s)
+ return -ENOMEM;
+
+ s->defer.callback = callback;
+ s->userdata = userdata;
+
+ r = source_set_pending(s, true);
+ if (r < 0) {
+ source_free(s);
+ return r;
+ }
+
+ *ret = s;
+ return 0;
+}
+
+sd_event_source* sd_event_source_ref(sd_event_source *s) {
+ if (!s)
+ return NULL;
+
+ assert_se(REFCNT_INC(s->n_ref) >= 2);
+
+ return s;
+}
+
+sd_event_source* sd_event_source_unref(sd_event_source *s) {
+ if (!s)
+ return NULL;
+
+ if (REFCNT_DEC(s->n_ref) <= 0)
+ source_free(s);
+
+ return NULL;
+}
+
+int sd_event_source_get_pending(sd_event_source *s) {
+ if (!s)
+ return -EINVAL;
+
+ return s->pending;
+}
+
+int sd_event_source_get_io_fd(sd_event_source *s) {
+ if (!s)
+ return -EINVAL;
+ if (s->type != SOURCE_IO)
+ return -EDOM;
+
+ return s->io.fd;
+}
+
+int sd_event_source_get_io_events(sd_event_source *s, uint32_t* events) {
+ if (!s)
+ return -EINVAL;
+ if (s->type != SOURCE_IO)
+ return -EDOM;
+ if (!events)
+ return -EINVAL;
+
+ *events = s->io.events;
+ return 0;
+}
+
+int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) {
+ int r;
+
+ if (!s)
+ return -EINVAL;
+ if (!s->type != SOURCE_IO)
+ return -EDOM;
+ if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP))
+ return -EINVAL;
+
+ if (s->io.events == events)
+ return 0;
+
+ if (s->mute != SD_EVENT_MUTED) {
+ r = source_io_register(s, s->io.events, events);
+ if (r < 0)
+ return r;
+ }
+
+ s->io.events = events;
+
+ return 0;
+}
+
+int sd_event_source_get_io_revents(sd_event_source *s, uint32_t* revents) {
+ if (!s)
+ return -EINVAL;
+ if (s->type != SOURCE_IO)
+ return -EDOM;
+ if (!revents)
+ return -EINVAL;
+ if (!s->pending)
+ return -ENODATA;
+
+ *revents = s->io.revents;
+ return 0;
+}
+
+int sd_event_source_get_signal(sd_event_source *s) {
+ if (!s)
+ return -EINVAL;
+ if (s->type != SOURCE_SIGNAL)
+ return -EDOM;
+
+ return s->signal.sig;
+}
+
+int sd_event_source_get_priority(sd_event_source *s, int *priority) {
+ if (!s)
+ return -EINVAL;
+
+ return s->priority;
+}
+
+int sd_event_source_set_priority(sd_event_source *s, int priority) {
+ if (!s)
+ return -EINVAL;
+
+ if (s->priority == priority)
+ return 0;
+
+ s->priority = priority;
+
+ if (s->pending)
+ assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0);
+
+ if (s->prepare)
+ assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0);
+
+ return 0;
+}
+
+int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) {
+ if (!s)
+ return -EINVAL;
+ if (!m)
+ return -EINVAL;
+
+ *m = s->mute;
+ return 0;
+}
+
+int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) {
+ int r;
+
+ if (!s)
+ return -EINVAL;
+ if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT)
+ return -EINVAL;
+
+ if (s->mute == m)
+ return 0;
+
+ if (m == SD_EVENT_MUTED) {
+
+ switch (s->type) {
+
+ case SOURCE_IO:
+ r = source_io_unregister(s);
+ if (r < 0)
+ return r;
+
+ s->mute = m;
+ break;
+
+ case SOURCE_MONOTONIC:
+ s->mute = m;
+ prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+ break;
+
+ case SOURCE_REALTIME:
+ s->mute = m;
+ prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+ break;
+
+ case SOURCE_SIGNAL:
+ s->mute = m;
+ if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
+ assert_se(sigdelset(&s->event->sigset, s->signal.sig) == 0);
+ event_update_signal_fd(s->event);
+ }
+
+ break;
+
+ case SOURCE_CHILD:
+ s->mute = m;
+
+ assert(s->event->n_unmuted_child_sources > 0);
+ s->event->n_unmuted_child_sources--;
+
+ if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
+ assert_se(sigdelset(&s->event->sigset, SIGCHLD) == 0);
+ event_update_signal_fd(s->event);
+ }
+
+ break;
+
+ case SOURCE_DEFER:
+ s->mute = m;
+ break;
+ }
+
+ } else {
+ switch (s->type) {
+
+ case SOURCE_IO:
+ r = source_io_register(s, m, s->io.events);
+ if (r < 0)
+ return r;
+
+ s->mute = m;
+ break;
+
+ case SOURCE_MONOTONIC:
+ s->mute = m;
+ prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+ break;
+
+ case SOURCE_REALTIME:
+ s->mute = m;
+ prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+ break;
+
+ case SOURCE_SIGNAL:
+ s->mute = m;
+
+ if (s->signal.sig != SIGCHLD || s->event->n_unmuted_child_sources == 0) {
+ assert_se(sigaddset(&s->event->sigset, s->signal.sig) == 0);
+ event_update_signal_fd(s->event);
+ }
+ break;
+
+ case SOURCE_CHILD:
+ s->mute = m;
+
+ if (s->mute == SD_EVENT_MUTED) {
+ s->event->n_unmuted_child_sources++;
+
+ if (!s->event->signal_sources || !s->event->signal_sources[SIGCHLD]) {
+ assert_se(sigaddset(&s->event->sigset, SIGCHLD) == 0);
+ event_update_signal_fd(s->event);
+ }
+ }
+ break;
+
+ case SOURCE_DEFER:
+ s->mute = m;
+ break;
+ }
+ }
+
+ if (s->pending)
+ prioq_reshuffle(s->event->pending, s, &s->pending_index);
+
+ if (s->prepare)
+ prioq_reshuffle(s->event->prepare, s, &s->prepare_index);
+
+ return 0;
+}
+
+int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) {
+ if (!s)
+ return -EINVAL;
+ if (!usec)
+ return -EINVAL;
+ if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
+ return -EDOM;
+
+ *usec = s->time.next;
+ return 0;
+}
+
+int sd_event_source_set_time(sd_event_source *s, uint64_t usec) {
+ if (!s)
+ return -EINVAL;
+ if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC)
+ return -EDOM;
+
+ if (s->time.next == usec)
+ return 0;
+
+ s->time.next = usec;
+
+ if (s->type == SOURCE_REALTIME)
+ prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index);
+ else
+ prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index);
+
+ return 0;
+}
+
+int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) {
+ int r;
+
+ if (!s)
+ return -EINVAL;
+
+ if (s->prepare == callback)
+ return 0;
+
+ if (callback && s->prepare) {
+ s->prepare = callback;
+ return 0;
+ }
+
+ r = prioq_ensure_allocated(&s->event->prepare, prepare_prioq_compare);
+ if (r < 0)
+ return r;
+
+ s->prepare = callback;
+
+ if (callback) {
+ r = prioq_put(s->event->prepare, s, &s->prepare_index);
+ if (r < 0)
+ return r;
+ } else
+ prioq_remove(s->event->prepare, s, &s->prepare_index);
+
+ return 0;
+}
+
+void* sd_event_source_get_userdata(sd_event_source *s) {
+ if (!s)
+ return NULL;
+
+ return s->userdata;
+}
+
+static int event_arm_timer(
+ sd_event *e,
+ int timer_fd,
+ Prioq *prioq,
+ usec_t *next) {
+
+ struct itimerspec its = {};
+ sd_event_source *s;
+ int r;
+
+ assert_se(e);
+ assert_se(next);
+
+ s = prioq_peek(prioq);
+ if (!s || s->mute == SD_EVENT_MUTED)
+ return 0;
+
+ if (*next == s->time.next)
+ return 0;
+
+ assert_se(timer_fd >= 0);
+
+ if (s->time.next == 0) {
+ /* We don' want to disarm here, just mean some time looooong ago. */
+ its.it_value.tv_sec = 0;
+ its.it_value.tv_nsec = 1;
+ } else
+ timespec_store(&its.it_value, s->time.next);
+
+ r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL);
+ if (r < 0)
+ return r;
+
+ *next = s->time.next;
+ return 0;
+}
+
+static int process_io(sd_event *e, sd_event_source *s, uint32_t events) {
+ assert(e);
+ assert(s);
+ assert(s->type == SOURCE_IO);
+
+ s->io.revents = events;
+
+ /*
+ If this is a oneshot event source, then we added it to the
+ epoll with EPOLLONESHOT, hence we know it's not registered
+ anymore. We can save a syscall here...
+ */
+
+ if (s->mute == SD_EVENT_ONESHOT)
+ s->io.registered = false;
+
+ return source_set_pending(s, true);
+}
+
+static int flush_timer(sd_event *e, int fd, uint32_t events) {
+ uint64_t x;
+ ssize_t ss;
+
+ assert(e);
+
+ if (events != EPOLLIN)
+ return -EIO;
+
+ ss = read(fd, &x, sizeof(x));
+ if (ss < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ return 0;
+
+ return -errno;
+ }
+
+ if (ss != sizeof(x))
+ return -EIO;
+
+ return 0;
+}
+
+static int process_timer(sd_event *e, usec_t n, Prioq *prioq) {
+ sd_event_source *s;
+ int r;
+
+ assert(e);
+
+ for (;;) {
+ s = prioq_peek(prioq);
+ if (!s ||
+ s->time.next > n ||
+ s->mute == SD_EVENT_MUTED ||
+ s->pending)
+ break;
+
+ r = source_set_pending(s, true);
+ if (r < 0)
+ return r;
+
+ r = prioq_reshuffle(prioq, s, &s->time.prioq_index);
+ if (r < 0)
+ return r;
+ }
+
+ return 0;
+}
+
+static int process_child(sd_event *e) {
+ sd_event_source *s;
+ Iterator i;
+ int r;
+
+ assert(e);
+
+ /*
+ So, this is ugly. We iteratively invoke waitid() with P_PID
+ + WNOHANG for each PID we wait for, instead of using
+ P_ALL. This is because we only want to get child
+ information of very specific child processes, and not all
+ of them. We might not have processed the SIGCHLD even of a
+ previous invocation and we don't want to maintain a
+ unbounded *per-child* event queue, hence we really don't
+ want anything flushed out of the kernel's queue that we
+ don't care about. Since this is O(n) this means that if you
+ have a lot of processes you probably want to handle SIGCHLD
+ yourself.
+ */
+
+ HASHMAP_FOREACH(s, e->child_sources, i) {
+ assert(s->type == SOURCE_CHILD);
+
+ if (s->pending)
+ continue;
+
+ if (s->mute == SD_EVENT_MUTED)
+ continue;
+
+ zero(s->child.siginfo);
+ r = waitid(P_PID, s->child.pid, &s->child.siginfo, WNOHANG|s->child.options);
+ if (r < 0)
+ return -errno;
+
+ if (s->child.siginfo.si_pid != 0) {
+ r = source_set_pending(s, true);
+ if (r < 0)
+ return r;
+ }
+ }
+
+ e->processed_children = e->iteration;
+ return 0;
+}
+
+static int process_signal(sd_event *e, uint32_t events) {
+ struct signalfd_siginfo si;
+ bool read_one = false;
+ ssize_t ss;
+ int r;
+
+ if (events != EPOLLIN)
+ return -EIO;
+
+ for (;;) {
+ sd_event_source *s;
+
+ ss = read(e->signal_fd, &si, sizeof(si));
+ if (ss < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ return read_one;
+
+ return -errno;
+ }
+
+ if (ss != sizeof(si))
+ return -EIO;
+
+ read_one = true;
+
+ if (si.ssi_signo == SIGCHLD) {
+ r = process_child(e);
+ if (r < 0)
+ return r;
+ if (r > 0 || !e->signal_sources[si.ssi_signo])
+ continue;
+ } else {
+ s = e->signal_sources[si.ssi_signo];
+ if (!s)
+ return -EIO;
+ }
+
+ s->signal.siginfo = si;
+ r = source_set_pending(s, true);
+ if (r < 0)
+ return r;
+ }
+
+
+ return 0;
+}
+
+static int source_dispatch(sd_event_source *s) {
+ int r;
+
+ assert(s);
+ assert(s->pending);
+
+ r = source_set_pending(s, false);
+ if (r < 0)
+ return r;
+
+ if (s->mute == SD_EVENT_ONESHOT) {
+ r = sd_event_source_set_mute(s, SD_EVENT_MUTED);
+ if (r < 0)
+ return r;
+ }
+
+ switch (s->type) {
+
+ case SOURCE_IO:
+ r = s->io.callback(s, s->io.fd, s->io.revents, s->userdata);
+ break;
+
+ case SOURCE_MONOTONIC:
+ r = s->time.callback(s, s->time.next, s->userdata);
+ break;
+
+ case SOURCE_REALTIME:
+ r = s->time.callback(s, s->time.next, s->userdata);
+ break;
+
+ case SOURCE_SIGNAL:
+ r = s->signal.callback(s, &s->signal.siginfo, s->userdata);
+ break;
+
+ case SOURCE_CHILD:
+ r = s->child.callback(s, &s->child.siginfo, s->userdata);
+ break;
+
+ case SOURCE_DEFER:
+ r = s->defer.callback(s, s->userdata);
+ break;
+ }
+
+ return r;
+}
+
+static int event_prepare(sd_event *e) {
+ int r;
+
+ assert(e);
+
+ for (;;) {
+ sd_event_source *s;
+
+ s = prioq_peek(e->prepare);
+ if (!s || s->prepare_iteration == e->iteration || s->mute == SD_EVENT_MUTED)
+ break;
+
+ s->prepare_iteration = e->iteration;
+ r = prioq_reshuffle(e->prepare, s, &s->prepare_index);
+ if (r < 0)
+ return r;
+
+ assert(s->prepare);
+ r = s->prepare(s, s->userdata);
+ if (r < 0)
+ return r;
+
+ }
+
+ return 0;
+}
+
+int sd_event_run(sd_event *e, uint64_t timeout) {
+ struct epoll_event ev_queue[EPOLL_QUEUE_MAX];
+ sd_event_source *p;
+ int r, i, m;
+ dual_timestamp n;
+
+ if (!e)
+ return -EINVAL;
+ if (e->quit)
+ return -ESTALE;
+
+ e->iteration++;
+
+ r = event_prepare(e);
+ if (r < 0)
+ return r;
+
+ r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next);
+ if (r < 0)
+ return r;
+
+ r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next);
+ if (r < 0)
+ return r;
+
+ if (e->iteration == 1 && !hashmap_isempty(e->child_sources))
+ /* On the first iteration, there might be already some
+ * zombies for us to care for, hence, don't wait */
+ timeout = 0;
+ else {
+ p = prioq_peek(e->pending);
+ if (p && p->mute != SD_EVENT_MUTED)
+ timeout = 0;
+ }
+
+ m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC));
+ if (m < 0)
+ return m;
+
+ dual_timestamp_get(&n);
+
+ for (i = 0; i < m; i++) {
+
+ if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_MONOTONIC))
+ r = flush_timer(e, e->monotonic_fd, ev_queue[i].events);
+ else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_REALTIME))
+ r = flush_timer(e, e->realtime_fd, ev_queue[i].events);
+ else if (ev_queue[i].data.ptr == INT_TO_PTR(SOURCE_SIGNAL))
+ r = process_signal(e, ev_queue[i].events);
+ else
+ r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events);
+
+ if (r < 0)
+ return r;
+ }
+
+ r = process_timer(e, n.monotonic, e->monotonic);
+ if (r < 0)
+ return r;
+
+ r = process_timer(e, n.realtime, e->realtime);
+ if (r < 0)
+ return r;
+
+ if (e->iteration == 1 && e->processed_children != 1) {
+ /* On the first iteration, make sure we really process
+ * all children which might already be zombies. */
+ r = process_child(e);
+ if (r < 0)
+ return r;
+ }
+
+ p = prioq_peek(e->pending);
+ if (!p || p->mute == SD_EVENT_MUTED)
+ return 0;
+
+ return source_dispatch(p);
+}
+
+int sd_event_loop(sd_event *e) {
+ int r;
+
+ if (!e)
+ return -EINVAL;
+
+ while (!e->quit) {
+ r = sd_event_run(e, (uint64_t) -1);
+ if (r < 0)
+ return r;
+ }
+
+ return 0;
+}
+
+int sd_event_quit(sd_event *e) {
+ if (!e)
+ return EINVAL;
+
+ return e->quit;
+}
+
+int sd_event_request_quit(sd_event *e) {
+ if (!e)
+ return -EINVAL;
+
+ e->quit = true;
+ return 0;
+}
+
+sd_event *sd_event_get(sd_event_source *s) {
+ if (!s)
+ return NULL;
+
+ return s->event;
+}
diff --git a/src/libsystemd-bus/test-event.c b/src/libsystemd-bus/test-event.c
new file mode 100644
index 0000000000..8dd47fe8a9
--- /dev/null
+++ b/src/libsystemd-bus/test-event.c
@@ -0,0 +1,197 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+ This file is part of systemd.
+
+ Copyright 2013 Lennart Poettering
+
+ systemd is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 2.1 of the License, or
+ (at your option) any later version.
+
+ systemd is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include "sd-event.h"
+#include "log.h"
+#include "util.h"
+
+static int prepare_handler(sd_event_source *s, void *userdata) {
+ log_info("preparing %c", PTR_TO_INT(userdata));
+ return 1;
+}
+
+static bool got_a, got_b, got_c;
+
+static int io_handler(sd_event_source *s, int fd, uint32_t revents, void *userdata) {
+
+ log_info("got IO on %c", PTR_TO_INT(userdata));
+
+ if (userdata == INT_TO_PTR('a')) {
+ assert_se(sd_event_source_set_mute(s, SD_EVENT_MUTED) >= 0);
+ assert_se(!got_a);
+ got_a = true;
+ } else if (userdata == INT_TO_PTR('b')) {
+ assert_se(!got_b);
+ got_b = true;
+ } else
+ assert_not_reached("Yuck!");
+
+ return 1;
+}
+
+static int child_handler(sd_event_source *s, const siginfo_t *si, void *userdata) {
+
+ assert(s);
+ assert(si);
+
+ log_info("got child on %c", PTR_TO_INT(userdata));
+
+ assert(userdata == INT_TO_PTR('f'));
+
+ assert_se(sd_event_request_quit(sd_event_get(s)) >= 0);
+ sd_event_source_unref(s);
+
+ return 1;
+}
+
+static int signal_handler(sd_event_source *s, const struct signalfd_siginfo *si, void *userdata) {
+ sd_event_source *p;
+ sigset_t ss;
+ pid_t pid;
+
+ assert(s);
+ assert(si);
+
+ log_info("got signal on %c", PTR_TO_INT(userdata));
+
+ assert(userdata == INT_TO_PTR('e'));
+
+ assert_se(sigemptyset(&ss) >= 0);
+ assert_se(sigaddset(&ss, SIGCHLD) >= 0);
+ assert_se(sigprocmask(SIG_BLOCK, &ss, NULL) >= 0);
+
+ pid = fork();
+ assert_se(pid >= 0);
+
+ if (pid == 0)
+ _exit(0);
+
+ assert_se(sd_event_add_child(sd_event_get(s), pid, WEXITED, child_handler, INT_TO_PTR('f'), &p) >= 0);
+ assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0);
+
+ sd_event_source_unref(s);
+
+ return 1;
+}
+
+static int defer_handler(sd_event_source *s, void *userdata) {
+ sd_event_source *p;
+ sigset_t ss;
+
+ assert(s);
+
+ log_info("got defer on %c", PTR_TO_INT(userdata));
+
+ assert(userdata == INT_TO_PTR('d'));
+
+ assert_se(sigemptyset(&ss) >= 0);
+ assert_se(sigaddset(&ss, SIGUSR1) >= 0);
+ assert_se(sigprocmask(SIG_BLOCK, &ss, NULL) >= 0);
+ assert_se(sd_event_add_signal(sd_event_get(s), SIGUSR1, signal_handler, INT_TO_PTR('e'), &p) >= 0);
+ assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0);
+ raise(SIGUSR1);
+
+ sd_event_source_unref(s);
+
+ return 1;
+}
+
+static bool do_quit = false;
+
+static int time_handler(sd_event_source *s, uint64_t usec, void *userdata) {
+ log_info("got timer on %c", PTR_TO_INT(userdata));
+
+ if (userdata == INT_TO_PTR('c')) {
+
+ if (do_quit) {
+ sd_event_source *p;
+
+ assert_se(sd_event_add_defer(sd_event_get(s), defer_handler, INT_TO_PTR('d'), &p) >= 0);
+ assert_se(sd_event_source_set_mute(p, SD_EVENT_ONESHOT) >= 0);
+ } else {
+ assert(!got_c);
+ got_c = true;
+ }
+ } else
+ assert_not_reached("Huh?");
+
+ return 2;
+}
+
+int main(int argc, char *argv[]) {
+ sd_event *e = NULL;
+ sd_event_source *x = NULL, *y = NULL, *z = NULL;
+ static const char ch = 'x';
+ int a[2] = { -1, -1 }, b[2] = { -1, -1};
+
+ assert_se(pipe(a) >= 0);
+ assert_se(pipe(b) >= 0);
+
+ assert_se(sd_event_new(&e) >= 0);
+
+ got_a = false, got_b = false, got_c = false;
+
+ assert_se(sd_event_add_io(e, a[0], EPOLLIN, io_handler, INT_TO_PTR('a'), &x) >= 0);
+ assert_se(sd_event_add_io(e, b[0], EPOLLIN, io_handler, INT_TO_PTR('b'), &y) >= 0);
+ assert_se(sd_event_add_monotonic(e, 0, time_handler, INT_TO_PTR('c'), &z) >= 0);
+
+ assert_se(sd_event_source_set_priority(x, 99) >= 0);
+ assert_se(sd_event_source_set_mute(y, SD_EVENT_ONESHOT) >= 0);
+ assert_se(sd_event_source_set_prepare(x, prepare_handler) >= 0);
+ assert_se(sd_event_source_set_priority(z, 50) >= 0);
+ assert_se(sd_event_source_set_mute(z, SD_EVENT_ONESHOT) >= 0);
+ assert_se(sd_event_source_set_prepare(z, prepare_handler) >= 0);
+
+ assert_se(write(a[1], &ch, 1) >= 0);
+ assert_se(write(b[1], &ch, 1) >= 0);
+
+ assert_se(!got_a && !got_b && !got_c);
+
+ assert_se(sd_event_run(e, (uint64_t) -1) >= 1);
+
+ assert_se(!got_a && got_b && !got_c);
+
+ assert_se(sd_event_run(e, (uint64_t) -1) >= 1);
+
+ assert_se(!got_a && got_b && got_c);
+
+ assert_se(sd_event_run(e, (uint64_t) -1) >= 1);
+
+ assert_se(got_a && got_b && got_c);
+
+ sd_event_source_unref(x);
+ sd_event_source_unref(y);
+
+ do_quit = true;
+ assert_se(sd_event_source_set_time(z, now(CLOCK_MONOTONIC) + 200 * USEC_PER_MSEC) >= 0);
+ assert_se(sd_event_source_set_mute(z, SD_EVENT_ONESHOT) >= 0);
+
+ assert_se(sd_event_loop(e) >= 0);
+
+ sd_event_source_unref(z);
+
+ sd_event_unref(e);
+
+ close_pipe(a);
+ close_pipe(b);
+
+ return 0;
+}