summaryrefslogtreecommitdiff
path: root/net/sunrpc
diff options
context:
space:
mode:
authorAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-01-20 14:01:31 -0300
committerAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-01-20 14:01:31 -0300
commitb4b7ff4b08e691656c9d77c758fc355833128ac0 (patch)
tree82fcb00e6b918026dc9f2d1f05ed8eee83874cc0 /net/sunrpc
parent35acfa0fc609f2a2cd95cef4a6a9c3a5c38f1778 (diff)
Linux-libre 4.4-gnupck-4.4-gnu
Diffstat (limited to 'net/sunrpc')
-rw-r--r--net/sunrpc/auth_gss/auth_gss.c13
-rw-r--r--net/sunrpc/backchannel_rqst.c24
-rw-r--r--net/sunrpc/cache.c53
-rw-r--r--net/sunrpc/sched.c6
-rw-r--r--net/sunrpc/svc.c18
-rw-r--r--net/sunrpc/svcsock.c40
-rw-r--r--net/sunrpc/sysctl.c23
-rw-r--r--net/sunrpc/xprtrdma/Makefile1
-rw-r--r--net/sunrpc/xprtrdma/backchannel.c394
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c126
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c148
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma.c6
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c123
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c18
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c96
-rw-r--r--net/sunrpc/xprtrdma/transport.c18
-rw-r--r--net/sunrpc/xprtrdma/verbs.c482
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h57
-rw-r--r--net/sunrpc/xprtsock.c274
19 files changed, 1343 insertions, 577 deletions
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c
index dace13d76..799e65b94 100644
--- a/net/sunrpc/auth_gss/auth_gss.c
+++ b/net/sunrpc/auth_gss/auth_gss.c
@@ -1411,17 +1411,16 @@ gss_key_timeout(struct rpc_cred *rc)
{
struct gss_cred *gss_cred = container_of(rc, struct gss_cred, gc_base);
struct gss_cl_ctx *ctx;
- unsigned long now = jiffies;
- unsigned long expire;
+ unsigned long timeout = jiffies + (gss_key_expire_timeo * HZ);
+ int ret = 0;
rcu_read_lock();
ctx = rcu_dereference(gss_cred->gc_ctx);
- if (ctx)
- expire = ctx->gc_expiry - (gss_key_expire_timeo * HZ);
+ if (!ctx || time_after(timeout, ctx->gc_expiry))
+ ret = -EACCES;
rcu_read_unlock();
- if (!ctx || time_after(now, expire))
- return -EACCES;
- return 0;
+
+ return ret;
}
static int
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 6255d1411..229956bf8 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -138,6 +138,14 @@ out_free:
*/
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
+ if (!xprt->ops->bc_setup)
+ return 0;
+ return xprt->ops->bc_setup(xprt, min_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
+
+int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
+{
struct rpc_rqst *req;
struct list_head tmp_list;
int i;
@@ -192,7 +200,6 @@ out_free:
dprintk("RPC: setup backchannel transport failed\n");
return -ENOMEM;
}
-EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
/**
* xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
@@ -205,6 +212,13 @@ EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
*/
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
+ if (xprt->ops->bc_destroy)
+ xprt->ops->bc_destroy(xprt, max_reqs);
+}
+EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
+
+void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
+{
struct rpc_rqst *req = NULL, *tmp = NULL;
dprintk("RPC: destroy backchannel transport\n");
@@ -227,7 +241,6 @@ out:
dprintk("RPC: backchannel list empty= %s\n",
list_empty(&xprt->bc_pa_list) ? "true" : "false");
}
-EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
static struct rpc_rqst *xprt_alloc_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
@@ -264,6 +277,13 @@ void xprt_free_bc_request(struct rpc_rqst *req)
{
struct rpc_xprt *xprt = req->rq_xprt;
+ xprt->ops->bc_free_rqst(req);
+}
+
+void xprt_free_bc_rqst(struct rpc_rqst *req)
+{
+ struct rpc_xprt *xprt = req->rq_xprt;
+
dprintk("RPC: free backchannel req=%p\n", req);
req->rq_connect_cookie = xprt->connect_cookie - 1;
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 4a2340a54..5e4f815c2 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -41,13 +41,16 @@
static bool cache_defer_req(struct cache_req *req, struct cache_head *item);
static void cache_revisit_request(struct cache_head *item);
-static void cache_init(struct cache_head *h)
+static void cache_init(struct cache_head *h, struct cache_detail *detail)
{
time_t now = seconds_since_boot();
INIT_HLIST_NODE(&h->cache_list);
h->flags = 0;
kref_init(&h->ref);
h->expiry_time = now + CACHE_NEW_EXPIRY;
+ if (now <= detail->flush_time)
+ /* ensure it isn't already expired */
+ now = detail->flush_time + 1;
h->last_refresh = now;
}
@@ -81,7 +84,7 @@ struct cache_head *sunrpc_cache_lookup(struct cache_detail *detail,
* we might get lose if we need to
* cache_put it soon.
*/
- cache_init(new);
+ cache_init(new, detail);
detail->init(new, key);
write_lock(&detail->hash_lock);
@@ -116,10 +119,15 @@ EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
-static void cache_fresh_locked(struct cache_head *head, time_t expiry)
+static void cache_fresh_locked(struct cache_head *head, time_t expiry,
+ struct cache_detail *detail)
{
+ time_t now = seconds_since_boot();
+ if (now <= detail->flush_time)
+ /* ensure it isn't immediately treated as expired */
+ now = detail->flush_time + 1;
head->expiry_time = expiry;
- head->last_refresh = seconds_since_boot();
+ head->last_refresh = now;
smp_wmb(); /* paired with smp_rmb() in cache_is_valid() */
set_bit(CACHE_VALID, &head->flags);
}
@@ -149,7 +157,7 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
set_bit(CACHE_NEGATIVE, &old->flags);
else
detail->update(old, new);
- cache_fresh_locked(old, new->expiry_time);
+ cache_fresh_locked(old, new->expiry_time, detail);
write_unlock(&detail->hash_lock);
cache_fresh_unlocked(old, detail);
return old;
@@ -162,7 +170,7 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
cache_put(old, detail);
return NULL;
}
- cache_init(tmp);
+ cache_init(tmp, detail);
detail->init(tmp, old);
write_lock(&detail->hash_lock);
@@ -173,8 +181,8 @@ struct cache_head *sunrpc_cache_update(struct cache_detail *detail,
hlist_add_head(&tmp->cache_list, &detail->hash_table[hash]);
detail->entries++;
cache_get(tmp);
- cache_fresh_locked(tmp, new->expiry_time);
- cache_fresh_locked(old, 0);
+ cache_fresh_locked(tmp, new->expiry_time, detail);
+ cache_fresh_locked(old, 0, detail);
write_unlock(&detail->hash_lock);
cache_fresh_unlocked(tmp, detail);
cache_fresh_unlocked(old, detail);
@@ -219,7 +227,8 @@ static int try_to_negate_entry(struct cache_detail *detail, struct cache_head *h
rv = cache_is_valid(h);
if (rv == -EAGAIN) {
set_bit(CACHE_NEGATIVE, &h->flags);
- cache_fresh_locked(h, seconds_since_boot()+CACHE_NEW_EXPIRY);
+ cache_fresh_locked(h, seconds_since_boot()+CACHE_NEW_EXPIRY,
+ detail);
rv = -ENOENT;
}
write_unlock(&detail->hash_lock);
@@ -487,10 +496,13 @@ EXPORT_SYMBOL_GPL(cache_flush);
void cache_purge(struct cache_detail *detail)
{
- detail->flush_time = LONG_MAX;
+ time_t now = seconds_since_boot();
+ if (detail->flush_time >= now)
+ now = detail->flush_time + 1;
+ /* 'now' is the maximum value any 'last_refresh' can have */
+ detail->flush_time = now;
detail->nextcheck = seconds_since_boot();
cache_flush();
- detail->flush_time = 1;
}
EXPORT_SYMBOL_GPL(cache_purge);
@@ -1436,6 +1448,7 @@ static ssize_t write_flush(struct file *file, const char __user *buf,
{
char tbuf[20];
char *bp, *ep;
+ time_t then, now;
if (*ppos || count > sizeof(tbuf)-1)
return -EINVAL;
@@ -1447,8 +1460,22 @@ static ssize_t write_flush(struct file *file, const char __user *buf,
return -EINVAL;
bp = tbuf;
- cd->flush_time = get_expiry(&bp);
- cd->nextcheck = seconds_since_boot();
+ then = get_expiry(&bp);
+ now = seconds_since_boot();
+ cd->nextcheck = now;
+ /* Can only set flush_time to 1 second beyond "now", or
+ * possibly 1 second beyond flushtime. This is because
+ * flush_time never goes backwards so it mustn't get too far
+ * ahead of time.
+ */
+ if (then >= now) {
+ /* Want to flush everything, so behave like cache_purge() */
+ if (cd->flush_time >= now)
+ now = cd->flush_time + 1;
+ then = now;
+ }
+
+ cd->flush_time = then;
cache_flush();
*ppos += count;
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index f14f24ee9..73ad57a59 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -250,11 +250,11 @@ void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
}
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
-static int rpc_wait_bit_killable(struct wait_bit_key *key)
+static int rpc_wait_bit_killable(struct wait_bit_key *key, int mode)
{
- if (fatal_signal_pending(current))
- return -ERESTARTSYS;
freezable_schedule_unsafe();
+ if (signal_pending_state(mode, current))
+ return -ERESTARTSYS;
return 0;
}
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index a8f579df1..cc9852897 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1364,14 +1364,22 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
memcpy(&rqstp->rq_arg, &req->rq_rcv_buf, sizeof(rqstp->rq_arg));
memcpy(&rqstp->rq_res, &req->rq_snd_buf, sizeof(rqstp->rq_res));
+ /* Adjust the argument buffer length */
+ rqstp->rq_arg.len = req->rq_private_buf.len;
+ if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
+ rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
+ rqstp->rq_arg.page_len = 0;
+ } else if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len +
+ rqstp->rq_arg.page_len)
+ rqstp->rq_arg.page_len = rqstp->rq_arg.len -
+ rqstp->rq_arg.head[0].iov_len;
+ else
+ rqstp->rq_arg.len = rqstp->rq_arg.head[0].iov_len +
+ rqstp->rq_arg.page_len;
+
/* reset result send buffer "put" position */
resv->iov_len = 0;
- if (rqstp->rq_prot != IPPROTO_TCP) {
- printk(KERN_ERR "No support for Non-TCP transports!\n");
- BUG();
- }
-
/*
* Skip the next two words because they've already been
* processed in the transport
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 0c8120229..1413cdcc1 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -181,7 +181,7 @@ int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
struct page **ppage = xdr->pages;
size_t base = xdr->page_base;
unsigned int pglen = xdr->page_len;
- unsigned int flags = MSG_MORE;
+ unsigned int flags = MSG_MORE | MSG_SENDPAGE_NOTLAST;
int slen;
int len = 0;
@@ -399,6 +399,31 @@ static int svc_sock_secure_port(struct svc_rqst *rqstp)
return svc_port_is_privileged(svc_addr(rqstp));
}
+static bool sunrpc_waitqueue_active(wait_queue_head_t *wq)
+{
+ if (!wq)
+ return false;
+ /*
+ * There should normally be a memory * barrier here--see
+ * wq_has_sleeper().
+ *
+ * It appears that isn't currently necessary, though, basically
+ * because callers all appear to have sufficient memory barriers
+ * between the time the relevant change is made and the
+ * time they call these callbacks.
+ *
+ * The nfsd code itself doesn't actually explicitly wait on
+ * these waitqueues, but it may wait on them for example in
+ * sendpage() or sendmsg() calls. (And those may be the only
+ * places, since it it uses nonblocking reads.)
+ *
+ * Maybe we should add the memory barriers anyway, but these are
+ * hot paths so we'd need to be convinced there's no sigificant
+ * penalty.
+ */
+ return waitqueue_active(wq);
+}
+
/*
* INET callback when data has been received on the socket.
*/
@@ -414,7 +439,7 @@ static void svc_udp_data_ready(struct sock *sk)
set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
svc_xprt_enqueue(&svsk->sk_xprt);
}
- if (wq && waitqueue_active(wq))
+ if (sunrpc_waitqueue_active(wq))
wake_up_interruptible(wq);
}
@@ -432,7 +457,7 @@ static void svc_write_space(struct sock *sk)
svc_xprt_enqueue(&svsk->sk_xprt);
}
- if (wq && waitqueue_active(wq)) {
+ if (sunrpc_waitqueue_active(wq)) {
dprintk("RPC svc_write_space: someone sleeping on %p\n",
svsk);
wake_up_interruptible(wq);
@@ -787,7 +812,7 @@ static void svc_tcp_listen_data_ready(struct sock *sk)
}
wq = sk_sleep(sk);
- if (wq && waitqueue_active(wq))
+ if (sunrpc_waitqueue_active(wq))
wake_up_interruptible_all(wq);
}
@@ -808,7 +833,7 @@ static void svc_tcp_state_change(struct sock *sk)
set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
svc_xprt_enqueue(&svsk->sk_xprt);
}
- if (wq && waitqueue_active(wq))
+ if (sunrpc_waitqueue_active(wq))
wake_up_interruptible_all(wq);
}
@@ -823,7 +848,7 @@ static void svc_tcp_data_ready(struct sock *sk)
set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
svc_xprt_enqueue(&svsk->sk_xprt);
}
- if (wq && waitqueue_active(wq))
+ if (sunrpc_waitqueue_active(wq))
wake_up_interruptible(wq);
}
@@ -1367,7 +1392,6 @@ EXPORT_SYMBOL_GPL(svc_sock_update_bufs);
/*
* Initialize socket for RPC use and create svc_sock struct
- * XXX: May want to setsockopt SO_SNDBUF and SO_RCVBUF.
*/
static struct svc_sock *svc_setup_socket(struct svc_serv *serv,
struct socket *sock,
@@ -1594,7 +1618,7 @@ static void svc_sock_detach(struct svc_xprt *xprt)
sk->sk_write_space = svsk->sk_owspace;
wq = sk_sleep(sk);
- if (wq && waitqueue_active(wq))
+ if (sunrpc_waitqueue_active(wq))
wake_up_interruptible(wq);
}
diff --git a/net/sunrpc/sysctl.c b/net/sunrpc/sysctl.c
index 887f0183b..c88d9bc06 100644
--- a/net/sunrpc/sysctl.c
+++ b/net/sunrpc/sysctl.c
@@ -76,7 +76,7 @@ static int
proc_dodebug(struct ctl_table *table, int write,
void __user *buffer, size_t *lenp, loff_t *ppos)
{
- char tmpbuf[20], c, *s;
+ char tmpbuf[20], c, *s = NULL;
char __user *p;
unsigned int value;
size_t left, len;
@@ -103,23 +103,24 @@ proc_dodebug(struct ctl_table *table, int write,
return -EFAULT;
tmpbuf[left] = '\0';
- for (s = tmpbuf, value = 0; '0' <= *s && *s <= '9'; s++, left--)
- value = 10 * value + (*s - '0');
- if (*s && !isspace(*s))
- return -EINVAL;
- while (left && isspace(*s))
- left--, s++;
+ value = simple_strtol(tmpbuf, &s, 0);
+ if (s) {
+ left -= (s - tmpbuf);
+ if (left && !isspace(*s))
+ return -EINVAL;
+ while (left && isspace(*s))
+ left--, s++;
+ } else
+ left = 0;
*(unsigned int *) table->data = value;
/* Display the RPC tasks on writing to rpc_debug */
if (strcmp(table->procname, "rpc_debug") == 0)
rpc_show_tasks(&init_net);
} else {
- if (!access_ok(VERIFY_WRITE, buffer, left))
- return -EFAULT;
- len = sprintf(tmpbuf, "%d", *(unsigned int *) table->data);
+ len = sprintf(tmpbuf, "0x%04x", *(unsigned int *) table->data);
if (len > left)
len = left;
- if (__copy_to_user(buffer, tmpbuf, len))
+ if (copy_to_user(buffer, tmpbuf, len))
return -EFAULT;
if ((left -= len) > 0) {
if (put_user('\n', (char __user *)buffer + len))
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de24..33f99d300 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
svc_rdma.o svc_rdma_transport.o \
svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 000000000..2dcb44f69
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,394 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+#include <linux/sunrpc/xprt.h>
+#include <linux/sunrpc/svc.h>
+#include <linux/sunrpc/svc_xprt.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+#define RPCRDMA_BACKCHANNEL_DEBUG
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+ spin_lock(&buf->rb_reqslock);
+ list_del(&req->rl_all);
+ spin_unlock(&buf->rb_reqslock);
+
+ rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+ kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+ struct rpc_rqst *rqst)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ struct xdr_buf *buf;
+ size_t size;
+
+ req = rpcrdma_create_req(r_xprt);
+ if (!req)
+ return -ENOMEM;
+ req->rl_backchannel = true;
+
+ size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+ rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ if (IS_ERR(rb))
+ goto out_fail;
+ req->rl_rdmabuf = rb;
+
+ size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+ rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+ req->rl_sendbuf = rb;
+ /* so that rpcr_to_rdmar works when receiving a request */
+ rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+ buf = &rqst->rq_snd_buf;
+ buf->head[0].iov_base = rqst->rq_buffer;
+ buf->head[0].iov_len = 0;
+ buf->tail[0].iov_base = NULL;
+ buf->tail[0].iov_len = 0;
+ buf->page_len = 0;
+ buf->len = 0;
+ buf->buflen = size;
+
+ return 0;
+
+out_fail:
+ rpcrdma_bc_free_rqst(r_xprt, rqst);
+ return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's
+ * existing list of rep's. These are released when the
+ * transport is destroyed.
+ */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+ unsigned int count)
+{
+ struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+ struct rpcrdma_rep *rep;
+ unsigned long flags;
+ int rc = 0;
+
+ while (count--) {
+ rep = rpcrdma_create_rep(r_xprt);
+ if (IS_ERR(rep)) {
+ pr_err("RPC: %s: reply buffer alloc failed\n",
+ __func__);
+ rc = PTR_ERR(rep);
+ break;
+ }
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ }
+
+ return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+ struct rpc_rqst *rqst;
+ unsigned int i;
+ int rc;
+
+ /* The backchannel reply path returns each rpc_rqst to the
+ * bc_pa_list _after_ the reply is sent. If the server is
+ * faster than the client, it can send another backward
+ * direction request before the rpc_rqst is returned to the
+ * list. The client rejects the request in this case.
+ *
+ * Twice as many rpc_rqsts are prepared to ensure there is
+ * always an rpc_rqst available as soon as a reply is sent.
+ */
+ if (reqs > RPCRDMA_BACKWARD_WRS >> 1)
+ goto out_err;
+
+ for (i = 0; i < (reqs << 1); i++) {
+ rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+ if (!rqst) {
+ pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
+ __func__);
+ goto out_free;
+ }
+
+ rqst->rq_xprt = &r_xprt->rx_xprt;
+ INIT_LIST_HEAD(&rqst->rq_list);
+ INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+ if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+ goto out_free;
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+ }
+
+ rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+ if (rc)
+ goto out_free;
+
+ rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+ if (rc)
+ goto out_free;
+
+ buffer->rb_bc_srv_max_requests = reqs;
+ request_module("svcrdma");
+
+ return 0;
+
+out_free:
+ xprt_rdma_bc_destroy(xprt, reqs);
+
+out_err:
+ pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
+ return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_up - Create transport endpoint for backchannel service
+ * @serv: server endpoint
+ * @net: network namespace
+ *
+ * The "xprt" is an implied argument: it supplies the name of the
+ * backchannel transport class.
+ *
+ * Returns zero on success, negative errno on failure
+ */
+int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net)
+{
+ int ret;
+
+ ret = svc_create_xprt(serv, "rdma-bc", net, PF_INET, 0, 0);
+ if (ret < 0)
+ return ret;
+ return 0;
+}
+
+/**
+ * rpcrdma_bc_marshal_reply - Send backwards direction reply
+ * @rqst: buffer containing RPC reply data
+ *
+ * Returns zero on success.
+ */
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_msg *headerp;
+ size_t rpclen;
+
+ headerp = rdmab_to_msg(req->rl_rdmabuf);
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit =
+ cpu_to_be32(r_xprt->rx_buf.rb_bc_srv_max_requests);
+ headerp->rm_type = rdma_msg;
+ headerp->rm_body.rm_chunks[0] = xdr_zero;
+ headerp->rm_body.rm_chunks[1] = xdr_zero;
+ headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+ rpclen = rqst->rq_svec[0].iov_len;
+
+ pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n",
+ __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf));
+ pr_info("RPC: %s: RPC/RDMA: %*ph\n",
+ __func__, (int)RPCRDMA_HDRLEN_MIN, headerp);
+ pr_info("RPC: %s: RPC: %*ph\n",
+ __func__, (int)rpclen, rqst->rq_svec[0].iov_base);
+
+ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
+ req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN;
+ req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
+ req->rl_send_iov[1].length = rpclen;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
+
+ req->rl_niovs = 2;
+ return 0;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpc_rqst *rqst, *tmp;
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+
+ rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ }
+ spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+
+ smp_mb__before_atomic();
+ WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+ clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+ smp_mb__after_atomic();
+
+ spin_lock_bh(&xprt->bc_pa_lock);
+ list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+ spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * rpcrdma_bc_receive_call - Handle a backward direction call
+ * @xprt: transport receiving the call
+ * @rep: receive buffer containing the call
+ *
+ * Called in the RPC reply handler, which runs in a tasklet.
+ * Be quick about it.
+ *
+ * Operational assumptions:
+ * o Backchannel credits are ignored, just as the NFS server
+ * forechannel currently does
+ * o The ULP manages a replay cache (eg, NFSv4.1 sessions).
+ * No replay detection is done at the transport level
+ */
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt,
+ struct rpcrdma_rep *rep)
+{
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ struct rpcrdma_msg *headerp;
+ struct svc_serv *bc_serv;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ struct xdr_buf *buf;
+ size_t size;
+ __be32 *p;
+
+ headerp = rdmab_to_msg(rep->rr_rdmabuf);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: callback XID %08x, length=%u\n",
+ __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len);
+ pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp);
+#endif
+
+ /* Sanity check:
+ * Need at least enough bytes for RPC/RDMA header, as code
+ * here references the header fields by array offset. Also,
+ * backward calls are always inline, so ensure there
+ * are some bytes beyond the RPC/RDMA header.
+ */
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24)
+ goto out_short;
+ p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN);
+ size = rep->rr_len - RPCRDMA_HDRLEN_MIN;
+
+ /* Grab a free bc rqst */
+ spin_lock(&xprt->bc_pa_lock);
+ if (list_empty(&xprt->bc_pa_list)) {
+ spin_unlock(&xprt->bc_pa_lock);
+ goto out_overflow;
+ }
+ rqst = list_first_entry(&xprt->bc_pa_list,
+ struct rpc_rqst, rq_bc_pa_list);
+ list_del(&rqst->rq_bc_pa_list);
+ spin_unlock(&xprt->bc_pa_lock);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: using rqst %p\n", __func__, rqst);
+#endif
+
+ /* Prepare rqst */
+ rqst->rq_reply_bytes_recvd = 0;
+ rqst->rq_bytes_sent = 0;
+ rqst->rq_xid = headerp->rm_xid;
+ set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+
+ buf = &rqst->rq_rcv_buf;
+ memset(buf, 0, sizeof(*buf));
+ buf->head[0].iov_base = p;
+ buf->head[0].iov_len = size;
+ buf->len = size;
+
+ /* The receive buffer has to be hooked to the rpcrdma_req
+ * so that it can be reposted after the server is done
+ * parsing it but just before sending the backward
+ * direction reply.
+ */
+ req = rpcr_to_rdmar(rqst);
+#ifdef RPCRDMA_BACKCHANNEL_DEBUG
+ pr_info("RPC: %s: attaching rep %p to req %p\n",
+ __func__, rep, req);
+#endif
+ req->rl_reply = rep;
+
+ /* Defeat the retransmit detection logic in send_request */
+ req->rl_connect_cookie = 0;
+
+ /* Queue rqst for ULP's callback service */
+ bc_serv = xprt->bc_serv;
+ spin_lock(&bc_serv->sv_cb_lock);
+ list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list);
+ spin_unlock(&bc_serv->sv_cb_lock);
+
+ wake_up(&bc_serv->sv_cb_waitq);
+
+ r_xprt->rx_stats.bcall_count++;
+ return;
+
+out_overflow:
+ pr_warn("RPC/RDMA backchannel overflow\n");
+ xprt_disconnect_done(xprt);
+ /* This receive buffer gets reposted automatically
+ * when the connection is re-established.
+ */
+ return;
+
+out_short:
+ pr_warn("RPC/RDMA short backward direction call\n");
+
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ xprt_disconnect_done(xprt);
+ else
+ pr_warn("RPC: %s: reposting rep %p\n",
+ __func__, rep);
+}
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index 5318951b3..88cf9e726 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -151,9 +151,13 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
if (IS_ERR(f->fr_mr))
goto out_mr_err;
- f->fr_pgl = ib_alloc_fast_reg_page_list(device, depth);
- if (IS_ERR(f->fr_pgl))
+
+ f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
+ if (!f->sg)
goto out_list_err;
+
+ sg_init_table(f->sg, depth);
+
return 0;
out_mr_err:
@@ -163,9 +167,9 @@ out_mr_err:
return rc;
out_list_err:
- rc = PTR_ERR(f->fr_pgl);
- dprintk("RPC: %s: ib_alloc_fast_reg_page_list status %i\n",
- __func__, rc);
+ rc = -ENOMEM;
+ dprintk("RPC: %s: sg allocation failure\n",
+ __func__);
ib_dereg_mr(f->fr_mr);
return rc;
}
@@ -179,7 +183,7 @@ __frwr_release(struct rpcrdma_mw *r)
if (rc)
dprintk("RPC: %s: ib_dereg_mr status %i\n",
__func__, rc);
- ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+ kfree(r->r.frmr.sg);
}
static int
@@ -252,8 +256,11 @@ frwr_sendcompletion(struct ib_wc *wc)
/* WARNING: Only wr_id and status are reliable at this point */
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n",
- __func__, r, ib_wc_status_msg(wc->status), wc->status);
+ if (wc->status == IB_WC_WR_FLUSH_ERR)
+ dprintk("RPC: %s: frmr %p flushed\n", __func__, r);
+ else
+ pr_warn("RPC: %s: frmr %p error, status %s (%d)\n",
+ __func__, r, ib_wc_status_msg(wc->status), wc->status);
r->r.frmr.fr_state = FRMR_IS_STALE;
}
@@ -312,13 +319,10 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
struct rpcrdma_mw *mw;
struct rpcrdma_frmr *frmr;
struct ib_mr *mr;
- struct ib_send_wr fastreg_wr, *bad_wr;
+ struct ib_reg_wr reg_wr;
+ struct ib_send_wr *bad_wr;
+ int rc, i, n, dma_nents;
u8 key;
- int len, pageoff;
- int i, rc;
- int seg_len;
- u64 pa;
- int page_no;
mw = seg1->rl_mw;
seg1->rl_mw = NULL;
@@ -331,64 +335,80 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
} while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
frmr = &mw->r.frmr;
frmr->fr_state = FRMR_IS_VALID;
+ mr = frmr->fr_mr;
- pageoff = offset_in_page(seg1->mr_offset);
- seg1->mr_offset -= pageoff; /* start of page */
- seg1->mr_len += pageoff;
- len = -pageoff;
if (nsegs > ia->ri_max_frmr_depth)
nsegs = ia->ri_max_frmr_depth;
- for (page_no = i = 0; i < nsegs;) {
- rpcrdma_map_one(device, seg, direction);
- pa = seg->mr_dma;
- for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
- frmr->fr_pgl->page_list[page_no++] = pa;
- pa += PAGE_SIZE;
- }
- len += seg->mr_len;
+ for (i = 0; i < nsegs;) {
+ if (seg->mr_page)
+ sg_set_page(&frmr->sg[i],
+ seg->mr_page,
+ seg->mr_len,
+ offset_in_page(seg->mr_offset));
+ else
+ sg_set_buf(&frmr->sg[i], seg->mr_offset,
+ seg->mr_len);
+
++seg;
++i;
+
/* Check for holes */
if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
break;
}
- dprintk("RPC: %s: Using frmr %p to map %d segments (%d bytes)\n",
- __func__, mw, i, len);
-
- memset(&fastreg_wr, 0, sizeof(fastreg_wr));
- fastreg_wr.wr_id = (unsigned long)(void *)mw;
- fastreg_wr.opcode = IB_WR_FAST_REG_MR;
- fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma + pageoff;
- fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
- fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
- fastreg_wr.wr.fast_reg.page_list_len = page_no;
- fastreg_wr.wr.fast_reg.length = len;
- fastreg_wr.wr.fast_reg.access_flags = writing ?
- IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
- IB_ACCESS_REMOTE_READ;
- mr = frmr->fr_mr;
+ frmr->sg_nents = i;
+
+ dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
+ if (!dma_nents) {
+ pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n",
+ __func__, frmr->sg, frmr->sg_nents);
+ return -ENOMEM;
+ }
+
+ n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, PAGE_SIZE);
+ if (unlikely(n != frmr->sg_nents)) {
+ pr_err("RPC: %s: failed to map mr %p (%u/%u)\n",
+ __func__, frmr->fr_mr, n, frmr->sg_nents);
+ rc = n < 0 ? n : -EINVAL;
+ goto out_senderr;
+ }
+
+ dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n",
+ __func__, mw, frmr->sg_nents, mr->length);
+
key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
- fastreg_wr.wr.fast_reg.rkey = mr->rkey;
+
+ reg_wr.wr.next = NULL;
+ reg_wr.wr.opcode = IB_WR_REG_MR;
+ reg_wr.wr.wr_id = (uintptr_t)mw;
+ reg_wr.wr.num_sge = 0;
+ reg_wr.wr.send_flags = 0;
+ reg_wr.mr = mr;
+ reg_wr.key = mr->rkey;
+ reg_wr.access = writing ?
+ IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
+ IB_ACCESS_REMOTE_READ;
DECR_CQCOUNT(&r_xprt->rx_ep);
- rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
+ rc = ib_post_send(ia->ri_id->qp, &reg_wr.wr, &bad_wr);
if (rc)
goto out_senderr;
+ seg1->mr_dir = direction;
seg1->rl_mw = mw;
seg1->mr_rkey = mr->rkey;
- seg1->mr_base = seg1->mr_dma + pageoff;
- seg1->mr_nsegs = i;
- seg1->mr_len = len;
- return i;
+ seg1->mr_base = mr->iova;
+ seg1->mr_nsegs = frmr->sg_nents;
+ seg1->mr_len = mr->length;
+
+ return frmr->sg_nents;
out_senderr:
dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
- while (i--)
- rpcrdma_unmap_one(device, --seg);
+ ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
__frwr_queue_recovery(mw);
return rc;
}
@@ -402,22 +422,22 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mw *mw = seg1->rl_mw;
+ struct rpcrdma_frmr *frmr = &mw->r.frmr;
struct ib_send_wr invalidate_wr, *bad_wr;
int rc, nsegs = seg->mr_nsegs;
dprintk("RPC: %s: FRMR %p\n", __func__, mw);
seg1->rl_mw = NULL;
- mw->r.frmr.fr_state = FRMR_IS_INVALID;
+ frmr->fr_state = FRMR_IS_INVALID;
memset(&invalidate_wr, 0, sizeof(invalidate_wr));
invalidate_wr.wr_id = (unsigned long)(void *)mw;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey;
+ invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
- while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(ia->ri_device, seg++);
+ ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
read_lock(&ia->ri_qplock);
rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
read_unlock(&ia->ri_qplock);
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index bc8bd6577..c10d96994 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -441,6 +441,11 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
+ return rpcrdma_bc_marshal_reply(rqst);
+#endif
+
/*
* rpclen gets amount of data in first buffer, which is the
* pre-registered buffer.
@@ -711,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work)
spin_unlock_bh(&xprt->transport_lock);
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+/* By convention, backchannel calls arrive via rdma_msg type
+ * messages, and never populate the chunk lists. This makes
+ * the RPC/RDMA header small and fixed in size, so it is
+ * straightforward to check the RPC header's direction field.
+ */
+static bool
+rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
+{
+ __be32 *p = (__be32 *)headerp;
+
+ if (headerp->rm_type != rdma_msg)
+ return false;
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[1] != xdr_zero)
+ return false;
+ if (headerp->rm_body.rm_chunks[2] != xdr_zero)
+ return false;
+
+ /* sanity */
+ if (p[7] != headerp->rm_xid)
+ return false;
+ /* call direction */
+ if (p[8] != cpu_to_be32(RPC_CALL))
+ return false;
+
+ return true;
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/*
* This function is called when an async event is posted to
* the connection which changes the connection state. All it
@@ -723,8 +759,8 @@ rpcrdma_conn_func(struct rpcrdma_ep *ep)
schedule_delayed_work(&ep->rep_connect_worker, 0);
}
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
* Errors must result in the RPC task either being awakened, or
* allowed to timeout, to discover the errors at that time.
*/
@@ -741,52 +777,32 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
unsigned long cwnd;
u32 credits;
- /* Check status. If bad, signal disconnect and return rep to pool */
- if (rep->rr_len == ~0U) {
- rpcrdma_recv_buffer_put(rep);
- if (r_xprt->rx_ep.rep_connected == 1) {
- r_xprt->rx_ep.rep_connected = -EIO;
- rpcrdma_conn_func(&r_xprt->rx_ep);
- }
- return;
- }
- if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
- dprintk("RPC: %s: short/invalid reply\n", __func__);
- goto repost;
- }
+ dprintk("RPC: %s: incoming rep %p\n", __func__, rep);
+
+ if (rep->rr_len == RPCRDMA_BAD_LEN)
+ goto out_badstatus;
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN)
+ goto out_shortreply;
+
headerp = rdmab_to_msg(rep->rr_rdmabuf);
- if (headerp->rm_vers != rpcrdma_version) {
- dprintk("RPC: %s: invalid version %d\n",
- __func__, be32_to_cpu(headerp->rm_vers));
- goto repost;
- }
+ if (headerp->rm_vers != rpcrdma_version)
+ goto out_badversion;
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ if (rpcrdma_is_bcall(headerp))
+ goto out_bcall;
+#endif
- /* Get XID and try for a match. */
- spin_lock(&xprt->transport_lock);
+ /* Match incoming rpcrdma_rep to an rpcrdma_req to
+ * get context for handling any incoming chunks.
+ */
+ spin_lock_bh(&xprt->transport_lock);
rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
- if (rqst == NULL) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: reply 0x%p failed "
- "to match any request xid 0x%08x len %d\n",
- __func__, rep, be32_to_cpu(headerp->rm_xid),
- rep->rr_len);
-repost:
- r_xprt->rx_stats.bad_reply_count++;
- if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
- rpcrdma_recv_buffer_put(rep);
+ if (!rqst)
+ goto out_nomatch;
- return;
- }
-
- /* get request object */
req = rpcr_to_rdmar(rqst);
- if (req->rl_reply) {
- spin_unlock(&xprt->transport_lock);
- dprintk("RPC: %s: duplicate reply 0x%p to RPC "
- "request 0x%p: xid 0x%08x\n", __func__, rep, req,
- be32_to_cpu(headerp->rm_xid));
- goto repost;
- }
+ if (req->rl_reply)
+ goto out_duplicate;
dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
" RPC request 0x%p xid 0x%08x\n",
@@ -883,8 +899,50 @@ badheader:
if (xprt->cwnd > cwnd)
xprt_release_rqst_cong(rqst->rq_task);
+ xprt_complete_rqst(rqst->rq_task, status);
+ spin_unlock_bh(&xprt->transport_lock);
dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
__func__, xprt, rqst, status);
- xprt_complete_rqst(rqst->rq_task, status);
- spin_unlock(&xprt->transport_lock);
+ return;
+
+out_badstatus:
+ rpcrdma_recv_buffer_put(rep);
+ if (r_xprt->rx_ep.rep_connected == 1) {
+ r_xprt->rx_ep.rep_connected = -EIO;
+ rpcrdma_conn_func(&r_xprt->rx_ep);
+ }
+ return;
+
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+out_bcall:
+ rpcrdma_bc_receive_call(r_xprt, rep);
+ return;
+#endif
+
+out_shortreply:
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
+ goto repost;
+
+out_badversion:
+ dprintk("RPC: %s: invalid version %d\n",
+ __func__, be32_to_cpu(headerp->rm_vers));
+ goto repost;
+
+out_nomatch:
+ spin_unlock_bh(&xprt->transport_lock);
+ dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
+ __func__, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
+ goto repost;
+
+out_duplicate:
+ spin_unlock_bh(&xprt->transport_lock);
+ dprintk("RPC: %s: "
+ "duplicate reply %p to RPC request %p: xid 0x%08x\n",
+ __func__, rep, req, be32_to_cpu(headerp->rm_xid));
+
+repost:
+ r_xprt->rx_stats.bad_reply_count++;
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ rpcrdma_recv_buffer_put(rep);
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 2cd252f02..1b7051bdb 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -239,6 +239,9 @@ void svc_rdma_cleanup(void)
unregister_sysctl_table(svcrdma_table_header);
svcrdma_table_header = NULL;
}
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ svc_unreg_xprt_class(&svc_rdma_bc_class);
+#endif
svc_unreg_xprt_class(&svc_rdma_class);
kmem_cache_destroy(svc_rdma_map_cachep);
kmem_cache_destroy(svc_rdma_ctxt_cachep);
@@ -286,6 +289,9 @@ int svc_rdma_init(void)
/* Register RDMA with the SVC transport switch */
svc_reg_xprt_class(&svc_rdma_class);
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ svc_reg_xprt_class(&svc_rdma_bc_class);
+#endif
return 0;
err1:
kmem_cache_destroy(svc_rdma_map_cachep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f0c3ff67c..ff4f01e52 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -126,7 +126,7 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
u64 rs_offset,
bool last)
{
- struct ib_send_wr read_wr;
+ struct ib_rdma_wr read_wr;
int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
int ret, read, pno;
@@ -180,16 +180,16 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
memset(&read_wr, 0, sizeof(read_wr));
- read_wr.wr_id = (unsigned long)ctxt;
- read_wr.opcode = IB_WR_RDMA_READ;
- ctxt->wr_op = read_wr.opcode;
- read_wr.send_flags = IB_SEND_SIGNALED;
- read_wr.wr.rdma.rkey = rs_handle;
- read_wr.wr.rdma.remote_addr = rs_offset;
- read_wr.sg_list = ctxt->sge;
- read_wr.num_sge = pages_needed;
-
- ret = svc_rdma_send(xprt, &read_wr);
+ read_wr.wr.wr_id = (unsigned long)ctxt;
+ read_wr.wr.opcode = IB_WR_RDMA_READ;
+ ctxt->wr_op = read_wr.wr.opcode;
+ read_wr.wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.rkey = rs_handle;
+ read_wr.remote_addr = rs_offset;
+ read_wr.wr.sg_list = ctxt->sge;
+ read_wr.wr.num_sge = pages_needed;
+
+ ret = svc_rdma_send(xprt, &read_wr.wr);
if (ret) {
pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
@@ -219,14 +219,14 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
u64 rs_offset,
bool last)
{
- struct ib_send_wr read_wr;
+ struct ib_rdma_wr read_wr;
struct ib_send_wr inv_wr;
- struct ib_send_wr fastreg_wr;
+ struct ib_reg_wr reg_wr;
u8 key;
- int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+ int nents = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
- int ret, read, pno;
+ int ret, read, pno, dma_nents, n;
u32 pg_off = *page_offset;
u32 pg_no = *page_no;
@@ -235,17 +235,14 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
ctxt->direction = DMA_FROM_DEVICE;
ctxt->frmr = frmr;
- pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
- read = min_t(int, (pages_needed << PAGE_SHIFT) - *page_offset,
- rs_length);
+ nents = min_t(unsigned int, nents, xprt->sc_frmr_pg_list_len);
+ read = min_t(int, (nents << PAGE_SHIFT) - *page_offset, rs_length);
- frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
frmr->direction = DMA_FROM_DEVICE;
frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
- frmr->map_len = pages_needed << PAGE_SHIFT;
- frmr->page_list_len = pages_needed;
+ frmr->sg_nents = nents;
- for (pno = 0; pno < pages_needed; pno++) {
+ for (pno = 0; pno < nents; pno++) {
int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
@@ -253,17 +250,12 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
head->arg.len += len;
if (!pg_off)
head->count++;
+
+ sg_set_page(&frmr->sg[pno], rqstp->rq_arg.pages[pg_no],
+ len, pg_off);
+
rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
rqstp->rq_next_page = rqstp->rq_respages + 1;
- frmr->page_list->page_list[pno] =
- ib_dma_map_page(xprt->sc_cm_id->device,
- head->arg.pages[pg_no], 0,
- PAGE_SIZE, DMA_FROM_DEVICE);
- ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
- frmr->page_list->page_list[pno]);
- if (ret)
- goto err;
- atomic_inc(&xprt->sc_dma_used);
/* adjust offset and wrap to next page if needed */
pg_off += len;
@@ -279,43 +271,57 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
else
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ dma_nents = ib_dma_map_sg(xprt->sc_cm_id->device,
+ frmr->sg, frmr->sg_nents,
+ frmr->direction);
+ if (!dma_nents) {
+ pr_err("svcrdma: failed to dma map sg %p\n",
+ frmr->sg);
+ return -ENOMEM;
+ }
+ atomic_inc(&xprt->sc_dma_used);
+
+ n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, PAGE_SIZE);
+ if (unlikely(n != frmr->sg_nents)) {
+ pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n",
+ frmr->mr, n, frmr->sg_nents);
+ return n < 0 ? n : -EINVAL;
+ }
+
/* Bump the key */
key = (u8)(frmr->mr->lkey & 0x000000FF);
ib_update_fast_reg_key(frmr->mr, ++key);
- ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
+ ctxt->sge[0].addr = frmr->mr->iova;
ctxt->sge[0].lkey = frmr->mr->lkey;
- ctxt->sge[0].length = read;
+ ctxt->sge[0].length = frmr->mr->length;
ctxt->count = 1;
ctxt->read_hdr = head;
- /* Prepare FASTREG WR */
- memset(&fastreg_wr, 0, sizeof(fastreg_wr));
- fastreg_wr.opcode = IB_WR_FAST_REG_MR;
- fastreg_wr.send_flags = IB_SEND_SIGNALED;
- fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
- fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
- fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
- fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
- fastreg_wr.wr.fast_reg.length = frmr->map_len;
- fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
- fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
- fastreg_wr.next = &read_wr;
+ /* Prepare REG WR */
+ reg_wr.wr.opcode = IB_WR_REG_MR;
+ reg_wr.wr.wr_id = 0;
+ reg_wr.wr.send_flags = IB_SEND_SIGNALED;
+ reg_wr.wr.num_sge = 0;
+ reg_wr.mr = frmr->mr;
+ reg_wr.key = frmr->mr->lkey;
+ reg_wr.access = frmr->access_flags;
+ reg_wr.wr.next = &read_wr.wr;
/* Prepare RDMA_READ */
memset(&read_wr, 0, sizeof(read_wr));
- read_wr.send_flags = IB_SEND_SIGNALED;
- read_wr.wr.rdma.rkey = rs_handle;
- read_wr.wr.rdma.remote_addr = rs_offset;
- read_wr.sg_list = ctxt->sge;
- read_wr.num_sge = 1;
+ read_wr.wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.rkey = rs_handle;
+ read_wr.remote_addr = rs_offset;
+ read_wr.wr.sg_list = ctxt->sge;
+ read_wr.wr.num_sge = 1;
if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
- read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
- read_wr.wr_id = (unsigned long)ctxt;
- read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
+ read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
+ read_wr.wr.wr_id = (unsigned long)ctxt;
+ read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
} else {
- read_wr.opcode = IB_WR_RDMA_READ;
- read_wr.next = &inv_wr;
+ read_wr.wr.opcode = IB_WR_RDMA_READ;
+ read_wr.wr.next = &inv_wr;
/* Prepare invalidate */
memset(&inv_wr, 0, sizeof(inv_wr));
inv_wr.wr_id = (unsigned long)ctxt;
@@ -323,10 +329,10 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
}
- ctxt->wr_op = read_wr.opcode;
+ ctxt->wr_op = read_wr.wr.opcode;
/* Post the chain */
- ret = svc_rdma_send(xprt, &fastreg_wr);
+ ret = svc_rdma_send(xprt, &reg_wr.wr);
if (ret) {
pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
@@ -340,7 +346,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
atomic_inc(&rdma_stat_read);
return ret;
err:
- svc_rdma_unmap_dma(ctxt);
+ ib_dma_unmap_sg(xprt->sc_cm_id->device,
+ frmr->sg, frmr->sg_nents, frmr->direction);
svc_rdma_put_context(ctxt, 0);
svc_rdma_put_frmr(xprt, frmr);
return ret;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 1dfae8317..969a1ab75 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -217,7 +217,7 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
u32 xdr_off, int write_len,
struct svc_rdma_req_map *vec)
{
- struct ib_send_wr write_wr;
+ struct ib_rdma_wr write_wr;
struct ib_sge *sge;
int xdr_sge_no;
int sge_no;
@@ -282,17 +282,17 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
/* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr);
ctxt->wr_op = IB_WR_RDMA_WRITE;
- write_wr.wr_id = (unsigned long)ctxt;
- write_wr.sg_list = &sge[0];
- write_wr.num_sge = sge_no;
- write_wr.opcode = IB_WR_RDMA_WRITE;
- write_wr.send_flags = IB_SEND_SIGNALED;
- write_wr.wr.rdma.rkey = rmr;
- write_wr.wr.rdma.remote_addr = to;
+ write_wr.wr.wr_id = (unsigned long)ctxt;
+ write_wr.wr.sg_list = &sge[0];
+ write_wr.wr.num_sge = sge_no;
+ write_wr.wr.opcode = IB_WR_RDMA_WRITE;
+ write_wr.wr.send_flags = IB_SEND_SIGNALED;
+ write_wr.rkey = rmr;
+ write_wr.remote_addr = to;
/* Post It */
atomic_inc(&rdma_stat_write);
- if (svc_rdma_send(xprt, &write_wr))
+ if (svc_rdma_send(xprt, &write_wr.wr))
goto err;
return write_len - bc;
err:
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index fcc3eb80c..b348b4ade 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -56,6 +56,7 @@
#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
struct net *net,
struct sockaddr *sa, int salen,
@@ -95,6 +96,63 @@ struct svc_xprt_class svc_rdma_class = {
.xcl_ident = XPRT_TRANSPORT_RDMA,
};
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
+ struct sockaddr *, int, int);
+static void svc_rdma_bc_detach(struct svc_xprt *);
+static void svc_rdma_bc_free(struct svc_xprt *);
+
+static struct svc_xprt_ops svc_rdma_bc_ops = {
+ .xpo_create = svc_rdma_bc_create,
+ .xpo_detach = svc_rdma_bc_detach,
+ .xpo_free = svc_rdma_bc_free,
+ .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+ .xpo_secure_port = svc_rdma_secure_port,
+};
+
+struct svc_xprt_class svc_rdma_bc_class = {
+ .xcl_name = "rdma-bc",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_rdma_bc_ops,
+ .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
+};
+
+static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
+ struct net *net,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ struct svcxprt_rdma *cma_xprt;
+ struct svc_xprt *xprt;
+
+ cma_xprt = rdma_create_xprt(serv, 0);
+ if (!cma_xprt)
+ return ERR_PTR(-ENOMEM);
+ xprt = &cma_xprt->sc_xprt;
+
+ svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
+ serv->sv_bc_xprt = xprt;
+
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ return xprt;
+}
+
+static void svc_rdma_bc_detach(struct svc_xprt *xprt)
+{
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+}
+
+static void svc_rdma_bc_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+ dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ if (xprt)
+ kfree(rdma);
+}
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
struct svc_rdma_op_ctxt *ctxt;
@@ -692,8 +750,8 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
if (!cma_xprt)
return ERR_PTR(-ENOMEM);
- listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
- IB_QPT_RC);
+ listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
+ RDMA_PS_TCP, IB_QPT_RC);
if (IS_ERR(listen_id)) {
ret = PTR_ERR(listen_id);
dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
@@ -732,7 +790,7 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
{
struct ib_mr *mr;
- struct ib_fast_reg_page_list *pl;
+ struct scatterlist *sg;
struct svc_rdma_fastreg_mr *frmr;
u32 num_sg;
@@ -745,13 +803,14 @@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
if (IS_ERR(mr))
goto err_free_frmr;
- pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
- num_sg);
- if (IS_ERR(pl))
+ sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
+ if (!sg)
goto err_free_mr;
+ sg_init_table(sg, RPCSVC_MAXPAGES);
+
frmr->mr = mr;
- frmr->page_list = pl;
+ frmr->sg = sg;
INIT_LIST_HEAD(&frmr->frmr_list);
return frmr;
@@ -771,8 +830,8 @@ static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
frmr = list_entry(xprt->sc_frmr_q.next,
struct svc_rdma_fastreg_mr, frmr_list);
list_del_init(&frmr->frmr_list);
+ kfree(frmr->sg);
ib_dereg_mr(frmr->mr);
- ib_free_fast_reg_page_list(frmr->page_list);
kfree(frmr);
}
}
@@ -786,8 +845,7 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
frmr = list_entry(rdma->sc_frmr_q.next,
struct svc_rdma_fastreg_mr, frmr_list);
list_del_init(&frmr->frmr_list);
- frmr->map_len = 0;
- frmr->page_list_len = 0;
+ frmr->sg_nents = 0;
}
spin_unlock_bh(&rdma->sc_frmr_q_lock);
if (frmr)
@@ -796,25 +854,13 @@ struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
return rdma_alloc_frmr(rdma);
}
-static void frmr_unmap_dma(struct svcxprt_rdma *xprt,
- struct svc_rdma_fastreg_mr *frmr)
-{
- int page_no;
- for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
- dma_addr_t addr = frmr->page_list->page_list[page_no];
- if (ib_dma_mapping_error(frmr->mr->device, addr))
- continue;
- atomic_dec(&xprt->sc_dma_used);
- ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE,
- frmr->direction);
- }
-}
-
void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
struct svc_rdma_fastreg_mr *frmr)
{
if (frmr) {
- frmr_unmap_dma(rdma, frmr);
+ ib_dma_unmap_sg(rdma->sc_cm_id->device,
+ frmr->sg, frmr->sg_nents, frmr->direction);
+ atomic_dec(&rdma->sc_dma_used);
spin_lock_bh(&rdma->sc_frmr_q_lock);
WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 41e452bc5..8c545f7d7 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -676,7 +676,7 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
static int
xprt_rdma_enable_swap(struct rpc_xprt *xprt)
{
- return -EINVAL;
+ return 0;
}
static void
@@ -705,7 +705,13 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
.print_stats = xprt_rdma_print_stats,
.enable_swap = xprt_rdma_enable_swap,
.disable_swap = xprt_rdma_disable_swap,
- .inject_disconnect = xprt_rdma_inject_disconnect
+ .inject_disconnect = xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ .bc_setup = xprt_rdma_bc_setup,
+ .bc_up = xprt_rdma_bc_up,
+ .bc_free_rqst = xprt_rdma_bc_free_rqst,
+ .bc_destroy = xprt_rdma_bc_destroy,
+#endif
};
static struct xprt_class xprt_rdma = {
@@ -732,6 +738,7 @@ void xprt_rdma_cleanup(void)
dprintk("RPC: %s: xprt_unregister returned %i\n",
__func__, rc);
+ rpcrdma_destroy_wq();
frwr_destroy_recovery_wq();
}
@@ -743,8 +750,15 @@ int xprt_rdma_init(void)
if (rc)
return rc;
+ rc = rpcrdma_alloc_wq();
+ if (rc) {
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+
rc = xprt_register_transport(&xprt_rdma);
if (rc) {
+ rpcrdma_destroy_wq();
frwr_destroy_recovery_wq();
return rc;
}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 5502d4dad..eadd16551 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -68,47 +68,33 @@
* internal functions
*/
-/*
- * handle replies in tasklet context, using a single, global list
- * rdma tasklet function -- just turn around and call the func
- * for all replies on the list
- */
-
-static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
-static LIST_HEAD(rpcrdma_tasklets_g);
+static struct workqueue_struct *rpcrdma_receive_wq;
-static void
-rpcrdma_run_tasklet(unsigned long data)
+int
+rpcrdma_alloc_wq(void)
{
- struct rpcrdma_rep *rep;
- unsigned long flags;
-
- data = data;
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- while (!list_empty(&rpcrdma_tasklets_g)) {
- rep = list_entry(rpcrdma_tasklets_g.next,
- struct rpcrdma_rep, rr_list);
- list_del(&rep->rr_list);
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+ struct workqueue_struct *recv_wq;
- rpcrdma_reply_handler(rep);
+ recv_wq = alloc_workqueue("xprtrdma_receive",
+ WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+ 0);
+ if (!recv_wq)
+ return -ENOMEM;
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- }
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+ rpcrdma_receive_wq = recv_wq;
+ return 0;
}
-static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-
-static void
-rpcrdma_schedule_tasklet(struct list_head *sched_list)
+void
+rpcrdma_destroy_wq(void)
{
- unsigned long flags;
+ struct workqueue_struct *wq;
- spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
- list_splice_tail(sched_list, &rpcrdma_tasklets_g);
- spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
- tasklet_schedule(&rpcrdma_tasklet_g);
+ if (rpcrdma_receive_wq) {
+ wq = rpcrdma_receive_wq;
+ rpcrdma_receive_wq = NULL;
+ destroy_workqueue(wq);
+ }
}
static void
@@ -158,63 +144,54 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
}
}
-static int
-rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The common case is a single send completion is waiting. By
+ * passing two WC entries to ib_poll_cq, a return code of 1
+ * means there is exactly one WC waiting and no more. We don't
+ * have to invoke ib_poll_cq again to know that the CQ has been
+ * properly drained.
+ */
+static void
+rpcrdma_sendcq_poll(struct ib_cq *cq)
{
- struct ib_wc *wcs;
- int budget, count, rc;
+ struct ib_wc *pos, wcs[2];
+ int count, rc;
- budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
- wcs = ep->rep_send_wcs;
+ pos = wcs;
- rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
- if (rc <= 0)
- return rc;
+ rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+ if (rc < 0)
+ break;
count = rc;
while (count-- > 0)
- rpcrdma_sendcq_process_wc(wcs++);
- } while (rc == RPCRDMA_POLLSIZE && --budget);
- return 0;
+ rpcrdma_sendcq_process_wc(pos++);
+ } while (rc == ARRAY_SIZE(wcs));
+ return;
}
-/*
- * Handle send, fast_reg_mr, and local_inv completions.
- *
- * Send events are typically suppressed and thus do not result
- * in an upcall. Occasionally one is signaled, however. This
- * prevents the provider's completion queue from wrapping and
- * losing a completion.
+/* Handle provider send completion upcalls.
*/
static void
rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
{
- struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
- int rc;
-
- rc = rpcrdma_sendcq_poll(cq, ep);
- if (rc) {
- dprintk("RPC: %s: ib_poll_cq failed: %i\n",
- __func__, rc);
- return;
- }
+ do {
+ rpcrdma_sendcq_poll(cq);
+ } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+ IB_CQ_REPORT_MISSED_EVENTS) > 0);
+}
- rc = ib_req_notify_cq(cq,
- IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
- if (rc == 0)
- return;
- if (rc < 0) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- return;
- }
+static void
+rpcrdma_receive_worker(struct work_struct *work)
+{
+ struct rpcrdma_rep *rep =
+ container_of(work, struct rpcrdma_rep, rr_work);
- rpcrdma_sendcq_poll(cq, ep);
+ rpcrdma_reply_handler(rep);
}
static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
struct rpcrdma_rep *rep =
(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -237,91 +214,60 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
out_schedule:
- list_add_tail(&rep->rr_list, sched_list);
+ queue_work(rpcrdma_receive_wq, &rep->rr_work);
return;
+
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
__func__, rep, ib_wc_status_msg(wc->status));
- rep->rr_len = ~0U;
+ rep->rr_len = RPCRDMA_BAD_LEN;
goto out_schedule;
}
-static int
-rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+/* The wc array is on stack: automatic memory is always CPU-local.
+ *
+ * struct ib_wc is 64 bytes, making the poll array potentially
+ * large. But this is at the bottom of the call chain. Further
+ * substantial work is done in another thread.
+ */
+static void
+rpcrdma_recvcq_poll(struct ib_cq *cq)
{
- struct list_head sched_list;
- struct ib_wc *wcs;
- int budget, count, rc;
+ struct ib_wc *pos, wcs[4];
+ int count, rc;
- INIT_LIST_HEAD(&sched_list);
- budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
do {
- wcs = ep->rep_recv_wcs;
+ pos = wcs;
- rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
- if (rc <= 0)
- goto out_schedule;
+ rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+ if (rc < 0)
+ break;
count = rc;
while (count-- > 0)
- rpcrdma_recvcq_process_wc(wcs++, &sched_list);
- } while (rc == RPCRDMA_POLLSIZE && --budget);
- rc = 0;
-
-out_schedule:
- rpcrdma_schedule_tasklet(&sched_list);
- return rc;
+ rpcrdma_recvcq_process_wc(pos++);
+ } while (rc == ARRAY_SIZE(wcs));
}
-/*
- * Handle receive completions.
- *
- * It is reentrant but processes single events in order to maintain
- * ordering of receives to keep server credits.
- *
- * It is the responsibility of the scheduled tasklet to return
- * recv buffers to the pool. NOTE: this affects synchronization of
- * connection shutdown. That is, the structures required for
- * the completion of the reply handler must remain intact until
- * all memory has been reclaimed.
+/* Handle provider receive completion upcalls.
*/
static void
rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
{
- struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
- int rc;
-
- rc = rpcrdma_recvcq_poll(cq, ep);
- if (rc) {
- dprintk("RPC: %s: ib_poll_cq failed: %i\n",
- __func__, rc);
- return;
- }
-
- rc = ib_req_notify_cq(cq,
- IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
- if (rc == 0)
- return;
- if (rc < 0) {
- dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
- __func__, rc);
- return;
- }
-
- rpcrdma_recvcq_poll(cq, ep);
+ do {
+ rpcrdma_recvcq_poll(cq);
+ } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+ IB_CQ_REPORT_MISSED_EVENTS) > 0);
}
static void
rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
{
struct ib_wc wc;
- LIST_HEAD(sched_list);
while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
- rpcrdma_recvcq_process_wc(&wc, &sched_list);
- if (!list_empty(&sched_list))
- rpcrdma_schedule_tasklet(&sched_list);
+ rpcrdma_recvcq_process_wc(&wc);
while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
rpcrdma_sendcq_process_wc(&wc);
}
@@ -432,7 +378,8 @@ rpcrdma_create_id(struct rpcrdma_xprt *xprt,
init_completion(&ia->ri_done);
- id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
+ id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
+ IB_QPT_RC);
if (IS_ERR(id)) {
rc = PTR_ERR(id);
dprintk("RPC: %s: rdma_create_id() failed %i\n",
@@ -622,6 +569,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_cq *sendcq, *recvcq;
struct ib_cq_init_attr cq_attr = {};
+ unsigned int max_qp_wr;
int rc, err;
if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
@@ -630,18 +578,27 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
return -ENOMEM;
}
+ if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+ dprintk("RPC: %s: insufficient wqe's available\n",
+ __func__);
+ return -ENOMEM;
+ }
+ max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
+
/* check provider's send/recv wr limits */
- if (cdata->max_requests > devattr->max_qp_wr)
- cdata->max_requests = devattr->max_qp_wr;
+ if (cdata->max_requests > max_qp_wr)
+ cdata->max_requests = max_qp_wr;
ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
ep->rep_attr.qp_context = ep;
ep->rep_attr.srq = NULL;
ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
rc = ia->ri_ops->ro_open(ia, ep, cdata);
if (rc)
return rc;
ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
ep->rep_attr.cap.max_recv_sge = 1;
ep->rep_attr.cap.max_inline_data = 0;
@@ -669,7 +626,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+ rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -686,7 +643,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+ rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -885,7 +842,21 @@ retry:
}
rc = ep->rep_connected;
} else {
+ struct rpcrdma_xprt *r_xprt;
+ unsigned int extras;
+
dprintk("RPC: %s: connected\n", __func__);
+
+ r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+ extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+ if (extras) {
+ rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+ if (rc)
+ pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+ __func__, rc);
+ rc = 0;
+ }
}
out:
@@ -922,20 +893,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
}
}
-static struct rpcrdma_req *
+struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
+ struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
struct rpcrdma_req *req;
req = kzalloc(sizeof(*req), GFP_KERNEL);
if (req == NULL)
return ERR_PTR(-ENOMEM);
+ INIT_LIST_HEAD(&req->rl_free);
+ spin_lock(&buffer->rb_reqslock);
+ list_add(&req->rl_all, &buffer->rb_allreqs);
+ spin_unlock(&buffer->rb_reqslock);
req->rl_buffer = &r_xprt->rx_buf;
return req;
}
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -957,6 +933,7 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
rep->rr_device = ia->ri_device;
rep->rr_rxprt = r_xprt;
+ INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
return rep;
out_free:
@@ -970,44 +947,21 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
- char *p;
- size_t len;
int i, rc;
- buf->rb_max_requests = cdata->max_requests;
+ buf->rb_max_requests = r_xprt->rx_data.max_requests;
+ buf->rb_bc_srv_max_requests = 0;
spin_lock_init(&buf->rb_lock);
- /* Need to allocate:
- * 1. arrays for send and recv pointers
- * 2. arrays of struct rpcrdma_req to fill in pointers
- * 3. array of struct rpcrdma_rep for replies
- * Send/recv buffers in req/rep need to be registered
- */
- len = buf->rb_max_requests *
- (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-
- p = kzalloc(len, GFP_KERNEL);
- if (p == NULL) {
- dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
- __func__, len);
- rc = -ENOMEM;
- goto out;
- }
- buf->rb_pool = p; /* for freeing it later */
-
- buf->rb_send_bufs = (struct rpcrdma_req **) p;
- p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
- buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
- p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
-
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
goto out;
+ INIT_LIST_HEAD(&buf->rb_send_bufs);
+ INIT_LIST_HEAD(&buf->rb_allreqs);
+ spin_lock_init(&buf->rb_reqslock);
for (i = 0; i < buf->rb_max_requests; i++) {
struct rpcrdma_req *req;
- struct rpcrdma_rep *rep;
req = rpcrdma_create_req(r_xprt);
if (IS_ERR(req)) {
@@ -1016,7 +970,13 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(req);
goto out;
}
- buf->rb_send_bufs[i] = req;
+ req->rl_backchannel = false;
+ list_add(&req->rl_free, &buf->rb_send_bufs);
+ }
+
+ INIT_LIST_HEAD(&buf->rb_recv_bufs);
+ for (i = 0; i < buf->rb_max_requests + 2; i++) {
+ struct rpcrdma_rep *rep;
rep = rpcrdma_create_rep(r_xprt);
if (IS_ERR(rep)) {
@@ -1025,7 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
rc = PTR_ERR(rep);
goto out;
}
- buf->rb_recv_bufs[i] = rep;
+ list_add(&rep->rr_list, &buf->rb_recv_bufs);
}
return 0;
@@ -1034,22 +994,38 @@ out:
return rc;
}
+static struct rpcrdma_req *
+rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_req *req;
+
+ req = list_first_entry(&buf->rb_send_bufs,
+ struct rpcrdma_req, rl_free);
+ list_del(&req->rl_free);
+ return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_rep *rep;
+
+ rep = list_first_entry(&buf->rb_recv_bufs,
+ struct rpcrdma_rep, rr_list);
+ list_del(&rep->rr_list);
+ return rep;
+}
+
static void
rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
{
- if (!rep)
- return;
-
rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
kfree(rep);
}
-static void
+void
rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
{
- if (!req)
- return;
-
rpcrdma_free_regbuf(ia, req->rl_sendbuf);
rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
kfree(req);
@@ -1059,25 +1035,29 @@ void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
- int i;
- /* clean up in reverse order from create
- * 1. recv mr memory (mr free, then kfree)
- * 2. send mr memory (mr free, then kfree)
- * 3. MWs
- */
- dprintk("RPC: %s: entering\n", __func__);
+ while (!list_empty(&buf->rb_recv_bufs)) {
+ struct rpcrdma_rep *rep;
- for (i = 0; i < buf->rb_max_requests; i++) {
- if (buf->rb_recv_bufs)
- rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
- if (buf->rb_send_bufs)
- rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+ rep = rpcrdma_buffer_get_rep_locked(buf);
+ rpcrdma_destroy_rep(ia, rep);
}
- ia->ri_ops->ro_destroy(buf);
+ spin_lock(&buf->rb_reqslock);
+ while (!list_empty(&buf->rb_allreqs)) {
+ struct rpcrdma_req *req;
+
+ req = list_first_entry(&buf->rb_allreqs,
+ struct rpcrdma_req, rl_all);
+ list_del(&req->rl_all);
+
+ spin_unlock(&buf->rb_reqslock);
+ rpcrdma_destroy_req(ia, req);
+ spin_lock(&buf->rb_reqslock);
+ }
+ spin_unlock(&buf->rb_reqslock);
- kfree(buf->rb_pool);
+ ia->ri_ops->ro_destroy(buf);
}
struct rpcrdma_mw *
@@ -1109,53 +1089,34 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
spin_unlock(&buf->rb_mwlock);
}
-static void
-rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
- buf->rb_send_bufs[--buf->rb_send_index] = req;
- req->rl_niovs = 0;
- if (req->rl_reply) {
- buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
- req->rl_reply = NULL;
- }
-}
-
/*
* Get a set of request/reply buffers.
*
- * Reply buffer (if needed) is attached to send buffer upon return.
- * Rule:
- * rb_send_index and rb_recv_index MUST always be pointing to the
- * *next* available buffer (non-NULL). They are incremented after
- * removing buffers, and decremented *before* returning them.
+ * Reply buffer (if available) is attached to send buffer upon return.
*/
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
struct rpcrdma_req *req;
- unsigned long flags;
-
- spin_lock_irqsave(&buffers->rb_lock, flags);
- if (buffers->rb_send_index == buffers->rb_max_requests) {
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
- dprintk("RPC: %s: out of request buffers\n", __func__);
- return ((struct rpcrdma_req *)NULL);
- }
-
- req = buffers->rb_send_bufs[buffers->rb_send_index];
- if (buffers->rb_send_index < buffers->rb_recv_index) {
- dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
- __func__,
- buffers->rb_recv_index - buffers->rb_send_index);
- req->rl_reply = NULL;
- } else {
- req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
- buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
- }
- buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+ spin_lock(&buffers->rb_lock);
+ if (list_empty(&buffers->rb_send_bufs))
+ goto out_reqbuf;
+ req = rpcrdma_buffer_get_req_locked(buffers);
+ if (list_empty(&buffers->rb_recv_bufs))
+ goto out_repbuf;
+ req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+ spin_unlock(&buffers->rb_lock);
+ return req;
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+out_reqbuf:
+ spin_unlock(&buffers->rb_lock);
+ pr_warn("RPC: %s: out of request buffers\n", __func__);
+ return NULL;
+out_repbuf:
+ spin_unlock(&buffers->rb_lock);
+ pr_warn("RPC: %s: out of reply buffers\n", __func__);
+ req->rl_reply = NULL;
return req;
}
@@ -1167,30 +1128,31 @@ void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
- unsigned long flags;
+ struct rpcrdma_rep *rep = req->rl_reply;
- spin_lock_irqsave(&buffers->rb_lock, flags);
- rpcrdma_buffer_put_sendbuf(req, buffers);
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ req->rl_niovs = 0;
+ req->rl_reply = NULL;
+
+ spin_lock(&buffers->rb_lock);
+ list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+ if (rep)
+ list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+ spin_unlock(&buffers->rb_lock);
}
/*
* Recover reply buffers from pool.
- * This happens when recovering from error conditions.
- * Post-increment counter/array index.
+ * This happens when recovering from disconnect.
*/
void
rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
- unsigned long flags;
- spin_lock_irqsave(&buffers->rb_lock, flags);
- if (buffers->rb_recv_index < buffers->rb_max_requests) {
- req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
- buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
- }
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_lock(&buffers->rb_lock);
+ if (!list_empty(&buffers->rb_recv_bufs))
+ req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+ spin_unlock(&buffers->rb_lock);
}
/*
@@ -1201,11 +1163,10 @@ void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
- unsigned long flags;
- spin_lock_irqsave(&buffers->rb_lock, flags);
- buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
- spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ spin_lock(&buffers->rb_lock);
+ list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+ spin_unlock(&buffers->rb_lock);
}
/*
@@ -1362,6 +1323,47 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
return rc;
}
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @min_reqs: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+ struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+ struct rpcrdma_rep *rep;
+ unsigned long flags;
+ int rc;
+
+ while (count--) {
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (list_empty(&buffers->rb_recv_bufs))
+ goto out_reqbuf;
+ rep = rpcrdma_buffer_get_rep_locked(buffers);
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+
+ rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ if (rc)
+ goto out_rc;
+ }
+
+ return 0;
+
+out_reqbuf:
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ pr_warn("%s: no extra receive buffers\n", __func__);
+ return -ENOMEM;
+
+out_rc:
+ rpcrdma_recv_buffer_put(rep);
+ return rc;
+}
+
/* How many chunk list items fit within our inline buffers?
*/
unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index c09414e6f..ac7f8d4f6 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -77,9 +77,6 @@ struct rpcrdma_ia {
* RDMA Endpoint -- one per transport instance
*/
-#define RPCRDMA_WC_BUDGET (128)
-#define RPCRDMA_POLLSIZE (16)
-
struct rpcrdma_ep {
atomic_t rep_cqcount;
int rep_cqinit;
@@ -89,8 +86,6 @@ struct rpcrdma_ep {
struct rdma_conn_param rep_remote_cma;
struct sockaddr_storage rep_remote_addr;
struct delayed_work rep_connect_worker;
- struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
- struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
};
/*
@@ -106,6 +101,16 @@ struct rpcrdma_ep {
*/
#define RPCRDMA_IGNORE_COMPLETION (0ULL)
+/* Pre-allocate extra Work Requests for handling backward receives
+ * and sends. This is a fixed value because the Work Queues are
+ * allocated when the forward channel is set up.
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+#define RPCRDMA_BACKWARD_WRS (8)
+#else
+#define RPCRDMA_BACKWARD_WRS (0)
+#endif
+
/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
*
* The below structure appears at the front of a large region of kmalloc'd
@@ -169,10 +174,13 @@ struct rpcrdma_rep {
unsigned int rr_len;
struct ib_device *rr_device;
struct rpcrdma_xprt *rr_rxprt;
+ struct work_struct rr_work;
struct list_head rr_list;
struct rpcrdma_regbuf *rr_rdmabuf;
};
+#define RPCRDMA_BAD_LEN (~0U)
+
/*
* struct rpcrdma_mw - external memory region metadata
*
@@ -193,7 +201,8 @@ enum rpcrdma_frmr_state {
};
struct rpcrdma_frmr {
- struct ib_fast_reg_page_list *fr_pgl;
+ struct scatterlist *sg;
+ int sg_nents;
struct ib_mr *fr_mr;
enum rpcrdma_frmr_state fr_state;
struct work_struct fr_work;
@@ -255,6 +264,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */
#define RPCRDMA_MAX_IOVS (2)
struct rpcrdma_req {
+ struct list_head rl_free;
unsigned int rl_niovs;
unsigned int rl_nchunks;
unsigned int rl_connect_cookie;
@@ -264,6 +274,9 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_rdmabuf;
struct rpcrdma_regbuf *rl_sendbuf;
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+
+ struct list_head rl_all;
+ bool rl_backchannel;
};
static inline struct rpcrdma_req *
@@ -288,12 +301,14 @@ struct rpcrdma_buffer {
struct list_head rb_all;
char *rb_pool;
- spinlock_t rb_lock; /* protect buf arrays */
+ spinlock_t rb_lock; /* protect buf lists */
+ struct list_head rb_send_bufs;
+ struct list_head rb_recv_bufs;
u32 rb_max_requests;
- int rb_send_index;
- int rb_recv_index;
- struct rpcrdma_req **rb_send_bufs;
- struct rpcrdma_rep **rb_recv_bufs;
+
+ u32 rb_bc_srv_max_requests;
+ spinlock_t rb_reqslock; /* protect rb_allreqs */
+ struct list_head rb_allreqs;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -339,6 +354,7 @@ struct rpcrdma_stats {
unsigned long failed_marshal_count;
unsigned long bad_reply_count;
unsigned long nomsg_call_count;
+ unsigned long bcall_count;
};
/*
@@ -414,6 +430,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
/*
* Buffer calls - xprtrdma/verbs.c
*/
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
@@ -430,10 +449,14 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
struct rpcrdma_regbuf *);
unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
int frwr_alloc_recovery_wq(void);
void frwr_destroy_recovery_wq(void);
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/
@@ -494,6 +517,18 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
int xprt_rdma_init(void);
void xprt_rdma_cleanup(void);
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
+int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif /* CONFIG_SUNRPC_BACKCHANNEL */
+
/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;
/* WR context cache. Created in svc_rdma.c */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 1a85e0ed0..2ffaf6a79 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -360,8 +360,10 @@ static int xs_send_pagedata(struct socket *sock, struct xdr_buf *xdr, unsigned i
int flags = XS_SENDMSG_FLAGS;
remainder -= len;
- if (remainder != 0 || more)
+ if (more)
flags |= MSG_MORE;
+ if (remainder != 0)
+ flags |= MSG_SENDPAGE_NOTLAST | MSG_MORE;
err = do_sendpage(sock, *ppage, base, len, flags);
if (remainder == 0 || err != len)
break;
@@ -396,7 +398,7 @@ static int xs_sendpages(struct socket *sock, struct sockaddr *addr, int addrlen,
if (unlikely(!sock))
return -ENOTSOCK;
- clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
+ clear_bit(SOCKWQ_ASYNC_NOSPACE, &sock->flags);
if (base != 0) {
addr = NULL;
addrlen = 0;
@@ -440,7 +442,7 @@ static void xs_nospace_callback(struct rpc_task *task)
struct sock_xprt *transport = container_of(task->tk_rqstp->rq_xprt, struct sock_xprt, xprt);
transport->inet->sk_write_pending--;
- clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+ clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags);
}
/**
@@ -465,7 +467,7 @@ static int xs_nospace(struct rpc_task *task)
/* Don't race with disconnect */
if (xprt_connected(xprt)) {
- if (test_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags)) {
+ if (test_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags)) {
/*
* Notify TCP that we're limited by the application
* window size
@@ -476,7 +478,7 @@ static int xs_nospace(struct rpc_task *task)
xprt_wait_for_buffer_space(task, xs_nospace_callback);
}
} else {
- clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+ clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags);
ret = -ENOTCONN;
}
@@ -624,7 +626,7 @@ process_status:
case -EPERM:
/* When the server has died, an ICMP port unreachable message
* prompts ECONNREFUSED. */
- clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+ clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags);
}
return status;
@@ -713,7 +715,7 @@ static int xs_tcp_send_request(struct rpc_task *task)
case -EADDRINUSE:
case -ENOBUFS:
case -EPIPE:
- clear_bit(SOCK_ASYNC_NOSPACE, &transport->sock->flags);
+ clear_bit(SOCKWQ_ASYNC_NOSPACE, &transport->sock->flags);
}
return status;
@@ -823,6 +825,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
kernel_sock_shutdown(sock, SHUT_RDWR);
+ mutex_lock(&transport->recv_mutex);
write_lock_bh(&sk->sk_callback_lock);
transport->inet = NULL;
transport->sock = NULL;
@@ -833,6 +836,7 @@ static void xs_reset_transport(struct sock_xprt *transport)
xprt_clear_connected(xprt);
write_unlock_bh(&sk->sk_callback_lock);
xs_sock_reset_connection_flags(xprt);
+ mutex_unlock(&transport->recv_mutex);
trace_rpc_socket_close(xprt, sock);
sock_release(sock);
@@ -886,6 +890,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
cancel_delayed_work_sync(&transport->connect_worker);
xs_close(xprt);
+ cancel_work_sync(&transport->recv_worker);
xs_xprt_free(xprt);
module_put(THIS_MODULE);
}
@@ -906,44 +911,36 @@ static int xs_local_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
}
/**
- * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
- * @sk: socket with data to read
+ * xs_local_data_read_skb
+ * @xprt: transport
+ * @sk: socket
+ * @skb: skbuff
*
* Currently this assumes we can read the whole reply in a single gulp.
*/
-static void xs_local_data_ready(struct sock *sk)
+static void xs_local_data_read_skb(struct rpc_xprt *xprt,
+ struct sock *sk,
+ struct sk_buff *skb)
{
struct rpc_task *task;
- struct rpc_xprt *xprt;
struct rpc_rqst *rovr;
- struct sk_buff *skb;
- int err, repsize, copied;
+ int repsize, copied;
u32 _xid;
__be32 *xp;
- read_lock_bh(&sk->sk_callback_lock);
- dprintk("RPC: %s...\n", __func__);
- xprt = xprt_from_sock(sk);
- if (xprt == NULL)
- goto out;
-
- skb = skb_recv_datagram(sk, 0, 1, &err);
- if (skb == NULL)
- goto out;
-
repsize = skb->len - sizeof(rpc_fraghdr);
if (repsize < 4) {
dprintk("RPC: impossible RPC reply size %d\n", repsize);
- goto dropit;
+ return;
}
/* Copy the XID from the skb... */
xp = skb_header_pointer(skb, sizeof(rpc_fraghdr), sizeof(_xid), &_xid);
if (xp == NULL)
- goto dropit;
+ return;
/* Look up and lock the request corresponding to the given XID */
- spin_lock(&xprt->transport_lock);
+ spin_lock_bh(&xprt->transport_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
@@ -961,50 +958,68 @@ static void xs_local_data_ready(struct sock *sk)
xprt_complete_rqst(task, copied);
out_unlock:
- spin_unlock(&xprt->transport_lock);
- dropit:
- skb_free_datagram(sk, skb);
- out:
- read_unlock_bh(&sk->sk_callback_lock);
+ spin_unlock_bh(&xprt->transport_lock);
+}
+
+static void xs_local_data_receive(struct sock_xprt *transport)
+{
+ struct sk_buff *skb;
+ struct sock *sk;
+ int err;
+
+ mutex_lock(&transport->recv_mutex);
+ sk = transport->inet;
+ if (sk == NULL)
+ goto out;
+ for (;;) {
+ skb = skb_recv_datagram(sk, 0, 1, &err);
+ if (skb == NULL)
+ break;
+ xs_local_data_read_skb(&transport->xprt, sk, skb);
+ skb_free_datagram(sk, skb);
+ }
+out:
+ mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_local_data_receive_workfn(struct work_struct *work)
+{
+ struct sock_xprt *transport =
+ container_of(work, struct sock_xprt, recv_worker);
+ xs_local_data_receive(transport);
}
/**
- * xs_udp_data_ready - "data ready" callback for UDP sockets
- * @sk: socket with data to read
+ * xs_udp_data_read_skb - receive callback for UDP sockets
+ * @xprt: transport
+ * @sk: socket
+ * @skb: skbuff
*
*/
-static void xs_udp_data_ready(struct sock *sk)
+static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
+ struct sock *sk,
+ struct sk_buff *skb)
{
struct rpc_task *task;
- struct rpc_xprt *xprt;
struct rpc_rqst *rovr;
- struct sk_buff *skb;
- int err, repsize, copied;
+ int repsize, copied;
u32 _xid;
__be32 *xp;
- read_lock_bh(&sk->sk_callback_lock);
- dprintk("RPC: xs_udp_data_ready...\n");
- if (!(xprt = xprt_from_sock(sk)))
- goto out;
-
- if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
- goto out;
-
repsize = skb->len - sizeof(struct udphdr);
if (repsize < 4) {
dprintk("RPC: impossible RPC reply size %d!\n", repsize);
- goto dropit;
+ return;
}
/* Copy the XID from the skb... */
xp = skb_header_pointer(skb, sizeof(struct udphdr),
sizeof(_xid), &_xid);
if (xp == NULL)
- goto dropit;
+ return;
/* Look up and lock the request corresponding to the given XID */
- spin_lock(&xprt->transport_lock);
+ spin_lock_bh(&xprt->transport_lock);
rovr = xprt_lookup_rqst(xprt, *xp);
if (!rovr)
goto out_unlock;
@@ -1025,10 +1040,54 @@ static void xs_udp_data_ready(struct sock *sk)
xprt_complete_rqst(task, copied);
out_unlock:
- spin_unlock(&xprt->transport_lock);
- dropit:
- skb_free_datagram(sk, skb);
- out:
+ spin_unlock_bh(&xprt->transport_lock);
+}
+
+static void xs_udp_data_receive(struct sock_xprt *transport)
+{
+ struct sk_buff *skb;
+ struct sock *sk;
+ int err;
+
+ mutex_lock(&transport->recv_mutex);
+ sk = transport->inet;
+ if (sk == NULL)
+ goto out;
+ for (;;) {
+ skb = skb_recv_datagram(sk, 0, 1, &err);
+ if (skb == NULL)
+ break;
+ xs_udp_data_read_skb(&transport->xprt, sk, skb);
+ skb_free_datagram(sk, skb);
+ }
+out:
+ mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_udp_data_receive_workfn(struct work_struct *work)
+{
+ struct sock_xprt *transport =
+ container_of(work, struct sock_xprt, recv_worker);
+ xs_udp_data_receive(transport);
+}
+
+/**
+ * xs_data_ready - "data ready" callback for UDP sockets
+ * @sk: socket with data to read
+ *
+ */
+static void xs_data_ready(struct sock *sk)
+{
+ struct rpc_xprt *xprt;
+
+ read_lock_bh(&sk->sk_callback_lock);
+ dprintk("RPC: xs_data_ready...\n");
+ xprt = xprt_from_sock(sk);
+ if (xprt != NULL) {
+ struct sock_xprt *transport = container_of(xprt,
+ struct sock_xprt, xprt);
+ queue_work(rpciod_workqueue, &transport->recv_worker);
+ }
read_unlock_bh(&sk->sk_callback_lock);
}
@@ -1243,12 +1302,12 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
dprintk("RPC: read reply XID %08x\n", ntohl(transport->tcp_xid));
/* Find and lock the request corresponding to this xid */
- spin_lock(&xprt->transport_lock);
+ spin_lock_bh(&xprt->transport_lock);
req = xprt_lookup_rqst(xprt, transport->tcp_xid);
if (!req) {
dprintk("RPC: XID %08x request not found!\n",
ntohl(transport->tcp_xid));
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
return -1;
}
@@ -1257,7 +1316,7 @@ static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_rqst(req->rq_task, transport->tcp_copied);
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
return 0;
}
@@ -1277,10 +1336,10 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
struct rpc_rqst *req;
/* Look up and lock the request corresponding to the given XID */
- spin_lock(&xprt->transport_lock);
+ spin_lock_bh(&xprt->transport_lock);
req = xprt_lookup_bc_request(xprt, transport->tcp_xid);
if (req == NULL) {
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
printk(KERN_WARNING "Callback slot table overflowed\n");
xprt_force_disconnect(xprt);
return -1;
@@ -1291,7 +1350,7 @@ static int xs_tcp_read_callback(struct rpc_xprt *xprt,
if (!(transport->tcp_flags & TCP_RCV_COPY_DATA))
xprt_complete_bc_request(req, transport->tcp_copied);
- spin_unlock(&xprt->transport_lock);
+ spin_unlock_bh(&xprt->transport_lock);
return 0;
}
@@ -1306,6 +1365,17 @@ static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
xs_tcp_read_reply(xprt, desc) :
xs_tcp_read_callback(xprt, desc);
}
+
+static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
+{
+ int ret;
+
+ ret = svc_create_xprt(serv, "tcp-bc", net, PF_INET, 0,
+ SVC_SOCK_ANONYMOUS);
+ if (ret < 0)
+ return ret;
+ return 0;
+}
#else
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
@@ -1391,6 +1461,44 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
return len - desc.count;
}
+static void xs_tcp_data_receive(struct sock_xprt *transport)
+{
+ struct rpc_xprt *xprt = &transport->xprt;
+ struct sock *sk;
+ read_descriptor_t rd_desc = {
+ .count = 2*1024*1024,
+ .arg.data = xprt,
+ };
+ unsigned long total = 0;
+ int read = 0;
+
+ mutex_lock(&transport->recv_mutex);
+ sk = transport->inet;
+ if (sk == NULL)
+ goto out;
+
+ /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
+ for (;;) {
+ lock_sock(sk);
+ read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
+ release_sock(sk);
+ if (read <= 0)
+ break;
+ total += read;
+ rd_desc.count = 65536;
+ }
+out:
+ mutex_unlock(&transport->recv_mutex);
+ trace_xs_tcp_data_ready(xprt, read, total);
+}
+
+static void xs_tcp_data_receive_workfn(struct work_struct *work)
+{
+ struct sock_xprt *transport =
+ container_of(work, struct sock_xprt, recv_worker);
+ xs_tcp_data_receive(transport);
+}
+
/**
* xs_tcp_data_ready - "data ready" callback for TCP sockets
* @sk: socket with data to read
@@ -1398,34 +1506,24 @@ static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, uns
*/
static void xs_tcp_data_ready(struct sock *sk)
{
+ struct sock_xprt *transport;
struct rpc_xprt *xprt;
- read_descriptor_t rd_desc;
- int read;
- unsigned long total = 0;
dprintk("RPC: xs_tcp_data_ready...\n");
read_lock_bh(&sk->sk_callback_lock);
- if (!(xprt = xprt_from_sock(sk))) {
- read = 0;
+ if (!(xprt = xprt_from_sock(sk)))
goto out;
- }
+ transport = container_of(xprt, struct sock_xprt, xprt);
+
/* Any data means we had a useful conversation, so
* the we don't need to delay the next reconnect
*/
if (xprt->reestablish_timeout)
xprt->reestablish_timeout = 0;
+ queue_work(rpciod_workqueue, &transport->recv_worker);
- /* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
- rd_desc.arg.data = xprt;
- do {
- rd_desc.count = 65536;
- read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv);
- if (read > 0)
- total += read;
- } while (read > 0);
out:
- trace_xs_tcp_data_ready(xprt, read, total);
read_unlock_bh(&sk->sk_callback_lock);
}
@@ -1520,7 +1618,7 @@ static void xs_write_space(struct sock *sk)
if (unlikely(!(xprt = xprt_from_sock(sk))))
return;
- if (test_and_clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags) == 0)
+ if (test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &sock->flags) == 0)
return;
xprt_write_space(xprt);
@@ -1873,7 +1971,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt,
xs_save_old_callbacks(transport, sk);
sk->sk_user_data = xprt;
- sk->sk_data_ready = xs_local_data_ready;
+ sk->sk_data_ready = xs_data_ready;
sk->sk_write_space = xs_udp_write_space;
sk->sk_error_report = xs_error_report;
sk->sk_allocation = GFP_NOIO;
@@ -2059,7 +2157,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
xs_save_old_callbacks(transport, sk);
sk->sk_user_data = xprt;
- sk->sk_data_ready = xs_udp_data_ready;
+ sk->sk_data_ready = xs_data_ready;
sk->sk_write_space = xs_udp_write_space;
sk->sk_allocation = GFP_NOIO;
@@ -2472,7 +2570,7 @@ static int bc_send_request(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
struct svc_xprt *xprt;
- u32 len;
+ int len;
dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
/*
@@ -2580,6 +2678,12 @@ static struct rpc_xprt_ops xs_tcp_ops = {
.enable_swap = xs_enable_swap,
.disable_swap = xs_disable_swap,
.inject_disconnect = xs_inject_disconnect,
+#ifdef CONFIG_SUNRPC_BACKCHANNEL
+ .bc_setup = xprt_setup_bc,
+ .bc_up = xs_tcp_bc_up,
+ .bc_free_rqst = xprt_free_bc_rqst,
+ .bc_destroy = xprt_destroy_bc,
+#endif
};
/*
@@ -2650,6 +2754,7 @@ static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
}
new = container_of(xprt, struct sock_xprt, xprt);
+ mutex_init(&new->recv_mutex);
memcpy(&xprt->addr, args->dstaddr, args->addrlen);
xprt->addrlen = args->addrlen;
if (args->srcaddr)
@@ -2703,6 +2808,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
xprt->ops = &xs_local_ops;
xprt->timeout = &xs_local_default_timeout;
+ INIT_WORK(&transport->recv_worker, xs_local_data_receive_workfn);
INIT_DELAYED_WORK(&transport->connect_worker,
xs_dummy_setup_socket);
@@ -2774,21 +2880,20 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
xprt->timeout = &xs_udp_default_timeout;
+ INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
+ INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
+
switch (addr->sa_family) {
case AF_INET:
if (((struct sockaddr_in *)addr)->sin_port != htons(0))
xprt_set_bound(xprt);
- INIT_DELAYED_WORK(&transport->connect_worker,
- xs_udp_setup_socket);
xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP);
break;
case AF_INET6:
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
xprt_set_bound(xprt);
- INIT_DELAYED_WORK(&transport->connect_worker,
- xs_udp_setup_socket);
xs_format_peer_addresses(xprt, "udp", RPCBIND_NETID_UDP6);
break;
default:
@@ -2853,21 +2958,20 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
xprt->ops = &xs_tcp_ops;
xprt->timeout = &xs_tcp_default_timeout;
+ INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn);
+ INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
+
switch (addr->sa_family) {
case AF_INET:
if (((struct sockaddr_in *)addr)->sin_port != htons(0))
xprt_set_bound(xprt);
- INIT_DELAYED_WORK(&transport->connect_worker,
- xs_tcp_setup_socket);
xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP);
break;
case AF_INET6:
if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0))
xprt_set_bound(xprt);
- INIT_DELAYED_WORK(&transport->connect_worker,
- xs_tcp_setup_socket);
xs_format_peer_addresses(xprt, "tcp", RPCBIND_NETID_TCP6);
break;
default: