summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--net/sunrpc/xprtrdma/Makefile9
-rw-r--r--net/sunrpc/xprtrdma/fmr_ops.c208
-rw-r--r--net/sunrpc/xprtrdma/frwr_ops.c353
-rw-r--r--net/sunrpc/xprtrdma/physical_ops.c94
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c889
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma.c302
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_marshal.c370
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c672
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c560
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_transport.c1366
-rw-r--r--net/sunrpc/xprtrdma/transport.c758
-rw-r--r--net/sunrpc/xprtrdma/verbs.c1672
-rw-r--r--net/sunrpc/xprtrdma/xprt_rdma.h496
13 files changed, 7749 insertions, 0 deletions
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
new file mode 100644
index 000000000..579f72bbc
--- /dev/null
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -0,0 +1,9 @@
+obj-$(CONFIG_SUNRPC_XPRT_RDMA_CLIENT) += xprtrdma.o
+
+xprtrdma-y := transport.o rpc_rdma.o verbs.o \
+ fmr_ops.o frwr_ops.o physical_ops.o
+
+obj-$(CONFIG_SUNRPC_XPRT_RDMA_SERVER) += svcrdma.o
+
+svcrdma-y := svc_rdma.o svc_rdma_transport.o \
+ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
new file mode 100644
index 000000000..302d4ebf6
--- /dev/null
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ */
+
+/* Lightweight memory registration using Fast Memory Regions (FMR).
+ * Referred to sometimes as MTHCAFMR mode.
+ *
+ * FMR uses synchronous memory registration and deregistration.
+ * FMR registration is known to be fast, but FMR deregistration
+ * can take tens of usecs to complete.
+ */
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+/* Maximum scatter/gather per FMR */
+#define RPCRDMA_MAX_FMR_SGES (64)
+
+static int
+fmr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+ struct rpcrdma_create_data_internal *cdata)
+{
+ return 0;
+}
+
+/* FMR mode conveys up to 64 pages of payload per chunk segment.
+ */
+static size_t
+fmr_op_maxpages(struct rpcrdma_xprt *r_xprt)
+{
+ return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
+ rpcrdma_max_segments(r_xprt) * RPCRDMA_MAX_FMR_SGES);
+}
+
+static int
+fmr_op_init(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ;
+ struct ib_fmr_attr fmr_attr = {
+ .max_pages = RPCRDMA_MAX_FMR_SGES,
+ .max_maps = 1,
+ .page_shift = PAGE_SHIFT
+ };
+ struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+ struct rpcrdma_mw *r;
+ int i, rc;
+
+ INIT_LIST_HEAD(&buf->rb_mws);
+ INIT_LIST_HEAD(&buf->rb_all);
+
+ i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
+ dprintk("RPC: %s: initializing %d FMRs\n", __func__, i);
+
+ while (i--) {
+ r = kzalloc(sizeof(*r), GFP_KERNEL);
+ if (!r)
+ return -ENOMEM;
+
+ r->r.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+ if (IS_ERR(r->r.fmr))
+ goto out_fmr_err;
+
+ list_add(&r->mw_list, &buf->rb_mws);
+ list_add(&r->mw_all, &buf->rb_all);
+ }
+ return 0;
+
+out_fmr_err:
+ rc = PTR_ERR(r->r.fmr);
+ dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc);
+ kfree(r);
+ return rc;
+}
+
+/* Use the ib_map_phys_fmr() verb to register a memory region
+ * for remote access via RDMA READ or RDMA WRITE.
+ */
+static int
+fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct ib_device *device = ia->ri_id->device;
+ enum dma_data_direction direction = rpcrdma_data_dir(writing);
+ struct rpcrdma_mr_seg *seg1 = seg;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
+ u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
+ int len, pageoff, i, rc;
+
+ pageoff = offset_in_page(seg1->mr_offset);
+ seg1->mr_offset -= pageoff; /* start of page */
+ seg1->mr_len += pageoff;
+ len = -pageoff;
+ if (nsegs > RPCRDMA_MAX_FMR_SGES)
+ nsegs = RPCRDMA_MAX_FMR_SGES;
+ for (i = 0; i < nsegs;) {
+ rpcrdma_map_one(device, seg, direction);
+ physaddrs[i] = seg->mr_dma;
+ len += seg->mr_len;
+ ++seg;
+ ++i;
+ /* Check for holes */
+ if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
+ offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
+ break;
+ }
+
+ rc = ib_map_phys_fmr(mw->r.fmr, physaddrs, i, seg1->mr_dma);
+ if (rc)
+ goto out_maperr;
+
+ seg1->mr_rkey = mw->r.fmr->rkey;
+ seg1->mr_base = seg1->mr_dma + pageoff;
+ seg1->mr_nsegs = i;
+ seg1->mr_len = len;
+ return i;
+
+out_maperr:
+ dprintk("RPC: %s: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
+ __func__, len, (unsigned long long)seg1->mr_dma,
+ pageoff, i, rc);
+ while (i--)
+ rpcrdma_unmap_one(device, --seg);
+ return rc;
+}
+
+/* Use the ib_unmap_fmr() verb to prevent further remote
+ * access via RDMA READ or RDMA WRITE.
+ */
+static int
+fmr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_mr_seg *seg1 = seg;
+ struct ib_device *device;
+ int rc, nsegs = seg->mr_nsegs;
+ LIST_HEAD(l);
+
+ list_add(&seg1->rl_mw->r.fmr->list, &l);
+ rc = ib_unmap_fmr(&l);
+ read_lock(&ia->ri_qplock);
+ device = ia->ri_id->device;
+ while (seg1->mr_nsegs--)
+ rpcrdma_unmap_one(device, seg++);
+ read_unlock(&ia->ri_qplock);
+ if (rc)
+ goto out_err;
+ return nsegs;
+
+out_err:
+ dprintk("RPC: %s: ib_unmap_fmr status %i\n", __func__, rc);
+ return nsegs;
+}
+
+/* After a disconnect, unmap all FMRs.
+ *
+ * This is invoked only in the transport connect worker in order
+ * to serialize with rpcrdma_register_fmr_external().
+ */
+static void
+fmr_op_reset(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_mw *r;
+ LIST_HEAD(list);
+ int rc;
+
+ list_for_each_entry(r, &buf->rb_all, mw_all)
+ list_add(&r->r.fmr->list, &list);
+
+ rc = ib_unmap_fmr(&list);
+ if (rc)
+ dprintk("RPC: %s: ib_unmap_fmr failed %i\n",
+ __func__, rc);
+}
+
+static void
+fmr_op_destroy(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mw *r;
+ int rc;
+
+ while (!list_empty(&buf->rb_all)) {
+ r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+ list_del(&r->mw_all);
+ rc = ib_dealloc_fmr(r->r.fmr);
+ if (rc)
+ dprintk("RPC: %s: ib_dealloc_fmr failed %i\n",
+ __func__, rc);
+ kfree(r);
+ }
+}
+
+const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = {
+ .ro_map = fmr_op_map,
+ .ro_unmap = fmr_op_unmap,
+ .ro_open = fmr_op_open,
+ .ro_maxpages = fmr_op_maxpages,
+ .ro_init = fmr_op_init,
+ .ro_reset = fmr_op_reset,
+ .ro_destroy = fmr_op_destroy,
+ .ro_displayname = "fmr",
+};
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
new file mode 100644
index 000000000..dff0481db
--- /dev/null
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -0,0 +1,353 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ */
+
+/* Lightweight memory registration using Fast Registration Work
+ * Requests (FRWR). Also referred to sometimes as FRMR mode.
+ *
+ * FRWR features ordered asynchronous registration and deregistration
+ * of arbitrarily sized memory regions. This is the fastest and safest
+ * but most complex memory registration mode.
+ */
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+static int
+__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
+ unsigned int depth)
+{
+ struct rpcrdma_frmr *f = &r->r.frmr;
+ int rc;
+
+ f->fr_mr = ib_alloc_fast_reg_mr(pd, depth);
+ if (IS_ERR(f->fr_mr))
+ goto out_mr_err;
+ f->fr_pgl = ib_alloc_fast_reg_page_list(device, depth);
+ if (IS_ERR(f->fr_pgl))
+ goto out_list_err;
+ return 0;
+
+out_mr_err:
+ rc = PTR_ERR(f->fr_mr);
+ dprintk("RPC: %s: ib_alloc_fast_reg_mr status %i\n",
+ __func__, rc);
+ return rc;
+
+out_list_err:
+ rc = PTR_ERR(f->fr_pgl);
+ dprintk("RPC: %s: ib_alloc_fast_reg_page_list status %i\n",
+ __func__, rc);
+ ib_dereg_mr(f->fr_mr);
+ return rc;
+}
+
+static void
+__frwr_release(struct rpcrdma_mw *r)
+{
+ int rc;
+
+ rc = ib_dereg_mr(r->r.frmr.fr_mr);
+ if (rc)
+ dprintk("RPC: %s: ib_dereg_mr status %i\n",
+ __func__, rc);
+ ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
+}
+
+static int
+frwr_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+ struct rpcrdma_create_data_internal *cdata)
+{
+ struct ib_device_attr *devattr = &ia->ri_devattr;
+ int depth, delta;
+
+ ia->ri_max_frmr_depth =
+ min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
+ devattr->max_fast_reg_page_list_len);
+ dprintk("RPC: %s: device's max FR page list len = %u\n",
+ __func__, ia->ri_max_frmr_depth);
+
+ /* Add room for frmr register and invalidate WRs.
+ * 1. FRMR reg WR for head
+ * 2. FRMR invalidate WR for head
+ * 3. N FRMR reg WRs for pagelist
+ * 4. N FRMR invalidate WRs for pagelist
+ * 5. FRMR reg WR for tail
+ * 6. FRMR invalidate WR for tail
+ * 7. The RDMA_SEND WR
+ */
+ depth = 7;
+
+ /* Calculate N if the device max FRMR depth is smaller than
+ * RPCRDMA_MAX_DATA_SEGS.
+ */
+ if (ia->ri_max_frmr_depth < RPCRDMA_MAX_DATA_SEGS) {
+ delta = RPCRDMA_MAX_DATA_SEGS - ia->ri_max_frmr_depth;
+ do {
+ depth += 2; /* FRMR reg + invalidate */
+ delta -= ia->ri_max_frmr_depth;
+ } while (delta > 0);
+ }
+
+ ep->rep_attr.cap.max_send_wr *= depth;
+ if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
+ cdata->max_requests = devattr->max_qp_wr / depth;
+ if (!cdata->max_requests)
+ return -EINVAL;
+ ep->rep_attr.cap.max_send_wr = cdata->max_requests *
+ depth;
+ }
+
+ return 0;
+}
+
+/* FRWR mode conveys a list of pages per chunk segment. The
+ * maximum length of that list is the FRWR page list depth.
+ */
+static size_t
+frwr_op_maxpages(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+
+ return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
+ rpcrdma_max_segments(r_xprt) * ia->ri_max_frmr_depth);
+}
+
+/* If FAST_REG or LOCAL_INV failed, indicate the frmr needs to be reset. */
+static void
+frwr_sendcompletion(struct ib_wc *wc)
+{
+ struct rpcrdma_mw *r;
+
+ if (likely(wc->status == IB_WC_SUCCESS))
+ return;
+
+ /* WARNING: Only wr_id and status are reliable at this point */
+ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+ dprintk("RPC: %s: frmr %p (stale), status %d\n",
+ __func__, r, wc->status);
+ r->r.frmr.fr_state = FRMR_IS_STALE;
+}
+
+static int
+frwr_op_init(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct ib_device *device = r_xprt->rx_ia.ri_id->device;
+ unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
+ struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+ int i;
+
+ INIT_LIST_HEAD(&buf->rb_mws);
+ INIT_LIST_HEAD(&buf->rb_all);
+
+ i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
+ dprintk("RPC: %s: initializing %d FRMRs\n", __func__, i);
+
+ while (i--) {
+ struct rpcrdma_mw *r;
+ int rc;
+
+ r = kzalloc(sizeof(*r), GFP_KERNEL);
+ if (!r)
+ return -ENOMEM;
+
+ rc = __frwr_init(r, pd, device, depth);
+ if (rc) {
+ kfree(r);
+ return rc;
+ }
+
+ list_add(&r->mw_list, &buf->rb_mws);
+ list_add(&r->mw_all, &buf->rb_all);
+ r->mw_sendcompletion = frwr_sendcompletion;
+ }
+
+ return 0;
+}
+
+/* Post a FAST_REG Work Request to register a memory region
+ * for remote access via RDMA READ or RDMA WRITE.
+ */
+static int
+frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct ib_device *device = ia->ri_id->device;
+ enum dma_data_direction direction = rpcrdma_data_dir(writing);
+ struct rpcrdma_mr_seg *seg1 = seg;
+ struct rpcrdma_mw *mw = seg1->rl_mw;
+ struct rpcrdma_frmr *frmr = &mw->r.frmr;
+ struct ib_mr *mr = frmr->fr_mr;
+ struct ib_send_wr fastreg_wr, *bad_wr;
+ u8 key;
+ int len, pageoff;
+ int i, rc;
+ int seg_len;
+ u64 pa;
+ int page_no;
+
+ pageoff = offset_in_page(seg1->mr_offset);
+ seg1->mr_offset -= pageoff; /* start of page */
+ seg1->mr_len += pageoff;
+ len = -pageoff;
+ if (nsegs > ia->ri_max_frmr_depth)
+ nsegs = ia->ri_max_frmr_depth;
+ for (page_no = i = 0; i < nsegs;) {
+ rpcrdma_map_one(device, seg, direction);
+ pa = seg->mr_dma;
+ for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
+ frmr->fr_pgl->page_list[page_no++] = pa;
+ pa += PAGE_SIZE;
+ }
+ len += seg->mr_len;
+ ++seg;
+ ++i;
+ /* Check for holes */
+ if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
+ offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
+ break;
+ }
+ dprintk("RPC: %s: Using frmr %p to map %d segments (%d bytes)\n",
+ __func__, mw, i, len);
+
+ frmr->fr_state = FRMR_IS_VALID;
+
+ memset(&fastreg_wr, 0, sizeof(fastreg_wr));
+ fastreg_wr.wr_id = (unsigned long)(void *)mw;
+ fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+ fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma + pageoff;
+ fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
+ fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+ fastreg_wr.wr.fast_reg.page_list_len = page_no;
+ fastreg_wr.wr.fast_reg.length = len;
+ fastreg_wr.wr.fast_reg.access_flags = writing ?
+ IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
+ IB_ACCESS_REMOTE_READ;
+ key = (u8)(mr->rkey & 0x000000FF);
+ ib_update_fast_reg_key(mr, ++key);
+ fastreg_wr.wr.fast_reg.rkey = mr->rkey;
+
+ DECR_CQCOUNT(&r_xprt->rx_ep);
+ rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
+ if (rc)
+ goto out_senderr;
+
+ seg1->mr_rkey = mr->rkey;
+ seg1->mr_base = seg1->mr_dma + pageoff;
+ seg1->mr_nsegs = i;
+ seg1->mr_len = len;
+ return i;
+
+out_senderr:
+ dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
+ ib_update_fast_reg_key(mr, --key);
+ frmr->fr_state = FRMR_IS_INVALID;
+ while (i--)
+ rpcrdma_unmap_one(device, --seg);
+ return rc;
+}
+
+/* Post a LOCAL_INV Work Request to prevent further remote access
+ * via RDMA READ or RDMA WRITE.
+ */
+static int
+frwr_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+{
+ struct rpcrdma_mr_seg *seg1 = seg;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct ib_send_wr invalidate_wr, *bad_wr;
+ int rc, nsegs = seg->mr_nsegs;
+ struct ib_device *device;
+
+ seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+
+ memset(&invalidate_wr, 0, sizeof(invalidate_wr));
+ invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
+ invalidate_wr.opcode = IB_WR_LOCAL_INV;
+ invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
+ DECR_CQCOUNT(&r_xprt->rx_ep);
+
+ read_lock(&ia->ri_qplock);
+ device = ia->ri_id->device;
+ while (seg1->mr_nsegs--)
+ rpcrdma_unmap_one(device, seg++);
+ rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+ read_unlock(&ia->ri_qplock);
+ if (rc)
+ goto out_err;
+ return nsegs;
+
+out_err:
+ /* Force rpcrdma_buffer_get() to retry */
+ seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
+ dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc);
+ return nsegs;
+}
+
+/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
+ * an unusable state. Find FRMRs in this state and dereg / reg
+ * each. FRMRs that are VALID and attached to an rpcrdma_req are
+ * also torn down.
+ *
+ * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
+ *
+ * This is invoked only in the transport connect worker in order
+ * to serialize with rpcrdma_register_frmr_external().
+ */
+static void
+frwr_op_reset(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct ib_device *device = r_xprt->rx_ia.ri_id->device;
+ unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
+ struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+ struct rpcrdma_mw *r;
+ int rc;
+
+ list_for_each_entry(r, &buf->rb_all, mw_all) {
+ if (r->r.frmr.fr_state == FRMR_IS_INVALID)
+ continue;
+
+ __frwr_release(r);
+ rc = __frwr_init(r, pd, device, depth);
+ if (rc) {
+ dprintk("RPC: %s: mw %p left %s\n",
+ __func__, r,
+ (r->r.frmr.fr_state == FRMR_IS_STALE ?
+ "stale" : "valid"));
+ continue;
+ }
+
+ r->r.frmr.fr_state = FRMR_IS_INVALID;
+ }
+}
+
+static void
+frwr_op_destroy(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mw *r;
+
+ while (!list_empty(&buf->rb_all)) {
+ r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+ list_del(&r->mw_all);
+ __frwr_release(r);
+ kfree(r);
+ }
+}
+
+const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = {
+ .ro_map = frwr_op_map,
+ .ro_unmap = frwr_op_unmap,
+ .ro_open = frwr_op_open,
+ .ro_maxpages = frwr_op_maxpages,
+ .ro_init = frwr_op_init,
+ .ro_reset = frwr_op_reset,
+ .ro_destroy = frwr_op_destroy,
+ .ro_displayname = "frwr",
+};
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
new file mode 100644
index 000000000..ba518af16
--- /dev/null
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ */
+
+/* No-op chunk preparation. All client memory is pre-registered.
+ * Sometimes referred to as ALLPHYSICAL mode.
+ *
+ * Physical registration is simple because all client memory is
+ * pre-registered and never deregistered. This mode is good for
+ * adapter bring up, but is considered not safe: the server is
+ * trusted not to abuse its access to client memory not involved
+ * in RDMA I/O.
+ */
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+static int
+physical_op_open(struct rpcrdma_ia *ia, struct rpcrdma_ep *ep,
+ struct rpcrdma_create_data_internal *cdata)
+{
+ return 0;
+}
+
+/* PHYSICAL memory registration conveys one page per chunk segment.
+ */
+static size_t
+physical_op_maxpages(struct rpcrdma_xprt *r_xprt)
+{
+ return min_t(unsigned int, RPCRDMA_MAX_DATA_SEGS,
+ rpcrdma_max_segments(r_xprt));
+}
+
+static int
+physical_op_init(struct rpcrdma_xprt *r_xprt)
+{
+ return 0;
+}
+
+/* The client's physical memory is already exposed for
+ * remote access via RDMA READ or RDMA WRITE.
+ */
+static int
+physical_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
+ int nsegs, bool writing)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+
+ rpcrdma_map_one(ia->ri_id->device, seg,
+ rpcrdma_data_dir(writing));
+ seg->mr_rkey = ia->ri_bind_mem->rkey;
+ seg->mr_base = seg->mr_dma;
+ seg->mr_nsegs = 1;
+ return 1;
+}
+
+/* Unmap a memory region, but leave it registered.
+ */
+static int
+physical_op_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
+{
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+
+ read_lock(&ia->ri_qplock);
+ rpcrdma_unmap_one(ia->ri_id->device, seg);
+ read_unlock(&ia->ri_qplock);
+
+ return 1;
+}
+
+static void
+physical_op_reset(struct rpcrdma_xprt *r_xprt)
+{
+}
+
+static void
+physical_op_destroy(struct rpcrdma_buffer *buf)
+{
+}
+
+const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops = {
+ .ro_map = physical_op_map,
+ .ro_unmap = physical_op_unmap,
+ .ro_open = physical_op_open,
+ .ro_maxpages = physical_op_maxpages,
+ .ro_init = physical_op_init,
+ .ro_reset = physical_op_reset,
+ .ro_destroy = physical_op_destroy,
+ .ro_displayname = "physical",
+};
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
new file mode 100644
index 000000000..2c53ea9e1
--- /dev/null
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -0,0 +1,889 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * rpc_rdma.c
+ *
+ * This file contains the guts of the RPC RDMA protocol, and
+ * does marshaling/unmarshaling, etc. It is also where interfacing
+ * to the Linux RPC framework lives.
+ */
+
+#include "xprt_rdma.h"
+
+#include <linux/highmem.h>
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+enum rpcrdma_chunktype {
+ rpcrdma_noch = 0,
+ rpcrdma_readch,
+ rpcrdma_areadch,
+ rpcrdma_writech,
+ rpcrdma_replych
+};
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+static const char transfertypes[][12] = {
+ "pure inline", /* no chunks */
+ " read chunk", /* some argument via rdma read */
+ "*read chunk", /* entire request via rdma read */
+ "write chunk", /* some result via rdma write */
+ "reply chunk" /* entire reply via rdma write */
+};
+#endif
+
+/*
+ * Chunk assembly from upper layer xdr_buf.
+ *
+ * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
+ * elements. Segments are then coalesced when registered, if possible
+ * within the selected memreg mode.
+ *
+ * Returns positive number of segments converted, or a negative errno.
+ */
+
+static int
+rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
+ enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
+{
+ int len, n = 0, p;
+ int page_base;
+ struct page **ppages;
+
+ if (pos == 0 && xdrbuf->head[0].iov_len) {
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = xdrbuf->head[0].iov_base;
+ seg[n].mr_len = xdrbuf->head[0].iov_len;
+ ++n;
+ }
+
+ len = xdrbuf->page_len;
+ ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
+ page_base = xdrbuf->page_base & ~PAGE_MASK;
+ p = 0;
+ while (len && n < nsegs) {
+ if (!ppages[p]) {
+ /* alloc the pagelist for receiving buffer */
+ ppages[p] = alloc_page(GFP_ATOMIC);
+ if (!ppages[p])
+ return -ENOMEM;
+ }
+ seg[n].mr_page = ppages[p];
+ seg[n].mr_offset = (void *)(unsigned long) page_base;
+ seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
+ if (seg[n].mr_len > PAGE_SIZE)
+ return -EIO;
+ len -= seg[n].mr_len;
+ ++n;
+ ++p;
+ page_base = 0; /* page offset only applies to first page */
+ }
+
+ /* Message overflows the seg array */
+ if (len && n == nsegs)
+ return -EIO;
+
+ if (xdrbuf->tail[0].iov_len) {
+ /* the rpcrdma protocol allows us to omit any trailing
+ * xdr pad bytes, saving the server an RDMA operation. */
+ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
+ return n;
+ if (n == nsegs)
+ /* Tail remains, but we're out of segments */
+ return -EIO;
+ seg[n].mr_page = NULL;
+ seg[n].mr_offset = xdrbuf->tail[0].iov_base;
+ seg[n].mr_len = xdrbuf->tail[0].iov_len;
+ ++n;
+ }
+
+ return n;
+}
+
+/*
+ * Create read/write chunk lists, and reply chunks, for RDMA
+ *
+ * Assume check against THRESHOLD has been done, and chunks are required.
+ * Assume only encoding one list entry for read|write chunks. The NFSv3
+ * protocol is simple enough to allow this as it only has a single "bulk
+ * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
+ * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
+ *
+ * When used for a single reply chunk (which is a special write
+ * chunk used for the entire reply, rather than just the data), it
+ * is used primarily for READDIR and READLINK which would otherwise
+ * be severely size-limited by a small rdma inline read max. The server
+ * response will come back as an RDMA Write, followed by a message
+ * of type RDMA_NOMSG carrying the xid and length. As a result, reply
+ * chunks do not provide data alignment, however they do not require
+ * "fixup" (moving the response to the upper layer buffer) either.
+ *
+ * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
+ *
+ * Read chunklist (a linked list):
+ * N elements, position P (same P for all chunks of same arg!):
+ * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
+ *
+ * Write chunklist (a list of (one) counted array):
+ * N elements:
+ * 1 - N - HLOO - HLOO - ... - HLOO - 0
+ *
+ * Reply chunk (a counted array):
+ * N elements:
+ * 1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
+ */
+
+static ssize_t
+rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
+ struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
+{
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ int n, nsegs, nchunks = 0;
+ unsigned int pos;
+ struct rpcrdma_mr_seg *seg = req->rl_segments;
+ struct rpcrdma_read_chunk *cur_rchunk = NULL;
+ struct rpcrdma_write_array *warray = NULL;
+ struct rpcrdma_write_chunk *cur_wchunk = NULL;
+ __be32 *iptr = headerp->rm_body.rm_chunks;
+ int (*map)(struct rpcrdma_xprt *, struct rpcrdma_mr_seg *, int, bool);
+
+ if (type == rpcrdma_readch || type == rpcrdma_areadch) {
+ /* a read chunk - server will RDMA Read our memory */
+ cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
+ } else {
+ /* a write or reply chunk - server will RDMA Write our memory */
+ *iptr++ = xdr_zero; /* encode a NULL read chunk list */
+ if (type == rpcrdma_replych)
+ *iptr++ = xdr_zero; /* a NULL write chunk list */
+ warray = (struct rpcrdma_write_array *) iptr;
+ cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
+ }
+
+ if (type == rpcrdma_replych || type == rpcrdma_areadch)
+ pos = 0;
+ else
+ pos = target->head[0].iov_len;
+
+ nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
+ if (nsegs < 0)
+ return nsegs;
+
+ map = r_xprt->rx_ia.ri_ops->ro_map;
+ do {
+ n = map(r_xprt, seg, nsegs, cur_wchunk != NULL);
+ if (n <= 0)
+ goto out;
+ if (cur_rchunk) { /* read */
+ cur_rchunk->rc_discrim = xdr_one;
+ /* all read chunks have the same "position" */
+ cur_rchunk->rc_position = cpu_to_be32(pos);
+ cur_rchunk->rc_target.rs_handle =
+ cpu_to_be32(seg->mr_rkey);
+ cur_rchunk->rc_target.rs_length =
+ cpu_to_be32(seg->mr_len);
+ xdr_encode_hyper(
+ (__be32 *)&cur_rchunk->rc_target.rs_offset,
+ seg->mr_base);
+ dprintk("RPC: %s: read chunk "
+ "elem %d@0x%llx:0x%x pos %u (%s)\n", __func__,
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, pos, n < nsegs ? "more" : "last");
+ cur_rchunk++;
+ r_xprt->rx_stats.read_chunk_count++;
+ } else { /* write/reply */
+ cur_wchunk->wc_target.rs_handle =
+ cpu_to_be32(seg->mr_rkey);
+ cur_wchunk->wc_target.rs_length =
+ cpu_to_be32(seg->mr_len);
+ xdr_encode_hyper(
+ (__be32 *)&cur_wchunk->wc_target.rs_offset,
+ seg->mr_base);
+ dprintk("RPC: %s: %s chunk "
+ "elem %d@0x%llx:0x%x (%s)\n", __func__,
+ (type == rpcrdma_replych) ? "reply" : "write",
+ seg->mr_len, (unsigned long long)seg->mr_base,
+ seg->mr_rkey, n < nsegs ? "more" : "last");
+ cur_wchunk++;
+ if (type == rpcrdma_replych)
+ r_xprt->rx_stats.reply_chunk_count++;
+ else
+ r_xprt->rx_stats.write_chunk_count++;
+ r_xprt->rx_stats.total_rdma_request += seg->mr_len;
+ }
+ nchunks++;
+ seg += n;
+ nsegs -= n;
+ } while (nsegs);
+
+ /* success. all failures return above */
+ req->rl_nchunks = nchunks;
+
+ /*
+ * finish off header. If write, marshal discrim and nchunks.
+ */
+ if (cur_rchunk) {
+ iptr = (__be32 *) cur_rchunk;
+ *iptr++ = xdr_zero; /* finish the read chunk list */
+ *iptr++ = xdr_zero; /* encode a NULL write chunk list */
+ *iptr++ = xdr_zero; /* encode a NULL reply chunk */
+ } else {
+ warray->wc_discrim = xdr_one;
+ warray->wc_nchunks = cpu_to_be32(nchunks);
+ iptr = (__be32 *) cur_wchunk;
+ if (type == rpcrdma_writech) {
+ *iptr++ = xdr_zero; /* finish the write chunk list */
+ *iptr++ = xdr_zero; /* encode a NULL reply chunk */
+ }
+ }
+
+ /*
+ * Return header size.
+ */
+ return (unsigned char *)iptr - (unsigned char *)headerp;
+
+out:
+ if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
+ return n;
+
+ for (pos = 0; nchunks--;)
+ pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+ &req->rl_segments[pos]);
+ return n;
+}
+
+/*
+ * Copy write data inline.
+ * This function is used for "small" requests. Data which is passed
+ * to RPC via iovecs (or page list) is copied directly into the
+ * pre-registered memory buffer for this request. For small amounts
+ * of data, this is efficient. The cutoff value is tunable.
+ */
+static int
+rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+{
+ int i, npages, curlen;
+ int copy_len;
+ unsigned char *srcp, *destp;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
+ int page_base;
+ struct page **ppages;
+
+ destp = rqst->rq_svec[0].iov_base;
+ curlen = rqst->rq_svec[0].iov_len;
+ destp += curlen;
+ /*
+ * Do optional padding where it makes sense. Alignment of write
+ * payload can help the server, if our setting is accurate.
+ */
+ pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
+ if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
+ pad = 0; /* don't pad this request */
+
+ dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
+ __func__, pad, destp, rqst->rq_slen, curlen);
+
+ copy_len = rqst->rq_snd_buf.page_len;
+
+ if (rqst->rq_snd_buf.tail[0].iov_len) {
+ curlen = rqst->rq_snd_buf.tail[0].iov_len;
+ if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
+ memmove(destp + copy_len,
+ rqst->rq_snd_buf.tail[0].iov_base, curlen);
+ r_xprt->rx_stats.pullup_copy_count += curlen;
+ }
+ dprintk("RPC: %s: tail destp 0x%p len %d\n",
+ __func__, destp + copy_len, curlen);
+ rqst->rq_svec[0].iov_len += curlen;
+ }
+ r_xprt->rx_stats.pullup_copy_count += copy_len;
+
+ page_base = rqst->rq_snd_buf.page_base;
+ ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
+ page_base &= ~PAGE_MASK;
+ npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
+ for (i = 0; copy_len && i < npages; i++) {
+ curlen = PAGE_SIZE - page_base;
+ if (curlen > copy_len)
+ curlen = copy_len;
+ dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
+ __func__, i, destp, copy_len, curlen);
+ srcp = kmap_atomic(ppages[i]);
+ memcpy(destp, srcp+page_base, curlen);
+ kunmap_atomic(srcp);
+ rqst->rq_svec[0].iov_len += curlen;
+ destp += curlen;
+ copy_len -= curlen;
+ page_base = 0;
+ }
+ /* header now contains entire send message */
+ return pad;
+}
+
+/*
+ * Marshal a request: the primary job of this routine is to choose
+ * the transfer modes. See comments below.
+ *
+ * Uses multiple RDMA IOVs for a request:
+ * [0] -- RPC RDMA header, which uses memory from the *start* of the
+ * preregistered buffer that already holds the RPC data in
+ * its middle.
+ * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
+ * [2] -- optional padding.
+ * [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
+ */
+
+int
+rpcrdma_marshal_req(struct rpc_rqst *rqst)
+{
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ char *base;
+ size_t rpclen, padlen;
+ ssize_t hdrlen;
+ enum rpcrdma_chunktype rtype, wtype;
+ struct rpcrdma_msg *headerp;
+
+ /*
+ * rpclen gets amount of data in first buffer, which is the
+ * pre-registered buffer.
+ */
+ base = rqst->rq_svec[0].iov_base;
+ rpclen = rqst->rq_svec[0].iov_len;
+
+ headerp = rdmab_to_msg(req->rl_rdmabuf);
+ /* don't byte-swap XID, it's already done in request */
+ headerp->rm_xid = rqst->rq_xid;
+ headerp->rm_vers = rpcrdma_version;
+ headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+ headerp->rm_type = rdma_msg;
+
+ /*
+ * Chunks needed for results?
+ *
+ * o If the expected result is under the inline threshold, all ops
+ * return as inline (but see later).
+ * o Large non-read ops return as a single reply chunk.
+ * o Large read ops return data as write chunk(s), header as inline.
+ *
+ * Note: the NFS code sending down multiple result segments implies
+ * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
+ */
+
+ /*
+ * This code can handle read chunks, write chunks OR reply
+ * chunks -- only one type. If the request is too big to fit
+ * inline, then we will choose read chunks. If the request is
+ * a READ, then use write chunks to separate the file data
+ * into pages; otherwise use reply chunks.
+ */
+ if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
+ wtype = rpcrdma_noch;
+ else if (rqst->rq_rcv_buf.page_len == 0)
+ wtype = rpcrdma_replych;
+ else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ wtype = rpcrdma_writech;
+ else
+ wtype = rpcrdma_replych;
+
+ /*
+ * Chunks needed for arguments?
+ *
+ * o If the total request is under the inline threshold, all ops
+ * are sent as inline.
+ * o Large non-write ops are sent with the entire message as a
+ * single read chunk (protocol 0-position special case).
+ * o Large write ops transmit data as read chunk(s), header as
+ * inline.
+ *
+ * Note: the NFS code sending down multiple argument segments
+ * implies the op is a write.
+ * TBD check NFSv4 setacl
+ */
+ if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ rtype = rpcrdma_noch;
+ else if (rqst->rq_snd_buf.page_len == 0)
+ rtype = rpcrdma_areadch;
+ else
+ rtype = rpcrdma_readch;
+
+ /* The following simplification is not true forever */
+ if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
+ wtype = rpcrdma_noch;
+ if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+ dprintk("RPC: %s: cannot marshal multiple chunk lists\n",
+ __func__);
+ return -EIO;
+ }
+
+ hdrlen = RPCRDMA_HDRLEN_MIN;
+ padlen = 0;
+
+ /*
+ * Pull up any extra send data into the preregistered buffer.
+ * When padding is in use and applies to the transfer, insert
+ * it and change the message type.
+ */
+ if (rtype == rpcrdma_noch) {
+
+ padlen = rpcrdma_inline_pullup(rqst,
+ RPCRDMA_INLINE_PAD_VALUE(rqst));
+
+ if (padlen) {
+ headerp->rm_type = rdma_msgp;
+ headerp->rm_body.rm_padded.rm_align =
+ cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
+ headerp->rm_body.rm_padded.rm_thresh =
+ cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
+ headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
+ headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
+ headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
+ hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
+ if (wtype != rpcrdma_noch) {
+ dprintk("RPC: %s: invalid chunk list\n",
+ __func__);
+ return -EIO;
+ }
+ } else {
+ headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+ /* new length after pullup */
+ rpclen = rqst->rq_svec[0].iov_len;
+ /*
+ * Currently we try to not actually use read inline.
+ * Reply chunks have the desirable property that
+ * they land, packed, directly in the target buffers
+ * without headers, so they require no fixup. The
+ * additional RDMA Write op sends the same amount
+ * of data, streams on-the-wire and adds no overhead
+ * on receive. Therefore, we request a reply chunk
+ * for non-writes wherever feasible and efficient.
+ */
+ if (wtype == rpcrdma_noch)
+ wtype = rpcrdma_replych;
+ }
+ }
+
+ if (rtype != rpcrdma_noch) {
+ hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
+ headerp, rtype);
+ wtype = rtype; /* simplify dprintk */
+
+ } else if (wtype != rpcrdma_noch) {
+ hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
+ headerp, wtype);
+ }
+ if (hdrlen < 0)
+ return hdrlen;
+
+ dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
+ " headerp 0x%p base 0x%p lkey 0x%x\n",
+ __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+ headerp, base, rdmab_lkey(req->rl_rdmabuf));
+
+ /*
+ * initialize send_iov's - normally only two: rdma chunk header and
+ * single preregistered RPC header buffer, but if padding is present,
+ * then use a preregistered (and zeroed) pad buffer between the RPC
+ * header and any write data. In all non-rdma cases, any following
+ * data has been copied into the RPC header buffer.
+ */
+ req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
+ req->rl_send_iov[0].length = hdrlen;
+ req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+
+ req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
+ req->rl_send_iov[1].length = rpclen;
+ req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
+
+ req->rl_niovs = 2;
+
+ if (padlen) {
+ struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+
+ req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
+ req->rl_send_iov[2].length = padlen;
+ req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
+
+ req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
+ req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
+ req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
+
+ req->rl_niovs = 4;
+ }
+
+ return 0;
+}
+
+/*
+ * Chase down a received write or reply chunklist to get length
+ * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
+ */
+static int
+rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp)
+{
+ unsigned int i, total_len;
+ struct rpcrdma_write_chunk *cur_wchunk;
+ char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
+
+ i = be32_to_cpu(**iptrp);
+ if (i > max)
+ return -1;
+ cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
+ total_len = 0;
+ while (i--) {
+ struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
+ ifdebug(FACILITY) {
+ u64 off;
+ xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
+ dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
+ __func__,
+ be32_to_cpu(seg->rs_length),
+ (unsigned long long)off,
+ be32_to_cpu(seg->rs_handle));
+ }
+ total_len += be32_to_cpu(seg->rs_length);
+ ++cur_wchunk;
+ }
+ /* check and adjust for properly terminated write chunk */
+ if (wrchunk) {
+ __be32 *w = (__be32 *) cur_wchunk;
+ if (*w++ != xdr_zero)
+ return -1;
+ cur_wchunk = (struct rpcrdma_write_chunk *) w;
+ }
+ if ((char *)cur_wchunk > base + rep->rr_len)
+ return -1;
+
+ *iptrp = (__be32 *) cur_wchunk;
+ return total_len;
+}
+
+/*
+ * Scatter inline received data back into provided iov's.
+ */
+static void
+rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
+{
+ int i, npages, curlen, olen;
+ char *destp;
+ struct page **ppages;
+ int page_base;
+
+ curlen = rqst->rq_rcv_buf.head[0].iov_len;
+ if (curlen > copy_len) { /* write chunk header fixup */
+ curlen = copy_len;
+ rqst->rq_rcv_buf.head[0].iov_len = curlen;
+ }
+
+ dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
+ __func__, srcp, copy_len, curlen);
+
+ /* Shift pointer for first receive segment only */
+ rqst->rq_rcv_buf.head[0].iov_base = srcp;
+ srcp += curlen;
+ copy_len -= curlen;
+
+ olen = copy_len;
+ i = 0;
+ rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen;
+ page_base = rqst->rq_rcv_buf.page_base;
+ ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
+ page_base &= ~PAGE_MASK;
+
+ if (copy_len && rqst->rq_rcv_buf.page_len) {
+ npages = PAGE_ALIGN(page_base +
+ rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT;
+ for (; i < npages; i++) {
+ curlen = PAGE_SIZE - page_base;
+ if (curlen > copy_len)
+ curlen = copy_len;
+ dprintk("RPC: %s: page %d"
+ " srcp 0x%p len %d curlen %d\n",
+ __func__, i, srcp, copy_len, curlen);
+ destp = kmap_atomic(ppages[i]);
+ memcpy(destp + page_base, srcp, curlen);
+ flush_dcache_page(ppages[i]);
+ kunmap_atomic(destp);
+ srcp += curlen;
+ copy_len -= curlen;
+ if (copy_len == 0)
+ break;
+ page_base = 0;
+ }
+ }
+
+ if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) {
+ curlen = copy_len;
+ if (curlen > rqst->rq_rcv_buf.tail[0].iov_len)
+ curlen = rqst->rq_rcv_buf.tail[0].iov_len;
+ if (rqst->rq_rcv_buf.tail[0].iov_base != srcp)
+ memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen);
+ dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n",
+ __func__, srcp, copy_len, curlen);
+ rqst->rq_rcv_buf.tail[0].iov_len = curlen;
+ copy_len -= curlen; ++i;
+ } else
+ rqst->rq_rcv_buf.tail[0].iov_len = 0;
+
+ if (pad) {
+ /* implicit padding on terminal chunk */
+ unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base;
+ while (pad--)
+ p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0;
+ }
+
+ if (copy_len)
+ dprintk("RPC: %s: %d bytes in"
+ " %d extra segments (%d lost)\n",
+ __func__, olen, i, copy_len);
+
+ /* TBD avoid a warning from call_decode() */
+ rqst->rq_private_buf = rqst->rq_rcv_buf;
+}
+
+void
+rpcrdma_connect_worker(struct work_struct *work)
+{
+ struct rpcrdma_ep *ep =
+ container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ep, struct rpcrdma_xprt, rx_ep);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+
+ spin_lock_bh(&xprt->transport_lock);
+ if (++xprt->connect_cookie == 0) /* maintain a reserved value */
+ ++xprt->connect_cookie;
+ if (ep->rep_connected > 0) {
+ if (!xprt_test_and_set_connected(xprt))
+ xprt_wake_pending_tasks(xprt, 0);
+ } else {
+ if (xprt_test_and_clear_connected(xprt))
+ xprt_wake_pending_tasks(xprt, -ENOTCONN);
+ }
+ spin_unlock_bh(&xprt->transport_lock);
+}
+
+/*
+ * This function is called when an async event is posted to
+ * the connection which changes the connection state. All it
+ * does at this point is mark the connection up/down, the rpc
+ * timers do the rest.
+ */
+void
+rpcrdma_conn_func(struct rpcrdma_ep *ep)
+{
+ schedule_delayed_work(&ep->rep_connect_worker, 0);
+}
+
+/*
+ * Called as a tasklet to do req/reply match and complete a request
+ * Errors must result in the RPC task either being awakened, or
+ * allowed to timeout, to discover the errors at that time.
+ */
+void
+rpcrdma_reply_handler(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_msg *headerp;
+ struct rpcrdma_req *req;
+ struct rpc_rqst *rqst;
+ struct rpc_xprt *xprt = rep->rr_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ __be32 *iptr;
+ int rdmalen, status;
+ unsigned long cwnd;
+ u32 credits;
+
+ /* Check status. If bad, signal disconnect and return rep to pool */
+ if (rep->rr_len == ~0U) {
+ rpcrdma_recv_buffer_put(rep);
+ if (r_xprt->rx_ep.rep_connected == 1) {
+ r_xprt->rx_ep.rep_connected = -EIO;
+ rpcrdma_conn_func(&r_xprt->rx_ep);
+ }
+ return;
+ }
+ if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
+ dprintk("RPC: %s: short/invalid reply\n", __func__);
+ goto repost;
+ }
+ headerp = rdmab_to_msg(rep->rr_rdmabuf);
+ if (headerp->rm_vers != rpcrdma_version) {
+ dprintk("RPC: %s: invalid version %d\n",
+ __func__, be32_to_cpu(headerp->rm_vers));
+ goto repost;
+ }
+
+ /* Get XID and try for a match. */
+ spin_lock(&xprt->transport_lock);
+ rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
+ if (rqst == NULL) {
+ spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: reply 0x%p failed "
+ "to match any request xid 0x%08x len %d\n",
+ __func__, rep, be32_to_cpu(headerp->rm_xid),
+ rep->rr_len);
+repost:
+ r_xprt->rx_stats.bad_reply_count++;
+ rep->rr_func = rpcrdma_reply_handler;
+ if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
+ rpcrdma_recv_buffer_put(rep);
+
+ return;
+ }
+
+ /* get request object */
+ req = rpcr_to_rdmar(rqst);
+ if (req->rl_reply) {
+ spin_unlock(&xprt->transport_lock);
+ dprintk("RPC: %s: duplicate reply 0x%p to RPC "
+ "request 0x%p: xid 0x%08x\n", __func__, rep, req,
+ be32_to_cpu(headerp->rm_xid));
+ goto repost;
+ }
+
+ dprintk("RPC: %s: reply 0x%p completes request 0x%p\n"
+ " RPC request 0x%p xid 0x%08x\n",
+ __func__, rep, req, rqst,
+ be32_to_cpu(headerp->rm_xid));
+
+ /* from here on, the reply is no longer an orphan */
+ req->rl_reply = rep;
+ xprt->reestablish_timeout = 0;
+
+ /* check for expected message types */
+ /* The order of some of these tests is important. */
+ switch (headerp->rm_type) {
+ case rdma_msg:
+ /* never expect read chunks */
+ /* never expect reply chunks (two ways to check) */
+ /* never expect write chunks without having offered RDMA */
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+ (headerp->rm_body.rm_chunks[1] == xdr_zero &&
+ headerp->rm_body.rm_chunks[2] != xdr_zero) ||
+ (headerp->rm_body.rm_chunks[1] != xdr_zero &&
+ req->rl_nchunks == 0))
+ goto badheader;
+ if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
+ /* count any expected write chunks in read reply */
+ /* start at write chunk array count */
+ iptr = &headerp->rm_body.rm_chunks[2];
+ rdmalen = rpcrdma_count_chunks(rep,
+ req->rl_nchunks, 1, &iptr);
+ /* check for validity, and no reply chunk after */
+ if (rdmalen < 0 || *iptr++ != xdr_zero)
+ goto badheader;
+ rep->rr_len -=
+ ((unsigned char *)iptr - (unsigned char *)headerp);
+ status = rep->rr_len + rdmalen;
+ r_xprt->rx_stats.total_rdma_reply += rdmalen;
+ /* special case - last chunk may omit padding */
+ if (rdmalen &= 3) {
+ rdmalen = 4 - rdmalen;
+ status += rdmalen;
+ }
+ } else {
+ /* else ordinary inline */
+ rdmalen = 0;
+ iptr = (__be32 *)((unsigned char *)headerp +
+ RPCRDMA_HDRLEN_MIN);
+ rep->rr_len -= RPCRDMA_HDRLEN_MIN;
+ status = rep->rr_len;
+ }
+ /* Fix up the rpc results for upper layer */
+ rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
+ break;
+
+ case rdma_nomsg:
+ /* never expect read or write chunks, always reply chunks */
+ if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
+ headerp->rm_body.rm_chunks[1] != xdr_zero ||
+ headerp->rm_body.rm_chunks[2] != xdr_one ||
+ req->rl_nchunks == 0)
+ goto badheader;
+ iptr = (__be32 *)((unsigned char *)headerp +
+ RPCRDMA_HDRLEN_MIN);
+ rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
+ if (rdmalen < 0)
+ goto badheader;
+ r_xprt->rx_stats.total_rdma_reply += rdmalen;
+ /* Reply chunk buffer already is the reply vector - no fixup. */
+ status = rdmalen;
+ break;
+
+badheader:
+ default:
+ dprintk("%s: invalid rpcrdma reply header (type %d):"
+ " chunks[012] == %d %d %d"
+ " expected chunks <= %d\n",
+ __func__, be32_to_cpu(headerp->rm_type),
+ headerp->rm_body.rm_chunks[0],
+ headerp->rm_body.rm_chunks[1],
+ headerp->rm_body.rm_chunks[2],
+ req->rl_nchunks);
+ status = -EIO;
+ r_xprt->rx_stats.bad_reply_count++;
+ break;
+ }
+
+ credits = be32_to_cpu(headerp->rm_credit);
+ if (credits == 0)
+ credits = 1; /* don't deadlock */
+ else if (credits > r_xprt->rx_buf.rb_max_requests)
+ credits = r_xprt->rx_buf.rb_max_requests;
+
+ cwnd = xprt->cwnd;
+ xprt->cwnd = credits << RPC_CWNDSHIFT;
+ if (xprt->cwnd > cwnd)
+ xprt_release_rqst_cong(rqst->rq_task);
+
+ dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
+ __func__, xprt, rqst, status);
+ xprt_complete_rqst(rqst->rq_task, status);
+ spin_unlock(&xprt->transport_lock);
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
new file mode 100644
index 000000000..c1b627026
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -0,0 +1,302 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/fs.h>
+#include <linux/sysctl.h>
+#include <linux/workqueue.h>
+#include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include "xprt_rdma.h"
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/* RPC/RDMA parameters */
+unsigned int svcrdma_ord = RPCRDMA_ORD;
+static unsigned int min_ord = 1;
+static unsigned int max_ord = 4096;
+unsigned int svcrdma_max_requests = RPCRDMA_MAX_REQUESTS;
+static unsigned int min_max_requests = 4;
+static unsigned int max_max_requests = 16384;
+unsigned int svcrdma_max_req_size = RPCRDMA_MAX_REQ_SIZE;
+static unsigned int min_max_inline = 4096;
+static unsigned int max_max_inline = 65536;
+
+atomic_t rdma_stat_recv;
+atomic_t rdma_stat_read;
+atomic_t rdma_stat_write;
+atomic_t rdma_stat_sq_starve;
+atomic_t rdma_stat_rq_starve;
+atomic_t rdma_stat_rq_poll;
+atomic_t rdma_stat_rq_prod;
+atomic_t rdma_stat_sq_poll;
+atomic_t rdma_stat_sq_prod;
+
+/* Temporary NFS request map and context caches */
+struct kmem_cache *svc_rdma_map_cachep;
+struct kmem_cache *svc_rdma_ctxt_cachep;
+
+struct workqueue_struct *svc_rdma_wq;
+
+/*
+ * This function implements reading and resetting an atomic_t stat
+ * variable through read/write to a proc file. Any write to the file
+ * resets the associated statistic to zero. Any read returns it's
+ * current value.
+ */
+static int read_reset_stat(struct ctl_table *table, int write,
+ void __user *buffer, size_t *lenp,
+ loff_t *ppos)
+{
+ atomic_t *stat = (atomic_t *)table->data;
+
+ if (!stat)
+ return -EINVAL;
+
+ if (write)
+ atomic_set(stat, 0);
+ else {
+ char str_buf[32];
+ char *data;
+ int len = snprintf(str_buf, 32, "%d\n", atomic_read(stat));
+ if (len >= 32)
+ return -EFAULT;
+ len = strlen(str_buf);
+ if (*ppos > len) {
+ *lenp = 0;
+ return 0;
+ }
+ data = &str_buf[*ppos];
+ len -= *ppos;
+ if (len > *lenp)
+ len = *lenp;
+ if (len && copy_to_user(buffer, str_buf, len))
+ return -EFAULT;
+ *lenp = len;
+ *ppos += len;
+ }
+ return 0;
+}
+
+static struct ctl_table_header *svcrdma_table_header;
+static struct ctl_table svcrdma_parm_table[] = {
+ {
+ .procname = "max_requests",
+ .data = &svcrdma_max_requests,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &min_max_requests,
+ .extra2 = &max_max_requests
+ },
+ {
+ .procname = "max_req_size",
+ .data = &svcrdma_max_req_size,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &min_max_inline,
+ .extra2 = &max_max_inline
+ },
+ {
+ .procname = "max_outbound_read_requests",
+ .data = &svcrdma_ord,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &min_ord,
+ .extra2 = &max_ord,
+ },
+
+ {
+ .procname = "rdma_stat_read",
+ .data = &rdma_stat_read,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_recv",
+ .data = &rdma_stat_recv,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_write",
+ .data = &rdma_stat_write,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_starve",
+ .data = &rdma_stat_sq_starve,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_starve",
+ .data = &rdma_stat_rq_starve,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_poll",
+ .data = &rdma_stat_rq_poll,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_rq_prod",
+ .data = &rdma_stat_rq_prod,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_poll",
+ .data = &rdma_stat_sq_poll,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ {
+ .procname = "rdma_stat_sq_prod",
+ .data = &rdma_stat_sq_prod,
+ .maxlen = sizeof(atomic_t),
+ .mode = 0644,
+ .proc_handler = read_reset_stat,
+ },
+ { },
+};
+
+static struct ctl_table svcrdma_table[] = {
+ {
+ .procname = "svc_rdma",
+ .mode = 0555,
+ .child = svcrdma_parm_table
+ },
+ { },
+};
+
+static struct ctl_table svcrdma_root_table[] = {
+ {
+ .procname = "sunrpc",
+ .mode = 0555,
+ .child = svcrdma_table
+ },
+ { },
+};
+
+void svc_rdma_cleanup(void)
+{
+ dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+ destroy_workqueue(svc_rdma_wq);
+ if (svcrdma_table_header) {
+ unregister_sysctl_table(svcrdma_table_header);
+ svcrdma_table_header = NULL;
+ }
+ svc_unreg_xprt_class(&svc_rdma_class);
+ kmem_cache_destroy(svc_rdma_map_cachep);
+ kmem_cache_destroy(svc_rdma_ctxt_cachep);
+}
+
+int svc_rdma_init(void)
+{
+ dprintk("SVCRDMA Module Init, register RPC RDMA transport\n");
+ dprintk("\tsvcrdma_ord : %d\n", svcrdma_ord);
+ dprintk("\tmax_requests : %d\n", svcrdma_max_requests);
+ dprintk("\tsq_depth : %d\n",
+ svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT);
+ dprintk("\tmax_inline : %d\n", svcrdma_max_req_size);
+
+ svc_rdma_wq = alloc_workqueue("svc_rdma", 0, 0);
+ if (!svc_rdma_wq)
+ return -ENOMEM;
+
+ if (!svcrdma_table_header)
+ svcrdma_table_header =
+ register_sysctl_table(svcrdma_root_table);
+
+ /* Create the temporary map cache */
+ svc_rdma_map_cachep = kmem_cache_create("svc_rdma_map_cache",
+ sizeof(struct svc_rdma_req_map),
+ 0,
+ SLAB_HWCACHE_ALIGN,
+ NULL);
+ if (!svc_rdma_map_cachep) {
+ printk(KERN_INFO "Could not allocate map cache.\n");
+ goto err0;
+ }
+
+ /* Create the temporary context cache */
+ svc_rdma_ctxt_cachep =
+ kmem_cache_create("svc_rdma_ctxt_cache",
+ sizeof(struct svc_rdma_op_ctxt),
+ 0,
+ SLAB_HWCACHE_ALIGN,
+ NULL);
+ if (!svc_rdma_ctxt_cachep) {
+ printk(KERN_INFO "Could not allocate WR ctxt cache.\n");
+ goto err1;
+ }
+
+ /* Register RDMA with the SVC transport switch */
+ svc_reg_xprt_class(&svc_rdma_class);
+ return 0;
+ err1:
+ kmem_cache_destroy(svc_rdma_map_cachep);
+ err0:
+ unregister_sysctl_table(svcrdma_table_header);
+ destroy_workqueue(svc_rdma_wq);
+ return -ENOMEM;
+}
+MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
+MODULE_DESCRIPTION("SVC RDMA Transport");
+MODULE_LICENSE("Dual BSD/GPL");
+module_init(svc_rdma_init);
+module_exit(svc_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
new file mode 100644
index 000000000..b681855cf
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -0,0 +1,370 @@
+/*
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/xdr.h>
+#include <linux/sunrpc/debug.h>
+#include <asm/unaligned.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/*
+ * Decodes a read chunk list. The expected format is as follows:
+ * descrim : xdr_one
+ * position : u32 offset into XDR stream
+ * handle : u32 RKEY
+ * . . .
+ * end-of-list: xdr_zero
+ */
+static u32 *decode_read_list(u32 *va, u32 *vaend)
+{
+ struct rpcrdma_read_chunk *ch = (struct rpcrdma_read_chunk *)va;
+
+ while (ch->rc_discrim != xdr_zero) {
+ if (((unsigned long)ch + sizeof(struct rpcrdma_read_chunk)) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: vaend=%p, ch=%p\n", vaend, ch);
+ return NULL;
+ }
+ ch++;
+ }
+ return (u32 *)&ch->rc_position;
+}
+
+/*
+ * Decodes a write chunk list. The expected format is as follows:
+ * descrim : xdr_one
+ * nchunks : <count>
+ * handle : u32 RKEY ---+
+ * length : u32 <len of segment> |
+ * offset : remove va + <count>
+ * . . . |
+ * ---+
+ */
+static u32 *decode_write_list(u32 *va, u32 *vaend)
+{
+ unsigned long start, end;
+ int nchunks;
+
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array *)va;
+
+ /* Check for not write-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32 *)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ nchunks = ntohl(ary->wc_nchunks);
+
+ start = (unsigned long)&ary->wc_array[0];
+ end = (unsigned long)vaend;
+ if (nchunks < 0 ||
+ nchunks > (SIZE_MAX - start) / sizeof(struct rpcrdma_write_chunk) ||
+ (start + (sizeof(struct rpcrdma_write_chunk) * nchunks)) > end) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, nchunks, vaend);
+ return NULL;
+ }
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ return (u32 *)&ary->wc_array[nchunks].wc_target.rs_length;
+}
+
+static u32 *decode_reply_array(u32 *va, u32 *vaend)
+{
+ unsigned long start, end;
+ int nchunks;
+ struct rpcrdma_write_array *ary =
+ (struct rpcrdma_write_array *)va;
+
+ /* Check for no reply-array */
+ if (ary->wc_discrim == xdr_zero)
+ return (u32 *)&ary->wc_nchunks;
+
+ if ((unsigned long)ary + sizeof(struct rpcrdma_write_array) >
+ (unsigned long)vaend) {
+ dprintk("svcrdma: ary=%p, vaend=%p\n", ary, vaend);
+ return NULL;
+ }
+ nchunks = ntohl(ary->wc_nchunks);
+
+ start = (unsigned long)&ary->wc_array[0];
+ end = (unsigned long)vaend;
+ if (nchunks < 0 ||
+ nchunks > (SIZE_MAX - start) / sizeof(struct rpcrdma_write_chunk) ||
+ (start + (sizeof(struct rpcrdma_write_chunk) * nchunks)) > end) {
+ dprintk("svcrdma: ary=%p, wc_nchunks=%d, vaend=%p\n",
+ ary, nchunks, vaend);
+ return NULL;
+ }
+ return (u32 *)&ary->wc_array[nchunks];
+}
+
+int svc_rdma_xdr_decode_req(struct rpcrdma_msg **rdma_req,
+ struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ u32 *va;
+ u32 *vaend;
+ u32 hdr_len;
+
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+ /* Verify that there's enough bytes for header + something */
+ if (rqstp->rq_arg.len <= RPCRDMA_HDRLEN_MIN) {
+ dprintk("svcrdma: header too short = %d\n",
+ rqstp->rq_arg.len);
+ return -EINVAL;
+ }
+
+ /* Decode the header */
+ rmsgp->rm_xid = ntohl(rmsgp->rm_xid);
+ rmsgp->rm_vers = ntohl(rmsgp->rm_vers);
+ rmsgp->rm_credit = ntohl(rmsgp->rm_credit);
+ rmsgp->rm_type = ntohl(rmsgp->rm_type);
+
+ if (rmsgp->rm_vers != RPCRDMA_VERSION)
+ return -ENOSYS;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ int hdrlen;
+ rmsgp->rm_body.rm_padded.rm_align =
+ ntohl(rmsgp->rm_body.rm_padded.rm_align);
+ rmsgp->rm_body.rm_padded.rm_thresh =
+ ntohl(rmsgp->rm_body.rm_padded.rm_thresh);
+
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ if (hdrlen > rqstp->rq_arg.len)
+ return -EINVAL;
+ return hdrlen;
+ }
+
+ /* The chunk list may contain either a read chunk list or a write
+ * chunk list and a reply chunk list.
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+ vaend = (u32 *)((unsigned long)rmsgp + rqstp->rq_arg.len);
+ va = decode_read_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_write_list(va, vaend);
+ if (!va)
+ return -EINVAL;
+ va = decode_reply_array(va, vaend);
+ if (!va)
+ return -EINVAL;
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdr_len = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdr_len;
+
+ *rdma_req = rmsgp;
+ return hdr_len;
+}
+
+int svc_rdma_xdr_decode_deferred_req(struct svc_rqst *rqstp)
+{
+ struct rpcrdma_msg *rmsgp = NULL;
+ struct rpcrdma_read_chunk *ch;
+ struct rpcrdma_write_array *ary;
+ u32 *va;
+ u32 hdrlen;
+
+ dprintk("svcrdma: processing deferred RDMA header on rqstp=%p\n",
+ rqstp);
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+
+ /* Pull in the extra for the padded case and bump our pointer */
+ if (rmsgp->rm_type == RDMA_MSGP) {
+ va = &rmsgp->rm_body.rm_padded.rm_pempty[4];
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (u32)((unsigned long)va - (unsigned long)rmsgp);
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+ return hdrlen;
+ }
+
+ /*
+ * Skip all chunks to find RPC msg. These were previously processed
+ */
+ va = &rmsgp->rm_body.rm_chunks[0];
+
+ /* Skip read-list */
+ for (ch = (struct rpcrdma_read_chunk *)va;
+ ch->rc_discrim != xdr_zero; ch++);
+ va = (u32 *)&ch->rc_position;
+
+ /* Skip write-list */
+ ary = (struct rpcrdma_write_array *)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32 *)&ary->wc_nchunks;
+ else
+ /*
+ * rs_length is the 2nd 4B field in wc_target and taking its
+ * address skips the list terminator
+ */
+ va = (u32 *)&ary->wc_array[ary->wc_nchunks].wc_target.rs_length;
+
+ /* Skip reply-array */
+ ary = (struct rpcrdma_write_array *)va;
+ if (ary->wc_discrim == xdr_zero)
+ va = (u32 *)&ary->wc_nchunks;
+ else
+ va = (u32 *)&ary->wc_array[ary->wc_nchunks];
+
+ rqstp->rq_arg.head[0].iov_base = va;
+ hdrlen = (unsigned long)va - (unsigned long)rmsgp;
+ rqstp->rq_arg.head[0].iov_len -= hdrlen;
+
+ return hdrlen;
+}
+
+int svc_rdma_xdr_encode_error(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err, u32 *va)
+{
+ u32 *startp = va;
+
+ *va++ = htonl(rmsgp->rm_xid);
+ *va++ = htonl(rmsgp->rm_vers);
+ *va++ = htonl(xprt->sc_max_requests);
+ *va++ = htonl(RDMA_ERROR);
+ *va++ = htonl(err);
+ if (err == ERR_VERS) {
+ *va++ = htonl(RPCRDMA_VERSION);
+ *va++ = htonl(RPCRDMA_VERSION);
+ }
+
+ return (int)((unsigned long)va - (unsigned long)startp);
+}
+
+int svc_rdma_xdr_get_reply_hdr_len(struct rpcrdma_msg *rmsgp)
+{
+ struct rpcrdma_write_array *wr_ary;
+
+ /* There is no read-list in a reply */
+
+ /* skip write list */
+ wr_ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)].
+ wc_target.rs_length;
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ /* skip reply array */
+ if (wr_ary->wc_discrim)
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_array[ntohl(wr_ary->wc_nchunks)];
+ else
+ wr_ary = (struct rpcrdma_write_array *)
+ &wr_ary->wc_nchunks;
+
+ return (unsigned long) wr_ary - (unsigned long) rmsgp;
+}
+
+void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+{
+ struct rpcrdma_write_array *ary;
+
+ /* no read-list */
+ rmsgp->rm_body.rm_chunks[0] = xdr_zero;
+
+ /* write-array discrim */
+ ary = (struct rpcrdma_write_array *)
+ &rmsgp->rm_body.rm_chunks[1];
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+
+ /* write-list terminator */
+ ary->wc_array[chunks].wc_target.rs_handle = xdr_zero;
+
+ /* reply-array discriminator */
+ ary->wc_array[chunks].wc_target.rs_length = xdr_zero;
+}
+
+void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *ary,
+ int chunks)
+{
+ ary->wc_discrim = xdr_one;
+ ary->wc_nchunks = htonl(chunks);
+}
+
+void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
+ int chunk_no,
+ __be32 rs_handle,
+ __be64 rs_offset,
+ u32 write_len)
+{
+ struct rpcrdma_segment *seg = &ary->wc_array[chunk_no].wc_target;
+ seg->rs_handle = rs_handle;
+ seg->rs_offset = rs_offset;
+ seg->rs_length = htonl(write_len);
+}
+
+void svc_rdma_xdr_encode_reply_header(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ enum rpcrdma_proc rdma_type)
+{
+ rdma_resp->rm_xid = htonl(rdma_argp->rm_xid);
+ rdma_resp->rm_vers = htonl(rdma_argp->rm_vers);
+ rdma_resp->rm_credit = htonl(xprt->sc_max_requests);
+ rdma_resp->rm_type = htonl(rdma_type);
+
+ /* Encode <nul> chunks lists */
+ rdma_resp->rm_body.rm_chunks[0] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[1] = xdr_zero;
+ rdma_resp->rm_body.rm_chunks[2] = xdr_zero;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
new file mode 100644
index 000000000..f9f13a32d
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -0,0 +1,672 @@
+/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+/*
+ * Replace the pages in the rq_argpages array with the pages from the SGE in
+ * the RDMA_RECV completion. The SGL should contain full pages up until the
+ * last one.
+ */
+static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *ctxt,
+ u32 byte_count)
+{
+ struct rpcrdma_msg *rmsgp;
+ struct page *page;
+ u32 bc;
+ int sge_no;
+
+ /* Swap the page in the SGE with the page in argpages */
+ page = ctxt->pages[0];
+ put_page(rqstp->rq_pages[0]);
+ rqstp->rq_pages[0] = page;
+
+ /* Set up the XDR head */
+ rqstp->rq_arg.head[0].iov_base = page_address(page);
+ rqstp->rq_arg.head[0].iov_len =
+ min_t(size_t, byte_count, ctxt->sge[0].length);
+ rqstp->rq_arg.len = byte_count;
+ rqstp->rq_arg.buflen = byte_count;
+
+ /* Compute bytes past head in the SGL */
+ bc = byte_count - rqstp->rq_arg.head[0].iov_len;
+
+ /* If data remains, store it in the pagelist */
+ rqstp->rq_arg.page_len = bc;
+ rqstp->rq_arg.page_base = 0;
+
+ /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
+ rmsgp = (struct rpcrdma_msg *)rqstp->rq_arg.head[0].iov_base;
+ if (be32_to_cpu(rmsgp->rm_type) == RDMA_NOMSG)
+ rqstp->rq_arg.pages = &rqstp->rq_pages[0];
+ else
+ rqstp->rq_arg.pages = &rqstp->rq_pages[1];
+
+ sge_no = 1;
+ while (bc && sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no];
+ put_page(rqstp->rq_pages[sge_no]);
+ rqstp->rq_pages[sge_no] = page;
+ bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
+ rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
+ sge_no++;
+ }
+ rqstp->rq_respages = &rqstp->rq_pages[sge_no];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+ /* If not all pages were used from the SGL, free the remaining ones */
+ bc = sge_no;
+ while (sge_no < ctxt->count) {
+ page = ctxt->pages[sge_no++];
+ put_page(page);
+ }
+ ctxt->count = bc;
+
+ /* Set up tail */
+ rqstp->rq_arg.tail[0].iov_base = NULL;
+ rqstp->rq_arg.tail[0].iov_len = 0;
+}
+
+static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
+{
+ if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
+ RDMA_TRANSPORT_IWARP)
+ return 1;
+ else
+ return min_t(int, sge_count, xprt->sc_max_sge);
+}
+
+/* Issue an RDMA_READ using the local lkey to map the data sink */
+int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ bool last)
+{
+ struct ib_send_wr read_wr;
+ int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+ struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
+ int ret, read, pno;
+ u32 pg_off = *page_offset;
+ u32 pg_no = *page_no;
+
+ ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->read_hdr = head;
+ pages_needed =
+ min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
+ read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
+
+ for (pno = 0; pno < pages_needed; pno++) {
+ int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
+
+ head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+ head->arg.page_len += len;
+ head->arg.len += len;
+ if (!pg_off)
+ head->count++;
+ rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+ ctxt->sge[pno].addr =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ head->arg.pages[pg_no], pg_off,
+ PAGE_SIZE - pg_off,
+ DMA_FROM_DEVICE);
+ ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
+ ctxt->sge[pno].addr);
+ if (ret)
+ goto err;
+ atomic_inc(&xprt->sc_dma_used);
+
+ /* The lkey here is either a local dma lkey or a dma_mr lkey */
+ ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
+ ctxt->sge[pno].length = len;
+ ctxt->count++;
+
+ /* adjust offset and wrap to next page if needed */
+ pg_off += len;
+ if (pg_off == PAGE_SIZE) {
+ pg_off = 0;
+ pg_no++;
+ }
+ rs_length -= len;
+ }
+
+ if (last && rs_length == 0)
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ else
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+
+ memset(&read_wr, 0, sizeof(read_wr));
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.opcode = IB_WR_RDMA_READ;
+ ctxt->wr_op = read_wr.opcode;
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = rs_handle;
+ read_wr.wr.rdma.remote_addr = rs_offset;
+ read_wr.sg_list = ctxt->sge;
+ read_wr.num_sge = pages_needed;
+
+ ret = svc_rdma_send(xprt, &read_wr);
+ if (ret) {
+ pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ goto err;
+ }
+
+ /* return current location in page array */
+ *page_no = pg_no;
+ *page_offset = pg_off;
+ ret = read;
+ atomic_inc(&rdma_stat_read);
+ return ret;
+ err:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+ return ret;
+}
+
+/* Issue an RDMA_READ using an FRMR to map the data sink */
+int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head,
+ int *page_no,
+ u32 *page_offset,
+ u32 rs_handle,
+ u32 rs_length,
+ u64 rs_offset,
+ bool last)
+{
+ struct ib_send_wr read_wr;
+ struct ib_send_wr inv_wr;
+ struct ib_send_wr fastreg_wr;
+ u8 key;
+ int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
+ struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
+ struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
+ int ret, read, pno;
+ u32 pg_off = *page_offset;
+ u32 pg_no = *page_no;
+
+ if (IS_ERR(frmr))
+ return -ENOMEM;
+
+ ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->frmr = frmr;
+ pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
+ read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
+
+ frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
+ frmr->direction = DMA_FROM_DEVICE;
+ frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
+ frmr->map_len = pages_needed << PAGE_SHIFT;
+ frmr->page_list_len = pages_needed;
+
+ for (pno = 0; pno < pages_needed; pno++) {
+ int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
+
+ head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+ head->arg.page_len += len;
+ head->arg.len += len;
+ if (!pg_off)
+ head->count++;
+ rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+ frmr->page_list->page_list[pno] =
+ ib_dma_map_page(xprt->sc_cm_id->device,
+ head->arg.pages[pg_no], 0,
+ PAGE_SIZE, DMA_FROM_DEVICE);
+ ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
+ frmr->page_list->page_list[pno]);
+ if (ret)
+ goto err;
+ atomic_inc(&xprt->sc_dma_used);
+
+ /* adjust offset and wrap to next page if needed */
+ pg_off += len;
+ if (pg_off == PAGE_SIZE) {
+ pg_off = 0;
+ pg_no++;
+ }
+ rs_length -= len;
+ }
+
+ if (last && rs_length == 0)
+ set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+ else
+ clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
+
+ /* Bump the key */
+ key = (u8)(frmr->mr->lkey & 0x000000FF);
+ ib_update_fast_reg_key(frmr->mr, ++key);
+
+ ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
+ ctxt->sge[0].lkey = frmr->mr->lkey;
+ ctxt->sge[0].length = read;
+ ctxt->count = 1;
+ ctxt->read_hdr = head;
+
+ /* Prepare FASTREG WR */
+ memset(&fastreg_wr, 0, sizeof(fastreg_wr));
+ fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+ fastreg_wr.send_flags = IB_SEND_SIGNALED;
+ fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
+ fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
+ fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
+ fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+ fastreg_wr.wr.fast_reg.length = frmr->map_len;
+ fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
+ fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
+ fastreg_wr.next = &read_wr;
+
+ /* Prepare RDMA_READ */
+ memset(&read_wr, 0, sizeof(read_wr));
+ read_wr.send_flags = IB_SEND_SIGNALED;
+ read_wr.wr.rdma.rkey = rs_handle;
+ read_wr.wr.rdma.remote_addr = rs_offset;
+ read_wr.sg_list = ctxt->sge;
+ read_wr.num_sge = 1;
+ if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
+ read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
+ read_wr.wr_id = (unsigned long)ctxt;
+ read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
+ } else {
+ read_wr.opcode = IB_WR_RDMA_READ;
+ read_wr.next = &inv_wr;
+ /* Prepare invalidate */
+ memset(&inv_wr, 0, sizeof(inv_wr));
+ inv_wr.wr_id = (unsigned long)ctxt;
+ inv_wr.opcode = IB_WR_LOCAL_INV;
+ inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
+ inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
+ }
+ ctxt->wr_op = read_wr.opcode;
+
+ /* Post the chain */
+ ret = svc_rdma_send(xprt, &fastreg_wr);
+ if (ret) {
+ pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ goto err;
+ }
+
+ /* return current location in page array */
+ *page_no = pg_no;
+ *page_offset = pg_off;
+ ret = read;
+ atomic_inc(&rdma_stat_read);
+ return ret;
+ err:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+ svc_rdma_put_frmr(xprt, frmr);
+ return ret;
+}
+
+static unsigned int
+rdma_rcl_chunk_count(struct rpcrdma_read_chunk *ch)
+{
+ unsigned int count;
+
+ for (count = 0; ch->rc_discrim != xdr_zero; ch++)
+ count++;
+ return count;
+}
+
+/* If there was additional inline content, append it to the end of arg.pages.
+ * Tail copy has to be done after the reader function has determined how many
+ * pages are needed for RDMA READ.
+ */
+static int
+rdma_copy_tail(struct svc_rqst *rqstp, struct svc_rdma_op_ctxt *head,
+ u32 position, u32 byte_count, u32 page_offset, int page_no)
+{
+ char *srcp, *destp;
+ int ret;
+
+ ret = 0;
+ srcp = head->arg.head[0].iov_base + position;
+ byte_count = head->arg.head[0].iov_len - position;
+ if (byte_count > PAGE_SIZE) {
+ dprintk("svcrdma: large tail unsupported\n");
+ return 0;
+ }
+
+ /* Fit as much of the tail on the current page as possible */
+ if (page_offset != PAGE_SIZE) {
+ destp = page_address(rqstp->rq_arg.pages[page_no]);
+ destp += page_offset;
+ while (byte_count--) {
+ *destp++ = *srcp++;
+ page_offset++;
+ if (page_offset == PAGE_SIZE && byte_count)
+ goto more;
+ }
+ goto done;
+ }
+
+more:
+ /* Fit the rest on the next page */
+ page_no++;
+ destp = page_address(rqstp->rq_arg.pages[page_no]);
+ while (byte_count--)
+ *destp++ = *srcp++;
+
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+done:
+ byte_count = head->arg.head[0].iov_len - position;
+ head->arg.page_len += byte_count;
+ head->arg.len += byte_count;
+ head->arg.buflen += byte_count;
+ return 1;
+}
+
+static int rdma_read_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rmsgp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head)
+{
+ int page_no, ret;
+ struct rpcrdma_read_chunk *ch;
+ u32 handle, page_offset, byte_count;
+ u32 position;
+ u64 rs_offset;
+ bool last;
+
+ /* If no read list is present, return 0 */
+ ch = svc_rdma_get_read_chunk(rmsgp);
+ if (!ch)
+ return 0;
+
+ if (rdma_rcl_chunk_count(ch) > RPCSVC_MAXPAGES)
+ return -EINVAL;
+
+ /* The request is completed when the RDMA_READs complete. The
+ * head context keeps all the pages that comprise the
+ * request.
+ */
+ head->arg.head[0] = rqstp->rq_arg.head[0];
+ head->arg.tail[0] = rqstp->rq_arg.tail[0];
+ head->hdr_count = head->count;
+ head->arg.page_base = 0;
+ head->arg.page_len = 0;
+ head->arg.len = rqstp->rq_arg.len;
+ head->arg.buflen = rqstp->rq_arg.buflen;
+
+ ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
+ position = be32_to_cpu(ch->rc_position);
+
+ /* RDMA_NOMSG: RDMA READ data should land just after RDMA RECV data */
+ if (position == 0) {
+ head->arg.pages = &head->pages[0];
+ page_offset = head->byte_len;
+ } else {
+ head->arg.pages = &head->pages[head->count];
+ page_offset = 0;
+ }
+
+ ret = 0;
+ page_no = 0;
+ for (; ch->rc_discrim != xdr_zero; ch++) {
+ if (be32_to_cpu(ch->rc_position) != position)
+ goto err;
+
+ handle = be32_to_cpu(ch->rc_target.rs_handle),
+ byte_count = be32_to_cpu(ch->rc_target.rs_length);
+ xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
+ &rs_offset);
+
+ while (byte_count > 0) {
+ last = (ch + 1)->rc_discrim == xdr_zero;
+ ret = xprt->sc_reader(xprt, rqstp, head,
+ &page_no, &page_offset,
+ handle, byte_count,
+ rs_offset, last);
+ if (ret < 0)
+ goto err;
+ byte_count -= ret;
+ rs_offset += ret;
+ head->arg.buflen += ret;
+ }
+ }
+
+ /* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
+ if (page_offset & 3) {
+ u32 pad = 4 - (page_offset & 3);
+
+ head->arg.page_len += pad;
+ head->arg.len += pad;
+ head->arg.buflen += pad;
+ page_offset += pad;
+ }
+
+ ret = 1;
+ if (position && position < head->arg.head[0].iov_len)
+ ret = rdma_copy_tail(rqstp, head, position,
+ byte_count, page_offset, page_no);
+ head->arg.head[0].iov_len = position;
+ head->position = position;
+
+ err:
+ /* Detach arg pages. svc_recv will replenish them */
+ for (page_no = 0;
+ &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
+ rqstp->rq_pages[page_no] = NULL;
+
+ return ret;
+}
+
+static int rdma_read_complete(struct svc_rqst *rqstp,
+ struct svc_rdma_op_ctxt *head)
+{
+ int page_no;
+ int ret;
+
+ /* Copy RPC pages */
+ for (page_no = 0; page_no < head->count; page_no++) {
+ put_page(rqstp->rq_pages[page_no]);
+ rqstp->rq_pages[page_no] = head->pages[page_no];
+ }
+
+ /* Adjustments made for RDMA_NOMSG type requests */
+ if (head->position == 0) {
+ if (head->arg.len <= head->sge[0].length) {
+ head->arg.head[0].iov_len = head->arg.len -
+ head->byte_len;
+ head->arg.page_len = 0;
+ } else {
+ head->arg.head[0].iov_len = head->sge[0].length -
+ head->byte_len;
+ head->arg.page_len = head->arg.len -
+ head->sge[0].length;
+ }
+ }
+
+ /* Point rq_arg.pages past header */
+ rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
+ rqstp->rq_arg.page_len = head->arg.page_len;
+ rqstp->rq_arg.page_base = head->arg.page_base;
+
+ /* rq_respages starts after the last arg page */
+ rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+ /* Rebuild rq_arg head and tail. */
+ rqstp->rq_arg.head[0] = head->arg.head[0];
+ rqstp->rq_arg.tail[0] = head->arg.tail[0];
+ rqstp->rq_arg.len = head->arg.len;
+ rqstp->rq_arg.buflen = head->arg.buflen;
+
+ /* Free the context */
+ svc_rdma_put_context(head, 0);
+
+ /* XXX: What should this be? */
+ rqstp->rq_prot = IPPROTO_MAX;
+ svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ dprintk("svcrdma: deferred read ret=%d, rq_arg.len=%u, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zu\n",
+ ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+
+ return ret;
+}
+
+/*
+ * Set up the rqstp thread context to point to the RQ buffer. If
+ * necessary, pull additional data from the client with an RDMA_READ
+ * request.
+ */
+int svc_rdma_recvfrom(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svcxprt_rdma *rdma_xprt =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct rpcrdma_msg *rmsgp;
+ int ret = 0;
+ int len;
+
+ dprintk("svcrdma: rqstp=%p\n", rqstp);
+
+ spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
+ ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ return rdma_read_complete(rqstp, ctxt);
+ } else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+ ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ } else {
+ atomic_inc(&rdma_stat_rq_starve);
+ clear_bit(XPT_DATA, &xprt->xpt_flags);
+ ctxt = NULL;
+ }
+ spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
+ if (!ctxt) {
+ /* This is the EAGAIN path. The svc_recv routine will
+ * return -EAGAIN, the nfsd thread will go to call into
+ * svc_recv again and we shouldn't be on the active
+ * transport list
+ */
+ if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
+ goto close_out;
+
+ goto out;
+ }
+ dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
+ ctxt, rdma_xprt, rqstp, ctxt->wc_status);
+ atomic_inc(&rdma_stat_recv);
+
+ /* Build up the XDR from the receive buffers. */
+ rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
+
+ /* Decode the RDMA header. */
+ len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
+ rqstp->rq_xprt_hlen = len;
+
+ /* If the request is invalid, reply with an error */
+ if (len < 0) {
+ if (len == -ENOSYS)
+ svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+ goto close_out;
+ }
+
+ /* Read read-list data. */
+ ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
+ if (ret > 0) {
+ /* read-list posted, defer until data received from client. */
+ goto defer;
+ } else if (ret < 0) {
+ /* Post of read-list failed, free context. */
+ svc_rdma_put_context(ctxt, 1);
+ return 0;
+ }
+
+ ret = rqstp->rq_arg.head[0].iov_len
+ + rqstp->rq_arg.page_len
+ + rqstp->rq_arg.tail[0].iov_len;
+ svc_rdma_put_context(ctxt, 0);
+ out:
+ dprintk("svcrdma: ret=%d, rq_arg.len=%u, "
+ "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len=%zd\n",
+ ret, rqstp->rq_arg.len,
+ rqstp->rq_arg.head[0].iov_base,
+ rqstp->rq_arg.head[0].iov_len);
+ rqstp->rq_prot = IPPROTO_MAX;
+ svc_xprt_copy_addrs(rqstp, xprt);
+ return ret;
+
+ close_out:
+ if (ctxt)
+ svc_rdma_put_context(ctxt, 1);
+ dprintk("svcrdma: transport %p is closing\n", xprt);
+ /*
+ * Set the close bit and enqueue it. svc_recv will see the
+ * close bit and call svc_xprt_delete
+ */
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+defer:
+ return 0;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
new file mode 100644
index 000000000..7de33d1af
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -0,0 +1,560 @@
+/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
+ * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/spinlock.h>
+#include <asm/unaligned.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+static int map_xdr(struct svcxprt_rdma *xprt,
+ struct xdr_buf *xdr,
+ struct svc_rdma_req_map *vec)
+{
+ int sge_no;
+ u32 sge_bytes;
+ u32 page_bytes;
+ u32 page_off;
+ int page_no;
+
+ if (xdr->len !=
+ (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len)) {
+ pr_err("svcrdma: map_xdr: XDR buffer length error\n");
+ return -EIO;
+ }
+
+ /* Skip the first sge, this is for the RPCRDMA header */
+ sge_no = 1;
+
+ /* Head SGE */
+ vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
+ vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
+ sge_no++;
+
+ /* pages SGE */
+ page_no = 0;
+ page_bytes = xdr->page_len;
+ page_off = xdr->page_base;
+ while (page_bytes) {
+ vec->sge[sge_no].iov_base =
+ page_address(xdr->pages[page_no]) + page_off;
+ sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
+ page_bytes -= sge_bytes;
+ vec->sge[sge_no].iov_len = sge_bytes;
+
+ sge_no++;
+ page_no++;
+ page_off = 0; /* reset for next time through loop */
+ }
+
+ /* Tail SGE */
+ if (xdr->tail[0].iov_len) {
+ vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
+ vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
+ sge_no++;
+ }
+
+ dprintk("svcrdma: map_xdr: sge_no %d page_no %d "
+ "page_base %u page_len %u head_len %zu tail_len %zu\n",
+ sge_no, page_no, xdr->page_base, xdr->page_len,
+ xdr->head[0].iov_len, xdr->tail[0].iov_len);
+
+ vec->count = sge_no;
+ return 0;
+}
+
+static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
+ struct xdr_buf *xdr,
+ u32 xdr_off, size_t len, int dir)
+{
+ struct page *page;
+ dma_addr_t dma_addr;
+ if (xdr_off < xdr->head[0].iov_len) {
+ /* This offset is in the head */
+ xdr_off += (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
+ page = virt_to_page(xdr->head[0].iov_base);
+ } else {
+ xdr_off -= xdr->head[0].iov_len;
+ if (xdr_off < xdr->page_len) {
+ /* This offset is in the page list */
+ xdr_off += xdr->page_base;
+ page = xdr->pages[xdr_off >> PAGE_SHIFT];
+ xdr_off &= ~PAGE_MASK;
+ } else {
+ /* This offset is in the tail */
+ xdr_off -= xdr->page_len;
+ xdr_off += (unsigned long)
+ xdr->tail[0].iov_base & ~PAGE_MASK;
+ page = virt_to_page(xdr->tail[0].iov_base);
+ }
+ }
+ dma_addr = ib_dma_map_page(xprt->sc_cm_id->device, page, xdr_off,
+ min_t(size_t, PAGE_SIZE, len), dir);
+ return dma_addr;
+}
+
+/* Assumptions:
+ * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
+ */
+static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
+ u32 rmr, u64 to,
+ u32 xdr_off, int write_len,
+ struct svc_rdma_req_map *vec)
+{
+ struct ib_send_wr write_wr;
+ struct ib_sge *sge;
+ int xdr_sge_no;
+ int sge_no;
+ int sge_bytes;
+ int sge_off;
+ int bc;
+ struct svc_rdma_op_ctxt *ctxt;
+
+ if (vec->count > RPCSVC_MAXPAGES) {
+ pr_err("svcrdma: Too many pages (%lu)\n", vec->count);
+ return -EIO;
+ }
+
+ dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
+ "write_len=%d, vec->sge=%p, vec->count=%lu\n",
+ rmr, (unsigned long long)to, xdr_off,
+ write_len, vec->sge, vec->count);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->direction = DMA_TO_DEVICE;
+ sge = ctxt->sge;
+
+ /* Find the SGE associated with xdr_off */
+ for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
+ xdr_sge_no++) {
+ if (vec->sge[xdr_sge_no].iov_len > bc)
+ break;
+ bc -= vec->sge[xdr_sge_no].iov_len;
+ }
+
+ sge_off = bc;
+ bc = write_len;
+ sge_no = 0;
+
+ /* Copy the remaining SGE */
+ while (bc != 0) {
+ sge_bytes = min_t(size_t,
+ bc, vec->sge[xdr_sge_no].iov_len-sge_off);
+ sge[sge_no].length = sge_bytes;
+ sge[sge_no].addr =
+ dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
+ sge_bytes, DMA_TO_DEVICE);
+ xdr_off += sge_bytes;
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+ sge[sge_no].addr))
+ goto err;
+ atomic_inc(&xprt->sc_dma_used);
+ sge[sge_no].lkey = xprt->sc_dma_lkey;
+ ctxt->count++;
+ sge_off = 0;
+ sge_no++;
+ xdr_sge_no++;
+ if (xdr_sge_no > vec->count) {
+ pr_err("svcrdma: Too many sges (%d)\n", xdr_sge_no);
+ goto err;
+ }
+ bc -= sge_bytes;
+ if (sge_no == xprt->sc_max_sge)
+ break;
+ }
+
+ /* Prepare WRITE WR */
+ memset(&write_wr, 0, sizeof write_wr);
+ ctxt->wr_op = IB_WR_RDMA_WRITE;
+ write_wr.wr_id = (unsigned long)ctxt;
+ write_wr.sg_list = &sge[0];
+ write_wr.num_sge = sge_no;
+ write_wr.opcode = IB_WR_RDMA_WRITE;
+ write_wr.send_flags = IB_SEND_SIGNALED;
+ write_wr.wr.rdma.rkey = rmr;
+ write_wr.wr.rdma.remote_addr = to;
+
+ /* Post It */
+ atomic_inc(&rdma_stat_write);
+ if (svc_rdma_send(xprt, &write_wr))
+ goto err;
+ return write_len - bc;
+ err:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 0);
+ /* Fatal error, close transport */
+ return -EIO;
+}
+
+static int send_write_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_req_map *vec)
+{
+ u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+ int write_len;
+ u32 xdr_off;
+ int chunk_off;
+ int chunk_no;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_write_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ res_ary = (struct rpcrdma_write_array *)
+ &rdma_resp->rm_body.rm_chunks[1];
+
+ /* Write chunks start at the pagelist */
+ for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
+ xfer_len && chunk_no < arg_ary->wc_nchunks;
+ chunk_no++) {
+ struct rpcrdma_segment *arg_ch;
+ u64 rs_offset;
+
+ arg_ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, ntohl(arg_ch->rs_length));
+
+ /* Prepare the response chunk given the length actually
+ * written */
+ xdr_decode_hyper((__be32 *)&arg_ch->rs_offset, &rs_offset);
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ arg_ch->rs_handle,
+ arg_ch->rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ ret = send_write(xprt, rqstp,
+ ntohl(arg_ch->rs_handle),
+ rs_offset + chunk_off,
+ xdr_off,
+ write_len,
+ vec);
+ if (ret <= 0) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+ ret);
+ return -EIO;
+ }
+ chunk_off += ret;
+ xdr_off += ret;
+ xfer_len -= ret;
+ write_len -= ret;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+
+ return rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
+}
+
+static int send_reply_chunks(struct svcxprt_rdma *xprt,
+ struct rpcrdma_msg *rdma_argp,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rqst *rqstp,
+ struct svc_rdma_req_map *vec)
+{
+ u32 xfer_len = rqstp->rq_res.len;
+ int write_len;
+ u32 xdr_off;
+ int chunk_no;
+ int chunk_off;
+ int nchunks;
+ struct rpcrdma_segment *ch;
+ struct rpcrdma_write_array *arg_ary;
+ struct rpcrdma_write_array *res_ary;
+ int ret;
+
+ arg_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (!arg_ary)
+ return 0;
+ /* XXX: need to fix when reply lists occur with read-list and or
+ * write-list */
+ res_ary = (struct rpcrdma_write_array *)
+ &rdma_resp->rm_body.rm_chunks[2];
+
+ /* xdr offset starts at RPC message */
+ nchunks = ntohl(arg_ary->wc_nchunks);
+ for (xdr_off = 0, chunk_no = 0;
+ xfer_len && chunk_no < nchunks;
+ chunk_no++) {
+ u64 rs_offset;
+ ch = &arg_ary->wc_array[chunk_no].wc_target;
+ write_len = min(xfer_len, htonl(ch->rs_length));
+
+ /* Prepare the reply chunk given the length actually
+ * written */
+ xdr_decode_hyper((__be32 *)&ch->rs_offset, &rs_offset);
+ svc_rdma_xdr_encode_array_chunk(res_ary, chunk_no,
+ ch->rs_handle, ch->rs_offset,
+ write_len);
+ chunk_off = 0;
+ while (write_len) {
+ ret = send_write(xprt, rqstp,
+ ntohl(ch->rs_handle),
+ rs_offset + chunk_off,
+ xdr_off,
+ write_len,
+ vec);
+ if (ret <= 0) {
+ dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
+ ret);
+ return -EIO;
+ }
+ chunk_off += ret;
+ xdr_off += ret;
+ xfer_len -= ret;
+ write_len -= ret;
+ }
+ }
+ /* Update the req with the number of chunks actually used */
+ svc_rdma_xdr_encode_reply_array(res_ary, chunk_no);
+
+ return rqstp->rq_res.len;
+}
+
+/* This function prepares the portion of the RPCRDMA message to be
+ * sent in the RDMA_SEND. This function is called after data sent via
+ * RDMA has already been transmitted. There are three cases:
+ * - The RPCRDMA header, RPC header, and payload are all sent in a
+ * single RDMA_SEND. This is the "inline" case.
+ * - The RPCRDMA header and some portion of the RPC header and data
+ * are sent via this RDMA_SEND and another portion of the data is
+ * sent via RDMA.
+ * - The RPCRDMA header [NOMSG] is sent in this RDMA_SEND and the RPC
+ * header and data are all transmitted via RDMA.
+ * In all three cases, this function prepares the RPCRDMA header in
+ * sge[0], the 'type' parameter indicates the type to place in the
+ * RPCRDMA header, and the 'byte_count' field indicates how much of
+ * the XDR to include in this RDMA_SEND. NB: The offset of the payload
+ * to send is zero in the XDR.
+ */
+static int send_reply(struct svcxprt_rdma *rdma,
+ struct svc_rqst *rqstp,
+ struct page *page,
+ struct rpcrdma_msg *rdma_resp,
+ struct svc_rdma_op_ctxt *ctxt,
+ struct svc_rdma_req_map *vec,
+ int byte_count)
+{
+ struct ib_send_wr send_wr;
+ int sge_no;
+ int sge_bytes;
+ int page_no;
+ int pages;
+ int ret;
+
+ /* Post a recv buffer to handle another request. */
+ ret = svc_rdma_post_recv(rdma);
+ if (ret) {
+ printk(KERN_INFO
+ "svcrdma: could not post a receive buffer, err=%d."
+ "Closing transport %p.\n", ret, rdma);
+ set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 0);
+ return -ENOTCONN;
+ }
+
+ /* Prepare the context */
+ ctxt->pages[0] = page;
+ ctxt->count = 1;
+
+ /* Prepare the SGE for the RPCRDMA Header */
+ ctxt->sge[0].lkey = rdma->sc_dma_lkey;
+ ctxt->sge[0].length = svc_rdma_xdr_get_reply_hdr_len(rdma_resp);
+ ctxt->sge[0].addr =
+ ib_dma_map_page(rdma->sc_cm_id->device, page, 0,
+ ctxt->sge[0].length, DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(rdma->sc_cm_id->device, ctxt->sge[0].addr))
+ goto err;
+ atomic_inc(&rdma->sc_dma_used);
+
+ ctxt->direction = DMA_TO_DEVICE;
+
+ /* Map the payload indicated by 'byte_count' */
+ for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
+ int xdr_off = 0;
+ sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
+ byte_count -= sge_bytes;
+ ctxt->sge[sge_no].addr =
+ dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
+ sge_bytes, DMA_TO_DEVICE);
+ xdr_off += sge_bytes;
+ if (ib_dma_mapping_error(rdma->sc_cm_id->device,
+ ctxt->sge[sge_no].addr))
+ goto err;
+ atomic_inc(&rdma->sc_dma_used);
+ ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
+ ctxt->sge[sge_no].length = sge_bytes;
+ }
+ if (byte_count != 0) {
+ pr_err("svcrdma: Could not map %d bytes\n", byte_count);
+ goto err;
+ }
+
+ /* Save all respages in the ctxt and remove them from the
+ * respages array. They are our pages until the I/O
+ * completes.
+ */
+ pages = rqstp->rq_next_page - rqstp->rq_respages;
+ for (page_no = 0; page_no < pages; page_no++) {
+ ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
+ ctxt->count++;
+ rqstp->rq_respages[page_no] = NULL;
+ /*
+ * If there are more pages than SGE, terminate SGE
+ * list so that svc_rdma_unmap_dma doesn't attempt to
+ * unmap garbage.
+ */
+ if (page_no+1 >= sge_no)
+ ctxt->sge[page_no+1].length = 0;
+ }
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+ if (sge_no > rdma->sc_max_sge) {
+ pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+ goto err;
+ }
+ memset(&send_wr, 0, sizeof send_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ send_wr.wr_id = (unsigned long)ctxt;
+ send_wr.sg_list = ctxt->sge;
+ send_wr.num_sge = sge_no;
+ send_wr.opcode = IB_WR_SEND;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ ret = svc_rdma_send(rdma, &send_wr);
+ if (ret)
+ goto err;
+
+ return 0;
+
+ err:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ return -EIO;
+}
+
+void svc_rdma_prep_reply_hdr(struct svc_rqst *rqstp)
+{
+}
+
+int svc_rdma_sendto(struct svc_rqst *rqstp)
+{
+ struct svc_xprt *xprt = rqstp->rq_xprt;
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ struct rpcrdma_msg *rdma_argp;
+ struct rpcrdma_msg *rdma_resp;
+ struct rpcrdma_write_array *reply_ary;
+ enum rpcrdma_proc reply_type;
+ int ret;
+ int inline_bytes;
+ struct page *res_page;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct svc_rdma_req_map *vec;
+
+ dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
+
+ /* Get the RDMA request header. The receive logic always
+ * places this at the start of page 0.
+ */
+ rdma_argp = page_address(rqstp->rq_pages[0]);
+
+ /* Build an req vec for the XDR */
+ ctxt = svc_rdma_get_context(rdma);
+ ctxt->direction = DMA_TO_DEVICE;
+ vec = svc_rdma_get_req_map();
+ ret = map_xdr(rdma, &rqstp->rq_res, vec);
+ if (ret)
+ goto err0;
+ inline_bytes = rqstp->rq_res.len;
+
+ /* Create the RDMA response header */
+ res_page = svc_rdma_get_page();
+ rdma_resp = page_address(res_page);
+ reply_ary = svc_rdma_get_reply_array(rdma_argp);
+ if (reply_ary)
+ reply_type = RDMA_NOMSG;
+ else
+ reply_type = RDMA_MSG;
+ svc_rdma_xdr_encode_reply_header(rdma, rdma_argp,
+ rdma_resp, reply_type);
+
+ /* Send any write-chunk data and build resp write-list */
+ ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, vec);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
+ ret);
+ goto err1;
+ }
+ inline_bytes -= ret;
+
+ /* Send any reply-list data and update resp reply-list */
+ ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
+ rqstp, vec);
+ if (ret < 0) {
+ printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
+ ret);
+ goto err1;
+ }
+ inline_bytes -= ret;
+
+ ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
+ inline_bytes);
+ svc_rdma_put_req_map(vec);
+ dprintk("svcrdma: send_reply returns %d\n", ret);
+ return ret;
+
+ err1:
+ put_page(res_page);
+ err0:
+ svc_rdma_put_req_map(vec);
+ svc_rdma_put_context(ctxt, 0);
+ return ret;
+}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
new file mode 100644
index 000000000..f609c1c2d
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -0,0 +1,1366 @@
+/*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
+ * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * Author: Tom Tucker <tom@opengridcomputing.com>
+ */
+
+#include <linux/sunrpc/svc_xprt.h>
+#include <linux/sunrpc/debug.h>
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <linux/slab.h>
+#include <linux/spinlock.h>
+#include <linux/workqueue.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/export.h>
+#include "xprt_rdma.h"
+
+#define RPCDBG_FACILITY RPCDBG_SVCXPRT
+
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+ struct net *net,
+ struct sockaddr *sa, int salen,
+ int flags);
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt);
+static void svc_rdma_release_rqst(struct svc_rqst *);
+static void dto_tasklet_func(unsigned long data);
+static void svc_rdma_detach(struct svc_xprt *xprt);
+static void svc_rdma_free(struct svc_xprt *xprt);
+static int svc_rdma_has_wspace(struct svc_xprt *xprt);
+static int svc_rdma_secure_port(struct svc_rqst *);
+static void rq_cq_reap(struct svcxprt_rdma *xprt);
+static void sq_cq_reap(struct svcxprt_rdma *xprt);
+
+static DECLARE_TASKLET(dto_tasklet, dto_tasklet_func, 0UL);
+static DEFINE_SPINLOCK(dto_lock);
+static LIST_HEAD(dto_xprt_q);
+
+static struct svc_xprt_ops svc_rdma_ops = {
+ .xpo_create = svc_rdma_create,
+ .xpo_recvfrom = svc_rdma_recvfrom,
+ .xpo_sendto = svc_rdma_sendto,
+ .xpo_release_rqst = svc_rdma_release_rqst,
+ .xpo_detach = svc_rdma_detach,
+ .xpo_free = svc_rdma_free,
+ .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+ .xpo_has_wspace = svc_rdma_has_wspace,
+ .xpo_accept = svc_rdma_accept,
+ .xpo_secure_port = svc_rdma_secure_port,
+};
+
+struct svc_xprt_class svc_rdma_class = {
+ .xcl_name = "rdma",
+ .xcl_owner = THIS_MODULE,
+ .xcl_ops = &svc_rdma_ops,
+ .xcl_max_payload = RPCSVC_MAXPAYLOAD_RDMA,
+ .xcl_ident = XPRT_TRANSPORT_RDMA,
+};
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt;
+
+ while (1) {
+ ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, GFP_KERNEL);
+ if (ctxt)
+ break;
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ }
+ ctxt->xprt = xprt;
+ INIT_LIST_HEAD(&ctxt->dto_q);
+ ctxt->count = 0;
+ ctxt->frmr = NULL;
+ atomic_inc(&xprt->sc_ctxt_used);
+ return ctxt;
+}
+
+void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
+{
+ struct svcxprt_rdma *xprt = ctxt->xprt;
+ int i;
+ for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
+ /*
+ * Unmap the DMA addr in the SGE if the lkey matches
+ * the sc_dma_lkey, otherwise, ignore it since it is
+ * an FRMR lkey and will be unmapped later when the
+ * last WR that uses it completes.
+ */
+ if (ctxt->sge[i].lkey == xprt->sc_dma_lkey) {
+ atomic_dec(&xprt->sc_dma_used);
+ ib_dma_unmap_page(xprt->sc_cm_id->device,
+ ctxt->sge[i].addr,
+ ctxt->sge[i].length,
+ ctxt->direction);
+ }
+ }
+}
+
+void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
+{
+ struct svcxprt_rdma *xprt;
+ int i;
+
+ xprt = ctxt->xprt;
+ if (free_pages)
+ for (i = 0; i < ctxt->count; i++)
+ put_page(ctxt->pages[i]);
+
+ kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
+ atomic_dec(&xprt->sc_ctxt_used);
+}
+
+/*
+ * Temporary NFS req mappings are shared across all transport
+ * instances. These are short lived and should be bounded by the number
+ * of concurrent server threads * depth of the SQ.
+ */
+struct svc_rdma_req_map *svc_rdma_get_req_map(void)
+{
+ struct svc_rdma_req_map *map;
+ while (1) {
+ map = kmem_cache_alloc(svc_rdma_map_cachep, GFP_KERNEL);
+ if (map)
+ break;
+ schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+ }
+ map->count = 0;
+ return map;
+}
+
+void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
+{
+ kmem_cache_free(svc_rdma_map_cachep, map);
+}
+
+/* ib_cq event handler */
+static void cq_event_handler(struct ib_event *event, void *context)
+{
+ struct svc_xprt *xprt = context;
+ dprintk("svcrdma: received CQ event id=%d, context=%p\n",
+ event->event, context);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+}
+
+/* QP event handler */
+static void qp_event_handler(struct ib_event *event, void *context)
+{
+ struct svc_xprt *xprt = context;
+
+ switch (event->event) {
+ /* These are considered benign events */
+ case IB_EVENT_PATH_MIG:
+ case IB_EVENT_COMM_EST:
+ case IB_EVENT_SQ_DRAINED:
+ case IB_EVENT_QP_LAST_WQE_REACHED:
+ dprintk("svcrdma: QP event %d received for QP=%p\n",
+ event->event, event->element.qp);
+ break;
+ /* These are considered fatal events */
+ case IB_EVENT_PATH_MIG_ERR:
+ case IB_EVENT_QP_FATAL:
+ case IB_EVENT_QP_REQ_ERR:
+ case IB_EVENT_QP_ACCESS_ERR:
+ case IB_EVENT_DEVICE_FATAL:
+ default:
+ dprintk("svcrdma: QP ERROR event %d received for QP=%p, "
+ "closing transport\n",
+ event->event, event->element.qp);
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ break;
+ }
+}
+
+/*
+ * Data Transfer Operation Tasklet
+ *
+ * Walks a list of transports with I/O pending, removing entries as
+ * they are added to the server's I/O pending list. Two bits indicate
+ * if SQ, RQ, or both have I/O pending. The dto_lock is an irqsave
+ * spinlock that serializes access to the transport list with the RQ
+ * and SQ interrupt handlers.
+ */
+static void dto_tasklet_func(unsigned long data)
+{
+ struct svcxprt_rdma *xprt;
+ unsigned long flags;
+
+ spin_lock_irqsave(&dto_lock, flags);
+ while (!list_empty(&dto_xprt_q)) {
+ xprt = list_entry(dto_xprt_q.next,
+ struct svcxprt_rdma, sc_dto_q);
+ list_del_init(&xprt->sc_dto_q);
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ rq_cq_reap(xprt);
+ sq_cq_reap(xprt);
+
+ svc_xprt_put(&xprt->sc_xprt);
+ spin_lock_irqsave(&dto_lock, flags);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+}
+
+/*
+ * Receive Queue Completion Handler
+ *
+ * Since an RQ completion handler is called on interrupt context, we
+ * need to defer the handling of the I/O to a tasklet
+ */
+static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /* Guard against unconditional flush call for destroyed QP */
+ if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+ return;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an SQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q)) {
+ svc_xprt_get(&xprt->sc_xprt);
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+/*
+ * rq_cq_reap - Process the RQ CQ.
+ *
+ * Take all completing WC off the CQE and enqueue the associated DTO
+ * context on the dto_q for the transport.
+ *
+ * Note that caller must hold a transport reference.
+ */
+static void rq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ int ret;
+ struct ib_wc wc;
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+
+ if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+ return;
+
+ ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+ atomic_inc(&rdma_stat_rq_poll);
+
+ while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
+ ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
+ ctxt->wc_status = wc.status;
+ ctxt->byte_len = wc.byte_len;
+ svc_rdma_unmap_dma(ctxt);
+ if (wc.status != IB_WC_SUCCESS) {
+ /* Close the transport */
+ dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ svc_rdma_put_context(ctxt, 1);
+ svc_xprt_put(&xprt->sc_xprt);
+ continue;
+ }
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
+ list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+ svc_xprt_put(&xprt->sc_xprt);
+ }
+
+ if (ctxt)
+ atomic_inc(&rdma_stat_rq_prod);
+
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ /*
+ * If data arrived before established event,
+ * don't enqueue. This defers RPC I/O until the
+ * RDMA connection is complete.
+ */
+ if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+ svc_xprt_enqueue(&xprt->sc_xprt);
+}
+
+/*
+ * Process a completion context
+ */
+static void process_context(struct svcxprt_rdma *xprt,
+ struct svc_rdma_op_ctxt *ctxt)
+{
+ svc_rdma_unmap_dma(ctxt);
+
+ switch (ctxt->wr_op) {
+ case IB_WR_SEND:
+ if (ctxt->frmr)
+ pr_err("svcrdma: SEND: ctxt->frmr != NULL\n");
+ svc_rdma_put_context(ctxt, 1);
+ break;
+
+ case IB_WR_RDMA_WRITE:
+ if (ctxt->frmr)
+ pr_err("svcrdma: WRITE: ctxt->frmr != NULL\n");
+ svc_rdma_put_context(ctxt, 0);
+ break;
+
+ case IB_WR_RDMA_READ:
+ case IB_WR_RDMA_READ_WITH_INV:
+ svc_rdma_put_frmr(xprt, ctxt->frmr);
+ if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+ struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
+ if (read_hdr) {
+ spin_lock_bh(&xprt->sc_rq_dto_lock);
+ set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+ list_add_tail(&read_hdr->dto_q,
+ &xprt->sc_read_complete_q);
+ spin_unlock_bh(&xprt->sc_rq_dto_lock);
+ } else {
+ pr_err("svcrdma: ctxt->read_hdr == NULL\n");
+ }
+ svc_xprt_enqueue(&xprt->sc_xprt);
+ }
+ svc_rdma_put_context(ctxt, 0);
+ break;
+
+ default:
+ printk(KERN_ERR "svcrdma: unexpected completion type, "
+ "opcode=%d\n",
+ ctxt->wr_op);
+ break;
+ }
+}
+
+/*
+ * Send Queue Completion Handler - potentially called on interrupt context.
+ *
+ * Note that caller must hold a transport reference.
+ */
+static void sq_cq_reap(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_op_ctxt *ctxt = NULL;
+ struct ib_wc wc_a[6];
+ struct ib_wc *wc;
+ struct ib_cq *cq = xprt->sc_sq_cq;
+ int ret;
+
+ memset(wc_a, 0, sizeof(wc_a));
+
+ if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+ return;
+
+ ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ atomic_inc(&rdma_stat_sq_poll);
+ while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
+ int i;
+
+ for (i = 0; i < ret; i++) {
+ wc = &wc_a[i];
+ if (wc->status != IB_WC_SUCCESS) {
+ dprintk("svcrdma: sq wc err status %d\n",
+ wc->status);
+
+ /* Close the transport */
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ }
+
+ /* Decrement used SQ WR count */
+ atomic_dec(&xprt->sc_sq_count);
+ wake_up(&xprt->sc_send_wait);
+
+ ctxt = (struct svc_rdma_op_ctxt *)
+ (unsigned long)wc->wr_id;
+ if (ctxt)
+ process_context(xprt, ctxt);
+
+ svc_xprt_put(&xprt->sc_xprt);
+ }
+ }
+
+ if (ctxt)
+ atomic_inc(&rdma_stat_sq_prod);
+}
+
+static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
+{
+ struct svcxprt_rdma *xprt = cq_context;
+ unsigned long flags;
+
+ /* Guard against unconditional flush call for destroyed QP */
+ if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+ return;
+
+ /*
+ * Set the bit regardless of whether or not it's on the list
+ * because it may be on the list already due to an RQ
+ * completion.
+ */
+ set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
+
+ /*
+ * If this transport is not already on the DTO transport queue,
+ * add it
+ */
+ spin_lock_irqsave(&dto_lock, flags);
+ if (list_empty(&xprt->sc_dto_q)) {
+ svc_xprt_get(&xprt->sc_xprt);
+ list_add_tail(&xprt->sc_dto_q, &dto_xprt_q);
+ }
+ spin_unlock_irqrestore(&dto_lock, flags);
+
+ /* Tasklet does all the work to avoid irqsave locks. */
+ tasklet_schedule(&dto_tasklet);
+}
+
+static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
+ int listener)
+{
+ struct svcxprt_rdma *cma_xprt = kzalloc(sizeof *cma_xprt, GFP_KERNEL);
+
+ if (!cma_xprt)
+ return NULL;
+ svc_xprt_init(&init_net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
+ INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
+ INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
+ init_waitqueue_head(&cma_xprt->sc_send_wait);
+
+ spin_lock_init(&cma_xprt->sc_lock);
+ spin_lock_init(&cma_xprt->sc_rq_dto_lock);
+ spin_lock_init(&cma_xprt->sc_frmr_q_lock);
+
+ cma_xprt->sc_ord = svcrdma_ord;
+
+ cma_xprt->sc_max_req_size = svcrdma_max_req_size;
+ cma_xprt->sc_max_requests = svcrdma_max_requests;
+ cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
+ atomic_set(&cma_xprt->sc_sq_count, 0);
+ atomic_set(&cma_xprt->sc_ctxt_used, 0);
+
+ if (listener)
+ set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
+
+ return cma_xprt;
+}
+
+struct page *svc_rdma_get_page(void)
+{
+ struct page *page;
+
+ while ((page = alloc_page(GFP_KERNEL)) == NULL) {
+ /* If we can't get memory, wait a bit and try again */
+ printk(KERN_INFO "svcrdma: out of memory...retrying in 1s\n");
+ schedule_timeout_uninterruptible(msecs_to_jiffies(1000));
+ }
+ return page;
+}
+
+int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
+{
+ struct ib_recv_wr recv_wr, *bad_recv_wr;
+ struct svc_rdma_op_ctxt *ctxt;
+ struct page *page;
+ dma_addr_t pa;
+ int sge_no;
+ int buflen;
+ int ret;
+
+ ctxt = svc_rdma_get_context(xprt);
+ buflen = 0;
+ ctxt->direction = DMA_FROM_DEVICE;
+ for (sge_no = 0; buflen < xprt->sc_max_req_size; sge_no++) {
+ if (sge_no >= xprt->sc_max_sge) {
+ pr_err("svcrdma: Too many sges (%d)\n", sge_no);
+ goto err_put_ctxt;
+ }
+ page = svc_rdma_get_page();
+ ctxt->pages[sge_no] = page;
+ pa = ib_dma_map_page(xprt->sc_cm_id->device,
+ page, 0, PAGE_SIZE,
+ DMA_FROM_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device, pa))
+ goto err_put_ctxt;
+ atomic_inc(&xprt->sc_dma_used);
+ ctxt->sge[sge_no].addr = pa;
+ ctxt->sge[sge_no].length = PAGE_SIZE;
+ ctxt->sge[sge_no].lkey = xprt->sc_dma_lkey;
+ ctxt->count = sge_no + 1;
+ buflen += PAGE_SIZE;
+ }
+ recv_wr.next = NULL;
+ recv_wr.sg_list = &ctxt->sge[0];
+ recv_wr.num_sge = ctxt->count;
+ recv_wr.wr_id = (u64)(unsigned long)ctxt;
+
+ svc_xprt_get(&xprt->sc_xprt);
+ ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+ if (ret) {
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ svc_xprt_put(&xprt->sc_xprt);
+ }
+ return ret;
+
+ err_put_ctxt:
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ return -ENOMEM;
+}
+
+/*
+ * This function handles the CONNECT_REQUEST event on a listening
+ * endpoint. It is passed the cma_id for the _new_ connection. The context in
+ * this cma_id is inherited from the listening cma_id and is the svc_xprt
+ * structure for the listening endpoint.
+ *
+ * This function creates a new xprt for the new connection and enqueues it on
+ * the accept queue for the listent xprt. When the listen thread is kicked, it
+ * will call the recvfrom method on the listen xprt which will accept the new
+ * connection.
+ */
+static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
+{
+ struct svcxprt_rdma *listen_xprt = new_cma_id->context;
+ struct svcxprt_rdma *newxprt;
+ struct sockaddr *sa;
+
+ /* Create a new transport */
+ newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
+ if (!newxprt) {
+ dprintk("svcrdma: failed to create new transport\n");
+ return;
+ }
+ newxprt->sc_cm_id = new_cma_id;
+ new_cma_id->context = newxprt;
+ dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
+ newxprt, newxprt->sc_cm_id, listen_xprt);
+
+ /* Save client advertised inbound read limit for use later in accept. */
+ newxprt->sc_ord = client_ird;
+
+ /* Set the local and remote addresses in the transport */
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+ svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+ sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
+ /*
+ * Enqueue the new transport on the accept queue of the listening
+ * transport
+ */
+ spin_lock_bh(&listen_xprt->sc_lock);
+ list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
+ spin_unlock_bh(&listen_xprt->sc_lock);
+
+ set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&listen_xprt->sc_xprt);
+}
+
+/*
+ * Handles events generated on the listening endpoint. These events will be
+ * either be incoming connect requests or adapter removal events.
+ */
+static int rdma_listen_handler(struct rdma_cm_id *cma_id,
+ struct rdma_cm_event *event)
+{
+ struct svcxprt_rdma *xprt = cma_id->context;
+ int ret = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
+ "event=%d\n", cma_id, cma_id->context, event->event);
+ handle_connect_req(cma_id,
+ event->param.conn.initiator_depth);
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ dprintk("svcrdma: Connection completed on LISTEN xprt=%p, "
+ "cm_id=%p\n", xprt, cma_id);
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt)
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ break;
+
+ default:
+ dprintk("svcrdma: Unexpected event on listening endpoint %p, "
+ "event=%d\n", cma_id, event->event);
+ break;
+ }
+
+ return ret;
+}
+
+static int rdma_cma_handler(struct rdma_cm_id *cma_id,
+ struct rdma_cm_event *event)
+{
+ struct svc_xprt *xprt = cma_id->context;
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ switch (event->event) {
+ case RDMA_CM_EVENT_ESTABLISHED:
+ /* Accept complete */
+ svc_xprt_get(xprt);
+ dprintk("svcrdma: Connection completed on DTO xprt=%p, "
+ "cm_id=%p\n", xprt, cma_id);
+ clear_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags);
+ svc_xprt_enqueue(xprt);
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ dprintk("svcrdma: Disconnect on DTO xprt=%p, cm_id=%p\n",
+ xprt, cma_id);
+ if (xprt) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ svc_xprt_put(xprt);
+ }
+ break;
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ dprintk("svcrdma: Device removal cma_id=%p, xprt = %p, "
+ "event=%d\n", cma_id, xprt, event->event);
+ if (xprt) {
+ set_bit(XPT_CLOSE, &xprt->xpt_flags);
+ svc_xprt_enqueue(xprt);
+ }
+ break;
+ default:
+ dprintk("svcrdma: Unexpected event on DTO endpoint %p, "
+ "event=%d\n", cma_id, event->event);
+ break;
+ }
+ return 0;
+}
+
+/*
+ * Create a listening RDMA service endpoint.
+ */
+static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
+ struct net *net,
+ struct sockaddr *sa, int salen,
+ int flags)
+{
+ struct rdma_cm_id *listen_id;
+ struct svcxprt_rdma *cma_xprt;
+ int ret;
+
+ dprintk("svcrdma: Creating RDMA socket\n");
+ if (sa->sa_family != AF_INET) {
+ dprintk("svcrdma: Address family %d is not supported.\n", sa->sa_family);
+ return ERR_PTR(-EAFNOSUPPORT);
+ }
+ cma_xprt = rdma_create_xprt(serv, 1);
+ if (!cma_xprt)
+ return ERR_PTR(-ENOMEM);
+
+ listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
+ IB_QPT_RC);
+ if (IS_ERR(listen_id)) {
+ ret = PTR_ERR(listen_id);
+ dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+ goto err0;
+ }
+
+ ret = rdma_bind_addr(listen_id, sa);
+ if (ret) {
+ dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
+ goto err1;
+ }
+ cma_xprt->sc_cm_id = listen_id;
+
+ ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
+ if (ret) {
+ dprintk("svcrdma: rdma_listen failed = %d\n", ret);
+ goto err1;
+ }
+
+ /*
+ * We need to use the address from the cm_id in case the
+ * caller specified 0 for the port number.
+ */
+ sa = (struct sockaddr *)&cma_xprt->sc_cm_id->route.addr.src_addr;
+ svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
+
+ return &cma_xprt->sc_xprt;
+
+ err1:
+ rdma_destroy_id(listen_id);
+ err0:
+ kfree(cma_xprt);
+ return ERR_PTR(ret);
+}
+
+static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
+{
+ struct ib_mr *mr;
+ struct ib_fast_reg_page_list *pl;
+ struct svc_rdma_fastreg_mr *frmr;
+
+ frmr = kmalloc(sizeof(*frmr), GFP_KERNEL);
+ if (!frmr)
+ goto err;
+
+ mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES);
+ if (IS_ERR(mr))
+ goto err_free_frmr;
+
+ pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
+ RPCSVC_MAXPAGES);
+ if (IS_ERR(pl))
+ goto err_free_mr;
+
+ frmr->mr = mr;
+ frmr->page_list = pl;
+ INIT_LIST_HEAD(&frmr->frmr_list);
+ return frmr;
+
+ err_free_mr:
+ ib_dereg_mr(mr);
+ err_free_frmr:
+ kfree(frmr);
+ err:
+ return ERR_PTR(-ENOMEM);
+}
+
+static void rdma_dealloc_frmr_q(struct svcxprt_rdma *xprt)
+{
+ struct svc_rdma_fastreg_mr *frmr;
+
+ while (!list_empty(&xprt->sc_frmr_q)) {
+ frmr = list_entry(xprt->sc_frmr_q.next,
+ struct svc_rdma_fastreg_mr, frmr_list);
+ list_del_init(&frmr->frmr_list);
+ ib_dereg_mr(frmr->mr);
+ ib_free_fast_reg_page_list(frmr->page_list);
+ kfree(frmr);
+ }
+}
+
+struct svc_rdma_fastreg_mr *svc_rdma_get_frmr(struct svcxprt_rdma *rdma)
+{
+ struct svc_rdma_fastreg_mr *frmr = NULL;
+
+ spin_lock_bh(&rdma->sc_frmr_q_lock);
+ if (!list_empty(&rdma->sc_frmr_q)) {
+ frmr = list_entry(rdma->sc_frmr_q.next,
+ struct svc_rdma_fastreg_mr, frmr_list);
+ list_del_init(&frmr->frmr_list);
+ frmr->map_len = 0;
+ frmr->page_list_len = 0;
+ }
+ spin_unlock_bh(&rdma->sc_frmr_q_lock);
+ if (frmr)
+ return frmr;
+
+ return rdma_alloc_frmr(rdma);
+}
+
+static void frmr_unmap_dma(struct svcxprt_rdma *xprt,
+ struct svc_rdma_fastreg_mr *frmr)
+{
+ int page_no;
+ for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
+ dma_addr_t addr = frmr->page_list->page_list[page_no];
+ if (ib_dma_mapping_error(frmr->mr->device, addr))
+ continue;
+ atomic_dec(&xprt->sc_dma_used);
+ ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE,
+ frmr->direction);
+ }
+}
+
+void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
+ struct svc_rdma_fastreg_mr *frmr)
+{
+ if (frmr) {
+ frmr_unmap_dma(rdma, frmr);
+ spin_lock_bh(&rdma->sc_frmr_q_lock);
+ WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
+ list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
+ spin_unlock_bh(&rdma->sc_frmr_q_lock);
+ }
+}
+
+/*
+ * This is the xpo_recvfrom function for listening endpoints. Its
+ * purpose is to accept incoming connections. The CMA callback handler
+ * has already created a new transport and attached it to the new CMA
+ * ID.
+ *
+ * There is a queue of pending connections hung on the listening
+ * transport. This queue contains the new svc_xprt structure. This
+ * function takes svc_xprt structures off the accept_q and completes
+ * the connection.
+ */
+static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *listen_rdma;
+ struct svcxprt_rdma *newxprt = NULL;
+ struct rdma_conn_param conn_param;
+ struct ib_qp_init_attr qp_attr;
+ struct ib_device_attr devattr;
+ int uninitialized_var(dma_mr_acc);
+ int need_dma_mr;
+ int ret;
+ int i;
+
+ listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ clear_bit(XPT_CONN, &xprt->xpt_flags);
+ /* Get the next entry off the accept list */
+ spin_lock_bh(&listen_rdma->sc_lock);
+ if (!list_empty(&listen_rdma->sc_accept_q)) {
+ newxprt = list_entry(listen_rdma->sc_accept_q.next,
+ struct svcxprt_rdma, sc_accept_q);
+ list_del_init(&newxprt->sc_accept_q);
+ }
+ if (!list_empty(&listen_rdma->sc_accept_q))
+ set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
+ spin_unlock_bh(&listen_rdma->sc_lock);
+ if (!newxprt)
+ return NULL;
+
+ dprintk("svcrdma: newxprt from accept queue = %p, cm_id=%p\n",
+ newxprt, newxprt->sc_cm_id);
+
+ ret = ib_query_device(newxprt->sc_cm_id->device, &devattr);
+ if (ret) {
+ dprintk("svcrdma: could not query device attributes on "
+ "device %p, rc=%d\n", newxprt->sc_cm_id->device, ret);
+ goto errout;
+ }
+
+ /* Qualify the transport resource defaults with the
+ * capabilities of this particular device */
+ newxprt->sc_max_sge = min((size_t)devattr.max_sge,
+ (size_t)RPCSVC_MAXPAGES);
+ newxprt->sc_max_requests = min((size_t)devattr.max_qp_wr,
+ (size_t)svcrdma_max_requests);
+ newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
+
+ /*
+ * Limit ORD based on client limit, local device limit, and
+ * configured svcrdma limit.
+ */
+ newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord);
+ newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);
+
+ newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
+ if (IS_ERR(newxprt->sc_pd)) {
+ dprintk("svcrdma: error creating PD for connect request\n");
+ goto errout;
+ }
+ newxprt->sc_sq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ sq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_sq_depth,
+ 0);
+ if (IS_ERR(newxprt->sc_sq_cq)) {
+ dprintk("svcrdma: error creating SQ CQ for connect request\n");
+ goto errout;
+ }
+ newxprt->sc_rq_cq = ib_create_cq(newxprt->sc_cm_id->device,
+ rq_comp_handler,
+ cq_event_handler,
+ newxprt,
+ newxprt->sc_max_requests,
+ 0);
+ if (IS_ERR(newxprt->sc_rq_cq)) {
+ dprintk("svcrdma: error creating RQ CQ for connect request\n");
+ goto errout;
+ }
+
+ memset(&qp_attr, 0, sizeof qp_attr);
+ qp_attr.event_handler = qp_event_handler;
+ qp_attr.qp_context = &newxprt->sc_xprt;
+ qp_attr.cap.max_send_wr = newxprt->sc_sq_depth;
+ qp_attr.cap.max_recv_wr = newxprt->sc_max_requests;
+ qp_attr.cap.max_send_sge = newxprt->sc_max_sge;
+ qp_attr.cap.max_recv_sge = newxprt->sc_max_sge;
+ qp_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ qp_attr.qp_type = IB_QPT_RC;
+ qp_attr.send_cq = newxprt->sc_sq_cq;
+ qp_attr.recv_cq = newxprt->sc_rq_cq;
+ dprintk("svcrdma: newxprt->sc_cm_id=%p, newxprt->sc_pd=%p\n"
+ " cm_id->device=%p, sc_pd->device=%p\n"
+ " cap.max_send_wr = %d\n"
+ " cap.max_recv_wr = %d\n"
+ " cap.max_send_sge = %d\n"
+ " cap.max_recv_sge = %d\n",
+ newxprt->sc_cm_id, newxprt->sc_pd,
+ newxprt->sc_cm_id->device, newxprt->sc_pd->device,
+ qp_attr.cap.max_send_wr,
+ qp_attr.cap.max_recv_wr,
+ qp_attr.cap.max_send_sge,
+ qp_attr.cap.max_recv_sge);
+
+ ret = rdma_create_qp(newxprt->sc_cm_id, newxprt->sc_pd, &qp_attr);
+ if (ret) {
+ dprintk("svcrdma: failed to create QP, ret=%d\n", ret);
+ goto errout;
+ }
+ newxprt->sc_qp = newxprt->sc_cm_id->qp;
+
+ /*
+ * Use the most secure set of MR resources based on the
+ * transport type and available memory management features in
+ * the device. Here's the table implemented below:
+ *
+ * Fast Global DMA Remote WR
+ * Reg LKEY MR Access
+ * Sup'd Sup'd Needed Needed
+ *
+ * IWARP N N Y Y
+ * N Y Y Y
+ * Y N Y N
+ * Y Y N -
+ *
+ * IB N N Y N
+ * N Y N -
+ * Y N Y N
+ * Y Y N -
+ *
+ * NB: iWARP requires remote write access for the data sink
+ * of an RDMA_READ. IB does not.
+ */
+ newxprt->sc_reader = rdma_read_chunk_lcl;
+ if (devattr.device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS) {
+ newxprt->sc_frmr_pg_list_len =
+ devattr.max_fast_reg_page_list_len;
+ newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_FAST_REG;
+ newxprt->sc_reader = rdma_read_chunk_frmr;
+ }
+
+ /*
+ * Determine if a DMA MR is required and if so, what privs are required
+ */
+ switch (rdma_node_get_transport(newxprt->sc_cm_id->device->node_type)) {
+ case RDMA_TRANSPORT_IWARP:
+ newxprt->sc_dev_caps |= SVCRDMA_DEVCAP_READ_W_INV;
+ if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
+ need_dma_mr = 1;
+ dma_mr_acc =
+ (IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_WRITE);
+ } else if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
+ need_dma_mr = 1;
+ dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
+ } else
+ need_dma_mr = 0;
+ break;
+ case RDMA_TRANSPORT_IB:
+ if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
+ need_dma_mr = 1;
+ dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
+ } else if (!(devattr.device_cap_flags &
+ IB_DEVICE_LOCAL_DMA_LKEY)) {
+ need_dma_mr = 1;
+ dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
+ } else
+ need_dma_mr = 0;
+ break;
+ default:
+ goto errout;
+ }
+
+ /* Create the DMA MR if needed, otherwise, use the DMA LKEY */
+ if (need_dma_mr) {
+ /* Register all of physical memory */
+ newxprt->sc_phys_mr =
+ ib_get_dma_mr(newxprt->sc_pd, dma_mr_acc);
+ if (IS_ERR(newxprt->sc_phys_mr)) {
+ dprintk("svcrdma: Failed to create DMA MR ret=%d\n",
+ ret);
+ goto errout;
+ }
+ newxprt->sc_dma_lkey = newxprt->sc_phys_mr->lkey;
+ } else
+ newxprt->sc_dma_lkey =
+ newxprt->sc_cm_id->device->local_dma_lkey;
+
+ /* Post receive buffers */
+ for (i = 0; i < newxprt->sc_max_requests; i++) {
+ ret = svc_rdma_post_recv(newxprt);
+ if (ret) {
+ dprintk("svcrdma: failure posting receive buffers\n");
+ goto errout;
+ }
+ }
+
+ /* Swap out the handler */
+ newxprt->sc_cm_id->event_handler = rdma_cma_handler;
+
+ /*
+ * Arm the CQs for the SQ and RQ before accepting so we can't
+ * miss the first message
+ */
+ ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+
+ /* Accept Connection */
+ set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
+ memset(&conn_param, 0, sizeof conn_param);
+ conn_param.responder_resources = 0;
+ conn_param.initiator_depth = newxprt->sc_ord;
+ ret = rdma_accept(newxprt->sc_cm_id, &conn_param);
+ if (ret) {
+ dprintk("svcrdma: failed to accept new connection, ret=%d\n",
+ ret);
+ goto errout;
+ }
+
+ dprintk("svcrdma: new connection %p accepted with the following "
+ "attributes:\n"
+ " local_ip : %pI4\n"
+ " local_port : %d\n"
+ " remote_ip : %pI4\n"
+ " remote_port : %d\n"
+ " max_sge : %d\n"
+ " sq_depth : %d\n"
+ " max_requests : %d\n"
+ " ord : %d\n",
+ newxprt,
+ &((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_addr.s_addr,
+ ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.src_addr)->sin_port),
+ &((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_addr.s_addr,
+ ntohs(((struct sockaddr_in *)&newxprt->sc_cm_id->
+ route.addr.dst_addr)->sin_port),
+ newxprt->sc_max_sge,
+ newxprt->sc_sq_depth,
+ newxprt->sc_max_requests,
+ newxprt->sc_ord);
+
+ return &newxprt->sc_xprt;
+
+ errout:
+ dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
+ /* Take a reference in case the DTO handler runs */
+ svc_xprt_get(&newxprt->sc_xprt);
+ if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
+ ib_destroy_qp(newxprt->sc_qp);
+ rdma_destroy_id(newxprt->sc_cm_id);
+ /* This call to put will destroy the transport */
+ svc_xprt_put(&newxprt->sc_xprt);
+ return NULL;
+}
+
+static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
+{
+}
+
+/*
+ * When connected, an svc_xprt has at least two references:
+ *
+ * - A reference held by the cm_id between the ESTABLISHED and
+ * DISCONNECTED events. If the remote peer disconnected first, this
+ * reference could be gone.
+ *
+ * - A reference held by the svc_recv code that called this function
+ * as part of close processing.
+ *
+ * At a minimum one references should still be held.
+ */
+static void svc_rdma_detach(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ dprintk("svc: svc_rdma_detach(%p)\n", xprt);
+
+ /* Disconnect and flush posted WQE */
+ rdma_disconnect(rdma->sc_cm_id);
+}
+
+static void __svc_rdma_free(struct work_struct *work)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(work, struct svcxprt_rdma, sc_work);
+ dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+
+ /* We should only be called from kref_put */
+ if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0)
+ pr_err("svcrdma: sc_xprt still in use? (%d)\n",
+ atomic_read(&rdma->sc_xprt.xpt_ref.refcount));
+
+ /*
+ * Destroy queued, but not processed read completions. Note
+ * that this cleanup has to be done before destroying the
+ * cm_id because the device ptr is needed to unmap the dma in
+ * svc_rdma_put_context.
+ */
+ while (!list_empty(&rdma->sc_read_complete_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(rdma->sc_read_complete_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ svc_rdma_put_context(ctxt, 1);
+ }
+
+ /* Destroy queued, but not processed recv completions */
+ while (!list_empty(&rdma->sc_rq_dto_q)) {
+ struct svc_rdma_op_ctxt *ctxt;
+ ctxt = list_entry(rdma->sc_rq_dto_q.next,
+ struct svc_rdma_op_ctxt,
+ dto_q);
+ list_del_init(&ctxt->dto_q);
+ svc_rdma_put_context(ctxt, 1);
+ }
+
+ /* Warn if we leaked a resource or under-referenced */
+ if (atomic_read(&rdma->sc_ctxt_used) != 0)
+ pr_err("svcrdma: ctxt still in use? (%d)\n",
+ atomic_read(&rdma->sc_ctxt_used));
+ if (atomic_read(&rdma->sc_dma_used) != 0)
+ pr_err("svcrdma: dma still in use? (%d)\n",
+ atomic_read(&rdma->sc_dma_used));
+
+ /* De-allocate fastreg mr */
+ rdma_dealloc_frmr_q(rdma);
+
+ /* Destroy the QP if present (not a listener) */
+ if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
+ ib_destroy_qp(rdma->sc_qp);
+
+ if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
+ ib_destroy_cq(rdma->sc_sq_cq);
+
+ if (rdma->sc_rq_cq && !IS_ERR(rdma->sc_rq_cq))
+ ib_destroy_cq(rdma->sc_rq_cq);
+
+ if (rdma->sc_phys_mr && !IS_ERR(rdma->sc_phys_mr))
+ ib_dereg_mr(rdma->sc_phys_mr);
+
+ if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
+ ib_dealloc_pd(rdma->sc_pd);
+
+ /* Destroy the CM ID */
+ rdma_destroy_id(rdma->sc_cm_id);
+
+ kfree(rdma);
+}
+
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+ INIT_WORK(&rdma->sc_work, __svc_rdma_free);
+ queue_work(svc_rdma_wq, &rdma->sc_work);
+}
+
+static int svc_rdma_has_wspace(struct svc_xprt *xprt)
+{
+ struct svcxprt_rdma *rdma =
+ container_of(xprt, struct svcxprt_rdma, sc_xprt);
+
+ /*
+ * If there are already waiters on the SQ,
+ * return false.
+ */
+ if (waitqueue_active(&rdma->sc_send_wait))
+ return 0;
+
+ /* Otherwise return true. */
+ return 1;
+}
+
+static int svc_rdma_secure_port(struct svc_rqst *rqstp)
+{
+ return 1;
+}
+
+/*
+ * Attempt to register the kvec representing the RPC memory with the
+ * device.
+ *
+ * Returns:
+ * NULL : The device does not support fastreg or there were no more
+ * fastreg mr.
+ * frmr : The kvec register request was successfully posted.
+ * <0 : An error was encountered attempting to register the kvec.
+ */
+int svc_rdma_fastreg(struct svcxprt_rdma *xprt,
+ struct svc_rdma_fastreg_mr *frmr)
+{
+ struct ib_send_wr fastreg_wr;
+ u8 key;
+
+ /* Bump the key */
+ key = (u8)(frmr->mr->lkey & 0x000000FF);
+ ib_update_fast_reg_key(frmr->mr, ++key);
+
+ /* Prepare FASTREG WR */
+ memset(&fastreg_wr, 0, sizeof fastreg_wr);
+ fastreg_wr.opcode = IB_WR_FAST_REG_MR;
+ fastreg_wr.send_flags = IB_SEND_SIGNALED;
+ fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
+ fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
+ fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
+ fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
+ fastreg_wr.wr.fast_reg.length = frmr->map_len;
+ fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
+ fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
+ return svc_rdma_send(xprt, &fastreg_wr);
+}
+
+int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
+{
+ struct ib_send_wr *bad_wr, *n_wr;
+ int wr_count;
+ int i;
+ int ret;
+
+ if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+ return -ENOTCONN;
+
+ wr_count = 1;
+ for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
+ wr_count++;
+
+ /* If the SQ is full, wait until an SQ entry is available */
+ while (1) {
+ spin_lock_bh(&xprt->sc_lock);
+ if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
+ spin_unlock_bh(&xprt->sc_lock);
+ atomic_inc(&rdma_stat_sq_starve);
+
+ /* See if we can opportunistically reap SQ WR to make room */
+ sq_cq_reap(xprt);
+
+ /* Wait until SQ WR available if SQ still full */
+ wait_event(xprt->sc_send_wait,
+ atomic_read(&xprt->sc_sq_count) <
+ xprt->sc_sq_depth);
+ if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
+ return -ENOTCONN;
+ continue;
+ }
+ /* Take a transport ref for each WR posted */
+ for (i = 0; i < wr_count; i++)
+ svc_xprt_get(&xprt->sc_xprt);
+
+ /* Bump used SQ WR count and post */
+ atomic_add(wr_count, &xprt->sc_sq_count);
+ ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
+ if (ret) {
+ set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+ atomic_sub(wr_count, &xprt->sc_sq_count);
+ for (i = 0; i < wr_count; i ++)
+ svc_xprt_put(&xprt->sc_xprt);
+ dprintk("svcrdma: failed to post SQ WR rc=%d, "
+ "sc_sq_count=%d, sc_sq_depth=%d\n",
+ ret, atomic_read(&xprt->sc_sq_count),
+ xprt->sc_sq_depth);
+ }
+ spin_unlock_bh(&xprt->sc_lock);
+ if (ret)
+ wake_up(&xprt->sc_send_wait);
+ break;
+ }
+ return ret;
+}
+
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+ enum rpcrdma_errcode err)
+{
+ struct ib_send_wr err_wr;
+ struct page *p;
+ struct svc_rdma_op_ctxt *ctxt;
+ u32 *va;
+ int length;
+ int ret;
+
+ p = svc_rdma_get_page();
+ va = page_address(p);
+
+ /* XDR encode error */
+ length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
+
+ ctxt = svc_rdma_get_context(xprt);
+ ctxt->direction = DMA_FROM_DEVICE;
+ ctxt->count = 1;
+ ctxt->pages[0] = p;
+
+ /* Prepare SGE for local address */
+ ctxt->sge[0].addr = ib_dma_map_page(xprt->sc_cm_id->device,
+ p, 0, length, DMA_FROM_DEVICE);
+ if (ib_dma_mapping_error(xprt->sc_cm_id->device, ctxt->sge[0].addr)) {
+ put_page(p);
+ svc_rdma_put_context(ctxt, 1);
+ return;
+ }
+ atomic_inc(&xprt->sc_dma_used);
+ ctxt->sge[0].lkey = xprt->sc_dma_lkey;
+ ctxt->sge[0].length = length;
+
+ /* Prepare SEND WR */
+ memset(&err_wr, 0, sizeof err_wr);
+ ctxt->wr_op = IB_WR_SEND;
+ err_wr.wr_id = (unsigned long)ctxt;
+ err_wr.sg_list = ctxt->sge;
+ err_wr.num_sge = 1;
+ err_wr.opcode = IB_WR_SEND;
+ err_wr.send_flags = IB_SEND_SIGNALED;
+
+ /* Post It */
+ ret = svc_rdma_send(xprt, &err_wr);
+ if (ret) {
+ dprintk("svcrdma: Error %d posting send for protocol error\n",
+ ret);
+ svc_rdma_unmap_dma(ctxt);
+ svc_rdma_put_context(ctxt, 1);
+ }
+}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
new file mode 100644
index 000000000..54f23b1be
--- /dev/null
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -0,0 +1,758 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * transport.c
+ *
+ * This file contains the top-level implementation of an RPC RDMA
+ * transport.
+ *
+ * Naming convention: functions beginning with xprt_ are part of the
+ * transport switch. All others are RPC RDMA internal.
+ */
+
+#include <linux/module.h>
+#include <linux/init.h>
+#include <linux/slab.h>
+#include <linux/seq_file.h>
+#include <linux/sunrpc/addr.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+MODULE_LICENSE("Dual BSD/GPL");
+
+MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
+MODULE_AUTHOR("Network Appliance, Inc.");
+
+/*
+ * tunables
+ */
+
+static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
+static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
+static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
+static unsigned int xprt_rdma_inline_write_padding;
+static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
+ int xprt_rdma_pad_optimize = 1;
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+
+static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
+static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
+static unsigned int zero;
+static unsigned int max_padding = PAGE_SIZE;
+static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
+static unsigned int max_memreg = RPCRDMA_LAST - 1;
+
+static struct ctl_table_header *sunrpc_table_header;
+
+static struct ctl_table xr_tunables_table[] = {
+ {
+ .procname = "rdma_slot_table_entries",
+ .data = &xprt_rdma_slot_table_entries,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &min_slot_table_size,
+ .extra2 = &max_slot_table_size
+ },
+ {
+ .procname = "rdma_max_inline_read",
+ .data = &xprt_rdma_max_inline_read,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec,
+ },
+ {
+ .procname = "rdma_max_inline_write",
+ .data = &xprt_rdma_max_inline_write,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec,
+ },
+ {
+ .procname = "rdma_inline_write_padding",
+ .data = &xprt_rdma_inline_write_padding,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &zero,
+ .extra2 = &max_padding,
+ },
+ {
+ .procname = "rdma_memreg_strategy",
+ .data = &xprt_rdma_memreg_strategy,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec_minmax,
+ .extra1 = &min_memreg,
+ .extra2 = &max_memreg,
+ },
+ {
+ .procname = "rdma_pad_optimize",
+ .data = &xprt_rdma_pad_optimize,
+ .maxlen = sizeof(unsigned int),
+ .mode = 0644,
+ .proc_handler = proc_dointvec,
+ },
+ { },
+};
+
+static struct ctl_table sunrpc_table[] = {
+ {
+ .procname = "sunrpc",
+ .mode = 0555,
+ .child = xr_tunables_table
+ },
+ { },
+};
+
+#endif
+
+#define RPCRDMA_BIND_TO (60U * HZ)
+#define RPCRDMA_INIT_REEST_TO (5U * HZ)
+#define RPCRDMA_MAX_REEST_TO (30U * HZ)
+#define RPCRDMA_IDLE_DISC_TO (5U * 60 * HZ)
+
+static struct rpc_xprt_ops xprt_rdma_procs; /* forward reference */
+
+static void
+xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
+{
+ struct sockaddr_in *sin = (struct sockaddr_in *)sap;
+ char buf[20];
+
+ snprintf(buf, sizeof(buf), "%08x", ntohl(sin->sin_addr.s_addr));
+ xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
+
+ xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA;
+}
+
+static void
+xprt_rdma_format_addresses6(struct rpc_xprt *xprt, struct sockaddr *sap)
+{
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
+ char buf[40];
+
+ snprintf(buf, sizeof(buf), "%pi6", &sin6->sin6_addr);
+ xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = kstrdup(buf, GFP_KERNEL);
+
+ xprt->address_strings[RPC_DISPLAY_NETID] = RPCBIND_NETID_RDMA6;
+}
+
+static void
+xprt_rdma_format_addresses(struct rpc_xprt *xprt)
+{
+ struct sockaddr *sap = (struct sockaddr *)
+ &rpcx_to_rdmad(xprt).addr;
+ char buf[128];
+
+ switch (sap->sa_family) {
+ case AF_INET:
+ xprt_rdma_format_addresses4(xprt, sap);
+ break;
+ case AF_INET6:
+ xprt_rdma_format_addresses6(xprt, sap);
+ break;
+ default:
+ pr_err("rpcrdma: Unrecognized address family\n");
+ return;
+ }
+
+ (void)rpc_ntop(sap, buf, sizeof(buf));
+ xprt->address_strings[RPC_DISPLAY_ADDR] = kstrdup(buf, GFP_KERNEL);
+
+ snprintf(buf, sizeof(buf), "%u", rpc_get_port(sap));
+ xprt->address_strings[RPC_DISPLAY_PORT] = kstrdup(buf, GFP_KERNEL);
+
+ snprintf(buf, sizeof(buf), "%4hx", rpc_get_port(sap));
+ xprt->address_strings[RPC_DISPLAY_HEX_PORT] = kstrdup(buf, GFP_KERNEL);
+
+ xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";
+}
+
+static void
+xprt_rdma_free_addresses(struct rpc_xprt *xprt)
+{
+ unsigned int i;
+
+ for (i = 0; i < RPC_DISPLAY_MAX; i++)
+ switch (i) {
+ case RPC_DISPLAY_PROTO:
+ case RPC_DISPLAY_NETID:
+ continue;
+ default:
+ kfree(xprt->address_strings[i]);
+ }
+}
+
+static void
+xprt_rdma_connect_worker(struct work_struct *work)
+{
+ struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+ rx_connect_worker.work);
+ struct rpc_xprt *xprt = &r_xprt->rx_xprt;
+ int rc = 0;
+
+ xprt_clear_connected(xprt);
+
+ dprintk("RPC: %s: %sconnect\n", __func__,
+ r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
+ rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ if (rc)
+ xprt_wake_pending_tasks(xprt, rc);
+
+ dprintk("RPC: %s: exit\n", __func__);
+ xprt_clear_connecting(xprt);
+}
+
+/*
+ * xprt_rdma_destroy
+ *
+ * Destroy the xprt.
+ * Free all memory associated with the object, including its own.
+ * NOTE: none of the *destroy methods free memory for their top-level
+ * objects, even though they may have allocated it (they do free
+ * private memory). It's up to the caller to handle it. In this
+ * case (RDMA transport), all structure memory is inlined with the
+ * struct rpcrdma_xprt.
+ */
+static void
+xprt_rdma_destroy(struct rpc_xprt *xprt)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ dprintk("RPC: %s: called\n", __func__);
+
+ cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
+
+ xprt_clear_connected(xprt);
+
+ rpcrdma_buffer_destroy(&r_xprt->rx_buf);
+ rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
+ rpcrdma_ia_close(&r_xprt->rx_ia);
+
+ xprt_rdma_free_addresses(xprt);
+
+ xprt_free(xprt);
+
+ dprintk("RPC: %s: returning\n", __func__);
+
+ module_put(THIS_MODULE);
+}
+
+static const struct rpc_timeout xprt_rdma_default_timeout = {
+ .to_initval = 60 * HZ,
+ .to_maxval = 60 * HZ,
+};
+
+/**
+ * xprt_setup_rdma - Set up transport to use RDMA
+ *
+ * @args: rpc transport arguments
+ */
+static struct rpc_xprt *
+xprt_setup_rdma(struct xprt_create *args)
+{
+ struct rpcrdma_create_data_internal cdata;
+ struct rpc_xprt *xprt;
+ struct rpcrdma_xprt *new_xprt;
+ struct rpcrdma_ep *new_ep;
+ struct sockaddr_in *sin;
+ int rc;
+
+ if (args->addrlen > sizeof(xprt->addr)) {
+ dprintk("RPC: %s: address too large\n", __func__);
+ return ERR_PTR(-EBADF);
+ }
+
+ xprt = xprt_alloc(args->net, sizeof(struct rpcrdma_xprt),
+ xprt_rdma_slot_table_entries,
+ xprt_rdma_slot_table_entries);
+ if (xprt == NULL) {
+ dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
+ __func__);
+ return ERR_PTR(-ENOMEM);
+ }
+
+ /* 60 second timeout, no retries */
+ xprt->timeout = &xprt_rdma_default_timeout;
+ xprt->bind_timeout = RPCRDMA_BIND_TO;
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+ xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
+
+ xprt->resvport = 0; /* privileged port not needed */
+ xprt->tsh_size = 0; /* RPC-RDMA handles framing */
+ xprt->ops = &xprt_rdma_procs;
+
+ /*
+ * Set up RDMA-specific connect data.
+ */
+
+ /* Put server RDMA address in local cdata */
+ memcpy(&cdata.addr, args->dstaddr, args->addrlen);
+
+ /* Ensure xprt->addr holds valid server TCP (not RDMA)
+ * address, for any side protocols which peek at it */
+ xprt->prot = IPPROTO_TCP;
+ xprt->addrlen = args->addrlen;
+ memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);
+
+ sin = (struct sockaddr_in *)&cdata.addr;
+ if (ntohs(sin->sin_port) != 0)
+ xprt_set_bound(xprt);
+
+ dprintk("RPC: %s: %pI4:%u\n",
+ __func__, &sin->sin_addr.s_addr, ntohs(sin->sin_port));
+
+ /* Set max requests */
+ cdata.max_requests = xprt->max_reqs;
+
+ /* Set some length limits */
+ cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
+ cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */
+
+ cdata.inline_wsize = xprt_rdma_max_inline_write;
+ if (cdata.inline_wsize > cdata.wsize)
+ cdata.inline_wsize = cdata.wsize;
+
+ cdata.inline_rsize = xprt_rdma_max_inline_read;
+ if (cdata.inline_rsize > cdata.rsize)
+ cdata.inline_rsize = cdata.rsize;
+
+ cdata.padding = xprt_rdma_inline_write_padding;
+
+ /*
+ * Create new transport instance, which includes initialized
+ * o ia
+ * o endpoint
+ * o buffers
+ */
+
+ new_xprt = rpcx_to_rdmax(xprt);
+
+ rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
+ xprt_rdma_memreg_strategy);
+ if (rc)
+ goto out1;
+
+ /*
+ * initialize and create ep
+ */
+ new_xprt->rx_data = cdata;
+ new_ep = &new_xprt->rx_ep;
+ new_ep->rep_remote_addr = cdata.addr;
+
+ rc = rpcrdma_ep_create(&new_xprt->rx_ep,
+ &new_xprt->rx_ia, &new_xprt->rx_data);
+ if (rc)
+ goto out2;
+
+ /*
+ * Allocate pre-registered send and receive buffers for headers and
+ * any inline data. Also specify any padding which will be provided
+ * from a preregistered zero buffer.
+ */
+ rc = rpcrdma_buffer_create(new_xprt);
+ if (rc)
+ goto out3;
+
+ /*
+ * Register a callback for connection events. This is necessary because
+ * connection loss notification is async. We also catch connection loss
+ * when reaping receives.
+ */
+ INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+ xprt_rdma_connect_worker);
+
+ xprt_rdma_format_addresses(xprt);
+ xprt->max_payload = new_xprt->rx_ia.ri_ops->ro_maxpages(new_xprt);
+ if (xprt->max_payload == 0)
+ goto out4;
+ xprt->max_payload <<= PAGE_SHIFT;
+ dprintk("RPC: %s: transport data payload maximum: %zu bytes\n",
+ __func__, xprt->max_payload);
+
+ if (!try_module_get(THIS_MODULE))
+ goto out4;
+
+ return xprt;
+
+out4:
+ xprt_rdma_free_addresses(xprt);
+ rc = -EINVAL;
+out3:
+ rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
+out2:
+ rpcrdma_ia_close(&new_xprt->rx_ia);
+out1:
+ xprt_free(xprt);
+ return ERR_PTR(rc);
+}
+
+/*
+ * Close a connection, during shutdown or timeout/reconnect
+ */
+static void
+xprt_rdma_close(struct rpc_xprt *xprt)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ dprintk("RPC: %s: closing\n", __func__);
+ if (r_xprt->rx_ep.rep_connected > 0)
+ xprt->reestablish_timeout = 0;
+ xprt_disconnect_done(xprt);
+ rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
+}
+
+static void
+xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
+{
+ struct sockaddr_in *sap;
+
+ sap = (struct sockaddr_in *)&xprt->addr;
+ sap->sin_port = htons(port);
+ sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
+ sap->sin_port = htons(port);
+ dprintk("RPC: %s: %u\n", __func__, port);
+}
+
+static void
+xprt_rdma_connect(struct rpc_xprt *xprt, struct rpc_task *task)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+ if (r_xprt->rx_ep.rep_connected != 0) {
+ /* Reconnect */
+ schedule_delayed_work(&r_xprt->rx_connect_worker,
+ xprt->reestablish_timeout);
+ xprt->reestablish_timeout <<= 1;
+ if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
+ xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
+ else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
+ xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+ } else {
+ schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
+ if (!RPC_IS_ASYNC(task))
+ flush_delayed_work(&r_xprt->rx_connect_worker);
+ }
+}
+
+/*
+ * The RDMA allocate/free functions need the task structure as a place
+ * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
+ */
+static void *
+xprt_rdma_allocate(struct rpc_task *task, size_t size)
+{
+ struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ struct rpcrdma_regbuf *rb;
+ struct rpcrdma_req *req;
+ size_t min_size;
+ gfp_t flags;
+
+ req = rpcrdma_buffer_get(&r_xprt->rx_buf);
+ if (req == NULL)
+ return NULL;
+
+ flags = GFP_NOIO | __GFP_NOWARN;
+ if (RPC_IS_SWAPPER(task))
+ flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
+
+ if (req->rl_rdmabuf == NULL)
+ goto out_rdmabuf;
+ if (req->rl_sendbuf == NULL)
+ goto out_sendbuf;
+ if (size > req->rl_sendbuf->rg_size)
+ goto out_sendbuf;
+
+out:
+ dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
+ req->rl_connect_cookie = 0; /* our reserved value */
+ return req->rl_sendbuf->rg_base;
+
+out_rdmabuf:
+ min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ req->rl_rdmabuf = rb;
+
+out_sendbuf:
+ /* XDR encoding and RPC/RDMA marshaling of this request has not
+ * yet occurred. Thus a lower bound is needed to prevent buffer
+ * overrun during marshaling.
+ *
+ * RPC/RDMA marshaling may choose to send payload bearing ops
+ * inline, if the result is smaller than the inline threshold.
+ * The value of the "size" argument accounts for header
+ * requirements but not for the payload in these cases.
+ *
+ * Likewise, allocate enough space to receive a reply up to the
+ * size of the inline threshold.
+ *
+ * It's unlikely that both the send header and the received
+ * reply will be large, but slush is provided here to allow
+ * flexibility when marshaling.
+ */
+ min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+ min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+ if (size < min_size)
+ size = min_size;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ goto out_fail;
+ rb->rg_owner = req;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ req->rl_sendbuf = rb;
+ goto out;
+
+out_fail:
+ rpcrdma_buffer_put(req);
+ r_xprt->rx_stats.failed_marshal_count++;
+ return NULL;
+}
+
+/*
+ * This function returns all RDMA resources to the pool.
+ */
+static void
+xprt_rdma_free(void *buffer)
+{
+ struct rpcrdma_req *req;
+ struct rpcrdma_xprt *r_xprt;
+ struct rpcrdma_regbuf *rb;
+ int i;
+
+ if (buffer == NULL)
+ return;
+
+ rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+ req = rb->rg_owner;
+ r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
+
+ dprintk("RPC: %s: called on 0x%p\n", __func__, req->rl_reply);
+
+ for (i = 0; req->rl_nchunks;) {
+ --req->rl_nchunks;
+ i += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
+ &req->rl_segments[i]);
+ }
+
+ rpcrdma_buffer_put(req);
+}
+
+/*
+ * send_request invokes the meat of RPC RDMA. It must do the following:
+ * 1. Marshal the RPC request into an RPC RDMA request, which means
+ * putting a header in front of data, and creating IOVs for RDMA
+ * from those in the request.
+ * 2. In marshaling, detect opportunities for RDMA, and use them.
+ * 3. Post a recv message to set up asynch completion, then send
+ * the request (rpcrdma_ep_post).
+ * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP).
+ */
+
+static int
+xprt_rdma_send_request(struct rpc_task *task)
+{
+ struct rpc_rqst *rqst = task->tk_rqstp;
+ struct rpc_xprt *xprt = rqst->rq_xprt;
+ struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ int rc = 0;
+
+ rc = rpcrdma_marshal_req(rqst);
+ if (rc < 0)
+ goto failed_marshal;
+
+ if (req->rl_reply == NULL) /* e.g. reconnection */
+ rpcrdma_recv_buffer_get(req);
+
+ if (req->rl_reply) {
+ req->rl_reply->rr_func = rpcrdma_reply_handler;
+ /* this need only be done once, but... */
+ req->rl_reply->rr_xprt = xprt;
+ }
+
+ /* Must suppress retransmit to maintain credits */
+ if (req->rl_connect_cookie == xprt->connect_cookie)
+ goto drop_connection;
+ req->rl_connect_cookie = xprt->connect_cookie;
+
+ if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
+ goto drop_connection;
+
+ rqst->rq_xmit_bytes_sent += rqst->rq_snd_buf.len;
+ rqst->rq_bytes_sent = 0;
+ return 0;
+
+failed_marshal:
+ r_xprt->rx_stats.failed_marshal_count++;
+ dprintk("RPC: %s: rpcrdma_marshal_req failed, status %i\n",
+ __func__, rc);
+ if (rc == -EIO)
+ return -EIO;
+drop_connection:
+ xprt_disconnect_done(xprt);
+ return -ENOTCONN; /* implies disconnect */
+}
+
+static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
+{
+ struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+ long idle_time = 0;
+
+ if (xprt_connected(xprt))
+ idle_time = (long)(jiffies - xprt->last_used) / HZ;
+
+ seq_printf(seq,
+ "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
+ "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",
+
+ 0, /* need a local port? */
+ xprt->stat.bind_count,
+ xprt->stat.connect_count,
+ xprt->stat.connect_time,
+ idle_time,
+ xprt->stat.sends,
+ xprt->stat.recvs,
+ xprt->stat.bad_xids,
+ xprt->stat.req_u,
+ xprt->stat.bklog_u,
+
+ r_xprt->rx_stats.read_chunk_count,
+ r_xprt->rx_stats.write_chunk_count,
+ r_xprt->rx_stats.reply_chunk_count,
+ r_xprt->rx_stats.total_rdma_request,
+ r_xprt->rx_stats.total_rdma_reply,
+ r_xprt->rx_stats.pullup_copy_count,
+ r_xprt->rx_stats.fixup_copy_count,
+ r_xprt->rx_stats.hardway_register_count,
+ r_xprt->rx_stats.failed_marshal_count,
+ r_xprt->rx_stats.bad_reply_count);
+}
+
+/*
+ * Plumbing for rpc transport switch and kernel module
+ */
+
+static struct rpc_xprt_ops xprt_rdma_procs = {
+ .reserve_xprt = xprt_reserve_xprt_cong,
+ .release_xprt = xprt_release_xprt_cong, /* sunrpc/xprt.c */
+ .alloc_slot = xprt_alloc_slot,
+ .release_request = xprt_release_rqst_cong, /* ditto */
+ .set_retrans_timeout = xprt_set_retrans_timeout_def, /* ditto */
+ .rpcbind = rpcb_getport_async, /* sunrpc/rpcb_clnt.c */
+ .set_port = xprt_rdma_set_port,
+ .connect = xprt_rdma_connect,
+ .buf_alloc = xprt_rdma_allocate,
+ .buf_free = xprt_rdma_free,
+ .send_request = xprt_rdma_send_request,
+ .close = xprt_rdma_close,
+ .destroy = xprt_rdma_destroy,
+ .print_stats = xprt_rdma_print_stats
+};
+
+static struct xprt_class xprt_rdma = {
+ .list = LIST_HEAD_INIT(xprt_rdma.list),
+ .name = "rdma",
+ .owner = THIS_MODULE,
+ .ident = XPRT_TRANSPORT_RDMA,
+ .setup = xprt_setup_rdma,
+};
+
+static void __exit xprt_rdma_cleanup(void)
+{
+ int rc;
+
+ dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+ if (sunrpc_table_header) {
+ unregister_sysctl_table(sunrpc_table_header);
+ sunrpc_table_header = NULL;
+ }
+#endif
+ rc = xprt_unregister_transport(&xprt_rdma);
+ if (rc)
+ dprintk("RPC: %s: xprt_unregister returned %i\n",
+ __func__, rc);
+}
+
+static int __init xprt_rdma_init(void)
+{
+ int rc;
+
+ rc = xprt_register_transport(&xprt_rdma);
+
+ if (rc)
+ return rc;
+
+ dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
+
+ dprintk("Defaults:\n");
+ dprintk("\tSlots %d\n"
+ "\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
+ xprt_rdma_slot_table_entries,
+ xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
+ dprintk("\tPadding %d\n\tMemreg %d\n",
+ xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+ if (!sunrpc_table_header)
+ sunrpc_table_header = register_sysctl_table(sunrpc_table);
+#endif
+ return 0;
+}
+
+module_init(xprt_rdma_init);
+module_exit(xprt_rdma_cleanup);
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
new file mode 100644
index 000000000..4870d272e
--- /dev/null
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -0,0 +1,1672 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+/*
+ * verbs.c
+ *
+ * Encapsulates the major functions managing:
+ * o adapters
+ * o endpoints
+ * o connections
+ * o buffer memory
+ */
+
+#include <linux/interrupt.h>
+#include <linux/slab.h>
+#include <linux/prefetch.h>
+#include <linux/sunrpc/addr.h>
+#include <asm/bitops.h>
+
+#include "xprt_rdma.h"
+
+/*
+ * Globals/Macros
+ */
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY RPCDBG_TRANS
+#endif
+
+/*
+ * internal functions
+ */
+
+/*
+ * handle replies in tasklet context, using a single, global list
+ * rdma tasklet function -- just turn around and call the func
+ * for all replies on the list
+ */
+
+static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
+static LIST_HEAD(rpcrdma_tasklets_g);
+
+static void
+rpcrdma_run_tasklet(unsigned long data)
+{
+ struct rpcrdma_rep *rep;
+ void (*func)(struct rpcrdma_rep *);
+ unsigned long flags;
+
+ data = data;
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ while (!list_empty(&rpcrdma_tasklets_g)) {
+ rep = list_entry(rpcrdma_tasklets_g.next,
+ struct rpcrdma_rep, rr_list);
+ list_del(&rep->rr_list);
+ func = rep->rr_func;
+ rep->rr_func = NULL;
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+
+ if (func)
+ func(rep);
+ else
+ rpcrdma_recv_buffer_put(rep);
+
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ }
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+}
+
+static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
+
+static const char * const async_event[] = {
+ "CQ error",
+ "QP fatal error",
+ "QP request error",
+ "QP access error",
+ "communication established",
+ "send queue drained",
+ "path migration successful",
+ "path mig error",
+ "device fatal error",
+ "port active",
+ "port error",
+ "LID change",
+ "P_key change",
+ "SM change",
+ "SRQ error",
+ "SRQ limit reached",
+ "last WQE reached",
+ "client reregister",
+ "GID change",
+};
+
+#define ASYNC_MSG(status) \
+ ((status) < ARRAY_SIZE(async_event) ? \
+ async_event[(status)] : "unknown async error")
+
+static void
+rpcrdma_schedule_tasklet(struct list_head *sched_list)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
+ list_splice_tail(sched_list, &rpcrdma_tasklets_g);
+ spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+ tasklet_schedule(&rpcrdma_tasklet_g);
+}
+
+static void
+rpcrdma_qp_async_error_upcall(struct ib_event *event, void *context)
+{
+ struct rpcrdma_ep *ep = context;
+
+ pr_err("RPC: %s: %s on device %s ep %p\n",
+ __func__, ASYNC_MSG(event->event),
+ event->device->name, context);
+ if (ep->rep_connected == 1) {
+ ep->rep_connected = -EIO;
+ rpcrdma_conn_func(ep);
+ wake_up_all(&ep->rep_connect_wait);
+ }
+}
+
+static void
+rpcrdma_cq_async_error_upcall(struct ib_event *event, void *context)
+{
+ struct rpcrdma_ep *ep = context;
+
+ pr_err("RPC: %s: %s on device %s ep %p\n",
+ __func__, ASYNC_MSG(event->event),
+ event->device->name, context);
+ if (ep->rep_connected == 1) {
+ ep->rep_connected = -EIO;
+ rpcrdma_conn_func(ep);
+ wake_up_all(&ep->rep_connect_wait);
+ }
+}
+
+static const char * const wc_status[] = {
+ "success",
+ "local length error",
+ "local QP operation error",
+ "local EE context operation error",
+ "local protection error",
+ "WR flushed",
+ "memory management operation error",
+ "bad response error",
+ "local access error",
+ "remote invalid request error",
+ "remote access error",
+ "remote operation error",
+ "transport retry counter exceeded",
+ "RNR retry counter exceeded",
+ "local RDD violation error",
+ "remove invalid RD request",
+ "operation aborted",
+ "invalid EE context number",
+ "invalid EE context state",
+ "fatal error",
+ "response timeout error",
+ "general error",
+};
+
+#define COMPLETION_MSG(status) \
+ ((status) < ARRAY_SIZE(wc_status) ? \
+ wc_status[(status)] : "unexpected completion error")
+
+static void
+rpcrdma_sendcq_process_wc(struct ib_wc *wc)
+{
+ /* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->wr_id == RPCRDMA_IGNORE_COMPLETION) {
+ if (wc->status != IB_WC_SUCCESS &&
+ wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: SEND: %s\n",
+ __func__, COMPLETION_MSG(wc->status));
+ } else {
+ struct rpcrdma_mw *r;
+
+ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+ r->mw_sendcompletion(wc);
+ }
+}
+
+static int
+rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+{
+ struct ib_wc *wcs;
+ int budget, count, rc;
+
+ budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
+ do {
+ wcs = ep->rep_send_wcs;
+
+ rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
+ if (rc <= 0)
+ return rc;
+
+ count = rc;
+ while (count-- > 0)
+ rpcrdma_sendcq_process_wc(wcs++);
+ } while (rc == RPCRDMA_POLLSIZE && --budget);
+ return 0;
+}
+
+/*
+ * Handle send, fast_reg_mr, and local_inv completions.
+ *
+ * Send events are typically suppressed and thus do not result
+ * in an upcall. Occasionally one is signaled, however. This
+ * prevents the provider's completion queue from wrapping and
+ * losing a completion.
+ */
+static void
+rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
+{
+ struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
+ int rc;
+
+ rc = rpcrdma_sendcq_poll(cq, ep);
+ if (rc) {
+ dprintk("RPC: %s: ib_poll_cq failed: %i\n",
+ __func__, rc);
+ return;
+ }
+
+ rc = ib_req_notify_cq(cq,
+ IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
+ if (rc == 0)
+ return;
+ if (rc < 0) {
+ dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+ __func__, rc);
+ return;
+ }
+
+ rpcrdma_sendcq_poll(cq, ep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+{
+ struct rpcrdma_rep *rep =
+ (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
+
+ /* WARNING: Only wr_id and status are reliable at this point */
+ if (wc->status != IB_WC_SUCCESS)
+ goto out_fail;
+
+ /* status == SUCCESS means all fields in wc are trustworthy */
+ if (wc->opcode != IB_WC_RECV)
+ return;
+
+ dprintk("RPC: %s: rep %p opcode 'recv', length %u: success\n",
+ __func__, rep, wc->byte_len);
+
+ rep->rr_len = wc->byte_len;
+ ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+ rdmab_addr(rep->rr_rdmabuf),
+ rep->rr_len, DMA_FROM_DEVICE);
+ prefetch(rdmab_to_msg(rep->rr_rdmabuf));
+
+out_schedule:
+ list_add_tail(&rep->rr_list, sched_list);
+ return;
+out_fail:
+ if (wc->status != IB_WC_WR_FLUSH_ERR)
+ pr_err("RPC: %s: rep %p: %s\n",
+ __func__, rep, COMPLETION_MSG(wc->status));
+ rep->rr_len = ~0U;
+ goto out_schedule;
+}
+
+static int
+rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+{
+ struct list_head sched_list;
+ struct ib_wc *wcs;
+ int budget, count, rc;
+
+ INIT_LIST_HEAD(&sched_list);
+ budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
+ do {
+ wcs = ep->rep_recv_wcs;
+
+ rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
+ if (rc <= 0)
+ goto out_schedule;
+
+ count = rc;
+ while (count-- > 0)
+ rpcrdma_recvcq_process_wc(wcs++, &sched_list);
+ } while (rc == RPCRDMA_POLLSIZE && --budget);
+ rc = 0;
+
+out_schedule:
+ rpcrdma_schedule_tasklet(&sched_list);
+ return rc;
+}
+
+/*
+ * Handle receive completions.
+ *
+ * It is reentrant but processes single events in order to maintain
+ * ordering of receives to keep server credits.
+ *
+ * It is the responsibility of the scheduled tasklet to return
+ * recv buffers to the pool. NOTE: this affects synchronization of
+ * connection shutdown. That is, the structures required for
+ * the completion of the reply handler must remain intact until
+ * all memory has been reclaimed.
+ */
+static void
+rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
+{
+ struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
+ int rc;
+
+ rc = rpcrdma_recvcq_poll(cq, ep);
+ if (rc) {
+ dprintk("RPC: %s: ib_poll_cq failed: %i\n",
+ __func__, rc);
+ return;
+ }
+
+ rc = ib_req_notify_cq(cq,
+ IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
+ if (rc == 0)
+ return;
+ if (rc < 0) {
+ dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+ __func__, rc);
+ return;
+ }
+
+ rpcrdma_recvcq_poll(cq, ep);
+}
+
+static void
+rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
+{
+ struct ib_wc wc;
+ LIST_HEAD(sched_list);
+
+ while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
+ rpcrdma_recvcq_process_wc(&wc, &sched_list);
+ if (!list_empty(&sched_list))
+ rpcrdma_schedule_tasklet(&sched_list);
+ while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
+ rpcrdma_sendcq_process_wc(&wc);
+}
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+static const char * const conn[] = {
+ "address resolved",
+ "address error",
+ "route resolved",
+ "route error",
+ "connect request",
+ "connect response",
+ "connect error",
+ "unreachable",
+ "rejected",
+ "established",
+ "disconnected",
+ "device removal",
+ "multicast join",
+ "multicast error",
+ "address change",
+ "timewait exit",
+};
+
+#define CONNECTION_MSG(status) \
+ ((status) < ARRAY_SIZE(conn) ? \
+ conn[(status)] : "unrecognized connection error")
+#endif
+
+static int
+rpcrdma_conn_upcall(struct rdma_cm_id *id, struct rdma_cm_event *event)
+{
+ struct rpcrdma_xprt *xprt = id->context;
+ struct rpcrdma_ia *ia = &xprt->rx_ia;
+ struct rpcrdma_ep *ep = &xprt->rx_ep;
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+ struct sockaddr *sap = (struct sockaddr *)&ep->rep_remote_addr;
+#endif
+ struct ib_qp_attr *attr = &ia->ri_qp_attr;
+ struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
+ int connstate = 0;
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ ia->ri_async_rc = 0;
+ complete(&ia->ri_done);
+ break;
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ ia->ri_async_rc = -EHOSTUNREACH;
+ dprintk("RPC: %s: CM address resolution error, ep 0x%p\n",
+ __func__, ep);
+ complete(&ia->ri_done);
+ break;
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ ia->ri_async_rc = -ENETUNREACH;
+ dprintk("RPC: %s: CM route resolution error, ep 0x%p\n",
+ __func__, ep);
+ complete(&ia->ri_done);
+ break;
+ case RDMA_CM_EVENT_ESTABLISHED:
+ connstate = 1;
+ ib_query_qp(ia->ri_id->qp, attr,
+ IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
+ iattr);
+ dprintk("RPC: %s: %d responder resources"
+ " (%d initiator)\n",
+ __func__, attr->max_dest_rd_atomic,
+ attr->max_rd_atomic);
+ goto connected;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ connstate = -ENOTCONN;
+ goto connected;
+ case RDMA_CM_EVENT_UNREACHABLE:
+ connstate = -ENETDOWN;
+ goto connected;
+ case RDMA_CM_EVENT_REJECTED:
+ connstate = -ECONNREFUSED;
+ goto connected;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ connstate = -ECONNABORTED;
+ goto connected;
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ connstate = -ENODEV;
+connected:
+ dprintk("RPC: %s: %sconnected\n",
+ __func__, connstate > 0 ? "" : "dis");
+ ep->rep_connected = connstate;
+ rpcrdma_conn_func(ep);
+ wake_up_all(&ep->rep_connect_wait);
+ /*FALLTHROUGH*/
+ default:
+ dprintk("RPC: %s: %pIS:%u (ep 0x%p): %s\n",
+ __func__, sap, rpc_get_port(sap), ep,
+ CONNECTION_MSG(event->event));
+ break;
+ }
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+ if (connstate == 1) {
+ int ird = attr->max_dest_rd_atomic;
+ int tird = ep->rep_remote_cma.responder_resources;
+
+ pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
+ sap, rpc_get_port(sap),
+ ia->ri_id->device->name,
+ ia->ri_ops->ro_displayname,
+ xprt->rx_buf.rb_max_requests,
+ ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
+ } else if (connstate < 0) {
+ pr_info("rpcrdma: connection to %pIS:%u closed (%d)\n",
+ sap, rpc_get_port(sap), connstate);
+ }
+#endif
+
+ return 0;
+}
+
+static struct rdma_cm_id *
+rpcrdma_create_id(struct rpcrdma_xprt *xprt,
+ struct rpcrdma_ia *ia, struct sockaddr *addr)
+{
+ struct rdma_cm_id *id;
+ int rc;
+
+ init_completion(&ia->ri_done);
+
+ id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
+ if (IS_ERR(id)) {
+ rc = PTR_ERR(id);
+ dprintk("RPC: %s: rdma_create_id() failed %i\n",
+ __func__, rc);
+ return id;
+ }
+
+ ia->ri_async_rc = -ETIMEDOUT;
+ rc = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
+ if (rc) {
+ dprintk("RPC: %s: rdma_resolve_addr() failed %i\n",
+ __func__, rc);
+ goto out;
+ }
+ wait_for_completion_interruptible_timeout(&ia->ri_done,
+ msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
+ rc = ia->ri_async_rc;
+ if (rc)
+ goto out;
+
+ ia->ri_async_rc = -ETIMEDOUT;
+ rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
+ if (rc) {
+ dprintk("RPC: %s: rdma_resolve_route() failed %i\n",
+ __func__, rc);
+ goto out;
+ }
+ wait_for_completion_interruptible_timeout(&ia->ri_done,
+ msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
+ rc = ia->ri_async_rc;
+ if (rc)
+ goto out;
+
+ return id;
+
+out:
+ rdma_destroy_id(id);
+ return ERR_PTR(rc);
+}
+
+/*
+ * Drain any cq, prior to teardown.
+ */
+static void
+rpcrdma_clean_cq(struct ib_cq *cq)
+{
+ struct ib_wc wc;
+ int count = 0;
+
+ while (1 == ib_poll_cq(cq, 1, &wc))
+ ++count;
+
+ if (count)
+ dprintk("RPC: %s: flushed %d events (last 0x%x)\n",
+ __func__, count, wc.opcode);
+}
+
+/*
+ * Exported functions.
+ */
+
+/*
+ * Open and initialize an Interface Adapter.
+ * o initializes fields of struct rpcrdma_ia, including
+ * interface and provider attributes and protection zone.
+ */
+int
+rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
+{
+ int rc, mem_priv;
+ struct rpcrdma_ia *ia = &xprt->rx_ia;
+ struct ib_device_attr *devattr = &ia->ri_devattr;
+
+ ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
+ if (IS_ERR(ia->ri_id)) {
+ rc = PTR_ERR(ia->ri_id);
+ goto out1;
+ }
+
+ ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+ if (IS_ERR(ia->ri_pd)) {
+ rc = PTR_ERR(ia->ri_pd);
+ dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
+ __func__, rc);
+ goto out2;
+ }
+
+ rc = ib_query_device(ia->ri_id->device, devattr);
+ if (rc) {
+ dprintk("RPC: %s: ib_query_device failed %d\n",
+ __func__, rc);
+ goto out3;
+ }
+
+ if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
+ ia->ri_have_dma_lkey = 1;
+ ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
+ }
+
+ if (memreg == RPCRDMA_FRMR) {
+ /* Requires both frmr reg and local dma lkey */
+ if (((devattr->device_cap_flags &
+ (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
+ (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) ||
+ (devattr->max_fast_reg_page_list_len == 0)) {
+ dprintk("RPC: %s: FRMR registration "
+ "not supported by HCA\n", __func__);
+ memreg = RPCRDMA_MTHCAFMR;
+ }
+ }
+ if (memreg == RPCRDMA_MTHCAFMR) {
+ if (!ia->ri_id->device->alloc_fmr) {
+ dprintk("RPC: %s: MTHCAFMR registration "
+ "not supported by HCA\n", __func__);
+ memreg = RPCRDMA_ALLPHYSICAL;
+ }
+ }
+
+ /*
+ * Optionally obtain an underlying physical identity mapping in
+ * order to do a memory window-based bind. This base registration
+ * is protected from remote access - that is enabled only by binding
+ * for the specific bytes targeted during each RPC operation, and
+ * revoked after the corresponding completion similar to a storage
+ * adapter.
+ */
+ switch (memreg) {
+ case RPCRDMA_FRMR:
+ ia->ri_ops = &rpcrdma_frwr_memreg_ops;
+ break;
+ case RPCRDMA_ALLPHYSICAL:
+ ia->ri_ops = &rpcrdma_physical_memreg_ops;
+ mem_priv = IB_ACCESS_LOCAL_WRITE |
+ IB_ACCESS_REMOTE_WRITE |
+ IB_ACCESS_REMOTE_READ;
+ goto register_setup;
+ case RPCRDMA_MTHCAFMR:
+ ia->ri_ops = &rpcrdma_fmr_memreg_ops;
+ if (ia->ri_have_dma_lkey)
+ break;
+ mem_priv = IB_ACCESS_LOCAL_WRITE;
+ register_setup:
+ ia->ri_bind_mem = ib_get_dma_mr(ia->ri_pd, mem_priv);
+ if (IS_ERR(ia->ri_bind_mem)) {
+ printk(KERN_ALERT "%s: ib_get_dma_mr for "
+ "phys register failed with %lX\n",
+ __func__, PTR_ERR(ia->ri_bind_mem));
+ rc = -ENOMEM;
+ goto out3;
+ }
+ break;
+ default:
+ printk(KERN_ERR "RPC: Unsupported memory "
+ "registration mode: %d\n", memreg);
+ rc = -ENOMEM;
+ goto out3;
+ }
+ dprintk("RPC: %s: memory registration strategy is '%s'\n",
+ __func__, ia->ri_ops->ro_displayname);
+
+ /* Else will do memory reg/dereg for each chunk */
+ ia->ri_memreg_strategy = memreg;
+
+ rwlock_init(&ia->ri_qplock);
+ return 0;
+
+out3:
+ ib_dealloc_pd(ia->ri_pd);
+ ia->ri_pd = NULL;
+out2:
+ rdma_destroy_id(ia->ri_id);
+ ia->ri_id = NULL;
+out1:
+ return rc;
+}
+
+/*
+ * Clean up/close an IA.
+ * o if event handles and PD have been initialized, free them.
+ * o close the IA
+ */
+void
+rpcrdma_ia_close(struct rpcrdma_ia *ia)
+{
+ int rc;
+
+ dprintk("RPC: %s: entering\n", __func__);
+ if (ia->ri_bind_mem != NULL) {
+ rc = ib_dereg_mr(ia->ri_bind_mem);
+ dprintk("RPC: %s: ib_dereg_mr returned %i\n",
+ __func__, rc);
+ }
+ if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
+ if (ia->ri_id->qp)
+ rdma_destroy_qp(ia->ri_id);
+ rdma_destroy_id(ia->ri_id);
+ ia->ri_id = NULL;
+ }
+ if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
+ rc = ib_dealloc_pd(ia->ri_pd);
+ dprintk("RPC: %s: ib_dealloc_pd returned %i\n",
+ __func__, rc);
+ }
+}
+
+/*
+ * Create unconnected endpoint.
+ */
+int
+rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
+ struct rpcrdma_create_data_internal *cdata)
+{
+ struct ib_device_attr *devattr = &ia->ri_devattr;
+ struct ib_cq *sendcq, *recvcq;
+ int rc, err;
+
+ /* check provider's send/recv wr limits */
+ if (cdata->max_requests > devattr->max_qp_wr)
+ cdata->max_requests = devattr->max_qp_wr;
+
+ ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
+ ep->rep_attr.qp_context = ep;
+ ep->rep_attr.srq = NULL;
+ ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+ rc = ia->ri_ops->ro_open(ia, ep, cdata);
+ if (rc)
+ return rc;
+ ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+ ep->rep_attr.cap.max_send_sge = (cdata->padding ? 4 : 2);
+ ep->rep_attr.cap.max_recv_sge = 1;
+ ep->rep_attr.cap.max_inline_data = 0;
+ ep->rep_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
+ ep->rep_attr.qp_type = IB_QPT_RC;
+ ep->rep_attr.port_num = ~0;
+
+ if (cdata->padding) {
+ ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
+ GFP_KERNEL);
+ if (IS_ERR(ep->rep_padbuf))
+ return PTR_ERR(ep->rep_padbuf);
+ } else
+ ep->rep_padbuf = NULL;
+
+ dprintk("RPC: %s: requested max: dtos: send %d recv %d; "
+ "iovs: send %d recv %d\n",
+ __func__,
+ ep->rep_attr.cap.max_send_wr,
+ ep->rep_attr.cap.max_recv_wr,
+ ep->rep_attr.cap.max_send_sge,
+ ep->rep_attr.cap.max_recv_sge);
+
+ /* set trigger for requesting send completion */
+ ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
+ if (ep->rep_cqinit > RPCRDMA_MAX_UNSIGNALED_SENDS)
+ ep->rep_cqinit = RPCRDMA_MAX_UNSIGNALED_SENDS;
+ else if (ep->rep_cqinit <= 2)
+ ep->rep_cqinit = 0;
+ INIT_CQCOUNT(ep);
+ init_waitqueue_head(&ep->rep_connect_wait);
+ INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
+
+ sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
+ rpcrdma_cq_async_error_upcall, ep,
+ ep->rep_attr.cap.max_send_wr + 1, 0);
+ if (IS_ERR(sendcq)) {
+ rc = PTR_ERR(sendcq);
+ dprintk("RPC: %s: failed to create send CQ: %i\n",
+ __func__, rc);
+ goto out1;
+ }
+
+ rc = ib_req_notify_cq(sendcq, IB_CQ_NEXT_COMP);
+ if (rc) {
+ dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+ __func__, rc);
+ goto out2;
+ }
+
+ recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
+ rpcrdma_cq_async_error_upcall, ep,
+ ep->rep_attr.cap.max_recv_wr + 1, 0);
+ if (IS_ERR(recvcq)) {
+ rc = PTR_ERR(recvcq);
+ dprintk("RPC: %s: failed to create recv CQ: %i\n",
+ __func__, rc);
+ goto out2;
+ }
+
+ rc = ib_req_notify_cq(recvcq, IB_CQ_NEXT_COMP);
+ if (rc) {
+ dprintk("RPC: %s: ib_req_notify_cq failed: %i\n",
+ __func__, rc);
+ ib_destroy_cq(recvcq);
+ goto out2;
+ }
+
+ ep->rep_attr.send_cq = sendcq;
+ ep->rep_attr.recv_cq = recvcq;
+
+ /* Initialize cma parameters */
+
+ /* RPC/RDMA does not use private data */
+ ep->rep_remote_cma.private_data = NULL;
+ ep->rep_remote_cma.private_data_len = 0;
+
+ /* Client offers RDMA Read but does not initiate */
+ ep->rep_remote_cma.initiator_depth = 0;
+ if (devattr->max_qp_rd_atom > 32) /* arbitrary but <= 255 */
+ ep->rep_remote_cma.responder_resources = 32;
+ else
+ ep->rep_remote_cma.responder_resources =
+ devattr->max_qp_rd_atom;
+
+ ep->rep_remote_cma.retry_count = 7;
+ ep->rep_remote_cma.flow_control = 0;
+ ep->rep_remote_cma.rnr_retry_count = 0;
+
+ return 0;
+
+out2:
+ err = ib_destroy_cq(sendcq);
+ if (err)
+ dprintk("RPC: %s: ib_destroy_cq returned %i\n",
+ __func__, err);
+out1:
+ rpcrdma_free_regbuf(ia, ep->rep_padbuf);
+ return rc;
+}
+
+/*
+ * rpcrdma_ep_destroy
+ *
+ * Disconnect and destroy endpoint. After this, the only
+ * valid operations on the ep are to free it (if dynamically
+ * allocated) or re-create it.
+ */
+void
+rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+ int rc;
+
+ dprintk("RPC: %s: entering, connected is %d\n",
+ __func__, ep->rep_connected);
+
+ cancel_delayed_work_sync(&ep->rep_connect_worker);
+
+ if (ia->ri_id->qp) {
+ rpcrdma_ep_disconnect(ep, ia);
+ rdma_destroy_qp(ia->ri_id);
+ ia->ri_id->qp = NULL;
+ }
+
+ rpcrdma_free_regbuf(ia, ep->rep_padbuf);
+
+ rpcrdma_clean_cq(ep->rep_attr.recv_cq);
+ rc = ib_destroy_cq(ep->rep_attr.recv_cq);
+ if (rc)
+ dprintk("RPC: %s: ib_destroy_cq returned %i\n",
+ __func__, rc);
+
+ rpcrdma_clean_cq(ep->rep_attr.send_cq);
+ rc = ib_destroy_cq(ep->rep_attr.send_cq);
+ if (rc)
+ dprintk("RPC: %s: ib_destroy_cq returned %i\n",
+ __func__, rc);
+}
+
+/*
+ * Connect unconnected endpoint.
+ */
+int
+rpcrdma_ep_connect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+ struct rdma_cm_id *id, *old;
+ int rc = 0;
+ int retry_count = 0;
+
+ if (ep->rep_connected != 0) {
+ struct rpcrdma_xprt *xprt;
+retry:
+ dprintk("RPC: %s: reconnecting...\n", __func__);
+
+ rpcrdma_ep_disconnect(ep, ia);
+ rpcrdma_flush_cqs(ep);
+
+ xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+ ia->ri_ops->ro_reset(xprt);
+
+ id = rpcrdma_create_id(xprt, ia,
+ (struct sockaddr *)&xprt->rx_data.addr);
+ if (IS_ERR(id)) {
+ rc = -EHOSTUNREACH;
+ goto out;
+ }
+ /* TEMP TEMP TEMP - fail if new device:
+ * Deregister/remarshal *all* requests!
+ * Close and recreate adapter, pd, etc!
+ * Re-determine all attributes still sane!
+ * More stuff I haven't thought of!
+ * Rrrgh!
+ */
+ if (ia->ri_id->device != id->device) {
+ printk("RPC: %s: can't reconnect on "
+ "different device!\n", __func__);
+ rdma_destroy_id(id);
+ rc = -ENETUNREACH;
+ goto out;
+ }
+ /* END TEMP */
+ rc = rdma_create_qp(id, ia->ri_pd, &ep->rep_attr);
+ if (rc) {
+ dprintk("RPC: %s: rdma_create_qp failed %i\n",
+ __func__, rc);
+ rdma_destroy_id(id);
+ rc = -ENETUNREACH;
+ goto out;
+ }
+
+ write_lock(&ia->ri_qplock);
+ old = ia->ri_id;
+ ia->ri_id = id;
+ write_unlock(&ia->ri_qplock);
+
+ rdma_destroy_qp(old);
+ rdma_destroy_id(old);
+ } else {
+ dprintk("RPC: %s: connecting...\n", __func__);
+ rc = rdma_create_qp(ia->ri_id, ia->ri_pd, &ep->rep_attr);
+ if (rc) {
+ dprintk("RPC: %s: rdma_create_qp failed %i\n",
+ __func__, rc);
+ /* do not update ep->rep_connected */
+ return -ENETUNREACH;
+ }
+ }
+
+ ep->rep_connected = 0;
+
+ rc = rdma_connect(ia->ri_id, &ep->rep_remote_cma);
+ if (rc) {
+ dprintk("RPC: %s: rdma_connect() failed with %i\n",
+ __func__, rc);
+ goto out;
+ }
+
+ wait_event_interruptible(ep->rep_connect_wait, ep->rep_connected != 0);
+
+ /*
+ * Check state. A non-peer reject indicates no listener
+ * (ECONNREFUSED), which may be a transient state. All
+ * others indicate a transport condition which has already
+ * undergone a best-effort.
+ */
+ if (ep->rep_connected == -ECONNREFUSED &&
+ ++retry_count <= RDMA_CONNECT_RETRY_MAX) {
+ dprintk("RPC: %s: non-peer_reject, retry\n", __func__);
+ goto retry;
+ }
+ if (ep->rep_connected <= 0) {
+ /* Sometimes, the only way to reliably connect to remote
+ * CMs is to use same nonzero values for ORD and IRD. */
+ if (retry_count++ <= RDMA_CONNECT_RETRY_MAX + 1 &&
+ (ep->rep_remote_cma.responder_resources == 0 ||
+ ep->rep_remote_cma.initiator_depth !=
+ ep->rep_remote_cma.responder_resources)) {
+ if (ep->rep_remote_cma.responder_resources == 0)
+ ep->rep_remote_cma.responder_resources = 1;
+ ep->rep_remote_cma.initiator_depth =
+ ep->rep_remote_cma.responder_resources;
+ goto retry;
+ }
+ rc = ep->rep_connected;
+ } else {
+ dprintk("RPC: %s: connected\n", __func__);
+ }
+
+out:
+ if (rc)
+ ep->rep_connected = rc;
+ return rc;
+}
+
+/*
+ * rpcrdma_ep_disconnect
+ *
+ * This is separate from destroy to facilitate the ability
+ * to reconnect without recreating the endpoint.
+ *
+ * This call is not reentrant, and must not be made in parallel
+ * on the same endpoint.
+ */
+void
+rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
+{
+ int rc;
+
+ rpcrdma_flush_cqs(ep);
+ rc = rdma_disconnect(ia->ri_id);
+ if (!rc) {
+ /* returns without wait if not connected */
+ wait_event_interruptible(ep->rep_connect_wait,
+ ep->rep_connected != 1);
+ dprintk("RPC: %s: after wait, %sconnected\n", __func__,
+ (ep->rep_connected == 1) ? "still " : "dis");
+ } else {
+ dprintk("RPC: %s: rdma_disconnect %i\n", __func__, rc);
+ ep->rep_connected = rc;
+ }
+}
+
+static struct rpcrdma_req *
+rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_req *req;
+
+ req = kzalloc(sizeof(*req), GFP_KERNEL);
+ if (req == NULL)
+ return ERR_PTR(-ENOMEM);
+
+ req->rl_buffer = &r_xprt->rx_buf;
+ return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_rep *rep;
+ int rc;
+
+ rc = -ENOMEM;
+ rep = kzalloc(sizeof(*rep), GFP_KERNEL);
+ if (rep == NULL)
+ goto out;
+
+ rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
+ GFP_KERNEL);
+ if (IS_ERR(rep->rr_rdmabuf)) {
+ rc = PTR_ERR(rep->rr_rdmabuf);
+ goto out_free;
+ }
+
+ rep->rr_buffer = &r_xprt->rx_buf;
+ return rep;
+
+out_free:
+ kfree(rep);
+out:
+ return ERR_PTR(rc);
+}
+
+int
+rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ char *p;
+ size_t len;
+ int i, rc;
+
+ buf->rb_max_requests = cdata->max_requests;
+ spin_lock_init(&buf->rb_lock);
+
+ /* Need to allocate:
+ * 1. arrays for send and recv pointers
+ * 2. arrays of struct rpcrdma_req to fill in pointers
+ * 3. array of struct rpcrdma_rep for replies
+ * Send/recv buffers in req/rep need to be registered
+ */
+ len = buf->rb_max_requests *
+ (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
+
+ p = kzalloc(len, GFP_KERNEL);
+ if (p == NULL) {
+ dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
+ __func__, len);
+ rc = -ENOMEM;
+ goto out;
+ }
+ buf->rb_pool = p; /* for freeing it later */
+
+ buf->rb_send_bufs = (struct rpcrdma_req **) p;
+ p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
+ buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
+ p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
+
+ rc = ia->ri_ops->ro_init(r_xprt);
+ if (rc)
+ goto out;
+
+ for (i = 0; i < buf->rb_max_requests; i++) {
+ struct rpcrdma_req *req;
+ struct rpcrdma_rep *rep;
+
+ req = rpcrdma_create_req(r_xprt);
+ if (IS_ERR(req)) {
+ dprintk("RPC: %s: request buffer %d alloc"
+ " failed\n", __func__, i);
+ rc = PTR_ERR(req);
+ goto out;
+ }
+ buf->rb_send_bufs[i] = req;
+
+ rep = rpcrdma_create_rep(r_xprt);
+ if (IS_ERR(rep)) {
+ dprintk("RPC: %s: reply buffer %d alloc failed\n",
+ __func__, i);
+ rc = PTR_ERR(rep);
+ goto out;
+ }
+ buf->rb_recv_bufs[i] = rep;
+ }
+
+ return 0;
+out:
+ rpcrdma_buffer_destroy(buf);
+ return rc;
+}
+
+static void
+rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+{
+ if (!rep)
+ return;
+
+ rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
+ kfree(rep);
+}
+
+static void
+rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+ if (!req)
+ return;
+
+ rpcrdma_free_regbuf(ia, req->rl_sendbuf);
+ rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
+ kfree(req);
+}
+
+void
+rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ int i;
+
+ /* clean up in reverse order from create
+ * 1. recv mr memory (mr free, then kfree)
+ * 2. send mr memory (mr free, then kfree)
+ * 3. MWs
+ */
+ dprintk("RPC: %s: entering\n", __func__);
+
+ for (i = 0; i < buf->rb_max_requests; i++) {
+ if (buf->rb_recv_bufs)
+ rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
+ if (buf->rb_send_bufs)
+ rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+ }
+
+ ia->ri_ops->ro_destroy(buf);
+
+ kfree(buf->rb_pool);
+}
+
+/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
+ * some req segments uninitialized.
+ */
+static void
+rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
+{
+ if (*mw) {
+ list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
+ *mw = NULL;
+ }
+}
+
+/* Cycle mw's back in reverse order, and "spin" them.
+ * This delays and scrambles reuse as much as possible.
+ */
+static void
+rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mr_seg *seg = req->rl_segments;
+ struct rpcrdma_mr_seg *seg1 = seg;
+ int i;
+
+ for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
+ rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
+ rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
+}
+
+static void
+rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+ buf->rb_send_bufs[--buf->rb_send_index] = req;
+ req->rl_niovs = 0;
+ if (req->rl_reply) {
+ buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
+ req->rl_reply->rr_func = NULL;
+ req->rl_reply = NULL;
+ }
+}
+
+/* rpcrdma_unmap_one() was already done during deregistration.
+ * Redo only the ib_post_send().
+ */
+static void
+rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
+{
+ struct rpcrdma_xprt *r_xprt =
+ container_of(ia, struct rpcrdma_xprt, rx_ia);
+ struct ib_send_wr invalidate_wr, *bad_wr;
+ int rc;
+
+ dprintk("RPC: %s: FRMR %p is stale\n", __func__, r);
+
+ /* When this FRMR is re-inserted into rb_mws, it is no longer stale */
+ r->r.frmr.fr_state = FRMR_IS_INVALID;
+
+ memset(&invalidate_wr, 0, sizeof(invalidate_wr));
+ invalidate_wr.wr_id = (unsigned long)(void *)r;
+ invalidate_wr.opcode = IB_WR_LOCAL_INV;
+ invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
+ DECR_CQCOUNT(&r_xprt->rx_ep);
+
+ dprintk("RPC: %s: frmr %p invalidating rkey %08x\n",
+ __func__, r, r->r.frmr.fr_mr->rkey);
+
+ read_lock(&ia->ri_qplock);
+ rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
+ read_unlock(&ia->ri_qplock);
+ if (rc) {
+ /* Force rpcrdma_buffer_get() to retry */
+ r->r.frmr.fr_state = FRMR_IS_STALE;
+ dprintk("RPC: %s: ib_post_send failed, %i\n",
+ __func__, rc);
+ }
+}
+
+static void
+rpcrdma_retry_flushed_linv(struct list_head *stale,
+ struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ struct list_head *pos;
+ struct rpcrdma_mw *r;
+ unsigned long flags;
+
+ list_for_each(pos, stale) {
+ r = list_entry(pos, struct rpcrdma_mw, mw_list);
+ rpcrdma_retry_local_inv(r, ia);
+ }
+
+ spin_lock_irqsave(&buf->rb_lock, flags);
+ list_splice_tail(stale, &buf->rb_mws);
+ spin_unlock_irqrestore(&buf->rb_lock, flags);
+}
+
+static struct rpcrdma_req *
+rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
+ struct list_head *stale)
+{
+ struct rpcrdma_mw *r;
+ int i;
+
+ i = RPCRDMA_MAX_SEGS - 1;
+ while (!list_empty(&buf->rb_mws)) {
+ r = list_entry(buf->rb_mws.next,
+ struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_list);
+ if (r->r.frmr.fr_state == FRMR_IS_STALE) {
+ list_add(&r->mw_list, stale);
+ continue;
+ }
+ req->rl_segments[i].rl_mw = r;
+ if (unlikely(i-- == 0))
+ return req; /* Success */
+ }
+
+ /* Not enough entries on rb_mws for this req */
+ rpcrdma_buffer_put_sendbuf(req, buf);
+ rpcrdma_buffer_put_mrs(req, buf);
+ return NULL;
+}
+
+static struct rpcrdma_req *
+rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_mw *r;
+ int i;
+
+ i = RPCRDMA_MAX_SEGS - 1;
+ while (!list_empty(&buf->rb_mws)) {
+ r = list_entry(buf->rb_mws.next,
+ struct rpcrdma_mw, mw_list);
+ list_del(&r->mw_list);
+ req->rl_segments[i].rl_mw = r;
+ if (unlikely(i-- == 0))
+ return req; /* Success */
+ }
+
+ /* Not enough entries on rb_mws for this req */
+ rpcrdma_buffer_put_sendbuf(req, buf);
+ rpcrdma_buffer_put_mrs(req, buf);
+ return NULL;
+}
+
+/*
+ * Get a set of request/reply buffers.
+ *
+ * Reply buffer (if needed) is attached to send buffer upon return.
+ * Rule:
+ * rb_send_index and rb_recv_index MUST always be pointing to the
+ * *next* available buffer (non-NULL). They are incremented after
+ * removing buffers, and decremented *before* returning them.
+ */
+struct rpcrdma_req *
+rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
+{
+ struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
+ struct list_head stale;
+ struct rpcrdma_req *req;
+ unsigned long flags;
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (buffers->rb_send_index == buffers->rb_max_requests) {
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ dprintk("RPC: %s: out of request buffers\n", __func__);
+ return ((struct rpcrdma_req *)NULL);
+ }
+
+ req = buffers->rb_send_bufs[buffers->rb_send_index];
+ if (buffers->rb_send_index < buffers->rb_recv_index) {
+ dprintk("RPC: %s: %d extra receives outstanding (ok)\n",
+ __func__,
+ buffers->rb_recv_index - buffers->rb_send_index);
+ req->rl_reply = NULL;
+ } else {
+ req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+ buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+ }
+ buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+
+ INIT_LIST_HEAD(&stale);
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_FRMR:
+ req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
+ break;
+ case RPCRDMA_MTHCAFMR:
+ req = rpcrdma_buffer_get_fmrs(req, buffers);
+ break;
+ default:
+ break;
+ }
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ if (!list_empty(&stale))
+ rpcrdma_retry_flushed_linv(&stale, buffers);
+ return req;
+}
+
+/*
+ * Put request/reply buffers back into pool.
+ * Pre-decrement counter/array index.
+ */
+void
+rpcrdma_buffer_put(struct rpcrdma_req *req)
+{
+ struct rpcrdma_buffer *buffers = req->rl_buffer;
+ struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
+ unsigned long flags;
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ rpcrdma_buffer_put_sendbuf(req, buffers);
+ switch (ia->ri_memreg_strategy) {
+ case RPCRDMA_FRMR:
+ case RPCRDMA_MTHCAFMR:
+ rpcrdma_buffer_put_mrs(req, buffers);
+ break;
+ default:
+ break;
+ }
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Recover reply buffers from pool.
+ * This happens when recovering from error conditions.
+ * Post-increment counter/array index.
+ */
+void
+rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
+{
+ struct rpcrdma_buffer *buffers = req->rl_buffer;
+ unsigned long flags;
+
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ if (buffers->rb_recv_index < buffers->rb_max_requests) {
+ req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
+ buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
+ }
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Put reply buffers back into pool when not attached to
+ * request. This happens in error conditions.
+ */
+void
+rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
+{
+ struct rpcrdma_buffer *buffers = rep->rr_buffer;
+ unsigned long flags;
+
+ rep->rr_func = NULL;
+ spin_lock_irqsave(&buffers->rb_lock, flags);
+ buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
+ spin_unlock_irqrestore(&buffers->rb_lock, flags);
+}
+
+/*
+ * Wrappers for internal-use kmalloc memory registration, used by buffer code.
+ */
+
+void
+rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
+{
+ dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
+ seg->mr_offset,
+ (unsigned long long)seg->mr_dma, seg->mr_dmalen);
+}
+
+static int
+rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
+ struct ib_mr **mrp, struct ib_sge *iov)
+{
+ struct ib_phys_buf ipb;
+ struct ib_mr *mr;
+ int rc;
+
+ /*
+ * All memory passed here was kmalloc'ed, therefore phys-contiguous.
+ */
+ iov->addr = ib_dma_map_single(ia->ri_id->device,
+ va, len, DMA_BIDIRECTIONAL);
+ if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
+ return -ENOMEM;
+
+ iov->length = len;
+
+ if (ia->ri_have_dma_lkey) {
+ *mrp = NULL;
+ iov->lkey = ia->ri_dma_lkey;
+ return 0;
+ } else if (ia->ri_bind_mem != NULL) {
+ *mrp = NULL;
+ iov->lkey = ia->ri_bind_mem->lkey;
+ return 0;
+ }
+
+ ipb.addr = iov->addr;
+ ipb.size = iov->length;
+ mr = ib_reg_phys_mr(ia->ri_pd, &ipb, 1,
+ IB_ACCESS_LOCAL_WRITE, &iov->addr);
+
+ dprintk("RPC: %s: phys convert: 0x%llx "
+ "registered 0x%llx length %d\n",
+ __func__, (unsigned long long)ipb.addr,
+ (unsigned long long)iov->addr, len);
+
+ if (IS_ERR(mr)) {
+ *mrp = NULL;
+ rc = PTR_ERR(mr);
+ dprintk("RPC: %s: failed with %i\n", __func__, rc);
+ } else {
+ *mrp = mr;
+ iov->lkey = mr->lkey;
+ rc = 0;
+ }
+
+ return rc;
+}
+
+static int
+rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
+ struct ib_mr *mr, struct ib_sge *iov)
+{
+ int rc;
+
+ ib_dma_unmap_single(ia->ri_id->device,
+ iov->addr, iov->length, DMA_BIDIRECTIONAL);
+
+ if (NULL == mr)
+ return 0;
+
+ rc = ib_dereg_mr(mr);
+ if (rc)
+ dprintk("RPC: %s: ib_dereg_mr failed %i\n", __func__, rc);
+ return rc;
+}
+
+/**
+ * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
+ * @ia: controlling rpcrdma_ia
+ * @size: size of buffer to be allocated, in bytes
+ * @flags: GFP flags
+ *
+ * Returns pointer to private header of an area of internally
+ * registered memory, or an ERR_PTR. The registered buffer follows
+ * the end of the private header.
+ *
+ * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
+ * receiving the payload of RDMA RECV operations. regbufs are not
+ * used for RDMA READ/WRITE operations, thus are registered only for
+ * LOCAL access.
+ */
+struct rpcrdma_regbuf *
+rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+ int rc;
+
+ rc = -ENOMEM;
+ rb = kmalloc(sizeof(*rb) + size, flags);
+ if (rb == NULL)
+ goto out;
+
+ rb->rg_size = size;
+ rb->rg_owner = NULL;
+ rc = rpcrdma_register_internal(ia, rb->rg_base, size,
+ &rb->rg_mr, &rb->rg_iov);
+ if (rc)
+ goto out_free;
+
+ return rb;
+
+out_free:
+ kfree(rb);
+out:
+ return ERR_PTR(rc);
+}
+
+/**
+ * rpcrdma_free_regbuf - deregister and free registered buffer
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be deregistered and freed
+ */
+void
+rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+ if (rb) {
+ rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
+ kfree(rb);
+ }
+}
+
+/*
+ * Prepost any receive buffer, then post send.
+ *
+ * Receive buffer is donated to hardware, reclaimed upon recv completion.
+ */
+int
+rpcrdma_ep_post(struct rpcrdma_ia *ia,
+ struct rpcrdma_ep *ep,
+ struct rpcrdma_req *req)
+{
+ struct ib_send_wr send_wr, *send_wr_fail;
+ struct rpcrdma_rep *rep = req->rl_reply;
+ int rc;
+
+ if (rep) {
+ rc = rpcrdma_ep_post_recv(ia, ep, rep);
+ if (rc)
+ goto out;
+ req->rl_reply = NULL;
+ }
+
+ send_wr.next = NULL;
+ send_wr.wr_id = RPCRDMA_IGNORE_COMPLETION;
+ send_wr.sg_list = req->rl_send_iov;
+ send_wr.num_sge = req->rl_niovs;
+ send_wr.opcode = IB_WR_SEND;
+ if (send_wr.num_sge == 4) /* no need to sync any pad (constant) */
+ ib_dma_sync_single_for_device(ia->ri_id->device,
+ req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_id->device,
+ req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
+ DMA_TO_DEVICE);
+ ib_dma_sync_single_for_device(ia->ri_id->device,
+ req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
+ DMA_TO_DEVICE);
+
+ if (DECR_CQCOUNT(ep) > 0)
+ send_wr.send_flags = 0;
+ else { /* Provider must take a send completion every now and then */
+ INIT_CQCOUNT(ep);
+ send_wr.send_flags = IB_SEND_SIGNALED;
+ }
+
+ rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
+ if (rc)
+ dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
+ rc);
+out:
+ return rc;
+}
+
+/*
+ * (Re)post a receive buffer.
+ */
+int
+rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
+ struct rpcrdma_ep *ep,
+ struct rpcrdma_rep *rep)
+{
+ struct ib_recv_wr recv_wr, *recv_wr_fail;
+ int rc;
+
+ recv_wr.next = NULL;
+ recv_wr.wr_id = (u64) (unsigned long) rep;
+ recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
+ recv_wr.num_sge = 1;
+
+ ib_dma_sync_single_for_cpu(ia->ri_id->device,
+ rdmab_addr(rep->rr_rdmabuf),
+ rdmab_length(rep->rr_rdmabuf),
+ DMA_BIDIRECTIONAL);
+
+ rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
+
+ if (rc)
+ dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
+ rc);
+ return rc;
+}
+
+/* How many chunk list items fit within our inline buffers?
+ */
+unsigned int
+rpcrdma_max_segments(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+ int bytes, segments;
+
+ bytes = min_t(unsigned int, cdata->inline_wsize, cdata->inline_rsize);
+ bytes -= RPCRDMA_HDRLEN_MIN;
+ if (bytes < sizeof(struct rpcrdma_segment) * 2) {
+ pr_warn("RPC: %s: inline threshold too small\n",
+ __func__);
+ return 0;
+ }
+
+ segments = 1 << (fls(bytes / sizeof(struct rpcrdma_segment)) - 1);
+ dprintk("RPC: %s: max chunk list size = %d segments\n",
+ __func__, segments);
+ return segments;
+}
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
new file mode 100644
index 000000000..78e0b8bea
--- /dev/null
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -0,0 +1,496 @@
+/*
+ * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the BSD-type
+ * license below:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided
+ * with the distribution.
+ *
+ * Neither the name of the Network Appliance, Inc. nor the names of
+ * its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written
+ * permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef _LINUX_SUNRPC_XPRT_RDMA_H
+#define _LINUX_SUNRPC_XPRT_RDMA_H
+
+#include <linux/wait.h> /* wait_queue_head_t, etc */
+#include <linux/spinlock.h> /* spinlock_t, etc */
+#include <linux/atomic.h> /* atomic_t, etc */
+#include <linux/workqueue.h> /* struct work_struct */
+
+#include <rdma/rdma_cm.h> /* RDMA connection api */
+#include <rdma/ib_verbs.h> /* RDMA verbs api */
+
+#include <linux/sunrpc/clnt.h> /* rpc_xprt */
+#include <linux/sunrpc/rpc_rdma.h> /* RPC/RDMA protocol */
+#include <linux/sunrpc/xprtrdma.h> /* xprt parameters */
+#include <linux/sunrpc/svc.h> /* RPCSVC_MAXPAYLOAD */
+
+#define RDMA_RESOLVE_TIMEOUT (5000) /* 5 seconds */
+#define RDMA_CONNECT_RETRY_MAX (2) /* retries if no listener backlog */
+
+/*
+ * Interface Adapter -- one per transport instance
+ */
+struct rpcrdma_ia {
+ const struct rpcrdma_memreg_ops *ri_ops;
+ rwlock_t ri_qplock;
+ struct rdma_cm_id *ri_id;
+ struct ib_pd *ri_pd;
+ struct ib_mr *ri_bind_mem;
+ u32 ri_dma_lkey;
+ int ri_have_dma_lkey;
+ struct completion ri_done;
+ int ri_async_rc;
+ enum rpcrdma_memreg ri_memreg_strategy;
+ unsigned int ri_max_frmr_depth;
+ struct ib_device_attr ri_devattr;
+ struct ib_qp_attr ri_qp_attr;
+ struct ib_qp_init_attr ri_qp_init_attr;
+};
+
+/*
+ * RDMA Endpoint -- one per transport instance
+ */
+
+#define RPCRDMA_WC_BUDGET (128)
+#define RPCRDMA_POLLSIZE (16)
+
+struct rpcrdma_ep {
+ atomic_t rep_cqcount;
+ int rep_cqinit;
+ int rep_connected;
+ struct ib_qp_init_attr rep_attr;
+ wait_queue_head_t rep_connect_wait;
+ struct rpcrdma_regbuf *rep_padbuf;
+ struct rdma_conn_param rep_remote_cma;
+ struct sockaddr_storage rep_remote_addr;
+ struct delayed_work rep_connect_worker;
+ struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE];
+ struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE];
+};
+
+/*
+ * Force a signaled SEND Work Request every so often,
+ * in case the provider needs to do some housekeeping.
+ */
+#define RPCRDMA_MAX_UNSIGNALED_SENDS (32)
+
+#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
+#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+
+/* Force completion handler to ignore the signal
+ */
+#define RPCRDMA_IGNORE_COMPLETION (0ULL)
+
+/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
+ *
+ * The below structure appears at the front of a large region of kmalloc'd
+ * memory, which always starts on a good alignment boundary.
+ */
+
+struct rpcrdma_regbuf {
+ size_t rg_size;
+ struct rpcrdma_req *rg_owner;
+ struct ib_mr *rg_mr;
+ struct ib_sge rg_iov;
+ __be32 rg_base[0] __attribute__ ((aligned(256)));
+};
+
+static inline u64
+rdmab_addr(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_iov.addr;
+}
+
+static inline u32
+rdmab_length(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_iov.length;
+}
+
+static inline u32
+rdmab_lkey(struct rpcrdma_regbuf *rb)
+{
+ return rb->rg_iov.lkey;
+}
+
+static inline struct rpcrdma_msg *
+rdmab_to_msg(struct rpcrdma_regbuf *rb)
+{
+ return (struct rpcrdma_msg *)rb->rg_base;
+}
+
+/*
+ * struct rpcrdma_rep -- this structure encapsulates state required to recv
+ * and complete a reply, asychronously. It needs several pieces of
+ * state:
+ * o recv buffer (posted to provider)
+ * o ib_sge (also donated to provider)
+ * o status of reply (length, success or not)
+ * o bookkeeping state to get run by tasklet (list, etc)
+ *
+ * These are allocated during initialization, per-transport instance;
+ * however, the tasklet execution list itself is global, as it should
+ * always be pretty short.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ */
+
+/* temporary static scatter/gather max */
+#define RPCRDMA_MAX_DATA_SEGS (64) /* max scatter/gather */
+#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
+
+struct rpcrdma_buffer;
+
+struct rpcrdma_rep {
+ unsigned int rr_len;
+ struct rpcrdma_buffer *rr_buffer;
+ struct rpc_xprt *rr_xprt;
+ void (*rr_func)(struct rpcrdma_rep *);
+ struct list_head rr_list;
+ struct rpcrdma_regbuf *rr_rdmabuf;
+};
+
+/*
+ * struct rpcrdma_mw - external memory region metadata
+ *
+ * An external memory region is any buffer or page that is registered
+ * on the fly (ie, not pre-registered).
+ *
+ * Each rpcrdma_buffer has a list of free MWs anchored in rb_mws. During
+ * call_allocate, rpcrdma_buffer_get() assigns one to each segment in
+ * an rpcrdma_req. Then rpcrdma_register_external() grabs these to keep
+ * track of registration metadata while each RPC is pending.
+ * rpcrdma_deregister_external() uses this metadata to unmap and
+ * release these resources when an RPC is complete.
+ */
+enum rpcrdma_frmr_state {
+ FRMR_IS_INVALID, /* ready to be used */
+ FRMR_IS_VALID, /* in use */
+ FRMR_IS_STALE, /* failed completion */
+};
+
+struct rpcrdma_frmr {
+ struct ib_fast_reg_page_list *fr_pgl;
+ struct ib_mr *fr_mr;
+ enum rpcrdma_frmr_state fr_state;
+};
+
+struct rpcrdma_mw {
+ union {
+ struct ib_fmr *fmr;
+ struct rpcrdma_frmr frmr;
+ } r;
+ void (*mw_sendcompletion)(struct ib_wc *);
+ struct list_head mw_list;
+ struct list_head mw_all;
+};
+
+/*
+ * struct rpcrdma_req -- structure central to the request/reply sequence.
+ *
+ * N of these are associated with a transport instance, and stored in
+ * struct rpcrdma_buffer. N is the max number of outstanding requests.
+ *
+ * It includes pre-registered buffer memory for send AND recv.
+ * The recv buffer, however, is not owned by this structure, and
+ * is "donated" to the hardware when a recv is posted. When a
+ * reply is handled, the recv buffer used is given back to the
+ * struct rpcrdma_req associated with the request.
+ *
+ * In addition to the basic memory, this structure includes an array
+ * of iovs for send operations. The reason is that the iovs passed to
+ * ib_post_{send,recv} must not be modified until the work request
+ * completes.
+ *
+ * NOTES:
+ * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we
+ * marshal. The number needed varies depending on the iov lists that
+ * are passed to us, the memory registration mode we are in, and if
+ * physical addressing is used, the layout.
+ */
+
+struct rpcrdma_mr_seg { /* chunk descriptors */
+ struct rpcrdma_mw *rl_mw; /* registered MR */
+ u64 mr_base; /* registration result */
+ u32 mr_rkey; /* registration result */
+ u32 mr_len; /* length of chunk or segment */
+ int mr_nsegs; /* number of segments in chunk or 0 */
+ enum dma_data_direction mr_dir; /* segment mapping direction */
+ dma_addr_t mr_dma; /* segment mapping address */
+ size_t mr_dmalen; /* segment mapping length */
+ struct page *mr_page; /* owning page, if any */
+ char *mr_offset; /* kva if no page, else offset */
+};
+
+struct rpcrdma_req {
+ unsigned int rl_niovs; /* 0, 2 or 4 */
+ unsigned int rl_nchunks; /* non-zero if chunks */
+ unsigned int rl_connect_cookie; /* retry detection */
+ struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
+ struct rpcrdma_rep *rl_reply;/* holder for reply buffer */
+ struct ib_sge rl_send_iov[4]; /* for active requests */
+ struct rpcrdma_regbuf *rl_rdmabuf;
+ struct rpcrdma_regbuf *rl_sendbuf;
+ struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
+};
+
+static inline struct rpcrdma_req *
+rpcr_to_rdmar(struct rpc_rqst *rqst)
+{
+ void *buffer = rqst->rq_buffer;
+ struct rpcrdma_regbuf *rb;
+
+ rb = container_of(buffer, struct rpcrdma_regbuf, rg_base);
+ return rb->rg_owner;
+}
+
+/*
+ * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for
+ * inline requests/replies, and client/server credits.
+ *
+ * One of these is associated with a transport instance
+ */
+struct rpcrdma_buffer {
+ spinlock_t rb_lock; /* protects indexes */
+ u32 rb_max_requests;/* client max requests */
+ struct list_head rb_mws; /* optional memory windows/fmrs/frmrs */
+ struct list_head rb_all;
+ int rb_send_index;
+ struct rpcrdma_req **rb_send_bufs;
+ int rb_recv_index;
+ struct rpcrdma_rep **rb_recv_bufs;
+ char *rb_pool;
+};
+#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
+
+/*
+ * Internal structure for transport instance creation. This
+ * exists primarily for modularity.
+ *
+ * This data should be set with mount options
+ */
+struct rpcrdma_create_data_internal {
+ struct sockaddr_storage addr; /* RDMA server address */
+ unsigned int max_requests; /* max requests (slots) in flight */
+ unsigned int rsize; /* mount rsize - max read hdr+data */
+ unsigned int wsize; /* mount wsize - max write hdr+data */
+ unsigned int inline_rsize; /* max non-rdma read data payload */
+ unsigned int inline_wsize; /* max non-rdma write data payload */
+ unsigned int padding; /* non-rdma write header padding */
+};
+
+#define RPCRDMA_INLINE_READ_THRESHOLD(rq) \
+ (rpcx_to_rdmad(rq->rq_xprt).inline_rsize)
+
+#define RPCRDMA_INLINE_WRITE_THRESHOLD(rq)\
+ (rpcx_to_rdmad(rq->rq_xprt).inline_wsize)
+
+#define RPCRDMA_INLINE_PAD_VALUE(rq)\
+ rpcx_to_rdmad(rq->rq_xprt).padding
+
+/*
+ * Statistics for RPCRDMA
+ */
+struct rpcrdma_stats {
+ unsigned long read_chunk_count;
+ unsigned long write_chunk_count;
+ unsigned long reply_chunk_count;
+
+ unsigned long long total_rdma_request;
+ unsigned long long total_rdma_reply;
+
+ unsigned long long pullup_copy_count;
+ unsigned long long fixup_copy_count;
+ unsigned long hardway_register_count;
+ unsigned long failed_marshal_count;
+ unsigned long bad_reply_count;
+};
+
+/*
+ * Per-registration mode operations
+ */
+struct rpcrdma_xprt;
+struct rpcrdma_memreg_ops {
+ int (*ro_map)(struct rpcrdma_xprt *,
+ struct rpcrdma_mr_seg *, int, bool);
+ int (*ro_unmap)(struct rpcrdma_xprt *,
+ struct rpcrdma_mr_seg *);
+ int (*ro_open)(struct rpcrdma_ia *,
+ struct rpcrdma_ep *,
+ struct rpcrdma_create_data_internal *);
+ size_t (*ro_maxpages)(struct rpcrdma_xprt *);
+ int (*ro_init)(struct rpcrdma_xprt *);
+ void (*ro_reset)(struct rpcrdma_xprt *);
+ void (*ro_destroy)(struct rpcrdma_buffer *);
+ const char *ro_displayname;
+};
+
+extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops;
+extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops;
+extern const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops;
+
+/*
+ * RPCRDMA transport -- encapsulates the structures above for
+ * integration with RPC.
+ *
+ * The contained structures are embedded, not pointers,
+ * for convenience. This structure need not be visible externally.
+ *
+ * It is allocated and initialized during mount, and released
+ * during unmount.
+ */
+struct rpcrdma_xprt {
+ struct rpc_xprt rx_xprt;
+ struct rpcrdma_ia rx_ia;
+ struct rpcrdma_ep rx_ep;
+ struct rpcrdma_buffer rx_buf;
+ struct rpcrdma_create_data_internal rx_data;
+ struct delayed_work rx_connect_worker;
+ struct rpcrdma_stats rx_stats;
+};
+
+#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
+#define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
+
+/* Setting this to 0 ensures interoperability with early servers.
+ * Setting this to 1 enhances certain unaligned read/write performance.
+ * Default is 0, see sysctl entry and rpc_rdma.c rpcrdma_convert_iovs() */
+extern int xprt_rdma_pad_optimize;
+
+/*
+ * Interface Adapter calls - xprtrdma/verbs.c
+ */
+int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int);
+void rpcrdma_ia_close(struct rpcrdma_ia *);
+
+/*
+ * Endpoint calls - xprtrdma/verbs.c
+ */
+int rpcrdma_ep_create(struct rpcrdma_ep *, struct rpcrdma_ia *,
+ struct rpcrdma_create_data_internal *);
+void rpcrdma_ep_destroy(struct rpcrdma_ep *, struct rpcrdma_ia *);
+int rpcrdma_ep_connect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+void rpcrdma_ep_disconnect(struct rpcrdma_ep *, struct rpcrdma_ia *);
+
+int rpcrdma_ep_post(struct rpcrdma_ia *, struct rpcrdma_ep *,
+ struct rpcrdma_req *);
+int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
+ struct rpcrdma_rep *);
+
+/*
+ * Buffer calls - xprtrdma/verbs.c
+ */
+int rpcrdma_buffer_create(struct rpcrdma_xprt *);
+void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
+
+struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
+void rpcrdma_buffer_put(struct rpcrdma_req *);
+void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
+void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+
+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
+ size_t, gfp_t);
+void rpcrdma_free_regbuf(struct rpcrdma_ia *,
+ struct rpcrdma_regbuf *);
+
+unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+
+/*
+ * Wrappers for chunk registration, shared by read/write chunk code.
+ */
+
+void rpcrdma_mapping_error(struct rpcrdma_mr_seg *);
+
+static inline enum dma_data_direction
+rpcrdma_data_dir(bool writing)
+{
+ return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE;
+}
+
+static inline void
+rpcrdma_map_one(struct ib_device *device, struct rpcrdma_mr_seg *seg,
+ enum dma_data_direction direction)
+{
+ seg->mr_dir = direction;
+ seg->mr_dmalen = seg->mr_len;
+
+ if (seg->mr_page)
+ seg->mr_dma = ib_dma_map_page(device,
+ seg->mr_page, offset_in_page(seg->mr_offset),
+ seg->mr_dmalen, seg->mr_dir);
+ else
+ seg->mr_dma = ib_dma_map_single(device,
+ seg->mr_offset,
+ seg->mr_dmalen, seg->mr_dir);
+
+ if (ib_dma_mapping_error(device, seg->mr_dma))
+ rpcrdma_mapping_error(seg);
+}
+
+static inline void
+rpcrdma_unmap_one(struct ib_device *device, struct rpcrdma_mr_seg *seg)
+{
+ if (seg->mr_page)
+ ib_dma_unmap_page(device,
+ seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
+ else
+ ib_dma_unmap_single(device,
+ seg->mr_dma, seg->mr_dmalen, seg->mr_dir);
+}
+
+/*
+ * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
+ */
+void rpcrdma_connect_worker(struct work_struct *);
+void rpcrdma_conn_func(struct rpcrdma_ep *);
+void rpcrdma_reply_handler(struct rpcrdma_rep *);
+
+/*
+ * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
+ */
+int rpcrdma_marshal_req(struct rpc_rqst *);
+
+/* Temporary NFS request map cache. Created in svc_rdma.c */
+extern struct kmem_cache *svc_rdma_map_cachep;
+/* WR context cache. Created in svc_rdma.c */
+extern struct kmem_cache *svc_rdma_ctxt_cachep;
+/* Workqueue created in svc_rdma.c */
+extern struct workqueue_struct *svc_rdma_wq;
+
+#if RPCSVC_MAXPAYLOAD < (RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT)
+#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
+#else
+#define RPCSVC_MAXPAYLOAD_RDMA (RPCRDMA_MAX_DATA_SEGS << PAGE_SHIFT)
+#endif
+
+#endif /* _LINUX_SUNRPC_XPRT_RDMA_H */