diff options
Diffstat (limited to 'net/sunrpc')
-rw-r--r-- | net/sunrpc/auth.c | 8 | ||||
-rw-r--r-- | net/sunrpc/auth_generic.c | 9 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/auth_gss.c | 3 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/gss_krb5_mech.c | 2 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/gss_mech_switch.c | 12 | ||||
-rw-r--r-- | net/sunrpc/auth_gss/svcauth_gss.c | 5 | ||||
-rw-r--r-- | net/sunrpc/auth_null.c | 1 | ||||
-rw-r--r-- | net/sunrpc/auth_unix.c | 1 | ||||
-rw-r--r-- | net/sunrpc/cache.c | 2 | ||||
-rw-r--r-- | net/sunrpc/clnt.c | 30 | ||||
-rw-r--r-- | net/sunrpc/rpc_pipe.c | 8 | ||||
-rw-r--r-- | net/sunrpc/sched.c | 67 | ||||
-rw-r--r-- | net/sunrpc/svc.c | 8 | ||||
-rw-r--r-- | net/sunrpc/svc_xprt.c | 51 | ||||
-rw-r--r-- | net/sunrpc/svcsock.c | 150 | ||||
-rw-r--r-- | net/sunrpc/xprt.c | 40 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/Makefile | 2 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/fmr_ops.c | 378 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/frwr_ops.c | 318 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/rpc_rdma.c | 274 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/transport.c | 40 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/verbs.c | 271 | ||||
-rw-r--r-- | net/sunrpc/xprtrdma/xprt_rdma.h | 111 | ||||
-rw-r--r-- | net/sunrpc/xprtsock.c | 177 |
24 files changed, 1032 insertions, 936 deletions
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c index 040ff627c..a7e42f9a4 100644 --- a/net/sunrpc/auth.c +++ b/net/sunrpc/auth.c @@ -51,9 +51,7 @@ static int param_set_hashtbl_sz(const char *val, const struct kernel_param *kp) ret = kstrtoul(val, 0, &num); if (ret == -EINVAL) goto out_inval; - nbits = fls(num); - if (num > (1U << nbits)) - nbits++; + nbits = fls(num - 1); if (nbits > MAX_HASHTABLE_BITS || nbits < 2) goto out_inval; *(unsigned int *)kp->arg = nbits; @@ -359,8 +357,10 @@ rpcauth_key_timeout_notify(struct rpc_auth *auth, struct rpc_cred *cred) EXPORT_SYMBOL_GPL(rpcauth_key_timeout_notify); bool -rpcauth_cred_key_to_expire(struct rpc_cred *cred) +rpcauth_cred_key_to_expire(struct rpc_auth *auth, struct rpc_cred *cred) { + if (auth->au_flags & RPCAUTH_AUTH_NO_CRKEY_TIMEOUT) + return false; if (!cred->cr_ops->crkey_to_expire) return false; return cred->cr_ops->crkey_to_expire(cred); diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c index 54dd3fdea..168219535 100644 --- a/net/sunrpc/auth_generic.c +++ b/net/sunrpc/auth_generic.c @@ -224,7 +224,7 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) /* Fast track for non crkey_timeout (no key) underlying credentials */ - if (test_bit(RPC_CRED_NO_CRKEY_TIMEOUT, &acred->ac_flags)) + if (auth->au_flags & RPCAUTH_AUTH_NO_CRKEY_TIMEOUT) return 0; /* Fast track for the normal case */ @@ -236,12 +236,6 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) if (IS_ERR(tcred)) return -EACCES; - if (!tcred->cr_ops->crkey_timeout) { - set_bit(RPC_CRED_NO_CRKEY_TIMEOUT, &acred->ac_flags); - ret = 0; - goto out_put; - } - /* Test for the almost error case */ ret = tcred->cr_ops->crkey_timeout(tcred); if (ret != 0) { @@ -257,7 +251,6 @@ generic_key_timeout(struct rpc_auth *auth, struct rpc_cred *cred) set_bit(RPC_CRED_NOTIFY_TIMEOUT, &acred->ac_flags); } -out_put: put_rpccred(tcred); return ret; } diff --git a/net/sunrpc/auth_gss/auth_gss.c b/net/sunrpc/auth_gss/auth_gss.c index bf4b0e98f..976c7812b 100644 --- a/net/sunrpc/auth_gss/auth_gss.c +++ b/net/sunrpc/auth_gss/auth_gss.c @@ -1017,8 +1017,11 @@ gss_create_new(struct rpc_auth_create_args *args, struct rpc_clnt *clnt) auth = &gss_auth->rpc_auth; auth->au_cslack = GSS_CRED_SLACK >> 2; auth->au_rslack = GSS_VERF_SLACK >> 2; + auth->au_flags = 0; auth->au_ops = &authgss_ops; auth->au_flavor = flavor; + if (gss_pseudoflavor_to_datatouch(gss_auth->mech, flavor)) + auth->au_flags |= RPCAUTH_AUTH_DATATOUCH; atomic_set(&auth->au_count, 1); kref_init(&gss_auth->kref); diff --git a/net/sunrpc/auth_gss/gss_krb5_mech.c b/net/sunrpc/auth_gss/gss_krb5_mech.c index 65427492b..605958353 100644 --- a/net/sunrpc/auth_gss/gss_krb5_mech.c +++ b/net/sunrpc/auth_gss/gss_krb5_mech.c @@ -745,12 +745,14 @@ static struct pf_desc gss_kerberos_pfs[] = { .qop = GSS_C_QOP_DEFAULT, .service = RPC_GSS_SVC_INTEGRITY, .name = "krb5i", + .datatouch = true, }, [2] = { .pseudoflavor = RPC_AUTH_GSS_KRB5P, .qop = GSS_C_QOP_DEFAULT, .service = RPC_GSS_SVC_PRIVACY, .name = "krb5p", + .datatouch = true, }, }; diff --git a/net/sunrpc/auth_gss/gss_mech_switch.c b/net/sunrpc/auth_gss/gss_mech_switch.c index 7063d856a..5fec3abbe 100644 --- a/net/sunrpc/auth_gss/gss_mech_switch.c +++ b/net/sunrpc/auth_gss/gss_mech_switch.c @@ -361,6 +361,18 @@ gss_pseudoflavor_to_service(struct gss_api_mech *gm, u32 pseudoflavor) } EXPORT_SYMBOL(gss_pseudoflavor_to_service); +bool +gss_pseudoflavor_to_datatouch(struct gss_api_mech *gm, u32 pseudoflavor) +{ + int i; + + for (i = 0; i < gm->gm_pf_num; i++) { + if (gm->gm_pfs[i].pseudoflavor == pseudoflavor) + return gm->gm_pfs[i].datatouch; + } + return false; +} + char * gss_service_to_auth_domain_name(struct gss_api_mech *gm, u32 service) { diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c index 4605dc73d..d8582028b 100644 --- a/net/sunrpc/auth_gss/svcauth_gss.c +++ b/net/sunrpc/auth_gss/svcauth_gss.c @@ -1231,8 +1231,9 @@ static int svcauth_gss_proxy_init(struct svc_rqst *rqstp, if (status) goto out; - dprintk("RPC: svcauth_gss: gss major status = %d\n", - ud.major_status); + dprintk("RPC: svcauth_gss: gss major status = %d " + "minor status = %d\n", + ud.major_status, ud.minor_status); switch (ud.major_status) { case GSS_S_CONTINUE_NEEDED: diff --git a/net/sunrpc/auth_null.c b/net/sunrpc/auth_null.c index 8d9eb4d5d..4d17376b2 100644 --- a/net/sunrpc/auth_null.c +++ b/net/sunrpc/auth_null.c @@ -115,6 +115,7 @@ static struct rpc_auth null_auth = { .au_cslack = NUL_CALLSLACK, .au_rslack = NUL_REPLYSLACK, + .au_flags = RPCAUTH_AUTH_NO_CRKEY_TIMEOUT, .au_ops = &authnull_ops, .au_flavor = RPC_AUTH_NULL, .au_count = ATOMIC_INIT(0), diff --git a/net/sunrpc/auth_unix.c b/net/sunrpc/auth_unix.c index 9f65452b7..a99278c98 100644 --- a/net/sunrpc/auth_unix.c +++ b/net/sunrpc/auth_unix.c @@ -228,6 +228,7 @@ static struct rpc_auth unix_auth = { .au_cslack = UNX_CALLSLACK, .au_rslack = NUL_REPLYSLACK, + .au_flags = RPCAUTH_AUTH_NO_CRKEY_TIMEOUT, .au_ops = &authunix_ops, .au_flavor = RPC_AUTH_UNIX, .au_count = ATOMIC_INIT(0), diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c index 553bf95f7..4d8e11f94 100644 --- a/net/sunrpc/cache.c +++ b/net/sunrpc/cache.c @@ -362,7 +362,7 @@ void sunrpc_destroy_cache_detail(struct cache_detail *cd) cache_purge(cd); spin_lock(&cache_list_lock); write_lock(&cd->hash_lock); - if (cd->entries || atomic_read(&cd->inuse)) { + if (cd->entries) { write_unlock(&cd->hash_lock); spin_unlock(&cache_list_lock); goto out; diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c index 2808d550d..66f23b376 100644 --- a/net/sunrpc/clnt.c +++ b/net/sunrpc/clnt.c @@ -453,7 +453,7 @@ static struct rpc_clnt *rpc_create_xprt(struct rpc_create_args *args, struct rpc_xprt_switch *xps; if (args->bc_xprt && args->bc_xprt->xpt_bc_xps) { - WARN_ON(args->protocol != XPRT_TRANSPORT_BC_TCP); + WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC)); xps = args->bc_xprt->xpt_bc_xps; xprt_switch_get(xps); } else { @@ -520,7 +520,7 @@ struct rpc_clnt *rpc_create(struct rpc_create_args *args) char servername[48]; if (args->bc_xprt) { - WARN_ON(args->protocol != XPRT_TRANSPORT_BC_TCP); + WARN_ON_ONCE(!(args->protocol & XPRT_TRANSPORT_BC)); xprt = args->bc_xprt->xpt_bc_xprt; if (xprt) { xprt_get(xprt); @@ -2577,7 +2577,7 @@ static void rpc_cb_add_xprt_release(void *calldata) kfree(data); } -const static struct rpc_call_ops rpc_cb_add_xprt_call_ops = { +static const struct rpc_call_ops rpc_cb_add_xprt_call_ops = { .rpc_call_done = rpc_cb_add_xprt_done, .rpc_release = rpc_cb_add_xprt_release, }; @@ -2638,6 +2638,7 @@ int rpc_clnt_add_xprt(struct rpc_clnt *clnt, { struct rpc_xprt_switch *xps; struct rpc_xprt *xprt; + unsigned long reconnect_timeout; unsigned char resvport; int ret = 0; @@ -2649,6 +2650,7 @@ int rpc_clnt_add_xprt(struct rpc_clnt *clnt, return -EAGAIN; } resvport = xprt->resvport; + reconnect_timeout = xprt->max_reconnect_timeout; rcu_read_unlock(); xprt = xprt_create_transport(xprtargs); @@ -2657,6 +2659,7 @@ int rpc_clnt_add_xprt(struct rpc_clnt *clnt, goto out_put_switch; } xprt->resvport = resvport; + xprt->max_reconnect_timeout = reconnect_timeout; rpc_xprt_switch_set_roundrobin(xps); if (setup) { @@ -2673,6 +2676,27 @@ out_put_switch: } EXPORT_SYMBOL_GPL(rpc_clnt_add_xprt); +static int +rpc_xprt_cap_max_reconnect_timeout(struct rpc_clnt *clnt, + struct rpc_xprt *xprt, + void *data) +{ + unsigned long timeout = *((unsigned long *)data); + + if (timeout < xprt->max_reconnect_timeout) + xprt->max_reconnect_timeout = timeout; + return 0; +} + +void +rpc_cap_max_reconnect_timeout(struct rpc_clnt *clnt, unsigned long timeo) +{ + rpc_clnt_iterate_for_each_xprt(clnt, + rpc_xprt_cap_max_reconnect_timeout, + &timeo); +} +EXPORT_SYMBOL_GPL(rpc_cap_max_reconnect_timeout); + #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) static void rpc_show_header(void) { diff --git a/net/sunrpc/rpc_pipe.c b/net/sunrpc/rpc_pipe.c index fc48eca21..84f98cbe3 100644 --- a/net/sunrpc/rpc_pipe.c +++ b/net/sunrpc/rpc_pipe.c @@ -1386,7 +1386,7 @@ rpc_fill_super(struct super_block *sb, void *data, int silent) { struct inode *inode; struct dentry *root, *gssd_dentry; - struct net *net = data; + struct net *net = get_net(sb->s_fs_info); struct sunrpc_net *sn = net_generic(net, sunrpc_net_id); int err; @@ -1419,7 +1419,6 @@ rpc_fill_super(struct super_block *sb, void *data, int silent) sb); if (err) goto err_depopulate; - sb->s_fs_info = get_net(net); mutex_unlock(&sn->pipefs_sb_lock); return 0; @@ -1448,7 +1447,8 @@ static struct dentry * rpc_mount(struct file_system_type *fs_type, int flags, const char *dev_name, void *data) { - return mount_ns(fs_type, flags, current->nsproxy->net_ns, rpc_fill_super); + struct net *net = current->nsproxy->net_ns; + return mount_ns(fs_type, flags, data, net, net->user_ns, rpc_fill_super); } static void rpc_kill_sb(struct super_block *sb) @@ -1468,9 +1468,9 @@ static void rpc_kill_sb(struct super_block *sb) RPC_PIPEFS_UMOUNT, sb); mutex_unlock(&sn->pipefs_sb_lock); - put_net(net); out: kill_litter_super(sb); + put_net(net); } static struct file_system_type rpc_pipe_fs_type = { diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c index fcfd48d26..9ae588511 100644 --- a/net/sunrpc/sched.c +++ b/net/sunrpc/sched.c @@ -54,7 +54,8 @@ static struct rpc_wait_queue delay_queue; /* * rpciod-related stuff */ -struct workqueue_struct *rpciod_workqueue; +struct workqueue_struct *rpciod_workqueue __read_mostly; +struct workqueue_struct *xprtiod_workqueue __read_mostly; /* * Disable the timer for a given RPC task. Should be called with @@ -329,7 +330,8 @@ EXPORT_SYMBOL_GPL(__rpc_wait_for_completion_task); * lockless RPC_IS_QUEUED() test) before we've had a chance to test * the RPC_TASK_RUNNING flag. */ -static void rpc_make_runnable(struct rpc_task *task) +static void rpc_make_runnable(struct workqueue_struct *wq, + struct rpc_task *task) { bool need_wakeup = !rpc_test_and_set_running(task); @@ -338,7 +340,7 @@ static void rpc_make_runnable(struct rpc_task *task) return; if (RPC_IS_ASYNC(task)) { INIT_WORK(&task->u.tk_work, rpc_async_schedule); - queue_work(rpciod_workqueue, &task->u.tk_work); + queue_work(wq, &task->u.tk_work); } else wake_up_bit(&task->tk_runstate, RPC_TASK_QUEUED); } @@ -407,13 +409,16 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); /** - * __rpc_do_wake_up_task - wake up a single rpc_task + * __rpc_do_wake_up_task_on_wq - wake up a single rpc_task + * @wq: workqueue on which to run task * @queue: wait queue * @task: task to be woken up * * Caller must hold queue->lock, and have cleared the task queued flag. */ -static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task *task) +static void __rpc_do_wake_up_task_on_wq(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, + struct rpc_task *task) { dprintk("RPC: %5u __rpc_wake_up_task (now %lu)\n", task->tk_pid, jiffies); @@ -428,7 +433,7 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task __rpc_remove_wait_queue(queue, task); - rpc_make_runnable(task); + rpc_make_runnable(wq, task); dprintk("RPC: __rpc_wake_up_task done\n"); } @@ -436,16 +441,25 @@ static void __rpc_do_wake_up_task(struct rpc_wait_queue *queue, struct rpc_task /* * Wake up a queued task while the queue lock is being held */ -static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) +static void rpc_wake_up_task_on_wq_queue_locked(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, struct rpc_task *task) { if (RPC_IS_QUEUED(task)) { smp_rmb(); if (task->tk_waitqueue == queue) - __rpc_do_wake_up_task(queue, task); + __rpc_do_wake_up_task_on_wq(wq, queue, task); } } /* + * Wake up a queued task while the queue lock is being held + */ +static void rpc_wake_up_task_queue_locked(struct rpc_wait_queue *queue, struct rpc_task *task) +{ + rpc_wake_up_task_on_wq_queue_locked(rpciod_workqueue, queue, task); +} + +/* * Wake up a task on a specific queue */ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task) @@ -518,7 +532,8 @@ static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue) /* * Wake up the first task on the wait queue. */ -struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, +struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq, + struct rpc_wait_queue *queue, bool (*func)(struct rpc_task *, void *), void *data) { struct rpc_task *task = NULL; @@ -529,7 +544,7 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, task = __rpc_find_next_queued(queue); if (task != NULL) { if (func(task, data)) - rpc_wake_up_task_queue_locked(queue, task); + rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); else task = NULL; } @@ -537,6 +552,15 @@ struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, return task; } + +/* + * Wake up the first task on the wait queue. + */ +struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue, + bool (*func)(struct rpc_task *, void *), void *data) +{ + return rpc_wake_up_first_on_wq(rpciod_workqueue, queue, func, data); +} EXPORT_SYMBOL_GPL(rpc_wake_up_first); static bool rpc_wake_up_next_func(struct rpc_task *task, void *data) @@ -814,7 +838,7 @@ void rpc_execute(struct rpc_task *task) bool is_async = RPC_IS_ASYNC(task); rpc_set_active(task); - rpc_make_runnable(task); + rpc_make_runnable(rpciod_workqueue, task); if (!is_async) __rpc_execute(task); } @@ -1071,10 +1095,22 @@ static int rpciod_start(void) * Create the rpciod thread and wait for it to start. */ dprintk("RPC: creating workqueue rpciod\n"); - /* Note: highpri because network receive is latency sensitive */ - wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 0); + if (!wq) + goto out_failed; rpciod_workqueue = wq; - return rpciod_workqueue != NULL; + /* Note: highpri because network receive is latency sensitive */ + wq = alloc_workqueue("xprtiod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + if (!wq) + goto free_rpciod; + xprtiod_workqueue = wq; + return 1; +free_rpciod: + wq = rpciod_workqueue; + rpciod_workqueue = NULL; + destroy_workqueue(wq); +out_failed: + return 0; } static void rpciod_stop(void) @@ -1088,6 +1124,9 @@ static void rpciod_stop(void) wq = rpciod_workqueue; rpciod_workqueue = NULL; destroy_workqueue(wq); + wq = xprtiod_workqueue; + xprtiod_workqueue = NULL; + destroy_workqueue(wq); } void diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c index cc9852897..c5b0cb4f4 100644 --- a/net/sunrpc/svc.c +++ b/net/sunrpc/svc.c @@ -1188,11 +1188,17 @@ svc_process_common(struct svc_rqst *rqstp, struct kvec *argv, struct kvec *resv) *statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp); /* Encode reply */ - if (test_bit(RQ_DROPME, &rqstp->rq_flags)) { + if (*statp == rpc_drop_reply || + test_bit(RQ_DROPME, &rqstp->rq_flags)) { if (procp->pc_release) procp->pc_release(rqstp, NULL, rqstp->rq_resp); goto dropit; } + if (*statp == rpc_autherr_badcred) { + if (procp->pc_release) + procp->pc_release(rqstp, NULL, rqstp->rq_resp); + goto err_bad_auth; + } if (*statp == rpc_success && (xdr = procp->pc_encode) && !xdr(rqstp, resv->iov_base+resv->iov_len, rqstp->rq_resp)) { diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c index 4f01f6310..c3f652395 100644 --- a/net/sunrpc/svc_xprt.c +++ b/net/sunrpc/svc_xprt.c @@ -21,6 +21,10 @@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT +static unsigned int svc_rpc_per_connection_limit __read_mostly; +module_param(svc_rpc_per_connection_limit, uint, 0644); + + static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt); static int svc_deferred_recv(struct svc_rqst *rqstp); static struct cache_deferred_req *svc_defer(struct cache_req *req); @@ -329,12 +333,45 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len) } EXPORT_SYMBOL_GPL(svc_print_addr); +static bool svc_xprt_slots_in_range(struct svc_xprt *xprt) +{ + unsigned int limit = svc_rpc_per_connection_limit; + int nrqsts = atomic_read(&xprt->xpt_nr_rqsts); + + return limit == 0 || (nrqsts >= 0 && nrqsts < limit); +} + +static bool svc_xprt_reserve_slot(struct svc_rqst *rqstp, struct svc_xprt *xprt) +{ + if (!test_bit(RQ_DATA, &rqstp->rq_flags)) { + if (!svc_xprt_slots_in_range(xprt)) + return false; + atomic_inc(&xprt->xpt_nr_rqsts); + set_bit(RQ_DATA, &rqstp->rq_flags); + } + return true; +} + +static void svc_xprt_release_slot(struct svc_rqst *rqstp) +{ + struct svc_xprt *xprt = rqstp->rq_xprt; + if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) { + atomic_dec(&xprt->xpt_nr_rqsts); + svc_xprt_enqueue(xprt); + } +} + static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt) { if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE))) return true; - if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) - return xprt->xpt_ops->xpo_has_wspace(xprt); + if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) { + if (xprt->xpt_ops->xpo_has_wspace(xprt) && + svc_xprt_slots_in_range(xprt)) + return true; + trace_svc_xprt_no_write_space(xprt); + return false; + } return false; } @@ -480,8 +517,6 @@ void svc_reserve(struct svc_rqst *rqstp, int space) atomic_sub((rqstp->rq_reserved - space), &xprt->xpt_reserved); rqstp->rq_reserved = space; - if (xprt->xpt_ops->xpo_adjust_wspace) - xprt->xpt_ops->xpo_adjust_wspace(xprt); svc_xprt_enqueue(xprt); } } @@ -512,8 +547,8 @@ static void svc_xprt_release(struct svc_rqst *rqstp) rqstp->rq_res.head[0].iov_len = 0; svc_reserve(rqstp, 0); + svc_xprt_release_slot(rqstp); rqstp->rq_xprt = NULL; - svc_xprt_put(xprt); } @@ -781,7 +816,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt) svc_add_new_temp_xprt(serv, newxpt); else module_put(xprt->xpt_class->xcl_owner); - } else { + } else if (svc_xprt_reserve_slot(rqstp, xprt)) { /* XPT_DATA|XPT_DEFERRED case: */ dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n", rqstp, rqstp->rq_pool->sp_id, xprt, @@ -871,6 +906,7 @@ EXPORT_SYMBOL_GPL(svc_recv); */ void svc_drop(struct svc_rqst *rqstp) { + trace_svc_drop(rqstp); dprintk("svc: xprt %p dropped request\n", rqstp->rq_xprt); svc_xprt_release(rqstp); } @@ -1148,6 +1184,7 @@ static void svc_revisit(struct cache_deferred_req *dreq, int too_many) spin_unlock(&xprt->xpt_lock); dprintk("revisit canceled\n"); svc_xprt_put(xprt); + trace_svc_drop_deferred(dr); kfree(dr); return; } @@ -1205,6 +1242,7 @@ static struct cache_deferred_req *svc_defer(struct cache_req *req) set_bit(RQ_DROPME, &rqstp->rq_flags); dr->handle.revisit = svc_revisit; + trace_svc_defer(rqstp); return &dr->handle; } @@ -1245,6 +1283,7 @@ static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt) struct svc_deferred_req, handle.recent); list_del_init(&dr->handle.recent); + trace_svc_revisit_deferred(dr); } else clear_bit(XPT_DEFERRED, &xprt->xpt_flags); spin_unlock(&xprt->xpt_lock); diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c index dadfec66d..57625f64e 100644 --- a/net/sunrpc/svcsock.c +++ b/net/sunrpc/svcsock.c @@ -60,7 +60,6 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *, struct socket *, int flags); -static void svc_udp_data_ready(struct sock *); static int svc_udp_recvfrom(struct svc_rqst *); static int svc_udp_sendto(struct svc_rqst *); static void svc_sock_detach(struct svc_xprt *); @@ -398,48 +397,21 @@ static int svc_sock_secure_port(struct svc_rqst *rqstp) return svc_port_is_privileged(svc_addr(rqstp)); } -static bool sunrpc_waitqueue_active(wait_queue_head_t *wq) -{ - if (!wq) - return false; - /* - * There should normally be a memory * barrier here--see - * wq_has_sleeper(). - * - * It appears that isn't currently necessary, though, basically - * because callers all appear to have sufficient memory barriers - * between the time the relevant change is made and the - * time they call these callbacks. - * - * The nfsd code itself doesn't actually explicitly wait on - * these waitqueues, but it may wait on them for example in - * sendpage() or sendmsg() calls. (And those may be the only - * places, since it it uses nonblocking reads.) - * - * Maybe we should add the memory barriers anyway, but these are - * hot paths so we'd need to be convinced there's no sigificant - * penalty. - */ - return waitqueue_active(wq); -} - /* * INET callback when data has been received on the socket. */ -static void svc_udp_data_ready(struct sock *sk) +static void svc_data_ready(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; - wait_queue_head_t *wq = sk_sleep(sk); if (svsk) { dprintk("svc: socket %p(inet %p), busy=%d\n", svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags)); - set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); - svc_xprt_enqueue(&svsk->sk_xprt); + svsk->sk_odata(sk); + if (!test_and_set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags)) + svc_xprt_enqueue(&svsk->sk_xprt); } - if (sunrpc_waitqueue_active(wq)) - wake_up_interruptible(wq); } /* @@ -448,56 +420,22 @@ static void svc_udp_data_ready(struct sock *sk) static void svc_write_space(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data); - wait_queue_head_t *wq = sk_sleep(sk); if (svsk) { dprintk("svc: socket %p(inet %p), write_space busy=%d\n", svsk, sk, test_bit(XPT_BUSY, &svsk->sk_xprt.xpt_flags)); + svsk->sk_owspace(sk); svc_xprt_enqueue(&svsk->sk_xprt); } - - if (sunrpc_waitqueue_active(wq)) { - dprintk("RPC svc_write_space: someone sleeping on %p\n", - svsk); - wake_up_interruptible(wq); - } } static int svc_tcp_has_wspace(struct svc_xprt *xprt) { - struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); - struct svc_serv *serv = svsk->sk_xprt.xpt_server; - int required; + struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) return 1; - required = atomic_read(&xprt->xpt_reserved) + serv->sv_max_mesg; - if (sk_stream_wspace(svsk->sk_sk) >= required || - (sk_stream_min_wspace(svsk->sk_sk) == 0 && - atomic_read(&xprt->xpt_reserved) == 0)) - return 1; - set_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); - return 0; -} - -static void svc_tcp_write_space(struct sock *sk) -{ - struct svc_sock *svsk = (struct svc_sock *)(sk->sk_user_data); - struct socket *sock = sk->sk_socket; - - if (!sk_stream_is_writeable(sk) || !sock) - return; - if (!svsk || svc_tcp_has_wspace(&svsk->sk_xprt)) - clear_bit(SOCK_NOSPACE, &sock->flags); - svc_write_space(sk); -} - -static void svc_tcp_adjust_wspace(struct svc_xprt *xprt) -{ - struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); - - if (svc_tcp_has_wspace(xprt)) - clear_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); + return !test_bit(SOCK_NOSPACE, &svsk->sk_sock->flags); } /* @@ -746,7 +684,7 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv) svc_xprt_init(sock_net(svsk->sk_sock->sk), &svc_udp_class, &svsk->sk_xprt, serv); clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags); - svsk->sk_sk->sk_data_ready = svc_udp_data_ready; + svsk->sk_sk->sk_data_ready = svc_data_ready; svsk->sk_sk->sk_write_space = svc_write_space; /* initialise setting must have enough space to @@ -786,11 +724,12 @@ static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv) static void svc_tcp_listen_data_ready(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; - wait_queue_head_t *wq; dprintk("svc: socket %p TCP (listen) state change %d\n", sk, sk->sk_state); + if (svsk) + svsk->sk_odata(sk); /* * This callback may called twice when a new connection * is established as a child socket inherits everything @@ -808,10 +747,6 @@ static void svc_tcp_listen_data_ready(struct sock *sk) } else printk("svc: socket %p: no user data\n", sk); } - - wq = sk_sleep(sk); - if (sunrpc_waitqueue_active(wq)) - wake_up_interruptible_all(wq); } /* @@ -820,7 +755,6 @@ static void svc_tcp_listen_data_ready(struct sock *sk) static void svc_tcp_state_change(struct sock *sk) { struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; - wait_queue_head_t *wq = sk_sleep(sk); dprintk("svc: socket %p TCP (connected) state change %d (svsk %p)\n", sk, sk->sk_state, sk->sk_user_data); @@ -828,26 +762,12 @@ static void svc_tcp_state_change(struct sock *sk) if (!svsk) printk("svc: socket %p: no user data\n", sk); else { - set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); - svc_xprt_enqueue(&svsk->sk_xprt); - } - if (sunrpc_waitqueue_active(wq)) - wake_up_interruptible_all(wq); -} - -static void svc_tcp_data_ready(struct sock *sk) -{ - struct svc_sock *svsk = (struct svc_sock *)sk->sk_user_data; - wait_queue_head_t *wq = sk_sleep(sk); - - dprintk("svc: socket %p TCP data ready (svsk %p)\n", - sk, sk->sk_user_data); - if (svsk) { - set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); - svc_xprt_enqueue(&svsk->sk_xprt); + svsk->sk_ostate(sk); + if (sk->sk_state != TCP_ESTABLISHED) { + set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); + svc_xprt_enqueue(&svsk->sk_xprt); + } } - if (sunrpc_waitqueue_active(wq)) - wake_up_interruptible(wq); } /* @@ -901,6 +821,11 @@ static struct svc_xprt *svc_tcp_accept(struct svc_xprt *xprt) dprintk("%s: connect from %s\n", serv->sv_name, __svc_print_addr(sin, buf, sizeof(buf))); + /* Reset the inherited callbacks before calling svc_setup_socket */ + newsock->sk->sk_state_change = svsk->sk_ostate; + newsock->sk->sk_data_ready = svsk->sk_odata; + newsock->sk->sk_write_space = svsk->sk_owspace; + /* make sure that a write doesn't block forever when * low on memory */ @@ -1317,7 +1242,6 @@ static struct svc_xprt_ops svc_tcp_ops = { .xpo_has_wspace = svc_tcp_has_wspace, .xpo_accept = svc_tcp_accept, .xpo_secure_port = svc_sock_secure_port, - .xpo_adjust_wspace = svc_tcp_adjust_wspace, }; static struct svc_xprt_class svc_tcp_class = { @@ -1357,8 +1281,8 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv) } else { dprintk("setting up TCP socket for reading\n"); sk->sk_state_change = svc_tcp_state_change; - sk->sk_data_ready = svc_tcp_data_ready; - sk->sk_write_space = svc_tcp_write_space; + sk->sk_data_ready = svc_data_ready; + sk->sk_write_space = svc_write_space; svsk->sk_reclen = 0; svsk->sk_tcplen = 0; @@ -1368,8 +1292,13 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv) tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags); - if (sk->sk_state != TCP_ESTABLISHED) + switch (sk->sk_state) { + case TCP_SYN_RECV: + case TCP_ESTABLISHED: + break; + default: set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags); + } } } @@ -1428,17 +1357,14 @@ static struct svc_sock *svc_setup_socket(struct svc_serv *serv, /* Initialize the socket */ if (sock->type == SOCK_DGRAM) svc_udp_init(svsk, serv); - else { - /* initialise setting must have enough space to - * receive and respond to one request. - */ - svc_sock_setbufsize(svsk->sk_sock, 4 * serv->sv_max_mesg, - 4 * serv->sv_max_mesg); + else svc_tcp_init(svsk, serv); - } - dprintk("svc: svc_setup_socket created %p (inet %p)\n", - svsk, svsk->sk_sk); + dprintk("svc: svc_setup_socket created %p (inet %p), " + "listen %d close %d\n", + svsk, svsk->sk_sk, + test_bit(XPT_LISTENER, &svsk->sk_xprt.xpt_flags), + test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags)); return svsk; } @@ -1606,18 +1532,16 @@ static void svc_sock_detach(struct svc_xprt *xprt) { struct svc_sock *svsk = container_of(xprt, struct svc_sock, sk_xprt); struct sock *sk = svsk->sk_sk; - wait_queue_head_t *wq; dprintk("svc: svc_sock_detach(%p)\n", svsk); /* put back the old socket callbacks */ + lock_sock(sk); sk->sk_state_change = svsk->sk_ostate; sk->sk_data_ready = svsk->sk_odata; sk->sk_write_space = svsk->sk_owspace; - - wq = sk_sleep(sk); - if (sunrpc_waitqueue_active(wq)) - wake_up_interruptible(wq); + sk->sk_user_data = NULL; + release_sock(sk); } /* diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index 216a13857..ea244b291 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -220,7 +220,7 @@ static void xprt_clear_locked(struct rpc_xprt *xprt) clear_bit(XPRT_LOCKED, &xprt->state); smp_mb__after_atomic(); } else - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); } /* @@ -295,7 +295,8 @@ static void __xprt_lock_write_next(struct rpc_xprt *xprt) if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; - if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt)) + if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, + __xprt_lock_write_func, xprt)) return; xprt_clear_locked(xprt); } @@ -324,7 +325,8 @@ static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt) return; if (RPCXPRT_CONGESTED(xprt)) goto out_unlock; - if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt)) + if (rpc_wake_up_first_on_wq(xprtiod_workqueue, &xprt->sending, + __xprt_lock_write_cong_func, xprt)) return; out_unlock: xprt_clear_locked(xprt); @@ -645,7 +647,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); spin_unlock_bh(&xprt->transport_lock); } @@ -672,12 +674,26 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) set_bit(XPRT_CLOSE_WAIT, &xprt->state); /* Try to schedule an autoclose RPC call */ if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); xprt_wake_pending_tasks(xprt, -EAGAIN); out: spin_unlock_bh(&xprt->transport_lock); } +static bool +xprt_has_timer(const struct rpc_xprt *xprt) +{ + return xprt->idle_timeout != 0; +} + +static void +xprt_schedule_autodisconnect(struct rpc_xprt *xprt) + __must_hold(&xprt->transport_lock) +{ + if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) + mod_timer(&xprt->timer, xprt->last_used + xprt->idle_timeout); +} + static void xprt_init_autodisconnect(unsigned long data) { @@ -686,10 +702,12 @@ xprt_init_autodisconnect(unsigned long data) spin_lock(&xprt->transport_lock); if (!list_empty(&xprt->recv)) goto out_abort; + /* Reset xprt->last_used to avoid connect/autodisconnect cycling */ + xprt->last_used = jiffies; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) goto out_abort; spin_unlock(&xprt->transport_lock); - queue_work(rpciod_workqueue, &xprt->task_cleanup); + queue_work(xprtiod_workqueue, &xprt->task_cleanup); return; out_abort: spin_unlock(&xprt->transport_lock); @@ -723,6 +741,7 @@ void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) goto out; xprt->snd_task =NULL; xprt->ops->release_xprt(xprt, NULL); + xprt_schedule_autodisconnect(xprt); out: spin_unlock_bh(&xprt->transport_lock); wake_up_bit(&xprt->state, XPRT_LOCKED); @@ -886,11 +905,6 @@ static void xprt_timer(struct rpc_task *task) spin_unlock_bh(&xprt->transport_lock); } -static inline int xprt_has_timer(struct rpc_xprt *xprt) -{ - return xprt->idle_timeout != 0; -} - /** * xprt_prepare_transmit - reserve the transport before sending a request * @task: RPC task about to send a request @@ -1278,9 +1292,7 @@ void xprt_release(struct rpc_task *task) if (!list_empty(&req->rq_list)) list_del(&req->rq_list); xprt->last_used = jiffies; - if (list_empty(&xprt->recv) && xprt_has_timer(xprt)) - mod_timer(&xprt->timer, - xprt->last_used + xprt->idle_timeout); + xprt_schedule_autodisconnect(xprt); spin_unlock_bh(&xprt->transport_lock); if (req->rq_buffer) xprt->ops->buf_free(req->rq_buffer); diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile index dc9f3b513..ef19fa42c 100644 --- a/net/sunrpc/xprtrdma/Makefile +++ b/net/sunrpc/xprtrdma/Makefile @@ -1,7 +1,7 @@ obj-$(CONFIG_SUNRPC_XPRT_RDMA) += rpcrdma.o rpcrdma-y := transport.o rpc_rdma.o verbs.o \ - fmr_ops.o frwr_ops.o physical_ops.o \ + fmr_ops.o frwr_ops.o \ svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \ svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \ module.o diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c index 6326ebe8b..21cb3b150 100644 --- a/net/sunrpc/xprtrdma/fmr_ops.c +++ b/net/sunrpc/xprtrdma/fmr_ops.c @@ -19,13 +19,6 @@ * verb (fmr_op_unmap). */ -/* Transport recovery - * - * After a transport reconnect, fmr_op_map re-uses the MR already - * allocated for the RPC, but generates a fresh rkey then maps the - * MR again. This process is synchronous. - */ - #include "xprt_rdma.h" #if IS_ENABLED(CONFIG_SUNRPC_DEBUG) @@ -35,62 +28,132 @@ /* Maximum scatter/gather per FMR */ #define RPCRDMA_MAX_FMR_SGES (64) -static struct workqueue_struct *fmr_recovery_wq; - -#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND) +/* Access mode of externally registered pages */ +enum { + RPCRDMA_FMR_ACCESS_FLAGS = IB_ACCESS_REMOTE_WRITE | + IB_ACCESS_REMOTE_READ, +}; -int -fmr_alloc_recovery_wq(void) +bool +fmr_is_supported(struct rpcrdma_ia *ia) { - fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0); - return !fmr_recovery_wq ? -ENOMEM : 0; + if (!ia->ri_device->alloc_fmr) { + pr_info("rpcrdma: 'fmr' mode is not supported by device %s\n", + ia->ri_device->name); + return false; + } + return true; } -void -fmr_destroy_recovery_wq(void) +static int +fmr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *mw) { - struct workqueue_struct *wq; + static struct ib_fmr_attr fmr_attr = { + .max_pages = RPCRDMA_MAX_FMR_SGES, + .max_maps = 1, + .page_shift = PAGE_SHIFT + }; - if (!fmr_recovery_wq) - return; + mw->fmr.fm_physaddrs = kcalloc(RPCRDMA_MAX_FMR_SGES, + sizeof(u64), GFP_KERNEL); + if (!mw->fmr.fm_physaddrs) + goto out_free; - wq = fmr_recovery_wq; - fmr_recovery_wq = NULL; - destroy_workqueue(wq); + mw->mw_sg = kcalloc(RPCRDMA_MAX_FMR_SGES, + sizeof(*mw->mw_sg), GFP_KERNEL); + if (!mw->mw_sg) + goto out_free; + + sg_init_table(mw->mw_sg, RPCRDMA_MAX_FMR_SGES); + + mw->fmr.fm_mr = ib_alloc_fmr(ia->ri_pd, RPCRDMA_FMR_ACCESS_FLAGS, + &fmr_attr); + if (IS_ERR(mw->fmr.fm_mr)) + goto out_fmr_err; + + return 0; + +out_fmr_err: + dprintk("RPC: %s: ib_alloc_fmr returned %ld\n", __func__, + PTR_ERR(mw->fmr.fm_mr)); + +out_free: + kfree(mw->mw_sg); + kfree(mw->fmr.fm_physaddrs); + return -ENOMEM; } static int __fmr_unmap(struct rpcrdma_mw *mw) { LIST_HEAD(l); + int rc; - list_add(&mw->fmr.fmr->list, &l); - return ib_unmap_fmr(&l); + list_add(&mw->fmr.fm_mr->list, &l); + rc = ib_unmap_fmr(&l); + list_del_init(&mw->fmr.fm_mr->list); + return rc; } -/* Deferred reset of a single FMR. Generate a fresh rkey by - * replacing the MR. There's no recovery if this fails. - */ static void -__fmr_recovery_worker(struct work_struct *work) +fmr_op_release_mr(struct rpcrdma_mw *r) { - struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw, - mw_work); - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + LIST_HEAD(unmap_list); + int rc; - __fmr_unmap(mw); - rpcrdma_put_mw(r_xprt, mw); - return; + /* Ensure MW is not on any rl_registered list */ + if (!list_empty(&r->mw_list)) + list_del(&r->mw_list); + + kfree(r->fmr.fm_physaddrs); + kfree(r->mw_sg); + + /* In case this one was left mapped, try to unmap it + * to prevent dealloc_fmr from failing with EBUSY + */ + rc = __fmr_unmap(r); + if (rc) + pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n", + r, rc); + + rc = ib_dealloc_fmr(r->fmr.fm_mr); + if (rc) + pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n", + r, rc); + + kfree(r); } -/* A broken MR was discovered in a context that can't sleep. - * Defer recovery to the recovery worker. +/* Reset of a single FMR. */ static void -__fmr_queue_recovery(struct rpcrdma_mw *mw) +fmr_op_recover_mr(struct rpcrdma_mw *mw) { - INIT_WORK(&mw->mw_work, __fmr_recovery_worker); - queue_work(fmr_recovery_wq, &mw->mw_work); + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + int rc; + + /* ORDER: invalidate first */ + rc = __fmr_unmap(mw); + + /* ORDER: then DMA unmap */ + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + if (rc) + goto out_release; + + rpcrdma_put_mw(r_xprt, mw); + r_xprt->rx_stats.mrs_recovered++; + return; + +out_release: + pr_err("rpcrdma: FMR reset failed (%d), %p released\n", rc, mw); + r_xprt->rx_stats.mrs_orphaned++; + + spin_lock(&r_xprt->rx_buf.rb_mwlock); + list_del(&mw->mw_all); + spin_unlock(&r_xprt->rx_buf.rb_mwlock); + + fmr_op_release_mr(mw); } static int @@ -112,86 +175,21 @@ fmr_op_maxpages(struct rpcrdma_xprt *r_xprt) RPCRDMA_MAX_HDR_SEGS * RPCRDMA_MAX_FMR_SGES); } -static int -fmr_op_init(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - int mr_access_flags = IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_READ; - struct ib_fmr_attr fmr_attr = { - .max_pages = RPCRDMA_MAX_FMR_SGES, - .max_maps = 1, - .page_shift = PAGE_SHIFT - }; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - struct rpcrdma_mw *r; - int i, rc; - - spin_lock_init(&buf->rb_mwlock); - INIT_LIST_HEAD(&buf->rb_mws); - INIT_LIST_HEAD(&buf->rb_all); - - i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1); - i += 2; /* head + tail */ - i *= buf->rb_max_requests; /* one set for each RPC slot */ - dprintk("RPC: %s: initalizing %d FMRs\n", __func__, i); - - rc = -ENOMEM; - while (i--) { - r = kzalloc(sizeof(*r), GFP_KERNEL); - if (!r) - goto out; - - r->fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES * - sizeof(u64), GFP_KERNEL); - if (!r->fmr.physaddrs) - goto out_free; - - r->fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr); - if (IS_ERR(r->fmr.fmr)) - goto out_fmr_err; - - r->mw_xprt = r_xprt; - list_add(&r->mw_list, &buf->rb_mws); - list_add(&r->mw_all, &buf->rb_all); - } - return 0; - -out_fmr_err: - rc = PTR_ERR(r->fmr.fmr); - dprintk("RPC: %s: ib_alloc_fmr status %i\n", __func__, rc); - kfree(r->fmr.physaddrs); -out_free: - kfree(r); -out: - return rc; -} - /* Use the ib_map_phys_fmr() verb to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ static int fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing) + int nsegs, bool writing, struct rpcrdma_mw **out) { - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct ib_device *device = ia->ri_device; - enum dma_data_direction direction = rpcrdma_data_dir(writing); struct rpcrdma_mr_seg *seg1 = seg; int len, pageoff, i, rc; struct rpcrdma_mw *mw; + u64 *dma_pages; - mw = seg1->rl_mw; - seg1->rl_mw = NULL; - if (!mw) { - mw = rpcrdma_get_mw(r_xprt); - if (!mw) - return -ENOMEM; - } else { - /* this is a retransmit; generate a fresh rkey */ - rc = __fmr_unmap(mw); - if (rc) - return rc; - } + mw = rpcrdma_get_mw(r_xprt); + if (!mw) + return -ENOBUFS; pageoff = offset_in_page(seg1->mr_offset); seg1->mr_offset -= pageoff; /* start of page */ @@ -200,8 +198,14 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (nsegs > RPCRDMA_MAX_FMR_SGES) nsegs = RPCRDMA_MAX_FMR_SGES; for (i = 0; i < nsegs;) { - rpcrdma_map_one(device, seg, direction); - mw->fmr.physaddrs[i] = seg->mr_dma; + if (seg->mr_page) + sg_set_page(&mw->mw_sg[i], + seg->mr_page, + seg->mr_len, + offset_in_page(seg->mr_offset)); + else + sg_set_buf(&mw->mw_sg[i], seg->mr_offset, + seg->mr_len); len += seg->mr_len; ++seg; ++i; @@ -210,49 +214,54 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - - rc = ib_map_phys_fmr(mw->fmr.fmr, mw->fmr.physaddrs, - i, seg1->mr_dma); + mw->mw_nents = i; + mw->mw_dir = rpcrdma_data_dir(writing); + if (i == 0) + goto out_dmamap_err; + + if (!ib_dma_map_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir)) + goto out_dmamap_err; + + for (i = 0, dma_pages = mw->fmr.fm_physaddrs; i < mw->mw_nents; i++) + dma_pages[i] = sg_dma_address(&mw->mw_sg[i]); + rc = ib_map_phys_fmr(mw->fmr.fm_mr, dma_pages, mw->mw_nents, + dma_pages[0]); if (rc) goto out_maperr; - seg1->rl_mw = mw; - seg1->mr_rkey = mw->fmr.fmr->rkey; - seg1->mr_base = seg1->mr_dma + pageoff; - seg1->mr_nsegs = i; - seg1->mr_len = len; - return i; + mw->mw_handle = mw->fmr.fm_mr->rkey; + mw->mw_length = len; + mw->mw_offset = dma_pages[0] + pageoff; -out_maperr: - dprintk("RPC: %s: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", - __func__, len, (unsigned long long)seg1->mr_dma, - pageoff, i, rc); - while (i--) - rpcrdma_unmap_one(device, --seg); - return rc; -} + *out = mw; + return mw->mw_nents; -static void -__fmr_dma_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg) -{ - struct ib_device *device = r_xprt->rx_ia.ri_device; - int nsegs = seg->mr_nsegs; +out_dmamap_err: + pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n", + mw->mw_sg, mw->mw_nents); + rpcrdma_defer_mr_recovery(mw); + return -EIO; - while (nsegs--) - rpcrdma_unmap_one(device, seg++); +out_maperr: + pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n", + len, (unsigned long long)dma_pages[0], + pageoff, mw->mw_nents, rc); + rpcrdma_defer_mr_recovery(mw); + return -EIO; } /* Invalidate all memory regions that were registered for "req". * * Sleeps until it is safe for the host CPU to access the * previously mapped memory regions. + * + * Caller ensures that req->rl_registered is not empty. */ static void fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { - struct rpcrdma_mr_seg *seg; - unsigned int i, nchunks; - struct rpcrdma_mw *mw; + struct rpcrdma_mw *mw, *tmp; LIST_HEAD(unmap_list); int rc; @@ -261,90 +270,54 @@ fmr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) /* ORDER: Invalidate all of the req's MRs first * * ib_unmap_fmr() is slow, so use a single call instead - * of one call per mapped MR. + * of one call per mapped FMR. */ - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; - - list_add(&mw->fmr.fmr->list, &unmap_list); - - i += seg->mr_nsegs; - } + list_for_each_entry(mw, &req->rl_registered, mw_list) + list_add_tail(&mw->fmr.fm_mr->list, &unmap_list); rc = ib_unmap_fmr(&unmap_list); if (rc) - pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc); + goto out_reset; /* ORDER: Now DMA unmap all of the req's MRs, and return * them to the free MW list. */ - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; + list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + list_del_init(&mw->mw_list); + list_del_init(&mw->fmr.fm_mr->list); + ib_dma_unmap_sg(r_xprt->rx_ia.ri_device, + mw->mw_sg, mw->mw_nents, mw->mw_dir); + rpcrdma_put_mw(r_xprt, mw); + } - __fmr_dma_unmap(r_xprt, seg); - rpcrdma_put_mw(r_xprt, seg->rl_mw); + return; - i += seg->mr_nsegs; - seg->mr_nsegs = 0; - seg->rl_mw = NULL; - } +out_reset: + pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc); - req->rl_nchunks = 0; + list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + list_del_init(&mw->fmr.fm_mr->list); + fmr_op_recover_mr(mw); + } } /* Use a slow, safe mechanism to invalidate all memory regions * that were registered for "req". - * - * In the asynchronous case, DMA unmapping occurs first here - * because the rpcrdma_mr_seg is released immediately after this - * call. It's contents won't be available in __fmr_dma_unmap later. - * FIXME. */ static void fmr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, bool sync) { - struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - unsigned int i; - - for (i = 0; req->rl_nchunks; req->rl_nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; - - if (sync) { - /* ORDER */ - __fmr_unmap(mw); - __fmr_dma_unmap(r_xprt, seg); - rpcrdma_put_mw(r_xprt, mw); - } else { - __fmr_dma_unmap(r_xprt, seg); - __fmr_queue_recovery(mw); - } - - i += seg->mr_nsegs; - seg->mr_nsegs = 0; - seg->rl_mw = NULL; - } -} - -static void -fmr_op_destroy(struct rpcrdma_buffer *buf) -{ - struct rpcrdma_mw *r; - int rc; - - while (!list_empty(&buf->rb_all)) { - r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); - list_del(&r->mw_all); - kfree(r->fmr.physaddrs); - rc = ib_dealloc_fmr(r->fmr.fmr); - if (rc) - dprintk("RPC: %s: ib_dealloc_fmr failed %i\n", - __func__, rc); + while (!list_empty(&req->rl_registered)) { + mw = list_first_entry(&req->rl_registered, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); - kfree(r); + if (sync) + fmr_op_recover_mr(mw); + else + rpcrdma_defer_mr_recovery(mw); } } @@ -352,9 +325,10 @@ const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops = { .ro_map = fmr_op_map, .ro_unmap_sync = fmr_op_unmap_sync, .ro_unmap_safe = fmr_op_unmap_safe, + .ro_recover_mr = fmr_op_recover_mr, .ro_open = fmr_op_open, .ro_maxpages = fmr_op_maxpages, - .ro_init = fmr_op_init, - .ro_destroy = fmr_op_destroy, + .ro_init_mr = fmr_op_init_mr, + .ro_release_mr = fmr_op_release_mr, .ro_displayname = "fmr", }; diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c index f02ab80aa..892b5e1d9 100644 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@ -73,29 +73,71 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif -static struct workqueue_struct *frwr_recovery_wq; - -#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM) +bool +frwr_is_supported(struct rpcrdma_ia *ia) +{ + struct ib_device_attr *attrs = &ia->ri_device->attrs; + + if (!(attrs->device_cap_flags & IB_DEVICE_MEM_MGT_EXTENSIONS)) + goto out_not_supported; + if (attrs->max_fast_reg_page_list_len == 0) + goto out_not_supported; + return true; + +out_not_supported: + pr_info("rpcrdma: 'frwr' mode is not supported by device %s\n", + ia->ri_device->name); + return false; +} -int -frwr_alloc_recovery_wq(void) +static int +frwr_op_init_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) { - frwr_recovery_wq = alloc_workqueue("frwr_recovery", - FRWR_RECOVERY_WQ_FLAGS, 0); - return !frwr_recovery_wq ? -ENOMEM : 0; + unsigned int depth = ia->ri_max_frmr_depth; + struct rpcrdma_frmr *f = &r->frmr; + int rc; + + f->fr_mr = ib_alloc_mr(ia->ri_pd, IB_MR_TYPE_MEM_REG, depth); + if (IS_ERR(f->fr_mr)) + goto out_mr_err; + + r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL); + if (!r->mw_sg) + goto out_list_err; + + sg_init_table(r->mw_sg, depth); + init_completion(&f->fr_linv_done); + return 0; + +out_mr_err: + rc = PTR_ERR(f->fr_mr); + dprintk("RPC: %s: ib_alloc_mr status %i\n", + __func__, rc); + return rc; + +out_list_err: + rc = -ENOMEM; + dprintk("RPC: %s: sg allocation failure\n", + __func__); + ib_dereg_mr(f->fr_mr); + return rc; } -void -frwr_destroy_recovery_wq(void) +static void +frwr_op_release_mr(struct rpcrdma_mw *r) { - struct workqueue_struct *wq; + int rc; - if (!frwr_recovery_wq) - return; + /* Ensure MW is not on any rl_registered list */ + if (!list_empty(&r->mw_list)) + list_del(&r->mw_list); - wq = frwr_recovery_wq; - frwr_recovery_wq = NULL; - destroy_workqueue(wq); + rc = ib_dereg_mr(r->frmr.fr_mr); + if (rc) + pr_err("rpcrdma: final ib_dereg_mr for %p returned %i\n", + r, rc); + kfree(r->mw_sg); + kfree(r); } static int @@ -124,90 +166,37 @@ __frwr_reset_mr(struct rpcrdma_ia *ia, struct rpcrdma_mw *r) return 0; } -static void -__frwr_reset_and_unmap(struct rpcrdma_mw *mw) -{ - struct rpcrdma_xprt *r_xprt = mw->mw_xprt; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; - int rc; - - rc = __frwr_reset_mr(ia, mw); - ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); - if (rc) - return; - rpcrdma_put_mw(r_xprt, mw); -} - -/* Deferred reset of a single FRMR. Generate a fresh rkey by - * replacing the MR. +/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR. * * There's no recovery if this fails. The FRMR is abandoned, but * remains in rb_all. It will be cleaned up when the transport is * destroyed. */ static void -__frwr_recovery_worker(struct work_struct *work) +frwr_op_recover_mr(struct rpcrdma_mw *mw) { - struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw, - mw_work); - - __frwr_reset_and_unmap(r); -} - -/* A broken MR was discovered in a context that can't sleep. - * Defer recovery to the recovery worker. - */ -static void -__frwr_queue_recovery(struct rpcrdma_mw *r) -{ - INIT_WORK(&r->mw_work, __frwr_recovery_worker); - queue_work(frwr_recovery_wq, &r->mw_work); -} - -static int -__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth) -{ - struct rpcrdma_frmr *f = &r->frmr; + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; int rc; - f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); - if (IS_ERR(f->fr_mr)) - goto out_mr_err; - - r->mw_sg = kcalloc(depth, sizeof(*r->mw_sg), GFP_KERNEL); - if (!r->mw_sg) - goto out_list_err; - - sg_init_table(r->mw_sg, depth); - - init_completion(&f->fr_linv_done); - - return 0; + rc = __frwr_reset_mr(ia, mw); + ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); + if (rc) + goto out_release; -out_mr_err: - rc = PTR_ERR(f->fr_mr); - dprintk("RPC: %s: ib_alloc_mr status %i\n", - __func__, rc); - return rc; + rpcrdma_put_mw(r_xprt, mw); + r_xprt->rx_stats.mrs_recovered++; + return; -out_list_err: - rc = -ENOMEM; - dprintk("RPC: %s: sg allocation failure\n", - __func__); - ib_dereg_mr(f->fr_mr); - return rc; -} +out_release: + pr_err("rpcrdma: FRMR reset failed %d, %p release\n", rc, mw); + r_xprt->rx_stats.mrs_orphaned++; -static void -__frwr_release(struct rpcrdma_mw *r) -{ - int rc; + spin_lock(&r_xprt->rx_buf.rb_mwlock); + list_del(&mw->mw_all); + spin_unlock(&r_xprt->rx_buf.rb_mwlock); - rc = ib_dereg_mr(r->frmr.fr_mr); - if (rc) - dprintk("RPC: %s: ib_dereg_mr status %i\n", - __func__, rc); - kfree(r->mw_sg); + frwr_op_release_mr(mw); } static int @@ -343,54 +332,14 @@ frwr_wc_localinv_wake(struct ib_cq *cq, struct ib_wc *wc) complete_all(&frmr->fr_linv_done); } -static int -frwr_op_init(struct rpcrdma_xprt *r_xprt) -{ - struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth; - struct ib_pd *pd = r_xprt->rx_ia.ri_pd; - int i; - - spin_lock_init(&buf->rb_mwlock); - INIT_LIST_HEAD(&buf->rb_mws); - INIT_LIST_HEAD(&buf->rb_all); - - i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1); - i += 2; /* head + tail */ - i *= buf->rb_max_requests; /* one set for each RPC slot */ - dprintk("RPC: %s: initalizing %d FRMRs\n", __func__, i); - - while (i--) { - struct rpcrdma_mw *r; - int rc; - - r = kzalloc(sizeof(*r), GFP_KERNEL); - if (!r) - return -ENOMEM; - - rc = __frwr_init(r, pd, depth); - if (rc) { - kfree(r); - return rc; - } - - r->mw_xprt = r_xprt; - list_add(&r->mw_list, &buf->rb_mws); - list_add(&r->mw_all, &buf->rb_all); - } - - return 0; -} - /* Post a REG_MR Work Request to register a memory region * for remote access via RDMA READ or RDMA WRITE. */ static int frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, - int nsegs, bool writing) + int nsegs, bool writing, struct rpcrdma_mw **out) { struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_mw *mw; struct rpcrdma_frmr *frmr; struct ib_mr *mr; @@ -399,14 +348,13 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, int rc, i, n, dma_nents; u8 key; - mw = seg1->rl_mw; - seg1->rl_mw = NULL; + mw = NULL; do { if (mw) - __frwr_queue_recovery(mw); + rpcrdma_defer_mr_recovery(mw); mw = rpcrdma_get_mw(r_xprt); if (!mw) - return -ENOMEM; + return -ENOBUFS; } while (mw->frmr.fr_state != FRMR_IS_INVALID); frmr = &mw->frmr; frmr->fr_state = FRMR_IS_VALID; @@ -435,6 +383,8 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, } mw->mw_nents = i; mw->mw_dir = rpcrdma_data_dir(writing); + if (i == 0) + goto out_dmamap_err; dma_nents = ib_dma_map_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); @@ -468,36 +418,34 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg, if (rc) goto out_senderr; - seg1->rl_mw = mw; - seg1->mr_rkey = mr->rkey; - seg1->mr_base = mr->iova; - seg1->mr_nsegs = mw->mw_nents; - seg1->mr_len = mr->length; + mw->mw_handle = mr->rkey; + mw->mw_length = mr->length; + mw->mw_offset = mr->iova; + *out = mw; return mw->mw_nents; out_dmamap_err: pr_err("rpcrdma: failed to dma map sg %p sg_nents %u\n", mw->mw_sg, mw->mw_nents); - return -ENOMEM; + rpcrdma_defer_mr_recovery(mw); + return -EIO; out_mapmr_err: pr_err("rpcrdma: failed to map mr %p (%u/%u)\n", frmr->fr_mr, n, mw->mw_nents); - rc = n < 0 ? n : -EIO; - __frwr_queue_recovery(mw); - return rc; + rpcrdma_defer_mr_recovery(mw); + return -EIO; out_senderr: - pr_err("rpcrdma: ib_post_send status %i\n", rc); - __frwr_queue_recovery(mw); - return rc; + pr_err("rpcrdma: FRMR registration ib_post_send returned %i\n", rc); + rpcrdma_defer_mr_recovery(mw); + return -ENOTCONN; } static struct ib_send_wr * -__frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) +__frwr_prepare_linv_wr(struct rpcrdma_mw *mw) { - struct rpcrdma_mw *mw = seg->rl_mw; struct rpcrdma_frmr *f = &mw->frmr; struct ib_send_wr *invalidate_wr; @@ -517,16 +465,16 @@ __frwr_prepare_linv_wr(struct rpcrdma_mr_seg *seg) * * Sleeps until it is safe for the host CPU to access the * previously mapped memory regions. + * + * Caller ensures that req->rl_registered is not empty. */ static void frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) { struct ib_send_wr *invalidate_wrs, *pos, *prev, *bad_wr; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_mr_seg *seg; - unsigned int i, nchunks; + struct rpcrdma_mw *mw, *tmp; struct rpcrdma_frmr *f; - struct rpcrdma_mw *mw; int rc; dprintk("RPC: %s: req %p\n", __func__, req); @@ -536,22 +484,18 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * Chain the LOCAL_INV Work Requests and post them with * a single ib_post_send() call. */ + f = NULL; invalidate_wrs = pos = prev = NULL; - seg = NULL; - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - - pos = __frwr_prepare_linv_wr(seg); + list_for_each_entry(mw, &req->rl_registered, mw_list) { + pos = __frwr_prepare_linv_wr(mw); if (!invalidate_wrs) invalidate_wrs = pos; else prev->next = pos; prev = pos; - - i += seg->mr_nsegs; + f = &mw->frmr; } - f = &seg->rl_mw->frmr; /* Strong send queue ordering guarantees that when the * last WR in the chain completes, all WRs in the chain @@ -576,39 +520,27 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req) * them to the free MW list. */ unmap: - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; - seg->rl_mw = NULL; - + list_for_each_entry_safe(mw, tmp, &req->rl_registered, mw_list) { + list_del_init(&mw->mw_list); ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir); rpcrdma_put_mw(r_xprt, mw); - - i += seg->mr_nsegs; - seg->mr_nsegs = 0; } - - req->rl_nchunks = 0; return; reset_mrs: - pr_warn("%s: ib_post_send failed %i\n", __func__, rc); + pr_err("rpcrdma: FRMR invalidate ib_post_send returned %i\n", rc); + rdma_disconnect(ia->ri_id); /* Find and reset the MRs in the LOCAL_INV WRs that did not * get posted. This is synchronous, and slow. */ - for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; + list_for_each_entry(mw, &req->rl_registered, mw_list) { f = &mw->frmr; - if (mw->frmr.fr_mr->rkey == bad_wr->ex.invalidate_rkey) { __frwr_reset_mr(ia, mw); bad_wr = bad_wr->next; } - - i += seg->mr_nsegs; } goto unmap; } @@ -620,38 +552,17 @@ static void frwr_op_unmap_safe(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, bool sync) { - struct rpcrdma_mr_seg *seg; struct rpcrdma_mw *mw; - unsigned int i; - for (i = 0; req->rl_nchunks; req->rl_nchunks--) { - seg = &req->rl_segments[i]; - mw = seg->rl_mw; + while (!list_empty(&req->rl_registered)) { + mw = list_first_entry(&req->rl_registered, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); if (sync) - __frwr_reset_and_unmap(mw); + frwr_op_recover_mr(mw); else - __frwr_queue_recovery(mw); - - i += seg->mr_nsegs; - seg->mr_nsegs = 0; - seg->rl_mw = NULL; - } -} - -static void -frwr_op_destroy(struct rpcrdma_buffer *buf) -{ - struct rpcrdma_mw *r; - - /* Ensure stale MWs for "buf" are no longer in flight */ - flush_workqueue(frwr_recovery_wq); - - while (!list_empty(&buf->rb_all)) { - r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); - list_del(&r->mw_all); - __frwr_release(r); - kfree(r); + rpcrdma_defer_mr_recovery(mw); } } @@ -659,9 +570,10 @@ const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops = { .ro_map = frwr_op_map, .ro_unmap_sync = frwr_op_unmap_sync, .ro_unmap_safe = frwr_op_unmap_safe, + .ro_recover_mr = frwr_op_recover_mr, .ro_open = frwr_op_open, .ro_maxpages = frwr_op_maxpages, - .ro_init = frwr_op_init, - .ro_destroy = frwr_op_destroy, + .ro_init_mr = frwr_op_init_mr, + .ro_release_mr = frwr_op_release_mr, .ro_displayname = "frwr", }; diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index 35a81096e..a47f170b2 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -196,8 +196,7 @@ rpcrdma_tail_pullup(struct xdr_buf *buf) * MR when they can. */ static int -rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, - int n, int nsegs) +rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n) { size_t page_offset; u32 remaining; @@ -206,7 +205,7 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, base = vec->iov_base; page_offset = offset_in_page(base); remaining = vec->iov_len; - while (remaining && n < nsegs) { + while (remaining && n < RPCRDMA_MAX_SEGS) { seg[n].mr_page = NULL; seg[n].mr_offset = base; seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining); @@ -230,34 +229,34 @@ rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, static int rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, - enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs) + enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg) { - int len, n = 0, p; - int page_base; + int len, n, p, page_base; struct page **ppages; + n = 0; if (pos == 0) { - n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n, nsegs); - if (n == nsegs) - return -EIO; + n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n); + if (n == RPCRDMA_MAX_SEGS) + goto out_overflow; } len = xdrbuf->page_len; ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT); page_base = xdrbuf->page_base & ~PAGE_MASK; p = 0; - while (len && n < nsegs) { + while (len && n < RPCRDMA_MAX_SEGS) { if (!ppages[p]) { /* alloc the pagelist for receiving buffer */ ppages[p] = alloc_page(GFP_ATOMIC); if (!ppages[p]) - return -ENOMEM; + return -EAGAIN; } seg[n].mr_page = ppages[p]; seg[n].mr_offset = (void *)(unsigned long) page_base; seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len); if (seg[n].mr_len > PAGE_SIZE) - return -EIO; + goto out_overflow; len -= seg[n].mr_len; ++n; ++p; @@ -265,8 +264,8 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, } /* Message overflows the seg array */ - if (len && n == nsegs) - return -EIO; + if (len && n == RPCRDMA_MAX_SEGS) + goto out_overflow; /* When encoding the read list, the tail is always sent inline */ if (type == rpcrdma_readch) @@ -277,20 +276,24 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos, * xdr pad bytes, saving the server an RDMA operation. */ if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize) return n; - n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n, nsegs); - if (n == nsegs) - return -EIO; + n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n); + if (n == RPCRDMA_MAX_SEGS) + goto out_overflow; } return n; + +out_overflow: + pr_err("rpcrdma: segment array overflow\n"); + return -EIO; } static inline __be32 * -xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mr_seg *seg) +xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw) { - *iptr++ = cpu_to_be32(seg->mr_rkey); - *iptr++ = cpu_to_be32(seg->mr_len); - return xdr_encode_hyper(iptr, seg->mr_base); + *iptr++ = cpu_to_be32(mw->mw_handle); + *iptr++ = cpu_to_be32(mw->mw_length); + return xdr_encode_hyper(iptr, mw->mw_offset); } /* XDR-encode the Read list. Supports encoding a list of read @@ -310,7 +313,8 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpc_rqst *rqst, __be32 *iptr, enum rpcrdma_chunktype rtype) { - struct rpcrdma_mr_seg *seg = req->rl_nextseg; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; unsigned int pos; int n, nsegs; @@ -322,15 +326,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, pos = rqst->rq_snd_buf.head[0].iov_len; if (rtype == rpcrdma_areadch) pos = 0; - nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg, - RPCRDMA_MAX_SEGS - req->rl_nchunks); + seg = req->rl_segments; + nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg); if (nsegs < 0) return ERR_PTR(nsegs); do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, false); - if (n <= 0) + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + false, &mw); + if (n < 0) return ERR_PTR(n); + list_add(&mw->mw_list, &req->rl_registered); *iptr++ = xdr_one; /* item present */ @@ -338,20 +344,17 @@ rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt, * have the same "position". */ *iptr++ = cpu_to_be32(pos); - iptr = xdr_encode_rdma_segment(iptr, seg); + iptr = xdr_encode_rdma_segment(iptr, mw); - dprintk("RPC: %5u %s: read segment pos %u " - "%d@0x%016llx:0x%08x (%s)\n", + dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, pos, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); + mw->mw_length, (unsigned long long)mw->mw_offset, + mw->mw_handle, n < nsegs ? "more" : "last"); r_xprt->rx_stats.read_chunk_count++; - req->rl_nchunks++; seg += n; nsegs -= n; } while (nsegs); - req->rl_nextseg = seg; /* Finish Read list */ *iptr++ = xdr_zero; /* Next item not present */ @@ -375,7 +378,8 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpc_rqst *rqst, __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_mr_seg *seg = req->rl_nextseg; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; int n, nsegs, nchunks; __be32 *segcount; @@ -384,10 +388,10 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, return iptr; } + seg = req->rl_segments; nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, rqst->rq_rcv_buf.head[0].iov_len, - wtype, seg, - RPCRDMA_MAX_SEGS - req->rl_nchunks); + wtype, seg); if (nsegs < 0) return ERR_PTR(nsegs); @@ -396,26 +400,25 @@ rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); - if (n <= 0) + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (n < 0) return ERR_PTR(n); + list_add(&mw->mw_list, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, seg); + iptr = xdr_encode_rdma_segment(iptr, mw); - dprintk("RPC: %5u %s: write segment " - "%d@0x016%llx:0x%08x (%s)\n", + dprintk("RPC: %5u %s: %u@0x016%llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); + mw->mw_length, (unsigned long long)mw->mw_offset, + mw->mw_handle, n < nsegs ? "more" : "last"); r_xprt->rx_stats.write_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; - req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); - req->rl_nextseg = seg; /* Update count of segments in this Write chunk */ *segcount = cpu_to_be32(nchunks); @@ -442,7 +445,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, struct rpc_rqst *rqst, __be32 *iptr, enum rpcrdma_chunktype wtype) { - struct rpcrdma_mr_seg *seg = req->rl_nextseg; + struct rpcrdma_mr_seg *seg; + struct rpcrdma_mw *mw; int n, nsegs, nchunks; __be32 *segcount; @@ -451,8 +455,8 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, return iptr; } - nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg, - RPCRDMA_MAX_SEGS - req->rl_nchunks); + seg = req->rl_segments; + nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg); if (nsegs < 0) return ERR_PTR(nsegs); @@ -461,26 +465,25 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, nchunks = 0; do { - n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, true); - if (n <= 0) + n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs, + true, &mw); + if (n < 0) return ERR_PTR(n); + list_add(&mw->mw_list, &req->rl_registered); - iptr = xdr_encode_rdma_segment(iptr, seg); + iptr = xdr_encode_rdma_segment(iptr, mw); - dprintk("RPC: %5u %s: reply segment " - "%d@0x%016llx:0x%08x (%s)\n", + dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n", rqst->rq_task->tk_pid, __func__, - seg->mr_len, (unsigned long long)seg->mr_base, - seg->mr_rkey, n < nsegs ? "more" : "last"); + mw->mw_length, (unsigned long long)mw->mw_offset, + mw->mw_handle, n < nsegs ? "more" : "last"); r_xprt->rx_stats.reply_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; - req->rl_nchunks++; nchunks++; seg += n; nsegs -= n; } while (nsegs); - req->rl_nextseg = seg; /* Update count of segments in the Reply chunk */ *segcount = cpu_to_be32(nchunks); @@ -567,6 +570,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) struct rpcrdma_req *req = rpcr_to_rdmar(rqst); enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; + bool ddp_allowed; ssize_t hdrlen; size_t rpclen; __be32 *iptr; @@ -583,6 +587,13 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests); headerp->rm_type = rdma_msg; + /* When the ULP employs a GSS flavor that guarantees integrity + * or privacy, direct data placement of individual data items + * is not allowed. + */ + ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags & + RPCAUTH_AUTH_DATATOUCH); + /* * Chunks needed for results? * @@ -594,7 +605,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) */ if (rpcrdma_results_inline(r_xprt, rqst)) wtype = rpcrdma_noch; - else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) + else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ) wtype = rpcrdma_writech; else wtype = rpcrdma_replych; @@ -617,7 +628,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) rtype = rpcrdma_noch; rpcrdma_inline_pullup(rqst); rpclen = rqst->rq_svec[0].iov_len; - } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) { + } else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) { rtype = rpcrdma_readch; rpclen = rqst->rq_svec[0].iov_len; rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf); @@ -650,8 +661,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) * send a Call message with a Position Zero Read chunk and a * regular Read chunk at the same time. */ - req->rl_nchunks = 0; - req->rl_nextseg = req->rl_segments; iptr = headerp->rm_body.rm_chunks; iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype); if (IS_ERR(iptr)) @@ -690,10 +699,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst) out_overflow: pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n", hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]); - /* Terminate this RPC. Chunks registered above will be - * released by xprt_release -> xprt_rmda_free . - */ - return -EIO; + iptr = ERR_PTR(-EIO); out_unmap: r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); @@ -705,15 +711,13 @@ out_unmap: * RDMA'd by server. See map at rpcrdma_create_chunks()! :-) */ static int -rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __be32 **iptrp) +rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp) { unsigned int i, total_len; struct rpcrdma_write_chunk *cur_wchunk; char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf); i = be32_to_cpu(**iptrp); - if (i > max) - return -1; cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1); total_len = 0; while (i--) { @@ -744,45 +748,66 @@ rpcrdma_count_chunks(struct rpcrdma_rep *rep, unsigned int max, int wrchunk, __b return total_len; } -/* - * Scatter inline received data back into provided iov's. +/** + * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs + * @rqst: controlling RPC request + * @srcp: points to RPC message payload in receive buffer + * @copy_len: remaining length of receive buffer content + * @pad: Write chunk pad bytes needed (zero for pure inline) + * + * The upper layer has set the maximum number of bytes it can + * receive in each component of rq_rcv_buf. These values are set in + * the head.iov_len, page_len, tail.iov_len, and buflen fields. + * + * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in + * many cases this function simply updates iov_base pointers in + * rq_rcv_buf to point directly to the received reply data, to + * avoid copying reply data. + * + * Returns the count of bytes which had to be memcopied. */ -static void +static unsigned long rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) { - int i, npages, curlen, olen; + unsigned long fixup_copy_count; + int i, npages, curlen; char *destp; struct page **ppages; int page_base; + /* The head iovec is redirected to the RPC reply message + * in the receive buffer, to avoid a memcopy. + */ + rqst->rq_rcv_buf.head[0].iov_base = srcp; + rqst->rq_private_buf.head[0].iov_base = srcp; + + /* The contents of the receive buffer that follow + * head.iov_len bytes are copied into the page list. + */ curlen = rqst->rq_rcv_buf.head[0].iov_len; - if (curlen > copy_len) { /* write chunk header fixup */ + if (curlen > copy_len) curlen = copy_len; - rqst->rq_rcv_buf.head[0].iov_len = curlen; - } - dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n", __func__, srcp, copy_len, curlen); - - /* Shift pointer for first receive segment only */ - rqst->rq_rcv_buf.head[0].iov_base = srcp; srcp += curlen; copy_len -= curlen; - olen = copy_len; - i = 0; - rpcx_to_rdmax(rqst->rq_xprt)->rx_stats.fixup_copy_count += olen; page_base = rqst->rq_rcv_buf.page_base; ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT); page_base &= ~PAGE_MASK; - + fixup_copy_count = 0; if (copy_len && rqst->rq_rcv_buf.page_len) { - npages = PAGE_ALIGN(page_base + - rqst->rq_rcv_buf.page_len) >> PAGE_SHIFT; - for (; i < npages; i++) { + int pagelist_len; + + pagelist_len = rqst->rq_rcv_buf.page_len; + if (pagelist_len > copy_len) + pagelist_len = copy_len; + npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT; + for (i = 0; i < npages; i++) { curlen = PAGE_SIZE - page_base; - if (curlen > copy_len) - curlen = copy_len; + if (curlen > pagelist_len) + curlen = pagelist_len; + dprintk("RPC: %s: page %d" " srcp 0x%p len %d curlen %d\n", __func__, i, srcp, copy_len, curlen); @@ -792,39 +817,32 @@ rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad) kunmap_atomic(destp); srcp += curlen; copy_len -= curlen; - if (copy_len == 0) + fixup_copy_count += curlen; + pagelist_len -= curlen; + if (!pagelist_len) break; page_base = 0; } - } - if (copy_len && rqst->rq_rcv_buf.tail[0].iov_len) { - curlen = copy_len; - if (curlen > rqst->rq_rcv_buf.tail[0].iov_len) - curlen = rqst->rq_rcv_buf.tail[0].iov_len; - if (rqst->rq_rcv_buf.tail[0].iov_base != srcp) - memmove(rqst->rq_rcv_buf.tail[0].iov_base, srcp, curlen); - dprintk("RPC: %s: tail srcp 0x%p len %d curlen %d\n", - __func__, srcp, copy_len, curlen); - rqst->rq_rcv_buf.tail[0].iov_len = curlen; - copy_len -= curlen; ++i; - } else - rqst->rq_rcv_buf.tail[0].iov_len = 0; - - if (pad) { - /* implicit padding on terminal chunk */ - unsigned char *p = rqst->rq_rcv_buf.tail[0].iov_base; - while (pad--) - p[rqst->rq_rcv_buf.tail[0].iov_len++] = 0; + /* Implicit padding for the last segment in a Write + * chunk is inserted inline at the front of the tail + * iovec. The upper layer ignores the content of + * the pad. Simply ensure inline content in the tail + * that follows the Write chunk is properly aligned. + */ + if (pad) + srcp -= pad; } - if (copy_len) - dprintk("RPC: %s: %d bytes in" - " %d extra segments (%d lost)\n", - __func__, olen, i, copy_len); + /* The tail iovec is redirected to the remaining data + * in the receive buffer, to avoid a memcopy. + */ + if (copy_len || pad) { + rqst->rq_rcv_buf.tail[0].iov_base = srcp; + rqst->rq_private_buf.tail[0].iov_base = srcp; + } - /* TBD avoid a warning from call_decode() */ - rqst->rq_private_buf = rqst->rq_rcv_buf; + return fixup_copy_count; } void @@ -960,14 +978,13 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) (headerp->rm_body.rm_chunks[1] == xdr_zero && headerp->rm_body.rm_chunks[2] != xdr_zero) || (headerp->rm_body.rm_chunks[1] != xdr_zero && - req->rl_nchunks == 0)) + list_empty(&req->rl_registered))) goto badheader; if (headerp->rm_body.rm_chunks[1] != xdr_zero) { /* count any expected write chunks in read reply */ /* start at write chunk array count */ iptr = &headerp->rm_body.rm_chunks[2]; - rdmalen = rpcrdma_count_chunks(rep, - req->rl_nchunks, 1, &iptr); + rdmalen = rpcrdma_count_chunks(rep, 1, &iptr); /* check for validity, and no reply chunk after */ if (rdmalen < 0 || *iptr++ != xdr_zero) goto badheader; @@ -988,8 +1005,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) rep->rr_len -= RPCRDMA_HDRLEN_MIN; status = rep->rr_len; } - /* Fix up the rpc results for upper layer */ - rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen); + + r_xprt->rx_stats.fixup_copy_count += + rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, + rdmalen); break; case rdma_nomsg: @@ -997,11 +1016,11 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) if (headerp->rm_body.rm_chunks[0] != xdr_zero || headerp->rm_body.rm_chunks[1] != xdr_zero || headerp->rm_body.rm_chunks[2] != xdr_one || - req->rl_nchunks == 0) + list_empty(&req->rl_registered)) goto badheader; iptr = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); - rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr); + rdmalen = rpcrdma_count_chunks(rep, 0, &iptr); if (rdmalen < 0) goto badheader; r_xprt->rx_stats.total_rdma_reply += rdmalen; @@ -1014,14 +1033,9 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) badheader: default: - dprintk("%s: invalid rpcrdma reply header (type %d):" - " chunks[012] == %d %d %d" - " expected chunks <= %d\n", - __func__, be32_to_cpu(headerp->rm_type), - headerp->rm_body.rm_chunks[0], - headerp->rm_body.rm_chunks[1], - headerp->rm_body.rm_chunks[2], - req->rl_nchunks); + dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n", + rqst->rq_task->tk_pid, __func__, + be32_to_cpu(headerp->rm_type)); status = -EIO; r_xprt->rx_stats.bad_reply_count++; break; @@ -1035,7 +1049,7 @@ out: * control: waking the next RPC waits until this RPC has * relinquished all its Send Queue entries. */ - if (req->rl_nchunks) + if (!list_empty(&req->rl_registered)) r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req); spin_lock_bh(&xprt->transport_lock); diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c index 99d2e5b72..81f0e879f 100644 --- a/net/sunrpc/xprtrdma/transport.c +++ b/net/sunrpc/xprtrdma/transport.c @@ -558,7 +558,6 @@ out_sendbuf: out_fail: rpcrdma_buffer_put(req); - r_xprt->rx_stats.failed_marshal_count++; return NULL; } @@ -590,8 +589,19 @@ xprt_rdma_free(void *buffer) rpcrdma_buffer_put(req); } -/* +/** + * xprt_rdma_send_request - marshal and send an RPC request + * @task: RPC task with an RPC message in rq_snd_buf + * + * Return values: + * 0: The request has been sent + * ENOTCONN: Caller needs to invoke connect logic then call again + * ENOBUFS: Call again later to send the request + * EIO: A permanent error occurred. The request was not sent, + * and don't try it again + * * send_request invokes the meat of RPC RDMA. It must do the following: + * * 1. Marshal the RPC request into an RPC RDMA request, which means * putting a header in front of data, and creating IOVs for RDMA * from those in the request. @@ -600,7 +610,6 @@ xprt_rdma_free(void *buffer) * the request (rpcrdma_ep_post). * 4. No partial sends are possible in the RPC-RDMA protocol (as in UDP). */ - static int xprt_rdma_send_request(struct rpc_task *task) { @@ -610,6 +619,9 @@ xprt_rdma_send_request(struct rpc_task *task) struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); int rc = 0; + /* On retransmit, remove any previously registered chunks */ + r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false); + rc = rpcrdma_marshal_req(rqst); if (rc < 0) goto failed_marshal; @@ -630,11 +642,12 @@ xprt_rdma_send_request(struct rpc_task *task) return 0; failed_marshal: - r_xprt->rx_stats.failed_marshal_count++; dprintk("RPC: %s: rpcrdma_marshal_req failed, status %i\n", __func__, rc); if (rc == -EIO) - return -EIO; + r_xprt->rx_stats.failed_marshal_count++; + if (rc != -ENOTCONN) + return rc; drop_connection: xprt_disconnect_done(xprt); return -ENOTCONN; /* implies disconnect */ @@ -660,7 +673,7 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) xprt->stat.bad_xids, xprt->stat.req_u, xprt->stat.bklog_u); - seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n", + seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ", r_xprt->rx_stats.read_chunk_count, r_xprt->rx_stats.write_chunk_count, r_xprt->rx_stats.reply_chunk_count, @@ -672,6 +685,10 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq) r_xprt->rx_stats.failed_marshal_count, r_xprt->rx_stats.bad_reply_count, r_xprt->rx_stats.nomsg_call_count); + seq_printf(seq, "%lu %lu %lu\n", + r_xprt->rx_stats.mrs_recovered, + r_xprt->rx_stats.mrs_orphaned, + r_xprt->rx_stats.mrs_allocated); } static int @@ -741,7 +758,6 @@ void xprt_rdma_cleanup(void) __func__, rc); rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); rc = xprt_unregister_transport(&xprt_rdma_bc); if (rc) @@ -753,20 +769,13 @@ int xprt_rdma_init(void) { int rc; - rc = frwr_alloc_recovery_wq(); - if (rc) - return rc; - rc = rpcrdma_alloc_wq(); - if (rc) { - frwr_destroy_recovery_wq(); + if (rc) return rc; - } rc = xprt_register_transport(&xprt_rdma); if (rc) { rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); return rc; } @@ -774,7 +783,6 @@ int xprt_rdma_init(void) if (rc) { xprt_unregister_transport(&xprt_rdma); rpcrdma_destroy_wq(); - frwr_destroy_recovery_wq(); return rc; } diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index b044d98a1..799cce6cb 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -51,6 +51,7 @@ #include <linux/slab.h> #include <linux/prefetch.h> #include <linux/sunrpc/addr.h> +#include <linux/sunrpc/svc_rdma.h> #include <asm/bitops.h> #include <linux/module.h> /* try_module_get()/module_put() */ @@ -379,8 +380,6 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) struct rpcrdma_ia *ia = &xprt->rx_ia; int rc; - ia->ri_dma_mr = NULL; - ia->ri_id = rpcrdma_create_id(xprt, ia, addr); if (IS_ERR(ia->ri_id)) { rc = PTR_ERR(ia->ri_id); @@ -391,47 +390,29 @@ rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg) ia->ri_pd = ib_alloc_pd(ia->ri_device); if (IS_ERR(ia->ri_pd)) { rc = PTR_ERR(ia->ri_pd); - dprintk("RPC: %s: ib_alloc_pd() failed %i\n", - __func__, rc); + pr_err("rpcrdma: ib_alloc_pd() returned %d\n", rc); goto out2; } - if (memreg == RPCRDMA_FRMR) { - if (!(ia->ri_device->attrs.device_cap_flags & - IB_DEVICE_MEM_MGT_EXTENSIONS) || - (ia->ri_device->attrs.max_fast_reg_page_list_len == 0)) { - dprintk("RPC: %s: FRMR registration " - "not supported by HCA\n", __func__); - memreg = RPCRDMA_MTHCAFMR; - } - } - if (memreg == RPCRDMA_MTHCAFMR) { - if (!ia->ri_device->alloc_fmr) { - dprintk("RPC: %s: MTHCAFMR registration " - "not supported by HCA\n", __func__); - rc = -EINVAL; - goto out3; - } - } - switch (memreg) { case RPCRDMA_FRMR: - ia->ri_ops = &rpcrdma_frwr_memreg_ops; - break; - case RPCRDMA_ALLPHYSICAL: - ia->ri_ops = &rpcrdma_physical_memreg_ops; - break; + if (frwr_is_supported(ia)) { + ia->ri_ops = &rpcrdma_frwr_memreg_ops; + break; + } + /*FALLTHROUGH*/ case RPCRDMA_MTHCAFMR: - ia->ri_ops = &rpcrdma_fmr_memreg_ops; - break; + if (fmr_is_supported(ia)) { + ia->ri_ops = &rpcrdma_fmr_memreg_ops; + break; + } + /*FALLTHROUGH*/ default: - printk(KERN_ERR "RPC: Unsupported memory " - "registration mode: %d\n", memreg); - rc = -ENOMEM; + pr_err("rpcrdma: Unsupported memory registration mode: %d\n", + memreg); + rc = -EINVAL; goto out3; } - dprintk("RPC: %s: memory registration strategy is '%s'\n", - __func__, ia->ri_ops->ro_displayname); return 0; @@ -585,8 +566,6 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia, out2: ib_free_cq(sendcq); out1: - if (ia->ri_dma_mr) - ib_dereg_mr(ia->ri_dma_mr); return rc; } @@ -600,8 +579,6 @@ out1: void rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) { - int rc; - dprintk("RPC: %s: entering, connected is %d\n", __func__, ep->rep_connected); @@ -615,12 +592,6 @@ rpcrdma_ep_destroy(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_free_cq(ep->rep_attr.recv_cq); ib_free_cq(ep->rep_attr.send_cq); - - if (ia->ri_dma_mr) { - rc = ib_dereg_mr(ia->ri_dma_mr); - dprintk("RPC: %s: ib_dereg_mr returned %i\n", - __func__, rc); - } } /* @@ -777,6 +748,90 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia) ib_drain_qp(ia->ri_id->qp); } +static void +rpcrdma_mr_recovery_worker(struct work_struct *work) +{ + struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, + rb_recovery_worker.work); + struct rpcrdma_mw *mw; + + spin_lock(&buf->rb_recovery_lock); + while (!list_empty(&buf->rb_stale_mrs)) { + mw = list_first_entry(&buf->rb_stale_mrs, + struct rpcrdma_mw, mw_list); + list_del_init(&mw->mw_list); + spin_unlock(&buf->rb_recovery_lock); + + dprintk("RPC: %s: recovering MR %p\n", __func__, mw); + mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw); + + spin_lock(&buf->rb_recovery_lock); + } + spin_unlock(&buf->rb_recovery_lock); +} + +void +rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw) +{ + struct rpcrdma_xprt *r_xprt = mw->mw_xprt; + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + + spin_lock(&buf->rb_recovery_lock); + list_add(&mw->mw_list, &buf->rb_stale_mrs); + spin_unlock(&buf->rb_recovery_lock); + + schedule_delayed_work(&buf->rb_recovery_worker, 0); +} + +static void +rpcrdma_create_mrs(struct rpcrdma_xprt *r_xprt) +{ + struct rpcrdma_buffer *buf = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + unsigned int count; + LIST_HEAD(free); + LIST_HEAD(all); + + for (count = 0; count < 32; count++) { + struct rpcrdma_mw *mw; + int rc; + + mw = kzalloc(sizeof(*mw), GFP_KERNEL); + if (!mw) + break; + + rc = ia->ri_ops->ro_init_mr(ia, mw); + if (rc) { + kfree(mw); + break; + } + + mw->mw_xprt = r_xprt; + + list_add(&mw->mw_list, &free); + list_add(&mw->mw_all, &all); + } + + spin_lock(&buf->rb_mwlock); + list_splice(&free, &buf->rb_mws); + list_splice(&all, &buf->rb_all); + r_xprt->rx_stats.mrs_allocated += count; + spin_unlock(&buf->rb_mwlock); + + dprintk("RPC: %s: created %u MRs\n", __func__, count); +} + +static void +rpcrdma_mr_refresh_worker(struct work_struct *work) +{ + struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer, + rb_refresh_worker.work); + struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, + rx_buf); + + rpcrdma_create_mrs(r_xprt); +} + struct rpcrdma_req * rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) { @@ -793,6 +848,7 @@ rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) spin_unlock(&buffer->rb_reqslock); req->rl_cqe.done = rpcrdma_wc_send; req->rl_buffer = &r_xprt->rx_buf; + INIT_LIST_HEAD(&req->rl_registered); return req; } @@ -832,17 +888,23 @@ int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; - struct rpcrdma_ia *ia = &r_xprt->rx_ia; int i, rc; buf->rb_max_requests = r_xprt->rx_data.max_requests; buf->rb_bc_srv_max_requests = 0; - spin_lock_init(&buf->rb_lock); atomic_set(&buf->rb_credits, 1); + spin_lock_init(&buf->rb_mwlock); + spin_lock_init(&buf->rb_lock); + spin_lock_init(&buf->rb_recovery_lock); + INIT_LIST_HEAD(&buf->rb_mws); + INIT_LIST_HEAD(&buf->rb_all); + INIT_LIST_HEAD(&buf->rb_stale_mrs); + INIT_DELAYED_WORK(&buf->rb_refresh_worker, + rpcrdma_mr_refresh_worker); + INIT_DELAYED_WORK(&buf->rb_recovery_worker, + rpcrdma_mr_recovery_worker); - rc = ia->ri_ops->ro_init(r_xprt); - if (rc) - goto out; + rpcrdma_create_mrs(r_xprt); INIT_LIST_HEAD(&buf->rb_send_bufs); INIT_LIST_HEAD(&buf->rb_allreqs); @@ -862,7 +924,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) } INIT_LIST_HEAD(&buf->rb_recv_bufs); - for (i = 0; i < buf->rb_max_requests + 2; i++) { + for (i = 0; i < buf->rb_max_requests + RPCRDMA_MAX_BC_REQUESTS; i++) { struct rpcrdma_rep *rep; rep = rpcrdma_create_rep(r_xprt); @@ -918,17 +980,46 @@ rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) kfree(req); } +static void +rpcrdma_destroy_mrs(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt, + rx_buf); + struct rpcrdma_ia *ia = rdmab_to_ia(buf); + struct rpcrdma_mw *mw; + unsigned int count; + + count = 0; + spin_lock(&buf->rb_mwlock); + while (!list_empty(&buf->rb_all)) { + mw = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all); + list_del(&mw->mw_all); + + spin_unlock(&buf->rb_mwlock); + ia->ri_ops->ro_release_mr(mw); + count++; + spin_lock(&buf->rb_mwlock); + } + spin_unlock(&buf->rb_mwlock); + r_xprt->rx_stats.mrs_allocated = 0; + + dprintk("RPC: %s: released %u MRs\n", __func__, count); +} + void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); + cancel_delayed_work_sync(&buf->rb_recovery_worker); + while (!list_empty(&buf->rb_recv_bufs)) { struct rpcrdma_rep *rep; rep = rpcrdma_buffer_get_rep_locked(buf); rpcrdma_destroy_rep(ia, rep); } + buf->rb_send_count = 0; spin_lock(&buf->rb_reqslock); while (!list_empty(&buf->rb_allreqs)) { @@ -943,8 +1034,9 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) spin_lock(&buf->rb_reqslock); } spin_unlock(&buf->rb_reqslock); + buf->rb_recv_count = 0; - ia->ri_ops->ro_destroy(buf); + rpcrdma_destroy_mrs(buf); } struct rpcrdma_mw * @@ -962,8 +1054,17 @@ rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt) spin_unlock(&buf->rb_mwlock); if (!mw) - pr_err("RPC: %s: no MWs available\n", __func__); + goto out_nomws; return mw; + +out_nomws: + dprintk("RPC: %s: no MWs available\n", __func__); + schedule_delayed_work(&buf->rb_refresh_worker, 0); + + /* Allow the reply handler and refresh worker to run */ + cond_resched(); + + return NULL; } void @@ -976,6 +1077,23 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) spin_unlock(&buf->rb_mwlock); } +static struct rpcrdma_rep * +rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buffers) +{ + /* If an RPC previously completed without a reply (say, a + * credential problem or a soft timeout occurs) then hold off + * on supplying more Receive buffers until the number of new + * pending RPCs catches up to the number of posted Receives. + */ + if (unlikely(buffers->rb_send_count < buffers->rb_recv_count)) + return NULL; + + if (unlikely(list_empty(&buffers->rb_recv_bufs))) + return NULL; + buffers->rb_recv_count++; + return rpcrdma_buffer_get_rep_locked(buffers); +} + /* * Get a set of request/reply buffers. * @@ -989,10 +1107,9 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) spin_lock(&buffers->rb_lock); if (list_empty(&buffers->rb_send_bufs)) goto out_reqbuf; + buffers->rb_send_count++; req = rpcrdma_buffer_get_req_locked(buffers); - if (list_empty(&buffers->rb_recv_bufs)) - goto out_repbuf; - req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + req->rl_reply = rpcrdma_buffer_get_rep(buffers); spin_unlock(&buffers->rb_lock); return req; @@ -1000,11 +1117,6 @@ out_reqbuf: spin_unlock(&buffers->rb_lock); pr_warn("RPC: %s: out of request buffers\n", __func__); return NULL; -out_repbuf: - spin_unlock(&buffers->rb_lock); - pr_warn("RPC: %s: out of reply buffers\n", __func__); - req->rl_reply = NULL; - return req; } /* @@ -1021,9 +1133,12 @@ rpcrdma_buffer_put(struct rpcrdma_req *req) req->rl_reply = NULL; spin_lock(&buffers->rb_lock); + buffers->rb_send_count--; list_add_tail(&req->rl_free, &buffers->rb_send_bufs); - if (rep) + if (rep) { + buffers->rb_recv_count--; list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); + } spin_unlock(&buffers->rb_lock); } @@ -1037,8 +1152,7 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) struct rpcrdma_buffer *buffers = req->rl_buffer; spin_lock(&buffers->rb_lock); - if (!list_empty(&buffers->rb_recv_bufs)) - req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + req->rl_reply = rpcrdma_buffer_get_rep(buffers); spin_unlock(&buffers->rb_lock); } @@ -1052,6 +1166,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; spin_lock(&buffers->rb_lock); + buffers->rb_recv_count--; list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); spin_unlock(&buffers->rb_lock); } @@ -1060,14 +1175,6 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) * Wrappers for internal-use kmalloc memory registration, used by buffer code. */ -void -rpcrdma_mapping_error(struct rpcrdma_mr_seg *seg) -{ - dprintk("RPC: map_one: offset %p iova %llx len %zu\n", - seg->mr_offset, - (unsigned long long)seg->mr_dma, seg->mr_dmalen); -} - /** * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers * @ia: controlling rpcrdma_ia @@ -1150,7 +1257,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, if (rep) { rc = rpcrdma_ep_post_recv(ia, ep, rep); if (rc) - goto out; + return rc; req->rl_reply = NULL; } @@ -1175,10 +1282,12 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, rc = ib_post_send(ia->ri_id->qp, &send_wr, &send_wr_fail); if (rc) - dprintk("RPC: %s: ib_post_send returned %i\n", __func__, - rc); -out: - return rc; + goto out_postsend_err; + return 0; + +out_postsend_err: + pr_err("rpcrdma: RDMA Send ib_post_send returned %i\n", rc); + return -ENOTCONN; } /* @@ -1203,11 +1312,13 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia, DMA_BIDIRECTIONAL); rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail); - if (rc) - dprintk("RPC: %s: ib_post_recv returned %i\n", __func__, - rc); - return rc; + goto out_postrecv; + return 0; + +out_postrecv: + pr_err("rpcrdma: ib_post_recv returned %i\n", rc); + return -ENOTCONN; } /** diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index c53abd128..a71b0f589 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -68,7 +68,6 @@ struct rpcrdma_ia { struct ib_device *ri_device; struct rdma_cm_id *ri_id; struct ib_pd *ri_pd; - struct ib_mr *ri_dma_mr; struct completion ri_done; int ri_async_rc; unsigned int ri_max_frmr_depth; @@ -172,23 +171,14 @@ rdmab_to_msg(struct rpcrdma_regbuf *rb) * o recv buffer (posted to provider) * o ib_sge (also donated to provider) * o status of reply (length, success or not) - * o bookkeeping state to get run by tasklet (list, etc) + * o bookkeeping state to get run by reply handler (list, etc) * - * These are allocated during initialization, per-transport instance; - * however, the tasklet execution list itself is global, as it should - * always be pretty short. + * These are allocated during initialization, per-transport instance. * * N of these are associated with a transport instance, and stored in * struct rpcrdma_buffer. N is the max number of outstanding requests. */ -#define RPCRDMA_MAX_DATA_SEGS ((1 * 1024 * 1024) / PAGE_SIZE) - -/* data segments + head/tail for Call + head/tail for Reply */ -#define RPCRDMA_MAX_SEGS (RPCRDMA_MAX_DATA_SEGS + 4) - -struct rpcrdma_buffer; - struct rpcrdma_rep { struct ib_cqe rr_cqe; unsigned int rr_len; @@ -232,8 +222,8 @@ struct rpcrdma_frmr { }; struct rpcrdma_fmr { - struct ib_fmr *fmr; - u64 *physaddrs; + struct ib_fmr *fm_mr; + u64 *fm_physaddrs; }; struct rpcrdma_mw { @@ -245,8 +235,10 @@ struct rpcrdma_mw { struct rpcrdma_fmr fmr; struct rpcrdma_frmr frmr; }; - struct work_struct mw_work; struct rpcrdma_xprt *mw_xprt; + u32 mw_handle; + u32 mw_length; + u64 mw_offset; struct list_head mw_all; }; @@ -266,33 +258,30 @@ struct rpcrdma_mw { * of iovs for send operations. The reason is that the iovs passed to * ib_post_{send,recv} must not be modified until the work request * completes. - * - * NOTES: - * o RPCRDMA_MAX_SEGS is the max number of addressible chunk elements we - * marshal. The number needed varies depending on the iov lists that - * are passed to us, the memory registration mode we are in, and if - * physical addressing is used, the layout. */ +/* Maximum number of page-sized "segments" per chunk list to be + * registered or invalidated. Must handle a Reply chunk: + */ +enum { + RPCRDMA_MAX_IOV_SEGS = 3, + RPCRDMA_MAX_DATA_SEGS = ((1 * 1024 * 1024) / PAGE_SIZE) + 1, + RPCRDMA_MAX_SEGS = RPCRDMA_MAX_DATA_SEGS + + RPCRDMA_MAX_IOV_SEGS, +}; + struct rpcrdma_mr_seg { /* chunk descriptors */ - struct rpcrdma_mw *rl_mw; /* registered MR */ - u64 mr_base; /* registration result */ - u32 mr_rkey; /* registration result */ u32 mr_len; /* length of chunk or segment */ - int mr_nsegs; /* number of segments in chunk or 0 */ - enum dma_data_direction mr_dir; /* segment mapping direction */ - dma_addr_t mr_dma; /* segment mapping address */ - size_t mr_dmalen; /* segment mapping length */ struct page *mr_page; /* owning page, if any */ char *mr_offset; /* kva if no page, else offset */ }; #define RPCRDMA_MAX_IOVS (2) +struct rpcrdma_buffer; struct rpcrdma_req { struct list_head rl_free; unsigned int rl_niovs; - unsigned int rl_nchunks; unsigned int rl_connect_cookie; struct rpc_task *rl_task; struct rpcrdma_buffer *rl_buffer; @@ -300,12 +289,13 @@ struct rpcrdma_req { struct ib_sge rl_send_iov[RPCRDMA_MAX_IOVS]; struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; - struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; - struct rpcrdma_mr_seg *rl_nextseg; struct ib_cqe rl_cqe; struct list_head rl_all; bool rl_backchannel; + + struct list_head rl_registered; /* registered segments */ + struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; }; static inline struct rpcrdma_req * @@ -331,6 +321,7 @@ struct rpcrdma_buffer { char *rb_pool; spinlock_t rb_lock; /* protect buf lists */ + int rb_send_count, rb_recv_count; struct list_head rb_send_bufs; struct list_head rb_recv_bufs; u32 rb_max_requests; @@ -341,6 +332,11 @@ struct rpcrdma_buffer { struct list_head rb_allreqs; u32 rb_bc_max_requests; + + spinlock_t rb_recovery_lock; /* protect rb_stale_mrs */ + struct list_head rb_stale_mrs; + struct delayed_work rb_recovery_worker; + struct delayed_work rb_refresh_worker; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@ -387,6 +383,9 @@ struct rpcrdma_stats { unsigned long bad_reply_count; unsigned long nomsg_call_count; unsigned long bcall_count; + unsigned long mrs_recovered; + unsigned long mrs_orphaned; + unsigned long mrs_allocated; }; /* @@ -395,23 +394,25 @@ struct rpcrdma_stats { struct rpcrdma_xprt; struct rpcrdma_memreg_ops { int (*ro_map)(struct rpcrdma_xprt *, - struct rpcrdma_mr_seg *, int, bool); + struct rpcrdma_mr_seg *, int, bool, + struct rpcrdma_mw **); void (*ro_unmap_sync)(struct rpcrdma_xprt *, struct rpcrdma_req *); void (*ro_unmap_safe)(struct rpcrdma_xprt *, struct rpcrdma_req *, bool); + void (*ro_recover_mr)(struct rpcrdma_mw *); int (*ro_open)(struct rpcrdma_ia *, struct rpcrdma_ep *, struct rpcrdma_create_data_internal *); size_t (*ro_maxpages)(struct rpcrdma_xprt *); - int (*ro_init)(struct rpcrdma_xprt *); - void (*ro_destroy)(struct rpcrdma_buffer *); + int (*ro_init_mr)(struct rpcrdma_ia *, + struct rpcrdma_mw *); + void (*ro_release_mr)(struct rpcrdma_mw *); const char *ro_displayname; }; extern const struct rpcrdma_memreg_ops rpcrdma_fmr_memreg_ops; extern const struct rpcrdma_memreg_ops rpcrdma_frwr_memreg_ops; -extern const struct rpcrdma_memreg_ops rpcrdma_physical_memreg_ops; /* * RPCRDMA transport -- encapsulates the structures above for @@ -446,6 +447,8 @@ extern int xprt_rdma_pad_optimize; */ int rpcrdma_ia_open(struct rpcrdma_xprt *, struct sockaddr *, int); void rpcrdma_ia_close(struct rpcrdma_ia *); +bool frwr_is_supported(struct rpcrdma_ia *); +bool fmr_is_supported(struct rpcrdma_ia *); /* * Endpoint calls - xprtrdma/verbs.c @@ -477,6 +480,8 @@ void rpcrdma_buffer_put(struct rpcrdma_req *); void rpcrdma_recv_buffer_get(struct rpcrdma_req *); void rpcrdma_recv_buffer_put(struct rpcrdma_rep *); +void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *); + struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *, size_t, gfp_t); void rpcrdma_free_regbuf(struct rpcrdma_ia *, @@ -484,9 +489,6 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *, int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); -int frwr_alloc_recovery_wq(void); -void frwr_destroy_recovery_wq(void); - int rpcrdma_alloc_wq(void); void rpcrdma_destroy_wq(void); @@ -494,45 +496,12 @@ void rpcrdma_destroy_wq(void); * Wrappers for chunk registration, shared by read/write chunk code. */ -void rpcrdma_mapping_error(struct rpcrdma_mr_seg *); - static inline enum dma_data_direction rpcrdma_data_dir(bool writing) { return writing ? DMA_FROM_DEVICE : DMA_TO_DEVICE; } -static inline void -rpcrdma_map_one(struct ib_device *device, struct rpcrdma_mr_seg *seg, - enum dma_data_direction direction) -{ - seg->mr_dir = direction; - seg->mr_dmalen = seg->mr_len; - - if (seg->mr_page) - seg->mr_dma = ib_dma_map_page(device, - seg->mr_page, offset_in_page(seg->mr_offset), - seg->mr_dmalen, seg->mr_dir); - else - seg->mr_dma = ib_dma_map_single(device, - seg->mr_offset, - seg->mr_dmalen, seg->mr_dir); - - if (ib_dma_mapping_error(device, seg->mr_dma)) - rpcrdma_mapping_error(seg); -} - -static inline void -rpcrdma_unmap_one(struct ib_device *device, struct rpcrdma_mr_seg *seg) -{ - if (seg->mr_page) - ib_dma_unmap_page(device, - seg->mr_dma, seg->mr_dmalen, seg->mr_dir); - else - ib_dma_unmap_single(device, - seg->mr_dma, seg->mr_dmalen, seg->mr_dir); -} - /* * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c */ diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c index 167cf5931..bf168838a 100644 --- a/net/sunrpc/xprtsock.c +++ b/net/sunrpc/xprtsock.c @@ -124,7 +124,7 @@ static struct ctl_table xs_tunables_table[] = { .mode = 0644, .proc_handler = proc_dointvec_minmax, .extra1 = &xprt_min_resvport_limit, - .extra2 = &xprt_max_resvport_limit + .extra2 = &xprt_max_resvport }, { .procname = "max_resvport", @@ -132,7 +132,7 @@ static struct ctl_table xs_tunables_table[] = { .maxlen = sizeof(unsigned int), .mode = 0644, .proc_handler = proc_dointvec_minmax, - .extra1 = &xprt_min_resvport_limit, + .extra1 = &xprt_min_resvport, .extra2 = &xprt_max_resvport_limit }, { @@ -177,7 +177,6 @@ static struct ctl_table sunrpc_table[] = { * increase over time if the server is down or not responding. */ #define XS_TCP_INIT_REEST_TO (3U * HZ) -#define XS_TCP_MAX_REEST_TO (5U * 60 * HZ) /* * TCP idle timeout; client drops the transport socket if it is idle @@ -642,6 +641,7 @@ static int xs_tcp_send_request(struct rpc_task *task) struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct xdr_buf *xdr = &req->rq_snd_buf; bool zerocopy = true; + bool vm_wait = false; int status; int sent; @@ -677,15 +677,33 @@ static int xs_tcp_send_request(struct rpc_task *task) return 0; } + WARN_ON_ONCE(sent == 0 && status == 0); + + if (status == -EAGAIN ) { + /* + * Return EAGAIN if we're sure we're hitting the + * socket send buffer limits. + */ + if (test_bit(SOCK_NOSPACE, &transport->sock->flags)) + break; + /* + * Did we hit a memory allocation failure? + */ + if (sent == 0) { + status = -ENOBUFS; + if (vm_wait) + break; + /* Retry, knowing now that we're below the + * socket send buffer limit + */ + vm_wait = true; + } + continue; + } if (status < 0) break; - if (sent == 0) { - status = -EAGAIN; - break; - } + vm_wait = false; } - if (status == -EAGAIN && sk_stream_is_writeable(transport->inet)) - status = -ENOBUFS; switch (status) { case -ENOTSOCK: @@ -755,11 +773,19 @@ static void xs_restore_old_callbacks(struct sock_xprt *transport, struct sock *s sk->sk_error_report = transport->old_error_report; } +static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) +{ + struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); +} + static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) { smp_mb__before_atomic(); clear_bit(XPRT_CLOSE_WAIT, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state); + xs_sock_reset_state_flags(xprt); smp_mb__after_atomic(); } @@ -962,10 +988,13 @@ static void xs_local_data_receive(struct sock_xprt *transport) goto out; for (;;) { skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) + if (skb != NULL) { + xs_local_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram(sk, skb); + continue; + } + if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; - xs_local_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); } out: mutex_unlock(&transport->recv_mutex); @@ -1043,10 +1072,13 @@ static void xs_udp_data_receive(struct sock_xprt *transport) goto out; for (;;) { skb = skb_recv_datagram(sk, 0, 1, &err); - if (skb == NULL) + if (skb != NULL) { + xs_udp_data_read_skb(&transport->xprt, sk, skb); + skb_free_datagram_locked(sk, skb); + continue; + } + if (!test_and_clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) break; - xs_udp_data_read_skb(&transport->xprt, sk, skb); - skb_free_datagram(sk, skb); } out: mutex_unlock(&transport->recv_mutex); @@ -1074,7 +1106,14 @@ static void xs_data_ready(struct sock *sk) if (xprt != NULL) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); - queue_work(rpciod_workqueue, &transport->recv_worker); + transport->old_data_ready(sk); + /* Any data means we had a useful conversation, so + * then we don't need to delay the next reconnect + */ + if (xprt->reestablish_timeout) + xprt->reestablish_timeout = 0; + if (!test_and_set_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + queue_work(xprtiod_workqueue, &transport->recv_worker); } read_unlock_bh(&sk->sk_callback_lock); } @@ -1474,10 +1513,15 @@ static void xs_tcp_data_receive(struct sock_xprt *transport) for (;;) { lock_sock(sk); read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); - release_sock(sk); - if (read <= 0) - break; - total += read; + if (read <= 0) { + clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); + release_sock(sk); + if (!test_bit(XPRT_SOCK_DATA_READY, &transport->sock_state)) + break; + } else { + release_sock(sk); + total += read; + } rd_desc.count = 65536; } out: @@ -1493,34 +1537,6 @@ static void xs_tcp_data_receive_workfn(struct work_struct *work) } /** - * xs_tcp_data_ready - "data ready" callback for TCP sockets - * @sk: socket with data to read - * - */ -static void xs_tcp_data_ready(struct sock *sk) -{ - struct sock_xprt *transport; - struct rpc_xprt *xprt; - - dprintk("RPC: xs_tcp_data_ready...\n"); - - read_lock_bh(&sk->sk_callback_lock); - if (!(xprt = xprt_from_sock(sk))) - goto out; - transport = container_of(xprt, struct sock_xprt, xprt); - - /* Any data means we had a useful conversation, so - * the we don't need to delay the next reconnect - */ - if (xprt->reestablish_timeout) - xprt->reestablish_timeout = 0; - queue_work(rpciod_workqueue, &transport->recv_worker); - -out: - read_unlock_bh(&sk->sk_callback_lock); -} - -/** * xs_tcp_state_change - callback to handle TCP socket state changes * @sk: socket whose state has changed * @@ -1714,7 +1730,7 @@ static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) static unsigned short xs_get_random_port(void) { - unsigned short range = xprt_max_resvport - xprt_min_resvport; + unsigned short range = xprt_max_resvport - xprt_min_resvport + 1; unsigned short rand = (unsigned short) prandom_u32() % range; return rand + xprt_min_resvport; } @@ -2156,6 +2172,8 @@ static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) write_unlock_bh(&sk->sk_callback_lock); } xs_udp_do_set_buffer_size(xprt); + + xprt->stat.connect_start = jiffies; } static void xs_udp_setup_socket(struct work_struct *work) @@ -2219,6 +2237,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) unsigned int keepcnt = xprt->timeout->to_retries + 1; unsigned int opt_on = 1; unsigned int timeo; + unsigned int addr_pref = IPV6_PREFER_SRC_PUBLIC; /* TCP Keepalive options */ kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, @@ -2230,6 +2249,16 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT, (char *)&keepcnt, sizeof(keepcnt)); + /* Avoid temporary address, they are bad for long-lived + * connections such as NFS mounts. + * RFC4941, section 3.6 suggests that: + * Individual applications, which have specific + * knowledge about the normal duration of connections, + * MAY override this as appropriate. + */ + kernel_setsockopt(sock, SOL_IPV6, IPV6_ADDR_PREFERENCES, + (char *)&addr_pref, sizeof(addr_pref)); + /* TCP user timeout (see RFC5482) */ timeo = jiffies_to_msecs(xprt->timeout->to_initval) * (xprt->timeout->to_retries + 1); @@ -2241,7 +2270,7 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) xs_save_old_callbacks(transport, sk); sk->sk_user_data = xprt; - sk->sk_data_ready = xs_tcp_data_ready; + sk->sk_data_ready = xs_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; sock_set_flag(sk, SOCK_FASYNC); @@ -2356,6 +2385,25 @@ out: xprt_wake_pending_tasks(xprt, status); } +static unsigned long xs_reconnect_delay(const struct rpc_xprt *xprt) +{ + unsigned long start, now = jiffies; + + start = xprt->stat.connect_start + xprt->reestablish_timeout; + if (time_after(start, now)) + return start - now; + return 0; +} + +static void xs_reconnect_backoff(struct rpc_xprt *xprt) +{ + xprt->reestablish_timeout <<= 1; + if (xprt->reestablish_timeout > xprt->max_reconnect_timeout) + xprt->reestablish_timeout = xprt->max_reconnect_timeout; + if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) + xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; +} + /** * xs_connect - connect a socket to a remote endpoint * @xprt: pointer to transport structure @@ -2373,6 +2421,7 @@ out: static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) { struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); + unsigned long delay = 0; WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport)); @@ -2384,19 +2433,15 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) /* Start by resetting any existing state */ xs_reset_transport(transport); - queue_delayed_work(rpciod_workqueue, - &transport->connect_worker, - xprt->reestablish_timeout); - xprt->reestablish_timeout <<= 1; - if (xprt->reestablish_timeout < XS_TCP_INIT_REEST_TO) - xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; - if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO) - xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; - } else { + delay = xs_reconnect_delay(xprt); + xs_reconnect_backoff(xprt); + + } else dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); - queue_delayed_work(rpciod_workqueue, - &transport->connect_worker, 0); - } + + queue_delayed_work(xprtiod_workqueue, + &transport->connect_worker, + delay); } /** @@ -2948,6 +2993,8 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) xprt->ops = &xs_tcp_ops; xprt->timeout = &xs_tcp_default_timeout; + xprt->max_reconnect_timeout = xprt->timeout->to_maxval; + INIT_WORK(&transport->recv_worker, xs_tcp_data_receive_workfn); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); @@ -3157,8 +3204,12 @@ static int param_set_uint_minmax(const char *val, static int param_set_portnr(const char *val, const struct kernel_param *kp) { - return param_set_uint_minmax(val, kp, + if (kp->arg == &xprt_min_resvport) + return param_set_uint_minmax(val, kp, RPC_MIN_RESVPORT, + xprt_max_resvport); + return param_set_uint_minmax(val, kp, + xprt_min_resvport, RPC_MAX_RESVPORT); } |