From da7e457c5b9339721454ae8401a03ffdd781e6a9 Mon Sep 17 00:00:00 2001 From: Lennart Poettering Date: Fri, 11 Oct 2013 00:49:11 +0200 Subject: event: implement quit handlers Quit handlers are executed when an event loop is terminated via sd_event_request_quit(). They are in a way atexit() handlers that are executed in a well-defined environment, time and thread: from the event loop thread when the event loop finishes. --- src/libsystemd-bus/sd-event.c | 309 ++++++++++++++++++++++++++++++---------- src/libsystemd-bus/test-event.c | 14 +- 2 files changed, 243 insertions(+), 80 deletions(-) (limited to 'src/libsystemd-bus') diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c index e3110c3d6b..c52e4cd501 100644 --- a/src/libsystemd-bus/sd-event.c +++ b/src/libsystemd-bus/sd-event.c @@ -24,7 +24,6 @@ #include #include "macro.h" -#include "refcnt.h" #include "prioq.h" #include "hashmap.h" #include "util.h" @@ -42,18 +41,19 @@ typedef enum EventSourceType { SOURCE_REALTIME, SOURCE_SIGNAL, SOURCE_CHILD, - SOURCE_DEFER + SOURCE_DEFER, + SOURCE_QUIT } EventSourceType; struct sd_event_source { - RefCount n_ref; + unsigned n_ref; sd_event *event; void *userdata; sd_prepare_handler_t prepare; EventSourceType type:4; - sd_event_mute_t mute:3; + int mute:3; bool pending:1; int priority; @@ -90,11 +90,15 @@ struct sd_event_source { struct { sd_defer_handler_t callback; } defer; + struct { + sd_quit_handler_t callback; + unsigned prioq_index; + } quit; }; }; struct sd_event { - RefCount n_ref; + unsigned n_ref; int epoll_fd; int signal_fd; @@ -115,21 +119,24 @@ struct sd_event { Prioq *realtime_earliest; Prioq *realtime_latest; + usec_t realtime_next, monotonic_next; + usec_t perturb; + sigset_t sigset; sd_event_source **signal_sources; Hashmap *child_sources; unsigned n_unmuted_child_sources; - unsigned iteration; + Prioq *quit; - usec_t realtime_next, monotonic_next; - usec_t perturb; + pid_t original_pid; - bool quit:1; - bool need_process_child:1; + unsigned iteration; + int state; - pid_t original_pid; + bool quit_requested:1; + bool need_process_child:1; }; static int pending_prioq_compare(const void *a, const void *b) { @@ -236,8 +243,8 @@ static int earliest_time_prioq_compare(const void *a, const void *b) { static int latest_time_prioq_compare(const void *a, const void *b) { const sd_event_source *x = a, *y = b; - assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME); - assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME); + assert((x->type == SOURCE_MONOTONIC && y->type == SOURCE_MONOTONIC) || + (x->type == SOURCE_REALTIME && y->type == SOURCE_REALTIME)); /* Unmuted ones first */ if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) @@ -266,6 +273,33 @@ static int latest_time_prioq_compare(const void *a, const void *b) { return 0; } +static int quit_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->type == SOURCE_QUIT); + assert(y->type == SOURCE_QUIT); + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Lower priority values first */ + if (x->priority < y->priority) + return -1; + if (x->priority > y->priority) + return 1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (x > y) + return 1; + + return 0; +} + static void event_free(sd_event *e) { assert(e); @@ -287,6 +321,7 @@ static void event_free(sd_event *e) { prioq_free(e->monotonic_latest); prioq_free(e->realtime_earliest); prioq_free(e->realtime_latest); + prioq_free(e->quit); free(e->signal_sources); @@ -305,7 +340,7 @@ int sd_event_new(sd_event** ret) { if (!e) return -ENOMEM; - e->n_ref = REFCNT_INIT; + e->n_ref = 1; e->signal_fd = e->realtime_fd = e->monotonic_fd = e->epoll_fd = -1; e->realtime_next = e->monotonic_next = (usec_t) -1; e->original_pid = getpid(); @@ -336,7 +371,8 @@ sd_event* sd_event_ref(sd_event *e) { if (!e) return NULL; - assert_se(REFCNT_INC(e->n_ref) >= 2); + assert(e->n_ref >= 1); + e->n_ref++; return e; } @@ -345,7 +381,10 @@ sd_event* sd_event_unref(sd_event *e) { if (!e) return NULL; - if (REFCNT_DEC(e->n_ref) <= 0) + assert(e->n_ref >= 1); + e->n_ref--; + + if (e->n_ref <= 0) event_free(e); return NULL; @@ -377,18 +416,18 @@ static int source_io_unregister(sd_event_source *s) { return 0; } -static int source_io_register(sd_event_source *s, sd_event_mute_t m, uint32_t events) { +static int source_io_register(sd_event_source *s, int mute, uint32_t events) { struct epoll_event ev = {}; int r; assert(s); assert(s->type == SOURCE_IO); - assert(m != SD_EVENT_MUTED); + assert(mute != SD_EVENT_MUTED); ev.events = events; ev.data.ptr = s; - if (m == SD_EVENT_ONESHOT) + if (mute == SD_EVENT_ONESHOT) ev.events |= EPOLLONESHOT; if (s->io.registered) @@ -451,6 +490,10 @@ static void source_free(sd_event_source *s) { } break; + + case SOURCE_QUIT: + prioq_remove(s->event->quit, s, &s->quit.prioq_index); + break; } if (s->pending) @@ -469,6 +512,7 @@ static int source_set_pending(sd_event_source *s, bool b) { int r; assert(s); + assert(s->type != SOURCE_QUIT); if (s->pending == b) return 0; @@ -498,10 +542,9 @@ static sd_event_source *source_new(sd_event *e, EventSourceType type) { if (!s) return NULL; - s->n_ref = REFCNT_INIT; + s->n_ref = 1; s->event = sd_event_ref(e); s->type = type; - s->mute = SD_EVENT_UNMUTED; s->pending_index = s->prepare_index = PRIOQ_IDX_NULL; return s; @@ -528,6 +571,7 @@ int sd_event_add_io( return -EINVAL; if (!ret) return -EINVAL; + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(e)) return -ECHILD; @@ -539,6 +583,7 @@ int sd_event_add_io( s->io.events = events; s->io.callback = callback; s->userdata = userdata; + s->mute = SD_EVENT_UNMUTED; r = source_io_register(s, s->mute, events); if (r < 0) { @@ -620,6 +665,7 @@ static int event_add_time_internal( return -EINVAL; if (accuracy == (uint64_t) -1) return -EINVAL; + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(e)) return -ECHILD; @@ -652,9 +698,9 @@ static int event_add_time_internal( s->time.next = usec; s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy; s->time.callback = callback; - s->time.earliest_index = PRIOQ_IDX_NULL; - s->time.latest_index = PRIOQ_IDX_NULL; + s->time.earliest_index = s->time.latest_index = PRIOQ_IDX_NULL; s->userdata = userdata; + s->mute = SD_EVENT_ONESHOT; r = prioq_put(*earliest, s, &s->time.earliest_index); if (r < 0) @@ -726,6 +772,7 @@ int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void return -EINVAL; if (!ret) return -EINVAL; + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(e)) return -ECHILD; @@ -743,6 +790,7 @@ int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void s->signal.sig = sig; s->signal.callback = callback; s->userdata = userdata; + s->mute = SD_EVENT_UNMUTED; e->signal_sources[sig] = s; assert_se(sigaddset(&e->sigset, sig) == 0); @@ -773,6 +821,7 @@ int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t c return -EINVAL; if (!ret) return -EINVAL; + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(e)) return -ECHILD; @@ -791,6 +840,7 @@ int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t c s->child.options = options; s->child.callback = callback; s->userdata = userdata; + s->mute = SD_EVENT_ONESHOT; r = hashmap_put(e->child_sources, INT_TO_PTR(pid), s); if (r < 0) { @@ -824,6 +874,7 @@ int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, return -EINVAL; if (!ret) return -EINVAL; + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(e)) return -ECHILD; @@ -833,6 +884,7 @@ int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, s->defer.callback = callback; s->userdata = userdata; + s->mute = SD_EVENT_ONESHOT; r = source_set_pending(s, true); if (r < 0) { @@ -844,26 +896,61 @@ int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, return 0; } -sd_event_source* sd_event_source_ref(sd_event_source *s) { +int sd_event_add_quit(sd_event *e, sd_quit_handler_t callback, void *userdata, sd_event_source **ret) { + sd_event_source *s; + int r; + + assert_return(e, -EINVAL); + assert_return(callback, -EINVAL); + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); + assert_return(!event_pid_changed(e), -ECHILD); + + if (!e->quit) { + e->quit = prioq_new(quit_prioq_compare); + if (!e->quit) + return -ENOMEM; + } + + s = source_new(e, SOURCE_QUIT); if (!s) - return NULL; + return -ENOMEM; - assert_se(REFCNT_INC(s->n_ref) >= 2); + s->quit.callback = callback; + s->userdata = userdata; + s->quit.prioq_index = PRIOQ_IDX_NULL; + s->mute = SD_EVENT_ONESHOT; + + r = prioq_put(s->event->quit, s, &s->quit.prioq_index); + if (r < 0) { + source_free(s); + return r; + } + + *ret = s; + return 0; +} + +sd_event_source* sd_event_source_ref(sd_event_source *s) { + assert_return(s, NULL); + + assert(s->n_ref >= 1); + s->n_ref++; return s; } sd_event_source* sd_event_source_unref(sd_event_source *s) { - if (!s) - return NULL; + assert_return(s, NULL); - if (REFCNT_DEC(s->n_ref) <= 0) + assert(s->n_ref >= 1); + s->n_ref--; + + if (s->n_ref <= 0) source_free(s); return NULL; } - sd_event *sd_event_get(sd_event_source *s) { if (!s) return NULL; @@ -874,6 +961,7 @@ sd_event *sd_event_get(sd_event_source *s) { int sd_event_source_get_pending(sd_event_source *s) { if (!s) return -EINVAL; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -914,6 +1002,7 @@ int sd_event_source_set_io_events(sd_event_source *s, uint32_t events) { return -EDOM; if (events & ~(EPOLLIN|EPOLLOUT|EPOLLRDHUP|EPOLLPRI|EPOLLERR|EPOLLHUP)) return -EINVAL; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -952,6 +1041,7 @@ int sd_event_source_get_signal(sd_event_source *s) { return -EINVAL; if (s->type != SOURCE_SIGNAL) return -EDOM; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -970,6 +1060,7 @@ int sd_event_source_get_priority(sd_event_source *s, int *priority) { int sd_event_source_set_priority(sd_event_source *s, int priority) { if (!s) return -EINVAL; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -987,7 +1078,7 @@ int sd_event_source_set_priority(sd_event_source *s, int priority) { return 0; } -int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) { +int sd_event_source_get_mute(sd_event_source *s, int *m) { if (!s) return -EINVAL; if (!m) @@ -999,13 +1090,14 @@ int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m) { return 0; } -int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { +int sd_event_source_set_mute(sd_event_source *s, int m) { int r; if (!s) return -EINVAL; if (m != SD_EVENT_MUTED && m != SD_EVENT_UNMUTED && !SD_EVENT_ONESHOT) return -EINVAL; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -1059,6 +1151,7 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { break; case SOURCE_DEFER: + case SOURCE_QUIT: s->mute = m; break; } @@ -1109,6 +1202,7 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { break; case SOURCE_DEFER: + case SOURCE_QUIT: s->mute = m; break; } @@ -1144,6 +1238,7 @@ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) { return -EINVAL; if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC) return -EDOM; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -1168,6 +1263,7 @@ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) { return -EINVAL; if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME) return -EDOM; + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); if (event_pid_changed(s->event)) return -ECHILD; @@ -1177,7 +1273,6 @@ int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) { if (s->time.accuracy == usec) return 0; - s->time.accuracy = usec; if (s->type == SOURCE_REALTIME) @@ -1205,10 +1300,10 @@ int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) { int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback) { int r; - if (!s) - return -EINVAL; - if (event_pid_changed(s->event)) - return -ECHILD; + assert_return(s, -EINVAL); + assert_return(s->type != SOURCE_QUIT, -EDOM); + assert_return(s->event->state != SD_EVENT_FINISHED, -ESTALE); + assert_return(!event_pid_changed(s->event), -ECHILD); if (s->prepare == callback) return 0; @@ -1235,8 +1330,7 @@ int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callbac } void* sd_event_source_get_userdata(sd_event_source *s) { - if (!s) - return NULL; + assert_return(s, NULL); return s->userdata; } @@ -1361,6 +1455,7 @@ static int flush_timer(sd_event *e, int fd, uint32_t events) { ssize_t ss; assert(e); + assert(fd >= 0); if (events != EPOLLIN) return -EIO; @@ -1457,6 +1552,8 @@ static int process_signal(sd_event *e, uint32_t events) { ssize_t ss; int r; + assert(e); + if (events != EPOLLIN) return -EIO; @@ -1502,11 +1599,13 @@ static int source_dispatch(sd_event_source *s) { int r; assert(s); - assert(s->pending); + assert(s->pending || s->type == SOURCE_QUIT); - r = source_set_pending(s, false); - if (r < 0) - return r; + if (s->type != SOURCE_DEFER && s->type != SOURCE_QUIT) { + r = source_set_pending(s, false); + if (r < 0) + return r; + } if (s->mute == SD_EVENT_ONESHOT) { r = sd_event_source_set_mute(s, SD_EVENT_MUTED); @@ -1539,6 +1638,10 @@ static int source_dispatch(sd_event_source *s) { case SOURCE_DEFER: r = s->defer.callback(s, s->userdata); break; + + case SOURCE_QUIT: + r = s->quit.callback(s, s->userdata); + break; } return r; @@ -1571,9 +1674,35 @@ static int event_prepare(sd_event *e) { return 0; } +static int dispatch_quit(sd_event *e) { + sd_event_source *p; + int r; + + assert(e); + + p = prioq_peek(e->quit); + if (!p || p->mute == SD_EVENT_MUTED) { + e->state = SD_EVENT_FINISHED; + return 0; + } + + sd_event_ref(e); + e->iteration++; + e->state = SD_EVENT_QUITTING; + + r = source_dispatch(p); + + e->state = SD_EVENT_PASSIVE; + sd_event_unref(e); + + return r; +} + static sd_event_source* event_next_pending(sd_event *e) { sd_event_source *p; + assert(e); + p = prioq_peek(e->pending); if (!p) return NULL; @@ -1590,18 +1719,21 @@ int sd_event_run(sd_event *e, uint64_t timeout) { int r, i, m; dual_timestamp n; - if (!e) - return -EINVAL; - if (e->quit) - return -ESTALE; - if (event_pid_changed(e)) - return -ECHILD; + assert_return(e, -EINVAL); + assert_return(!event_pid_changed(e), -ECHILD); + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); + assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY); + + if (e->quit_requested) + return dispatch_quit(e); + sd_event_ref(e); e->iteration++; + e->state = SD_EVENT_RUNNING; r = event_prepare(e); if (r < 0) - return r; + goto finish; if (event_next_pending(e) || e->need_process_child) timeout = 0; @@ -1609,17 +1741,19 @@ int sd_event_run(sd_event *e, uint64_t timeout) { if (timeout > 0) { r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next); if (r < 0) - return r; + goto finish; r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next); if (r < 0) - return r; + goto finish; } m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC)); - if (m < 0) - return m; + if (m < 0) { + r = m; + goto finish; + } dual_timestamp_get(&n); @@ -1635,62 +1769,79 @@ int sd_event_run(sd_event *e, uint64_t timeout) { r = process_io(e, ev_queue[i].data.ptr, ev_queue[i].events); if (r < 0) - return r; + goto finish; } r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest); if (r < 0) - return r; + goto finish; r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest); if (r < 0) - return r; + goto finish; if (e->need_process_child) { r = process_child(e); if (r < 0) - return r; + goto finish; } p = event_next_pending(e); - if (!p) - return 0; + if (!p) { + r = 0; + goto finish; + } + + r = source_dispatch(p); - return source_dispatch(p); +finish: + e->state = SD_EVENT_PASSIVE; + sd_event_unref(e); + + return r; } int sd_event_loop(sd_event *e) { int r; - if (!e) - return -EINVAL; - if (event_pid_changed(e)) - return -ECHILD; + assert_return(e, -EINVAL); + assert_return(!event_pid_changed(e), -ECHILD); + assert_return(e->state == SD_EVENT_PASSIVE, -EBUSY); + + sd_event_ref(e); - while (!e->quit) { + while (e->state != SD_EVENT_FINISHED) { r = sd_event_run(e, (uint64_t) -1); if (r < 0) - return r; + goto finish; } - return 0; + r = 0; + +finish: + sd_event_unref(e); + return r; } -int sd_event_quit(sd_event *e) { - if (!e) - return EINVAL; - if (event_pid_changed(e)) - return -ECHILD; +int sd_event_get_state(sd_event *e) { + assert_return(e, -EINVAL); + assert_return(!event_pid_changed(e), -ECHILD); + + return e->state; +} + +int sd_event_get_quit(sd_event *e) { + assert_return(e, -EINVAL); + assert_return(!event_pid_changed(e), -ECHILD); - return e->quit; + return e->quit_requested; } int sd_event_request_quit(sd_event *e) { - if (!e) - return -EINVAL; - if (event_pid_changed(e)) - return -ECHILD; + assert_return(e, -EINVAL); + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); + assert_return(!event_pid_changed(e), -ECHILD); - e->quit = true; + e->quit_requested = true; return 0; } diff --git a/src/libsystemd-bus/test-event.c b/src/libsystemd-bus/test-event.c index acb5cee367..7004f61b03 100644 --- a/src/libsystemd-bus/test-event.c +++ b/src/libsystemd-bus/test-event.c @@ -136,9 +136,19 @@ static int time_handler(sd_event_source *s, uint64_t usec, void *userdata) { return 2; } +static bool got_quit = false; + +static int quit_handler(sd_event_source *s, void *userdata) { + log_info("got quit handler on %c", PTR_TO_INT(userdata)); + + got_quit = true; + + return 3; +} + int main(int argc, char *argv[]) { sd_event *e = NULL; - sd_event_source *x = NULL, *y = NULL, *z = NULL; + sd_event_source *x = NULL, *y = NULL, *z = NULL, *q = NULL; static const char ch = 'x'; int a[2] = { -1, -1 }, b[2] = { -1, -1}; @@ -152,6 +162,7 @@ int main(int argc, char *argv[]) { assert_se(sd_event_add_io(e, a[0], EPOLLIN, io_handler, INT_TO_PTR('a'), &x) >= 0); assert_se(sd_event_add_io(e, b[0], EPOLLIN, io_handler, INT_TO_PTR('b'), &y) >= 0); assert_se(sd_event_add_monotonic(e, 0, 0, time_handler, INT_TO_PTR('c'), &z) >= 0); + assert_se(sd_event_add_quit(e, quit_handler, INT_TO_PTR('g'), &q) >= 0); assert_se(sd_event_source_set_priority(x, 99) >= 0); assert_se(sd_event_source_set_mute(y, SD_EVENT_ONESHOT) >= 0); @@ -187,6 +198,7 @@ int main(int argc, char *argv[]) { assert_se(sd_event_loop(e) >= 0); sd_event_source_unref(z); + sd_event_source_unref(q); sd_event_unref(e); -- cgit v1.2.3-54-g00ecf