diff options
-rw-r--r-- | src/libsystemd-bus/sd-event.c | 338 | ||||
-rw-r--r-- | src/libsystemd-bus/test-event.c | 2 | ||||
-rw-r--r-- | src/shared/prioq.c | 3 | ||||
-rw-r--r-- | src/systemd/sd-event.h | 8 |
4 files changed, 275 insertions, 76 deletions
diff --git a/src/libsystemd-bus/sd-event.c b/src/libsystemd-bus/sd-event.c index de96fde8e2..511b271d45 100644 --- a/src/libsystemd-bus/sd-event.c +++ b/src/libsystemd-bus/sd-event.c @@ -29,10 +29,12 @@ #include "hashmap.h" #include "util.h" #include "time-util.h" +#include "sd-id128.h" #include "sd-event.h" #define EPOLL_QUEUE_MAX 64 +#define DEFAULT_ACCURACY_USEC (250 * USEC_PER_MSEC) typedef enum EventSourceType { SOURCE_IO, @@ -70,8 +72,9 @@ struct sd_event_source { } io; struct { sd_time_handler_t callback; - usec_t next; - unsigned prioq_index; + usec_t next, accuracy; + unsigned earliest_index; + unsigned latest_index; } time; struct { sd_signal_handler_t callback; @@ -100,8 +103,17 @@ struct sd_event { Prioq *pending; Prioq *prepare; - Prioq *monotonic; - Prioq *realtime; + + /* For both clocks we maintain two priority queues each, one + * ordered for the earliest times the events may be + * dispatched, and one ordered by the latest times they must + * have been dispatched. The range between the top entries in + * the two prioqs is the time window we can freely schedule + * wakeups in */ + Prioq *monotonic_earliest; + Prioq *monotonic_latest; + Prioq *realtime_earliest; + Prioq *realtime_latest; sigset_t sigset; sd_event_source **signal_sources; @@ -110,11 +122,13 @@ struct sd_event { unsigned n_unmuted_child_sources; unsigned iteration; - unsigned processed_children; usec_t realtime_next, monotonic_next; + usec_t perturb; + bool quit; + bool need_process_child; }; static int pending_prioq_compare(const void *a, const void *b) { @@ -185,7 +199,7 @@ static int prepare_prioq_compare(const void *a, const void *b) { return 0; } -static int time_prioq_compare(const void *a, const void *b) { +static int earliest_time_prioq_compare(const void *a, const void *b) { const sd_event_source *x = a, *y = b; assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME); @@ -218,6 +232,39 @@ static int time_prioq_compare(const void *a, const void *b) { return 0; } +static int latest_time_prioq_compare(const void *a, const void *b) { + const sd_event_source *x = a, *y = b; + + assert(x->type == SOURCE_MONOTONIC || x->type == SOURCE_REALTIME); + assert(y->type == SOURCE_MONOTONIC || y->type == SOURCE_REALTIME); + + /* Unmuted ones first */ + if (x->mute != SD_EVENT_MUTED && y->mute == SD_EVENT_MUTED) + return -1; + if (x->mute == SD_EVENT_MUTED && y->mute != SD_EVENT_MUTED) + return 1; + + /* Move the pending ones to the end */ + if (!x->pending && y->pending) + return -1; + if (x->pending && !y->pending) + return 1; + + /* Order by time */ + if (x->time.next + x->time.accuracy < y->time.next + y->time.accuracy) + return -1; + if (x->time.next + x->time.accuracy > y->time.next + y->time.accuracy) + return -1; + + /* Stability for the rest */ + if (x < y) + return -1; + if (x > y) + return 1; + + return 0; +} + static void event_free(sd_event *e) { assert(e); @@ -235,8 +282,10 @@ static void event_free(sd_event *e) { prioq_free(e->pending); prioq_free(e->prepare); - prioq_free(e->monotonic); - prioq_free(e->realtime); + prioq_free(e->monotonic_earliest); + prioq_free(e->monotonic_latest); + prioq_free(e->realtime_earliest); + prioq_free(e->realtime_latest); free(e->signal_sources); @@ -357,11 +406,13 @@ static void source_free(sd_event_source *s) { break; case SOURCE_MONOTONIC: - prioq_remove(s->event->monotonic, s, &s->time.prioq_index); + prioq_remove(s->event->monotonic_earliest, s, &s->time.earliest_index); + prioq_remove(s->event->monotonic_latest, s, &s->time.latest_index); break; case SOURCE_REALTIME: - prioq_remove(s->event->realtime, s, &s->time.prioq_index); + prioq_remove(s->event->realtime_earliest, s, &s->time.earliest_index); + prioq_remove(s->event->realtime_latest, s, &s->time.latest_index); break; case SOURCE_SIGNAL: @@ -494,6 +545,7 @@ static int event_setup_timer_fd( struct epoll_event ev = {}; int r, fd; + sd_id128_t bootid; assert(e); assert(timer_fd); @@ -514,6 +566,17 @@ static int event_setup_timer_fd( return -errno; } + /* When we sleep for longer, we try to realign the wakeup to + the same time wihtin each second, so that events all across + the system can be coalesced into a single CPU + wakeup. However, let's take some system-specific randomness + for this value, so that in a network of systems with synced + clocks timer events are distributed a bit. Here, we + calculate a perturbation usec offset from the boot ID. */ + + if (sd_id128_get_boot(&bootid) >= 0) + e->perturb = (bootid.qwords[0] ^ bootid.qwords[1]) % USEC_PER_SEC; + *timer_fd = fd; return 0; } @@ -523,8 +586,10 @@ static int event_add_time_internal( EventSourceType type, int *timer_fd, clockid_t id, - Prioq **prioq, + Prioq **earliest, + Prioq **latest, uint64_t usec, + uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { @@ -538,13 +603,24 @@ static int event_add_time_internal( return -EINVAL; if (!ret) return -EINVAL; + if (usec == (uint64_t) -1) + return -EINVAL; + if (accuracy == (uint64_t) -1) + return -EINVAL; assert(timer_fd); - assert(prioq); + assert(earliest); + assert(latest); + + if (!*earliest) { + *earliest = prioq_new(earliest_time_prioq_compare); + if (!*earliest) + return -ENOMEM; + } - if (!*prioq) { - *prioq = prioq_new(time_prioq_compare); - if (!*prioq) + if (!*latest) { + *latest = prioq_new(latest_time_prioq_compare); + if (!*latest) return -ENOMEM; } @@ -559,26 +635,34 @@ static int event_add_time_internal( return -ENOMEM; s->time.next = usec; + s->time.accuracy = accuracy == 0 ? DEFAULT_ACCURACY_USEC : accuracy; s->time.callback = callback; - s->time.prioq_index = PRIOQ_IDX_NULL; + s->time.earliest_index = PRIOQ_IDX_NULL; + s->time.latest_index = PRIOQ_IDX_NULL; s->userdata = userdata; - r = prioq_put(*prioq, s, &s->time.prioq_index); - if (r < 0) { - source_free(s); - return r; - } + r = prioq_put(*earliest, s, &s->time.earliest_index); + if (r < 0) + goto fail; + + r = prioq_put(*latest, s, &s->time.latest_index); + if (r < 0) + goto fail; *ret = s; return 0; + +fail: + source_free(s); + return r; } -int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { - return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic, usec, callback, userdata, ret); +int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { + return event_add_time_internal(e, SOURCE_MONOTONIC, &e->monotonic_fd, CLOCK_MONOTONIC, &e->monotonic_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret); } -int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { - return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime, usec, callback, userdata, ret); +int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **ret) { + return event_add_time_internal(e, SOURCE_REALTIME, &e->realtime_fd, CLOCK_REALTIME, &e->realtime_earliest, &e->monotonic_latest, usec, accuracy, callback, userdata, ret); } static int event_update_signal_fd(sd_event *e) { @@ -707,6 +791,8 @@ int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t c } } + e->need_process_child = true; + *ret = s; return 0; } @@ -848,10 +934,10 @@ int sd_event_source_set_priority(sd_event_source *s, int priority) { s->priority = priority; if (s->pending) - assert_se(prioq_reshuffle(s->event->pending, s, &s->pending_index) == 0); + prioq_reshuffle(s->event->pending, s, &s->pending_index); if (s->prepare) - assert_se(prioq_reshuffle(s->event->prepare, s, &s->prepare_index) == 0); + prioq_reshuffle(s->event->prepare, s, &s->prepare_index); return 0; } @@ -891,12 +977,14 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { case SOURCE_MONOTONIC: s->mute = m; - prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index); + prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index); break; case SOURCE_REALTIME: s->mute = m; - prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index); + prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index); break; case SOURCE_SIGNAL: @@ -939,12 +1027,14 @@ int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m) { case SOURCE_MONOTONIC: s->mute = m; - prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index); + prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index); break; case SOURCE_REALTIME: s->mute = m; - prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); + prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index); + prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index); break; case SOURCE_SIGNAL: @@ -999,6 +1089,8 @@ int sd_event_source_get_time(sd_event_source *s, uint64_t *usec) { int sd_event_source_set_time(sd_event_source *s, uint64_t usec) { if (!s) return -EINVAL; + if (usec == (uint64_t) -1) + return -EINVAL; if (s->type != SOURCE_REALTIME && s->type != SOURCE_MONOTONIC) return -EDOM; @@ -1007,10 +1099,13 @@ int sd_event_source_set_time(sd_event_source *s, uint64_t usec) { s->time.next = usec; - if (s->type == SOURCE_REALTIME) - prioq_reshuffle(s->event->realtime, s, &s->time.prioq_index); - else - prioq_reshuffle(s->event->monotonic, s, &s->time.prioq_index); + if (s->type == SOURCE_REALTIME) { + prioq_reshuffle(s->event->realtime_earliest, s, &s->time.earliest_index); + prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index); + } else { + prioq_reshuffle(s->event->monotonic_earliest, s, &s->time.earliest_index); + prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index); + } return 0; } @@ -1052,40 +1147,99 @@ void* sd_event_source_get_userdata(sd_event_source *s) { return s->userdata; } +static usec_t sleep_between(sd_event *e, usec_t a, usec_t b) { + usec_t c; + assert(e); + assert(a <= b); + + if (a <= 0) + return 0; + + if (b <= a + 1) + return a; + + /* + Find a good time to wake up again between times a and b. We + have two goals here: + + a) We want to wake up as seldom as possible, hence prefer + later times over earlier times. + + b) But if we have to wake up, then let's make sure to + dispatch as much as possible on the entire system. + + We implement this by waking up everywhere at the same time + within any given second if we can, synchronised via the + perturbation value determined from the boot ID. If we can't, + then we try to find the same spot in every a 250ms + step. Otherwise, we pick the last possible time to wake up. + */ + + c = (b / USEC_PER_SEC) * USEC_PER_SEC + e->perturb; + if (c >= b) { + if (_unlikely_(c < USEC_PER_SEC)) + return b; + + c -= USEC_PER_SEC; + } + + if (c >= a) + return c; + + c = (b / (USEC_PER_MSEC*250)) * (USEC_PER_MSEC*250) + (e->perturb % (USEC_PER_MSEC*250)); + if (c >= b) { + if (_unlikely_(c < USEC_PER_MSEC*250)) + return b; + + c -= USEC_PER_MSEC*250; + } + + if (c >= a) + return c; + + return b; +} + static int event_arm_timer( sd_event *e, int timer_fd, - Prioq *prioq, + Prioq *earliest, + Prioq *latest, usec_t *next) { struct itimerspec its = {}; - sd_event_source *s; + sd_event_source *a, *b; + usec_t t; int r; assert_se(e); assert_se(next); - s = prioq_peek(prioq); - if (!s || s->mute == SD_EVENT_MUTED) + a = prioq_peek(earliest); + if (!a || a->mute == SD_EVENT_MUTED) return 0; - if (*next == s->time.next) + b = prioq_peek(latest); + assert_se(b && b->mute != SD_EVENT_MUTED); + + t = sleep_between(e, a->time.next, b->time.next + b->time.accuracy); + if (*next == t) return 0; assert_se(timer_fd >= 0); - if (s->time.next == 0) { + if (t == 0) { /* We don' want to disarm here, just mean some time looooong ago. */ its.it_value.tv_sec = 0; its.it_value.tv_nsec = 1; } else - timespec_store(&its.it_value, s->time.next); + timespec_store(&its.it_value, t); r = timerfd_settime(timer_fd, TFD_TIMER_ABSTIME, &its, NULL); if (r < 0) return r; - *next = s->time.next; + *next = t; return 0; } @@ -1131,14 +1285,14 @@ static int flush_timer(sd_event *e, int fd, uint32_t events) { return 0; } -static int process_timer(sd_event *e, usec_t n, Prioq *prioq) { +static int process_timer(sd_event *e, usec_t n, Prioq *earliest, Prioq *latest) { sd_event_source *s; int r; assert(e); for (;;) { - s = prioq_peek(prioq); + s = prioq_peek(earliest); if (!s || s->time.next > n || s->mute == SD_EVENT_MUTED || @@ -1149,9 +1303,8 @@ static int process_timer(sd_event *e, usec_t n, Prioq *prioq) { if (r < 0) return r; - r = prioq_reshuffle(prioq, s, &s->time.prioq_index); - if (r < 0) - return r; + prioq_reshuffle(earliest, s, &s->time.earliest_index); + prioq_reshuffle(latest, s, &s->time.latest_index); } return 0; @@ -1164,6 +1317,8 @@ static int process_child(sd_event *e) { assert(e); + e->need_process_child = false; + /* So, this is ugly. We iteratively invoke waitid() with P_PID + WNOHANG for each PID we wait for, instead of using @@ -1199,7 +1354,6 @@ static int process_child(sd_event *e) { } } - e->processed_children = e->iteration; return 0; } @@ -1323,6 +1477,19 @@ static int event_prepare(sd_event *e) { return 0; } +static sd_event_source* event_next_pending(sd_event *e) { + sd_event_source *p; + + p = prioq_peek(e->pending); + if (!p) + return NULL; + + if (p->mute == SD_EVENT_MUTED) + return NULL; + + return p; +} + int sd_event_run(sd_event *e, uint64_t timeout) { struct epoll_event ev_queue[EPOLL_QUEUE_MAX]; sd_event_source *p; @@ -1340,25 +1507,21 @@ int sd_event_run(sd_event *e, uint64_t timeout) { if (r < 0) return r; - r = event_arm_timer(e, e->monotonic_fd, e->monotonic, &e->monotonic_next); - if (r < 0) - return r; + if (event_next_pending(e) || e->need_process_child) + timeout = 0; - r = event_arm_timer(e, e->realtime_fd, e->realtime, &e->realtime_next); - if (r < 0) - return r; + if (timeout > 0) { + r = event_arm_timer(e, e->monotonic_fd, e->monotonic_earliest, e->monotonic_latest, &e->monotonic_next); + if (r < 0) + return r; - if (e->iteration == 1 && !hashmap_isempty(e->child_sources)) - /* On the first iteration, there might be already some - * zombies for us to care for, hence, don't wait */ - timeout = 0; - else { - p = prioq_peek(e->pending); - if (p && p->mute != SD_EVENT_MUTED) - timeout = 0; + r = event_arm_timer(e, e->realtime_fd, e->realtime_earliest, e->realtime_latest, &e->realtime_next); + if (r < 0) + return r; } - m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC)); + m = epoll_wait(e->epoll_fd, ev_queue, EPOLL_QUEUE_MAX, + timeout == (uint64_t) -1 ? -1 : (int) ((timeout + USEC_PER_MSEC - 1) / USEC_PER_MSEC)); if (m < 0) return m; @@ -1379,24 +1542,22 @@ int sd_event_run(sd_event *e, uint64_t timeout) { return r; } - r = process_timer(e, n.monotonic, e->monotonic); + r = process_timer(e, n.monotonic, e->monotonic_earliest, e->monotonic_latest); if (r < 0) return r; - r = process_timer(e, n.realtime, e->realtime); + r = process_timer(e, n.realtime, e->realtime_earliest, e->realtime_latest); if (r < 0) return r; - if (e->iteration == 1 && e->processed_children != 1) { - /* On the first iteration, make sure we really process - * all children which might already be zombies. */ + if (e->need_process_child) { r = process_child(e); if (r < 0) return r; } - p = prioq_peek(e->pending); - if (!p || p->mute == SD_EVENT_MUTED) + p = event_next_pending(e); + if (!p) return 0; return source_dispatch(p); @@ -1438,3 +1599,38 @@ sd_event *sd_event_get(sd_event_source *s) { return s->event; } + +int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec) { + if (!s) + return -EINVAL; + if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME) + return -EDOM; + + if (usec == 0) + usec = DEFAULT_ACCURACY_USEC; + + if (s->time.accuracy == usec) + return 0; + + + s->time.accuracy = usec; + + if (s->type == SOURCE_REALTIME) + prioq_reshuffle(s->event->realtime_latest, s, &s->time.latest_index); + else + prioq_reshuffle(s->event->monotonic_latest, s, &s->time.latest_index); + + return 0; +} + +int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec) { + if (!s) + return -EINVAL; + if (!usec) + return -EINVAL; + if (s->type != SOURCE_MONOTONIC && s->type != SOURCE_REALTIME) + return -EDOM; + + *usec = s->time.accuracy; + return 0; +} diff --git a/src/libsystemd-bus/test-event.c b/src/libsystemd-bus/test-event.c index 8dd47fe8a9..acb5cee367 100644 --- a/src/libsystemd-bus/test-event.c +++ b/src/libsystemd-bus/test-event.c @@ -151,7 +151,7 @@ int main(int argc, char *argv[]) { assert_se(sd_event_add_io(e, a[0], EPOLLIN, io_handler, INT_TO_PTR('a'), &x) >= 0); assert_se(sd_event_add_io(e, b[0], EPOLLIN, io_handler, INT_TO_PTR('b'), &y) >= 0); - assert_se(sd_event_add_monotonic(e, 0, time_handler, INT_TO_PTR('c'), &z) >= 0); + assert_se(sd_event_add_monotonic(e, 0, 0, time_handler, INT_TO_PTR('c'), &z) >= 0); assert_se(sd_event_source_set_priority(x, 99) >= 0); assert_se(sd_event_source_set_mute(y, SD_EVENT_ONESHOT) >= 0); diff --git a/src/shared/prioq.c b/src/shared/prioq.c index 2d166360aa..537befc623 100644 --- a/src/shared/prioq.c +++ b/src/shared/prioq.c @@ -217,7 +217,8 @@ _pure_ static struct prioq_item* find_item(Prioq *q, void *data, unsigned *idx) assert(q); if (idx) { - if (*idx > q->n_items) + if (*idx == PRIOQ_IDX_NULL || + *idx > q->n_items) return NULL; i = q->items + *idx; diff --git a/src/systemd/sd-event.h b/src/systemd/sd-event.h index bec41cff6d..90fea4e17a 100644 --- a/src/systemd/sd-event.h +++ b/src/systemd/sd-event.h @@ -38,7 +38,7 @@ TODO: - Detect forks and return ECHILD - - Timer events with accuracy for coalescing time events + - quit hooks */ typedef struct sd_event sd_event; @@ -62,8 +62,8 @@ sd_event* sd_event_ref(sd_event *e); sd_event* sd_event_unref(sd_event *e); int sd_event_add_io(sd_event *e, int fd, uint32_t events, sd_io_handler_t callback, void *userdata, sd_event_source **s); -int sd_event_add_monotonic(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **s); -int sd_event_add_realtime(sd_event *e, uint64_t usec, sd_time_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_monotonic(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **s); +int sd_event_add_realtime(sd_event *e, uint64_t usec, uint64_t accuracy, sd_time_handler_t callback, void *userdata, sd_event_source **s); int sd_event_add_signal(sd_event *e, int sig, sd_signal_handler_t callback, void *userdata, sd_event_source **s); int sd_event_add_child(sd_event *e, pid_t pid, int options, sd_child_handler_t callback, void *userdata, sd_event_source **s); int sd_event_add_defer(sd_event *e, sd_defer_handler_t callback, void *userdata, sd_event_source **s); @@ -91,6 +91,8 @@ int sd_event_source_get_mute(sd_event_source *s, sd_event_mute_t *m); int sd_event_source_set_mute(sd_event_source *s, sd_event_mute_t m); int sd_event_source_get_time(sd_event_source *s, uint64_t *usec); int sd_event_source_set_time(sd_event_source *s, uint64_t usec); +int sd_event_source_set_time_accuracy(sd_event_source *s, uint64_t usec); +int sd_event_source_get_time_accuracy(sd_event_source *s, uint64_t *usec); int sd_event_source_set_prepare(sd_event_source *s, sd_prepare_handler_t callback); void* sd_event_source_get_userdata(sd_event_source *s); |