From 4555ec72d6530fce4c978fd894ac22f7e006b0ee Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Tue, 12 Nov 2013 22:37:51 +0100 Subject: rtnl: start adding support for asynchronous messaging Similarly to sd-bus, add: sd_rtnl_wait sd_rtnl_process sd_rtnl_send and adapt sd_rtnl_call accordingly. --- src/libsystemd-rtnl/rtnl-internal.h | 13 +- src/libsystemd-rtnl/rtnl-message.c | 2 +- src/libsystemd-rtnl/sd-rtnl.c | 300 +++++++++++++++++++++++++++++------- src/libsystemd-rtnl/test-rtnl.c | 6 +- src/systemd/sd-rtnl.h | 4 + 5 files changed, 267 insertions(+), 58 deletions(-) (limited to 'src') diff --git a/src/libsystemd-rtnl/rtnl-internal.h b/src/libsystemd-rtnl/rtnl-internal.h index b05290fd89..03297bb8de 100644 --- a/src/libsystemd-rtnl/rtnl-internal.h +++ b/src/libsystemd-rtnl/rtnl-internal.h @@ -35,6 +35,14 @@ struct sd_rtnl { struct sockaddr_nl nl; } sockaddr; + sd_rtnl_message **rqueue; + unsigned rqueue_size; + + sd_rtnl_message **wqueue; + unsigned wqueue_size; + + bool processing:1; + uint32_t serial; pid_t original_pid; @@ -42,8 +50,11 @@ struct sd_rtnl { #define RTNL_DEFAULT_TIMEOUT ((usec_t) (10 * USEC_PER_SEC)) +#define RTNL_WQUEUE_MAX 1024 +#define RTNL_RQUEUE_MAX 64*1024 + int message_get_errno(sd_rtnl_message *m); -int message_get_serial(sd_rtnl_message *m); +uint32_t message_get_serial(sd_rtnl_message *m); int message_seal(sd_rtnl *nl, sd_rtnl_message *m); int socket_write_message(sd_rtnl *nl, sd_rtnl_message *m); int socket_read_message(sd_rtnl *nl, sd_rtnl_message **ret); diff --git a/src/libsystemd-rtnl/rtnl-message.c b/src/libsystemd-rtnl/rtnl-message.c index 941bd96382..f7ff0a0148 100644 --- a/src/libsystemd-rtnl/rtnl-message.c +++ b/src/libsystemd-rtnl/rtnl-message.c @@ -367,7 +367,7 @@ int sd_rtnl_message_read(sd_rtnl_message *m, unsigned short *type, void **data) return message_read(m, type, data); } -int message_get_serial(sd_rtnl_message *m) { +uint32_t message_get_serial(sd_rtnl_message *m) { assert(m); return m->hdr->nlmsg_seq; diff --git a/src/libsystemd-rtnl/sd-rtnl.c b/src/libsystemd-rtnl/sd-rtnl.c index eb3b01b725..b375576835 100644 --- a/src/libsystemd-rtnl/sd-rtnl.c +++ b/src/libsystemd-rtnl/sd-rtnl.c @@ -46,6 +46,14 @@ static int sd_rtnl_new(sd_rtnl **ret) { rtnl->original_pid = getpid(); + /* We guarantee that wqueue always has space for at least + * one entry */ + rtnl->wqueue = new(sd_rtnl_message*, 1); + if (!rtnl->wqueue) { + free(rtnl); + return -ENOMEM; + } + *ret = rtnl; return 0; } @@ -98,23 +106,31 @@ sd_rtnl *sd_rtnl_ref(sd_rtnl *rtnl) { } sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) { + if (rtnl && REFCNT_DEC(rtnl->n_ref) <= 0) { + unsigned i; + + for (i = 0; i < rtnl->rqueue_size; i++) + sd_rtnl_message_unref(rtnl->rqueue[i]); + free(rtnl->rqueue); + + for (i = 0; i < rtnl->wqueue_size; i++) + sd_rtnl_message_unref(rtnl->wqueue[i]); + free(rtnl->wqueue); + if (rtnl->fd >= 0) close_nointr_nofail(rtnl->fd); + free(rtnl); } return NULL; } -int sd_rtnl_call(sd_rtnl *nl, - sd_rtnl_message *message, - uint64_t usec, - sd_rtnl_message **ret) { - struct pollfd p[1] = {}; - struct timespec left; - usec_t timeout; - int r, serial; +int sd_rtnl_send(sd_rtnl *nl, + sd_rtnl_message *message, + uint32_t *serial) { + int r; assert_return(nl, -EINVAL); assert_return(!rtnl_pid_changed(nl), -ECHILD); @@ -124,82 +140,260 @@ int sd_rtnl_call(sd_rtnl *nl, if (r < 0) return r; - serial = message_get_serial(message); + if (nl->wqueue_size <= 0) { + /* send directly */ + r = socket_write_message(nl, message); + if (r < 0) + return r; + else if (r == 0) { + /* nothing was sent, so let's put it on + * the queue */ + nl->wqueue[0] = sd_rtnl_message_ref(message); + nl->wqueue_size = 1; + } + } else { + sd_rtnl_message **q; - p[0].fd = nl->fd; - p[0].events = POLLOUT; + /* append to queue */ + if (nl->wqueue_size >= RTNL_WQUEUE_MAX) + return -ENOBUFS; - if (usec == (uint64_t) -1) - timeout = 0; - else if (usec == 0) - timeout = now(CLOCK_MONOTONIC) + RTNL_DEFAULT_TIMEOUT; - else - timeout = now(CLOCK_MONOTONIC) + usec; + q = realloc(nl->wqueue, sizeof(sd_rtnl_message*) * (nl->wqueue_size + 1)); + if (!q) + return -ENOMEM; - for (;;) { - if (timeout) { - usec_t n; + nl->wqueue = q; + q[nl->wqueue_size ++] = sd_rtnl_message_ref(message); + } - n = now(CLOCK_MONOTONIC); - if (n >= timeout) - return -ETIMEDOUT; + if (serial) + *serial = message_get_serial(message); - timespec_store(&left, timeout - n); - } + return 1; +} - r = ppoll(p, 1, timeout ? &left : NULL, NULL); - if (r < 0) - return 0; +static int dispatch_rqueue(sd_rtnl *rtnl, sd_rtnl_message **message) { + sd_rtnl_message *z = NULL; + int r; - r = socket_write_message(nl, message); + assert(rtnl); + assert(message); + + if (rtnl->rqueue_size > 0) { + /* Dispatch a queued message */ + + *message = rtnl->rqueue[0]; + rtnl->rqueue_size --; + memmove(rtnl->rqueue, rtnl->rqueue + 1, sizeof(sd_rtnl_message*) * rtnl->rqueue_size); + + return 1; + } + + /* Try to read a new message */ + r = socket_read_message(rtnl, &z); + if (r < 0) + return r; + if (r == 0) + return 0; + + *message = z; + + return 1; +} + +static int dispatch_wqueue(sd_rtnl *rtnl) { + int r, ret = 0; + + assert(rtnl); + + while (rtnl->wqueue_size > 0) { + r = socket_write_message(rtnl, rtnl->wqueue[0]); if (r < 0) return r; - - if (r > 0) { - break; + else if (r == 0) + /* Didn't do anything this time */ + return ret; + else { + /* see equivalent in sd-bus.c */ + sd_rtnl_message_unref(rtnl->wqueue[0]); + rtnl->wqueue_size --; + memmove(rtnl->wqueue, rtnl->wqueue + 1, sizeof(sd_rtnl_message*) * rtnl->wqueue_size); + + ret = 1; } } + return ret; +} + +static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) { + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL; + int r; + + r = dispatch_wqueue(rtnl); + if (r != 0) + goto null_message; + + r = dispatch_rqueue(rtnl, &m); + if (r < 0) + return r; + if (!m) + goto null_message; + + if (ret) { + *ret = m; + m = NULL; + + return 1; + } + + return 1; + +null_message: + if (r >= 0 && ret) + *ret = NULL; + + return r; +} +int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) { + int r; + + assert_return(rtnl, -EINVAL); + assert_return(!rtnl_pid_changed(rtnl), -ECHILD); + assert_return(!rtnl->processing, -EBUSY); + + rtnl->processing = true; + r = process_running(rtnl, ret); + rtnl->processing = false; + + return r; +} + +static usec_t calc_elapse(uint64_t usec) { + if (usec == (uint64_t) -1) + return 0; + + if (usec == 0) + usec = RTNL_DEFAULT_TIMEOUT; + + return now(CLOCK_MONOTONIC) + usec; +} + +static int rtnl_poll(sd_rtnl *nl, uint64_t timeout_usec) { + struct pollfd p[1] = {}; + struct timespec ts; + int r; + + assert(nl); + + p[0].fd = nl->fd; p[0].events = POLLIN; + r = ppoll(p, 1, timeout_usec == (uint64_t) -1 ? NULL : + timespec_store(&ts, timeout_usec), NULL); + if (r < 0) + return -errno; + + return r > 0 ? 1 : 0; +} + +int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) { + assert_return(nl, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + + if (nl->rqueue_size > 0) + return 0; + + return rtnl_poll(nl, timeout_usec); +} + +int sd_rtnl_call(sd_rtnl *nl, + sd_rtnl_message *message, + uint64_t usec, + sd_rtnl_message **ret) { + usec_t timeout; + uint32_t serial; + bool room = false; + int r; + + assert_return(nl, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + assert_return(message, -EINVAL); + + r = sd_rtnl_send(nl, message, &serial); + if (r < 0) + return r; + + timeout = calc_elapse(usec); + for (;;) { - _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *reply = NULL; + usec_t left; + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *incoming = NULL; - if (timeout) { - usec_t n; + if (!room) { + sd_rtnl_message **q; - n = now(CLOCK_MONOTONIC); - if (n >= timeout) - return -ETIMEDOUT; + if (nl->rqueue_size >= RTNL_RQUEUE_MAX) + return -ENOBUFS; - timespec_store(&left, timeout - n); - } + /* Make sure there's room for queueing this + * locally, before we read the message */ - r = ppoll(p, 1, timeout ? &left : NULL, NULL); - if (r < 0) - return r; + q = realloc(nl->rqueue, (nl->rqueue_size + 1) * sizeof(sd_rtnl_message*)); + if (!q) + return -ENOMEM; - r = socket_read_message(nl, &reply); + nl->rqueue = q; + room = true; + } + + r = socket_read_message(nl, &incoming); if (r < 0) return r; - - if (r > 0) { - int received_serial = message_get_serial(reply); + if (incoming) { + uint32_t received_serial = message_get_serial(incoming); if (received_serial == serial) { - r = message_get_errno(reply); + r = message_get_errno(incoming); if (r < 0) return r; if (ret) { - *ret = reply; - reply = NULL; + *ret = incoming; + incoming = NULL; } - break;; + return 1; } + + /* Room was allocated on the queue above */ + nl->rqueue[nl->rqueue_size ++] = incoming; + incoming = NULL; + room = false; + + /* Try to read more, right away */ + continue; } - } + if (r != 0) + continue; - return 0; + if (timeout > 0) { + usec_t n; + + n = now(CLOCK_MONOTONIC); + if (n >= timeout) + return -ETIMEDOUT; + + left = timeout - n; + } else + left = (uint64_t) -1; + + r = rtnl_poll(nl, left); + if (r < 0) + return r; + + r = dispatch_wqueue(nl); + if (r < 0) + return r; + } } diff --git a/src/libsystemd-rtnl/test-rtnl.c b/src/libsystemd-rtnl/test-rtnl.c index 61345bce40..3615086793 100644 --- a/src/libsystemd-rtnl/test-rtnl.c +++ b/src/libsystemd-rtnl/test-rtnl.c @@ -53,7 +53,7 @@ static void test_link_configure(sd_rtnl *rtnl, int ifindex) { assert(type == IFLA_MTU); assert(mtu == *(unsigned int *) data); - assert(sd_rtnl_call(rtnl, message, 0, NULL) == 0); + assert(sd_rtnl_call(rtnl, message, 0, NULL) == 1); } static void test_route(void) { @@ -133,7 +133,7 @@ int main(void) { assert(sd_rtnl_message_read(m, &type, &data) == 0); - assert(sd_rtnl_call(rtnl, m, 0, &r) >= 0); + assert(sd_rtnl_call(rtnl, m, 0, &r) == 1); assert(sd_rtnl_message_get_type(r, &type) >= 0); assert(type == RTM_NEWLINK); @@ -155,7 +155,7 @@ int main(void) { assert(sd_rtnl_message_read(m, &type, &data) == 0); - assert(sd_rtnl_call(rtnl, m, -1, &r) >= 0); + assert(sd_rtnl_call(rtnl, m, -1, &r) == 1); while (sd_rtnl_message_read(r, &type, &data) > 0) { switch (type) { // case IFLA_MTU: diff --git a/src/systemd/sd-rtnl.h b/src/systemd/sd-rtnl.h index 2d166c4fb4..87acc316c2 100644 --- a/src/systemd/sd-rtnl.h +++ b/src/systemd/sd-rtnl.h @@ -37,9 +37,13 @@ int sd_rtnl_open(uint32_t groups, sd_rtnl **nl); sd_rtnl *sd_rtnl_ref(sd_rtnl *nl); sd_rtnl *sd_rtnl_unref(sd_rtnl *nl); +int sd_rtnl_send(sd_rtnl *nl, sd_rtnl_message *message, uint32_t *serial); int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout, sd_rtnl_message **reply); +int sd_rtnl_process(sd_rtnl *nl, sd_rtnl_message **ret); +int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout); + /* messages */ int sd_rtnl_message_link_new(uint16_t msg_type, int index, unsigned int type, unsigned int flags, sd_rtnl_message **ret); -- cgit v1.2.3-54-g00ecf