diff options
author | Tom Gundersen <teg@jklm.no> | 2013-11-13 23:18:20 +0100 |
---|---|---|
committer | Tom Gundersen <teg@jklm.no> | 2013-11-14 14:32:48 +0100 |
commit | e16bcf986e3e2da2e01afc96f264edfc7bb00d1a (patch) | |
tree | b6908953b2fe630af11e53480eb405054e296634 | |
parent | 24710f6d2fbe25282fd96a8f869b7e4a9c3b50d8 (diff) |
rtnl: add call_async and call_async_cancel
They work in the same way as the sd-bus equivalents.
-rw-r--r-- | src/libsystemd-rtnl/rtnl-internal.h | 20 | ||||
-rw-r--r-- | src/libsystemd-rtnl/rtnl-message.c | 25 | ||||
-rw-r--r-- | src/libsystemd-rtnl/sd-rtnl.c | 166 | ||||
-rw-r--r-- | src/libsystemd-rtnl/test-rtnl.c | 90 | ||||
-rw-r--r-- | src/systemd/sd-rtnl.h | 9 |
5 files changed, 306 insertions, 4 deletions
diff --git a/src/libsystemd-rtnl/rtnl-internal.h b/src/libsystemd-rtnl/rtnl-internal.h index 03297bb8de..adad7850c5 100644 --- a/src/libsystemd-rtnl/rtnl-internal.h +++ b/src/libsystemd-rtnl/rtnl-internal.h @@ -24,6 +24,17 @@ #include <linux/netlink.h> #include "refcnt.h" +#include "prioq.h" + +#include "sd-rtnl.h" + +struct reply_callback { + sd_rtnl_message_handler_t callback; + void *userdata; + usec_t timeout; + uint64_t serial; + unsigned prioq_idx; +}; struct sd_rtnl { RefCount n_ref; @@ -45,6 +56,9 @@ struct sd_rtnl { uint32_t serial; + struct Prioq *reply_callbacks_prioq; + Hashmap *reply_callbacks; + pid_t original_pid; }; @@ -53,8 +67,12 @@ struct sd_rtnl { #define RTNL_WQUEUE_MAX 1024 #define RTNL_RQUEUE_MAX 64*1024 -int message_get_errno(sd_rtnl_message *m); +int message_new_synthetic_error(int error, uint32_t serial, sd_rtnl_message **ret); uint32_t message_get_serial(sd_rtnl_message *m); int message_seal(sd_rtnl *nl, sd_rtnl_message *m); int socket_write_message(sd_rtnl *nl, sd_rtnl_message *m); int socket_read_message(sd_rtnl *nl, sd_rtnl_message **ret); + +/* Make sure callbacks don't destroy the rtnl connection */ +#define RTNL_DONT_DESTROY(rtnl) \ + _cleanup_sd_rtnl_unref_ sd_rtnl *_dont_destroy_##rtnl = sd_rtnl_ref(rtnl) diff --git a/src/libsystemd-rtnl/rtnl-message.c b/src/libsystemd-rtnl/rtnl-message.c index f7ff0a0148..1ce6862668 100644 --- a/src/libsystemd-rtnl/rtnl-message.c +++ b/src/libsystemd-rtnl/rtnl-message.c @@ -68,6 +68,27 @@ static int message_new(sd_rtnl_message **ret, size_t initial_size) { return 0; } +int message_new_synthetic_error(int error, uint32_t serial, sd_rtnl_message **ret) { + struct nlmsgerr *err; + int r; + + assert(error <= 0); + + r = message_new(ret, NLMSG_SPACE(sizeof(struct nlmsgerr))); + if (r < 0) + return r; + + (*ret)->hdr->nlmsg_len = NLMSG_LENGTH(sizeof(struct nlmsgerr)); + (*ret)->hdr->nlmsg_type = NLMSG_ERROR; + (*ret)->hdr->nlmsg_seq = serial; + + err = NLMSG_DATA((*ret)->hdr); + + err->error = error; + + return 0; +} + int sd_rtnl_message_route_new(uint16_t nlmsg_type, unsigned char rtm_family, unsigned char rtm_dst_len, unsigned char rtm_src_len, unsigned char rtm_tos, unsigned char rtm_table, @@ -373,10 +394,10 @@ uint32_t message_get_serial(sd_rtnl_message *m) { return m->hdr->nlmsg_seq; } -int message_get_errno(sd_rtnl_message *m) { +int sd_rtnl_message_get_errno(sd_rtnl_message *m) { struct nlmsgerr *err; - assert(m); + assert_return(m, -EINVAL); if (m->hdr->nlmsg_type != NLMSG_ERROR) return 0; diff --git a/src/libsystemd-rtnl/sd-rtnl.c b/src/libsystemd-rtnl/sd-rtnl.c index b375576835..215c0ab060 100644 --- a/src/libsystemd-rtnl/sd-rtnl.c +++ b/src/libsystemd-rtnl/sd-rtnl.c @@ -24,6 +24,7 @@ #include "macro.h" #include "util.h" +#include "hashmap.h" #include "sd-rtnl.h" #include "rtnl-internal.h" @@ -118,6 +119,9 @@ sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) { sd_rtnl_message_unref(rtnl->wqueue[i]); free(rtnl->wqueue); + hashmap_free_free(rtnl->reply_callbacks); + prioq_free(rtnl->reply_callbacks_prioq); + if (rtnl->fd >= 0) close_nointr_nofail(rtnl->fd); @@ -226,10 +230,65 @@ static int dispatch_wqueue(sd_rtnl *rtnl) { return ret; } +static int process_timeout(sd_rtnl *rtnl) { + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL; + struct reply_callback *c; + usec_t n; + int r; + + assert(rtnl); + + c = prioq_peek(rtnl->reply_callbacks_prioq); + if (!c) + return 0; + + n = now(CLOCK_MONOTONIC); + if (c->timeout > n) + return 0; + + r = message_new_synthetic_error(-ETIMEDOUT, c->serial, &m); + if (r < 0) + return r; + + assert_se(prioq_pop(rtnl->reply_callbacks_prioq) == c); + hashmap_remove(rtnl->reply_callbacks, &c->serial); + + r = c->callback(rtnl, m, c->userdata); + free(c); + + return r < 0 ? r : 1; +} + +static int process_reply(sd_rtnl *rtnl, sd_rtnl_message *m) { + struct reply_callback *c; + uint64_t serial; + int r; + + assert(rtnl); + assert(m); + + serial = message_get_serial(m); + c = hashmap_remove(rtnl->reply_callbacks, &serial); + if (!c) + return 0; + + if (c->timeout != 0) + prioq_remove(rtnl->reply_callbacks_prioq, c, &c->prioq_idx); + + r = c->callback(rtnl, m, c->userdata); + free(c); + + return r; +} + static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) { _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL; int r; + r = process_timeout(rtnl); + if (r != 0) + goto null_message; + r = dispatch_wqueue(rtnl); if (r != 0) goto null_message; @@ -240,6 +299,10 @@ static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) { if (!m) goto null_message; + r = process_reply(rtnl, m); + if (r != 0) + goto null_message; + if (ret) { *ret = m; m = NULL; @@ -255,7 +318,9 @@ null_message: return r; } + int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) { + RTNL_DONT_DESTROY(rtnl); int r; assert_return(rtnl, -EINVAL); @@ -307,6 +372,105 @@ int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) { return rtnl_poll(nl, timeout_usec); } +static int timeout_compare(const void *a, const void *b) { + const struct reply_callback *x = a, *y = b; + + if (x->timeout != 0 && y->timeout == 0) + return -1; + + if (x->timeout == 0 && y->timeout != 0) + return 1; + + if (x->timeout < y->timeout) + return -1; + + if (x->timeout > y->timeout) + return 1; + + return 0; +} + +int sd_rtnl_call_async(sd_rtnl *nl, + sd_rtnl_message *m, + sd_rtnl_message_handler_t callback, + void *userdata, + uint64_t usec, + uint32_t *serial) { + struct reply_callback *c; + uint32_t s; + int r, k; + + assert_return(nl, -EINVAL); + assert_return(m, -EINVAL); + assert_return(callback, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + + r = hashmap_ensure_allocated(&nl->reply_callbacks, uint64_hash_func, uint64_compare_func); + if (r < 0) + return r; + + if (usec != (uint64_t) -1) { + r = prioq_ensure_allocated(&nl->reply_callbacks_prioq, timeout_compare); + if (r < 0) + return r; + } + + c = new0(struct reply_callback, 1); + if (!c) + return -ENOMEM; + + c->callback = callback; + c->userdata = userdata; + c->timeout = calc_elapse(usec); + + k = sd_rtnl_send(nl, m, &s); + if (k < 0) { + free(c); + return k; + } + + c->serial = s; + + r = hashmap_put(nl->reply_callbacks, &c->serial, c); + if (r < 0) { + free(c); + return r; + } + + if (c->timeout != 0) { + r = prioq_put(nl->reply_callbacks_prioq, c, &c->prioq_idx); + if (r > 0) { + c->timeout = 0; + sd_rtnl_call_async_cancel(nl, c->serial); + return r; + } + } + + if (serial) + *serial = s; + + return k; +} + +int sd_rtnl_call_async_cancel(sd_rtnl *nl, uint32_t serial) { + struct reply_callback *c; + uint64_t s = serial; + + assert_return(nl, -EINVAL); + assert_return(serial != 0, -EINVAL); + assert_return(!rtnl_pid_changed(nl), -ECHILD); + + c = hashmap_remove(nl->reply_callbacks, &s); + if (!c) + return 0; + + if (c->timeout != 0) + prioq_remove(nl->reply_callbacks_prioq, c, &c->prioq_idx); + + free(c); + return 1; +} + int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t usec, @@ -354,7 +518,7 @@ int sd_rtnl_call(sd_rtnl *nl, uint32_t received_serial = message_get_serial(incoming); if (received_serial == serial) { - r = message_get_errno(incoming); + r = sd_rtnl_message_get_errno(incoming); if (r < 0) return r; diff --git a/src/libsystemd-rtnl/test-rtnl.c b/src/libsystemd-rtnl/test-rtnl.c index 3615086793..ea48f48455 100644 --- a/src/libsystemd-rtnl/test-rtnl.c +++ b/src/libsystemd-rtnl/test-rtnl.c @@ -103,6 +103,92 @@ static void test_multiple(void) { rtnl2 = sd_rtnl_unref(rtnl2); } +static int link_handler(sd_rtnl *rtnl, sd_rtnl_message *m, void *userdata) { + void *data; + uint16_t type; + char *ifname = userdata; + + assert(rtnl); + assert(m); + + log_info("got link info about %s", ifname); + free(ifname); + + while (sd_rtnl_message_read(m, &type, &data) > 0) { + switch (type) { +// case IFLA_MTU: +// assert(*(unsigned int *) data == 65536); +// break; +// case IFLA_QDISC: +// assert(streq((char *) data, "noqueue")); +// break; + case IFLA_IFNAME: + assert(streq((char *) data, "lo")); + break; + } + } + + return 1; +} + +static int pipe_handler(sd_rtnl *rtnl, sd_rtnl_message *m, void *userdata) { + int *counter = userdata; + + (*counter) --; + + log_info("got reply, %d left in pipe", *counter); + + return sd_rtnl_message_get_errno(m); +} + +static void test_async(void) { + _cleanup_sd_rtnl_unref_ sd_rtnl *rtnl = NULL; + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL, *r = NULL; + int if_loopback; + uint32_t serial; + char *ifname; + + ifname = strdup("lo"); + + assert(sd_rtnl_open(0, &rtnl) >= 0); + + if_loopback = (int) if_nametoindex("lo"); + assert(if_loopback > 0); + + assert(sd_rtnl_message_link_new(RTM_GETLINK, if_loopback, 0, 0, &m) >= 0); + + assert(sd_rtnl_call_async(rtnl, m, &link_handler, ifname, 0, &serial) >= 0); + + assert(sd_rtnl_wait(rtnl, 0) >= 0); + assert(sd_rtnl_process(rtnl, &r) >= 0); +} + +static void test_pipe(void) { + _cleanup_sd_rtnl_unref_ sd_rtnl *rtnl = NULL; + _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m1 = NULL, *m2 = NULL; + int counter = 0; + int if_loopback; + + assert(sd_rtnl_open(0, &rtnl) >= 0); + + if_loopback = (int) if_nametoindex("lo"); + assert(if_loopback > 0); + + assert(sd_rtnl_message_link_new(RTM_GETLINK, if_loopback, 0, 0, &m1) >= 0); + assert(sd_rtnl_message_link_new(RTM_GETLINK, if_loopback, 0, 0, &m2) >= 0); + + counter ++; + assert(sd_rtnl_call_async(rtnl, m1, &pipe_handler, &counter, 0, NULL) >= 0); + + counter ++; + assert(sd_rtnl_call_async(rtnl, m2, &pipe_handler, &counter, 0, NULL) >= 0); + + while (counter > 0) { + assert(sd_rtnl_wait(rtnl, 0) >= 0); + assert(sd_rtnl_process(rtnl, NULL) >= 0); + } +} + int main(void) { sd_rtnl *rtnl; sd_rtnl_message *m; @@ -117,6 +203,10 @@ int main(void) { test_route(); + test_async(); + + test_pipe(); + assert(sd_rtnl_open(0, &rtnl) >= 0); assert(rtnl); diff --git a/src/systemd/sd-rtnl.h b/src/systemd/sd-rtnl.h index 87acc316c2..c673b36d1c 100644 --- a/src/systemd/sd-rtnl.h +++ b/src/systemd/sd-rtnl.h @@ -31,6 +31,10 @@ _SD_BEGIN_DECLARATIONS; typedef struct sd_rtnl sd_rtnl; typedef struct sd_rtnl_message sd_rtnl_message; +/* callback */ + +typedef int (*sd_rtnl_message_handler_t)(sd_rtnl *rtnl, sd_rtnl_message *m, void *userdata); + /* bus */ int sd_rtnl_open(uint32_t groups, sd_rtnl **nl); @@ -38,6 +42,10 @@ sd_rtnl *sd_rtnl_ref(sd_rtnl *nl); sd_rtnl *sd_rtnl_unref(sd_rtnl *nl); int sd_rtnl_send(sd_rtnl *nl, sd_rtnl_message *message, uint32_t *serial); +int sd_rtnl_call_async(sd_rtnl *nl, sd_rtnl_message *message, + sd_rtnl_message_handler_t callback, + void *userdata, uint64_t usec, uint32_t *serial); +int sd_rtnl_call_async_cancel(sd_rtnl *nl, uint32_t serial); int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout, sd_rtnl_message **reply); @@ -58,6 +66,7 @@ int sd_rtnl_message_route_new(uint16_t nlmsg_type, unsigned char rtm_family, sd_rtnl_message *sd_rtnl_message_ref(sd_rtnl_message *m); sd_rtnl_message *sd_rtnl_message_unref(sd_rtnl_message *m); +int sd_rtnl_message_get_errno(sd_rtnl_message *m); int sd_rtnl_message_get_type(sd_rtnl_message *m, uint16_t *type); int sd_rtnl_message_append(sd_rtnl_message *m, unsigned short type, const void *data); int sd_rtnl_message_read(sd_rtnl_message *m, unsigned short *type, void **data); |