summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/verbs.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/verbs.c')
-rw-r--r--net/sunrpc/xprtrdma/verbs.c271
1 files changed, 191 insertions, 80 deletions
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index b044d98a1..799cce6cb 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -51,6 +51,7 @@
#include <linux/slab.h>
#include <linux/prefetch.h>
#include <linux/sunrpc/addr.h>
+#include <linux/sunrpc/svc_rdma.h>
#include <asm/bitops.h>
#include <linux/module.h> /* try_module_get()/module_put() */
@@ -379,8 +380,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
struct rpcrdma_ia *ia = &xprt->rx_ia;
int rc;
- ia->ri_dma_mr = NULL;
-
ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
if (IS_ERR(ia->ri_id)) {
rc = PTR_ERR(ia->ri_id);
@@ -391,47 +390,29 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
ia->ri_pd = ib_alloc_pd(ia->ri_device);
if (IS_ERR(ia->ri_pd)) {
rc = PTR_ERR(ia->ri_pd);
- dprintk("RPC: %s: ib_alloc_pd() failed %i\n",
- __func__, rc);
+ pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc);
goto out2;
}
- if (memreg == RPCRDMA_FRMR) {
- if (!(ia->ri_device->attrs.device_cap_flags &
- IB_DEVICE_MEM_MGT_EXTENSIONS) ||
- (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) {
- dprintk("RPC: %s: FRMR registration "
- "not supported by HCA\n", __func__);
- memreg = RPCRDMA_MTHCAFMR;
- }
- }
- if (memreg == RPCRDMA_MTHCAFMR) {
- if (!ia->ri_device->alloc_fmr) {
- dprintk("RPC: %s: MTHCAFMR registration "
- "not supported by HCA\n", __func__);
- rc = -EINVAL;
- goto out3;
- }
- }
-
switch (memreg) {
case RPCRDMA_FRMR:
- ia->ri_ops = &rpcrdma_frwr_memreg_ops;
- break;
- case RPCRDMA_ALLPHYSICAL:
- ia->ri_ops = &rpcrdma_physical_memreg_ops;
- break;
+ if (frwr_is_supported(ia)) {
+ ia->ri_ops = &rpcrdma_frwr_memreg_ops;
+ break;
+ }
+ /*FALLTHROUGH*/
case RPCRDMA_MTHCAFMR:
- ia->ri_ops = &rpcrdma_fmr_memreg_ops;
- break;
+ if (fmr_is_supported(ia)) {
+ ia->ri_ops = &rpcrdma_fmr_memreg_ops;
+ break;
+ }
+ /*FALLTHROUGH*/
default:
- printk(KERN_ERR "RPC: Unsupported memory "
- "registration mode: %d\n", memreg);
- rc = -ENOMEM;
+ pr_err("rpcrdma: Unsupported memory registration mode: %d\n",
+ memreg);
+ rc = -EINVAL;
goto out3;
}
- dprintk("RPC: %s: memory registration strategy is '%s'\n",
- __func__, ia->ri_ops->ro_displayname);
return 0;
@@ -585,8 +566,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
out2:
ib_free_cq(sendcq);
out1:
- if (ia->ri_dma_mr)
- ib_dereg_mr(ia->ri_dma_mr);
return rc;
}
@@ -600,8 +579,6 @@ out1:
void
rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
{
- int rc;
-
dprintk("RPC: %s: entering, connected is %d\n",
__func__, ep->rep_connected);
@@ -615,12 +592,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ib_free_cq(ep->rep_attr.recv_cq);
ib_free_cq(ep->rep_attr.send_cq);
-
- if (ia->ri_dma_mr) {
- rc = ib_dereg_mr(ia->ri_dma_mr);
- dprintk("RPC: %s: ib_dereg_mr returned %i\n",
- __func__, rc);
- }
}
/*
@@ -777,6 +748,90 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
ib_drain_qp(ia->ri_id->qp);
}
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+ struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+ rb_recovery_worker.work);
+ struct rpcrdma_mw *mw;
+
+ spin_lock(&buf->rb_recovery_lock);
+ while (!list_empty(&buf->rb_stale_mrs)) {
+ mw = list_first_entry(&buf->rb_stale_mrs,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
+ mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+ spin_lock(&buf->rb_recovery_lock);
+ }
+ spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ spin_lock(&buf->rb_recovery_lock);
+ list_add(&mw->mw_list, &buf->rb_stale_mrs);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
+static void
+rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt)
+{
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+ struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+ unsigned int count;
+ LIST_HEAD(free);
+ LIST_HEAD(all);
+
+ for (count = 0; count < 32; count++) {
+ struct rpcrdma_mw *mw;
+ int rc;
+
+ mw = kzalloc(sizeof(*mw), GFP_KERNEL);
+ if (!mw)
+ break;
+
+ rc = ia->ri_ops->ro_init_mr(ia, mw);
+ if (rc) {
+ kfree(mw);
+ break;
+ }
+
+ mw->mw_xprt = r_xprt;
+
+ list_add(&mw->mw_list, &free);
+ list_add(&mw->mw_all, &all);
+ }
+
+ spin_lock(&buf->rb_mwlock);
+ list_splice(&free, &buf->rb_mws);
+ list_splice(&all, &buf->rb_all);
+ r_xprt->rx_stats.mrs_allocated += count;
+ spin_unlock(&buf->rb_mwlock);
+
+ dprintk("RPC: %s: created %u MRs\n", __func__, count);
+}
+
+static void
+rpcrdma_mr_refresh_worker(struct work_struct *work)
+{
+ struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+ rb_refresh_worker.work);
+ struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
+ rx_buf);
+
+ rpcrdma_create_mrs(r_xprt);
+}
+
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
@@ -793,6 +848,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
spin_unlock(&buffer->rb_reqslock);
req->rl_cqe.done = rpcrdma_wc_send;
req->rl_buffer = &r_xprt->rx_buf;
+ INIT_LIST_HEAD(&req->rl_registered);
return req;
}
@@ -832,17 +888,23 @@ int
rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
{
struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
- struct rpcrdma_ia *ia = &r_xprt->rx_ia;
int i, rc;
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
- spin_lock_init(&buf->rb_lock);
atomic_set(&buf->rb_credits, 1);
+ spin_lock_init(&buf->rb_mwlock);
+ spin_lock_init(&buf->rb_lock);
+ spin_lock_init(&buf->rb_recovery_lock);
+ INIT_LIST_HEAD(&buf->rb_mws);
+ INIT_LIST_HEAD(&buf->rb_all);
+ INIT_LIST_HEAD(&buf->rb_stale_mrs);
+ INIT_DELAYED_WORK(&buf->rb_refresh_worker,
+ rpcrdma_mr_refresh_worker);
+ INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+ rpcrdma_mr_recovery_worker);
- rc = ia->ri_ops->ro_init(r_xprt);
- if (rc)
- goto out;
+ rpcrdma_create_mrs(r_xprt);
INIT_LIST_HEAD(&buf->rb_send_bufs);
INIT_LIST_HEAD(&buf->rb_allreqs);
@@ -862,7 +924,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
}
INIT_LIST_HEAD(&buf->rb_recv_bufs);
- for (i = 0; i < buf->rb_max_requests + 2; i++) {
+ for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) {
struct rpcrdma_rep *rep;
rep = rpcrdma_create_rep(r_xprt);
@@ -918,17 +980,46 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
kfree(req);
}
+static void
+rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf)
+{
+ struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
+ rx_buf);
+ struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ struct rpcrdma_mw *mw;
+ unsigned int count;
+
+ count = 0;
+ spin_lock(&buf->rb_mwlock);
+ while (!list_empty(&buf->rb_all)) {
+ mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
+ list_del(&mw->mw_all);
+
+ spin_unlock(&buf->rb_mwlock);
+ ia->ri_ops->ro_release_mr(mw);
+ count++;
+ spin_lock(&buf->rb_mwlock);
+ }
+ spin_unlock(&buf->rb_mwlock);
+ r_xprt->rx_stats.mrs_allocated = 0;
+
+ dprintk("RPC: %s: released %u MRs\n", __func__, count);
+}
+
void
rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
rep = rpcrdma_buffer_get_rep_locked(buf);
rpcrdma_destroy_rep(ia, rep);
}
+ buf->rb_send_count = 0;
spin_lock(&buf->rb_reqslock);
while (!list_empty(&buf->rb_allreqs)) {
@@ -943,8 +1034,9 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
spin_lock(&buf->rb_reqslock);
}
spin_unlock(&buf->rb_reqslock);
+ buf->rb_recv_count = 0;
- ia->ri_ops->ro_destroy(buf);
+ rpcrdma_destroy_mrs(buf);
}
struct rpcrdma_mw *
@@ -962,8 +1054,17 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
spin_unlock(&buf->rb_mwlock);
if (!mw)
- pr_err("RPC: %s: no MWs available\n", __func__);
+ goto out_nomws;
return mw;
+
+out_nomws:
+ dprintk("RPC: %s: no MWs available\n", __func__);
+ schedule_delayed_work(&buf->rb_refresh_worker, 0);
+
+ /* Allow the reply handler and refresh worker to run */
+ cond_resched();
+
+ return NULL;
}
void
@@ -976,6 +1077,23 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
spin_unlock(&buf->rb_mwlock);
}
+static struct rpcrdma_rep *
+rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers)
+{
+ /* If an RPC previously completed without a reply (say, a
+ * credential problem or a soft timeout occurs) then hold off
+ * on supplying more Receive buffers until the number of new
+ * pending RPCs catches up to the number of posted Receives.
+ */
+ if (unlikely(buffers->rb_send_count < buffers->rb_recv_count))
+ return NULL;
+
+ if (unlikely(list_empty(&buffers->rb_recv_bufs)))
+ return NULL;
+ buffers->rb_recv_count++;
+ return rpcrdma_buffer_get_rep_locked(buffers);
+}
+
/*
* Get a set of request/reply buffers.
*
@@ -989,10 +1107,9 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
spin_lock(&buffers->rb_lock);
if (list_empty(&buffers->rb_send_bufs))
goto out_reqbuf;
+ buffers->rb_send_count++;
req = rpcrdma_buffer_get_req_locked(buffers);
- if (list_empty(&buffers->rb_recv_bufs))
- goto out_repbuf;
- req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+ req->rl_reply = rpcrdma_buffer_get_rep(buffers);
spin_unlock(&buffers->rb_lock);
return req;
@@ -1000,11 +1117,6 @@ out_reqbuf:
spin_unlock(&buffers->rb_lock);
pr_warn("RPC: %s: out of request buffers\n", __func__);
return NULL;
-out_repbuf:
- spin_unlock(&buffers->rb_lock);
- pr_warn("RPC: %s: out of reply buffers\n", __func__);
- req->rl_reply = NULL;
- return req;
}
/*
@@ -1021,9 +1133,12 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
req->rl_reply = NULL;
spin_lock(&buffers->rb_lock);
+ buffers->rb_send_count--;
list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
- if (rep)
+ if (rep) {
+ buffers->rb_recv_count--;
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+ }
spin_unlock(&buffers->rb_lock);
}
@@ -1037,8 +1152,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
struct rpcrdma_buffer *buffers = req->rl_buffer;
spin_lock(&buffers->rb_lock);
- if (!list_empty(&buffers->rb_recv_bufs))
- req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+ req->rl_reply = rpcrdma_buffer_get_rep(buffers);
spin_unlock(&buffers->rb_lock);
}
@@ -1052,6 +1166,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
spin_lock(&buffers->rb_lock);
+ buffers->rb_recv_count--;
list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
spin_unlock(&buffers->rb_lock);
}
@@ -1060,14 +1175,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
* Wrappers for internal-use kmalloc memory registration, used by buffer code.
*/
-void
-rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg)
-{
- dprintk("RPC: map_one: offset %p iova %llx len %zu\n",
- seg->mr_offset,
- (unsigned long long)seg->mr_dma, seg->mr_dmalen);
-}
-
/**
* rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
* @ia: controlling rpcrdma_ia
@@ -1150,7 +1257,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
if (rep) {
rc = rpcrdma_ep_post_recv(ia, ep, rep);
if (rc)
- goto out;
+ return rc;
req->rl_reply = NULL;
}
@@ -1175,10 +1282,12 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail);
if (rc)
- dprintk("RPC: %s: ib_post_send returned %i\n", __func__,
- rc);
-out:
- return rc;
+ goto out_postsend_err;
+ return 0;
+
+out_postsend_err:
+ pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc);
+ return -ENOTCONN;
}
/*
@@ -1203,11 +1312,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
DMA_BIDIRECTIONAL);
rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
-
if (rc)
- dprintk("RPC: %s: ib_post_recv returned %i\n", __func__,
- rc);
- return rc;
+ goto out_postrecv;
+ return 0;
+
+out_postrecv:
+ pr_err("rpcrdma: ib_post_recv returned %i\n", rc);
+ return -ENOTCONN;
}
/**