summary | refs | log | tree | commit | diff
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r-- net/sunrpc/xprtrdma/Makefile 14
-rw-r--r-- net/sunrpc/xprtrdma/fmr_ops.c 120
-rw-r--r-- net/sunrpc/xprtrdma/frwr_ops.c 229
-rw-r--r-- net/sunrpc/xprtrdma/module.c 46
-rw-r--r-- net/sunrpc/xprtrdma/physical_ops.c 14
-rw-r--r-- net/sunrpc/xprtrdma/rpc_rdma.c 8
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma.c 8
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_marshal.c 140
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_recvfrom.c 6
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_sendto.c 16
-rw-r--r-- net/sunrpc/xprtrdma/svc_rdma_transport.c 119
-rw-r--r-- net/sunrpc/xprtrdma/transport.c 56
-rw-r--r-- net/sunrpc/xprtrdma/verbs.c 348
-rw-r--r-- net/sunrpc/xprtrdma/xprt_rdma.h 49
14 files changed, 535 insertions, 638 deletions
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 579f72bbc..48913de24 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -1,9 +1,7 @@
-obj-$(CONFIG_SUNRPC_XPRT_RDMA_CLIENT) += xprtrdma.o
+obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o
-xprtrdma-y := transport.o rpc_rdma.o verbs.o \
- fmr_ops.o frwr_ops.o physical_ops.o
-
-obj-$(CONFIG_SUNRPC_XPRT_RDMA_SERVER) += svcrdma.o
-
-svcrdma-y := svc_rdma.o svc_rdma_transport.o \
- svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o
+rpcrdma-y := transport.o rpc_rdma.o verbs.o \
+ fmr_ops.o frwr_ops.o physical_ops.o \
+ svc_rdma.o svc_rdma_transport.o \
+ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
+ module.o
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 302d4ebf6..f1e8dafbd 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -11,6 +11,21 @@
* can take tens of usecs to complete.
*/
+/* Normal operation
+ *
+ * A Memory Region is prepared for RDMA READ or WRITE using the
+ * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is
+ * finished, the Memory Region is unmapped using the ib_unmap_fmr
+ * verb (fmr_op_unmap).
+ */
+
+/* Transport recovery
+ *
+ * After a transport reconnect, fmr_op_map re-uses the MR already
+ * allocated for the RPC, but generates a fresh rkey then maps the
+ * MR again. This process is synchronous.
+ */
+
#include "xprt_rdma.h"
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -50,19 +65,28 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
struct rpcrdma_mw *r;
int i, rc;
+ spin_lock_init(&buf->rb_mwlock);
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
- i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
- dprintk("RPC: %s: initializing %d FMRs\n", __func__, i);
+ i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
+ i += 2; /* head + tail */
+ i *= buf->rb_max_requests; /* one set for each RPC slot */
+ dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i);
+ rc = -ENOMEM;
while (i--) {
r = kzalloc(sizeof(*r), GFP_KERNEL);
if (!r)
- return -ENOMEM;
+ goto out;
- r->r.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
- if (IS_ERR(r->r.fmr))
+ r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+ sizeof(u64), GFP_KERNEL);
+ if (!r->r.fmr.physaddrs)
+ goto out_free;
+
+ r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->r.fmr.fmr))
goto out_fmr_err;
list_add(&r->mw_list, &buf->rb_mws);
@@ -71,12 +95,24 @@ fmr_op_init(struct rpcrdma_xprt *r_xprt)
return 0;
out_fmr_err:
- rc = PTR_ERR(r->r.fmr);
+ rc = PTR_ERR(r->r.fmr.fmr);
dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
+ kfree(r->r.fmr.physaddrs);
+out_free:
kfree(r);
+out:
return rc;
}
+static int
+__fmr_unmap(struct rpcrdma_mw *r)
+{
+ LIST_HEAD(l);
+
+ list_add(&r->r.fmr.fmr->list, &l);
+ return ib_unmap_fmr(&l);
+}
+
/* Use the ib_map_phys_fmr() verb to register a memory region
* for remote access via RDMA READ or RDMA WRITE.
*/
@@ -85,12 +121,24 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int nsegs, bool writing)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct ib_device *device = ia->ri_id->device;
+ struct ib_device *device = ia->ri_device;
enum dma_data_direction direction = rpcrdma_data_dir(writing);
struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_mw *mw = seg1->rl_mw;
- u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
int len, pageoff, i, rc;
+ struct rpcrdma_mw *mw;
+
+ mw = seg1->rl_mw;
+ seg1->rl_mw = NULL;
+ if (!mw) {
+ mw = rpcrdma_get_mw(r_xprt);
+ if (!mw)
+ return -ENOMEM;
+ } else {
+ /* this is a retransmit; generate a fresh rkey */
+ rc = __fmr_unmap(mw);
+ if (rc)
+ return rc;
+ }
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
@@ -100,7 +148,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
nsegs = RPCRDMA_MAX_FMR_SGES;
for (i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
- physaddrs[i] = seg->mr_dma;
+ mw->r.fmr.physaddrs[i] = seg->mr_dma;
len += seg->mr_len;
++seg;
++i;
@@ -110,11 +158,13 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
break;
}
- rc = ib_map_phys_fmr(mw->r.fmr, physaddrs, i, seg1->mr_dma);
+ rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+ i, seg1->mr_dma);
if (rc)
goto out_maperr;
- seg1->mr_rkey = mw->r.fmr->rkey;
+ seg1->rl_mw = mw;
+ seg1->mr_rkey = mw->r.fmr.fmr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
seg1->mr_len = len;
@@ -137,48 +187,28 @@ fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
struct rpcrdma_mr_seg *seg1 = seg;
- struct ib_device *device;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
int rc, nsegs = seg->mr_nsegs;
- LIST_HEAD(l);
- list_add(&seg1->rl_mw->r.fmr->list, &l);
- rc = ib_unmap_fmr(&l);
- read_lock(&ia->ri_qplock);
- device = ia->ri_id->device;
+ dprintk("RPC: %s: FMR %p\n", __func__, mw);
+
+ seg1->rl_mw = NULL;
while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(device, seg++);
- read_unlock(&ia->ri_qplock);
+ rpcrdma_unmap_one(ia->ri_device, seg++);
+ rc = __fmr_unmap(mw);
if (rc)
goto out_err;
+ rpcrdma_put_mw(r_xprt, mw);
return nsegs;
out_err:
+ /* The FMR is abandoned, but remains in rb_all. fmr_op_destroy
+ * will attempt to release it when the transport is destroyed.
+ */
dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc);
return nsegs;
}
-/* After a disconnect, unmap all FMRs.
- *
- * This is invoked only in the transport connect worker in order
- * to serialize with rpcrdma_register_fmr_external().
- */
-static void
-fmr_op_reset(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_mw *r;
- LIST_HEAD(list);
- int rc;
-
- list_for_each_entry(r, &buf->rb_all, mw_all)
- list_add(&r->r.fmr->list, &list);
-
- rc = ib_unmap_fmr(&list);
- if (rc)
- dprintk("RPC: %s: ib_unmap_fmr failed %i\n",
- __func__, rc);
-}
-
static void
fmr_op_destroy(struct rpcrdma_buffer *buf)
{
@@ -188,10 +218,13 @@ fmr_op_destroy(struct rpcrdma_buffer *buf)
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
- rc = ib_dealloc_fmr(r->r.fmr);
+ kfree(r->r.fmr.physaddrs);
+
+ rc = ib_dealloc_fmr(r->r.fmr.fmr);
if (rc)
dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
__func__, rc);
+
kfree(r);
}
}
@@ -202,7 +235,6 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init = fmr_op_init,
- .ro_reset = fmr_op_reset,
.ro_destroy = fmr_op_destroy,
.ro_displayname = "fmr",
};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index dff0481db..04ea91420 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -11,12 +11,136 @@
* but most complex memory registration mode.
*/
+/* Normal operation
+ *
+ * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
+ * Work Request (frmr_op_map). When the RDMA operation is finished, this
+ * Memory Region is invalidated using a LOCAL_INV Work Request
+ * (frmr_op_unmap).
+ *
+ * Typically these Work Requests are not signaled, and neither are RDMA
+ * SEND Work Requests (with the exception of signaling occasionally to
+ * prevent provider work queue overflows). This greatly reduces HCA
+ * interrupt workload.
+ *
+ * As an optimization, frwr_op_unmap marks MRs INVALID before the
+ * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
+ * rb_mws immediately so that no work (like managing a linked list
+ * under a spinlock) is needed in the completion upcall.
+ *
+ * But this means that frwr_op_map() can occasionally encounter an MR
+ * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
+ * ordering prevents a subsequent FAST_REG WR from executing against
+ * that MR while it is still being invalidated.
+ */
+
+/* Transport recovery
+ *
+ * ->op_map and the transport connect worker cannot run at the same
+ * time, but ->op_unmap can fire while the transport connect worker
+ * is running. Thus MR recovery is handled in ->op_map, to guarantee
+ * that recovered MRs are owned by a sending RPC, and not one where
+ * ->op_unmap could fire at the same time transport reconnect is
+ * being done.
+ *
+ * When the underlying transport disconnects, MRs are left in one of
+ * three states:
+ *
+ * INVALID: The MR was not in use before the QP entered ERROR state.
+ * (Or, the LOCAL_INV WR has not completed or flushed yet).
+ *
+ * STALE: The MR was being registered or unregistered when the QP
+ * entered ERROR state, and the pending WR was flushed.
+ *
+ * VALID: The MR was registered before the QP entered ERROR state.
+ *
+ * When frwr_op_map encounters STALE and VALID MRs, they are recovered
+ * with ib_dereg_mr and then are re-initialized. Because MR recovery
+ * allocates fresh resources, it is deferred to a workqueue, and the
+ * recovered MRs are placed back on the rb_mws list when recovery is
+ * complete. frwr_op_map allocates another MR for the current RPC while
+ * the broken MR is reset.
+ *
+ * To ensure that frwr_op_map doesn't encounter an MR that is marked
+ * INVALID but that is about to be flushed due to a previous transport
+ * disconnect, the transport connect worker attempts to drain all
+ * pending send queue WRs before the transport is reconnected.
+ */
+
#include "xprt_rdma.h"
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
+static struct workqueue_struct *frwr_recovery_wq;
+
+#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM)
+
+int
+frwr_alloc_recovery_wq(void)
+{
+ frwr_recovery_wq = alloc_workqueue("frwr_recovery",
+ FRWR_RECOVERY_WQ_FLAGS, 0);
+ return !frwr_recovery_wq ? -ENOMEM : 0;
+}
+
+void
+frwr_destroy_recovery_wq(void)
+{
+ struct workqueue_struct *wq;
+
+ if (!frwr_recovery_wq)
+ return;
+
+ wq = frwr_recovery_wq;
+ frwr_recovery_wq = NULL;
+ destroy_workqueue(wq);
+}
+
+/* Deferred reset of a single FRMR. Generate a fresh rkey by
+ * replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
+static void
+__frwr_recovery_worker(struct work_struct *work)
+{
+ struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
+ r.frmr.fr_work);
+ struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+ unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
+ struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+
+ if (ib_dereg_mr(r->r.frmr.fr_mr))
+ goto out_fail;
+
+ r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(pd, depth);
+ if (IS_ERR(r->r.frmr.fr_mr))
+ goto out_fail;
+
+ dprintk("RPC: %s: recovered FRMR %p\n", __func__, r);
+ r->r.frmr.fr_state = FRMR_IS_INVALID;
+ rpcrdma_put_mw(r_xprt, r);
+ return;
+
+out_fail:
+ pr_warn("RPC: %s: FRMR %p unrecovered\n",
+ __func__, r);
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker.
+ */
+static void
+__frwr_queue_recovery(struct rpcrdma_mw *r)
+{
+ INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
+ queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+}
+
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
unsigned int depth)
@@ -128,8 +252,8 @@ frwr_sendcompletion(struct ib_wc *wc)
/* WARNING: Only wr_id and status are reliable at this point */
r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
- dprintk("RPC: %s: frmr %p (stale), status %d\n",
- __func__, r, wc->status);
+ pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n",
+ __func__, r, ib_wc_status_msg(wc->status), wc->status);
r->r.frmr.fr_state = FRMR_IS_STALE;
}
@@ -137,16 +261,19 @@ static int
frwr_op_init(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct ib_device *device = r_xprt->rx_ia.ri_id->device;
+ struct ib_device *device = r_xprt->rx_ia.ri_device;
unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
int i;
+ spin_lock_init(&buf->rb_mwlock);
INIT_LIST_HEAD(&buf->rb_mws);
INIT_LIST_HEAD(&buf->rb_all);
- i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
- dprintk("RPC: %s: initializing %d FRMRs\n", __func__, i);
+ i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
+ i += 2; /* head + tail */
+ i *= buf->rb_max_requests; /* one set for each RPC slot */
+ dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i);
while (i--) {
struct rpcrdma_mw *r;
@@ -165,6 +292,7 @@ frwr_op_init(struct rpcrdma_xprt *r_xprt)
list_add(&r->mw_list, &buf->rb_mws);
list_add(&r->mw_all, &buf->rb_all);
r->mw_sendcompletion = frwr_sendcompletion;
+ r->r.frmr.fr_xprt = r_xprt;
}
return 0;
@@ -178,12 +306,12 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
int nsegs, bool writing)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- struct ib_device *device = ia->ri_id->device;
+ struct ib_device *device = ia->ri_device;
enum dma_data_direction direction = rpcrdma_data_dir(writing);
struct rpcrdma_mr_seg *seg1 = seg;
- struct rpcrdma_mw *mw = seg1->rl_mw;
- struct rpcrdma_frmr *frmr = &mw->r.frmr;
- struct ib_mr *mr = frmr->fr_mr;
+ struct rpcrdma_mw *mw;
+ struct rpcrdma_frmr *frmr;
+ struct ib_mr *mr;
struct ib_send_wr fastreg_wr, *bad_wr;
u8 key;
int len, pageoff;
@@ -192,12 +320,25 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
u64 pa;
int page_no;
+ mw = seg1->rl_mw;
+ seg1->rl_mw = NULL;
+ do {
+ if (mw)
+ __frwr_queue_recovery(mw);
+ mw = rpcrdma_get_mw(r_xprt);
+ if (!mw)
+ return -ENOMEM;
+ } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
+ frmr = &mw->r.frmr;
+ frmr->fr_state = FRMR_IS_VALID;
+
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
seg1->mr_len += pageoff;
len = -pageoff;
if (nsegs > ia->ri_max_frmr_depth)
nsegs = ia->ri_max_frmr_depth;
+
for (page_no = i = 0; i < nsegs;) {
rpcrdma_map_one(device, seg, direction);
pa = seg->mr_dma;
@@ -216,8 +357,6 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
dprintk("RPC: %s: Using frmr %p to map %d segments (%d bytes)\n",
__func__, mw, i, len);
- frmr->fr_state = FRMR_IS_VALID;
-
memset(&fastreg_wr, 0, sizeof(fastreg_wr));
fastreg_wr.wr_id = (unsigned long)(void *)mw;
fastreg_wr.opcode = IB_WR_FAST_REG_MR;
@@ -229,6 +368,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
fastreg_wr.wr.fast_reg.access_flags = writing ?
IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
IB_ACCESS_REMOTE_READ;
+ mr = frmr->fr_mr;
key = (u8)(mr->rkey & 0x000000FF);
ib_update_fast_reg_key(mr, ++key);
fastreg_wr.wr.fast_reg.rkey = mr->rkey;
@@ -238,6 +378,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
if (rc)
goto out_senderr;
+ seg1->rl_mw = mw;
seg1->mr_rkey = mr->rkey;
seg1->mr_base = seg1->mr_dma + pageoff;
seg1->mr_nsegs = i;
@@ -246,10 +387,9 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
out_senderr:
dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
- ib_update_fast_reg_key(mr, --key);
- frmr->fr_state = FRMR_IS_INVALID;
while (i--)
rpcrdma_unmap_one(device, --seg);
+ __frwr_queue_recovery(mw);
return rc;
}
@@ -261,78 +401,46 @@ frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_mr_seg *seg1 = seg;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
struct ib_send_wr invalidate_wr, *bad_wr;
int rc, nsegs = seg->mr_nsegs;
- struct ib_device *device;
- seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+ dprintk("RPC: %s: FRMR %p\n", __func__, mw);
+
+ seg1->rl_mw = NULL;
+ mw->r.frmr.fr_state = FRMR_IS_INVALID;
memset(&invalidate_wr, 0, sizeof(invalidate_wr));
- invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
+ invalidate_wr.wr_id = (unsigned long)(void *)mw;
invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
+ invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey;
DECR_CQCOUNT(&r_xprt->rx_ep);
- read_lock(&ia->ri_qplock);
- device = ia->ri_id->device;
while (seg1->mr_nsegs--)
- rpcrdma_unmap_one(device, seg++);
+ rpcrdma_unmap_one(ia->ri_device, seg++);
+ read_lock(&ia->ri_qplock);
rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
read_unlock(&ia->ri_qplock);
if (rc)
goto out_err;
+
+ rpcrdma_put_mw(r_xprt, mw);
return nsegs;
out_err:
- /* Force rpcrdma_buffer_get() to retry */
- seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
+ __frwr_queue_recovery(mw);
return nsegs;
}
-/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
- * an unusable state. Find FRMRs in this state and dereg / reg
- * each. FRMRs that are VALID and attached to an rpcrdma_req are
- * also torn down.
- *
- * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
- *
- * This is invoked only in the transport connect worker in order
- * to serialize with rpcrdma_register_frmr_external().
- */
-static void
-frwr_op_reset(struct rpcrdma_xprt *r_xprt)
-{
- struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct ib_device *device = r_xprt->rx_ia.ri_id->device;
- unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
- struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
- struct rpcrdma_mw *r;
- int rc;
-
- list_for_each_entry(r, &buf->rb_all, mw_all) {
- if (r->r.frmr.fr_state == FRMR_IS_INVALID)
- continue;
-
- __frwr_release(r);
- rc = __frwr_init(r, pd, device, depth);
- if (rc) {
- dprintk("RPC: %s: mw %p left %s\n",
- __func__, r,
- (r->r.frmr.fr_state == FRMR_IS_STALE ?
- "stale" : "valid"));
- continue;
- }
-
- r->r.frmr.fr_state = FRMR_IS_INVALID;
- }
-}
-
static void
frwr_op_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_mw *r;
+ /* Ensure stale MWs for "buf" are no longer in flight */
+ flush_workqueue(frwr_recovery_wq);
+
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
@@ -347,7 +455,6 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init = frwr_op_init,
- .ro_reset = frwr_op_reset,
.ro_destroy = frwr_op_destroy,
.ro_displayname = "frwr",
};
diff --git a/net/sunrpc/xprtrdma/module.c b/net/sunrpc/xprtrdma/module.c
new file mode 100644
index 000000000..560712bd9
--- /dev/null
+++ b/net/sunrpc/xprtrdma/module.c
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ */
+
+/* rpcrdma.ko module initialization
+ */
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+MODULE_AUTHOR("Open Grid Computing and Network Appliance, Inc.");
+MODULE_DESCRIPTION("RPC/RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+MODULE_ALIAS("svcrdma");
+MODULE_ALIAS("xprtrdma");
+
+static void __exit rpc_rdma_cleanup(void)
+{
+ xprt_rdma_cleanup();
+ svc_rdma_cleanup();
+}
+
+static int __init rpc_rdma_init(void)
+{
+ int rc;
+
+ rc = svc_rdma_init();
+ if (rc)
+ goto out;
+
+ rc = xprt_rdma_init();
+ if (rc)
+ svc_rdma_cleanup();
+
+out:
+ return rc;
+}
+
+module_init(rpc_rdma_init);
+module_exit(rpc_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index ba518af16..41985d07f 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -50,8 +50,7 @@ physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- rpcrdma_map_one(ia->ri_id->device, seg,
- rpcrdma_data_dir(writing));
+ rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
seg->mr_rkey = ia->ri_bind_mem->rkey;
seg->mr_base = seg->mr_dma;
seg->mr_nsegs = 1;
@@ -65,19 +64,11 @@ physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
{
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
- read_lock(&ia->ri_qplock);
- rpcrdma_unmap_one(ia->ri_id->device, seg);
- read_unlock(&ia->ri_qplock);
-
+ rpcrdma_unmap_one(ia->ri_device, seg);
return 1;
}
static void
-physical_op_reset(struct rpcrdma_xprt *r_xprt)
-{
-}
-
-static void
physical_op_destroy(struct rpcrdma_buffer *buf)
{
}
@@ -88,7 +79,6 @@ const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
.ro_open = physical_op_open,
.ro_maxpages = physical_op_maxpages,
.ro_init = physical_op_init,
- .ro_reset = physical_op_reset,
.ro_destroy = physical_op_destroy,
.ro_displayname = "physical",
};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 2c53ea9e1..84ea37dae 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -284,9 +284,6 @@ rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
return (unsigned char *)iptr - (unsigned char *)headerp;
out:
- if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
- return n;
-
for (pos = 0; nchunks--;)
pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
&req->rl_segments[pos]);
@@ -732,8 +729,8 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
struct rpcrdma_msg *headerp;
struct rpcrdma_req *req;
struct rpc_rqst *rqst;
- struct rpc_xprt *xprt = rep->rr_xprt;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
__be32 *iptr;
int rdmalen, status;
unsigned long cwnd;
@@ -770,7 +767,6 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep)
rep->rr_len);
repost:
r_xprt->rx_stats.bad_reply_count++;
- rep->rr_func = rpcrdma_reply_handler;
if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
rpcrdma_recv_buffer_put(rep);
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index c1b627026..2cd252f02 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -38,8 +38,7 @@
*
* Author: Tom Tucker <tom@opengridcomputing.com>
*/
-#include <linux/module.h>
-#include <linux/init.h>
+
#include <linux/slab.h>
#include <linux/fs.h>
#include <linux/sysctl.h>
@@ -295,8 +294,3 @@ int svc_rdma_init(void)
destroy_workqueue(svc_rdma_wq);
return -ENOMEM;
}
-MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
-MODULE_DESCRIPTION("SVC RDMA Transport");
-MODULE_LICENSE("Dual BSD/GPL");
-module_init(svc_rdma_init);
-module_exit(svc_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index b681855cf..e2fca7617 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -50,12 +50,12 @@
/*
* Decodes a read chunk list. The expected format is as follows:
* descrim : xdr_one
- * position : u32 offset into XDR stream
- * handle : u32 RKEY
+ * position : __be32 offset into XDR stream
+ * handle : __be32 RKEY
* . . .
* end-of-list: xdr_zero
*/
-static u32 *decode_read_list(u32 *va, u32 *vaend)
+static __be32 *decode_read_list(__be32 *va, __be32 *vaend)
{
struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;
@@ -67,20 +67,20 @@ static u32 *decode_read_list(u32 *va, u32 *vaend)
}
ch++;
}
- return (u32 *)&ch->rc_position;
+ return &ch->rc_position;
}
/*
* Decodes a write chunk list. The expected format is as follows:
* descrim : xdr_one
* nchunks : <count>
- * handle : u32 RKEY ---+
- * length : u32 <len of segment> |
+ * handle : __be32 RKEY ---+
+ * length : __be32 <len of segment> |
* offset : remove va + <count>
* . . . |
* ---+
*/
-static u32 *decode_write_list(u32 *va, u32 *vaend)
+static __be32 *decode_write_list(__be32 *va, __be32 *vaend)
{
unsigned long start, end;
int nchunks;
@@ -90,14 +90,14 @@ static u32 *decode_write_list(u32 *va, u32 *vaend)
/* Check for not write-array */
if (ary->wc_discrim == xdr_zero)
- return (u32 *)&ary->wc_nchunks;
+ return &ary->wc_nchunks;
if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
(unsigned long)vaend) {
dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
return NULL;
}
- nchunks = ntohl(ary->wc_nchunks);
+ nchunks = be32_to_cpu(ary->wc_nchunks);
start = (unsigned long)&ary->wc_array[0];
end = (unsigned long)vaend;
@@ -112,10 +112,10 @@ static u32 *decode_write_list(u32 *va, u32 *vaend)
* rs_length is the 2nd 4B field in wc_target and taking its
* address skips the list terminator
*/
- return (u32 *)&ary->wc_array[nchunks].wc_target.rs_length;
+ return &ary->wc_array[nchunks].wc_target.rs_length;
}
-static u32 *decode_reply_array(u32 *va, u32 *vaend)
+static __be32 *decode_reply_array(__be32 *va, __be32 *vaend)
{
unsigned long start, end;
int nchunks;
@@ -124,14 +124,14 @@ static u32 *decode_reply_array(u32 *va, u32 *vaend)
/* Check for no reply-array */
if (ary->wc_discrim == xdr_zero)
- return (u32 *)&ary->wc_nchunks;
+ return &ary->wc_nchunks;
if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
(unsigned long)vaend) {
dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
return NULL;
}
- nchunks = ntohl(ary->wc_nchunks);
+ nchunks = be32_to_cpu(ary->wc_nchunks);
start = (unsigned long)&ary->wc_array[0];
end = (unsigned long)vaend;
@@ -142,15 +142,14 @@ static u32 *decode_reply_array(u32 *va, u32 *vaend)
ary, nchunks, vaend);
return NULL;
}
- return (u32 *)&ary->wc_array[nchunks];
+ return (__be32 *)&ary->wc_array[nchunks];
}
int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
struct svc_rqst *rqstp)
{
struct rpcrdma_msg *rmsgp = NULL;
- u32 *va;
- u32 *vaend;
+ __be32 *va, *vaend;
u32 hdr_len;
rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
@@ -162,22 +161,17 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
return -EINVAL;
}
- /* Decode the header */
- rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
- rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
- rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
- rmsgp->rm_type = ntohl(rmsgp->rm_type);
-
- if (rmsgp->rm_vers != RPCRDMA_VERSION)
+ if (rmsgp->rm_vers != rpcrdma_version)
return -ENOSYS;
/* Pull in the extra for the padded case and bump our pointer */
- if (rmsgp->rm_type == RDMA_MSGP) {
+ if (rmsgp->rm_type == rdma_msgp) {
int hdrlen;
+
rmsgp->rm_body.rm_padded.rm_align =
- ntohl(rmsgp->rm_body.rm_padded.rm_align);
+ be32_to_cpu(rmsgp->rm_body.rm_padded.rm_align);
rmsgp->rm_body.rm_padded.rm_thresh =
- ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+ be32_to_cpu(rmsgp->rm_body.rm_padded.rm_thresh);
va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
rqstp->rq_arg.head[0].iov_base = va;
@@ -192,7 +186,7 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
* chunk list and a reply chunk list.
*/
va = &rmsgp->rm_body.rm_chunks[0];
- vaend = (u32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+ vaend = (__be32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
va = decode_read_list(va, vaend);
if (!va)
return -EINVAL;
@@ -211,76 +205,20 @@ int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
return hdr_len;
}
-int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
-{
- struct rpcrdma_msg *rmsgp = NULL;
- struct rpcrdma_read_chunk *ch;
- struct rpcrdma_write_array *ary;
- u32 *va;
- u32 hdrlen;
-
- dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
- rqstp);
- rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
-
- /* Pull in the extra for the padded case and bump our pointer */
- if (rmsgp->rm_type == RDMA_MSGP) {
- va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
- rqstp->rq_arg.head[0].iov_base = va;
- hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
- rqstp->rq_arg.head[0].iov_len -= hdrlen;
- return hdrlen;
- }
-
- /*
- * Skip all chunks to find RPC msg. These were previously processed
- */
- va = &rmsgp->rm_body.rm_chunks[0];
-
- /* Skip read-list */
- for (ch = (struct rpcrdma_read_chunk *)va;
- ch->rc_discrim != xdr_zero; ch++);
- va = (u32 *)&ch->rc_position;
-
- /* Skip write-list */
- ary = (struct rpcrdma_write_array *)va;
- if (ary->wc_discrim == xdr_zero)
- va = (u32 *)&ary->wc_nchunks;
- else
- /*
- * rs_length is the 2nd 4B field in wc_target and taking its
- * address skips the list terminator
- */
- va = (u32 *)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
-
- /* Skip reply-array */
- ary = (struct rpcrdma_write_array *)va;
- if (ary->wc_discrim == xdr_zero)
- va = (u32 *)&ary->wc_nchunks;
- else
- va = (u32 *)&ary->wc_array[ary->wc_nchunks];
-
- rqstp->rq_arg.head[0].iov_base = va;
- hdrlen = (unsigned long)va - (unsigned long)rmsgp;
- rqstp->rq_arg.head[0].iov_len -= hdrlen;
-
- return hdrlen;
-}
-
int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
struct rpcrdma_msg *rmsgp,
- enum rpcrdma_errcode err, u32 *va)
+ enum rpcrdma_errcode err, __be32 *va)
{
- u32 *startp = va;
+ __be32 *startp = va;
- *va++ = htonl(rmsgp->rm_xid);
- *va++ = htonl(rmsgp->rm_vers);
- *va++ = htonl(xprt->sc_max_requests);
- *va++ = htonl(RDMA_ERROR);
- *va++ = htonl(err);
+ *va++ = rmsgp->rm_xid;
+ *va++ = rmsgp->rm_vers;
+ *va++ = cpu_to_be32(xprt->sc_max_requests);
+ *va++ = rdma_error;
+ *va++ = cpu_to_be32(err);
if (err == ERR_VERS) {
- *va++ = htonl(RPCRDMA_VERSION);
- *va++ = htonl(RPCRDMA_VERSION);
+ *va++ = rpcrdma_version;
+ *va++ = rpcrdma_version;
}
return (int)((unsigned long)va - (unsigned long)startp);
@@ -297,7 +235,7 @@ int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
&rmsgp->rm_body.rm_chunks[1];
if (wr_ary->wc_discrim)
wr_ary = (struct rpcrdma_write_array *)
- &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+ &wr_ary->wc_array[be32_to_cpu(wr_ary->wc_nchunks)].
wc_target.rs_length;
else
wr_ary = (struct rpcrdma_write_array *)
@@ -306,7 +244,7 @@ int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
/* skip reply array */
if (wr_ary->wc_discrim)
wr_ary = (struct rpcrdma_write_array *)
- &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+ &wr_ary->wc_array[be32_to_cpu(wr_ary->wc_nchunks)];
else
wr_ary = (struct rpcrdma_write_array *)
&wr_ary->wc_nchunks;
@@ -325,7 +263,7 @@ void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
ary = (struct rpcrdma_write_array *)
&rmsgp->rm_body.rm_chunks[1];
ary->wc_discrim = xdr_one;
- ary->wc_nchunks = htonl(chunks);
+ ary->wc_nchunks = cpu_to_be32(chunks);
/* write-list terminator */
ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
@@ -338,7 +276,7 @@ void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
int chunks)
{
ary->wc_discrim = xdr_one;
- ary->wc_nchunks = htonl(chunks);
+ ary->wc_nchunks = cpu_to_be32(chunks);
}
void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
@@ -350,7 +288,7 @@ void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
seg->rs_handle = rs_handle;
seg->rs_offset = rs_offset;
- seg->rs_length = htonl(write_len);
+ seg->rs_length = cpu_to_be32(write_len);
}
void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
@@ -358,10 +296,10 @@ void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
struct rpcrdma_msg *rdma_resp,
enum rpcrdma_proc rdma_type)
{
- rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
- rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
- rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
- rdma_resp->rm_type = htonl(rdma_type);
+ rdma_resp->rm_xid = rdma_argp->rm_xid;
+ rdma_resp->rm_vers = rdma_argp->rm_vers;
+ rdma_resp->rm_credit = cpu_to_be32(xprt->sc_max_requests);
+ rdma_resp->rm_type = cpu_to_be32(rdma_type);
/* Encode <nul> chunks lists */
rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index f9f13a32d..2e1348bde 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -85,7 +85,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
/* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
- if (be32_to_cpu(rmsgp->rm_type) == RDMA_NOMSG)
+ if (rmsgp->rm_type == rdma_nomsg)
rqstp->rq_arg.pages = &rqstp->rq_pages[0];
else
rqstp->rq_arg.pages = &rqstp->rq_pages[1];
@@ -117,8 +117,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
{
- if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
- RDMA_TRANSPORT_IWARP)
+ if (!rdma_cap_read_multi_sge(xprt->sc_cm_id->device,
+ xprt->sc_cm_id->port_num))
return 1;
else
return min_t(int, sge_count, xprt->sc_max_sge);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7de33d1af..d25cd430f 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -240,6 +240,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
u32 xdr_off;
int chunk_off;
int chunk_no;
+ int nchunks;
struct rpcrdma_write_array *arg_ary;
struct rpcrdma_write_array *res_ary;
int ret;
@@ -251,14 +252,15 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
&rdma_resp->rm_body.rm_chunks[1];
/* Write chunks start at the pagelist */
+ nchunks = be32_to_cpu(arg_ary->wc_nchunks);
for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
- xfer_len && chunk_no < arg_ary->wc_nchunks;
+ xfer_len && chunk_no < nchunks;
chunk_no++) {
struct rpcrdma_segment *arg_ch;
u64 rs_offset;
arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
- write_len = min(xfer_len, ntohl(arg_ch->rs_length));
+ write_len = min(xfer_len, be32_to_cpu(arg_ch->rs_length));
/* Prepare the response chunk given the length actually
* written */
@@ -270,7 +272,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
chunk_off = 0;
while (write_len) {
ret = send_write(xprt, rqstp,
- ntohl(arg_ch->rs_handle),
+ be32_to_cpu(arg_ch->rs_handle),
rs_offset + chunk_off,
xdr_off,
write_len,
@@ -318,13 +320,13 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
&rdma_resp->rm_body.rm_chunks[2];
/* xdr offset starts at RPC message */
- nchunks = ntohl(arg_ary->wc_nchunks);
+ nchunks = be32_to_cpu(arg_ary->wc_nchunks);
for (xdr_off = 0, chunk_no = 0;
xfer_len && chunk_no < nchunks;
chunk_no++) {
u64 rs_offset;
ch = &arg_ary->wc_array[chunk_no].wc_target;
- write_len = min(xfer_len, htonl(ch->rs_length));
+ write_len = min(xfer_len, be32_to_cpu(ch->rs_length));
/* Prepare the reply chunk given the length actually
* written */
@@ -335,7 +337,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
chunk_off = 0;
while (write_len) {
ret = send_write(xprt, rqstp,
- ntohl(ch->rs_handle),
+ be32_to_cpu(ch->rs_handle),
rs_offset + chunk_off,
xdr_off,
write_len,
@@ -515,7 +517,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
inline_bytes = rqstp->rq_res.len;
/* Create the RDMA response header */
- res_page = svc_rdma_get_page();
+ res_page = alloc_page(GFP_KERNEL | __GFP_NOFAIL);
rdma_resp = page_address(res_page);
reply_ary = svc_rdma_get_reply_array(rdma_argp);
if (reply_ary)
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index f609c1c2d..6b36279e4 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -91,7 +91,7 @@ struct svc_xprt_class svc_rdma_class = {
.xcl_name = "rdma",
.xcl_owner = THIS_MODULE,
.xcl_ops = &svc_rdma_ops,
- .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
+ .xcl_max_payload = RPCRDMA_MAXPAYLOAD,
.xcl_ident = XPRT_TRANSPORT_RDMA,
};
@@ -99,12 +99,8 @@ struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
struct svc_rdma_op_ctxt *ctxt;
- while (1) {
- ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, GFP_KERNEL);
- if (ctxt)
- break;
- schedule_timeout_uninterruptible(msecs_to_jiffies(500));
- }
+ ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep,
+ GFP_KERNEL | __GFP_NOFAIL);
ctxt->xprt = xprt;
INIT_LIST_HEAD(&ctxt->dto_q);
ctxt->count = 0;
@@ -156,12 +152,8 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
struct svc_rdma_req_map *svc_rdma_get_req_map(void)
{
struct svc_rdma_req_map *map;
- while (1) {
- map = kmem_cache_alloc(svc_rdma_map_cachep, GFP_KERNEL);
- if (map)
- break;
- schedule_timeout_uninterruptible(msecs_to_jiffies(500));
- }
+ map = kmem_cache_alloc(svc_rdma_map_cachep,
+ GFP_KERNEL | __GFP_NOFAIL);
map->count = 0;
return map;
}
@@ -175,8 +167,8 @@ void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
static void cq_event_handler(struct ib_event *event, void *context)
{
struct svc_xprt *xprt = context;
- dprintk("svcrdma: received CQ event id=%d, context=%p\n",
- event->event, context);
+ dprintk("svcrdma: received CQ event %s (%d), context=%p\n",
+ ib_event_msg(event->event), event->event, context);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
}
@@ -191,8 +183,9 @@ static void qp_event_handler(struct ib_event *event, void *context)
case IB_EVENT_COMM_EST:
case IB_EVENT_SQ_DRAINED:
case IB_EVENT_QP_LAST_WQE_REACHED:
- dprintk("svcrdma: QP event %d received for QP=%p\n",
- event->event, event->element.qp);
+ dprintk("svcrdma: QP event %s (%d) received for QP=%p\n",
+ ib_event_msg(event->event), event->event,
+ event->element.qp);
break;
/* These are considered fatal events */
case IB_EVENT_PATH_MIG_ERR:
@@ -201,9 +194,10 @@ static void qp_event_handler(struct ib_event *event, void *context)
case IB_EVENT_QP_ACCESS_ERR:
case IB_EVENT_DEVICE_FATAL:
default:
- dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
+ dprintk("svcrdma: QP ERROR event %s (%d) received for QP=%p, "
"closing transport\n",
- event->event, event->element.qp);
+ ib_event_msg(event->event), event->event,
+ event->element.qp);
set_bit(XPT_CLOSE, &xprt->xpt_flags);
break;
}
@@ -402,7 +396,8 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
for (i = 0; i < ret; i++) {
wc = &wc_a[i];
if (wc->status != IB_WC_SUCCESS) {
- dprintk("svcrdma: sq wc err status %d\n",
+ dprintk("svcrdma: sq wc err status %s (%d)\n",
+ ib_wc_status_msg(wc->status),
wc->status);
/* Close the transport */
@@ -490,18 +485,6 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
return cma_xprt;
}
-struct page *svc_rdma_get_page(void)
-{
- struct page *page;
-
- while ((page = alloc_page(GFP_KERNEL)) == NULL) {
- /* If we can't get memory, wait a bit and try again */
- printk(KERN_INFO "svcrdma: out of memory...retrying in 1s\n");
- schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
- }
- return page;
-}
-
int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
{
struct ib_recv_wr recv_wr, *bad_recv_wr;
@@ -520,7 +503,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
pr_err("svcrdma: Too many sges (%d)\n", sge_no);
goto err_put_ctxt;
}
- page = svc_rdma_get_page();
+ page = alloc_page(GFP_KERNEL | __GFP_NOFAIL);
ctxt->pages[sge_no] = page;
pa = ib_dma_map_page(xprt->sc_cm_id->device,
page, 0, PAGE_SIZE,
@@ -616,7 +599,8 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id,
switch (event->event) {
case RDMA_CM_EVENT_CONNECT_REQUEST:
dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
- "event=%d\n", cma_id, cma_id->context, event->event);
+ "event = %s (%d)\n", cma_id, cma_id->context,
+ rdma_event_msg(event->event), event->event);
handle_connect_req(cma_id,
event->param.conn.initiator_depth);
break;
@@ -636,7 +620,8 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id,
default:
dprintk("svcrdma: Unexpected event on listening endpoint %p, "
- "event=%d\n", cma_id, event->event);
+ "event = %s (%d)\n", cma_id,
+ rdma_event_msg(event->event), event->event);
break;
}
@@ -669,7 +654,8 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
break;
case RDMA_CM_EVENT_DEVICE_REMOVAL:
dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
- "event=%d\n", cma_id, xprt, event->event);
+ "event = %s (%d)\n", cma_id, xprt,
+ rdma_event_msg(event->event), event->event);
if (xprt) {
set_bit(XPT_CLOSE, &xprt->xpt_flags);
svc_xprt_enqueue(xprt);
@@ -677,7 +663,8 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
break;
default:
dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
- "event=%d\n", cma_id, event->event);
+ "event = %s (%d)\n", cma_id,
+ rdma_event_msg(event->event), event->event);
break;
}
return 0;
@@ -848,10 +835,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
struct svcxprt_rdma *listen_rdma;
struct svcxprt_rdma *newxprt = NULL;
struct rdma_conn_param conn_param;
+ struct ib_cq_init_attr cq_attr = {};
struct ib_qp_init_attr qp_attr;
struct ib_device_attr devattr;
int uninitialized_var(dma_mr_acc);
- int need_dma_mr;
+ int need_dma_mr = 0;
int ret;
int i;
@@ -900,22 +888,22 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
dprintk("svcrdma: error creating PD for connect request\n");
goto errout;
}
+ cq_attr.cqe = newxprt->sc_sq_depth;
newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
sq_comp_handler,
cq_event_handler,
newxprt,
- newxprt->sc_sq_depth,
- 0);
+ &cq_attr);
if (IS_ERR(newxprt->sc_sq_cq)) {
dprintk("svcrdma: error creating SQ CQ for connect request\n");
goto errout;
}
+ cq_attr.cqe = newxprt->sc_max_requests;
newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
rq_comp_handler,
cq_event_handler,
newxprt,
- newxprt->sc_max_requests,
- 0);
+ &cq_attr);
if (IS_ERR(newxprt->sc_rq_cq)) {
dprintk("svcrdma: error creating RQ CQ for connect request\n");
goto errout;
@@ -985,35 +973,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
/*
* Determine if a DMA MR is required and if so, what privs are required
*/
- switch (rdma_node_get_transport(newxprt->sc_cm_id->device->node_type)) {
- case RDMA_TRANSPORT_IWARP:
- newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
- if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
- need_dma_mr = 1;
- dma_mr_acc =
- (IB_ACCESS_LOCAL_WRITE |
- IB_ACCESS_REMOTE_WRITE);
- } else if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
- need_dma_mr = 1;
- dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
- } else
- need_dma_mr = 0;
- break;
- case RDMA_TRANSPORT_IB:
- if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
- need_dma_mr = 1;
- dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
- } else if (!(devattr.device_cap_flags &
- IB_DEVICE_LOCAL_DMA_LKEY)) {
- need_dma_mr = 1;
- dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
- } else
- need_dma_mr = 0;
- break;
- default:
+ if (!rdma_protocol_iwarp(newxprt->sc_cm_id->device,
+ newxprt->sc_cm_id->port_num) &&
+ !rdma_ib_or_roce(newxprt->sc_cm_id->device,
+ newxprt->sc_cm_id->port_num))
goto errout;
+
+ if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG) ||
+ !(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
+ need_dma_mr = 1;
+ dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
+ if (rdma_protocol_iwarp(newxprt->sc_cm_id->device,
+ newxprt->sc_cm_id->port_num) &&
+ !(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG))
+ dma_mr_acc |= IB_ACCESS_REMOTE_WRITE;
}
+ if (rdma_protocol_iwarp(newxprt->sc_cm_id->device,
+ newxprt->sc_cm_id->port_num))
+ newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
+
/* Create the DMA MR if needed, otherwise, use the DMA LKEY */
if (need_dma_mr) {
/* Register all of physical memory */
@@ -1319,11 +1298,11 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
struct ib_send_wr err_wr;
struct page *p;
struct svc_rdma_op_ctxt *ctxt;
- u32 *va;
+ __be32 *va;
int length;
int ret;
- p = svc_rdma_get_page();
+ p = alloc_page(GFP_KERNEL | __GFP_NOFAIL);
va = page_address(p);
/* XDR encode error */
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 54f23b1be..680f888a9 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -48,7 +48,6 @@
*/
#include <linux/module.h>
-#include <linux/init.h>
#include <linux/slab.h>
#include <linux/seq_file.h>
#include <linux/sunrpc/addr.h>
@@ -59,11 +58,6 @@
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-MODULE_LICENSE("Dual BSD/GPL");
-
-MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
-MODULE_AUTHOR("Network Appliance, Inc.");
-
/*
* tunables
*/
@@ -246,6 +240,16 @@ xprt_rdma_connect_worker(struct work_struct *work)
xprt_clear_connecting(xprt);
}
+static void
+xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
+{
+ struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
+ rx_xprt);
+
+ pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt);
+ rdma_disconnect(r_xprt->rx_ia.ri_id);
+}
+
/*
* xprt_rdma_destroy
*
@@ -618,12 +622,6 @@ xprt_rdma_send_request(struct rpc_task *task)
if (req->rl_reply == NULL) /* e.g. reconnection */
rpcrdma_recv_buffer_get(req);
- if (req->rl_reply) {
- req->rl_reply->rr_func = rpcrdma_reply_handler;
- /* this need only be done once, but... */
- req->rl_reply->rr_xprt = xprt;
- }
-
/* Must suppress retransmit to maintain credits */
if (req->rl_connect_cookie == xprt->connect_cookie)
goto drop_connection;
@@ -682,6 +680,17 @@ static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
r_xprt->rx_stats.bad_reply_count);
}
+static int
+xprt_rdma_enable_swap(struct rpc_xprt *xprt)
+{
+ return -EINVAL;
+}
+
+static void
+xprt_rdma_disable_swap(struct rpc_xprt *xprt)
+{
+}
+
/*
* Plumbing for rpc transport switch and kernel module
*/
@@ -700,7 +709,10 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
.send_request = xprt_rdma_send_request,
.close = xprt_rdma_close,
.destroy = xprt_rdma_destroy,
- .print_stats = xprt_rdma_print_stats
+ .print_stats = xprt_rdma_print_stats,
+ .enable_swap = xprt_rdma_enable_swap,
+ .disable_swap = xprt_rdma_disable_swap,
+ .inject_disconnect = xprt_rdma_inject_disconnect
};
static struct xprt_class xprt_rdma = {
@@ -711,7 +723,7 @@ static struct xprt_class xprt_rdma = {
.setup = xprt_setup_rdma,
};
-static void __exit xprt_rdma_cleanup(void)
+void xprt_rdma_cleanup(void)
{
int rc;
@@ -726,17 +738,24 @@ static void __exit xprt_rdma_cleanup(void)
if (rc)
dprintk("RPC: %s: xprt_unregister returned %i\n",
__func__, rc);
+
+ frwr_destroy_recovery_wq();
}
-static int __init xprt_rdma_init(void)
+int xprt_rdma_init(void)
{
int rc;
- rc = xprt_register_transport(&xprt_rdma);
-
+ rc = frwr_alloc_recovery_wq();
if (rc)
return rc;
+ rc = xprt_register_transport(&xprt_rdma);
+ if (rc) {
+ frwr_destroy_recovery_wq();
+ return rc;
+ }
+
dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
dprintk("Defaults:\n");
@@ -753,6 +772,3 @@ static int __init xprt_rdma_init(void)
#endif
return 0;
}
-
-module_init(xprt_rdma_init);
-module_exit(xprt_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 4870d272e..891c4ede2 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,7 +80,6 @@ static void
rpcrdma_run_tasklet(unsigned long data)
{
struct rpcrdma_rep *rep;
- void (*func)(struct rpcrdma_rep *);
unsigned long flags;
data = data;
@@ -89,14 +88,9 @@ rpcrdma_run_tasklet(unsigned long data)
rep = list_entry(rpcrdma_tasklets_g.next,
struct rpcrdma_rep, rr_list);
list_del(&rep->rr_list);
- func = rep->rr_func;
- rep->rr_func = NULL;
spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
- if (func)
- func(rep);
- else
- rpcrdma_recv_buffer_put(rep);
+ rpcrdma_reply_handler(rep);
spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
}
@@ -105,32 +99,6 @@ rpcrdma_run_tasklet(unsigned long data)
static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
-static const char * const async_event[] = {
- "CQ error",
- "QP fatal error",
- "QP request error",
- "QP access error",
- "communication established",
- "send queue drained",
- "path migration successful",
- "path mig error",
- "device fatal error",
- "port active",
- "port error",
- "LID change",
- "P_key change",
- "SM change",
- "SRQ error",
- "SRQ limit reached",
- "last WQE reached",
- "client reregister",
- "GID change",
-};
-
-#define ASYNC_MSG(status) \
- ((status) < ARRAY_SIZE(async_event) ? \
- async_event[(status)] : "unknown async error")
-
static void
rpcrdma_schedule_tasklet(struct list_head *sched_list)
{
@@ -148,7 +116,7 @@ rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
struct rpcrdma_ep *ep = context;
pr_err("RPC: %s: %s on device %s ep %p\n",
- __func__, ASYNC_MSG(event->event),
+ __func__, ib_event_msg(event->event),
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
@@ -163,7 +131,7 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
struct rpcrdma_ep *ep = context;
pr_err("RPC: %s: %s on device %s ep %p\n",
- __func__, ASYNC_MSG(event->event),
+ __func__, ib_event_msg(event->event),
event->device->name, context);
if (ep->rep_connected == 1) {
ep->rep_connected = -EIO;
@@ -172,35 +140,6 @@ rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
}
}
-static const char * const wc_status[] = {
- "success",
- "local length error",
- "local QP operation error",
- "local EE context operation error",
- "local protection error",
- "WR flushed",
- "memory management operation error",
- "bad response error",
- "local access error",
- "remote invalid request error",
- "remote access error",
- "remote operation error",
- "transport retry counter exceeded",
- "RNR retry counter exceeded",
- "local RDD violation error",
- "remove invalid RD request",
- "operation aborted",
- "invalid EE context number",
- "invalid EE context state",
- "fatal error",
- "response timeout error",
- "general error",
-};
-
-#define COMPLETION_MSG(status) \
- ((status) < ARRAY_SIZE(wc_status) ? \
- wc_status[(status)] : "unexpected completion error")
-
static void
rpcrdma_sendcq_process_wc(struct ib_wc *wc)
{
@@ -209,7 +148,7 @@ rpcrdma_sendcq_process_wc(struct ib_wc *wc)
if (wc->status != IB_WC_SUCCESS &&
wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: SEND: %s\n",
- __func__, COMPLETION_MSG(wc->status));
+ __func__, ib_wc_status_msg(wc->status));
} else {
struct rpcrdma_mw *r;
@@ -291,7 +230,7 @@ rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
__func__, rep, wc->byte_len);
rep->rr_len = wc->byte_len;
- ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+ ib_dma_sync_single_for_cpu(rep->rr_device,
rdmab_addr(rep->rr_rdmabuf),
rep->rr_len, DMA_FROM_DEVICE);
prefetch(rdmab_to_msg(rep->rr_rdmabuf));
@@ -302,7 +241,7 @@ out_schedule:
out_fail:
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("RPC: %s: rep %p: %s\n",
- __func__, rep, COMPLETION_MSG(wc->status));
+ __func__, rep, ib_wc_status_msg(wc->status));
rep->rr_len = ~0U;
goto out_schedule;
}
@@ -386,31 +325,6 @@ rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
rpcrdma_sendcq_process_wc(&wc);
}
-#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
-static const char * const conn[] = {
- "address resolved",
- "address error",
- "route resolved",
- "route error",
- "connect request",
- "connect response",
- "connect error",
- "unreachable",
- "rejected",
- "established",
- "disconnected",
- "device removal",
- "multicast join",
- "multicast error",
- "address change",
- "timewait exit",
-};
-
-#define CONNECTION_MSG(status) \
- ((status) < ARRAY_SIZE(conn) ? \
- conn[(status)] : "unrecognized connection error")
-#endif
-
static int
rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
{
@@ -476,7 +390,7 @@ connected:
default:
dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
__func__, sap, rpc_get_port(sap), ep,
- CONNECTION_MSG(event->event));
+ rdma_event_msg(event->event));
break;
}
@@ -487,7 +401,7 @@ connected:
pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
sap, rpc_get_port(sap),
- ia->ri_id->device->name,
+ ia->ri_device->name,
ia->ri_ops->ro_displayname,
xprt->rx_buf.rb_max_requests,
ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
@@ -588,8 +502,9 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
rc = PTR_ERR(ia->ri_id);
goto out1;
}
+ ia->ri_device = ia->ri_id->device;
- ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+ ia->ri_pd = ib_alloc_pd(ia->ri_device);
if (IS_ERR(ia->ri_pd)) {
rc = PTR_ERR(ia->ri_pd);
dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
@@ -597,7 +512,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
goto out2;
}
- rc = ib_query_device(ia->ri_id->device, devattr);
+ rc = ib_query_device(ia->ri_device, devattr);
if (rc) {
dprintk("RPC: %s: ib_query_device failed %d\n",
__func__, rc);
@@ -606,7 +521,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
ia->ri_have_dma_lkey = 1;
- ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
+ ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
}
if (memreg == RPCRDMA_FRMR) {
@@ -621,7 +536,7 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
}
}
if (memreg == RPCRDMA_MTHCAFMR) {
- if (!ia->ri_id->device->alloc_fmr) {
+ if (!ia->ri_device->alloc_fmr) {
dprintk("RPC: %s: MTHCAFMR registration "
"not supported by HCA\n", __func__);
memreg = RPCRDMA_ALLPHYSICAL;
@@ -670,9 +585,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
dprintk("RPC: %s: memory registration strategy is '%s'\n",
__func__, ia->ri_ops->ro_displayname);
- /* Else will do memory reg/dereg for each chunk */
- ia->ri_memreg_strategy = memreg;
-
rwlock_init(&ia->ri_qplock);
return 0;
@@ -702,17 +614,17 @@ rpcrdma_ia_close(struct rpcrdma_ia *ia)
dprintk("RPC: %s: ib_dereg_mr returned %i\n",
__func__, rc);
}
+
if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
if (ia->ri_id->qp)
rdma_destroy_qp(ia->ri_id);
rdma_destroy_id(ia->ri_id);
ia->ri_id = NULL;
}
- if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
- rc = ib_dealloc_pd(ia->ri_pd);
- dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
- __func__, rc);
- }
+
+ /* If the pd is still busy, xprtrdma missed freeing a resource */
+ if (ia->ri_pd && !IS_ERR(ia->ri_pd))
+ WARN_ON(ib_dealloc_pd(ia->ri_pd));
}
/*
@@ -724,6 +636,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
{
struct ib_device_attr *devattr = &ia->ri_devattr;
struct ib_cq *sendcq, *recvcq;
+ struct ib_cq_init_attr cq_attr = {};
int rc, err;
/* check provider's send/recv wr limits */
@@ -771,9 +684,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
init_waitqueue_head(&ep->rep_connect_wait);
INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
- sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
- rpcrdma_cq_async_error_upcall, ep,
- ep->rep_attr.cap.max_send_wr + 1, 0);
+ cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
+ sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
+ rpcrdma_cq_async_error_upcall, ep, &cq_attr);
if (IS_ERR(sendcq)) {
rc = PTR_ERR(sendcq);
dprintk("RPC: %s: failed to create send CQ: %i\n",
@@ -788,9 +701,9 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
goto out2;
}
- recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
- rpcrdma_cq_async_error_upcall, ep,
- ep->rep_attr.cap.max_recv_wr + 1, 0);
+ cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
+ recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
+ rpcrdma_cq_async_error_upcall, ep, &cq_attr);
if (IS_ERR(recvcq)) {
rc = PTR_ERR(recvcq);
dprintk("RPC: %s: failed to create recv CQ: %i\n",
@@ -896,8 +809,6 @@ retry:
rpcrdma_flush_cqs(ep);
xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
- ia->ri_ops->ro_reset(xprt);
-
id = rpcrdma_create_id(xprt, ia,
(struct sockaddr *)&xprt->rx_data.addr);
if (IS_ERR(id)) {
@@ -911,7 +822,7 @@ retry:
* More stuff I haven't thought of!
* Rrrgh!
*/
- if (ia->ri_id->device != id->device) {
+ if (ia->ri_device != id->device) {
printk("RPC: %s: can't reconnect on "
"different device!\n", __func__);
rdma_destroy_id(id);
@@ -1053,7 +964,8 @@ rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
goto out_free;
}
- rep->rr_buffer = &r_xprt->rx_buf;
+ rep->rr_device = ia->ri_device;
+ rep->rr_rxprt = r_xprt;
return rep;
out_free:
@@ -1177,31 +1089,33 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
kfree(buf->rb_pool);
}
-/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
- * some req segments uninitialized.
- */
-static void
-rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
+struct rpcrdma_mw *
+rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
{
- if (*mw) {
- list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
- *mw = NULL;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_mw *mw = NULL;
+
+ spin_lock(&buf->rb_mwlock);
+ if (!list_empty(&buf->rb_mws)) {
+ mw = list_first_entry(&buf->rb_mws,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
}
+ spin_unlock(&buf->rb_mwlock);
+
+ if (!mw)
+ pr_err("RPC: %s: no MWs available\n", __func__);
+ return mw;
}
-/* Cycle mw's back in reverse order, and "spin" them.
- * This delays and scrambles reuse as much as possible.
- */
-static void
-rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+void
+rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
{
- struct rpcrdma_mr_seg *seg = req->rl_segments;
- struct rpcrdma_mr_seg *seg1 = seg;
- int i;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
- rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
- rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
+ spin_lock(&buf->rb_mwlock);
+ list_add_tail(&mw->mw_list, &buf->rb_mws);
+ spin_unlock(&buf->rb_mwlock);
}
static void
@@ -1211,115 +1125,10 @@ rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
req->rl_niovs = 0;
if (req->rl_reply) {
buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
- req->rl_reply->rr_func = NULL;
req->rl_reply = NULL;
}
}
-/* rpcrdma_unmap_one() was already done during deregistration.
- * Redo only the ib_post_send().
- */
-static void
-rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
-{
- struct rpcrdma_xprt *r_xprt =
- container_of(ia, struct rpcrdma_xprt, rx_ia);
- struct ib_send_wr invalidate_wr, *bad_wr;
- int rc;
-
- dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
-
- /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
- r->r.frmr.fr_state = FRMR_IS_INVALID;
-
- memset(&invalidate_wr, 0, sizeof(invalidate_wr));
- invalidate_wr.wr_id = (unsigned long)(void *)r;
- invalidate_wr.opcode = IB_WR_LOCAL_INV;
- invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
- DECR_CQCOUNT(&r_xprt->rx_ep);
-
- dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
- __func__, r, r->r.frmr.fr_mr->rkey);
-
- read_lock(&ia->ri_qplock);
- rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
- read_unlock(&ia->ri_qplock);
- if (rc) {
- /* Force rpcrdma_buffer_get() to retry */
- r->r.frmr.fr_state = FRMR_IS_STALE;
- dprintk("RPC: %s: ib_post_send failed, %i\n",
- __func__, rc);
- }
-}
-
-static void
-rpcrdma_retry_flushed_linv(struct list_head *stale,
- struct rpcrdma_buffer *buf)
-{
- struct rpcrdma_ia *ia = rdmab_to_ia(buf);
- struct list_head *pos;
- struct rpcrdma_mw *r;
- unsigned long flags;
-
- list_for_each(pos, stale) {
- r = list_entry(pos, struct rpcrdma_mw, mw_list);
- rpcrdma_retry_local_inv(r, ia);
- }
-
- spin_lock_irqsave(&buf->rb_lock, flags);
- list_splice_tail(stale, &buf->rb_mws);
- spin_unlock_irqrestore(&buf->rb_lock, flags);
-}
-
-static struct rpcrdma_req *
-rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
- struct list_head *stale)
-{
- struct rpcrdma_mw *r;
- int i;
-
- i = RPCRDMA_MAX_SEGS - 1;
- while (!list_empty(&buf->rb_mws)) {
- r = list_entry(buf->rb_mws.next,
- struct rpcrdma_mw, mw_list);
- list_del(&r->mw_list);
- if (r->r.frmr.fr_state == FRMR_IS_STALE) {
- list_add(&r->mw_list, stale);
- continue;
- }
- req->rl_segments[i].rl_mw = r;
- if (unlikely(i-- == 0))
- return req; /* Success */
- }
-
- /* Not enough entries on rb_mws for this req */
- rpcrdma_buffer_put_sendbuf(req, buf);
- rpcrdma_buffer_put_mrs(req, buf);
- return NULL;
-}
-
-static struct rpcrdma_req *
-rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
- struct rpcrdma_mw *r;
- int i;
-
- i = RPCRDMA_MAX_SEGS - 1;
- while (!list_empty(&buf->rb_mws)) {
- r = list_entry(buf->rb_mws.next,
- struct rpcrdma_mw, mw_list);
- list_del(&r->mw_list);
- req->rl_segments[i].rl_mw = r;
- if (unlikely(i-- == 0))
- return req; /* Success */
- }
-
- /* Not enough entries on rb_mws for this req */
- rpcrdma_buffer_put_sendbuf(req, buf);
- rpcrdma_buffer_put_mrs(req, buf);
- return NULL;
-}
-
/*
* Get a set of request/reply buffers.
*
@@ -1332,12 +1141,11 @@ rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
struct rpcrdma_req *
rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
{
- struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
- struct list_head stale;
struct rpcrdma_req *req;
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
+
if (buffers->rb_send_index == buffers->rb_max_requests) {
spin_unlock_irqrestore(&buffers->rb_lock, flags);
dprintk("RPC: %s: out of request buffers\n", __func__);
@@ -1356,20 +1164,7 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
}
buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
- INIT_LIST_HEAD(&stale);
- switch (ia->ri_memreg_strategy) {
- case RPCRDMA_FRMR:
- req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
- break;
- case RPCRDMA_MTHCAFMR:
- req = rpcrdma_buffer_get_fmrs(req, buffers);
- break;
- default:
- break;
- }
spin_unlock_irqrestore(&buffers->rb_lock, flags);
- if (!list_empty(&stale))
- rpcrdma_retry_flushed_linv(&stale, buffers);
return req;
}
@@ -1381,19 +1176,10 @@ void
rpcrdma_buffer_put(struct rpcrdma_req *req)
{
struct rpcrdma_buffer *buffers = req->rl_buffer;
- struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
unsigned long flags;
spin_lock_irqsave(&buffers->rb_lock, flags);
rpcrdma_buffer_put_sendbuf(req, buffers);
- switch (ia->ri_memreg_strategy) {
- case RPCRDMA_FRMR:
- case RPCRDMA_MTHCAFMR:
- rpcrdma_buffer_put_mrs(req, buffers);
- break;
- default:
- break;
- }
spin_unlock_irqrestore(&buffers->rb_lock, flags);
}
@@ -1423,10 +1209,9 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
void
rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
{
- struct rpcrdma_buffer *buffers = rep->rr_buffer;
+ struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
unsigned long flags;
- rep->rr_func = NULL;
spin_lock_irqsave(&buffers->rb_lock, flags);
buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
spin_unlock_irqrestore(&buffers->rb_lock, flags);
@@ -1455,9 +1240,9 @@ rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
/*
* All memory passed here was kmalloc'ed, therefore phys-contiguous.
*/
- iov->addr = ib_dma_map_single(ia->ri_id->device,
+ iov->addr = ib_dma_map_single(ia->ri_device,
va, len, DMA_BIDIRECTIONAL);
- if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
+ if (ib_dma_mapping_error(ia->ri_device, iov->addr))
return -ENOMEM;
iov->length = len;
@@ -1501,8 +1286,8 @@ rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
{
int rc;
- ib_dma_unmap_single(ia->ri_id->device,
- iov->addr, iov->length, DMA_BIDIRECTIONAL);
+ ib_dma_unmap_single(ia->ri_device,
+ iov->addr, iov->length, DMA_BIDIRECTIONAL);
if (NULL == mr)
return 0;
@@ -1595,15 +1380,18 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
send_wr.num_sge = req->rl_niovs;
send_wr.opcode = IB_WR_SEND;
if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
- ib_dma_sync_single_for_device(ia->ri_id->device,
- req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
- DMA_TO_DEVICE);
- ib_dma_sync_single_for_device(ia->ri_id->device,
- req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
- DMA_TO_DEVICE);
- ib_dma_sync_single_for_device(ia->ri_id->device,
- req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
- DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_device,
+ req->rl_send_iov[3].addr,
+ req->rl_send_iov[3].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_device,
+ req->rl_send_iov[1].addr,
+ req->rl_send_iov[1].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_device,
+ req->rl_send_iov[0].addr,
+ req->rl_send_iov[0].length,
+ DMA_TO_DEVICE);
if (DECR_CQCOUNT(ep) > 0)
send_wr.send_flags = 0;
@@ -1636,7 +1424,7 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
recv_wr.num_sge = 1;
- ib_dma_sync_single_for_cpu(ia->ri_id->device,
+ ib_dma_sync_single_for_cpu(ia->ri_device,
rdmab_addr(rep->rr_rdmabuf),
rdmab_length(rep->rr_rdmabuf),
DMA_BIDIRECTIONAL);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 78e0b8bea..f49dd8b38 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -62,6 +62,7 @@
struct rpcrdma_ia {
const struct rpcrdma_memreg_ops *ri_ops;
rwlock_t ri_qplock;
+ struct ib_device *ri_device;
struct rdma_cm_id *ri_id;
struct ib_pd *ri_pd;
struct ib_mr *ri_bind_mem;
@@ -69,7 +70,6 @@ struct rpcrdma_ia {
int ri_have_dma_lkey;
struct completion ri_done;
int ri_async_rc;
- enum rpcrdma_memreg ri_memreg_strategy;
unsigned int ri_max_frmr_depth;
struct ib_device_attr ri_devattr;
struct ib_qp_attr ri_qp_attr;
@@ -173,9 +173,8 @@ struct rpcrdma_buffer;
struct rpcrdma_rep {
unsigned int rr_len;
- struct rpcrdma_buffer *rr_buffer;
- struct rpc_xprt *rr_xprt;
- void (*rr_func)(struct rpcrdma_rep *);
+ struct ib_device *rr_device;
+ struct rpcrdma_xprt *rr_rxprt;
struct list_head rr_list;
struct rpcrdma_regbuf *rr_rdmabuf;
};
@@ -203,11 +202,18 @@ struct rpcrdma_frmr {
struct ib_fast_reg_page_list *fr_pgl;
struct ib_mr *fr_mr;
enum rpcrdma_frmr_state fr_state;
+ struct work_struct fr_work;
+ struct rpcrdma_xprt *fr_xprt;
+};
+
+struct rpcrdma_fmr {
+ struct ib_fmr *fmr;
+ u64 *physaddrs;
};
struct rpcrdma_mw {
union {
- struct ib_fmr *fmr;
+ struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
} r;
void (*mw_sendcompletion)(struct ib_wc *);
@@ -281,15 +287,17 @@ rpcr_to_rdmar(struct rpc_rqst *rqst)
* One of these is associated with a transport instance
*/
struct rpcrdma_buffer {
- spinlock_t rb_lock; /* protects indexes */
- u32 rb_max_requests;/* client max requests */
- struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */
- struct list_head rb_all;
- int rb_send_index;
+ spinlock_t rb_mwlock; /* protect rb_mws list */
+ struct list_head rb_mws;
+ struct list_head rb_all;
+ char *rb_pool;
+
+ spinlock_t rb_lock; /* protect buf arrays */
+ u32 rb_max_requests;
+ int rb_send_index;
+ int rb_recv_index;
struct rpcrdma_req **rb_send_bufs;
- int rb_recv_index;
struct rpcrdma_rep **rb_recv_bufs;
- char *rb_pool;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
@@ -350,7 +358,6 @@ struct rpcrdma_memreg_ops {
struct rpcrdma_create_data_internal *);
size_t (*ro_maxpages)(struct rpcrdma_xprt *);
int (*ro_init)(struct rpcrdma_xprt *);
- void (*ro_reset)(struct rpcrdma_xprt *);
void (*ro_destroy)(struct rpcrdma_buffer *);
const char *ro_displayname;
};
@@ -413,6 +420,8 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
int rpcrdma_buffer_create(struct rpcrdma_xprt *);
void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
+void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
void rpcrdma_buffer_put(struct rpcrdma_req *);
void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
@@ -425,6 +434,9 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int frwr_alloc_recovery_wq(void);
+void frwr_destroy_recovery_wq(void);
+
/*
* Wrappers for chunk registration, shared by read/write chunk code.
*/
@@ -480,6 +492,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *);
*/
int rpcrdma_marshal_req(struct rpc_rqst *);
+/* RPC/RDMA module init - xprtrdma/transport.c
+ */
+int xprt_rdma_init(void);
+void xprt_rdma_cleanup(void);
+
/* Temporary NFS request map cache. Created in svc_rdma.c */
extern struct kmem_cache *svc_rdma_map_cachep;
/* WR context cache. Created in svc_rdma.c */
@@ -487,10 +504,4 @@ extern struct kmem_cache *svc_rdma_ctxt_cachep;
/* Workqueue created in svc_rdma.c */
extern struct workqueue_struct *svc_rdma_wq;
-#if RPCSVC_MAXPAYLOAD < (RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT)
-#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
-#else
-#define RPCSVC_MAXPAYLOAD_RDMA (RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT)
-#endif
-
#endif /* _LINUX_SUNRPC_XPRT_RDMA_H */