summaryrefslogtreecommitdiff
path: root/src/libsystemd
diff options
context:
space:
mode:
authorLennart Poettering <lennart@poettering.net>2014-03-03 01:33:45 +0100
committerLennart Poettering <lennart@poettering.net>2014-03-03 02:34:13 +0100
commit8f8f05a919355095518911135c3d630f4620a9b0 (patch)
treecb7b75f124dd743d2bcaf366a4a64125d1c34253 /src/libsystemd
parentd9256bac4da4241cb5d97960c899390839f2c6e5 (diff)
bus: add sd_bus_track object for tracking peers, and port core over to it
This is primarily useful for services that need to track clients which reference certain objects they maintain, or which explicitly want to subscribe to certain events. Something like this is done in a large number of services, and not trivial to do. Hence, let's unify this at one place. This also ports over PID 1 to use this to ensure that subscriptions to job and manager events are correctly tracked. As a side-effect this makes sure we properly serialize and restore the track list across daemon reexec/reload, which didn't work correctly before. This also simplifies how we distribute messages to broadcast to the direct busses: we only track subscriptions for the API bus and implicitly assume that all direct busses are subscribed. This should be a pretty OK simplification since clients connected via direct bus connections are shortlived anyway.
Diffstat (limited to 'src/libsystemd')
-rw-r--r--src/libsystemd/libsystemd.sym.m412
-rw-r--r--src/libsystemd/sd-bus/bus-internal.h2
-rw-r--r--src/libsystemd/sd-bus/bus-track.c314
-rw-r--r--src/libsystemd/sd-bus/bus-track.h24
-rw-r--r--src/libsystemd/sd-bus/sd-bus.c22
5 files changed, 374 insertions, 0 deletions
diff --git a/src/libsystemd/libsystemd.sym.m4 b/src/libsystemd/libsystemd.sym.m4
index 8d34615bc7..c708c7f38f 100644
--- a/src/libsystemd/libsystemd.sym.m4
+++ b/src/libsystemd/libsystemd.sym.m4
@@ -313,6 +313,18 @@ m4_ifdef(`ENABLE_KDBUS',
sd_bus_error_has_name;
sd_bus_label_escape;
sd_bus_label_unescape;
+ sd_bus_track_new;
+ sd_bus_track_ref;
+ sd_bus_track_unref;
+ sd_bus_track_get_bus;
+ sd_bus_track_add_sender;
+ sd_bus_track_remove_sender;
+ sd_bus_track_add_name;
+ sd_bus_track_remove_name;
+ sd_bus_track_count;
+ sd_bus_track_contains;
+ sd_bus_track_first;
+ sd_bus_track_next;
/* sd-memfd */
sd_memfd_new;
diff --git a/src/libsystemd/sd-bus/bus-internal.h b/src/libsystemd/sd-bus/bus-internal.h
index a4160ef4a1..ae5f66288f 100644
--- a/src/libsystemd/sd-bus/bus-internal.h
+++ b/src/libsystemd/sd-bus/bus-internal.h
@@ -274,6 +274,8 @@ struct sd_bus {
size_t bloom_size;
unsigned bloom_n_hash;
+
+ sd_bus_track *track_queue;
};
#define BUS_DEFAULT_TIMEOUT ((usec_t) (25 * USEC_PER_SEC))
diff --git a/src/libsystemd/sd-bus/bus-track.c b/src/libsystemd/sd-bus/bus-track.c
new file mode 100644
index 0000000000..4a5056e434
--- /dev/null
+++ b/src/libsystemd/sd-bus/bus-track.c
@@ -0,0 +1,314 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+/***
+ This file is part of systemd.
+
+ Copyright 2013 Lennart Poettering
+
+ systemd is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 2.1 of the License, or
+ (at your option) any later version.
+
+ systemd is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+#include "sd-bus.h"
+#include "set.h"
+#include "bus-internal.h"
+#include "bus-track.h"
+
+struct sd_bus_track {
+ unsigned n_ref;
+ sd_bus *bus;
+ sd_bus_track_handler_t handler;
+ void *userdata;
+ Set *names;
+ LIST_FIELDS(sd_bus_track, queue);
+ Iterator iterator;
+ bool in_queue;
+ bool modified;
+};
+
+#define MATCH_PREFIX \
+ "type='signal'," \
+ "sender='org.freedesktop.DBus'," \
+ "path='/org/freedesktop/DBus'," \
+ "interface='org.freedesktop.DBus'," \
+ "member='NameOwnerChanged'," \
+ "arg0='"
+
+#define MATCH_SUFFIX \
+ "'"
+
+#define MATCH_FOR_NAME(name) \
+ ({ \
+ char *_x; \
+ size_t _l = strlen(name); \
+ _x = alloca(sizeof(MATCH_PREFIX)-1+_l+sizeof(MATCH_SUFFIX)); \
+ strcpy(stpcpy(stpcpy(_x, MATCH_PREFIX), name), MATCH_SUFFIX); \
+ _x; \
+ })
+
+static void bus_track_add_to_queue(sd_bus_track *track) {
+ assert(track);
+
+ if (track->in_queue)
+ return;
+
+ if (!track->handler)
+ return;
+
+ LIST_PREPEND(queue, track->bus->track_queue, track);
+ track->in_queue = true;
+}
+
+static void bus_track_remove_from_queue(sd_bus_track *track) {
+ assert(track);
+
+ if (!track->in_queue)
+ return;
+
+ LIST_REMOVE(queue, track->bus->track_queue, track);
+ track->in_queue = false;
+}
+
+_public_ int sd_bus_track_new(
+ sd_bus *bus,
+ sd_bus_track **track,
+ sd_bus_track_handler_t handler,
+ void *userdata) {
+
+ sd_bus_track *t;
+
+ assert_return(bus, -EINVAL);
+ assert_return(track, -EINVAL);
+
+ t = new0(sd_bus_track, 1);
+ if (!t)
+ return -ENOMEM;
+
+ t->n_ref = 1;
+ t->handler = handler;
+ t->userdata = userdata;
+ t->bus = sd_bus_ref(bus);
+
+ bus_track_add_to_queue(t);
+
+ *track = t;
+ return 0;
+}
+
+_public_ sd_bus_track* sd_bus_track_ref(sd_bus_track *track) {
+ assert_return(track, NULL);
+
+ assert(track->n_ref > 0);
+
+ track->n_ref++;
+
+ return track;
+}
+
+_public_ sd_bus_track* sd_bus_track_unref(sd_bus_track *track) {
+ const char *n;
+
+ if (!track)
+ return NULL;
+
+ assert(track->n_ref > 0);
+
+ if (track->n_ref > 1) {
+ track->n_ref --;
+ return NULL;
+ }
+
+ while ((n = set_first(track->names)))
+ sd_bus_track_remove_name(track, n);
+
+ bus_track_remove_from_queue(track);
+ set_free(track->names);
+ sd_bus_unref(track->bus);
+ free(track);
+
+ return NULL;
+}
+
+static int on_name_owner_changed(sd_bus *bus, sd_bus_message *message, void *userdata, sd_bus_error *error) {
+ sd_bus_track *track = userdata;
+ const char *name, *old, *new;
+ int r;
+
+ assert(bus);
+ assert(message);
+ assert(track);
+
+ r = sd_bus_message_read(message, "sss", &name, &old, &new);
+ if (r < 0)
+ return 0;
+
+ sd_bus_track_remove_name(track, name);
+ return 0;
+}
+
+_public_ int sd_bus_track_add_name(sd_bus_track *track, const char *name) {
+ _cleanup_free_ char *n = NULL;
+ const char *match;
+ int r;
+
+ assert_return(track, -EINVAL);
+ assert_return(service_name_is_valid(name), -EINVAL);
+
+ r = set_ensure_allocated(&track->names, string_hash_func, string_compare_func);
+ if (r < 0)
+ return r;
+
+ n = strdup(name);
+ if (!n)
+ return -ENOMEM;
+
+ r = set_put(track->names, n);
+ if (r == -EEXIST)
+ return 0;
+ if (r < 0)
+ return r;
+
+ /* First, subscribe to this name */
+ match = MATCH_FOR_NAME(name);
+ r = sd_bus_add_match(track->bus, match, on_name_owner_changed, track);
+ if (r < 0) {
+ set_remove(track->names, n);
+ return r;
+ }
+
+ /* Second, check if it is currently existing, or maybe
+ * doesn't, or maybe disappeared already. */
+ r = sd_bus_get_owner(track->bus, name, 0, NULL);
+ if (r < 0) {
+ set_remove(track->names, n);
+ sd_bus_remove_match(track->bus, match, on_name_owner_changed, track);
+ return r;
+ }
+
+ n = NULL;
+
+ bus_track_remove_from_queue(track);
+ track->modified = true;
+
+ return 1;
+}
+
+_public_ int sd_bus_track_remove_name(sd_bus_track *track, const char *name) {
+ const char *match;
+ _cleanup_free_ char *n;
+
+ assert_return(name, -EINVAL);
+
+ if (!track)
+ return 0;
+
+ n = set_remove(track->names, (char*) name);
+ if (!n)
+ return 0;
+
+ match = MATCH_FOR_NAME(n);
+ sd_bus_remove_match(track->bus, match, on_name_owner_changed, track);
+
+ if (set_isempty(track->names))
+ bus_track_add_to_queue(track);
+
+ track->modified = true;
+
+ return 1;
+}
+
+_public_ unsigned sd_bus_track_count(sd_bus_track *track) {
+ if (!track)
+ return 0;
+
+ return set_size(track->names);
+}
+
+_public_ const char* sd_bus_track_contains(sd_bus_track *track, const char *name) {
+ assert_return(track, NULL);
+ assert_return(name, NULL);
+
+ return set_get(track->names, (void*) name);
+}
+
+_public_ const char* sd_bus_track_first(sd_bus_track *track) {
+ if (!track)
+ return NULL;
+
+ track->modified = false;
+ track->iterator = NULL;
+
+ return set_iterate(track->names, &track->iterator);
+}
+
+_public_ const char* sd_bus_track_next(sd_bus_track *track) {
+ if (!track)
+ return NULL;
+
+ if (track->modified)
+ return NULL;
+
+ return set_iterate(track->names, &track->iterator);
+}
+
+_public_ int sd_bus_track_add_sender(sd_bus_track *track, sd_bus_message *m) {
+ const char *sender;
+
+ assert_return(track, -EINVAL);
+ assert_return(m, -EINVAL);
+
+ sender = sd_bus_message_get_sender(m);
+ if (!sender)
+ return -EINVAL;
+
+ return sd_bus_track_add_name(track, sender);
+}
+
+_public_ int sd_bus_track_remove_sender(sd_bus_track *track, sd_bus_message *m) {
+ const char *sender;
+
+ assert_return(track, -EINVAL);
+ assert_return(m, -EINVAL);
+
+ sender = sd_bus_message_get_sender(m);
+ if (!sender)
+ return -EINVAL;
+
+ return sd_bus_track_remove_name(track, sender);
+}
+
+_public_ sd_bus* sd_bus_track_get_bus(sd_bus_track *track) {
+ assert_return(track, NULL);
+
+ return track->bus;
+}
+
+void bus_track_dispatch(sd_bus_track *track) {
+ int r;
+
+ assert(track);
+ assert(track->in_queue);
+ assert(track->handler);
+
+ bus_track_remove_from_queue(track);
+
+ sd_bus_track_ref(track);
+
+ r = track->handler(track, track->userdata);
+ if (r < 0)
+ log_debug("Failed to process track handler: %s", strerror(-r));
+ else if (r == 0)
+ bus_track_add_to_queue(track);
+
+ sd_bus_track_unref(track);
+}
diff --git a/src/libsystemd/sd-bus/bus-track.h b/src/libsystemd/sd-bus/bus-track.h
new file mode 100644
index 0000000000..f8690a5239
--- /dev/null
+++ b/src/libsystemd/sd-bus/bus-track.h
@@ -0,0 +1,24 @@
+/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/
+
+#pragma once
+
+/***
+ This file is part of systemd.
+
+ Copyright 2013 Lennart Poettering
+
+ systemd is free software; you can redistribute it and/or modify it
+ under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 2.1 of the License, or
+ (at your option) any later version.
+
+ systemd is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with systemd; If not, see <http://www.gnu.org/licenses/>.
+***/
+
+void bus_track_dispatch(sd_bus_track *track);
diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c
index 636715f759..fbf1a5919f 100644
--- a/src/libsystemd/sd-bus/sd-bus.c
+++ b/src/libsystemd/sd-bus/sd-bus.c
@@ -51,6 +51,7 @@
#include "bus-util.h"
#include "bus-container.h"
#include "bus-protocol.h"
+#include "bus-track.h"
static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec);
static int attach_io_events(sd_bus *b);
@@ -131,6 +132,8 @@ static void bus_free(sd_bus *b) {
assert(b);
+ assert(!b->track_queue);
+
sd_bus_detach_event(b);
if (b->default_bus_ptr)
@@ -2000,6 +2003,11 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
assert_return(!bus_pid_changed(bus), -ECHILD);
+ if (bus->track_queue) {
+ *timeout_usec = 0;
+ return 1;
+ }
+
if (bus->state == BUS_CLOSING) {
*timeout_usec = 0;
return 1;
@@ -2282,6 +2290,16 @@ finish:
return r;
}
+static int dispatch_track(sd_bus *bus) {
+ assert(bus);
+
+ if (!bus->track_queue)
+ return 0;
+
+ bus_track_dispatch(bus->track_queue);
+ return 1;
+}
+
static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) {
_cleanup_bus_message_unref_ sd_bus_message *m = NULL;
int r;
@@ -2297,6 +2315,10 @@ static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd
if (r != 0)
goto null_message;
+ r = dispatch_track(bus);
+ if (r != 0)
+ goto null_message;
+
r = dispatch_rqueue(bus, hint_priority, priority, &m);
if (r < 0)
return r;