diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth.c | 9 | ||||
-rw-r--r-- | net/sunrpc/auth_generic.c | 13 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 6 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/svcauth_gss.c | 5 | ||||
-rw-r--r-- | net/sunrpc/auth_unix.c | 6 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 17 | ||||
-rw-r--r-- | net/sunrpc/socklib.c | 2 | ||||
-rw-r--r-- | net/sunrpc/svc_xprt.c | 23 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 8 | ||||
-rw-r--r-- | net/sunrpc/xdr.c | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/backchannel.c | 16 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 134 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 214 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/physical_ops.c | 39 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 517 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_marshal.c | 32 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 36 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_sendto.c | 28 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/svc_rdma_transport.c | 17 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 16 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 78 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 47 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 21 |
23 files changed, 768 insertions, 518 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 02f53674d..040ff627c 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -543,7 +543,7 @@ rpcauth_cache_enforce_limit(void) */ struct rpc_cred * rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, - int flags) + int flags, gfp_t gfp) { LIST_HEAD(free); struct rpc_cred_cache *cache = auth->au_credcache; @@ -580,7 +580,7 @@ rpcauth_lookup_credcache(struct rpc_auth *auth, struct auth_cred * acred, if (flags & RPCAUTH_LOOKUP_RCU) return ERR_PTR(-ECHILD); - new = auth->au_ops->crcreate(auth, acred, flags); + new = auth->au_ops->crcreate(auth, acred, flags, gfp); if (IS_ERR(new)) { cred = new; goto out; @@ -703,8 +703,7 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags) new = rpcauth_bind_new_cred(task, lookupflags); if (IS_ERR(new)) return PTR_ERR(new); - if (req->rq_cred != NULL) - put_rpccred(req->rq_cred); + put_rpccred(req->rq_cred); req->rq_cred = new; return 0; } @@ -712,6 +711,8 @@ rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags) void put_rpccred(struct rpc_cred *cred) { + if (cred == NULL) + return; /* Fast path for unhashed credentials */ if (test_bit(RPCAUTH_CRED_HASHED, &cred->cr_flags) == 0) { if (atomic_dec_and_test(&cred->cr_count)) diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c index 41248b182..54dd3fdea 100644 --- a/net/sunrpc/auth_generic.c +++ b/net/sunrpc/auth_generic.c @@ -38,6 +38,13 @@ struct rpc_cred *rpc_lookup_cred(void) } EXPORT_SYMBOL_GPL(rpc_lookup_cred); +struct rpc_cred * +rpc_lookup_generic_cred(struct auth_cred *acred, int flags, gfp_t gfp) +{ + return rpcauth_lookup_credcache(&generic_auth, acred, flags, gfp); +} +EXPORT_SYMBOL_GPL(rpc_lookup_generic_cred); + struct rpc_cred *rpc_lookup_cred_nonblock(void) { return rpcauth_lookupcred(&generic_auth, RPCAUTH_LOOKUP_RCU); @@ -77,15 +84,15 @@ static struct rpc_cred *generic_bind_cred(struct rpc_task *task, static struct rpc_cred * generic_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(&generic_auth, acred, flags); + return rpcauth_lookup_credcache(&generic_auth, acred, flags, GFP_KERNEL); } static struct rpc_cred * -generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +generic_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct generic_cred *gcred; - gcred = kmalloc(sizeof(*gcred), GFP_KERNEL); + gcred = kmalloc(sizeof(*gcred), gfp); if (gcred == NULL) return ERR_PTR(-ENOMEM); diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index 15612ffa8..e64ae93d5 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1299,11 +1299,11 @@ gss_destroy_cred(struct rpc_cred *cred) static struct rpc_cred * gss_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(auth, acred, flags); + return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS); } static struct rpc_cred * -gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct gss_auth *gss_auth = container_of(auth, struct gss_auth, rpc_auth); struct gss_cred *cred = NULL; @@ -1313,7 +1313,7 @@ gss_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) __func__, from_kuid(&init_user_ns, acred->uid), auth->au_flavor); - if (!(cred = kzalloc(sizeof(*cred), GFP_NOFS))) + if (!(cred = kzalloc(sizeof(*cred), gfp))) goto out_err; rpcauth_init_cred(&cred->gc_base, acred, auth, &gss_credops); diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c index 4605dc73d..e085f5ae1 100644 --- a/net/sunrpc/auth_gss/svcauth_gss.c +++ b/net/sunrpc/auth_gss/svcauth_gss.c @@ -569,10 +569,9 @@ gss_svc_searchbyctx(struct cache_detail *cd, struct xdr_netobj *handle) struct rsc *found; memset(&rsci, 0, sizeof(rsci)); - if (dup_to_netobj(&rsci.handle, handle->data, handle->len)) - return NULL; + rsci.handle.data = handle->data; + rsci.handle.len = handle->len; found = rsc_lookup(cd, &rsci); - rsc_free(&rsci); if (!found) return NULL; if (cache_check(cd, &found->h, NULL)) diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 0d3dd364c..9f65452b7 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -52,11 +52,11 @@ unx_destroy(struct rpc_auth *auth) static struct rpc_cred * unx_lookup_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) { - return rpcauth_lookup_credcache(auth, acred, flags); + return rpcauth_lookup_credcache(auth, acred, flags, GFP_NOFS); } static struct rpc_cred * -unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) +unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags, gfp_t gfp) { struct unx_cred *cred; unsigned int groups = 0; @@ -66,7 +66,7 @@ unx_create_cred(struct rpc_auth *auth, struct auth_cred *acred, int flags) from_kuid(&init_user_ns, acred->uid), from_kgid(&init_user_ns, acred->gid)); - if (!(cred = kmalloc(sizeof(*cred), GFP_NOFS))) + if (!(cred = kmalloc(sizeof(*cred), gfp))) return ERR_PTR(-ENOMEM); rpcauth_init_cred(&cred->uc_base, acred, auth, &unix_credops); diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 837dd910a..2808d550d 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -1433,6 +1433,23 @@ size_t rpc_max_payload(struct rpc_clnt *clnt) EXPORT_SYMBOL_GPL(rpc_max_payload); /** + * rpc_max_bc_payload - Get maximum backchannel payload size, in bytes + * @clnt: RPC client to query + */ +size_t rpc_max_bc_payload(struct rpc_clnt *clnt) +{ + struct rpc_xprt *xprt; + size_t ret; + + rcu_read_lock(); + xprt = rcu_dereference(clnt->cl_xprt); + ret = xprt->ops->bc_maxpayload(xprt); + rcu_read_unlock(); + return ret; +} +EXPORT_SYMBOL_GPL(rpc_max_bc_payload); + +/** * rpc_get_timeout - Get timeout for transport in units of HZ * @clnt: RPC client to query */ diff --git a/net/sunrpc/socklib.c b/net/sunrpc/socklib.c index de70c7802..f217c348b 100644 --- a/net/sunrpc/socklib.c +++ b/net/sunrpc/socklib.c @@ -155,7 +155,7 @@ int csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb) struct xdr_skb_reader desc; desc.skb = skb; - desc.offset = sizeof(struct udphdr); + desc.offset = 0; desc.count = skb->len - desc.offset; if (skb_csum_unnecessary(skb)) diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 7231cb413..4f01f6310 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -246,13 +246,12 @@ void svc_add_new_perm_xprt(struct svc_serv *serv, struct svc_xprt *new) svc_xprt_received(new); } -int svc_create_xprt(struct svc_serv *serv, const char *xprt_name, +int _svc_create_xprt(struct svc_serv *serv, const char *xprt_name, struct net *net, const int family, const unsigned short port, int flags) { struct svc_xprt_class *xcl; - dprintk("svc: creating transport %s[%d]\n", xprt_name, port); spin_lock(&svc_xprt_class_lock); list_for_each_entry(xcl, &svc_xprt_class_list, xcl_list) { struct svc_xprt *newxprt; @@ -276,12 +275,28 @@ int svc_create_xprt(struct svc_serv *serv, const char *xprt_name, } err: spin_unlock(&svc_xprt_class_lock); - dprintk("svc: transport %s not found\n", xprt_name); - /* This errno is exposed to user space. Provide a reasonable * perror msg for a bad transport. */ return -EPROTONOSUPPORT; } + +int svc_create_xprt(struct svc_serv *serv, const char *xprt_name, + struct net *net, const int family, + const unsigned short port, int flags) +{ + int err; + + dprintk("svc: creating transport %s[%d]\n", xprt_name, port); + err = _svc_create_xprt(serv, xprt_name, net, family, port, flags); + if (err == -EPROTONOSUPPORT) { + request_module("svc%s", xprt_name); + err = _svc_create_xprt(serv, xprt_name, net, family, port, flags); + } + if (err) + dprintk("svc: transport %s not found, err %d\n", + xprt_name, err); + return err; +} EXPORT_SYMBOL_GPL(svc_create_xprt); /* diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index 1413cdcc1..dadfec66d 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -85,8 +85,7 @@ static void svc_reclassify_socket(struct socket *sock) { struct sock *sk = sock->sk; - WARN_ON_ONCE(sock_owned_by_user(sk)); - if (sock_owned_by_user(sk)) + if (WARN_ON_ONCE(!sock_allow_reclassification(sk))) return; switch (sk->sk_family) { @@ -617,7 +616,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp) svsk->sk_sk->sk_stamp = skb->tstamp; set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); /* there may be more data... */ - len = skb->len - sizeof(struct udphdr); + len = skb->len; rqstp->rq_arg.len = len; rqstp->rq_prot = IPPROTO_UDP; @@ -641,8 +640,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp) skb_free_datagram_locked(svsk->sk_sk, skb); } else { /* we can use it in-place */ - rqstp->rq_arg.head[0].iov_base = skb->data + - sizeof(struct udphdr); + rqstp->rq_arg.head[0].iov_base = skb->data; rqstp->rq_arg.head[0].iov_len = len; if (skb_checksum_complete(skb)) goto out_free; diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c index 6bdb38652..c4f3cc0c0 100644 --- a/net/sunrpc/xdr.c +++ b/net/sunrpc/xdr.c @@ -797,6 +797,8 @@ void xdr_init_decode(struct xdr_stream *xdr, struct xdr_buf *buf, __be32 *p) xdr_set_iov(xdr, buf->head, buf->len); else if (buf->page_len != 0) xdr_set_page_base(xdr, 0, buf->len); + else + xdr_set_iov(xdr, buf->head, buf->len); if (p != NULL && p > xdr->p && xdr->end >= p) { xdr->nwords -= p - xdr->p; xdr->p = p; diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index 2dcd7640e..87762d976 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -192,6 +192,22 @@ int xprt_rdma_bc_up(struct svc_serv *serv, struct net *net) } /** + * xprt_rdma_bc_maxpayload - Return maximum backchannel message size + * @xprt: transport + * + * Returns maximum size, in bytes, of a backchannel message + */ +size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) +{ + struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); + struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; + size_t maxmsg; + + maxmsg = min_t(unsigned int, cdata->inline_rsize, cdata->inline_wsize); + return maxmsg - RPCRDMA_HDRLEN_MIN; +} + +/** * rpcrdma_bc_marshal_reply - Send backwards direction reply * @rqst: buffer containing RPC reply data * diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index b289e1065..6326ebe8b 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -35,10 +35,71 @@ /* Maximum scatter/gather per FMR */ #define RPCRDMA_MAX_FMR_SGES (64) +static struct workqueue_struct *fmr_recovery_wq; + +#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND) + +int +fmr_alloc_recovery_wq(void) +{ + fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0); + return !fmr_recovery_wq ? -ENOMEM : 0; +} + +void +fmr_destroy_recovery_wq(void) +{ + struct workqueue_struct *wq; + + if (!fmr_recovery_wq) + return; + + wq = fmr_recovery_wq; + fmr_recovery_wq = NULL; + destroy_workqueue(wq); +} + +static int +__fmr_unmap(struct rpcrdma_mw *mw) +{ + LIST_HEAD(l); + + list_add(&mw->fmr.fmr->list, &l); + return ib_unmap_fmr(&l); +} + +/* Deferred reset of a single FMR. Generate a fresh rkey by + * replacing the MR. There's no recovery if this fails. + */ +static void +__fmr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw, + mw_work); + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + + __fmr_unmap(mw); + rpcrdma_put_mw(r_xprt, mw); + return; +} + +/* A broken MR was discovered in a context that can't sleep. + * Defer recovery to the recovery worker. + */ +static void +__fmr_queue_recovery(struct rpcrdma_mw *mw) +{ + INIT_WORK(&mw->mw_work, __fmr_recovery_worker); + queue_work(fmr_recovery_wq, &mw->mw_work); +} + static int fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, struct rpcrdma_create_data_internal *cdata) { + rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, + RPCRDMA_MAX_DATA_SEGS / + RPCRDMA_MAX_FMR_SGES)); return 0; } @@ -48,7 +109,7 @@ static size_t fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) { return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES); + RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES); } static int @@ -89,6 +150,7 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt) if (IS_ERR(r->fmr.fmr)) goto out_fmr_err; + r->mw_xprt = r_xprt; list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); } @@ -104,15 +166,6 @@ out: return rc; } -static int -__fmr_unmap(struct rpcrdma_mw *r) -{ - LIST_HEAD(l); - - list_add(&r->fmr.fmr->list, &l); - return ib_unmap_fmr(&l); -} - /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ @@ -183,15 +236,10 @@ static void __fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) { struct ib_device *device = r_xprt->rx_ia.ri_device; - struct rpcrdma_mw *mw = seg->rl_mw; int nsegs = seg->mr_nsegs; - seg->rl_mw = NULL; - while (nsegs--) rpcrdma_unmap_one(device, seg++); - - rpcrdma_put_mw(r_xprt, mw); } /* Invalidate all memory regions that were registered for "req". @@ -234,42 +282,50 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) seg = &req->rl_segments[i]; __fmr_dma_unmap(r_xprt, seg); + rpcrdma_put_mw(r_xprt, seg->rl_mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; + seg->rl_mw = NULL; } req->rl_nchunks = 0; } -/* Use the ib_unmap_fmr() verb to prevent further remote - * access via RDMA READ or RDMA WRITE. +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + * + * In the asynchronous case, DMA unmapping occurs first here + * because the rpcrdma_mr_seg is released immediately after this + * call. It's contents won't be available in __fmr_dma_unmap later. + * FIXME. */ -static int -fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) +static void +fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_mw *mw = seg1->rl_mw; - int rc, nsegs = seg->mr_nsegs; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; + unsigned int i; - dprintk("RPC: %s: FMR %p\n", __func__, mw); + for (i = 0; req->rl_nchunks; req->rl_nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; - seg1->rl_mw = NULL; - while (seg1->mr_nsegs--) - rpcrdma_unmap_one(ia->ri_device, seg++); - rc = __fmr_unmap(mw); - if (rc) - goto out_err; - rpcrdma_put_mw(r_xprt, mw); - return nsegs; + if (sync) { + /* ORDER */ + __fmr_unmap(mw); + __fmr_dma_unmap(r_xprt, seg); + rpcrdma_put_mw(r_xprt, mw); + } else { + __fmr_dma_unmap(r_xprt, seg); + __fmr_queue_recovery(mw); + } -out_err: - /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy - * will attempt to release it when the transport is destroyed. - */ - dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc); - return nsegs; + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + seg->rl_mw = NULL; + } } static void @@ -295,7 +351,7 @@ fmr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, - .ro_unmap = fmr_op_unmap, + .ro_unmap_safe = fmr_op_unmap_safe, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, .ro_init = fmr_op_init, diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index c250924a9..c0947544b 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -98,6 +98,47 @@ frwr_destroy_recovery_wq(void) destroy_workqueue(wq); } +static int +__frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) +{ + struct rpcrdma_frmr *f = &r->frmr; + int rc; + + rc = ib_dereg_mr(f->fr_mr); + if (rc) { + pr_warn("rpcrdma: ib_dereg_mr status %d, frwr %p orphaned\n", + rc, r); + return rc; + } + + f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, + ia->ri_max_frmr_depth); + if (IS_ERR(f->fr_mr)) { + pr_warn("rpcrdma: ib_alloc_mr status %ld, frwr %p orphaned\n", + PTR_ERR(f->fr_mr), r); + return PTR_ERR(f->fr_mr); + } + + dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); + f->fr_state = FRMR_IS_INVALID; + return 0; +} + +static void +__frwr_reset_and_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) +{ + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_frmr *f = &mw->frmr; + int rc; + + rc = __frwr_reset_mr(ia, mw); + ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, f->fr_dir); + if (rc) + return; + + rpcrdma_put_mw(r_xprt, mw); +} + /* Deferred reset of a single FRMR. Generate a fresh rkey by * replacing the MR. * @@ -109,26 +150,10 @@ static void __frwr_recovery_worker(struct work_struct *work) { struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - frmr.fr_work); - struct rpcrdma_xprt *r_xprt = r->frmr.fr_xprt; - unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - - if (ib_dereg_mr(r->frmr.fr_mr)) - goto out_fail; + mw_work); - r->frmr.fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(r->frmr.fr_mr)) - goto out_fail; - - dprintk("RPC: %s: recovered FRMR %p\n", __func__, r); - r->frmr.fr_state = FRMR_IS_INVALID; - rpcrdma_put_mw(r_xprt, r); + __frwr_reset_and_unmap(r->mw_xprt, r); return; - -out_fail: - pr_warn("RPC: %s: FRMR %p unrecovered\n", - __func__, r); } /* A broken MR was discovered in a context that can't sleep. @@ -137,8 +162,8 @@ out_fail: static void __frwr_queue_recovery(struct rpcrdma_mw *r) { - INIT_WORK(&r->frmr.fr_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->frmr.fr_work); + INIT_WORK(&r->mw_work, __frwr_recovery_worker); + queue_work(frwr_recovery_wq, &r->mw_work); } static int @@ -152,11 +177,11 @@ __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device, if (IS_ERR(f->fr_mr)) goto out_mr_err; - f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL); - if (!f->sg) + f->fr_sg = kcalloc(depth, sizeof(*f->fr_sg), GFP_KERNEL); + if (!f->fr_sg) goto out_list_err; - sg_init_table(f->sg, depth); + sg_init_table(f->fr_sg, depth); init_completion(&f->fr_linv_done); @@ -185,7 +210,7 @@ __frwr_release(struct rpcrdma_mw *r) if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - kfree(r->frmr.sg); + kfree(r->frmr.fr_sg); } static int @@ -231,6 +256,9 @@ frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, depth; } + rpcrdma_set_max_header_sizes(ia, cdata, max_t(unsigned int, 1, + RPCRDMA_MAX_DATA_SEGS / + ia->ri_max_frmr_depth)); return 0; } @@ -243,7 +271,7 @@ frwr_op_maxpages(struct rpcrdma_xprt *r_xprt) struct rpcrdma_ia *ia = &r_xprt->rx_ia; return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth); + RPCRDMA_MAX_HDR_SEGS * ia->ri_max_frmr_depth); } static void @@ -350,9 +378,9 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt) return rc; } + r->mw_xprt = r_xprt; list_add(&r->mw_list, &buf->rb_mws); list_add(&r->mw_all, &buf->rb_all); - r->frmr.fr_xprt = r_xprt; } return 0; @@ -396,12 +424,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, for (i = 0; i < nsegs;) { if (seg->mr_page) - sg_set_page(&frmr->sg[i], + sg_set_page(&frmr->fr_sg[i], seg->mr_page, seg->mr_len, offset_in_page(seg->mr_offset)); else - sg_set_buf(&frmr->sg[i], seg->mr_offset, + sg_set_buf(&frmr->fr_sg[i], seg->mr_offset, seg->mr_len); ++seg; @@ -412,25 +440,26 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - frmr->sg_nents = i; + frmr->fr_nents = i; + frmr->fr_dir = direction; - dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction); + dma_nents = ib_dma_map_sg(device, frmr->fr_sg, frmr->fr_nents, direction); if (!dma_nents) { pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", - __func__, frmr->sg, frmr->sg_nents); + __func__, frmr->fr_sg, frmr->fr_nents); return -ENOMEM; } - n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, PAGE_SIZE); - if (unlikely(n != frmr->sg_nents)) { + n = ib_map_mr_sg(mr, frmr->fr_sg, frmr->fr_nents, NULL, PAGE_SIZE); + if (unlikely(n != frmr->fr_nents)) { pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", - __func__, frmr->fr_mr, n, frmr->sg_nents); + __func__, frmr->fr_mr, n, frmr->fr_nents); rc = n < 0 ? n : -EINVAL; goto out_senderr; } dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", - __func__, mw, frmr->sg_nents, mr->length); + __func__, mw, frmr->fr_nents, mr->length); key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); @@ -452,18 +481,16 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; - seg1->mr_dir = direction; seg1->rl_mw = mw; seg1->mr_rkey = mr->rkey; seg1->mr_base = mr->iova; - seg1->mr_nsegs = frmr->sg_nents; + seg1->mr_nsegs = frmr->fr_nents; seg1->mr_len = mr->length; - return frmr->sg_nents; + return frmr->fr_nents; out_senderr: dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction); __frwr_queue_recovery(mw); return rc; } @@ -487,24 +514,6 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) return invalidate_wr; } -static void -__frwr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int rc) -{ - struct ib_device *device = r_xprt->rx_ia.ri_device; - struct rpcrdma_mw *mw = seg->rl_mw; - struct rpcrdma_frmr *f = &mw->frmr; - - seg->rl_mw = NULL; - - ib_dma_unmap_sg(device, f->sg, f->sg_nents, seg->mr_dir); - - if (!rc) - rpcrdma_put_mw(r_xprt, mw); - else - __frwr_queue_recovery(mw); -} - /* Invalidate all memory regions that were registered for "req". * * Sleeps until it is safe for the host CPU to access the @@ -518,6 +527,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) struct rpcrdma_mr_seg *seg; unsigned int i, nchunks; struct rpcrdma_frmr *f; + struct rpcrdma_mw *mw; int rc; dprintk("RPC: %s: req %p\n", __func__, req); @@ -558,11 +568,8 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * unless ri_id->qp is a valid pointer. */ rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr); - if (rc) { - pr_warn("%s: ib_post_send failed %i\n", __func__, rc); - rdma_disconnect(ia->ri_id); - goto unmap; - } + if (rc) + goto reset_mrs; wait_for_completion(&f->fr_linv_done); @@ -572,56 +579,65 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) unmap: for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { seg = &req->rl_segments[i]; + mw = seg->rl_mw; + seg->rl_mw = NULL; - __frwr_dma_unmap(r_xprt, seg, rc); + ib_dma_unmap_sg(ia->ri_device, f->fr_sg, f->fr_nents, + f->fr_dir); + rpcrdma_put_mw(r_xprt, mw); i += seg->mr_nsegs; seg->mr_nsegs = 0; } req->rl_nchunks = 0; -} + return; -/* Post a LOCAL_INV Work Request to prevent further remote access - * via RDMA READ or RDMA WRITE. - */ -static int -frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct rpcrdma_mr_seg *seg1 = seg; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mw *mw = seg1->rl_mw; - struct rpcrdma_frmr *frmr = &mw->frmr; - struct ib_send_wr *invalidate_wr, *bad_wr; - int rc, nsegs = seg->mr_nsegs; +reset_mrs: + pr_warn("%s: ib_post_send failed %i\n", __func__, rc); - dprintk("RPC: %s: FRMR %p\n", __func__, mw); + /* Find and reset the MRs in the LOCAL_INV WRs that did not + * get posted. This is synchronous, and slow. + */ + for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; + f = &mw->frmr; - seg1->rl_mw = NULL; - frmr->fr_state = FRMR_IS_INVALID; - invalidate_wr = &mw->frmr.fr_invwr; + if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) { + __frwr_reset_mr(ia, mw); + bad_wr = bad_wr->next; + } - memset(invalidate_wr, 0, sizeof(*invalidate_wr)); - frmr->fr_cqe.done = frwr_wc_localinv; - invalidate_wr->wr_cqe = &frmr->fr_cqe; - invalidate_wr->opcode = IB_WR_LOCAL_INV; - invalidate_wr->ex.invalidate_rkey = frmr->fr_mr->rkey; - DECR_CQCOUNT(&r_xprt->rx_ep); + i += seg->mr_nsegs; + } + goto unmap; +} - ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); - read_lock(&ia->ri_qplock); - rc = ib_post_send(ia->ri_id->qp, invalidate_wr, &bad_wr); - read_unlock(&ia->ri_qplock); - if (rc) - goto out_err; +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + */ +static void +frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) +{ + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; + unsigned int i; - rpcrdma_put_mw(r_xprt, mw); - return nsegs; + for (i = 0; req->rl_nchunks; req->rl_nchunks--) { + seg = &req->rl_segments[i]; + mw = seg->rl_mw; -out_err: - dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - __frwr_queue_recovery(mw); - return nsegs; + if (sync) + __frwr_reset_and_unmap(r_xprt, mw); + else + __frwr_queue_recovery(mw); + + i += seg->mr_nsegs; + seg->mr_nsegs = 0; + seg->rl_mw = NULL; + } } static void @@ -643,7 +659,7 @@ frwr_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, - .ro_unmap = frwr_op_unmap, + .ro_unmap_safe = frwr_op_unmap_safe, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, .ro_init = frwr_op_init, diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c index 481b9b6f4..3750596cc 100644 --- a/net/sunrpc/xprtrdma/physical_ops.c +++ b/net/sunrpc/xprtrdma/physical_ops.c @@ -36,8 +36,11 @@ physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep, __func__, PTR_ERR(mr)); return -ENOMEM; } - ia->ri_dma_mr = mr; + + rpcrdma_set_max_header_sizes(ia, cdata, min_t(unsigned int, + RPCRDMA_MAX_DATA_SEGS, + RPCRDMA_MAX_HDR_SEGS)); return 0; } @@ -47,7 +50,7 @@ static size_t physical_op_maxpages(struct rpcrdma_xprt *r_xprt) { return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS, - rpcrdma_max_segments(r_xprt)); + RPCRDMA_MAX_HDR_SEGS); } static int @@ -71,17 +74,6 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, return 1; } -/* Unmap a memory region, but leave it registered. - */ -static int -physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - - rpcrdma_unmap_one(ia->ri_device, seg); - return 1; -} - /* DMA unmap all memory regions that were mapped for "req". */ static void @@ -94,6 +86,25 @@ physical_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) rpcrdma_unmap_one(device, &req->rl_segments[i++]); } +/* Use a slow, safe mechanism to invalidate all memory regions + * that were registered for "req". + * + * For physical memory registration, there is no good way to + * fence a single MR that has been advertised to the server. The + * client has already handed the server an R_key that cannot be + * invalidated and is shared by all MRs on this connection. + * Tearing down the PD might be the only safe choice, but it's + * not clear that a freshly acquired DMA R_key would be different + * than the one used by the PD that was just destroyed. + * FIXME. + */ +static void +physical_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + bool sync) +{ + physical_op_unmap_sync(r_xprt, req); +} + static void physical_op_destroy(struct rpcrdma_buffer *buf) { @@ -102,7 +113,7 @@ physical_op_destroy(struct rpcrdma_buffer *buf) const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = { .ro_map = physical_op_map, .ro_unmap_sync = physical_op_unmap_sync, - .ro_unmap = physical_op_unmap, + .ro_unmap_safe = physical_op_unmap_safe, .ro_open = physical_op_open, .ro_maxpages = physical_op_maxpages, .ro_init = physical_op_init, diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 888823bb6..35a81096e 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -61,26 +61,84 @@ enum rpcrdma_chunktype { rpcrdma_replych }; -#if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static const char transfertypes[][12] = { - "pure inline", /* no chunks */ - " read chunk", /* some argument via rdma read */ - "*read chunk", /* entire request via rdma read */ - "write chunk", /* some result via rdma write */ + "inline", /* no chunks */ + "read list", /* some argument via rdma read */ + "*read list", /* entire request via rdma read */ + "write list", /* some result via rdma write */ "reply chunk" /* entire reply via rdma write */ }; -#endif + +/* Returns size of largest RPC-over-RDMA header in a Call message + * + * The largest Call header contains a full-size Read list and a + * minimal Reply chunk. + */ +static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs) +{ + unsigned int size; + + /* Fixed header fields and list discriminators */ + size = RPCRDMA_HDRLEN_MIN; + + /* Maximum Read list size */ + maxsegs += 2; /* segment for head and tail buffers */ + size = maxsegs * sizeof(struct rpcrdma_read_chunk); + + /* Minimal Read chunk size */ + size += sizeof(__be32); /* segment count */ + size += sizeof(struct rpcrdma_segment); + size += sizeof(__be32); /* list discriminator */ + + dprintk("RPC: %s: max call header size = %u\n", + __func__, size); + return size; +} + +/* Returns size of largest RPC-over-RDMA header in a Reply message + * + * There is only one Write list or one Reply chunk per Reply + * message. The larger list is the Write list. + */ +static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs) +{ + unsigned int size; + + /* Fixed header fields and list discriminators */ + size = RPCRDMA_HDRLEN_MIN; + + /* Maximum Write list size */ + maxsegs += 2; /* segment for head and tail buffers */ + size = sizeof(__be32); /* segment count */ + size += maxsegs * sizeof(struct rpcrdma_segment); + size += sizeof(__be32); /* list discriminator */ + + dprintk("RPC: %s: max reply header size = %u\n", + __func__, size); + return size; +} + +void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia, + struct rpcrdma_create_data_internal *cdata, + unsigned int maxsegs) +{ + ia->ri_max_inline_write = cdata->inline_wsize - + rpcrdma_max_call_header_size(maxsegs); + ia->ri_max_inline_read = cdata->inline_rsize - + rpcrdma_max_reply_header_size(maxsegs); +} /* The client can send a request inline as long as the RPCRDMA header * plus the RPC call fit under the transport's inline limit. If the * combined call message size exceeds that limit, the client must use * the read chunk list for this operation. */ -static bool rpcrdma_args_inline(struct rpc_rqst *rqst) +static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) { - unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst); + return rqst->rq_snd_buf.len <= ia->ri_max_inline_write; } /* The client can't know how large the actual reply will be. Thus it @@ -89,11 +147,12 @@ static bool rpcrdma_args_inline(struct rpc_rqst *rqst) * limit, the client must provide a write list or a reply chunk for * this request. */ -static bool rpcrdma_results_inline(struct rpc_rqst *rqst) +static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt, + struct rpc_rqst *rqst) { - unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; - return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst); + return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read; } static int @@ -226,23 +285,16 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, return n; } -/* - * Create read/write chunk lists, and reply chunks, for RDMA - * - * Assume check against THRESHOLD has been done, and chunks are required. - * Assume only encoding one list entry for read|write chunks. The NFSv3 - * protocol is simple enough to allow this as it only has a single "bulk - * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The - * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) - * - * When used for a single reply chunk (which is a special write - * chunk used for the entire reply, rather than just the data), it - * is used primarily for READDIR and READLINK which would otherwise - * be severely size-limited by a small rdma inline read max. The server - * response will come back as an RDMA Write, followed by a message - * of type RDMA_NOMSG carrying the xid and length. As a result, reply - * chunks do not provide data alignment, however they do not require - * "fixup" (moving the response to the upper layer buffer) either. +static inline __be32 * +xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg) +{ + *iptr++ = cpu_to_be32(seg->mr_rkey); + *iptr++ = cpu_to_be32(seg->mr_len); + return xdr_encode_hyper(iptr, seg->mr_base); +} + +/* XDR-encode the Read list. Supports encoding a list of read + * segments that belong to a single read chunk. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * @@ -250,131 +302,190 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * N elements, position P (same P for all chunks of same arg!): * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 * + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Read list, or an error pointer. + */ +static __be32 * +rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, struct rpc_rqst *rqst, + __be32 *iptr, enum rpcrdma_chunktype rtype) +{ + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + unsigned int pos; + int n, nsegs; + + if (rtype == rpcrdma_noch) { + *iptr++ = xdr_zero; /* item not present */ + return iptr; + } + + pos = rqst->rq_snd_buf.head[0].iov_len; + if (rtype == rpcrdma_areadch) + pos = 0; + nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); + if (nsegs < 0) + return ERR_PTR(nsegs); + + do { + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false); + if (n <= 0) + return ERR_PTR(n); + + *iptr++ = xdr_one; /* item present */ + + /* All read segments in this chunk + * have the same "position". + */ + *iptr++ = cpu_to_be32(pos); + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: read segment pos %u " + "%d@0x%016llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, pos, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.read_chunk_count++; + req->rl_nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + req->rl_nextseg = seg; + + /* Finish Read list */ + *iptr++ = xdr_zero; /* Next item not present */ + return iptr; +} + +/* XDR-encode the Write list. Supports encoding a list containing + * one array of plain segments that belong to a single write chunk. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * * Write chunklist (a list of (one) counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO - 0 * + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Write list, or an error pointer. + */ +static __be32 * +rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, + struct rpc_rqst *rqst, __be32 *iptr, + enum rpcrdma_chunktype wtype) +{ + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + int n, nsegs, nchunks; + __be32 *segcount; + + if (wtype != rpcrdma_writech) { + *iptr++ = xdr_zero; /* no Write list present */ + return iptr; + } + + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, + rqst->rq_rcv_buf.head[0].iov_len, + wtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); + if (nsegs < 0) + return ERR_PTR(nsegs); + + *iptr++ = xdr_one; /* Write list present */ + segcount = iptr++; /* save location of segment count */ + + nchunks = 0; + do { + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); + if (n <= 0) + return ERR_PTR(n); + + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: write segment " + "%d@0x016%llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.write_chunk_count++; + r_xprt->rx_stats.total_rdma_request += seg->mr_len; + req->rl_nchunks++; + nchunks++; + seg += n; + nsegs -= n; + } while (nsegs); + req->rl_nextseg = seg; + + /* Update count of segments in this Write chunk */ + *segcount = cpu_to_be32(nchunks); + + /* Finish Write list */ + *iptr++ = xdr_zero; /* Next item not present */ + return iptr; +} + +/* XDR-encode the Reply chunk. Supports encoding an array of plain + * segments that belong to a single write (reply) chunk. + * + * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): + * * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO * - * Returns positive RPC/RDMA header size, or negative errno. + * Returns a pointer to the XDR word in the RDMA header following + * the end of the Reply chunk, or an error pointer. */ - -static ssize_t -rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, - struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type) +static __be32 * +rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_req *req, struct rpc_rqst *rqst, + __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); - int n, nsegs, nchunks = 0; - unsigned int pos; - struct rpcrdma_mr_seg *seg = req->rl_segments; - struct rpcrdma_read_chunk *cur_rchunk = NULL; - struct rpcrdma_write_array *warray = NULL; - struct rpcrdma_write_chunk *cur_wchunk = NULL; - __be32 *iptr = headerp->rm_body.rm_chunks; - int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool); - - if (type == rpcrdma_readch || type == rpcrdma_areadch) { - /* a read chunk - server will RDMA Read our memory */ - cur_rchunk = (struct rpcrdma_read_chunk *) iptr; - } else { - /* a write or reply chunk - server will RDMA Write our memory */ - *iptr++ = xdr_zero; /* encode a NULL read chunk list */ - if (type == rpcrdma_replych) - *iptr++ = xdr_zero; /* a NULL write chunk list */ - warray = (struct rpcrdma_write_array *) iptr; - cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1); - } + struct rpcrdma_mr_seg *seg = req->rl_nextseg; + int n, nsegs, nchunks; + __be32 *segcount; - if (type == rpcrdma_replych || type == rpcrdma_areadch) - pos = 0; - else - pos = target->head[0].iov_len; + if (wtype != rpcrdma_replych) { + *iptr++ = xdr_zero; /* no Reply chunk present */ + return iptr; + } - nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg, + RPCRDMA_MAX_SEGS - req->rl_nchunks); if (nsegs < 0) - return nsegs; + return ERR_PTR(nsegs); - map = r_xprt->rx_ia.ri_ops->ro_map; + *iptr++ = xdr_one; /* Reply chunk present */ + segcount = iptr++; /* save location of segment count */ + + nchunks = 0; do { - n = map(r_xprt, seg, nsegs, cur_wchunk != NULL); + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); if (n <= 0) - goto out; - if (cur_rchunk) { /* read */ - cur_rchunk->rc_discrim = xdr_one; - /* all read chunks have the same "position" */ - cur_rchunk->rc_position = cpu_to_be32(pos); - cur_rchunk->rc_target.rs_handle = - cpu_to_be32(seg->mr_rkey); - cur_rchunk->rc_target.rs_length = - cpu_to_be32(seg->mr_len); - xdr_encode_hyper( - (__be32 *)&cur_rchunk->rc_target.rs_offset, - seg->mr_base); - dprintk("RPC: %s: read chunk " - "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, pos, n < nsegs ? "more" : "last"); - cur_rchunk++; - r_xprt->rx_stats.read_chunk_count++; - } else { /* write/reply */ - cur_wchunk->wc_target.rs_handle = - cpu_to_be32(seg->mr_rkey); - cur_wchunk->wc_target.rs_length = - cpu_to_be32(seg->mr_len); - xdr_encode_hyper( - (__be32 *)&cur_wchunk->wc_target.rs_offset, - seg->mr_base); - dprintk("RPC: %s: %s chunk " - "elem %d@0x%llx:0x%x (%s)\n", __func__, - (type == rpcrdma_replych) ? "reply" : "write", - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); - cur_wchunk++; - if (type == rpcrdma_replych) - r_xprt->rx_stats.reply_chunk_count++; - else - r_xprt->rx_stats.write_chunk_count++; - r_xprt->rx_stats.total_rdma_request += seg->mr_len; - } + return ERR_PTR(n); + + iptr = xdr_encode_rdma_segment(iptr, seg); + + dprintk("RPC: %5u %s: reply segment " + "%d@0x%016llx:0x%08x (%s)\n", + rqst->rq_task->tk_pid, __func__, + seg->mr_len, (unsigned long long)seg->mr_base, + seg->mr_rkey, n < nsegs ? "more" : "last"); + + r_xprt->rx_stats.reply_chunk_count++; + r_xprt->rx_stats.total_rdma_request += seg->mr_len; + req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); + req->rl_nextseg = seg; - /* success. all failures return above */ - req->rl_nchunks = nchunks; - - /* - * finish off header. If write, marshal discrim and nchunks. - */ - if (cur_rchunk) { - iptr = (__be32 *) cur_rchunk; - *iptr++ = xdr_zero; /* finish the read chunk list */ - *iptr++ = xdr_zero; /* encode a NULL write chunk list */ - *iptr++ = xdr_zero; /* encode a NULL reply chunk */ - } else { - warray->wc_discrim = xdr_one; - warray->wc_nchunks = cpu_to_be32(nchunks); - iptr = (__be32 *) cur_wchunk; - if (type == rpcrdma_writech) { - *iptr++ = xdr_zero; /* finish the write chunk list */ - *iptr++ = xdr_zero; /* encode a NULL reply chunk */ - } - } - - /* - * Return header size. - */ - return (unsigned char *)iptr - (unsigned char *)headerp; + /* Update count of segments in the Reply chunk */ + *segcount = cpu_to_be32(nchunks); -out: - for (pos = 0; nchunks--;) - pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, - &req->rl_segments[pos]); - return n; + return iptr; } /* @@ -440,13 +551,10 @@ static void rpcrdma_inline_pullup(struct rpc_rqst *rqst) * Marshal a request: the primary job of this routine is to choose * the transfer modes. See comments below. * - * Uses multiple RDMA IOVs for a request: - * [0] -- RPC RDMA header, which uses memory from the *start* of the - * preregistered buffer that already holds the RPC data in - * its middle. - * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. - * [2] -- optional padding. - * [3] -- if padded, header only in [1] and data here. + * Prepares up to two IOVs per Call message: + * + * [0] -- RPC RDMA header + * [1] -- the RPC header/data * * Returns zero on success, otherwise a negative errno. */ @@ -457,24 +565,17 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpc_xprt *xprt = rqst->rq_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); - char *base; - size_t rpclen; - ssize_t hdrlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; + ssize_t hdrlen; + size_t rpclen; + __be32 *iptr; #if defined(CONFIG_SUNRPC_BACKCHANNEL) if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state)) return rpcrdma_bc_marshal_reply(rqst); #endif - /* - * rpclen gets amount of data in first buffer, which is the - * pre-registered buffer. - */ - base = rqst->rq_svec[0].iov_base; - rpclen = rqst->rq_svec[0].iov_len; - headerp = rdmab_to_msg(req->rl_rdmabuf); /* don't byte-swap XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; @@ -485,15 +586,16 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) /* * Chunks needed for results? * - * o Read ops return data as write chunk(s), header as inline. * o If the expected result is under the inline threshold, all ops * return as inline. + * o Large read ops return data as write chunk(s), header as + * inline. * o Large non-read ops return as a single reply chunk. */ - if (rqst->rq_rcv_buf.flags & XDRBUF_READ) - wtype = rpcrdma_writech; - else if (rpcrdma_results_inline(rqst)) + if (rpcrdma_results_inline(r_xprt, rqst)) wtype = rpcrdma_noch; + else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + wtype = rpcrdma_writech; else wtype = rpcrdma_replych; @@ -511,10 +613,14 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * that both has a data payload, and whose non-data arguments * by themselves are larger than the inline threshold. */ - if (rpcrdma_args_inline(rqst)) { + if (rpcrdma_args_inline(r_xprt, rqst)) { rtype = rpcrdma_noch; + rpcrdma_inline_pullup(rqst); + rpclen = rqst->rq_svec[0].iov_len; } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; + rpclen = rqst->rq_svec[0].iov_len; + rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); } else { r_xprt->rx_stats.nomsg_call_count++; headerp->rm_type = htonl(RDMA_NOMSG); @@ -522,57 +628,50 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) rpclen = 0; } - /* The following simplification is not true forever */ - if (rtype != rpcrdma_noch && wtype == rpcrdma_replych) - wtype = rpcrdma_noch; - if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) { - dprintk("RPC: %s: cannot marshal multiple chunk lists\n", - __func__); - return -EIO; - } - - hdrlen = RPCRDMA_HDRLEN_MIN; - - /* - * Pull up any extra send data into the preregistered buffer. - * When padding is in use and applies to the transfer, insert - * it and change the message type. + /* This implementation supports the following combinations + * of chunk lists in one RPC-over-RDMA Call message: + * + * - Read list + * - Write list + * - Reply chunk + * - Read list + Reply chunk + * + * It might not yet support the following combinations: + * + * - Read list + Write list + * + * It does not support the following combinations: + * + * - Write list + Reply chunk + * - Read list + Write list + Reply chunk + * + * This implementation supports only a single chunk in each + * Read or Write list. Thus for example the client cannot + * send a Call message with a Position Zero Read chunk and a + * regular Read chunk at the same time. */ - if (rtype == rpcrdma_noch) { - - rpcrdma_inline_pullup(rqst); - - headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero; - headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero; - /* new length after pullup */ - rpclen = rqst->rq_svec[0].iov_len; - } else if (rtype == rpcrdma_readch) - rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); - if (rtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf, - headerp, rtype); - wtype = rtype; /* simplify dprintk */ - - } else if (wtype != rpcrdma_noch) { - hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf, - headerp, wtype); - } - if (hdrlen < 0) - return hdrlen; + req->rl_nchunks = 0; + req->rl_nextseg = req->rl_segments; + iptr = headerp->rm_body.rm_chunks; + iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); + if (IS_ERR(iptr)) + goto out_unmap; + iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype); + if (IS_ERR(iptr)) + goto out_unmap; + iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype); + if (IS_ERR(iptr)) + goto out_unmap; + hdrlen = (unsigned char *)iptr - (unsigned char *)headerp; + + if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) + goto out_overflow; + + dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n", + rqst->rq_task->tk_pid, __func__, + transfertypes[rtype], transfertypes[wtype], + hdrlen, rpclen); - dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd" - " headerp 0x%p base 0x%p lkey 0x%x\n", - __func__, transfertypes[wtype], hdrlen, rpclen, - headerp, base, rdmab_lkey(req->rl_rdmabuf)); - - /* - * initialize send_iov's - normally only two: rdma chunk header and - * single preregistered RPC header buffer, but if padding is present, - * then use a preregistered (and zeroed) pad buffer between the RPC - * header and any write data. In all non-rdma cases, any following - * data has been copied into the RPC header buffer. - */ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf); req->rl_send_iov[0].length = hdrlen; req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf); @@ -587,6 +686,18 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) req->rl_niovs = 2; return 0; + +out_overflow: + pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", + hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); + /* Terminate this RPC. Chunks registered above will be + * released by xprt_release -> xprt_rmda_free . + */ + return -EIO; + +out_unmap: + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); + return PTR_ERR(iptr); } /* diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c index 765bca47c..0ba9887f3 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c +++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c @@ -145,19 +145,32 @@ static __be32 *decode_reply_array(__be32 *va, __be32 *vaend) return (__be32 *)&ary->wc_array[nchunks]; } -int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp) +/** + * svc_rdma_xdr_decode_req - Parse incoming RPC-over-RDMA header + * @rq_arg: Receive buffer + * + * On entry, xdr->head[0].iov_base points to first byte in the + * RPC-over-RDMA header. + * + * On successful exit, head[0] points to first byte past the + * RPC-over-RDMA header. For RDMA_MSG, this is the RPC message. + * The length of the RPC-over-RDMA header is returned. + */ +int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg) { + struct rpcrdma_msg *rmsgp; __be32 *va, *vaend; unsigned int len; u32 hdr_len; /* Verify that there's enough bytes for header + something */ - if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_ERR) { + if (rq_arg->len <= RPCRDMA_HDRLEN_ERR) { dprintk("svcrdma: header too short = %d\n", - rqstp->rq_arg.len); + rq_arg->len); return -EINVAL; } + rmsgp = (struct rpcrdma_msg *)rq_arg->head[0].iov_base; if (rmsgp->rm_vers != rpcrdma_version) { dprintk("%s: bad version %u\n", __func__, be32_to_cpu(rmsgp->rm_vers)); @@ -189,10 +202,10 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp) be32_to_cpu(rmsgp->rm_body.rm_padded.rm_thresh); va = &rmsgp->rm_body.rm_padded.rm_pempty[4]; - rqstp->rq_arg.head[0].iov_base = va; + rq_arg->head[0].iov_base = va; len = (u32)((unsigned long)va - (unsigned long)rmsgp); - rqstp->rq_arg.head[0].iov_len -= len; - if (len > rqstp->rq_arg.len) + rq_arg->head[0].iov_len -= len; + if (len > rq_arg->len) return -EINVAL; return len; default: @@ -205,7 +218,7 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp) * chunk list and a reply chunk list. */ va = &rmsgp->rm_body.rm_chunks[0]; - vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len); + vaend = (__be32 *)((unsigned long)rmsgp + rq_arg->len); va = decode_read_list(va, vaend); if (!va) { dprintk("svcrdma: failed to decode read list\n"); @@ -222,10 +235,9 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg *rmsgp, struct svc_rqst *rqstp) return -EINVAL; } - rqstp->rq_arg.head[0].iov_base = va; + rq_arg->head[0].iov_base = va; hdr_len = (unsigned long)va - (unsigned long)rmsgp; - rqstp->rq_arg.head[0].iov_len -= hdr_len; - + rq_arg->head[0].iov_len -= hdr_len; return hdr_len; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c index 3b24a646e..2c25606f2 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c +++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c @@ -281,7 +281,7 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, } atomic_inc(&xprt->sc_dma_used); - n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, PAGE_SIZE); + n = ib_map_mr_sg(frmr->mr, frmr->sg, frmr->sg_nents, NULL, PAGE_SIZE); if (unlikely(n != frmr->sg_nents)) { pr_err("svcrdma: failed to map mr %p (%d/%d elements)\n", frmr->mr, n, frmr->sg_nents); @@ -447,10 +447,8 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt, head->arg.len = rqstp->rq_arg.len; head->arg.buflen = rqstp->rq_arg.buflen; - ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; - position = be32_to_cpu(ch->rc_position); - /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */ + position = be32_to_cpu(ch->rc_position); if (position == 0) { head->arg.pages = &head->pages[0]; page_offset = head->byte_len; @@ -488,7 +486,7 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt, if (page_offset & 3) { u32 pad = 4 - (page_offset & 3); - head->arg.page_len += pad; + head->arg.tail[0].iov_len += pad; head->arg.len += pad; head->arg.buflen += pad; page_offset += pad; @@ -510,11 +508,10 @@ static int rdma_read_chunks(struct svcxprt_rdma *xprt, return ret; } -static int rdma_read_complete(struct svc_rqst *rqstp, - struct svc_rdma_op_ctxt *head) +static void rdma_read_complete(struct svc_rqst *rqstp, + struct svc_rdma_op_ctxt *head) { int page_no; - int ret; /* Copy RPC pages */ for (page_no = 0; page_no < head->count; page_no++) { @@ -550,23 +547,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp, rqstp->rq_arg.tail[0] = head->arg.tail[0]; rqstp->rq_arg.len = head->arg.len; rqstp->rq_arg.buflen = head->arg.buflen; - - /* Free the context */ - svc_rdma_put_context(head, 0); - - /* XXX: What should this be? */ - rqstp->rq_prot = IPPROTO_MAX; - svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt); - - ret = rqstp->rq_arg.head[0].iov_len - + rqstp->rq_arg.page_len - + rqstp->rq_arg.tail[0].iov_len; - dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, " - "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n", - ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base, - rqstp->rq_arg.head[0].iov_len); - - return ret; } /* By convention, backchannel calls arrive via rdma_msg type @@ -624,7 +604,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) dto_q); list_del_init(&ctxt->dto_q); spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock); - return rdma_read_complete(rqstp, ctxt); + rdma_read_complete(rqstp, ctxt); + goto complete; } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) { ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next, struct svc_rdma_op_ctxt, @@ -655,7 +636,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) /* Decode the RDMA header. */ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base; - ret = svc_rdma_xdr_decode_req(rmsgp, rqstp); + ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg); if (ret < 0) goto out_err; if (ret == 0) @@ -682,6 +663,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp) return 0; } +complete: ret = rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len + rqstp->rq_arg.tail[0].iov_len; diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c index 4f1b1c4f4..54d533300 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c +++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c @@ -463,25 +463,21 @@ static int send_reply(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp, struct page *page, struct rpcrdma_msg *rdma_resp, - struct svc_rdma_op_ctxt *ctxt, struct svc_rdma_req_map *vec, int byte_count) { + struct svc_rdma_op_ctxt *ctxt; struct ib_send_wr send_wr; u32 xdr_off; int sge_no; int sge_bytes; int page_no; int pages; - int ret; - - ret = svc_rdma_repost_recv(rdma, GFP_KERNEL); - if (ret) { - svc_rdma_put_context(ctxt, 0); - return -ENOTCONN; - } + int ret = -EIO; /* Prepare the context */ + ctxt = svc_rdma_get_context(rdma); + ctxt->direction = DMA_TO_DEVICE; ctxt->pages[0] = page; ctxt->count = 1; @@ -565,8 +561,7 @@ static int send_reply(struct svcxprt_rdma *rdma, err: svc_rdma_unmap_dma(ctxt); svc_rdma_put_context(ctxt, 1); - pr_err("svcrdma: failed to send reply, rc=%d\n", ret); - return -EIO; + return ret; } void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp) @@ -585,7 +580,6 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) int ret; int inline_bytes; struct page *res_page; - struct svc_rdma_op_ctxt *ctxt; struct svc_rdma_req_map *vec; dprintk("svcrdma: sending response for rqstp=%p\n", rqstp); @@ -598,8 +592,6 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) rp_ary = svc_rdma_get_reply_array(rdma_argp, wr_ary); /* Build an req vec for the XDR */ - ctxt = svc_rdma_get_context(rdma); - ctxt->direction = DMA_TO_DEVICE; vec = svc_rdma_get_req_map(rdma); ret = svc_rdma_map_xdr(rdma, &rqstp->rq_res, vec, wr_ary != NULL); if (ret) @@ -635,7 +627,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) inline_bytes -= ret; } - ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec, + /* Post a fresh Receive buffer _before_ sending the reply */ + ret = svc_rdma_post_recv(rdma, GFP_KERNEL); + if (ret) + goto err1; + + ret = send_reply(rdma, rqstp, res_page, rdma_resp, vec, inline_bytes); if (ret < 0) goto err1; @@ -648,7 +645,8 @@ int svc_rdma_sendto(struct svc_rqst *rqstp) put_page(res_page); err0: svc_rdma_put_req_map(rdma, vec); - svc_rdma_put_context(ctxt, 0); + pr_err("svcrdma: Could not send reply, err=%d. Closing transport.\n", + ret); set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); return -ENOTCONN; } diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c index 90668969d..dd9440137 100644 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@ -789,7 +789,7 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, int ret; dprintk("svcrdma: Creating RDMA socket\n"); - if (sa->sa_family != AF_INET) { + if ((sa->sa_family != AF_INET) && (sa->sa_family != AF_INET6)) { dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family); return ERR_PTR(-EAFNOSUPPORT); } @@ -805,6 +805,16 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, goto err0; } + /* Allow both IPv4 and IPv6 sockets to bind a single port + * at the same time. + */ +#if IS_ENABLED(CONFIG_IPV6) + ret = rdma_set_afonly(listen_id, 1); + if (ret) { + dprintk("svcrdma: rdma_set_afonly failed = %d\n", ret); + goto err1; + } +#endif ret = rdma_bind_addr(listen_id, sa); if (ret) { dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); @@ -1073,7 +1083,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV; /* Post receive buffers */ - for (i = 0; i < newxprt->sc_rq_depth; i++) { + for (i = 0; i < newxprt->sc_max_requests; i++) { ret = svc_rdma_post_recv(newxprt, GFP_KERNEL); if (ret) { dprintk("svcrdma: failure posting receive buffers\n"); @@ -1170,6 +1180,9 @@ static void __svc_rdma_free(struct work_struct *work) dprintk("svcrdma: %s(%p)\n", __func__, rdma); + if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) + ib_drain_qp(rdma->sc_qp); + /* We should only be called from kref_put */ if (atomic_read(&xprt->xpt_ref.refcount) != 0) pr_err("svcrdma: sc_xprt still in use? (%d)\n", diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index b1b009f10..99d2e5b72 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -73,6 +73,8 @@ static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR; static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE; static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE; +static unsigned int min_inline_size = RPCRDMA_MIN_INLINE; +static unsigned int max_inline_size = RPCRDMA_MAX_INLINE; static unsigned int zero; static unsigned int max_padding = PAGE_SIZE; static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS; @@ -96,6 +98,8 @@ static struct ctl_table xr_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec, + .extra1 = &min_inline_size, + .extra2 = &max_inline_size, }, { .procname = "rdma_max_inline_write", @@ -103,6 +107,8 @@ static struct ctl_table xr_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec, + .extra1 = &min_inline_size, + .extra2 = &max_inline_size, }, { .procname = "rdma_inline_write_padding", @@ -508,6 +514,7 @@ xprt_rdma_allocate(struct rpc_task *task, size_t size) out: dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req); req->rl_connect_cookie = 0; /* our reserved value */ + req->rl_task = task; return req->rl_sendbuf->rg_base; out_rdmabuf: @@ -564,7 +571,6 @@ xprt_rdma_free(void *buffer) struct rpcrdma_req *req; struct rpcrdma_xprt *r_xprt; struct rpcrdma_regbuf *rb; - int i; if (buffer == NULL) return; @@ -578,11 +584,8 @@ xprt_rdma_free(void *buffer) dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply); - for (i = 0; req->rl_nchunks;) { - --req->rl_nchunks; - i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt, - &req->rl_segments[i]); - } + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, + !RPC_IS_ASYNC(req->rl_task)); rpcrdma_buffer_put(req); } @@ -707,6 +710,7 @@ static struct rpc_xprt_ops xprt_rdma_procs = { #if defined(CONFIG_SUNRPC_BACKCHANNEL) .bc_setup = xprt_rdma_bc_setup, .bc_up = xprt_rdma_bc_up, + .bc_maxpayload = xprt_rdma_bc_maxpayload, .bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_destroy = xprt_rdma_bc_destroy, #endif diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index f5ed9f982..b044d98a1 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -203,15 +203,6 @@ out_fail: goto out_schedule; } -static void -rpcrdma_flush_cqs(struct rpcrdma_ep *ep) -{ - struct ib_wc wc; - - while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_receive_wc(NULL, &wc); -} - static int rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event) { @@ -374,23 +365,6 @@ out: } /* - * Drain any cq, prior to teardown. - */ -static void -rpcrdma_clean_cq(struct ib_cq *cq) -{ - struct ib_wc wc; - int count = 0; - - while (1 == ib_poll_cq(cq, 1, &wc)) - ++count; - - if (count) - dprintk("RPC: %s: flushed %d events (last 0x%x)\n", - __func__, count, wc.opcode); -} - -/* * Exported functions. */ @@ -459,7 +433,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) dprintk("RPC: %s: memory registration strategy is '%s'\n", __func__, ia->ri_ops->ro_displayname); - rwlock_init(&ia->ri_qplock); return 0; out3: @@ -515,7 +488,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, __func__); return -ENOMEM; } - max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS; + max_qp_wr = ia->ri_device->attrs.max_qp_wr - RPCRDMA_BACKWARD_WRS - 1; /* check provider's send/recv wr limits */ if (cdata->max_requests > max_qp_wr) @@ -526,11 +499,13 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; + ep->rep_attr.cap.max_send_wr += 1; /* drain cqe */ rc = ia->ri_ops->ro_open(ia, ep, cdata); if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; + ep->rep_attr.cap.max_recv_wr += 1; /* drain cqe */ ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; @@ -578,6 +553,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_attr.recv_cq = recvcq; /* Initialize cma parameters */ + memset(&ep->rep_remote_cma, 0, sizeof(ep->rep_remote_cma)); /* RPC/RDMA does not use private data */ ep->rep_remote_cma.private_data = NULL; @@ -591,7 +567,16 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, ep->rep_remote_cma.responder_resources = ia->ri_device->attrs.max_qp_rd_atom; - ep->rep_remote_cma.retry_count = 7; + /* Limit transport retries so client can detect server + * GID changes quickly. RPC layer handles re-establishing + * transport connection and retransmission. + */ + ep->rep_remote_cma.retry_count = 6; + + /* RPC-over-RDMA handles its own flow control. In addition, + * make all RNR NAKs visible so we know that RPC-over-RDMA + * flow control is working correctly (no NAKs should be seen). + */ ep->rep_remote_cma.flow_control = 0; ep->rep_remote_cma.rnr_retry_count = 0; @@ -622,13 +607,8 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) cancel_delayed_work_sync(&ep->rep_connect_worker); - if (ia->ri_id->qp) - rpcrdma_ep_disconnect(ep, ia); - - rpcrdma_clean_cq(ep->rep_attr.recv_cq); - rpcrdma_clean_cq(ep->rep_attr.send_cq); - if (ia->ri_id->qp) { + rpcrdma_ep_disconnect(ep, ia); rdma_destroy_qp(ia->ri_id); ia->ri_id->qp = NULL; } @@ -659,7 +639,6 @@ retry: dprintk("RPC: %s: reconnecting...\n", __func__); rpcrdma_ep_disconnect(ep, ia); - rpcrdma_flush_cqs(ep); xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); id = rpcrdma_create_id(xprt, ia, @@ -692,10 +671,8 @@ retry: goto out; } - write_lock(&ia->ri_qplock); old = ia->ri_id; ia->ri_id = id; - write_unlock(&ia->ri_qplock); rdma_destroy_qp(old); rpcrdma_destroy_id(old); @@ -785,7 +762,6 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { int rc; - rpcrdma_flush_cqs(ep); rc = rdma_disconnect(ia->ri_id); if (!rc) { /* returns without wait if not connected */ @@ -797,6 +773,8 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc); ep->rep_connected = rc; } + + ib_drain_qp(ia->ri_id->qp); } struct rpcrdma_req * @@ -1271,25 +1249,3 @@ out_rc: rpcrdma_recv_buffer_put(rep); return rc; } - -/* How many chunk list items fit within our inline buffers? - */ -unsigned int -rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - int bytes, segments; - - bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize); - bytes -= RPCRDMA_HDRLEN_MIN; - if (bytes < sizeof(struct rpcrdma_segment) * 2) { - pr_warn("RPC: %s: inline threshold too small\n", - __func__); - return 0; - } - - segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1); - dprintk("RPC: %s: max chunk list size = %d segments\n", - __func__, segments); - return segments; -} diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index 2ebc743cb..95cdc6622 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -65,7 +65,6 @@ */ struct rpcrdma_ia { const struct rpcrdma_memreg_ops *ri_ops; - rwlock_t ri_qplock; struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; @@ -73,6 +72,8 @@ struct rpcrdma_ia { struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; + unsigned int ri_max_inline_write; + unsigned int ri_max_inline_read; struct ib_qp_attr ri_qp_attr; struct ib_qp_init_attr ri_qp_init_attr; }; @@ -144,6 +145,26 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) #define RPCRDMA_DEF_GFP (GFP_NOIO | __GFP_NOWARN) +/* To ensure a transport can always make forward progress, + * the number of RDMA segments allowed in header chunk lists + * is capped at 8. This prevents less-capable devices and + * memory registrations from overrunning the Send buffer + * while building chunk lists. + * + * Elements of the Read list take up more room than the + * Write list or Reply chunk. 8 read segments means the Read + * list (or Write list or Reply chunk) cannot consume more + * than + * + * ((8 + 2) * read segment size) + 1 XDR words, or 244 bytes. + * + * And the fixed part of the header is another 24 bytes. + * + * The smallest inline threshold is 1024 bytes, ensuring that + * at least 750 bytes are available for RPC messages. + */ +#define RPCRDMA_MAX_HDR_SEGS (8) + /* * struct rpcrdma_rep -- this structure encapsulates state required to recv * and complete a reply, asychronously. It needs several pieces of @@ -162,7 +183,9 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) */ #define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) -#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */ + +/* data segments + head/tail for Call + head/tail for Reply */ +#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4) struct rpcrdma_buffer; @@ -198,14 +221,13 @@ enum rpcrdma_frmr_state { }; struct rpcrdma_frmr { - struct scatterlist *sg; - int sg_nents; + struct scatterlist *fr_sg; + int fr_nents; + enum dma_data_direction fr_dir; struct ib_mr *fr_mr; struct ib_cqe fr_cqe; enum rpcrdma_frmr_state fr_state; struct completion fr_linv_done; - struct work_struct fr_work; - struct rpcrdma_xprt *fr_xprt; union { struct ib_reg_wr fr_regwr; struct ib_send_wr fr_invwr; @@ -222,6 +244,8 @@ struct rpcrdma_mw { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; + struct work_struct mw_work; + struct rpcrdma_xprt *mw_xprt; struct list_head mw_list; struct list_head mw_all; }; @@ -270,12 +294,14 @@ struct rpcrdma_req { unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; + struct rpc_task *rl_task; struct rpcrdma_buffer *rl_buffer; struct rpcrdma_rep *rl_reply;/* holder for reply buffer */ struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + struct rpcrdma_mr_seg *rl_nextseg; struct ib_cqe rl_cqe; struct list_head rl_all; @@ -372,8 +398,8 @@ struct rpcrdma_memreg_ops { struct rpcrdma_mr_seg *, int, bool); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct rpcrdma_req *); - int (*ro_unmap)(struct rpcrdma_xprt *, - struct rpcrdma_mr_seg *); + void (*ro_unmap_safe)(struct rpcrdma_xprt *, + struct rpcrdma_req *, bool); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); @@ -456,7 +482,6 @@ struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, void rpcrdma_free_regbuf(struct rpcrdma_ia *, struct rpcrdma_regbuf *); -unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); int frwr_alloc_recovery_wq(void); @@ -519,6 +544,9 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *); * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c */ int rpcrdma_marshal_req(struct rpc_rqst *); +void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *, + struct rpcrdma_create_data_internal *, + unsigned int); /* RPC/RDMA module init - xprtrdma/transport.c */ @@ -534,6 +562,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int xprt_rdma_bc_up(struct svc_serv *, struct net *); +size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index e9e5dd0dc..7e2b2fa18 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -995,15 +995,14 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, u32 _xid; __be32 *xp; - repsize = skb->len - sizeof(struct udphdr); + repsize = skb->len; if (repsize < 4) { dprintk("RPC: impossible RPC reply size %d!\n", repsize); return; } /* Copy the XID from the skb... */ - xp = skb_header_pointer(skb, sizeof(struct udphdr), - sizeof(_xid), &_xid); + xp = skb_header_pointer(skb, 0, sizeof(_xid), &_xid); if (xp == NULL) return; @@ -1019,11 +1018,11 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, /* Suck it into the iovec, verify checksum if not done by hw. */ if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb)) { - UDPX_INC_STATS_BH(sk, UDP_MIB_INERRORS); + __UDPX_INC_STATS(sk, UDP_MIB_INERRORS); goto out_unlock; } - UDPX_INC_STATS_BH(sk, UDP_MIB_INDATAGRAMS); + __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); xprt_adjust_cwnd(xprt, task, copied); xprt_complete_rqst(task, copied); @@ -1365,6 +1364,11 @@ static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) return ret; return 0; } + +static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt) +{ + return PAGE_SIZE; +} #else static inline int _xs_tcp_read_data(struct rpc_xprt *xprt, struct xdr_skb_reader *desc) @@ -1881,8 +1885,7 @@ static inline void xs_reclassify_socket6(struct socket *sock) static inline void xs_reclassify_socket(int family, struct socket *sock) { - WARN_ON_ONCE(sock_owned_by_user(sock->sk)); - if (sock_owned_by_user(sock->sk)) + if (WARN_ON_ONCE(!sock_allow_reclassification(sock->sk))) return; switch (family) { @@ -1952,6 +1955,7 @@ static int xs_local_finish_connecting(struct rpc_xprt *xprt, sk->sk_user_data = xprt; sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; + sock_set_flag(sk, SOCK_FASYNC); sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_NOIO; @@ -2138,6 +2142,7 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_user_data = xprt; sk->sk_data_ready = xs_data_ready; sk->sk_write_space = xs_udp_write_space; + sock_set_flag(sk, SOCK_FASYNC); sk->sk_allocation = GFP_NOIO; xprt_set_connected(xprt); @@ -2239,6 +2244,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) sk->sk_data_ready = xs_tcp_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; + sock_set_flag(sk, SOCK_FASYNC); sk->sk_error_report = xs_error_report; sk->sk_allocation = GFP_NOIO; @@ -2660,6 +2666,7 @@ static struct rpc_xprt_ops xs_tcp_ops = { #ifdef CONFIG_SUNRPC_BACKCHANNEL .bc_setup = xprt_setup_bc, .bc_up = xs_tcp_bc_up, + .bc_maxpayload = xs_tcp_bc_maxpayload, .bc_free_rqst = xprt_free_bc_rqst, .bc_destroy = xprt_destroy_bc, #endif |