summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTom Gundersen <teg@jklm.no>2013-11-12 22:37:51 +0100
committerTom Gundersen <teg@jklm.no>2013-11-13 19:52:17 +0100
commit4555ec72d6530fce4c978fd894ac22f7e006b0ee (patch)
tree9e2f0e85634b9c929f4443fe377e70391f1101fb /src
parentfe4824e065765f4536c84916694bb050c4a5d0af (diff)
rtnl: start adding support for asynchronous messaging
Similarly to sd-bus, add: sd_rtnl_wait sd_rtnl_process sd_rtnl_send and adapt sd_rtnl_call accordingly.
Diffstat (limited to 'src')
-rw-r--r--src/libsystemd-rtnl/rtnl-internal.h13
-rw-r--r--src/libsystemd-rtnl/rtnl-message.c2
-rw-r--r--src/libsystemd-rtnl/sd-rtnl.c300
-rw-r--r--src/libsystemd-rtnl/test-rtnl.c6
-rw-r--r--src/systemd/sd-rtnl.h4
5 files changed, 267 insertions, 58 deletions
diff --git a/src/libsystemd-rtnl/rtnl-internal.h b/src/libsystemd-rtnl/rtnl-internal.h
index b05290fd89..03297bb8de 100644
--- a/src/libsystemd-rtnl/rtnl-internal.h
+++ b/src/libsystemd-rtnl/rtnl-internal.h
@@ -35,6 +35,14 @@ struct sd_rtnl {
struct sockaddr_nl nl;
} sockaddr;
+ sd_rtnl_message **rqueue;
+ unsigned rqueue_size;
+
+ sd_rtnl_message **wqueue;
+ unsigned wqueue_size;
+
+ bool processing:1;
+
uint32_t serial;
pid_t original_pid;
@@ -42,8 +50,11 @@ struct sd_rtnl {
#define RTNL_DEFAULT_TIMEOUT ((usec_t) (10 * USEC_PER_SEC))
+#define RTNL_WQUEUE_MAX 1024
+#define RTNL_RQUEUE_MAX 64*1024
+
int message_get_errno(sd_rtnl_message *m);
-int message_get_serial(sd_rtnl_message *m);
+uint32_t message_get_serial(sd_rtnl_message *m);
int message_seal(sd_rtnl *nl, sd_rtnl_message *m);
int socket_write_message(sd_rtnl *nl, sd_rtnl_message *m);
int socket_read_message(sd_rtnl *nl, sd_rtnl_message **ret);
diff --git a/src/libsystemd-rtnl/rtnl-message.c b/src/libsystemd-rtnl/rtnl-message.c
index 941bd96382..f7ff0a0148 100644
--- a/src/libsystemd-rtnl/rtnl-message.c
+++ b/src/libsystemd-rtnl/rtnl-message.c
@@ -367,7 +367,7 @@ int sd_rtnl_message_read(sd_rtnl_message *m, unsigned short *type, void **data)
return message_read(m, type, data);
}
-int message_get_serial(sd_rtnl_message *m) {
+uint32_t message_get_serial(sd_rtnl_message *m) {
assert(m);
return m->hdr->nlmsg_seq;
diff --git a/src/libsystemd-rtnl/sd-rtnl.c b/src/libsystemd-rtnl/sd-rtnl.c
index eb3b01b725..b375576835 100644
--- a/src/libsystemd-rtnl/sd-rtnl.c
+++ b/src/libsystemd-rtnl/sd-rtnl.c
@@ -46,6 +46,14 @@ static int sd_rtnl_new(sd_rtnl **ret) {
rtnl->original_pid = getpid();
+ /* We guarantee that wqueue always has space for at least
+ * one entry */
+ rtnl->wqueue = new(sd_rtnl_message*, 1);
+ if (!rtnl->wqueue) {
+ free(rtnl);
+ return -ENOMEM;
+ }
+
*ret = rtnl;
return 0;
}
@@ -98,23 +106,31 @@ sd_rtnl *sd_rtnl_ref(sd_rtnl *rtnl) {
}
sd_rtnl *sd_rtnl_unref(sd_rtnl *rtnl) {
+
if (rtnl && REFCNT_DEC(rtnl->n_ref) <= 0) {
+ unsigned i;
+
+ for (i = 0; i < rtnl->rqueue_size; i++)
+ sd_rtnl_message_unref(rtnl->rqueue[i]);
+ free(rtnl->rqueue);
+
+ for (i = 0; i < rtnl->wqueue_size; i++)
+ sd_rtnl_message_unref(rtnl->wqueue[i]);
+ free(rtnl->wqueue);
+
if (rtnl->fd >= 0)
close_nointr_nofail(rtnl->fd);
+
free(rtnl);
}
return NULL;
}
-int sd_rtnl_call(sd_rtnl *nl,
- sd_rtnl_message *message,
- uint64_t usec,
- sd_rtnl_message **ret) {
- struct pollfd p[1] = {};
- struct timespec left;
- usec_t timeout;
- int r, serial;
+int sd_rtnl_send(sd_rtnl *nl,
+ sd_rtnl_message *message,
+ uint32_t *serial) {
+ int r;
assert_return(nl, -EINVAL);
assert_return(!rtnl_pid_changed(nl), -ECHILD);
@@ -124,82 +140,260 @@ int sd_rtnl_call(sd_rtnl *nl,
if (r < 0)
return r;
- serial = message_get_serial(message);
+ if (nl->wqueue_size <= 0) {
+ /* send directly */
+ r = socket_write_message(nl, message);
+ if (r < 0)
+ return r;
+ else if (r == 0) {
+ /* nothing was sent, so let's put it on
+ * the queue */
+ nl->wqueue[0] = sd_rtnl_message_ref(message);
+ nl->wqueue_size = 1;
+ }
+ } else {
+ sd_rtnl_message **q;
- p[0].fd = nl->fd;
- p[0].events = POLLOUT;
+ /* append to queue */
+ if (nl->wqueue_size >= RTNL_WQUEUE_MAX)
+ return -ENOBUFS;
- if (usec == (uint64_t) -1)
- timeout = 0;
- else if (usec == 0)
- timeout = now(CLOCK_MONOTONIC) + RTNL_DEFAULT_TIMEOUT;
- else
- timeout = now(CLOCK_MONOTONIC) + usec;
+ q = realloc(nl->wqueue, sizeof(sd_rtnl_message*) * (nl->wqueue_size + 1));
+ if (!q)
+ return -ENOMEM;
- for (;;) {
- if (timeout) {
- usec_t n;
+ nl->wqueue = q;
+ q[nl->wqueue_size ++] = sd_rtnl_message_ref(message);
+ }
- n = now(CLOCK_MONOTONIC);
- if (n >= timeout)
- return -ETIMEDOUT;
+ if (serial)
+ *serial = message_get_serial(message);
- timespec_store(&left, timeout - n);
- }
+ return 1;
+}
- r = ppoll(p, 1, timeout ? &left : NULL, NULL);
- if (r < 0)
- return 0;
+static int dispatch_rqueue(sd_rtnl *rtnl, sd_rtnl_message **message) {
+ sd_rtnl_message *z = NULL;
+ int r;
- r = socket_write_message(nl, message);
+ assert(rtnl);
+ assert(message);
+
+ if (rtnl->rqueue_size > 0) {
+ /* Dispatch a queued message */
+
+ *message = rtnl->rqueue[0];
+ rtnl->rqueue_size --;
+ memmove(rtnl->rqueue, rtnl->rqueue + 1, sizeof(sd_rtnl_message*) * rtnl->rqueue_size);
+
+ return 1;
+ }
+
+ /* Try to read a new message */
+ r = socket_read_message(rtnl, &z);
+ if (r < 0)
+ return r;
+ if (r == 0)
+ return 0;
+
+ *message = z;
+
+ return 1;
+}
+
+static int dispatch_wqueue(sd_rtnl *rtnl) {
+ int r, ret = 0;
+
+ assert(rtnl);
+
+ while (rtnl->wqueue_size > 0) {
+ r = socket_write_message(rtnl, rtnl->wqueue[0]);
if (r < 0)
return r;
-
- if (r > 0) {
- break;
+ else if (r == 0)
+ /* Didn't do anything this time */
+ return ret;
+ else {
+ /* see equivalent in sd-bus.c */
+ sd_rtnl_message_unref(rtnl->wqueue[0]);
+ rtnl->wqueue_size --;
+ memmove(rtnl->wqueue, rtnl->wqueue + 1, sizeof(sd_rtnl_message*) * rtnl->wqueue_size);
+
+ ret = 1;
}
}
+ return ret;
+}
+
+static int process_running(sd_rtnl *rtnl, sd_rtnl_message **ret) {
+ _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *m = NULL;
+ int r;
+
+ r = dispatch_wqueue(rtnl);
+ if (r != 0)
+ goto null_message;
+
+ r = dispatch_rqueue(rtnl, &m);
+ if (r < 0)
+ return r;
+ if (!m)
+ goto null_message;
+
+ if (ret) {
+ *ret = m;
+ m = NULL;
+
+ return 1;
+ }
+
+ return 1;
+
+null_message:
+ if (r >= 0 && ret)
+ *ret = NULL;
+
+ return r;
+}
+int sd_rtnl_process(sd_rtnl *rtnl, sd_rtnl_message **ret) {
+ int r;
+
+ assert_return(rtnl, -EINVAL);
+ assert_return(!rtnl_pid_changed(rtnl), -ECHILD);
+ assert_return(!rtnl->processing, -EBUSY);
+
+ rtnl->processing = true;
+ r = process_running(rtnl, ret);
+ rtnl->processing = false;
+
+ return r;
+}
+
+static usec_t calc_elapse(uint64_t usec) {
+ if (usec == (uint64_t) -1)
+ return 0;
+
+ if (usec == 0)
+ usec = RTNL_DEFAULT_TIMEOUT;
+
+ return now(CLOCK_MONOTONIC) + usec;
+}
+
+static int rtnl_poll(sd_rtnl *nl, uint64_t timeout_usec) {
+ struct pollfd p[1] = {};
+ struct timespec ts;
+ int r;
+
+ assert(nl);
+
+ p[0].fd = nl->fd;
p[0].events = POLLIN;
+ r = ppoll(p, 1, timeout_usec == (uint64_t) -1 ? NULL :
+ timespec_store(&ts, timeout_usec), NULL);
+ if (r < 0)
+ return -errno;
+
+ return r > 0 ? 1 : 0;
+}
+
+int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout_usec) {
+ assert_return(nl, -EINVAL);
+ assert_return(!rtnl_pid_changed(nl), -ECHILD);
+
+ if (nl->rqueue_size > 0)
+ return 0;
+
+ return rtnl_poll(nl, timeout_usec);
+}
+
+int sd_rtnl_call(sd_rtnl *nl,
+ sd_rtnl_message *message,
+ uint64_t usec,
+ sd_rtnl_message **ret) {
+ usec_t timeout;
+ uint32_t serial;
+ bool room = false;
+ int r;
+
+ assert_return(nl, -EINVAL);
+ assert_return(!rtnl_pid_changed(nl), -ECHILD);
+ assert_return(message, -EINVAL);
+
+ r = sd_rtnl_send(nl, message, &serial);
+ if (r < 0)
+ return r;
+
+ timeout = calc_elapse(usec);
+
for (;;) {
- _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *reply = NULL;
+ usec_t left;
+ _cleanup_sd_rtnl_message_unref_ sd_rtnl_message *incoming = NULL;
- if (timeout) {
- usec_t n;
+ if (!room) {
+ sd_rtnl_message **q;
- n = now(CLOCK_MONOTONIC);
- if (n >= timeout)
- return -ETIMEDOUT;
+ if (nl->rqueue_size >= RTNL_RQUEUE_MAX)
+ return -ENOBUFS;
- timespec_store(&left, timeout - n);
- }
+ /* Make sure there's room for queueing this
+ * locally, before we read the message */
- r = ppoll(p, 1, timeout ? &left : NULL, NULL);
- if (r < 0)
- return r;
+ q = realloc(nl->rqueue, (nl->rqueue_size + 1) * sizeof(sd_rtnl_message*));
+ if (!q)
+ return -ENOMEM;
- r = socket_read_message(nl, &reply);
+ nl->rqueue = q;
+ room = true;
+ }
+
+ r = socket_read_message(nl, &incoming);
if (r < 0)
return r;
-
- if (r > 0) {
- int received_serial = message_get_serial(reply);
+ if (incoming) {
+ uint32_t received_serial = message_get_serial(incoming);
if (received_serial == serial) {
- r = message_get_errno(reply);
+ r = message_get_errno(incoming);
if (r < 0)
return r;
if (ret) {
- *ret = reply;
- reply = NULL;
+ *ret = incoming;
+ incoming = NULL;
}
- break;;
+ return 1;
}
+
+ /* Room was allocated on the queue above */
+ nl->rqueue[nl->rqueue_size ++] = incoming;
+ incoming = NULL;
+ room = false;
+
+ /* Try to read more, right away */
+ continue;
}
- }
+ if (r != 0)
+ continue;
- return 0;
+ if (timeout > 0) {
+ usec_t n;
+
+ n = now(CLOCK_MONOTONIC);
+ if (n >= timeout)
+ return -ETIMEDOUT;
+
+ left = timeout - n;
+ } else
+ left = (uint64_t) -1;
+
+ r = rtnl_poll(nl, left);
+ if (r < 0)
+ return r;
+
+ r = dispatch_wqueue(nl);
+ if (r < 0)
+ return r;
+ }
}
diff --git a/src/libsystemd-rtnl/test-rtnl.c b/src/libsystemd-rtnl/test-rtnl.c
index 61345bce40..3615086793 100644
--- a/src/libsystemd-rtnl/test-rtnl.c
+++ b/src/libsystemd-rtnl/test-rtnl.c
@@ -53,7 +53,7 @@ static void test_link_configure(sd_rtnl *rtnl, int ifindex) {
assert(type == IFLA_MTU);
assert(mtu == *(unsigned int *) data);
- assert(sd_rtnl_call(rtnl, message, 0, NULL) == 0);
+ assert(sd_rtnl_call(rtnl, message, 0, NULL) == 1);
}
static void test_route(void) {
@@ -133,7 +133,7 @@ int main(void) {
assert(sd_rtnl_message_read(m, &type, &data) == 0);
- assert(sd_rtnl_call(rtnl, m, 0, &r) >= 0);
+ assert(sd_rtnl_call(rtnl, m, 0, &r) == 1);
assert(sd_rtnl_message_get_type(r, &type) >= 0);
assert(type == RTM_NEWLINK);
@@ -155,7 +155,7 @@ int main(void) {
assert(sd_rtnl_message_read(m, &type, &data) == 0);
- assert(sd_rtnl_call(rtnl, m, -1, &r) >= 0);
+ assert(sd_rtnl_call(rtnl, m, -1, &r) == 1);
while (sd_rtnl_message_read(r, &type, &data) > 0) {
switch (type) {
// case IFLA_MTU:
diff --git a/src/systemd/sd-rtnl.h b/src/systemd/sd-rtnl.h
index 2d166c4fb4..87acc316c2 100644
--- a/src/systemd/sd-rtnl.h
+++ b/src/systemd/sd-rtnl.h
@@ -37,9 +37,13 @@ int sd_rtnl_open(uint32_t groups, sd_rtnl **nl);
sd_rtnl *sd_rtnl_ref(sd_rtnl *nl);
sd_rtnl *sd_rtnl_unref(sd_rtnl *nl);
+int sd_rtnl_send(sd_rtnl *nl, sd_rtnl_message *message, uint32_t *serial);
int sd_rtnl_call(sd_rtnl *nl, sd_rtnl_message *message, uint64_t timeout,
sd_rtnl_message **reply);
+int sd_rtnl_process(sd_rtnl *nl, sd_rtnl_message **ret);
+int sd_rtnl_wait(sd_rtnl *nl, uint64_t timeout);
+
/* messages */
int sd_rtnl_message_link_new(uint16_t msg_type, int index, unsigned int type,
unsigned int flags, sd_rtnl_message **ret);