Diffstat (limited to 'net/sunrpc')
-rw-r--r--  net/sunrpc/auth_gss/auth_gss.c             |   2
-rw-r--r--  net/sunrpc/auth_gss/gss_rpc_upcall.c       |   3
-rw-r--r--  net/sunrpc/cache.c                         |  10
-rw-r--r--  net/sunrpc/clnt.c                          |   1
-rw-r--r--  net/sunrpc/rpc_pipe.c                      |  62
-rw-r--r--  net/sunrpc/svc_xprt.c                      |  45
-rw-r--r--  net/sunrpc/svcauth.c                       |   2
-rw-r--r--  net/sunrpc/svcauth_unix.c                  |   8
-rw-r--r--  net/sunrpc/xprt.c                          |   1
-rw-r--r--  net/sunrpc/xprtrdma/Makefile               |   2
-rw-r--r--  net/sunrpc/xprtrdma/backchannel.c          |  28
-rw-r--r--  net/sunrpc/xprtrdma/fmr_ops.c              |  64
-rw-r--r--  net/sunrpc/xprtrdma/frwr_ops.c             | 181
-rw-r--r--  net/sunrpc/xprtrdma/physical_ops.c         |  13
-rw-r--r--  net/sunrpc/xprtrdma/rpc_rdma.c             |  16
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma.c             |  41
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_backchannel.c | 371
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |  56
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_sendto.c      |  33
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c   | 360
-rw-r--r--  net/sunrpc/xprtrdma/transport.c            |  33
-rw-r--r--  net/sunrpc/xprtrdma/verbs.c                |  40
-rw-r--r--  net/sunrpc/xprtrdma/xprt_rdma.h            |  35
-rw-r--r--  net/sunrpc/xprtsock.c                      |  14
24 files changed, 1094 insertions(+), 327 deletions(-)
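Most of the cache.c and rpc_pipe.c hunks below are mechanical conversions from open-coded i_mutex locking to the inode_lock() helpers. At the time of this series those helpers were thin wrappers over i_mutex (paraphrased here from include/linux/fs.h), so the conversions are behavior-preserving; the wrapper later let the VFS change the underlying lock without touching callers:

static inline void inode_lock(struct inode *inode)
{
        mutex_lock(&inode->i_mutex);
}

static inline void inode_unlock(struct inode *inode)
{
        mutex_unlock(&inode->i_mutex);
}

static inline void inode_lock_nested(struct inode *inode, unsigned subclass)
{
        mutex_lock_nested(&inode->i_mutex, subclass);
}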
diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 799e65b94..cabf586f4 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -740,7 +740,7 @@ gss_pipe_downcall(struct file *filp, const char __user *src, size_t mlen) default: printk(KERN_CRIT "%s: bad return from " "gss_fill_context: %zd\n", __func__, err); - BUG(); + gss_msg->msg.errno = -EIO; } goto err_release_msg; } diff --git a/net/sunrpc/auth_gss/gss_rpc_upcall.c b/net/sunrpc/auth_gss/gss_rpc_upcall.c index 59eeed43e..f0c6a8c78 100644 --- a/net/sunrpc/auth_gss/gss_rpc_upcall.c +++ b/net/sunrpc/auth_gss/gss_rpc_upcall.c @@ -326,6 +326,9 @@ int gssp_accept_sec_context_upcall(struct net *net, if (data->found_creds && client_name.data != NULL) { char *c; + data->creds.cr_raw_principal = kstrndup(client_name.data, + client_name.len, GFP_KERNEL); + data->creds.cr_principal = kstrndup(client_name.data, client_name.len, GFP_KERNEL); if (data->creds.cr_principal) { diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index 21e203531..273bc3a35 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -771,7 +771,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count, if (count == 0) return 0; - mutex_lock(&inode->i_mutex); /* protect against multiple concurrent + inode_lock(inode); /* protect against multiple concurrent * readers on this file */ again: spin_lock(&queue_lock); @@ -784,7 +784,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count, } if (rp->q.list.next == &cd->queue) { spin_unlock(&queue_lock); - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); WARN_ON_ONCE(rp->offset); return 0; } @@ -838,7 +838,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count, } if (err == -EAGAIN) goto again; - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return err ? 
err : count; } @@ -909,9 +909,9 @@ static ssize_t cache_write(struct file *filp, const char __user *buf, if (!cd->cache_parse) goto out; - mutex_lock(&inode->i_mutex); + inode_lock(inode); ret = cache_downcall(mapping, buf, count, cd); - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); out: return ret; } diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 23608eb0d..b7f21044f 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1217,6 +1217,7 @@ static int rpc_anyaddr(int family, struct sockaddr *buf, size_t buflen) return -EINVAL; memcpy(buf, &rpc_in6addr_loopback, sizeof(rpc_in6addr_loopback)); + break; default: dprintk("RPC: %s: address family not supported\n", __func__); diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c index d81186d34..31789ef3e 100644 --- a/net/sunrpc/rpc_pipe.c +++ b/net/sunrpc/rpc_pipe.c @@ -172,7 +172,7 @@ rpc_close_pipes(struct inode *inode) int need_release; LIST_HEAD(free_list); - mutex_lock(&inode->i_mutex); + inode_lock(inode); spin_lock(&pipe->lock); need_release = pipe->nreaders != 0 || pipe->nwriters != 0; pipe->nreaders = 0; @@ -188,7 +188,7 @@ rpc_close_pipes(struct inode *inode) cancel_delayed_work_sync(&pipe->queue_timeout); rpc_inode_setowner(inode, NULL); RPC_I(inode)->pipe = NULL; - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); } static struct inode * @@ -221,7 +221,7 @@ rpc_pipe_open(struct inode *inode, struct file *filp) int first_open; int res = -ENXIO; - mutex_lock(&inode->i_mutex); + inode_lock(inode); pipe = RPC_I(inode)->pipe; if (pipe == NULL) goto out; @@ -237,7 +237,7 @@ rpc_pipe_open(struct inode *inode, struct file *filp) pipe->nwriters++; res = 0; out: - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return res; } @@ -248,7 +248,7 @@ rpc_pipe_release(struct inode *inode, struct file *filp) struct rpc_pipe_msg *msg; int last_close; - mutex_lock(&inode->i_mutex); + inode_lock(inode); pipe = RPC_I(inode)->pipe; if (pipe == NULL) goto out; @@ -278,7 +278,7 @@ rpc_pipe_release(struct inode *inode, struct file *filp) if (last_close && pipe->ops->release_pipe) pipe->ops->release_pipe(inode); out: - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return 0; } @@ -290,7 +290,7 @@ rpc_pipe_read(struct file *filp, char __user *buf, size_t len, loff_t *offset) struct rpc_pipe_msg *msg; int res = 0; - mutex_lock(&inode->i_mutex); + inode_lock(inode); pipe = RPC_I(inode)->pipe; if (pipe == NULL) { res = -EPIPE; @@ -322,7 +322,7 @@ rpc_pipe_read(struct file *filp, char __user *buf, size_t len, loff_t *offset) pipe->ops->destroy_msg(msg); } out_unlock: - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return res; } @@ -332,11 +332,11 @@ rpc_pipe_write(struct file *filp, const char __user *buf, size_t len, loff_t *of struct inode *inode = file_inode(filp); int res; - mutex_lock(&inode->i_mutex); + inode_lock(inode); res = -EPIPE; if (RPC_I(inode)->pipe != NULL) res = RPC_I(inode)->pipe->ops->downcall(filp, buf, len); - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return res; } @@ -349,12 +349,12 @@ rpc_pipe_poll(struct file *filp, struct poll_table_struct *wait) poll_wait(filp, &rpci->waitq, wait); - mutex_lock(&inode->i_mutex); + inode_lock(inode); if (rpci->pipe == NULL) mask |= POLLERR | POLLHUP; else if (filp->private_data || !list_empty(&rpci->pipe->pipe)) mask |= POLLIN | POLLRDNORM; - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return mask; } @@ -367,10 +367,10 @@ rpc_pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg) switch (cmd) { case FIONREAD: - 
mutex_lock(&inode->i_mutex); + inode_lock(inode); pipe = RPC_I(inode)->pipe; if (pipe == NULL) { - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return -EPIPE; } spin_lock(&pipe->lock); @@ -381,7 +381,7 @@ rpc_pipe_ioctl(struct file *filp, unsigned int cmd, unsigned long arg) len += msg->len - msg->copied; } spin_unlock(&pipe->lock); - mutex_unlock(&inode->i_mutex); + inode_unlock(inode); return put_user(len, (int __user *)arg); default: return -EINVAL; @@ -617,9 +617,9 @@ int rpc_rmdir(struct dentry *dentry) parent = dget_parent(dentry); dir = d_inode(parent); - mutex_lock_nested(&dir->i_mutex, I_MUTEX_PARENT); + inode_lock_nested(dir, I_MUTEX_PARENT); error = __rpc_rmdir(dir, dentry); - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); dput(parent); return error; } @@ -701,9 +701,9 @@ static void rpc_depopulate(struct dentry *parent, { struct inode *dir = d_inode(parent); - mutex_lock_nested(&dir->i_mutex, I_MUTEX_CHILD); + inode_lock_nested(dir, I_MUTEX_CHILD); __rpc_depopulate(parent, files, start, eof); - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); } static int rpc_populate(struct dentry *parent, @@ -715,7 +715,7 @@ static int rpc_populate(struct dentry *parent, struct dentry *dentry; int i, err; - mutex_lock(&dir->i_mutex); + inode_lock(dir); for (i = start; i < eof; i++) { dentry = __rpc_lookup_create_exclusive(parent, files[i].name); err = PTR_ERR(dentry); @@ -739,11 +739,11 @@ static int rpc_populate(struct dentry *parent, if (err != 0) goto out_bad; } - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); return 0; out_bad: __rpc_depopulate(parent, files, start, eof); - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); printk(KERN_WARNING "%s: %s failed to populate directory %pd\n", __FILE__, __func__, parent); return err; @@ -757,7 +757,7 @@ static struct dentry *rpc_mkdir_populate(struct dentry *parent, struct inode *dir = d_inode(parent); int error; - mutex_lock_nested(&dir->i_mutex, I_MUTEX_PARENT); + inode_lock_nested(dir, I_MUTEX_PARENT); dentry = __rpc_lookup_create_exclusive(parent, name); if (IS_ERR(dentry)) goto out; @@ -770,7 +770,7 @@ static struct dentry *rpc_mkdir_populate(struct dentry *parent, goto err_rmdir; } out: - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); return dentry; err_rmdir: __rpc_rmdir(dir, dentry); @@ -788,11 +788,11 @@ static int rpc_rmdir_depopulate(struct dentry *dentry, parent = dget_parent(dentry); dir = d_inode(parent); - mutex_lock_nested(&dir->i_mutex, I_MUTEX_PARENT); + inode_lock_nested(dir, I_MUTEX_PARENT); if (depopulate != NULL) depopulate(dentry); error = __rpc_rmdir(dir, dentry); - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); dput(parent); return error; } @@ -828,7 +828,7 @@ struct dentry *rpc_mkpipe_dentry(struct dentry *parent, const char *name, if (pipe->ops->downcall == NULL) umode &= ~S_IWUGO; - mutex_lock_nested(&dir->i_mutex, I_MUTEX_PARENT); + inode_lock_nested(dir, I_MUTEX_PARENT); dentry = __rpc_lookup_create_exclusive(parent, name); if (IS_ERR(dentry)) goto out; @@ -837,7 +837,7 @@ struct dentry *rpc_mkpipe_dentry(struct dentry *parent, const char *name, if (err) goto out_err; out: - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); return dentry; out_err: dentry = ERR_PTR(err); @@ -865,9 +865,9 @@ rpc_unlink(struct dentry *dentry) parent = dget_parent(dentry); dir = d_inode(parent); - mutex_lock_nested(&dir->i_mutex, I_MUTEX_PARENT); + inode_lock_nested(dir, I_MUTEX_PARENT); error = __rpc_rmpipe(dir, dentry); - mutex_unlock(&dir->i_mutex); + inode_unlock(dir); dput(parent); return error; } @@ -1500,7 
+1500,7 @@ int register_rpc_pipefs(void) rpc_inode_cachep = kmem_cache_create("rpc_inode_cache", sizeof(struct rpc_inode), 0, (SLAB_HWCACHE_ALIGN|SLAB_RECLAIM_ACCOUNT| - SLAB_MEM_SPREAD), + SLAB_MEM_SPREAD|SLAB_ACCOUNT), init_once); if (!rpc_inode_cachep) return -ENOMEM; diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index a6cbb2104..7422f2881 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -10,11 +10,13 @@ #include <linux/kthread.h> #include <linux/slab.h> #include <net/sock.h> +#include <linux/sunrpc/addr.h> #include <linux/sunrpc/stats.h> #include <linux/sunrpc/svc_xprt.h> #include <linux/sunrpc/svcsock.h> #include <linux/sunrpc/xprt.h> #include <linux/module.h> +#include <linux/netdevice.h> #include <trace/events/sunrpc.h> #define RPCDBG_FACILITY RPCDBG_SVCXPRT @@ -938,6 +940,49 @@ static void svc_age_temp_xprts(unsigned long closure) mod_timer(&serv->sv_temptimer, jiffies + svc_conn_age_period * HZ); } +/* Close temporary transports whose xpt_local matches server_addr immediately + * instead of waiting for them to be picked up by the timer. + * + * This is meant to be called from a notifier_block that runs when an ip + * address is deleted. + */ +void svc_age_temp_xprts_now(struct svc_serv *serv, struct sockaddr *server_addr) +{ + struct svc_xprt *xprt; + struct svc_sock *svsk; + struct socket *sock; + struct list_head *le, *next; + LIST_HEAD(to_be_closed); + struct linger no_linger = { + .l_onoff = 1, + .l_linger = 0, + }; + + spin_lock_bh(&serv->sv_lock); + list_for_each_safe(le, next, &serv->sv_tempsocks) { + xprt = list_entry(le, struct svc_xprt, xpt_list); + if (rpc_cmp_addr(server_addr, (struct sockaddr *) + &xprt->xpt_local)) { + dprintk("svc_age_temp_xprts_now: found %p\n", xprt); + list_move(le, &to_be_closed); + } + } + spin_unlock_bh(&serv->sv_lock); + + while (!list_empty(&to_be_closed)) { + le = to_be_closed.next; + list_del_init(le); + xprt = list_entry(le, struct svc_xprt, xpt_list); + dprintk("svc_age_temp_xprts_now: closing %p\n", xprt); + svsk = container_of(xprt, struct svc_sock, sk_xprt); + sock = svsk->sk_sock; + kernel_setsockopt(sock, SOL_SOCKET, SO_LINGER, + (char *)&no_linger, sizeof(no_linger)); + svc_close_xprt(xprt); + } +} +EXPORT_SYMBOL_GPL(svc_age_temp_xprts_now); + static void call_xpt_users(struct svc_xprt *xprt) { struct svc_xpt_user *u; diff --git a/net/sunrpc/svcauth.c b/net/sunrpc/svcauth.c index 79c0f3459..69841db1f 100644 --- a/net/sunrpc/svcauth.c +++ b/net/sunrpc/svcauth.c @@ -55,6 +55,7 @@ svc_authenticate(struct svc_rqst *rqstp, __be32 *authp) spin_unlock(&authtab_lock); rqstp->rq_auth_slack = 0; + init_svc_cred(&rqstp->rq_cred); rqstp->rq_authop = aops; return aops->accept(rqstp, authp); @@ -63,6 +64,7 @@ EXPORT_SYMBOL_GPL(svc_authenticate); int svc_set_client(struct svc_rqst *rqstp) { + rqstp->rq_client = NULL; return rqstp->rq_authop->set_client(rqstp); } EXPORT_SYMBOL_GPL(svc_set_client); diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c index 621ca7b4a..dfacdc95b 100644 --- a/net/sunrpc/svcauth_unix.c +++ b/net/sunrpc/svcauth_unix.c @@ -728,10 +728,6 @@ svcauth_null_accept(struct svc_rqst *rqstp, __be32 *authp) struct kvec *resv = &rqstp->rq_res.head[0]; struct svc_cred *cred = &rqstp->rq_cred; - cred->cr_group_info = NULL; - cred->cr_principal = NULL; - rqstp->rq_client = NULL; - if (argv->iov_len < 3*4) return SVC_GARBAGE; @@ -794,10 +790,6 @@ svcauth_unix_accept(struct svc_rqst *rqstp, __be32 *authp) u32 slen, i; int len = argv->iov_len; - cred->cr_group_info = NULL; - cred->cr_principal 
= NULL; - rqstp->rq_client = NULL; - if ((len -= 3*4) < 0) return SVC_GARBAGE; diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 2e98f4a24..37edea6fa 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1425,3 +1425,4 @@ void xprt_put(struct rpc_xprt *xprt) if (atomic_dec_and_test(&xprt->count)) xprt_destroy(xprt); } +EXPORT_SYMBOL_GPL(xprt_put); diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index 33f99d300..dc9f3b513 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -2,7 +2,7 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o rpcrdma-y := transport.o rpc_rdma.o verbs.o \ fmr_ops.o frwr_ops.o physical_ops.o \ - svc_rdma.o svc_rdma_transport.o \ + svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ module.o rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 2dcb44f69..2dcd7640e 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -15,7 +15,7 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -#define RPCRDMA_BACKCHANNEL_DEBUG +#undef RPCRDMA_BACKCHANNEL_DEBUG static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) @@ -42,8 +42,8 @@ static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt, size_t size; req = rpcrdma_create_req(r_xprt); - if (!req) - return -ENOMEM; + if (IS_ERR(req)) + return PTR_ERR(req); req->rl_backchannel = true; size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); @@ -84,9 +84,7 @@ out_fail: static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, unsigned int count) { - struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; struct rpcrdma_rep *rep; - unsigned long flags; int rc = 0; while (count--) { @@ -98,9 +96,7 @@ static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, break; } - spin_lock_irqsave(&buffers->rb_lock, flags); - list_add(&rep->rr_list, &buffers->rb_recv_bufs); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + rpcrdma_recv_buffer_put(rep); } return rc; @@ -140,6 +136,7 @@ int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs) __func__); goto out_free; } + dprintk("RPC: %s: new rqst %p\n", __func__, rqst); rqst->rq_xprt = &r_xprt->rx_xprt; INIT_LIST_HEAD(&rqst->rq_list); @@ -220,12 +217,14 @@ int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) rpclen = rqst->rq_svec[0].iov_len; +#ifdef RPCRDMA_BACKCHANNEL_DEBUG pr_info("RPC: %s: rpclen %zd headerp 0x%p lkey 0x%x\n", __func__, rpclen, headerp, rdmab_lkey(req->rl_rdmabuf)); pr_info("RPC: %s: RPC/RDMA: %*ph\n", __func__, (int)RPCRDMA_HDRLEN_MIN, headerp); pr_info("RPC: %s: RPC: %*ph\n", __func__, (int)rpclen, rqst->rq_svec[0].iov_base); +#endif req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = RPCRDMA_HDRLEN_MIN; @@ -269,6 +268,9 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) { struct rpc_xprt *xprt = rqst->rq_xprt; + dprintk("RPC: %s: freeing rqst %p (req %p)\n", + __func__, rqst, rpcr_to_rdmar(rqst)); + smp_mb__before_atomic(); WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)); clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); @@ -333,14 +335,14 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, struct rpc_rqst, rq_bc_pa_list); list_del(&rqst->rq_bc_pa_list); spin_unlock(&xprt->bc_pa_lock); -#ifdef RPCRDMA_BACKCHANNEL_DEBUG - pr_info("RPC: %s: using rqst %p\n", __func__, rqst); -#endif + dprintk("RPC: %s: using rqst %p\n", __func__, rqst); 
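The svc_age_temp_xprts_now() helper added to svc_xprt.c above is intended to be driven from an address-removal notifier. A minimal sketch of such a hookup for IPv4, where my_serv is a hypothetical pointer to the server's svc_serv (the registration call is shown as a comment):

#include <linux/inetdevice.h>
#include <linux/sunrpc/svc_xprt.h>

static struct svc_serv *my_serv;        /* hypothetical */

static int my_inetaddr_event(struct notifier_block *this,
                             unsigned long event, void *ptr)
{
        struct in_ifaddr *ifa = ptr;
        struct sockaddr_in sin = { };

        if (event != NETDEV_DOWN || !my_serv)
                return NOTIFY_DONE;

        /* Close temp transports bound to the address going away */
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = ifa->ifa_local;
        svc_age_temp_xprts_now(my_serv, (struct sockaddr *)&sin);
        return NOTIFY_DONE;
}

static struct notifier_block my_inetaddr_notifier = {
        .notifier_call = my_inetaddr_event,
};
/* register_inetaddr_notifier(&my_inetaddr_notifier); */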
/* Prepare rqst */ rqst->rq_reply_bytes_recvd = 0; rqst->rq_bytes_sent = 0; rqst->rq_xid = headerp->rm_xid; + + rqst->rq_private_buf.len = size; set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); buf = &rqst->rq_rcv_buf; @@ -355,10 +357,8 @@ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, * direction reply. */ req = rpcr_to_rdmar(rqst); -#ifdef RPCRDMA_BACKCHANNEL_DEBUG - pr_info("RPC: %s: attaching rep %p to req %p\n", + dprintk("RPC: %s: attaching rep %p to req %p\n", __func__, rep, req); -#endif req->rl_reply = rep; /* Defeat the retransmit detection logic in send_request */ diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index f1e8dafbd..c14f3a4bf 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -179,6 +179,69 @@ out_maperr: return rc; } +static void +__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) +{ + struct ib_device *device = r_xprt->rx_ia.ri_device; + struct rpcrdma_mw *mw = seg->rl_mw; + int nsegs = seg->mr_nsegs; + + seg->rl_mw = NULL; + + while (nsegs--) + rpcrdma_unmap_one(device, seg++); + + rpcrdma_put_mw(r_xprt, mw); +} + +/* Invalidate all memory regions that were registered for "req". + * + * Sleeps until it is safe for the host CPU to access the + * previously mapped memory regions. + */ +static void +fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct rpcrdma_mr_seg *seg; + unsigned int i, nchunks; + struct rpcrdma_mw *mw; + LIST_HEAD(unmap_list); + int rc; + + dprintk("RPC: %s: req %p\n", __func__, req); + + /* ORDER: Invalidate all of the req's MRs first + * + * ib_unmap_fmr() is slow, so use a single call instead + * of one call per mapped MR. + */ + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; + + list_add(&mw->r.fmr.fmr->list, &unmap_list); + + i += seg->mr_nsegs; + } + rc = ib_unmap_fmr(&unmap_list); + if (rc) + pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc); + + /* ORDER: Now DMA unmap all of the req's MRs, and return + * them to the free MW list. + */ + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + + __fmr_dma_unmap(r_xprt, seg); + + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + } + + req->rl_nchunks = 0; +} + /* Use the ib_unmap_fmr() verb to prevent further remote * access via RDMA READ or RDMA WRITE. 
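fmr_op_unmap_sync() above is the first of three implementations behind the new ro_unmap_sync method; frwr and physical versions follow. The reply path invokes it through the per-transport ops table without caring which registration mode is active, as the rpc_rdma.c hunk later in this patch shows:

        /* Fence the memory regions before the application reads them */
        if (req->rl_nchunks)
                r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);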
*/ @@ -231,6 +294,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, + .ro_unmap_sync = fmr_op_unmap_sync, .ro_unmap = fmr_op_unmap, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index 88cf9e726..e16567389 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -190,12 +190,11 @@ static int frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { - struct ib_device_attr *devattr = &ia->ri_devattr; int depth, delta; ia->ri_max_frmr_depth = min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - devattr->max_fast_reg_page_list_len); + ia->ri_device->attrs.max_fast_reg_page_list_len); dprintk("RPC: %s: device's max FR page list len = %u\n", __func__, ia->ri_max_frmr_depth); @@ -222,8 +221,8 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, } ep->rep_attr.cap.max_send_wr *= depth; - if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) { - cdata->max_requests = devattr->max_qp_wr / depth; + if (ep->rep_attr.cap.max_send_wr > ia->ri_device->attrs.max_qp_wr) { + cdata->max_requests = ia->ri_device->attrs.max_qp_wr / depth; if (!cdata->max_requests) return -EINVAL; ep->rep_attr.cap.max_send_wr = cdata->max_requests * @@ -245,12 +244,14 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); } -/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs to be reset. */ +/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs + * to be reset. + * + * WARNING: Only wr_id and status are reliable at this point + */ static void -frwr_sendcompletion(struct ib_wc *wc) +__frwr_sendcompletion_flush(struct ib_wc *wc, struct rpcrdma_mw *r) { - struct rpcrdma_mw *r; - if (likely(wc->status == IB_WC_SUCCESS)) return; @@ -261,9 +262,23 @@ frwr_sendcompletion(struct ib_wc *wc) else pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", __func__, r, ib_wc_status_msg(wc->status), wc->status); + r->r.frmr.fr_state = FRMR_IS_STALE; } +static void +frwr_sendcompletion(struct ib_wc *wc) +{ + struct rpcrdma_mw *r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; + struct rpcrdma_frmr *f = &r->r.frmr; + + if (unlikely(wc->status != IB_WC_SUCCESS)) + __frwr_sendcompletion_flush(wc, r); + + if (f->fr_waiter) + complete(&f->fr_linv_done); +} + static int frwr_op_init(struct rpcrdma_xprt *r_xprt) { @@ -319,7 +334,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, struct rpcrdma_mw *mw; struct rpcrdma_frmr *frmr; struct ib_mr *mr; - struct ib_reg_wr reg_wr; + struct ib_reg_wr *reg_wr; struct ib_send_wr *bad_wr; int rc, i, n, dma_nents; u8 key; @@ -335,7 +350,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, } while (mw->r.frmr.fr_state != FRMR_IS_INVALID); frmr = &mw->r.frmr; frmr->fr_state = FRMR_IS_VALID; + frmr->fr_waiter = false; mr = frmr->fr_mr; + reg_wr = &frmr->fr_regwr; if (nsegs > ia->ri_max_frmr_depth) nsegs = ia->ri_max_frmr_depth; @@ -381,19 +398,19 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); - reg_wr.wr.next = NULL; - reg_wr.wr.opcode = IB_WR_REG_MR; - reg_wr.wr.wr_id = (uintptr_t)mw; - reg_wr.wr.num_sge = 0; - reg_wr.wr.send_flags = 0; - reg_wr.mr = mr; - reg_wr.key = mr->rkey; - reg_wr.access = writing ? 
- IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : - IB_ACCESS_REMOTE_READ; + reg_wr->wr.next = NULL; + reg_wr->wr.opcode = IB_WR_REG_MR; + reg_wr->wr.wr_id = (uintptr_t)mw; + reg_wr->wr.num_sge = 0; + reg_wr->wr.send_flags = 0; + reg_wr->mr = mr; + reg_wr->key = mr->rkey; + reg_wr->access = writing ? + IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : + IB_ACCESS_REMOTE_READ; DECR_CQCOUNT(&r_xprt->rx_ep); - rc = ib_post_send(ia->ri_id->qp, ®_wr.wr, &bad_wr); + rc = ib_post_send(ia->ri_id->qp, ®_wr->wr, &bad_wr); if (rc) goto out_senderr; @@ -413,6 +430,116 @@ out_senderr: return rc; } +static struct ib_send_wr * +__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) +{ + struct rpcrdma_mw *mw = seg->rl_mw; + struct rpcrdma_frmr *f = &mw->r.frmr; + struct ib_send_wr *invalidate_wr; + + f->fr_waiter = false; + f->fr_state = FRMR_IS_INVALID; + invalidate_wr = &f->fr_invwr; + + memset(invalidate_wr, 0, sizeof(*invalidate_wr)); + invalidate_wr->wr_id = (unsigned long)(void *)mw; + invalidate_wr->opcode = IB_WR_LOCAL_INV; + invalidate_wr->ex.invalidate_rkey = f->fr_mr->rkey; + + return invalidate_wr; +} + +static void +__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, + int rc) +{ + struct ib_device *device = r_xprt->rx_ia.ri_device; + struct rpcrdma_mw *mw = seg->rl_mw; + struct rpcrdma_frmr *f = &mw->r.frmr; + + seg->rl_mw = NULL; + + ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir); + + if (!rc) + rpcrdma_put_mw(r_xprt, mw); + else + __frwr_queue_recovery(mw); +} + +/* Invalidate all memory regions that were registered for "req". + * + * Sleeps until it is safe for the host CPU to access the + * previously mapped memory regions. + */ +static void +frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_mr_seg *seg; + unsigned int i, nchunks; + struct rpcrdma_frmr *f; + int rc; + + dprintk("RPC: %s: req %p\n", __func__, req); + + /* ORDER: Invalidate all of the req's MRs first + * + * Chain the LOCAL_INV Work Requests and post them with + * a single ib_post_send() call. + */ + invalidate_wrs = pos = prev = NULL; + seg = NULL; + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + + pos = __frwr_prepare_linv_wr(seg); + + if (!invalidate_wrs) + invalidate_wrs = pos; + else + prev->next = pos; + prev = pos; + + i += seg->mr_nsegs; + } + f = &seg->rl_mw->r.frmr; + + /* Strong send queue ordering guarantees that when the + * last WR in the chain completes, all WRs in the chain + * are complete. + */ + f->fr_invwr.send_flags = IB_SEND_SIGNALED; + f->fr_waiter = true; + init_completion(&f->fr_linv_done); + INIT_CQCOUNT(&r_xprt->rx_ep); + + /* Transport disconnect drains the receive CQ before it + * replaces the QP. The RPC reply handler won't call us + * unless ri_id->qp is a valid pointer. + */ + rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); + if (rc) + pr_warn("%s: ib_post_send failed %i\n", __func__, rc); + + wait_for_completion(&f->fr_linv_done); + + /* ORDER: Now DMA unmap all of the req's MRs, and return + * them to the free MW list. + */ + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + + __frwr_dma_unmap(r_xprt, seg, rc); + + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + } + + req->rl_nchunks = 0; +} + /* Post a LOCAL_INV Work Request to prevent further remote access * via RDMA READ or RDMA WRITE. 
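The heart of frwr_op_unmap_sync() above is the chained-WR trick: only the final LOCAL_INV carries IB_SEND_SIGNALED, and strong send-queue ordering guarantees that when it completes, every earlier WR in the chain has completed too. Condensed, with f being the frmr that owns the last WR in the already-built invalidate_wrs chain:

        struct ib_send_wr *bad_wr;

        f->fr_invwr.send_flags = IB_SEND_SIGNALED;  /* signal only the tail */
        f->fr_waiter = true;
        init_completion(&f->fr_linv_done);

        if (ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr))
                pr_warn("sketch: posting LOCAL_INV chain failed\n");

        /* frwr_sendcompletion() fires complete() when the tail finishes */
        wait_for_completion(&f->fr_linv_done);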
*/ @@ -423,23 +550,24 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mw *mw = seg1->rl_mw; struct rpcrdma_frmr *frmr = &mw->r.frmr; - struct ib_send_wr invalidate_wr, *bad_wr; + struct ib_send_wr *invalidate_wr, *bad_wr; int rc, nsegs = seg->mr_nsegs; dprintk("RPC: %s: FRMR %p\n", __func__, mw); seg1->rl_mw = NULL; frmr->fr_state = FRMR_IS_INVALID; + invalidate_wr = &mw->r.frmr.fr_invwr; - memset(&invalidate_wr, 0, sizeof(invalidate_wr)); - invalidate_wr.wr_id = (unsigned long)(void *)mw; - invalidate_wr.opcode = IB_WR_LOCAL_INV; - invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey; + memset(invalidate_wr, 0, sizeof(*invalidate_wr)); + invalidate_wr->wr_id = (uintptr_t)mw; + invalidate_wr->opcode = IB_WR_LOCAL_INV; + invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); read_lock(&ia->ri_qplock); - rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr); + rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr); read_unlock(&ia->ri_qplock); if (rc) goto out_err; @@ -471,6 +599,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, + .ro_unmap_sync = frwr_op_unmap_sync, .ro_unmap = frwr_op_unmap, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index 617b76f22..dbb302ecf 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -83,6 +83,18 @@ physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) return 1; } +/* DMA unmap all memory regions that were mapped for "req". + */ +static void +physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) +{ + struct ib_device *device = r_xprt->rx_ia.ri_device; + unsigned int i; + + for (i = 0; req->rl_nchunks; --req->rl_nchunks) + rpcrdma_unmap_one(device, &req->rl_segments[i++]); +} + static void physical_op_destroy(struct rpcrdma_buffer *buf) { @@ -90,6 +102,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { .ro_map = physical_op_map, + .ro_unmap_sync = physical_op_unmap_sync, .ro_unmap = physical_op_unmap, .ro_open = physical_op_open, .ro_maxpages = physical_op_maxpages, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index c10d96994..0f28f2d74 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -804,6 +804,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (req->rl_reply) goto out_duplicate; + /* Sanity checking has passed. We are now committed + * to complete this transaction. + */ + list_del_init(&rqst->rq_list); + spin_unlock_bh(&xprt->transport_lock); dprintk("RPC: %s: reply 0x%p completes request 0x%p\n" " RPC request 0x%p xid 0x%08x\n", __func__, rep, req, rqst, @@ -888,12 +893,23 @@ badheader: break; } + /* Invalidate and flush the data payloads before waking the + * waiting application. This guarantees the memory region is + * properly fenced from the server before the application + * accesses the data. It also ensures proper send flow + * control: waking the next RPC waits until this RPC has + * relinquished all its Send Queue entries. 
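The credit update that follows (and is mirrored in the new server-side backchannel code) is a small fixed calculation: a grant of zero is clamped up to one so the transport cannot stall, an over-large grant is clamped to rb_max_requests, and the result is scaled into the RPC congestion window:

        u32 credits = be32_to_cpu(headerp->rm_credit);

        if (credits == 0)
                credits = 1;            /* never deadlock the transport */
        else if (credits > r_xprt->rx_buf.rb_max_requests)
                credits = r_xprt->rx_buf.rb_max_requests;

        /* e.g. credits = 16 gives cwnd = 16 << RPC_CWNDSHIFT */
        xprt->cwnd = credits << RPC_CWNDSHIFT;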
+ */ + if (req->rl_nchunks) + r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); + credits = be32_to_cpu(headerp->rm_credit); if (credits == 0) credits = 1; /* don't deadlock */ else if (credits > r_xprt->rx_buf.rb_max_requests) credits = r_xprt->rx_buf.rb_max_requests; + spin_lock_bh(&xprt->transport_lock); cwnd = xprt->cwnd; xprt->cwnd = credits << RPC_CWNDSHIFT; if (xprt->cwnd > cwnd) diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c index 1b7051bdb..c846ca9f1 100644 --- a/net/sunrpc/xprtrdma/svc_rdma.c +++ b/net/sunrpc/xprtrdma/svc_rdma.c @@ -55,6 +55,7 @@ unsigned int svcrdma_ord = RPCRDMA_ORD; static unsigned int min_ord = 1; static unsigned int max_ord = 4096; unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS; +unsigned int svcrdma_max_bc_requests = RPCRDMA_MAX_BC_REQUESTS; static unsigned int min_max_requests = 4; static unsigned int max_max_requests = 16384; unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE; @@ -71,10 +72,6 @@ atomic_t rdma_stat_rq_prod; atomic_t rdma_stat_sq_poll; atomic_t rdma_stat_sq_prod; -/* Temporary NFS request map and context caches */ -struct kmem_cache *svc_rdma_map_cachep; -struct kmem_cache *svc_rdma_ctxt_cachep; - struct workqueue_struct *svc_rdma_wq; /* @@ -243,17 +240,16 @@ void svc_rdma_cleanup(void) svc_unreg_xprt_class(&svc_rdma_bc_class); #endif svc_unreg_xprt_class(&svc_rdma_class); - kmem_cache_destroy(svc_rdma_map_cachep); - kmem_cache_destroy(svc_rdma_ctxt_cachep); } int svc_rdma_init(void) { dprintk("SVCRDMA Module Init, register RPC RDMA transport\n"); dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord); - dprintk("\tmax_requests : %d\n", svcrdma_max_requests); - dprintk("\tsq_depth : %d\n", + dprintk("\tmax_requests : %u\n", svcrdma_max_requests); + dprintk("\tsq_depth : %u\n", svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT); + dprintk("\tmax_bc_requests : %u\n", svcrdma_max_bc_requests); dprintk("\tmax_inline : %d\n", svcrdma_max_req_size); svc_rdma_wq = alloc_workqueue("svc_rdma", 0, 0); @@ -264,39 +260,10 @@ int svc_rdma_init(void) svcrdma_table_header = register_sysctl_table(svcrdma_root_table); - /* Create the temporary map cache */ - svc_rdma_map_cachep = kmem_cache_create("svc_rdma_map_cache", - sizeof(struct svc_rdma_req_map), - 0, - SLAB_HWCACHE_ALIGN, - NULL); - if (!svc_rdma_map_cachep) { - printk(KERN_INFO "Could not allocate map cache.\n"); - goto err0; - } - - /* Create the temporary context cache */ - svc_rdma_ctxt_cachep = - kmem_cache_create("svc_rdma_ctxt_cache", - sizeof(struct svc_rdma_op_ctxt), - 0, - SLAB_HWCACHE_ALIGN, - NULL); - if (!svc_rdma_ctxt_cachep) { - printk(KERN_INFO "Could not allocate WR ctxt cache.\n"); - goto err1; - } - /* Register RDMA with the SVC transport switch */ svc_reg_xprt_class(&svc_rdma_class); #if defined(CONFIG_SUNRPC_BACKCHANNEL) svc_reg_xprt_class(&svc_rdma_bc_class); #endif return 0; - err1: - kmem_cache_destroy(svc_rdma_map_cachep); - err0: - unregister_sysctl_table(svcrdma_table_header); - destroy_workqueue(svc_rdma_wq); - return -ENOMEM; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c new file mode 100644 index 000000000..65a7c232a --- /dev/null +++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c @@ -0,0 +1,371 @@ +/* + * Copyright (c) 2015 Oracle. All rights reserved. + * + * Support for backward direction RPCs on RPC/RDMA (server-side). 
+ */ + +#include <linux/sunrpc/svc_rdma.h> +#include "xprt_rdma.h" + +#define RPCDBG_FACILITY RPCDBG_SVCXPRT + +#undef SVCRDMA_BACKCHANNEL_DEBUG + +int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, struct rpcrdma_msg *rmsgp, + struct xdr_buf *rcvbuf) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct kvec *dst, *src = &rcvbuf->head[0]; + struct rpc_rqst *req; + unsigned long cwnd; + u32 credits; + size_t len; + __be32 xid; + __be32 *p; + int ret; + + p = (__be32 *)src->iov_base; + len = src->iov_len; + xid = rmsgp->rm_xid; + +#ifdef SVCRDMA_BACKCHANNEL_DEBUG + pr_info("%s: xid=%08x, length=%zu\n", + __func__, be32_to_cpu(xid), len); + pr_info("%s: RPC/RDMA: %*ph\n", + __func__, (int)RPCRDMA_HDRLEN_MIN, rmsgp); + pr_info("%s: RPC: %*ph\n", + __func__, (int)len, p); +#endif + + ret = -EAGAIN; + if (src->iov_len < 24) + goto out_shortreply; + + spin_lock_bh(&xprt->transport_lock); + req = xprt_lookup_rqst(xprt, xid); + if (!req) + goto out_notfound; + + dst = &req->rq_private_buf.head[0]; + memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf)); + if (dst->iov_len < len) + goto out_unlock; + memcpy(dst->iov_base, p, len); + + credits = be32_to_cpu(rmsgp->rm_credit); + if (credits == 0) + credits = 1; /* don't deadlock */ + else if (credits > r_xprt->rx_buf.rb_bc_max_requests) + credits = r_xprt->rx_buf.rb_bc_max_requests; + + cwnd = xprt->cwnd; + xprt->cwnd = credits << RPC_CWNDSHIFT; + if (xprt->cwnd > cwnd) + xprt_release_rqst_cong(req->rq_task); + + ret = 0; + xprt_complete_rqst(req->rq_task, rcvbuf->len); + rcvbuf->len = 0; + +out_unlock: + spin_unlock_bh(&xprt->transport_lock); +out: + return ret; + +out_shortreply: + dprintk("svcrdma: short bc reply: xprt=%p, len=%zu\n", + xprt, src->iov_len); + goto out; + +out_notfound: + dprintk("svcrdma: unrecognized bc reply: xprt=%p, xid=%08x\n", + xprt, be32_to_cpu(xid)); + + goto out_unlock; +} + +/* Send a backwards direction RPC call. + * + * Caller holds the connection's mutex and has already marshaled + * the RPC/RDMA request. + * + * This is similar to svc_rdma_reply, but takes an rpc_rqst + * instead, does not support chunks, and avoids blocking memory + * allocation. + * + * XXX: There is still an opportunity to block in svc_rdma_send() + * if there are no SQ entries to post the Send. This may occur if + * the adapter has a small maximum SQ depth. + */ +static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, + struct rpc_rqst *rqst) +{ + struct xdr_buf *sndbuf = &rqst->rq_snd_buf; + struct svc_rdma_op_ctxt *ctxt; + struct svc_rdma_req_map *vec; + struct ib_send_wr send_wr; + int ret; + + vec = svc_rdma_get_req_map(rdma); + ret = svc_rdma_map_xdr(rdma, sndbuf, vec); + if (ret) + goto out_err; + + /* Post a recv buffer to handle the reply for this request. 
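Ordering matters in the send path that follows: the receive buffer is posted before the Send, so a reply can never arrive with nowhere to land. Schematically (error handling trimmed from svc_rdma_bc_sendto()):

        ret = svc_rdma_post_recv(rdma, GFP_NOIO);   /* 1: reply buffer */
        if (ret) {
                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
                return -ENOTCONN;
        }
        /* 2: map the marshaled call and post the Send */
        ret = svc_rdma_send(rdma, &send_wr);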
*/ + ret = svc_rdma_post_recv(rdma, GFP_NOIO); + if (ret) { + pr_err("svcrdma: Failed to post bc receive buffer, err=%d.\n", + ret); + pr_err("svcrdma: closing transport %p.\n", rdma); + set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); + ret = -ENOTCONN; + goto out_err; + } + + ctxt = svc_rdma_get_context(rdma); + ctxt->pages[0] = virt_to_page(rqst->rq_buffer); + ctxt->count = 1; + + ctxt->wr_op = IB_WR_SEND; + ctxt->direction = DMA_TO_DEVICE; + ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; + ctxt->sge[0].length = sndbuf->len; + ctxt->sge[0].addr = + ib_dma_map_page(rdma->sc_cm_id->device, ctxt->pages[0], 0, + sndbuf->len, DMA_TO_DEVICE); + if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr)) { + ret = -EIO; + goto out_unmap; + } + atomic_inc(&rdma->sc_dma_used); + + memset(&send_wr, 0, sizeof(send_wr)); + send_wr.wr_id = (unsigned long)ctxt; + send_wr.sg_list = ctxt->sge; + send_wr.num_sge = 1; + send_wr.opcode = IB_WR_SEND; + send_wr.send_flags = IB_SEND_SIGNALED; + + ret = svc_rdma_send(rdma, &send_wr); + if (ret) { + ret = -EIO; + goto out_unmap; + } + +out_err: + svc_rdma_put_req_map(rdma, vec); + dprintk("svcrdma: %s returns %d\n", __func__, ret); + return ret; + +out_unmap: + svc_rdma_unmap_dma(ctxt); + svc_rdma_put_context(ctxt, 1); + goto out_err; +} + +/* Server-side transport endpoint wants a whole page for its send + * buffer. The client RPC code constructs the RPC header in this + * buffer before it invokes ->send_request. + * + * Returns NULL if there was a temporary allocation failure. + */ +static void * +xprt_rdma_bc_allocate(struct rpc_task *task, size_t size) +{ + struct rpc_rqst *rqst = task->tk_rqstp; + struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; + struct svcxprt_rdma *rdma; + struct page *page; + + rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt); + + /* Prevent an infinite loop: try to make this case work */ + if (size > PAGE_SIZE) + WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n", + size); + + page = alloc_page(RPCRDMA_DEF_GFP); + if (!page) + return NULL; + + return page_address(page); +} + +static void +xprt_rdma_bc_free(void *buffer) +{ + /* No-op: ctxt and page have already been freed. */ +} + +static int +rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst) +{ + struct rpc_xprt *xprt = rqst->rq_xprt; + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer; + int rc; + + /* Space in the send buffer for an RPC/RDMA header is reserved + * via xprt->tsh_size. + */ + headerp->rm_xid = rqst->rq_xid; + headerp->rm_vers = rpcrdma_version; + headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests); + headerp->rm_type = rdma_msg; + headerp->rm_body.rm_chunks[0] = xdr_zero; + headerp->rm_body.rm_chunks[1] = xdr_zero; + headerp->rm_body.rm_chunks[2] = xdr_zero; + +#ifdef SVCRDMA_BACKCHANNEL_DEBUG + pr_info("%s: %*ph\n", __func__, 64, rqst->rq_buffer); +#endif + + rc = svc_rdma_bc_sendto(rdma, rqst); + if (rc) + goto drop_connection; + return rc; + +drop_connection: + dprintk("svcrdma: failed to send bc call\n"); + xprt_disconnect_done(xprt); + return -ENOTCONN; +} + +/* Send an RPC call on the passive end of a transport + * connection. 
+ */ +static int +xprt_rdma_bc_send_request(struct rpc_task *task) +{ + struct rpc_rqst *rqst = task->tk_rqstp; + struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt; + struct svcxprt_rdma *rdma; + int ret; + + dprintk("svcrdma: sending bc call with xid: %08x\n", + be32_to_cpu(rqst->rq_xid)); + + if (!mutex_trylock(&sxprt->xpt_mutex)) { + rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL); + if (!mutex_trylock(&sxprt->xpt_mutex)) + return -EAGAIN; + rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task); + } + + ret = -ENOTCONN; + rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt); + if (!test_bit(XPT_DEAD, &sxprt->xpt_flags)) + ret = rpcrdma_bc_send_request(rdma, rqst); + + mutex_unlock(&sxprt->xpt_mutex); + + if (ret < 0) + return ret; + return 0; +} + +static void +xprt_rdma_bc_close(struct rpc_xprt *xprt) +{ + dprintk("svcrdma: %s: xprt %p\n", __func__, xprt); +} + +static void +xprt_rdma_bc_put(struct rpc_xprt *xprt) +{ + dprintk("svcrdma: %s: xprt %p\n", __func__, xprt); + + xprt_free(xprt); + module_put(THIS_MODULE); +} + +static struct rpc_xprt_ops xprt_rdma_bc_procs = { + .reserve_xprt = xprt_reserve_xprt_cong, + .release_xprt = xprt_release_xprt_cong, + .alloc_slot = xprt_alloc_slot, + .release_request = xprt_release_rqst_cong, + .buf_alloc = xprt_rdma_bc_allocate, + .buf_free = xprt_rdma_bc_free, + .send_request = xprt_rdma_bc_send_request, + .set_retrans_timeout = xprt_set_retrans_timeout_def, + .close = xprt_rdma_bc_close, + .destroy = xprt_rdma_bc_put, + .print_stats = xprt_rdma_print_stats +}; + +static const struct rpc_timeout xprt_rdma_bc_timeout = { + .to_initval = 60 * HZ, + .to_maxval = 60 * HZ, +}; + +/* It shouldn't matter if the number of backchannel session slots + * doesn't match the number of RPC/RDMA credits. That just means + * one or the other will have extra slots that aren't used. 
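The locking dance in xprt_rdma_bc_send_request() above deserves a note: an rpc_task should not block on xpt_mutex, so it parks itself on the xpt_bc_pending wait queue and retries; the second trylock closes the race with a concurrent unlock. The same lines, annotated:

        if (!mutex_trylock(&sxprt->xpt_mutex)) {
                /* Busy: queue this task to be woken at unlock time */
                rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
                if (!mutex_trylock(&sxprt->xpt_mutex))
                        return -EAGAIN; /* stay queued; task is retried */
                /* Won the lock after queueing: take ourselves back off */
                rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
        }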
+ */ +static struct rpc_xprt * +xprt_setup_rdma_bc(struct xprt_create *args) +{ + struct rpc_xprt *xprt; + struct rpcrdma_xprt *new_xprt; + + if (args->addrlen > sizeof(xprt->addr)) { + dprintk("RPC: %s: address too large\n", __func__); + return ERR_PTR(-EBADF); + } + + xprt = xprt_alloc(args->net, sizeof(*new_xprt), + RPCRDMA_MAX_BC_REQUESTS, + RPCRDMA_MAX_BC_REQUESTS); + if (!xprt) { + dprintk("RPC: %s: couldn't allocate rpc_xprt\n", + __func__); + return ERR_PTR(-ENOMEM); + } + + xprt->timeout = &xprt_rdma_bc_timeout; + xprt_set_bound(xprt); + xprt_set_connected(xprt); + xprt->bind_timeout = RPCRDMA_BIND_TO; + xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO; + xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO; + + xprt->prot = XPRT_TRANSPORT_BC_RDMA; + xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32); + xprt->ops = &xprt_rdma_bc_procs; + + memcpy(&xprt->addr, args->dstaddr, args->addrlen); + xprt->addrlen = args->addrlen; + xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr); + xprt->resvport = 0; + + xprt->max_payload = xprt_rdma_max_inline_read; + + new_xprt = rpcx_to_rdmax(xprt); + new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs; + + xprt_get(xprt); + args->bc_xprt->xpt_bc_xprt = xprt; + xprt->bc_xprt = args->bc_xprt; + + if (!try_module_get(THIS_MODULE)) + goto out_fail; + + /* Final put for backchannel xprt is in __svc_rdma_free */ + xprt_get(xprt); + return xprt; + +out_fail: + xprt_rdma_free_addresses(xprt); + args->bc_xprt->xpt_bc_xprt = NULL; + xprt_put(xprt); + xprt_free(xprt); + return ERR_PTR(-EINVAL); +} + +struct xprt_class xprt_rdma_bc = { + .list = LIST_HEAD_INIT(xprt_rdma_bc.list), + .name = "rdma backchannel", + .owner = THIS_MODULE, + .ident = XPRT_TRANSPORT_BC_RDMA, + .setup = xprt_setup_rdma_bc, +}; diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index ff4f01e52..c8b8a8b41 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -144,6 +144,7 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no]; head->arg.page_len += len; + head->arg.len += len; if (!pg_off) head->count++; @@ -160,8 +161,7 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, goto err; atomic_inc(&xprt->sc_dma_used); - /* The lkey here is either a local dma lkey or a dma_mr lkey */ - ctxt->sge[pno].lkey = xprt->sc_dma_lkey; + ctxt->sge[pno].lkey = xprt->sc_pd->local_dma_lkey; ctxt->sge[pno].length = len; ctxt->count++; @@ -567,6 +567,38 @@ static int rdma_read_complete(struct svc_rqst *rqstp, return ret; } +/* By convention, backchannel calls arrive via rdma_msg type + * messages, and never populate the chunk lists. This makes + * the RPC/RDMA header small and fixed in size, so it is + * straightforward to check the RPC header's direction field. + */ +static bool +svc_rdma_is_backchannel_reply(struct svc_xprt *xprt, struct rpcrdma_msg *rmsgp) +{ + __be32 *p = (__be32 *)rmsgp; + + if (!xprt->xpt_bc_xprt) + return false; + + if (rmsgp->rm_type != rdma_msg) + return false; + if (rmsgp->rm_body.rm_chunks[0] != xdr_zero) + return false; + if (rmsgp->rm_body.rm_chunks[1] != xdr_zero) + return false; + if (rmsgp->rm_body.rm_chunks[2] != xdr_zero) + return false; + + /* sanity */ + if (p[7] != rmsgp->rm_xid) + return false; + /* call direction */ + if (p[8] == cpu_to_be32(RPC_CALL)) + return false; + + return true; +} + /* * Set up the rqstp thread context to point to the RQ buffer. 
If * necessary, pull additional data from the client with an RDMA_READ @@ -632,6 +664,15 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) goto close_out; } + if (svc_rdma_is_backchannel_reply(xprt, rmsgp)) { + ret = svc_rdma_handle_bc_reply(xprt->xpt_bc_xprt, rmsgp, + &rqstp->rq_arg); + svc_rdma_put_context(ctxt, 0); + if (ret) + goto repost; + return ret; + } + /* Read read-list data. */ ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt); if (ret > 0) { @@ -668,4 +709,15 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) set_bit(XPT_CLOSE, &xprt->xpt_flags); defer: return 0; + +repost: + ret = svc_rdma_post_recv(rdma_xprt, GFP_KERNEL); + if (ret) { + pr_err("svcrdma: could not post a receive buffer, err=%d.\n", + ret); + pr_err("svcrdma: closing transport %p.\n", rdma_xprt); + set_bit(XPT_CLOSE, &rdma_xprt->sc_xprt.xpt_flags); + ret = -ENOTCONN; + } + return ret; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 969a1ab75..df57f3ce6 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -50,9 +50,9 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT -static int map_xdr(struct svcxprt_rdma *xprt, - struct xdr_buf *xdr, - struct svc_rdma_req_map *vec) +int svc_rdma_map_xdr(struct svcxprt_rdma *xprt, + struct xdr_buf *xdr, + struct svc_rdma_req_map *vec) { int sge_no; u32 sge_bytes; @@ -62,7 +62,7 @@ static int map_xdr(struct svcxprt_rdma *xprt, if (xdr->len != (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) { - pr_err("svcrdma: map_xdr: XDR buffer length error\n"); + pr_err("svcrdma: %s: XDR buffer length error\n", __func__); return -EIO; } @@ -97,9 +97,9 @@ static int map_xdr(struct svcxprt_rdma *xprt, sge_no++; } - dprintk("svcrdma: map_xdr: sge_no %d page_no %d " + dprintk("svcrdma: %s: sge_no %d page_no %d " "page_base %u page_len %u head_len %zu tail_len %zu\n", - sge_no, page_no, xdr->page_base, xdr->page_len, + __func__, sge_no, page_no, xdr->page_base, xdr->page_len, xdr->head[0].iov_len, xdr->tail[0].iov_len); vec->count = sge_no; @@ -265,7 +265,7 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, sge[sge_no].addr)) goto err; atomic_inc(&xprt->sc_dma_used); - sge[sge_no].lkey = xprt->sc_dma_lkey; + sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; ctxt->count++; sge_off = 0; sge_no++; @@ -465,7 +465,7 @@ static int send_reply(struct svcxprt_rdma *rdma, int ret; /* Post a recv buffer to handle another request. */ - ret = svc_rdma_post_recv(rdma); + ret = svc_rdma_post_recv(rdma, GFP_KERNEL); if (ret) { printk(KERN_INFO "svcrdma: could not post a receive buffer, err=%d." 
@@ -480,7 +480,7 @@ static int send_reply(struct svcxprt_rdma *rdma, ctxt->count = 1; /* Prepare the SGE for the RPCRDMA Header */ - ctxt->sge[0].lkey = rdma->sc_dma_lkey; + ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp); ctxt->sge[0].addr = ib_dma_map_page(rdma->sc_cm_id->device, page, 0, @@ -504,7 +504,7 @@ static int send_reply(struct svcxprt_rdma *rdma, ctxt->sge[sge_no].addr)) goto err; atomic_inc(&rdma->sc_dma_used); - ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey; + ctxt->sge[sge_no].lkey = rdma->sc_pd->local_dma_lkey; ctxt->sge[sge_no].length = sge_bytes; } if (byte_count != 0) { @@ -591,14 +591,17 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) /* Build an req vec for the XDR */ ctxt = svc_rdma_get_context(rdma); ctxt->direction = DMA_TO_DEVICE; - vec = svc_rdma_get_req_map(); - ret = map_xdr(rdma, &rqstp->rq_res, vec); + vec = svc_rdma_get_req_map(rdma); + ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec); if (ret) goto err0; inline_bytes = rqstp->rq_res.len; /* Create the RDMA response header */ - res_page = alloc_page(GFP_KERNEL | __GFP_NOFAIL); + ret = -ENOMEM; + res_page = alloc_page(GFP_KERNEL); + if (!res_page) + goto err0; rdma_resp = page_address(res_page); reply_ary = svc_rdma_get_reply_array(rdma_argp); if (reply_ary) @@ -630,14 +633,14 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec, inline_bytes); - svc_rdma_put_req_map(vec); + svc_rdma_put_req_map(rdma, vec); dprintk("svcrdma: send_reply returns %d\n", ret); return ret; err1: put_page(res_page); err0: - svc_rdma_put_req_map(vec); + svc_rdma_put_req_map(rdma, vec); svc_rdma_put_context(ctxt, 0); return ret; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index b348b4ade..5763825d0 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -153,18 +153,76 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt) } #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) +static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt, + gfp_t flags) { struct svc_rdma_op_ctxt *ctxt; - ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, - GFP_KERNEL | __GFP_NOFAIL); - ctxt->xprt = xprt; - INIT_LIST_HEAD(&ctxt->dto_q); + ctxt = kmalloc(sizeof(*ctxt), flags); + if (ctxt) { + ctxt->xprt = xprt; + INIT_LIST_HEAD(&ctxt->free); + INIT_LIST_HEAD(&ctxt->dto_q); + } + return ctxt; +} + +static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt) +{ + unsigned int i; + + /* Each RPC/RDMA credit can consume a number of send + * and receive WQEs. One ctxt is allocated for each. 
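The ctxt cache that replaces the old global kmem_cache is a per-transport free list, pre-sized from the queue depths, with a GFP_NOIO slow path if the estimate falls short. The get side, condensed from svc_rdma_get_context() below (usage accounting omitted):

        struct svc_rdma_op_ctxt *ctxt = NULL;

        spin_lock_bh(&xprt->sc_ctxt_lock);
        if (!list_empty(&xprt->sc_ctxts)) {
                ctxt = list_first_entry(&xprt->sc_ctxts,
                                        struct svc_rdma_op_ctxt, free);
                list_del_init(&ctxt->free);
        }
        spin_unlock_bh(&xprt->sc_ctxt_lock);

        if (!ctxt)      /* GFP_NOIO: reclaim must not recurse into I/O */
                ctxt = alloc_ctxt(xprt, GFP_NOIO);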
+ */ + i = xprt->sc_sq_depth + xprt->sc_rq_depth; + + while (i--) { + struct svc_rdma_op_ctxt *ctxt; + + ctxt = alloc_ctxt(xprt, GFP_KERNEL); + if (!ctxt) { + dprintk("svcrdma: No memory for RDMA ctxt\n"); + return false; + } + list_add(&ctxt->free, &xprt->sc_ctxts); + } + return true; +} + +struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) +{ + struct svc_rdma_op_ctxt *ctxt = NULL; + + spin_lock_bh(&xprt->sc_ctxt_lock); + xprt->sc_ctxt_used++; + if (list_empty(&xprt->sc_ctxts)) + goto out_empty; + + ctxt = list_first_entry(&xprt->sc_ctxts, + struct svc_rdma_op_ctxt, free); + list_del_init(&ctxt->free); + spin_unlock_bh(&xprt->sc_ctxt_lock); + +out: ctxt->count = 0; ctxt->frmr = NULL; - atomic_inc(&xprt->sc_ctxt_used); return ctxt; + +out_empty: + /* Either pre-allocation missed the mark, or send + * queue accounting is broken. + */ + spin_unlock_bh(&xprt->sc_ctxt_lock); + + ctxt = alloc_ctxt(xprt, GFP_NOIO); + if (ctxt) + goto out; + + spin_lock_bh(&xprt->sc_ctxt_lock); + xprt->sc_ctxt_used--; + spin_unlock_bh(&xprt->sc_ctxt_lock); + WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n"); + return NULL; } void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) @@ -174,11 +232,11 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) { /* * Unmap the DMA addr in the SGE if the lkey matches - * the sc_dma_lkey, otherwise, ignore it since it is + * the local_dma_lkey, otherwise, ignore it since it is * an FRMR lkey and will be unmapped later when the * last WR that uses it completes. */ - if (ctxt->sge[i].lkey == xprt->sc_dma_lkey) { + if (ctxt->sge[i].lkey == xprt->sc_pd->local_dma_lkey) { atomic_dec(&xprt->sc_dma_used); ib_dma_unmap_page(xprt->sc_cm_id->device, ctxt->sge[i].addr, @@ -190,35 +248,108 @@ void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt) void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages) { - struct svcxprt_rdma *xprt; + struct svcxprt_rdma *xprt = ctxt->xprt; int i; - xprt = ctxt->xprt; if (free_pages) for (i = 0; i < ctxt->count; i++) put_page(ctxt->pages[i]); - kmem_cache_free(svc_rdma_ctxt_cachep, ctxt); - atomic_dec(&xprt->sc_ctxt_used); + spin_lock_bh(&xprt->sc_ctxt_lock); + xprt->sc_ctxt_used--; + list_add(&ctxt->free, &xprt->sc_ctxts); + spin_unlock_bh(&xprt->sc_ctxt_lock); } -/* - * Temporary NFS req mappings are shared across all transport - * instances. These are short lived and should be bounded by the number - * of concurrent server threads * depth of the SQ. - */ -struct svc_rdma_req_map *svc_rdma_get_req_map(void) +static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt) +{ + while (!list_empty(&xprt->sc_ctxts)) { + struct svc_rdma_op_ctxt *ctxt; + + ctxt = list_first_entry(&xprt->sc_ctxts, + struct svc_rdma_op_ctxt, free); + list_del(&ctxt->free); + kfree(ctxt); + } +} + +static struct svc_rdma_req_map *alloc_req_map(gfp_t flags) { struct svc_rdma_req_map *map; - map = kmem_cache_alloc(svc_rdma_map_cachep, - GFP_KERNEL | __GFP_NOFAIL); + + map = kmalloc(sizeof(*map), flags); + if (map) + INIT_LIST_HEAD(&map->free); + return map; +} + +static bool svc_rdma_prealloc_maps(struct svcxprt_rdma *xprt) +{ + unsigned int i; + + /* One for each receive buffer on this connection. 
*/ + i = xprt->sc_max_requests; + + while (i--) { + struct svc_rdma_req_map *map; + + map = alloc_req_map(GFP_KERNEL); + if (!map) { + dprintk("svcrdma: No memory for request map\n"); + return false; + } + list_add(&map->free, &xprt->sc_maps); + } + return true; +} + +struct svc_rdma_req_map *svc_rdma_get_req_map(struct svcxprt_rdma *xprt) +{ + struct svc_rdma_req_map *map = NULL; + + spin_lock(&xprt->sc_map_lock); + if (list_empty(&xprt->sc_maps)) + goto out_empty; + + map = list_first_entry(&xprt->sc_maps, + struct svc_rdma_req_map, free); + list_del_init(&map->free); + spin_unlock(&xprt->sc_map_lock); + +out: map->count = 0; return map; + +out_empty: + spin_unlock(&xprt->sc_map_lock); + + /* Pre-allocation amount was incorrect */ + map = alloc_req_map(GFP_NOIO); + if (map) + goto out; + + WARN_ONCE(1, "svcrdma: empty request map list?\n"); + return NULL; +} + +void svc_rdma_put_req_map(struct svcxprt_rdma *xprt, + struct svc_rdma_req_map *map) +{ + spin_lock(&xprt->sc_map_lock); + list_add(&map->free, &xprt->sc_maps); + spin_unlock(&xprt->sc_map_lock); } -void svc_rdma_put_req_map(struct svc_rdma_req_map *map) +static void svc_rdma_destroy_maps(struct svcxprt_rdma *xprt) { - kmem_cache_free(svc_rdma_map_cachep, map); + while (!list_empty(&xprt->sc_maps)) { + struct svc_rdma_req_map *map; + + map = list_first_entry(&xprt->sc_maps, + struct svc_rdma_req_map, free); + list_del(&map->free); + kfree(map); + } } /* ib_cq event handler */ @@ -386,46 +517,44 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt) static void process_context(struct svcxprt_rdma *xprt, struct svc_rdma_op_ctxt *ctxt) { + struct svc_rdma_op_ctxt *read_hdr; + int free_pages = 0; + svc_rdma_unmap_dma(ctxt); switch (ctxt->wr_op) { case IB_WR_SEND: - if (ctxt->frmr) - pr_err("svcrdma: SEND: ctxt->frmr != NULL\n"); - svc_rdma_put_context(ctxt, 1); + free_pages = 1; break; case IB_WR_RDMA_WRITE: - if (ctxt->frmr) - pr_err("svcrdma: WRITE: ctxt->frmr != NULL\n"); - svc_rdma_put_context(ctxt, 0); break; case IB_WR_RDMA_READ: case IB_WR_RDMA_READ_WITH_INV: svc_rdma_put_frmr(xprt, ctxt->frmr); - if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) { - struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr; - if (read_hdr) { - spin_lock_bh(&xprt->sc_rq_dto_lock); - set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); - list_add_tail(&read_hdr->dto_q, - &xprt->sc_read_complete_q); - spin_unlock_bh(&xprt->sc_rq_dto_lock); - } else { - pr_err("svcrdma: ctxt->read_hdr == NULL\n"); - } - svc_xprt_enqueue(&xprt->sc_xprt); - } + + if (!test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) + break; + + read_hdr = ctxt->read_hdr; svc_rdma_put_context(ctxt, 0); - break; + + spin_lock_bh(&xprt->sc_rq_dto_lock); + set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags); + list_add_tail(&read_hdr->dto_q, + &xprt->sc_read_complete_q); + spin_unlock_bh(&xprt->sc_rq_dto_lock); + svc_xprt_enqueue(&xprt->sc_xprt); + return; default: - printk(KERN_ERR "svcrdma: unexpected completion type, " - "opcode=%d\n", - ctxt->wr_op); + dprintk("svcrdma: unexpected completion opcode=%d\n", + ctxt->wr_op); break; } + + svc_rdma_put_context(ctxt, free_pages); } /* @@ -523,19 +652,15 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_frmr_q); + INIT_LIST_HEAD(&cma_xprt->sc_ctxts); + INIT_LIST_HEAD(&cma_xprt->sc_maps); init_waitqueue_head(&cma_xprt->sc_send_wait); spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock); 
spin_lock_init(&cma_xprt->sc_frmr_q_lock); - - cma_xprt->sc_ord = svcrdma_ord; - - cma_xprt->sc_max_req_size = svcrdma_max_req_size; - cma_xprt->sc_max_requests = svcrdma_max_requests; - cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT; - atomic_set(&cma_xprt->sc_sq_count, 0); - atomic_set(&cma_xprt->sc_ctxt_used, 0); + spin_lock_init(&cma_xprt->sc_ctxt_lock); + spin_lock_init(&cma_xprt->sc_map_lock); if (listener) set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags); @@ -543,7 +668,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv, return cma_xprt; } -int svc_rdma_post_recv(struct svcxprt_rdma *xprt) +int svc_rdma_post_recv(struct svcxprt_rdma *xprt, gfp_t flags) { struct ib_recv_wr recv_wr, *bad_recv_wr; struct svc_rdma_op_ctxt *ctxt; @@ -561,7 +686,9 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt) pr_err("svcrdma: Too many sges (%d)\n", sge_no); goto err_put_ctxt; } - page = alloc_page(GFP_KERNEL | __GFP_NOFAIL); + page = alloc_page(flags); + if (!page) + goto err_put_ctxt; ctxt->pages[sge_no] = page; pa = ib_dma_map_page(xprt->sc_cm_id->device, page, 0, PAGE_SIZE, @@ -571,7 +698,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt) atomic_inc(&xprt->sc_dma_used); ctxt->sge[sge_no].addr = pa; ctxt->sge[sge_no].length = PAGE_SIZE; - ctxt->sge[sge_no].lkey = xprt->sc_dma_lkey; + ctxt->sge[sge_no].lkey = xprt->sc_pd->local_dma_lkey; ctxt->count = sge_no + 1; buflen += PAGE_SIZE; } @@ -886,11 +1013,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) struct rdma_conn_param conn_param; struct ib_cq_init_attr cq_attr = {}; struct ib_qp_init_attr qp_attr; - struct ib_device_attr devattr; - int uninitialized_var(dma_mr_acc); - int need_dma_mr = 0; - int ret; - int i; + struct ib_device *dev; + unsigned int i; + int ret = 0; listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); clear_bit(XPT_CONN, &xprt->xpt_flags); @@ -910,37 +1035,42 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n", newxprt, newxprt->sc_cm_id); - ret = ib_query_device(newxprt->sc_cm_id->device, &devattr); - if (ret) { - dprintk("svcrdma: could not query device attributes on " - "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret); - goto errout; - } + dev = newxprt->sc_cm_id->device; /* Qualify the transport resource defaults with the * capabilities of this particular device */ - newxprt->sc_max_sge = min((size_t)devattr.max_sge, + newxprt->sc_max_sge = min((size_t)dev->attrs.max_sge, (size_t)RPCSVC_MAXPAGES); - newxprt->sc_max_sge_rd = min_t(size_t, devattr.max_sge_rd, + newxprt->sc_max_sge_rd = min_t(size_t, dev->attrs.max_sge_rd, RPCSVC_MAXPAGES); - newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr, - (size_t)svcrdma_max_requests); - newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests; + newxprt->sc_max_req_size = svcrdma_max_req_size; + newxprt->sc_max_requests = min_t(u32, dev->attrs.max_qp_wr, + svcrdma_max_requests); + newxprt->sc_max_bc_requests = min_t(u32, dev->attrs.max_qp_wr, + svcrdma_max_bc_requests); + newxprt->sc_rq_depth = newxprt->sc_max_requests + + newxprt->sc_max_bc_requests; + newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth; + + if (!svc_rdma_prealloc_ctxts(newxprt)) + goto errout; + if (!svc_rdma_prealloc_maps(newxprt)) + goto errout; /* * Limit ORD based on client limit, local device limit, and * configured svcrdma limit. 
*/ - newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord); + newxprt->sc_ord = min_t(size_t, dev->attrs.max_qp_rd_atom, newxprt->sc_ord); newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord); - newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device); + newxprt->sc_pd = ib_alloc_pd(dev); if (IS_ERR(newxprt->sc_pd)) { dprintk("svcrdma: error creating PD for connect request\n"); goto errout; } cq_attr.cqe = newxprt->sc_sq_depth; - newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device, + newxprt->sc_sq_cq = ib_create_cq(dev, sq_comp_handler, cq_event_handler, newxprt, @@ -949,8 +1079,8 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) dprintk("svcrdma: error creating SQ CQ for connect request\n"); goto errout; } - cq_attr.cqe = newxprt->sc_max_requests; - newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device, + cq_attr.cqe = newxprt->sc_rq_depth; + newxprt->sc_rq_cq = ib_create_cq(dev, rq_comp_handler, cq_event_handler, newxprt, @@ -964,7 +1094,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) qp_attr.event_handler = qp_event_handler; qp_attr.qp_context = &newxprt->sc_xprt; qp_attr.cap.max_send_wr = newxprt->sc_sq_depth; - qp_attr.cap.max_recv_wr = newxprt->sc_max_requests; + qp_attr.cap.max_recv_wr = newxprt->sc_rq_depth; qp_attr.cap.max_send_sge = newxprt->sc_max_sge; qp_attr.cap.max_recv_sge = newxprt->sc_max_sge; qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR; @@ -978,7 +1108,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) " cap.max_send_sge = %d\n" " cap.max_recv_sge = %d\n", newxprt->sc_cm_id, newxprt->sc_pd, - newxprt->sc_cm_id->device, newxprt->sc_pd->device, + dev, newxprt->sc_pd->device, qp_attr.cap.max_send_wr, qp_attr.cap.max_recv_wr, qp_attr.cap.max_send_sge, @@ -1014,9 +1144,9 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) * of an RDMA_READ. IB does not. 
*/ newxprt->sc_reader = rdma_read_chunk_lcl; - if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) { + if (dev->attrs.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) { newxprt->sc_frmr_pg_list_len = - devattr.max_fast_reg_page_list_len; + dev->attrs.max_fast_reg_page_list_len; newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG; newxprt->sc_reader = rdma_read_chunk_frmr; } @@ -1024,44 +1154,16 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) /* * Determine if a DMA MR is required and if so, what privs are required */ - if (!rdma_protocol_iwarp(newxprt->sc_cm_id->device, - newxprt->sc_cm_id->port_num) && - !rdma_ib_or_roce(newxprt->sc_cm_id->device, - newxprt->sc_cm_id->port_num)) + if (!rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num) && + !rdma_ib_or_roce(dev, newxprt->sc_cm_id->port_num)) goto errout; - if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG) || - !(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) { - need_dma_mr = 1; - dma_mr_acc = IB_ACCESS_LOCAL_WRITE; - if (rdma_protocol_iwarp(newxprt->sc_cm_id->device, - newxprt->sc_cm_id->port_num) && - !(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) - dma_mr_acc |= IB_ACCESS_REMOTE_WRITE; - } - - if (rdma_protocol_iwarp(newxprt->sc_cm_id->device, - newxprt->sc_cm_id->port_num)) + if (rdma_protocol_iwarp(dev, newxprt->sc_cm_id->port_num)) newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV; - /* Create the DMA MR if needed, otherwise, use the DMA LKEY */ - if (need_dma_mr) { - /* Register all of physical memory */ - newxprt->sc_phys_mr = - ib_get_dma_mr(newxprt->sc_pd, dma_mr_acc); - if (IS_ERR(newxprt->sc_phys_mr)) { - dprintk("svcrdma: Failed to create DMA MR ret=%d\n", - ret); - goto errout; - } - newxprt->sc_dma_lkey = newxprt->sc_phys_mr->lkey; - } else - newxprt->sc_dma_lkey = - newxprt->sc_cm_id->device->local_dma_lkey; - /* Post receive buffers */ - for (i = 0; i < newxprt->sc_max_requests; i++) { - ret = svc_rdma_post_recv(newxprt); + for (i = 0; i < newxprt->sc_rq_depth; i++) { + ret = svc_rdma_post_recv(newxprt, GFP_KERNEL); if (ret) { dprintk("svcrdma: failure posting receive buffers\n"); goto errout; @@ -1160,12 +1262,14 @@ static void __svc_rdma_free(struct work_struct *work) { struct svcxprt_rdma *rdma = container_of(work, struct svcxprt_rdma, sc_work); - dprintk("svcrdma: svc_rdma_free(%p)\n", rdma); + struct svc_xprt *xprt = &rdma->sc_xprt; + + dprintk("svcrdma: %s(%p)\n", __func__, rdma); /* We should only be called from kref_put */ - if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0) + if (atomic_read(&xprt->xpt_ref.refcount) != 0) pr_err("svcrdma: sc_xprt still in use? (%d)\n", - atomic_read(&rdma->sc_xprt.xpt_ref.refcount)); + atomic_read(&xprt->xpt_ref.refcount)); /* * Destroy queued, but not processed read completions. Note @@ -1193,15 +1297,22 @@ static void __svc_rdma_free(struct work_struct *work) } /* Warn if we leaked a resource or under-referenced */ - if (atomic_read(&rdma->sc_ctxt_used) != 0) + if (rdma->sc_ctxt_used != 0) pr_err("svcrdma: ctxt still in use? (%d)\n", - atomic_read(&rdma->sc_ctxt_used)); + rdma->sc_ctxt_used); if (atomic_read(&rdma->sc_dma_used) != 0) pr_err("svcrdma: dma still in use? 
(%d)\n", atomic_read(&rdma->sc_dma_used)); - /* De-allocate fastreg mr */ + /* Final put of backchannel client transport */ + if (xprt->xpt_bc_xprt) { + xprt_put(xprt->xpt_bc_xprt); + xprt->xpt_bc_xprt = NULL; + } + rdma_dealloc_frmr_q(rdma); + svc_rdma_destroy_ctxts(rdma); + svc_rdma_destroy_maps(rdma); /* Destroy the QP if present (not a listener) */ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) @@ -1213,9 +1324,6 @@ static void __svc_rdma_free(struct work_struct *work) if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq)) ib_destroy_cq(rdma->sc_rq_cq); - if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr)) - ib_dereg_mr(rdma->sc_phys_mr); - if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) ib_dealloc_pd(rdma->sc_pd); @@ -1321,7 +1429,9 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, int length; int ret; - p = alloc_page(GFP_KERNEL | __GFP_NOFAIL); + p = alloc_page(GFP_KERNEL); + if (!p) + return; va = page_address(p); /* XDR encode error */ @@ -1341,7 +1451,7 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, return; } atomic_inc(&xprt->sc_dma_used); - ctxt->sge[0].lkey = xprt->sc_dma_lkey; + ctxt->sge[0].lkey = xprt->sc_pd->local_dma_lkey; ctxt->sge[0].length = length; /* Prepare SEND WR */ diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 8c545f7d7..b1b009f10 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -63,7 +63,7 @@ */ static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE; -static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE; +unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE; static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE; static unsigned int xprt_rdma_inline_write_padding; static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; @@ -143,12 +143,7 @@ static struct ctl_table sunrpc_table[] = { #endif -#define RPCRDMA_BIND_TO (60U * HZ) -#define RPCRDMA_INIT_REEST_TO (5U * HZ) -#define RPCRDMA_MAX_REEST_TO (30U * HZ) -#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ) - -static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */ +static struct rpc_xprt_ops xprt_rdma_procs; /*forward reference */ static void xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap) @@ -174,7 +169,7 @@ xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap) xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6; } -static void +void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap) { char buf[128]; @@ -203,7 +198,7 @@ xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap) xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma"; } -static void +void xprt_rdma_free_addresses(struct rpc_xprt *xprt) { unsigned int i; @@ -499,7 +494,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) if (req == NULL) return NULL; - flags = GFP_NOIO | __GFP_NOWARN; + flags = RPCRDMA_DEF_GFP; if (RPC_IS_SWAPPER(task)) flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN; @@ -576,6 +571,9 @@ xprt_rdma_free(void *buffer) rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]); req = rb->rg_owner; + if (req->rl_backchannel) + return; + r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf); dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); @@ -639,7 +637,7 @@ drop_connection: return -ENOTCONN; /* implies disconnect */ } -static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) +void xprt_rdma_print_stats(struct 
rpc_xprt *xprt, struct seq_file *seq) { struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); long idle_time = 0; @@ -740,6 +738,11 @@ void xprt_rdma_cleanup(void) rpcrdma_destroy_wq(); frwr_destroy_recovery_wq(); + + rc = xprt_unregister_transport(&xprt_rdma_bc); + if (rc) + dprintk("RPC: %s: xprt_unregister(bc) returned %i\n", + __func__, rc); } int xprt_rdma_init(void) @@ -763,6 +766,14 @@ int xprt_rdma_init(void) return rc; } + rc = xprt_register_transport(&xprt_rdma_bc); + if (rc) { + xprt_unregister_transport(&xprt_rdma); + rpcrdma_destroy_wq(); + frwr_destroy_recovery_wq(); + return rc; + } + dprintk("RPCRDMA Module Init, register RPC RDMA transport\n"); dprintk("Defaults:\n"); diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index eadd16551..878f1bfb1 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -462,7 +462,6 @@ int rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) { struct rpcrdma_ia *ia = &xprt->rx_ia; - struct ib_device_attr *devattr = &ia->ri_devattr; int rc; ia->ri_dma_mr = NULL; @@ -482,16 +481,10 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) goto out2; } - rc = ib_query_device(ia->ri_device, devattr); - if (rc) { - dprintk("RPC: %s: ib_query_device failed %d\n", - __func__, rc); - goto out3; - } - if (memreg == RPCRDMA_FRMR) { - if (!(devattr->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) || - (devattr->max_fast_reg_page_list_len == 0)) { + if (!(ia->ri_device->attrs.device_cap_flags & + IB_DEVICE_MEM_MGT_EXTENSIONS) || + (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) { dprintk("RPC: %s: FRMR registration " "not supported by HCA\n", __func__); memreg = RPCRDMA_MTHCAFMR; @@ -566,24 +559,23 @@ int rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata) { - struct ib_device_attr *devattr = &ia->ri_devattr; struct ib_cq *sendcq, *recvcq; struct ib_cq_init_attr cq_attr = {}; unsigned int max_qp_wr; int rc, err; - if (devattr->max_sge < RPCRDMA_MAX_IOVS) { + if (ia->ri_device->attrs.max_sge < RPCRDMA_MAX_IOVS) { dprintk("RPC: %s: insufficient sge's available\n", __func__); return -ENOMEM; } - if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) { + if (ia->ri_device->attrs.max_qp_wr <= RPCRDMA_BACKWARD_WRS) { dprintk("RPC: %s: insufficient wqe's available\n", __func__); return -ENOMEM; } - max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS; + max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; /* check provider's send/recv wr limits */ if (cdata->max_requests > max_qp_wr) @@ -616,10 +608,8 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, /* set trigger for requesting send completion */ ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1; - if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS) - ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS; - else if (ep->rep_cqinit <= 2) - ep->rep_cqinit = 0; + if (ep->rep_cqinit <= 2) + ep->rep_cqinit = 0; /* always signal? 
*/ INIT_CQCOUNT(ep); init_waitqueue_head(&ep->rep_connect_wait); INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker); @@ -670,11 +660,11 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, /* Client offers RDMA Read but does not initiate */ ep->rep_remote_cma.initiator_depth = 0; - if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */ + if (ia->ri_device->attrs.max_qp_rd_atom > 32) /* arbitrary but <= 255 */ ep->rep_remote_cma.responder_resources = 32; else ep->rep_remote_cma.responder_resources = - devattr->max_qp_rd_atom; + ia->ri_device->attrs.max_qp_rd_atom; ep->rep_remote_cma.retry_count = 7; ep->rep_remote_cma.flow_control = 0; @@ -852,10 +842,11 @@ retry: if (extras) { rc = rpcrdma_ep_post_extra_recv(r_xprt, extras); - if (rc) + if (rc) { pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n", __func__, rc); rc = 0; + } } } @@ -1337,15 +1328,14 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_ep *ep = &r_xprt->rx_ep; struct rpcrdma_rep *rep; - unsigned long flags; int rc; while (count--) { - spin_lock_irqsave(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); if (list_empty(&buffers->rb_recv_bufs)) goto out_reqbuf; rep = rpcrdma_buffer_get_rep_locked(buffers); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); rc = rpcrdma_ep_post_recv(ia, ep, rep); if (rc) @@ -1355,7 +1345,7 @@ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) return 0; out_reqbuf: - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_unlock(&buffers->rb_lock); pr_warn("%s: no extra receive buffers\n", __func__); return -ENOMEM; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index ac7f8d4f6..38fe11b09 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -55,6 +55,11 @@ #define RDMA_RESOLVE_TIMEOUT (5000) /* 5 seconds */ #define RDMA_CONNECT_RETRY_MAX (2) /* retries if no listener backlog */ +#define RPCRDMA_BIND_TO (60U * HZ) +#define RPCRDMA_INIT_REEST_TO (5U * HZ) +#define RPCRDMA_MAX_REEST_TO (30U * HZ) +#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ) + /* * Interface Adapter -- one per transport instance */ @@ -68,7 +73,6 @@ struct rpcrdma_ia { struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; - struct ib_device_attr ri_devattr; struct ib_qp_attr ri_qp_attr; struct ib_qp_init_attr ri_qp_init_attr; }; @@ -88,12 +92,6 @@ struct rpcrdma_ep { struct delayed_work rep_connect_worker; }; -/* - * Force a signaled SEND Work Request every so often, - * in case the provider needs to do some housekeeping. - */ -#define RPCRDMA_MAX_UNSIGNALED_SENDS (32) - #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit) #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount) @@ -148,6 +146,8 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) return (struct rpcrdma_msg *)rb->rg_base; } +#define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN) + /* * struct rpcrdma_rep -- this structure encapsulates state required to recv * and complete a reply, asynchronously.
It needs several pieces of @@ -207,6 +207,12 @@ struct rpcrdma_frmr { enum rpcrdma_frmr_state fr_state; struct work_struct fr_work; struct rpcrdma_xprt *fr_xprt; + bool fr_waiter; + struct completion fr_linv_done; + union { + struct ib_reg_wr fr_regwr; + struct ib_send_wr fr_invwr; + }; }; struct rpcrdma_fmr { @@ -309,6 +315,8 @@ struct rpcrdma_buffer { u32 rb_bc_srv_max_requests; spinlock_t rb_reqslock; /* protect rb_allreqs */ struct list_head rb_allreqs; + + u32 rb_bc_max_requests; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -364,6 +372,8 @@ struct rpcrdma_xprt; struct rpcrdma_memreg_ops { int (*ro_map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool); + void (*ro_unmap_sync)(struct rpcrdma_xprt *, + struct rpcrdma_req *); int (*ro_unmap)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *); int (*ro_open)(struct rpcrdma_ia *, @@ -514,6 +524,10 @@ int rpcrdma_marshal_req(struct rpc_rqst *); /* RPC/RDMA module init - xprtrdma/transport.c */ +extern unsigned int xprt_rdma_max_inline_read; +void xprt_rdma_format_addresses(struct rpc_xprt *xprt, struct sockaddr *sap); +void xprt_rdma_free_addresses(struct rpc_xprt *xprt); +void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq); int xprt_rdma_init(void); void xprt_rdma_cleanup(void); @@ -529,11 +543,6 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); #endif /* CONFIG_SUNRPC_BACKCHANNEL */ -/* Temporary NFS request map cache. Created in svc_rdma.c */ -extern struct kmem_cache *svc_rdma_map_cachep; -/* WR context cache. Created in svc_rdma.c */ -extern struct kmem_cache *svc_rdma_ctxt_cachep; -/* Workqueue created in svc_rdma.c */ -extern struct workqueue_struct *svc_rdma_wq; +extern struct xprt_class xprt_rdma_bc; #endif /* _LINUX_SUNRPC_XPRT_RDMA_H */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 027c9ef8a..fde2138b8 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -1900,18 +1900,6 @@ static inline void xs_reclassify_socket(int family, struct socket *sock) } } #else -static inline void xs_reclassify_socketu(struct socket *sock) -{ -} - -static inline void xs_reclassify_socket4(struct socket *sock) -{ -} - -static inline void xs_reclassify_socket6(struct socket *sock) -{ -} - static inline void xs_reclassify_socket(int family, struct socket *sock) { } @@ -2001,7 +1989,7 @@ static int xs_local_setup_socket(struct sock_xprt *transport) "transport socket (%d).\n", -status); goto out; } - xs_reclassify_socketu(sock); + xs_reclassify_socket(AF_LOCAL, sock); dprintk("RPC: worker connecting xprt %p via AF_LOCAL to %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ADDR]);
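Note on context pool sizing: svc_rdma_accept() above sizes the receive queue as the sum of forward-channel and backchannel credits, multiplies that by RPCRDMA_SQ_DEPTH_MULT for the send queue, and svc_rdma_prealloc_ctxts() then reserves one ctxt per possible WQE. A sketch of that arithmetic; svc_rdma_nr_ctxts() is an illustrative helper, not a kernel function, and its inputs are parameters because the real values come from device attributes and module parameters:

	static unsigned int svc_rdma_nr_ctxts(unsigned int max_requests,
					      unsigned int max_bc_requests,
					      unsigned int sq_depth_mult)
	{
		unsigned int rq_depth = max_requests + max_bc_requests;
		unsigned int sq_depth = sq_depth_mult * rq_depth;

		/* One svc_rdma_op_ctxt per send or receive WQE that
		 * could be outstanding at once.
		 */
		return sq_depth + rq_depth;
	}

For example, with 32 forward-channel slots, 8 backchannel slots, and a multiplier of 4 (all illustrative numbers), the transport would pre-allocate (4 + 1) * 40 = 200 contexts.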
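Note on the DMA lkey conversion: the patch deletes the need_dma_mr/ib_get_dma_mr() branch in svc_rdma_accept(), and the receive-posting and send-error paths stop consulting a per-transport sc_dma_lkey. Since the core began guaranteeing a local_dma_lkey on every protection domain, an SGE for locally mapped memory can simply use xprt->sc_pd->local_dma_lkey. A minimal sketch of posting one receive this way; post_one_recv() is an illustrative helper, not a kernel function, and it assumes an already-connected QP:

	#include <rdma/ib_verbs.h>

	static int post_one_recv(struct ib_qp *qp, struct ib_pd *pd,
				 struct page *page, u64 wr_id)
	{
		struct ib_recv_wr wr, *bad_wr;
		struct ib_sge sge;
		u64 pa;

		pa = ib_dma_map_page(qp->device, page, 0, PAGE_SIZE,
				     DMA_FROM_DEVICE);
		if (ib_dma_mapping_error(qp->device, pa))
			return -EIO;

		sge.addr = pa;
		sge.length = PAGE_SIZE;
		sge.lkey = pd->local_dma_lkey;	/* no per-transport DMA MR */

		memset(&wr, 0, sizeof(wr));
		wr.wr_id = wr_id;
		wr.sg_list = &sge;
		wr.num_sge = 1;

		return ib_post_recv(qp, &wr, &bad_wr);
	}

This is also why svc_rdma_unmap_dma() above can compare each SGE's lkey against sc_pd->local_dma_lkey to distinguish DMA-mapped pages from FRMR-registered ones.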