diff options
-rw-r--r-- | Makefile.am | 4 | ||||
-rw-r--r-- | src/core/dbus-client-track.c | 251 | ||||
-rw-r--r-- | src/core/dbus-job.c | 64 | ||||
-rw-r--r-- | src/core/dbus-manager.c | 99 | ||||
-rw-r--r-- | src/core/dbus-manager.h | 2 | ||||
-rw-r--r-- | src/core/dbus-unit.c | 34 | ||||
-rw-r--r-- | src/core/dbus.c | 95 | ||||
-rw-r--r-- | src/core/dbus.h | 7 | ||||
-rw-r--r-- | src/core/job.c | 22 | ||||
-rw-r--r-- | src/core/job.h | 3 | ||||
-rw-r--r-- | src/core/manager.c | 11 | ||||
-rw-r--r-- | src/core/manager.h | 8 | ||||
-rw-r--r-- | src/core/unit.c | 3 | ||||
-rw-r--r-- | src/libsystemd/libsystemd.sym.m4 | 12 | ||||
-rw-r--r-- | src/libsystemd/sd-bus/bus-internal.h | 2 | ||||
-rw-r--r-- | src/libsystemd/sd-bus/bus-track.c | 314 | ||||
-rw-r--r-- | src/libsystemd/sd-bus/bus-track.h (renamed from src/core/dbus-client-track.h) | 20 | ||||
-rw-r--r-- | src/libsystemd/sd-bus/sd-bus.c | 22 | ||||
-rw-r--r-- | src/systemd/sd-bus.h | 19 |
19 files changed, 569 insertions, 423 deletions
diff --git a/Makefile.am b/Makefile.am index 15500adaae..38445fb33c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -975,8 +975,6 @@ libsystemd_core_la_SOURCES = \ src/core/dbus-kill.h \ src/core/dbus-cgroup.c \ src/core/dbus-cgroup.h \ - src/core/dbus-client-track.c \ - src/core/dbus-client-track.h \ src/core/cgroup.c \ src/core/cgroup.h \ src/core/selinux-access.c \ @@ -2056,6 +2054,8 @@ libsystemd_internal_la_SOURCES = \ src/libsystemd/sd-bus/bus-gvariant.c \ src/libsystemd/sd-bus/bus-gvariant.h \ src/libsystemd/sd-bus/bus-convenience.c \ + src/libsystemd/sd-bus/bus-track.c \ + src/libsystemd/sd-bus/bus-track.h \ src/libsystemd/sd-bus/bus-util.c \ src/libsystemd/sd-bus/bus-util.h \ src/libsystemd/sd-bus/bus-protocol.h \ diff --git a/src/core/dbus-client-track.c b/src/core/dbus-client-track.c deleted file mode 100644 index 07dfea49e6..0000000000 --- a/src/core/dbus-client-track.c +++ /dev/null @@ -1,251 +0,0 @@ -/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ - -/*** - This file is part of systemd. - - Copyright 2013 Lennart Poettering - - systemd is free software; you can redistribute it and/or modify it - under the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 2.1 of the License, or - (at your option) any later version. - - systemd is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with systemd; If not, see <http://www.gnu.org/licenses/>. -***/ - -#include "bus-util.h" -#include "dbus-client-track.h" - -static unsigned long tracked_client_hash(const void *a, const uint8_t hash_key[HASH_KEY_SIZE]) { - const BusTrackedClient *x = a; - - return string_hash_func(x->name, hash_key) ^ trivial_hash_func(x->bus, hash_key); -} - -static int tracked_client_compare(const void *a, const void *b) { - const BusTrackedClient *x = a, *y = b; - int r; - - r = strcmp(x->name, y->name); - if (r != 0) - return r; - - if (x->bus < y->bus) - return -1; - if (x->bus > y->bus) - return 1; - - return 0; -} - -static int on_name_owner_changed(sd_bus *bus, sd_bus_message *message, void *userdata, sd_bus_error *error) { - BusTrackedClient *c = userdata; - const char *name, *old, *new; - int r; - - assert(bus); - assert(message); - - r = sd_bus_message_read(message, "sss", &name, &old, &new); - if (r < 0) { - bus_log_parse_error(r); - return r; - } - - bus_client_untrack(c->set, bus, name); - return 0; -} - -static char *build_match(const char *name) { - - return strjoin("type='signal'," - "sender='org.freedesktop.DBus'," - "path='/org/freedesktop/DBus'," - "interface='org.freedesktop.DBus'," - "member='NameOwnerChanged'," - "arg0='", name, "'", NULL); -} - -int bus_client_track(Set **s, sd_bus *bus, const char *name) { - BusTrackedClient *c, *found; - size_t l; - int r; - - assert(s); - assert(bus); - - r = set_ensure_allocated(s, tracked_client_hash, tracked_client_compare); - if (r < 0) - return r; - - name = strempty(name); - - l = strlen(name); - - c = alloca(offsetof(BusTrackedClient, name) + l + 1); - c->set = *s; - c->bus = bus; - strcpy(c->name, name); - - found = set_get(*s, c); - if (found) - return 0; - - c = memdup(c, offsetof(BusTrackedClient, name) + l + 1); - if (!c) - return -ENOMEM; - - r = set_put(*s, c); - if (r < 0) { - free(c); - return r; - } - - if (!isempty(name)) { - _cleanup_free_ char *match = NULL; - - match = build_match(name); - if (!match) { - set_remove(*s, c); - free(c); - return -ENOMEM; - } - - r = sd_bus_add_match(bus, match, on_name_owner_changed, c); - if (r < 0) { - set_remove(*s, c); - free(c); - return r; - } - } - - sd_bus_ref(c->bus); - return 1; -} - -static void bus_client_free_one(Set *s, BusTrackedClient *c) { - assert(s); - assert(c); - - if (!isempty(c->name)) { - _cleanup_free_ char *match = NULL; - - match = build_match(c->name); - if (match) - sd_bus_remove_match(c->bus, match, on_name_owner_changed, c); - } - - sd_bus_unref(c->bus); - set_remove(s, c); - free(c); -} - -int bus_client_untrack(Set *s, sd_bus *bus, const char *name) { - BusTrackedClient *c, *found; - size_t l; - - assert(bus); - assert(s); - assert(name); - - name = strempty(name); - - l = strlen(name); - - c = alloca(offsetof(BusTrackedClient, name) + l + 1); - c->bus = bus; - strcpy(c->name, name); - - found = set_get(s, c); - if (!found) - return 0; - - bus_client_free_one(s, found); - return 1; -} - -void bus_client_track_free(Set *s) { - BusTrackedClient *c; - - while ((c = set_first(s))) - bus_client_free_one(s, c); - - set_free(s); -} - -int bus_client_untrack_bus(Set *s, sd_bus *bus) { - BusTrackedClient *c; - Iterator i; - int r = 0; - - SET_FOREACH(c, s, i) - if (c->bus == bus) { - bus_client_free_one(s, c); - r++; - } - - return r; -} - -void bus_client_track_serialize(Manager *m, FILE *f, Set *s) { - BusTrackedClient *c; - Iterator i; - - assert(m); - assert(f); - - SET_FOREACH(c, s, i) { - if (c->bus == m->api_bus) - fprintf(f, "subscribed=%s\n", isempty(c->name) ? "*" : c->name); - else - fprintf(f, "subscribed=%p %s\n", c->bus, isempty(c->name) ? "*" : c->name); - } -} - -int bus_client_track_deserialize_item(Manager *m, Set **s, const char *line) { - const char *e, *q, *name; - sd_bus *bus; - void *p; - int r; - - e = startswith(line, "subscribed="); - if (!e) - return 0; - - q = strpbrk(e, WHITESPACE); - if (!q) { - if (m->api_bus) { - bus = m->api_bus; - name = e; - goto finish; - } - - return 1; - } - - if (sscanf(e, "%p", &p) != 1) { - log_debug("Failed to parse subscription pointer."); - return -EINVAL; - } - - bus = set_get(m->private_buses, p); - if (!bus) - return 1; - - name = q + strspn(q, WHITESPACE); - -finish: - r = bus_client_track(s, bus, streq(name, "*") ? NULL : name); - if (r < 0) { - log_debug("Failed to deserialize client subscription: %s", strerror(-r)); - return r; - } - - return 1; -} diff --git a/src/core/dbus-job.c b/src/core/dbus-job.c index a8eae4734e..5c364a4264 100644 --- a/src/core/dbus-job.c +++ b/src/core/dbus-job.c @@ -24,7 +24,7 @@ #include "selinux-access.h" #include "job.h" #include "dbus-job.h" -#include "dbus-client-track.h" +#include "dbus.h" static BUS_DEFINE_PROPERTY_GET_ENUM(property_get_type, job_type, JobType); static BUS_DEFINE_PROPERTY_GET_ENUM(property_get_state, job_state, JobState); @@ -79,53 +79,10 @@ const sd_bus_vtable bus_job_vtable[] = { SD_BUS_VTABLE_END }; -static int foreach_client(Job *j, int (*send_message)(sd_bus *bus, const char *name, Job *j)) { - BusTrackedClient *one_destination = NULL; - Iterator i; - sd_bus *b; - unsigned n, m; - int r, ret; - - assert(j); - assert(send_message); - - n = set_size(j->manager->subscribed); - m = set_size(j->subscribed); - - if (n <= 0 && m <= 0) - return 0; - - if (n == 1 && m == 0) - one_destination = set_first(j->manager->subscribed); - else if (n == 0 && m == 1) - one_destination = set_first(j->subscribed); - else - one_destination = NULL; - - if (one_destination) - return send_message(one_destination->bus, isempty(one_destination->name) ? NULL : one_destination->name, j); - - ret = 0; - - /* Send to everybody */ - SET_FOREACH(b, j->manager->private_buses, i) { - r = send_message(b, NULL, j); - if (r < 0) - ret = r; - } - - if (j->manager->api_bus) { - r = send_message(j->manager->api_bus, NULL, j); - if (r < 0) - ret = r; - } - - return ret; -} - -static int send_new_signal(sd_bus *bus, const char *destination, Job *j) { +static int send_new_signal(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; _cleanup_free_ char *p = NULL; + Job *j = userdata; int r; assert(bus); @@ -148,11 +105,12 @@ static int send_new_signal(sd_bus *bus, const char *destination, Job *j) { if (r < 0) return r; - return sd_bus_send_to(bus, m, destination, NULL); + return sd_bus_send(bus, m, NULL); } -static int send_changed_signal(sd_bus *bus, const char *destination, Job *j) { +static int send_changed_signal(sd_bus *bus, void *userdata) { _cleanup_free_ char *p = NULL; + Job *j = userdata; assert(bus); assert(j); @@ -174,16 +132,17 @@ void bus_job_send_change_signal(Job *j) { j->in_dbus_queue = false; } - r = foreach_client(j, j->sent_dbus_new_signal ? send_changed_signal : send_new_signal); + r = bus_foreach_bus(j->manager, j->subscribed, j->sent_dbus_new_signal ? send_changed_signal : send_new_signal, j); if (r < 0) log_debug("Failed to send job change signal for %u: %s", j->id, strerror(-r)); j->sent_dbus_new_signal = true; } -static int send_removed_signal(sd_bus *bus, const char *destination, Job *j) { +static int send_removed_signal(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; _cleanup_free_ char *p = NULL; + Job *j = userdata; int r; assert(bus); @@ -193,7 +152,6 @@ static int send_removed_signal(sd_bus *bus, const char *destination, Job *j) { if (!p) return -ENOMEM; - r = sd_bus_message_new_signal( bus, &m, @@ -207,7 +165,7 @@ static int send_removed_signal(sd_bus *bus, const char *destination, Job *j) { if (r < 0) return r; - return sd_bus_send_to(bus, m, destination, NULL); + return sd_bus_send(bus, m, NULL); } void bus_job_send_removed_signal(Job *j) { @@ -218,7 +176,7 @@ void bus_job_send_removed_signal(Job *j) { if (!j->sent_dbus_new_signal) bus_job_send_change_signal(j); - r = foreach_client(j, send_removed_signal); + r = bus_foreach_bus(j->manager, j->subscribed, send_removed_signal, j); if (r < 0) log_debug("Failed to send job remove signal for %u: %s", j->id, strerror(-r)); } diff --git a/src/core/dbus-manager.c b/src/core/dbus-manager.c index f1e344c180..34ef1f5d29 100644 --- a/src/core/dbus-manager.c +++ b/src/core/dbus-manager.c @@ -37,7 +37,6 @@ #include "dbus-manager.h" #include "dbus-unit.h" #include "dbus-snapshot.h" -#include "dbus-client-track.h" #include "dbus-execute.h" #include "bus-errors.h" @@ -824,11 +823,23 @@ static int method_subscribe(sd_bus *bus, sd_bus_message *message, void *userdata if (r < 0) return r; - r = bus_client_track(&m->subscribed, bus, sd_bus_message_get_sender(message)); - if (r < 0) - return r; - if (r == 0) - return sd_bus_error_setf(error, BUS_ERROR_ALREADY_SUBSCRIBED, "Client is already subscribed."); + if (bus == m->api_bus) { + + /* Note that direct bus connection subscribe by + * default, we only track peers on the API bus here */ + + if (!m->subscribed) { + r = sd_bus_track_new(bus, &m->subscribed, NULL, NULL); + if (r < 0) + return r; + } + + r = sd_bus_track_add_sender(m->subscribed, message); + if (r < 0) + return r; + if (r == 0) + return sd_bus_error_setf(error, BUS_ERROR_ALREADY_SUBSCRIBED, "Client is already subscribed."); + } return sd_bus_reply_method_return(message, NULL); } @@ -845,11 +856,13 @@ static int method_unsubscribe(sd_bus *bus, sd_bus_message *message, void *userda if (r < 0) return r; - r = bus_client_untrack(m->subscribed, bus, sd_bus_message_get_sender(message)); - if (r < 0) - return r; - if (r == 0) - return sd_bus_error_setf(error, BUS_ERROR_NOT_SUBSCRIBED, "Client is not subscribed."); + if (bus == m->api_bus) { + r = sd_bus_track_remove_sender(m->subscribed, message); + if (r < 0) + return r; + if (r == 0) + return sd_bus_error_setf(error, BUS_ERROR_NOT_SUBSCRIBED, "Client is not subscribed."); + } return sd_bus_reply_method_return(message, NULL); } @@ -1348,7 +1361,7 @@ static int method_get_default_target(sd_bus *bus, sd_bus_message *message, void return sd_bus_reply_method_return(message, "s", default_target); } -static int send_unit_files_changed(sd_bus *bus, const char *destination, void *userdata) { +static int send_unit_files_changed(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *message = NULL; int r; @@ -1358,7 +1371,7 @@ static int send_unit_files_changed(sd_bus *bus, const char *destination, void *u if (r < 0) return r; - return sd_bus_send_to(bus, message, destination, NULL); + return sd_bus_send(bus, message, NULL); } static int reply_unit_file_changes_and_free( @@ -1374,7 +1387,7 @@ static int reply_unit_file_changes_and_free( int r; if (n_changes > 0) - bus_manager_foreach_client(m, send_unit_files_changed, NULL); + bus_foreach_bus(m, NULL, send_unit_files_changed, NULL); r = sd_bus_message_new_method_return(message, &reply); if (r < 0) @@ -1656,41 +1669,7 @@ const sd_bus_vtable bus_manager_vtable[] = { SD_BUS_VTABLE_END }; -int bus_manager_foreach_client(Manager *m, int (*send_message)(sd_bus *bus, const char *destination, void *userdata), void *userdata) { - Iterator i; - sd_bus *b; - unsigned n; - int r, ret; - - n = set_size(m->subscribed); - if (n <= 0) - return 0; - if (n == 1) { - BusTrackedClient *d; - - assert_se(d = set_first(m->subscribed)); - return send_message(d->bus, isempty(d->name) ? NULL : d->name, userdata); - } - - ret = 0; - - /* Send to everybody */ - SET_FOREACH(b, m->private_buses, i) { - r = send_message(b, NULL, userdata); - if (r < 0) - ret = r; - } - - if (m->api_bus) { - r = send_message(m->api_bus, NULL, userdata); - if (r < 0) - ret = r; - } - - return ret; -} - -static int send_finished(sd_bus *bus, const char *destination, void *userdata) { +static int send_finished(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *message = NULL; usec_t *times = userdata; int r; @@ -1706,7 +1685,7 @@ static int send_finished(sd_bus *bus, const char *destination, void *userdata) { if (r < 0) return r; - return sd_bus_send_to(bus, message, destination, NULL); + return sd_bus_send(bus, message, NULL); } void bus_manager_send_finished( @@ -1722,13 +1701,23 @@ void bus_manager_send_finished( assert(m); - r = bus_manager_foreach_client(m, send_finished, - (usec_t[6]) { firmware_usec, loader_usec, kernel_usec, initrd_usec, userspace_usec, total_usec }); + r = bus_foreach_bus( + m, + NULL, + send_finished, + (usec_t[6]) { + firmware_usec, + loader_usec, + kernel_usec, + initrd_usec, + userspace_usec, + total_usec + }); if (r < 0) log_debug("Failed to send finished signal: %s", strerror(-r)); } -static int send_reloading(sd_bus *bus, const char *destination, void *userdata) { +static int send_reloading(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *message = NULL; int r; @@ -1742,7 +1731,7 @@ static int send_reloading(sd_bus *bus, const char *destination, void *userdata) if (r < 0) return r; - return sd_bus_send_to(bus, message, destination, NULL); + return sd_bus_send(bus, message, NULL); } void bus_manager_send_reloading(Manager *m, bool active) { @@ -1750,7 +1739,7 @@ void bus_manager_send_reloading(Manager *m, bool active) { assert(m); - r = bus_manager_foreach_client(m, send_reloading, INT_TO_PTR(active)); + r = bus_foreach_bus(m, NULL, send_reloading, INT_TO_PTR(active)); if (r < 0) log_debug("Failed to send reloading signal: %s", strerror(-r)); diff --git a/src/core/dbus-manager.h b/src/core/dbus-manager.h index 0ef99fad41..e1903fa16a 100644 --- a/src/core/dbus-manager.h +++ b/src/core/dbus-manager.h @@ -26,7 +26,5 @@ extern const sd_bus_vtable bus_manager_vtable[]; -int bus_manager_foreach_client(Manager *m, int (*send_message)(sd_bus *bus, const char *destination, void *userdata), void *userdata); - void bus_manager_send_finished(Manager *m, usec_t firmware_usec, usec_t loader_usec, usec_t kernel_usec, usec_t initrd_usec, usec_t userspace_usec, usec_t total_usec); void bus_manager_send_reloading(Manager *m, bool active); diff --git a/src/core/dbus-unit.c b/src/core/dbus-unit.c index 24d8a598e7..515ac8b78c 100644 --- a/src/core/dbus-unit.c +++ b/src/core/dbus-unit.c @@ -26,10 +26,10 @@ #include "strv.h" #include "path-util.h" #include "fileio.h" -#include "dbus-unit.h" -#include "dbus-manager.h" #include "bus-errors.h" -#include "dbus-client-track.h" +#include "dbus.h" +#include "dbus-manager.h" +#include "dbus-unit.h" static BUS_DEFINE_PROPERTY_GET_ENUM(property_get_load_state, unit_load_state, UnitLoadState); static BUS_DEFINE_PROPERTY_GET_ENUM(property_get_job_mode, job_mode, JobMode); @@ -589,7 +589,7 @@ const sd_bus_vtable bus_unit_cgroup_vtable[] = { SD_BUS_VTABLE_END }; -static int send_new_signal(sd_bus *bus, const char *destination, void *userdata) { +static int send_new_signal(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; _cleanup_free_ char *p = NULL; Unit *u = userdata; @@ -615,10 +615,10 @@ static int send_new_signal(sd_bus *bus, const char *destination, void *userdata) if (r < 0) return r; - return sd_bus_send_to(bus, m, destination, NULL); + return sd_bus_send(bus, m, NULL); } -static int send_changed_signal(sd_bus *bus, const char *destination, void *userdata) { +static int send_changed_signal(sd_bus *bus, void *userdata) { _cleanup_free_ char *p = NULL; Unit *u = userdata; int r; @@ -667,14 +667,14 @@ void bus_unit_send_change_signal(Unit *u) { if (!u->id) return; - r = bus_manager_foreach_client(u->manager, u->sent_dbus_new_signal ? send_changed_signal : send_new_signal, u); + r = bus_foreach_bus(u->manager, NULL, u->sent_dbus_new_signal ? send_changed_signal : send_new_signal, u); if (r < 0) log_debug("Failed to send unit change signal for %s: %s", u->id, strerror(-r)); u->sent_dbus_new_signal = true; } -static int send_removed_signal(sd_bus *bus, const char *destination, void *userdata) { +static int send_removed_signal(sd_bus *bus, void *userdata) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; _cleanup_free_ char *p = NULL; Unit *u = userdata; @@ -700,7 +700,7 @@ static int send_removed_signal(sd_bus *bus, const char *destination, void *userd if (r < 0) return r; - return sd_bus_send_to(bus, m, destination, NULL); + return sd_bus_send(bus, m, NULL); } void bus_unit_send_removed_signal(Unit *u) { @@ -713,7 +713,7 @@ void bus_unit_send_removed_signal(Unit *u) { if (!u->id) return; - r = bus_manager_foreach_client(u->manager, send_removed_signal, u); + r = bus_foreach_bus(u->manager, NULL, send_removed_signal, u); if (r < 0) log_debug("Failed to send unit remove signal for %s: %s", u->id, strerror(-r)); } @@ -765,9 +765,17 @@ int bus_unit_queue_job( if (r < 0) return r; - r = bus_client_track(&j->subscribed, bus, sd_bus_message_get_sender(message)); - if (r < 0) - return r; + if (bus == u->manager->api_bus) { + if (!j->subscribed) { + r = sd_bus_track_new(bus, &j->subscribed, NULL, NULL); + if (r < 0) + return r; + } + + r = sd_bus_track_add_sender(j->subscribed, message); + if (r < 0) + return r; + } path = job_dbus_path(j); if (!path) diff --git a/src/core/dbus.c b/src/core/dbus.c index 1059415711..be8dfc90f8 100644 --- a/src/core/dbus.c +++ b/src/core/dbus.c @@ -41,7 +41,6 @@ #include "bus-error.h" #include "bus-errors.h" #include "strxcpyx.h" -#include "dbus-client-track.h" #include "bus-internal.h" #include "selinux-access.h" @@ -1040,9 +1039,12 @@ static void destroy_bus(Manager *m, sd_bus **bus) { return; /* Get rid of tracked clients on this bus */ - bus_client_untrack_bus(m->subscribed, *bus); + if (m->subscribed && sd_bus_track_get_bus(m->subscribed) == *bus) + m->subscribed = sd_bus_track_unref(m->subscribed); + HASHMAP_FOREACH(j, m->jobs, i) - bus_client_untrack_bus(j->subscribed, *bus); + if (j->subscribed && sd_bus_track_get_bus(j->subscribed) == *bus) + j->subscribed = sd_bus_track_unref(j->subscribed); /* Get rid of queued message on this bus */ if (m->queued_message_bus == *bus) { @@ -1075,7 +1077,11 @@ void bus_done(Manager *m) { destroy_bus(m, &b); set_free(m->private_buses); - set_free(m->subscribed); + m->private_buses = NULL; + + m->subscribed = sd_bus_track_unref(m->subscribed); + strv_free(m->deserialized_subscribed); + m->deserialized_subscribed = NULL; if (m->private_listen_event_source) m->private_listen_event_source = sd_event_source_unref(m->private_listen_event_source); @@ -1126,16 +1132,85 @@ int bus_fdset_add_all(Manager *m, FDSet *fds) { return 0; } -void bus_serialize(Manager *m, FILE *f) { - assert(m); +int bus_foreach_bus( + Manager *m, + sd_bus_track *subscribed2, + int (*send_message)(sd_bus *bus, void *userdata), + void *userdata) { + + Iterator i; + sd_bus *b; + int r, ret = 0; + + /* Send to all direct busses, unconditionally */ + SET_FOREACH(b, m->private_buses, i) { + r = send_message(b, userdata); + if (r < 0) + ret = r; + } + + /* Send to API bus, but only if somebody is subscribed */ + if (sd_bus_track_count(m->subscribed) > 0 || + sd_bus_track_count(subscribed2) > 0) { + r = send_message(m->api_bus, userdata); + if (r < 0) + ret = r; + } + + return ret; +} + +void bus_track_serialize(sd_bus_track *t, FILE *f) { + const char *n; + + assert(t); assert(f); - bus_client_track_serialize(m, f, m->subscribed); + for (n = sd_bus_track_first(t); n; n = sd_bus_track_next(t)) + fprintf(f, "subscribed=%s\n", n); } -int bus_deserialize_item(Manager *m, const char *line) { - assert(m); +int bus_track_deserialize_item(char ***l, const char *line) { + const char *e; + + assert(l); assert(line); - return bus_client_track_deserialize_item(m, &m->subscribed, line); + e = startswith(line, "subscribed="); + if (!e) + return 0; + + return strv_extend(l, e); +} + +int bus_track_coldplug(Manager *m, sd_bus_track **t, char ***l) { + int r = 0; + + assert(m); + assert(t); + assert(l); + + if (!strv_isempty(*l) && m->api_bus) { + char **i; + + if (!*t) { + r = sd_bus_track_new(m->api_bus, t, NULL, NULL); + if (r < 0) + return r; + } + + r = 0; + STRV_FOREACH(i, *l) { + int k; + + k = sd_bus_track_add_name(*t, *i); + if (k < 0) + r = k; + } + } + + strv_free(*l); + *l = NULL; + + return r; } diff --git a/src/core/dbus.h b/src/core/dbus.h index a3bef47d5d..bfb236ec8e 100644 --- a/src/core/dbus.h +++ b/src/core/dbus.h @@ -30,5 +30,8 @@ void bus_done(Manager *m); int bus_fdset_add_all(Manager *m, FDSet *fds); -void bus_serialize(Manager *m, FILE *f); -int bus_deserialize_item(Manager *m, const char *line); +void bus_track_serialize(sd_bus_track *t, FILE *f); +int bus_track_deserialize_item(char ***l, const char *line); +int bus_track_coldplug(Manager *m, sd_bus_track **t, char ***l); + +int bus_foreach_bus(Manager *m, sd_bus_track *subscribed2, int (*send_message)(sd_bus *bus, void *userdata), void *userdata); diff --git a/src/core/job.c b/src/core/job.c index 0cd4397bf2..9c099c686f 100644 --- a/src/core/job.c +++ b/src/core/job.c @@ -37,7 +37,7 @@ #include "special.h" #include "async.h" #include "virt.h" -#include "dbus-client-track.h" +#include "dbus.h" Job* job_new_raw(Unit *unit) { Job *j; @@ -90,7 +90,8 @@ void job_free(Job *j) { sd_event_source_unref(j->timer_event_source); - bus_client_track_free(j->subscribed); + sd_bus_track_unref(j->subscribed); + strv_free(j->deserialized_subscribed); free(j); } @@ -931,7 +932,7 @@ int job_serialize(Job *j, FILE *f, FDSet *fds) { if (j->begin_usec > 0) fprintf(f, "job-begin="USEC_FMT"\n", j->begin_usec); - bus_client_track_serialize(j->manager, f, j->subscribed); + bus_track_serialize(j->subscribed, f); /* End marker */ fputc('\n', f); @@ -1035,13 +1036,10 @@ int job_deserialize(Job *j, FILE *f, FDSet *fds) { else j->begin_usec = ull; - } else { - char t[strlen(l) + 1 + strlen(v) + 1]; + } else if (streq(l, "subscribed")) { - strcpy(stpcpy(stpcpy(t, l), "="), v); - - if (bus_client_track_deserialize_item(j->manager, &j->subscribed, t) == 0) - log_debug("Unknown deserialization key '%s'", l); + if (strv_extend(&j->deserialized_subscribed, v) < 0) + return log_oom(); } } } @@ -1051,6 +1049,12 @@ int job_coldplug(Job *j) { assert(j); + /* After deserialization is complete and the bus connection + * set up again, let's start watching our subscribers again */ + r = bus_track_coldplug(j->manager, &j->subscribed, &j->deserialized_subscribed); + if (r < 0) + return r; + if (j->begin_usec == 0 || j->unit->job_timeout == 0) return 0; diff --git a/src/core/job.h b/src/core/job.h index 8cc3a02192..30d41d9edd 100644 --- a/src/core/job.h +++ b/src/core/job.h @@ -146,7 +146,8 @@ struct Job { usec_t begin_usec; /* There can be more than one client, because of job merging. */ - Set *subscribed; + sd_bus_track *subscribed; + char **deserialized_subscribed; JobResult result; diff --git a/src/core/manager.c b/src/core/manager.c index f5801b4749..9172a244ae 100644 --- a/src/core/manager.c +++ b/src/core/manager.c @@ -839,7 +839,7 @@ int manager_enumerate(Manager *m) { } static int manager_coldplug(Manager *m) { - int r = 0, q; + int r = 0; Iterator i; Unit *u; char *k; @@ -848,12 +848,14 @@ static int manager_coldplug(Manager *m) { /* Then, let's set up their initial state. */ HASHMAP_FOREACH_KEY(u, k, m->units, i) { + int q; /* ignore aliases */ if (u->id != k) continue; - if ((q = unit_coldplug(u)) < 0) + q = unit_coldplug(u); + if (q < 0) r = q; } @@ -996,6 +998,7 @@ int manager_startup(Manager *m, FILE *serialization, FDSet *fds) { * didn't, then let's create the bus now. */ manager_setup_kdbus(m); manager_connect_bus(m, !!serialization); + bus_track_coldplug(m, &m->subscribed, &m->deserialized_subscribed); /* Third, fire things up! */ q = manager_coldplug(m); @@ -2102,7 +2105,7 @@ int manager_serialize(Manager *m, FILE *f, FDSet *fds, bool switching_root) { fprintf(f, "kdbus-fd=%i\n", copy); } - bus_serialize(m, f); + bus_track_serialize(m->subscribed, f); fputc('\n', f); @@ -2279,7 +2282,7 @@ int manager_deserialize(Manager *m, FILE *f, FDSet *fds) { m->kdbus_fd = fdset_remove(fds, fd); } - } else if (bus_deserialize_item(m, l) == 0) + } else if (bus_track_deserialize_item(&m->deserialized_subscribed, l) == 0) log_debug("Unknown serialization item '%s'", l); } diff --git a/src/core/manager.h b/src/core/manager.h index 9dee48ddde..398c8e642e 100644 --- a/src/core/manager.h +++ b/src/core/manager.h @@ -167,7 +167,13 @@ struct Manager { Set *private_buses; int private_listen_fd; sd_event_source *private_listen_event_source; - Set *subscribed; + + /* Contains all the clients that are subscribed to signals via + the API bus. Note that private bus connections are always + considered subscribes, since they last for very short only, + and it is much simpler that way. */ + sd_bus_track *subscribed; + char **deserialized_subscribed; sd_bus_message *queued_message; /* This is used during reloading: * before the reload we queue the diff --git a/src/core/unit.c b/src/core/unit.c index 05470739d2..1c0b0c72ac 100644 --- a/src/core/unit.c +++ b/src/core/unit.c @@ -328,7 +328,8 @@ void unit_add_to_dbus_queue(Unit *u) { return; /* Shortcut things if nobody cares */ - if (set_isempty(u->manager->subscribed)) { + if (sd_bus_track_count(u->manager->subscribed) <= 0 && + set_isempty(u->manager->private_buses)) { u->sent_dbus_new_signal = true; return; } diff --git a/src/libsystemd/libsystemd.sym.m4 b/src/libsystemd/libsystemd.sym.m4 index 8d34615bc7..c708c7f38f 100644 --- a/src/libsystemd/libsystemd.sym.m4 +++ b/src/libsystemd/libsystemd.sym.m4 @@ -313,6 +313,18 @@ m4_ifdef(`ENABLE_KDBUS', sd_bus_error_has_name; sd_bus_label_escape; sd_bus_label_unescape; + sd_bus_track_new; + sd_bus_track_ref; + sd_bus_track_unref; + sd_bus_track_get_bus; + sd_bus_track_add_sender; + sd_bus_track_remove_sender; + sd_bus_track_add_name; + sd_bus_track_remove_name; + sd_bus_track_count; + sd_bus_track_contains; + sd_bus_track_first; + sd_bus_track_next; /* sd-memfd */ sd_memfd_new; diff --git a/src/libsystemd/sd-bus/bus-internal.h b/src/libsystemd/sd-bus/bus-internal.h index a4160ef4a1..ae5f66288f 100644 --- a/src/libsystemd/sd-bus/bus-internal.h +++ b/src/libsystemd/sd-bus/bus-internal.h @@ -274,6 +274,8 @@ struct sd_bus { size_t bloom_size; unsigned bloom_n_hash; + + sd_bus_track *track_queue; }; #define BUS_DEFAULT_TIMEOUT ((usec_t) (25 * USEC_PER_SEC)) diff --git a/src/libsystemd/sd-bus/bus-track.c b/src/libsystemd/sd-bus/bus-track.c new file mode 100644 index 0000000000..4a5056e434 --- /dev/null +++ b/src/libsystemd/sd-bus/bus-track.c @@ -0,0 +1,314 @@ +/*-*- Mode: C; c-basic-offset: 8; indent-tabs-mode: nil -*-*/ + +/*** + This file is part of systemd. + + Copyright 2013 Lennart Poettering + + systemd is free software; you can redistribute it and/or modify it + under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 2.1 of the License, or + (at your option) any later version. + + systemd is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with systemd; If not, see <http://www.gnu.org/licenses/>. +***/ + +#include "sd-bus.h" +#include "set.h" +#include "bus-internal.h" +#include "bus-track.h" + +struct sd_bus_track { + unsigned n_ref; + sd_bus *bus; + sd_bus_track_handler_t handler; + void *userdata; + Set *names; + LIST_FIELDS(sd_bus_track, queue); + Iterator iterator; + bool in_queue; + bool modified; +}; + +#define MATCH_PREFIX \ + "type='signal'," \ + "sender='org.freedesktop.DBus'," \ + "path='/org/freedesktop/DBus'," \ + "interface='org.freedesktop.DBus'," \ + "member='NameOwnerChanged'," \ + "arg0='" + +#define MATCH_SUFFIX \ + "'" + +#define MATCH_FOR_NAME(name) \ + ({ \ + char *_x; \ + size_t _l = strlen(name); \ + _x = alloca(sizeof(MATCH_PREFIX)-1+_l+sizeof(MATCH_SUFFIX)); \ + strcpy(stpcpy(stpcpy(_x, MATCH_PREFIX), name), MATCH_SUFFIX); \ + _x; \ + }) + +static void bus_track_add_to_queue(sd_bus_track *track) { + assert(track); + + if (track->in_queue) + return; + + if (!track->handler) + return; + + LIST_PREPEND(queue, track->bus->track_queue, track); + track->in_queue = true; +} + +static void bus_track_remove_from_queue(sd_bus_track *track) { + assert(track); + + if (!track->in_queue) + return; + + LIST_REMOVE(queue, track->bus->track_queue, track); + track->in_queue = false; +} + +_public_ int sd_bus_track_new( + sd_bus *bus, + sd_bus_track **track, + sd_bus_track_handler_t handler, + void *userdata) { + + sd_bus_track *t; + + assert_return(bus, -EINVAL); + assert_return(track, -EINVAL); + + t = new0(sd_bus_track, 1); + if (!t) + return -ENOMEM; + + t->n_ref = 1; + t->handler = handler; + t->userdata = userdata; + t->bus = sd_bus_ref(bus); + + bus_track_add_to_queue(t); + + *track = t; + return 0; +} + +_public_ sd_bus_track* sd_bus_track_ref(sd_bus_track *track) { + assert_return(track, NULL); + + assert(track->n_ref > 0); + + track->n_ref++; + + return track; +} + +_public_ sd_bus_track* sd_bus_track_unref(sd_bus_track *track) { + const char *n; + + if (!track) + return NULL; + + assert(track->n_ref > 0); + + if (track->n_ref > 1) { + track->n_ref --; + return NULL; + } + + while ((n = set_first(track->names))) + sd_bus_track_remove_name(track, n); + + bus_track_remove_from_queue(track); + set_free(track->names); + sd_bus_unref(track->bus); + free(track); + + return NULL; +} + +static int on_name_owner_changed(sd_bus *bus, sd_bus_message *message, void *userdata, sd_bus_error *error) { + sd_bus_track *track = userdata; + const char *name, *old, *new; + int r; + + assert(bus); + assert(message); + assert(track); + + r = sd_bus_message_read(message, "sss", &name, &old, &new); + if (r < 0) + return 0; + + sd_bus_track_remove_name(track, name); + return 0; +} + +_public_ int sd_bus_track_add_name(sd_bus_track *track, const char *name) { + _cleanup_free_ char *n = NULL; + const char *match; + int r; + + assert_return(track, -EINVAL); + assert_return(service_name_is_valid(name), -EINVAL); + + r = set_ensure_allocated(&track->names, string_hash_func, string_compare_func); + if (r < 0) + return r; + + n = strdup(name); + if (!n) + return -ENOMEM; + + r = set_put(track->names, n); + if (r == -EEXIST) + return 0; + if (r < 0) + return r; + + /* First, subscribe to this name */ + match = MATCH_FOR_NAME(name); + r = sd_bus_add_match(track->bus, match, on_name_owner_changed, track); + if (r < 0) { + set_remove(track->names, n); + return r; + } + + /* Second, check if it is currently existing, or maybe + * doesn't, or maybe disappeared already. */ + r = sd_bus_get_owner(track->bus, name, 0, NULL); + if (r < 0) { + set_remove(track->names, n); + sd_bus_remove_match(track->bus, match, on_name_owner_changed, track); + return r; + } + + n = NULL; + + bus_track_remove_from_queue(track); + track->modified = true; + + return 1; +} + +_public_ int sd_bus_track_remove_name(sd_bus_track *track, const char *name) { + const char *match; + _cleanup_free_ char *n; + + assert_return(name, -EINVAL); + + if (!track) + return 0; + + n = set_remove(track->names, (char*) name); + if (!n) + return 0; + + match = MATCH_FOR_NAME(n); + sd_bus_remove_match(track->bus, match, on_name_owner_changed, track); + + if (set_isempty(track->names)) + bus_track_add_to_queue(track); + + track->modified = true; + + return 1; +} + +_public_ unsigned sd_bus_track_count(sd_bus_track *track) { + if (!track) + return 0; + + return set_size(track->names); +} + +_public_ const char* sd_bus_track_contains(sd_bus_track *track, const char *name) { + assert_return(track, NULL); + assert_return(name, NULL); + + return set_get(track->names, (void*) name); +} + +_public_ const char* sd_bus_track_first(sd_bus_track *track) { + if (!track) + return NULL; + + track->modified = false; + track->iterator = NULL; + + return set_iterate(track->names, &track->iterator); +} + +_public_ const char* sd_bus_track_next(sd_bus_track *track) { + if (!track) + return NULL; + + if (track->modified) + return NULL; + + return set_iterate(track->names, &track->iterator); +} + +_public_ int sd_bus_track_add_sender(sd_bus_track *track, sd_bus_message *m) { + const char *sender; + + assert_return(track, -EINVAL); + assert_return(m, -EINVAL); + + sender = sd_bus_message_get_sender(m); + if (!sender) + return -EINVAL; + + return sd_bus_track_add_name(track, sender); +} + +_public_ int sd_bus_track_remove_sender(sd_bus_track *track, sd_bus_message *m) { + const char *sender; + + assert_return(track, -EINVAL); + assert_return(m, -EINVAL); + + sender = sd_bus_message_get_sender(m); + if (!sender) + return -EINVAL; + + return sd_bus_track_remove_name(track, sender); +} + +_public_ sd_bus* sd_bus_track_get_bus(sd_bus_track *track) { + assert_return(track, NULL); + + return track->bus; +} + +void bus_track_dispatch(sd_bus_track *track) { + int r; + + assert(track); + assert(track->in_queue); + assert(track->handler); + + bus_track_remove_from_queue(track); + + sd_bus_track_ref(track); + + r = track->handler(track, track->userdata); + if (r < 0) + log_debug("Failed to process track handler: %s", strerror(-r)); + else if (r == 0) + bus_track_add_to_queue(track); + + sd_bus_track_unref(track); +} diff --git a/src/core/dbus-client-track.h b/src/libsystemd/sd-bus/bus-track.h index 01676479b0..f8690a5239 100644 --- a/src/core/dbus-client-track.h +++ b/src/libsystemd/sd-bus/bus-track.h @@ -21,22 +21,4 @@ along with systemd; If not, see <http://www.gnu.org/licenses/>. ***/ -#include "sd-bus.h" -#include "set.h" -#include "manager.h" - -typedef struct BusTrackedClient { - Set *set; - sd_bus *bus; - char name[0]; -} BusTrackedClient; - -int bus_client_track(Set **s, sd_bus *bus, const char *name); - -int bus_client_untrack(Set *s, sd_bus *bus, const char *name); -int bus_client_untrack_bus(Set *s, sd_bus *bus); - -void bus_client_track_free(Set *s); - -void bus_client_track_serialize(Manager *m, FILE *f, Set *s); -int bus_client_track_deserialize_item(Manager *m, Set **s, const char *line); +void bus_track_dispatch(sd_bus_track *track); diff --git a/src/libsystemd/sd-bus/sd-bus.c b/src/libsystemd/sd-bus/sd-bus.c index 636715f759..fbf1a5919f 100644 --- a/src/libsystemd/sd-bus/sd-bus.c +++ b/src/libsystemd/sd-bus/sd-bus.c @@ -51,6 +51,7 @@ #include "bus-util.h" #include "bus-container.h" #include "bus-protocol.h" +#include "bus-track.h" static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec); static int attach_io_events(sd_bus *b); @@ -131,6 +132,8 @@ static void bus_free(sd_bus *b) { assert(b); + assert(!b->track_queue); + sd_bus_detach_event(b); if (b->default_bus_ptr) @@ -2000,6 +2003,11 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) { assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN); assert_return(!bus_pid_changed(bus), -ECHILD); + if (bus->track_queue) { + *timeout_usec = 0; + return 1; + } + if (bus->state == BUS_CLOSING) { *timeout_usec = 0; return 1; @@ -2282,6 +2290,16 @@ finish: return r; } +static int dispatch_track(sd_bus *bus) { + assert(bus); + + if (!bus->track_queue) + return 0; + + bus_track_dispatch(bus->track_queue); + return 1; +} + static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd_bus_message **ret) { _cleanup_bus_message_unref_ sd_bus_message *m = NULL; int r; @@ -2297,6 +2315,10 @@ static int process_running(sd_bus *bus, bool hint_priority, int64_t priority, sd if (r != 0) goto null_message; + r = dispatch_track(bus); + if (r != 0) + goto null_message; + r = dispatch_rqueue(bus, hint_priority, priority, &m); if (r < 0) return r; diff --git a/src/systemd/sd-bus.h b/src/systemd/sd-bus.h index 0629e47dea..e26ca6b704 100644 --- a/src/systemd/sd-bus.h +++ b/src/systemd/sd-bus.h @@ -38,6 +38,7 @@ _SD_BEGIN_DECLARATIONS; typedef struct sd_bus sd_bus; typedef struct sd_bus_message sd_bus_message; typedef struct sd_bus_creds sd_bus_creds; +typedef struct sd_bus_track sd_bus_track; typedef struct { const char *name; @@ -89,6 +90,7 @@ typedef int (*sd_bus_property_get_t) (sd_bus *bus, const char *path, const char typedef int (*sd_bus_property_set_t) (sd_bus *bus, const char *path, const char *interface, const char *property, sd_bus_message *value, void *userdata, sd_bus_error *ret_error); typedef int (*sd_bus_object_find_t) (sd_bus *bus, const char *path, const char *interface, void *userdata, void **ret_found, sd_bus_error *ret_error); typedef int (*sd_bus_node_enumerator_t) (sd_bus *bus, const char *path, void *userdata, char ***ret_nodes, sd_bus_error *ret_error); +typedef int (*sd_bus_track_handler_t) (sd_bus_track *track, void *userdata); #include "sd-bus-protocol.h" #include "sd-bus-vtable.h" @@ -350,6 +352,23 @@ int sd_bus_error_has_name(const sd_bus_error *e, const char *name); char *sd_bus_label_escape(const char *s); char *sd_bus_label_unescape(const char *f); +/* Tracking peers */ + +int sd_bus_track_new(sd_bus *bus, sd_bus_track **track, sd_bus_track_handler_t handler, void *userdata); +sd_bus_track* sd_bus_track_ref(sd_bus_track *track); +sd_bus_track* sd_bus_track_unref(sd_bus_track *track); +sd_bus* sd_bus_track_get_bus(sd_bus_track *track); + +int sd_bus_track_add_sender(sd_bus_track *track, sd_bus_message *m); +int sd_bus_track_remove_sender(sd_bus_track *track, sd_bus_message *m); +int sd_bus_track_add_name(sd_bus_track *track, const char *name); +int sd_bus_track_remove_name(sd_bus_track *track, const char *name); + +unsigned sd_bus_track_count(sd_bus_track *track); +const char* sd_bus_track_contains(sd_bus_track *track, const char *names); +const char* sd_bus_track_first(sd_bus_track *track); +const char* sd_bus_track_next(sd_bus_track *track); + _SD_END_DECLARATIONS; #endif |