diff options
Diffstat (limited to 'src/libsystemd-bus/sd-bus.c')
-rw-r--r-- | src/libsystemd-bus/sd-bus.c | 224 |
1 files changed, 182 insertions, 42 deletions
diff --git a/src/libsystemd-bus/sd-bus.c b/src/libsystemd-bus/sd-bus.c index c8248e19aa..bc88ac977c 100644 --- a/src/libsystemd-bus/sd-bus.c +++ b/src/libsystemd-bus/sd-bus.c @@ -1204,6 +1204,18 @@ _public_ void sd_bus_close(sd_bus *bus) { * freed. */ } +static void bus_enter_closing(sd_bus *bus) { + assert(bus); + + if (bus->state != BUS_OPENING && + bus->state != BUS_AUTHENTICATING && + bus->state != BUS_HELLO && + bus->state != BUS_RUNNING) + return; + + bus->state = BUS_CLOSING; +} + _public_ sd_bus *sd_bus_ref(sd_bus *bus) { assert_return(bus, NULL); @@ -1282,6 +1294,20 @@ static int bus_seal_message(sd_bus *b, sd_bus_message *m) { return bus_message_seal(m, ++b->serial); } +static int bus_write_message(sd_bus *bus, sd_bus_message *message, size_t *idx) { + int r; + + assert(bus); + assert(message); + + if (bus->is_kernel) + r = bus_kernel_write_message(bus, message); + else + r = bus_socket_write_message(bus, message, idx); + + return r; +} + static int dispatch_wqueue(sd_bus *bus) { int r, ret = 0; @@ -1290,15 +1316,10 @@ static int dispatch_wqueue(sd_bus *bus) { while (bus->wqueue_size > 0) { - if (bus->is_kernel) - r = bus_kernel_write_message(bus, bus->wqueue[0]); - else - r = bus_socket_write_message(bus, bus->wqueue[0], &bus->windex); - - if (r < 0) { - sd_bus_close(bus); + r = bus_write_message(bus, bus->wqueue[0], &bus->windex); + if (r < 0) return r; - } else if (r == 0) + else if (r == 0) /* Didn't do anything this time */ return ret; else if (bus->is_kernel || bus->windex >= BUS_MESSAGE_SIZE(bus->wqueue[0])) { @@ -1324,6 +1345,20 @@ static int dispatch_wqueue(sd_bus *bus) { return ret; } +static int bus_read_message(sd_bus *bus, sd_bus_message **m) { + int r; + + assert(bus); + assert(m); + + if (bus->is_kernel) + r = bus_kernel_read_message(bus, m); + else + r = bus_socket_read_message(bus, m); + + return r; +} + static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) { sd_bus_message *z = NULL; int r, ret = 0; @@ -1343,15 +1378,9 @@ static int dispatch_rqueue(sd_bus *bus, sd_bus_message **m) { /* Try to read a new message */ do { - if (bus->is_kernel) - r = bus_kernel_read_message(bus, &z); - else - r = bus_socket_read_message(bus, &z); - - if (r < 0) { - sd_bus_close(bus); + r = bus_read_message(bus, &z); + if (r < 0) return r; - } if (r == 0) return ret; @@ -1395,15 +1424,10 @@ _public_ int sd_bus_send(sd_bus *bus, sd_bus_message *m, uint64_t *serial) { if ((bus->state == BUS_RUNNING || bus->state == BUS_HELLO) && bus->wqueue_size <= 0) { size_t idx = 0; - if (bus->is_kernel) - r = bus_kernel_write_message(bus, m); - else - r = bus_socket_write_message(bus, m, &idx); - - if (r < 0) { - sd_bus_close(bus); + r = bus_write_message(bus, m, &idx); + if (r < 0) return r; - } else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) { + else if (!bus->is_kernel && idx < BUS_MESSAGE_SIZE(m)) { /* Wasn't fully written. So let's remember how * much was written. Note that the first entry * of the wqueue array is always allocated so @@ -1573,7 +1597,7 @@ int bus_ensure_running(sd_bus *bus) { assert(bus); - if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED) + if (bus->state == BUS_UNSET || bus->state == BUS_CLOSED || bus->state == BUS_CLOSING) return -ENOTCONN; if (bus->state == BUS_RUNNING) return 1; @@ -1644,12 +1668,10 @@ _public_ int sd_bus_call( room = true; } - if (bus->is_kernel) - r = bus_kernel_read_message(bus, &incoming); - else - r = bus_socket_read_message(bus, &incoming); + r = bus_read_message(bus, &incoming); if (r < 0) return r; + if (incoming) { if (incoming->reply_serial == serial) { @@ -1731,7 +1753,6 @@ _public_ int sd_bus_call( _public_ int sd_bus_get_fd(sd_bus *bus) { assert_return(bus, -EINVAL); - assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); assert_return(bus->input_fd == bus->output_fd, -EPERM); assert_return(!bus_pid_changed(bus), -ECHILD); @@ -1742,7 +1763,7 @@ _public_ int sd_bus_get_events(sd_bus *bus) { int flags = 0; assert_return(bus, -EINVAL); - assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); + assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN); assert_return(!bus_pid_changed(bus), -ECHILD); if (bus->state == BUS_OPENING) @@ -1769,9 +1790,14 @@ _public_ int sd_bus_get_timeout(sd_bus *bus, uint64_t *timeout_usec) { assert_return(bus, -EINVAL); assert_return(timeout_usec, -EINVAL); - assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); + assert_return(BUS_IS_OPEN(bus->state) || bus->state == BUS_CLOSING, -ENOTCONN); assert_return(!bus_pid_changed(bus), -ECHILD); + if (bus->state == BUS_CLOSING) { + *timeout_usec = 0; + return 1; + } + if (bus->state == BUS_AUTHENTICATING) { *timeout_usec = bus->auth_timeout; return 1; @@ -1821,12 +1847,21 @@ static int process_timeout(sd_bus *bus) { if (r < 0) return r; + r = bus_seal_message(bus, m); + if (r < 0) + return r; + assert_se(prioq_pop(bus->reply_callbacks_prioq) == c); hashmap_remove(bus->reply_callbacks, &c->serial); + bus->current = m; + bus->iteration_counter ++; + r = c->callback(bus, m, c->userdata); free(c); + bus->current = NULL; + return r < 0 ? r : 1; } @@ -1877,7 +1912,7 @@ static int process_reply(sd_bus *bus, sd_bus_message *m) { r = c->callback(bus, m, c->userdata); free(c); - return r; + return r < 0 ? r : 1; } static int process_filter(sd_bus *bus, sd_bus_message *m) { @@ -2078,6 +2113,85 @@ null_message: return r; } +static int process_closing(sd_bus *bus, sd_bus_message **ret) { + _cleanup_bus_message_unref_ sd_bus_message *m = NULL; + struct reply_callback *c; + int r; + + assert(bus); + assert(bus->state == BUS_CLOSING); + + c = hashmap_first(bus->reply_callbacks); + if (c) { + /* First, fail all outstanding method calls */ + r = bus_message_new_synthetic_error( + bus, + c->serial, + &SD_BUS_ERROR_MAKE(SD_BUS_ERROR_NO_REPLY, "Connection terminated"), + &m); + if (r < 0) + return r; + + r = bus_seal_message(bus, m); + if (r < 0) + return r; + + if (c->timeout != 0) + prioq_remove(bus->reply_callbacks_prioq, c, &c->prioq_idx); + + hashmap_remove(bus->reply_callbacks, &c->serial); + + bus->current = m; + bus->iteration_counter++; + + r = c->callback(bus, m, c->userdata); + free(c); + + if (r >= 0) + r = 1; + + goto finish; + } + + /* Then, synthesize a Disconnected message */ + r = sd_bus_message_new_signal( + bus, + "/org/freedesktop/DBus/Local", + "org.freedesktop.DBus.Local", + "Disconnected", + &m); + if (r < 0) + return r; + + r = bus_seal_message(bus, m); + if (r < 0) + return r; + + sd_bus_close(bus); + + bus->current = m; + bus->iteration_counter++; + + r = process_filter(bus, m); + if (r != 0) + goto finish; + + r = process_match(bus, m); + if (r != 0) + goto finish; + + if (ret) { + *ret = m; + m = NULL; + } + + r = 1; + +finish: + bus->current = NULL; + return r; +} + _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { BUS_DONT_DESTROY(bus); int r; @@ -2091,7 +2205,7 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { assert_return(!bus_pid_changed(bus), -ECHILD); /* We don't allow recursively invoking sd_bus_process(). */ - assert_return(!bus->processing, -EBUSY); + assert_return(!bus->current, -EBUSY); switch (bus->state) { @@ -2101,29 +2215,43 @@ _public_ int sd_bus_process(sd_bus *bus, sd_bus_message **ret) { case BUS_OPENING: r = bus_socket_process_opening(bus); - if (r < 0) + if (r == -ECONNRESET || r == -EPIPE) { + bus_enter_closing(bus); + r = 1; + } else if (r < 0) return r; if (ret) *ret = NULL; return r; case BUS_AUTHENTICATING: - r = bus_socket_process_authenticating(bus); - if (r < 0) + if (r == -ECONNRESET || r == -EPIPE) { + bus_enter_closing(bus); + r = 1; + } else if (r < 0) return r; + if (ret) *ret = NULL; + return r; case BUS_RUNNING: case BUS_HELLO: - - bus->processing = true; r = process_running(bus, ret); - bus->processing = false; + if (r == -ECONNRESET || r == -EPIPE) { + bus_enter_closing(bus); + r = 1; + + if (ret) + *ret = NULL; + } return r; + + case BUS_CLOSING: + return process_closing(bus, ret); } assert_not_reached("Unknown state"); @@ -2136,6 +2264,10 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { usec_t m = (usec_t) -1; assert(bus); + + if (bus->state == BUS_CLOSING) + return 1; + assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); e = sd_bus_get_events(bus); @@ -2186,9 +2318,13 @@ static int bus_poll(sd_bus *bus, bool need_more, uint64_t timeout_usec) { _public_ int sd_bus_wait(sd_bus *bus, uint64_t timeout_usec) { assert_return(bus, -EINVAL); - assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); assert_return(!bus_pid_changed(bus), -ECHILD); + if (bus->state == BUS_CLOSING) + return 0; + + assert_return(BUS_IS_OPEN(bus->state) , -ENOTCONN); + if (bus->rqueue_size > 0) return 0; @@ -2199,9 +2335,13 @@ _public_ int sd_bus_flush(sd_bus *bus) { int r; assert_return(bus, -EINVAL); - assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); assert_return(!bus_pid_changed(bus), -ECHILD); + if (bus->state == BUS_CLOSING) + return 0; + + assert_return(BUS_IS_OPEN(bus->state), -ENOTCONN); + r = bus_ensure_running(bus); if (r < 0) return r; |