summaryrefslogtreecommitdiff
path: root/src/libsystemd-bus/sd-bus.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/libsystemd-bus/sd-bus.c')
-rw-r--r--src/libsystemd-bus/sd-bus.c224
1 files changed, 182 insertions, 42 deletions
diff --git a/src/libsystemd-bus/sd-bus.c b/src/libsystemd-bus/sd-bus.c
index c8248e19aa..bc88ac977c 100644
--- a/src/libsystemd-bus/sd-bus.c
+++ b/src/libsystemd-bus/sd-bus.c
@@ -1204,6 +1204,18 @@ _public_ void sd_bus_close(sd_bus *bus) {
* freed. */
}
+static void bus_enter_closing(sd_bus *bus) {
+ assert(bus);
+
+ if (bus->state != BUS_OPENING &&
+ bus->state != BUS_AUTHENTICATING &&
+ bus->state != BUS_HELLO &&
+ bus->state != BUS_RUNNING)
+ return;
+
+ bus->state = BUS_CLOSING;
+}
+
_public_ sd_bus *sd_bus_ref(sd_bus *bus) {
assert_return(bus, NULL);
@@ -1282,6 +1294,20 @@ static int bus_seal_message(sd_bus *b, sd_bus_message *m) {
return bus_message_seal(m, ++b->serial);
}
+static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) {
+ int r;
+
+ assert(bus);
+ assert(message);
+
+ if (bus->is_kernel)
+ r = bus_kernel_write_message(bus, message);
+ else
+ r = bus_socket_write_message(bus, message, idx);
+
+ return r;
+}
+
static int dispatch_wqueue(sd_bus *bus) {
int r, ret = 0;
@@ -1290,15 +1316,10 @@ static int dispatch_wqueue(sd_bus *bus) {
while (bus->wqueue_size > 0) {
- if (bus->is_kernel)
- r = bus_kernel_write_message(bus, bus->wqueue[0]);
- else
- r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex);
-
- if (r < 0) {
- sd_bus_close(bus);
+ r = bus_write_message(bus, bus->wqueue[0], &bus->windex);
+ if (r < 0)
return r;
- } else if (r == 0)
+ else if (r == 0)
/* Didn't do anything this time */
return ret;
else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) {
@@ -1324,6 +1345,20 @@ static int dispatch_wqueue(sd_bus *bus) {
return ret;
}
+static int bus_read_message(sd_bus *bus, sd_bus_message **m) {
+ int r;
+
+ assert(bus);
+ assert(m);
+
+ if (bus->is_kernel)
+ r = bus_kernel_read_message(bus, m);
+ else
+ r = bus_socket_read_message(bus, m);
+
+ return r;
+}
+
static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
sd_bus_message *z = NULL;
int r, ret = 0;
@@ -1343,15 +1378,9 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) {
/* Try to read a new message */
do {
- if (bus->is_kernel)
- r = bus_kernel_read_message(bus, &z);
- else
- r = bus_socket_read_message(bus, &z);
-
- if (r < 0) {
- sd_bus_close(bus);
+ r = bus_read_message(bus, &z);
+ if (r < 0)
return r;
- }
if (r == 0)
return ret;
@@ -1395,15 +1424,10 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) {
if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) {
size_t idx = 0;
- if (bus->is_kernel)
- r = bus_kernel_write_message(bus, m);
- else
- r = bus_socket_write_message(bus, m, &idx);
-
- if (r < 0) {
- sd_bus_close(bus);
+ r = bus_write_message(bus, m, &idx);
+ if (r < 0)
return r;
- } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
+ else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) {
/* Wasn't fully written. So let's remember how
* much was written. Note that the first entry
* of the wqueue array is always allocated so
@@ -1573,7 +1597,7 @@ int bus_ensure_running(sd_bus *bus) {
assert(bus);
- if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED)
+ if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING)
return -ENOTCONN;
if (bus->state == BUS_RUNNING)
return 1;
@@ -1644,12 +1668,10 @@ _public_ int sd_bus_call(
room = true;
}
- if (bus->is_kernel)
- r = bus_kernel_read_message(bus, &incoming);
- else
- r = bus_socket_read_message(bus, &incoming);
+ r = bus_read_message(bus, &incoming);
if (r < 0)
return r;
+
if (incoming) {
if (incoming->reply_serial == serial) {
@@ -1731,7 +1753,6 @@ _public_ int sd_bus_call(
_public_ int sd_bus_get_fd(sd_bus *bus) {
assert_return(bus, -EINVAL);
- assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
assert_return(bus->input_fd == bus->output_fd, -EPERM);
assert_return(!bus_pid_changed(bus), -ECHILD);
@@ -1742,7 +1763,7 @@ _public_ int sd_bus_get_events(sd_bus *bus) {
int flags = 0;
assert_return(bus, -EINVAL);
- assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+ assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
assert_return(!bus_pid_changed(bus), -ECHILD);
if (bus->state == BUS_OPENING)
@@ -1769,9 +1790,14 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) {
assert_return(bus, -EINVAL);
assert_return(timeout_usec, -EINVAL);
- assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+ assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN);
assert_return(!bus_pid_changed(bus), -ECHILD);
+ if (bus->state == BUS_CLOSING) {
+ *timeout_usec = 0;
+ return 1;
+ }
+
if (bus->state == BUS_AUTHENTICATING) {
*timeout_usec = bus->auth_timeout;
return 1;
@@ -1821,12 +1847,21 @@ static int process_timeout(sd_bus *bus) {
if (r < 0)
return r;
+ r = bus_seal_message(bus, m);
+ if (r < 0)
+ return r;
+
assert_se(prioq_pop(bus->reply_callbacks_prioq) == c);
hashmap_remove(bus->reply_callbacks, &c->serial);
+ bus->current = m;
+ bus->iteration_counter ++;
+
r = c->callback(bus, m, c->userdata);
free(c);
+ bus->current = NULL;
+
return r < 0 ? r : 1;
}
@@ -1877,7 +1912,7 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) {
r = c->callback(bus, m, c->userdata);
free(c);
- return r;
+ return r < 0 ? r : 1;
}
static int process_filter(sd_bus *bus, sd_bus_message *m) {
@@ -2078,6 +2113,85 @@ null_message:
return r;
}
+static int process_closing(sd_bus *bus, sd_bus_message **ret) {
+ _cleanup_bus_message_unref_ sd_bus_message *m = NULL;
+ struct reply_callback *c;
+ int r;
+
+ assert(bus);
+ assert(bus->state == BUS_CLOSING);
+
+ c = hashmap_first(bus->reply_callbacks);
+ if (c) {
+ /* First, fail all outstanding method calls */
+ r = bus_message_new_synthetic_error(
+ bus,
+ c->serial,
+ &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Connection terminated"),
+ &m);
+ if (r < 0)
+ return r;
+
+ r = bus_seal_message(bus, m);
+ if (r < 0)
+ return r;
+
+ if (c->timeout != 0)
+ prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx);
+
+ hashmap_remove(bus->reply_callbacks, &c->serial);
+
+ bus->current = m;
+ bus->iteration_counter++;
+
+ r = c->callback(bus, m, c->userdata);
+ free(c);
+
+ if (r >= 0)
+ r = 1;
+
+ goto finish;
+ }
+
+ /* Then, synthesize a Disconnected message */
+ r = sd_bus_message_new_signal(
+ bus,
+ "/org/freedesktop/DBus/Local",
+ "org.freedesktop.DBus.Local",
+ "Disconnected",
+ &m);
+ if (r < 0)
+ return r;
+
+ r = bus_seal_message(bus, m);
+ if (r < 0)
+ return r;
+
+ sd_bus_close(bus);
+
+ bus->current = m;
+ bus->iteration_counter++;
+
+ r = process_filter(bus, m);
+ if (r != 0)
+ goto finish;
+
+ r = process_match(bus, m);
+ if (r != 0)
+ goto finish;
+
+ if (ret) {
+ *ret = m;
+ m = NULL;
+ }
+
+ r = 1;
+
+finish:
+ bus->current = NULL;
+ return r;
+}
+
_public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
BUS_DONT_DESTROY(bus);
int r;
@@ -2091,7 +2205,7 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
assert_return(!bus_pid_changed(bus), -ECHILD);
/* We don't allow recursively invoking sd_bus_process(). */
- assert_return(!bus->processing, -EBUSY);
+ assert_return(!bus->current, -EBUSY);
switch (bus->state) {
@@ -2101,29 +2215,43 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) {
case BUS_OPENING:
r = bus_socket_process_opening(bus);
- if (r < 0)
+ if (r == -ECONNRESET || r == -EPIPE) {
+ bus_enter_closing(bus);
+ r = 1;
+ } else if (r < 0)
return r;
if (ret)
*ret = NULL;
return r;
case BUS_AUTHENTICATING:
-
r = bus_socket_process_authenticating(bus);
- if (r < 0)
+ if (r == -ECONNRESET || r == -EPIPE) {
+ bus_enter_closing(bus);
+ r = 1;
+ } else if (r < 0)
return r;
+
if (ret)
*ret = NULL;
+
return r;
case BUS_RUNNING:
case BUS_HELLO:
-
- bus->processing = true;
r = process_running(bus, ret);
- bus->processing = false;
+ if (r == -ECONNRESET || r == -EPIPE) {
+ bus_enter_closing(bus);
+ r = 1;
+
+ if (ret)
+ *ret = NULL;
+ }
return r;
+
+ case BUS_CLOSING:
+ return process_closing(bus, ret);
}
assert_not_reached("Unknown state");
@@ -2136,6 +2264,10 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
usec_t m = (usec_t) -1;
assert(bus);
+
+ if (bus->state == BUS_CLOSING)
+ return 1;
+
assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
e = sd_bus_get_events(bus);
@@ -2186,9 +2318,13 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) {
_public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) {
assert_return(bus, -EINVAL);
- assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
assert_return(!bus_pid_changed(bus), -ECHILD);
+ if (bus->state == BUS_CLOSING)
+ return 0;
+
+ assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN);
+
if (bus->rqueue_size > 0)
return 0;
@@ -2199,9 +2335,13 @@ _public_ int sd_bus_flush(sd_bus *bus) {
int r;
assert_return(bus, -EINVAL);
- assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
assert_return(!bus_pid_changed(bus), -ECHILD);
+ if (bus->state == BUS_CLOSING)
+ return 0;
+
+ assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN);
+
r = bus_ensure_running(bus);
if (r < 0)
return r;