| author | André Fabian Silva Delgado <emulatorman@parabola.nu> | 2016-01-20 14:01:31 -0300 |
|---|---|---|
| committer | André Fabian Silva Delgado <emulatorman@parabola.nu> | 2016-01-20 14:01:31 -0300 |
| commit | b4b7ff4b08e691656c9d77c758fc355833128ac0 (patch) | |
| tree | 82fcb00e6b918026dc9f2d1f05ed8eee83874cc0 /drivers/staging/lustre/lustre/ptlrpc | |
| parent | 35acfa0fc609f2a2cd95cef4a6a9c3a5c38f1778 (diff) | |
Linux-libre 4.4-gnupck-4.4-gnu
Diffstat (limited to 'drivers/staging/lustre/lustre/ptlrpc')
24 files changed, 987 insertions, 2551 deletions
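Two y2038-motivated conversions recur throughout the diff below; the dominant one replaces 32-bit `time_t`/`get_seconds()` timekeeping with `time64_t`/`ktime_get_real_seconds()`. A minimal userspace sketch of the deadline arithmetic involved (plain `int64_t` standing in for the kernel's `time64_t`; the variable names are illustrative, not taken from the patch):

```c
#include <stdio.h>
#include <stdint.h>
#include <time.h>

/* Stand-in for the kernel's 64-bit seconds type. */
typedef int64_t time64_t;

int main(void)
{
	/* ~ktime_get_real_seconds(): wall-clock seconds in 64 bits, so the
	 * sums below cannot overflow in 2038 on 32-bit machines. */
	time64_t now = (time64_t)time(NULL);
	time64_t timeout = 30;               /* ~req->rq_timeout */
	time64_t deadline = now + timeout;   /* ~req->rq_deadline */

	/* Deltas are cast explicitly for printing, just as the patch does
	 * with (long)(now - req->rq_sent) in its CDEBUG format arguments. */
	printf("deadline in %lld s\n", (long long)(deadline - now));
	return 0;
}
```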
diff --git a/drivers/staging/lustre/lustre/ptlrpc/client.c b/drivers/staging/lustre/lustre/ptlrpc/client.c index c83a34a01..a9f1bf536 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/client.c +++ b/drivers/staging/lustre/lustre/ptlrpc/client.c @@ -72,9 +72,11 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid) lnet_process_id_t peer; int err; - /* ptlrpc_uuid_to_peer() initializes its 2nd parameter - * before accessing its values. */ - /* coverity[uninit_use_in_call] */ + /* + * ptlrpc_uuid_to_peer() initializes its 2nd parameter + * before accessing its values. + * coverity[uninit_use_in_call] + */ err = ptlrpc_uuid_to_peer(uuid, &peer, &self); if (err != 0) { CNETERR("cannot find peer %s!\n", uuid->uuid); @@ -117,8 +119,10 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, desc->bd_md_count = 0; LASSERT(max_brw > 0); desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT); - /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this - * node. Negotiated ocd_brw_size will always be <= this number. */ + /* + * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this + * node. Negotiated ocd_brw_size will always be <= this number. + */ for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++) LNetInvalidateHandle(&desc->bd_mds[i]); @@ -223,8 +227,9 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) LASSERT(req->rq_import); if (AT_OFF) { - /* non-AT settings */ - /** + /* + * non-AT settings + * * \a imp_server_timeout means this is reverse import and * we send (currently only) ASTs to the client and cannot afford * to wait too long for the reply, otherwise the other client @@ -240,11 +245,15 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req) serv_est = at_get(&at->iat_service_estimate[idx]); req->rq_timeout = at_est2timeout(serv_est); } - /* We could get even fancier here, using history to predict increased - loading... */ + /* + * We could get even fancier here, using history to predict increased + * loading... 
+ */ - /* Let the server know what this RPC timeout is by putting it in the - reqmsg*/ + /* + * Let the server know what this RPC timeout is by putting it in the + * reqmsg + */ lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout); } EXPORT_SYMBOL(ptlrpc_at_set_req_timeout); @@ -261,8 +270,10 @@ static void ptlrpc_at_adj_service(struct ptlrpc_request *req, at = &req->rq_import->imp_at; idx = import_at_get_index(req->rq_import, req->rq_request_portal); - /* max service estimates are tracked on the server side, - so just keep minimal history here */ + /* + * max service estimates are tracked on the server side, + * so just keep minimal history here + */ oldse = at_measured(&at->iat_service_estimate[idx], serv_est); if (oldse != 0) CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n", @@ -282,12 +293,13 @@ static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, { unsigned int nl, oldnl; struct imp_at *at; - time_t now = get_seconds(); + time64_t now = ktime_get_real_seconds(); LASSERT(req->rq_import); if (service_time > now - req->rq_sent + 3) { - /* bz16408, however, this can also happen if early reply + /* + * bz16408, however, this can also happen if early reply * is lost and client RPC is expired and resent, early reply * or reply of original RPC can still be fit in reply buffer * of resent RPC, now client is measuring time from the @@ -298,7 +310,7 @@ static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req, D_ADAPTTO : D_WARNING, "Reported service time %u > total measured time " CFS_DURATION_T"\n", service_time, - cfs_time_sub(now, req->rq_sent)); + (long)(now - req->rq_sent)); return; } @@ -343,7 +355,7 @@ static int unpack_reply(struct ptlrpc_request *req) static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) { struct ptlrpc_request *early_req; - time_t olddl; + time64_t olddl; int rc; req->rq_early = 0; @@ -376,16 +388,18 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req) spin_lock(&req->rq_lock); olddl = req->rq_deadline; - /* server assumes it now has rq_timeout from when it sent the - * early reply, so client should give it at least that long. */ - req->rq_deadline = get_seconds() + req->rq_timeout + + /* + * server assumes it now has rq_timeout from when it sent the + * early reply, so client should give it at least that long. 
+ */ + req->rq_deadline = ktime_get_real_seconds() + req->rq_timeout + ptlrpc_at_get_net_latency(req); DEBUG_REQ(D_ADAPTTO, req, - "Early reply #%d, new deadline in " CFS_DURATION_T "s (" CFS_DURATION_T "s)", + "Early reply #%d, new deadline in %lds (%lds)", req->rq_early_count, - cfs_time_sub(req->rq_deadline, get_seconds()), - cfs_time_sub(req->rq_deadline, olddl)); + (long)(req->rq_deadline - ktime_get_real_seconds()), + (long)(req->rq_deadline - olddl)); return rc; } @@ -409,13 +423,13 @@ struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags) { struct ptlrpc_request *req; - OBD_SLAB_ALLOC_PTR_GFP(req, request_cache, flags); + req = kmem_cache_alloc(request_cache, flags | __GFP_ZERO); return req; } void ptlrpc_request_cache_free(struct ptlrpc_request *req) { - OBD_SLAB_FREE_PTR(req, request_cache); + kmem_cache_free(request_cache, req); } /** @@ -446,7 +460,7 @@ EXPORT_SYMBOL(ptlrpc_free_rq_pool); /** * Allocates, initializes and adds \a num_rq requests to the pool \a pool */ -void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) +int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) { int i; int size = 1; @@ -468,11 +482,11 @@ void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) spin_unlock(&pool->prp_lock); req = ptlrpc_request_cache_alloc(GFP_NOFS); if (!req) - return; + return i; msg = libcfs_kvzalloc(size, GFP_NOFS); if (!msg) { ptlrpc_request_cache_free(req); - return; + return i; } req->rq_reqbuf = msg; req->rq_reqbuf_len = size; @@ -481,6 +495,7 @@ void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq) list_add_tail(&req->rq_list, &pool->prp_req_list); } spin_unlock(&pool->prp_lock); + return num_rq; } EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool); @@ -494,7 +509,7 @@ EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool); */ struct ptlrpc_request_pool * ptlrpc_init_rq_pool(int num_rq, int msgsize, - void (*populate_pool)(struct ptlrpc_request_pool *, int)) + int (*populate_pool)(struct ptlrpc_request_pool *, int)) { struct ptlrpc_request_pool *pool; @@ -502,8 +517,10 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize, if (!pool) return NULL; - /* Request next power of two for the allocation, because internally - kernel would do exactly this */ + /* + * Request next power of two for the allocation, because internally + * kernel would do exactly this + */ spin_lock_init(&pool->prp_lock); INIT_LIST_HEAD(&pool->prp_req_list); @@ -512,11 +529,6 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize, populate_pool(pool, num_rq); - if (list_empty(&pool->prp_req_list)) { - /* have not allocated a single request for the pool */ - kfree(pool); - pool = NULL; - } return pool; } EXPORT_SYMBOL(ptlrpc_init_rq_pool); @@ -535,10 +547,12 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool) spin_lock(&pool->prp_lock); - /* See if we have anything in a pool, and bail out if nothing, + /* + * See if we have anything in a pool, and bail out if nothing, * in writeout path, where this matters, this is safe to do, because * nothing is lost in this case, and when some in-flight requests - * complete, this code will be called again. */ + * complete, this code will be called again. 
+ */ if (unlikely(list_empty(&pool->prp_req_list))) { spin_unlock(&pool->prp_lock); return NULL; @@ -664,11 +678,13 @@ int ptlrpc_request_pack(struct ptlrpc_request *request, __u32 version, int opcode) { int rc; + rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL); if (rc) return rc; - /* For some old 1.8 clients (< 1.8.7), they will LASSERT the size of + /* + * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of * ptlrpc_body sent from server equal to local ptlrpc_body size, so we * have to send old ptlrpc_body to keep interoperability with these * clients. @@ -700,13 +716,12 @@ static inline struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp, struct ptlrpc_request_pool *pool) { - struct ptlrpc_request *request = NULL; + struct ptlrpc_request *request; - if (pool) - request = ptlrpc_prep_req_from_pool(pool); + request = ptlrpc_request_cache_alloc(GFP_NOFS); - if (!request) - request = ptlrpc_request_cache_alloc(GFP_NOFS); + if (!request && pool) + request = ptlrpc_prep_req_from_pool(pool); if (request) { LASSERTF((unsigned long)imp > 0x1000, "%p", imp); @@ -807,56 +822,17 @@ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp, EXPORT_SYMBOL(ptlrpc_request_alloc_pack); /** - * Prepare request (fetched from pool \a pool if not NULL) on import \a imp - * for operation \a opcode. Request would contain \a count buffers. - * Sizes of buffers are described in array \a lengths and buffers themselves - * are provided by a pointer \a bufs. - * Returns prepared request structure pointer or NULL on error. - */ -struct ptlrpc_request * -ptlrpc_prep_req_pool(struct obd_import *imp, - __u32 version, int opcode, - int count, __u32 *lengths, char **bufs, - struct ptlrpc_request_pool *pool) -{ - struct ptlrpc_request *request; - int rc; - - request = __ptlrpc_request_alloc(imp, pool); - if (!request) - return NULL; - - rc = __ptlrpc_request_bufs_pack(request, version, opcode, count, - lengths, bufs, NULL); - if (rc) { - ptlrpc_request_free(request); - request = NULL; - } - return request; -} -EXPORT_SYMBOL(ptlrpc_prep_req_pool); - -/** - * Same as ptlrpc_prep_req_pool, but without pool - */ -struct ptlrpc_request * -ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count, - __u32 *lengths, char **bufs) -{ - return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs, - NULL); -} -EXPORT_SYMBOL(ptlrpc_prep_req); - -/** - * Allocate and initialize new request set structure. + * Allocate and initialize new request set structure on the current CPT. * Returns a pointer to the newly allocated set structure or NULL on error. */ struct ptlrpc_request_set *ptlrpc_prep_set(void) { struct ptlrpc_request_set *set; + int cpt; - set = kzalloc(sizeof(*set), GFP_NOFS); + cpt = cfs_cpt_current(cfs_cpt_table, 0); + set = kzalloc_node(sizeof(*set), GFP_NOFS, + cfs_cpt_spread_node(cfs_cpt_table, cpt)); if (!set) return NULL; atomic_set(&set->set_refcount, 1); @@ -961,28 +937,6 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) EXPORT_SYMBOL(ptlrpc_set_destroy); /** - * Add a callback function \a fn to the set. - * This function would be called when all requests on this set are completed. - * The function will be passed \a data argument. 
- */ -int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, - set_interpreter_func fn, void *data) -{ - struct ptlrpc_set_cbdata *cbdata; - - cbdata = kzalloc(sizeof(*cbdata), GFP_NOFS); - if (!cbdata) - return -ENOMEM; - - cbdata->psc_interpret = fn; - cbdata->psc_data = data; - list_add_tail(&cbdata->psc_item, &set->set_cblist); - - return 0; -} -EXPORT_SYMBOL(ptlrpc_set_add_cb); - -/** * Add a new request to the general purpose request set. * Assumes request reference from the caller. */ @@ -1001,8 +955,10 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set, lustre_msg_set_jobid(req->rq_reqmsg, NULL); if (set->set_producer != NULL) - /* If the request set has a producer callback, the RPC must be - * sent straight away */ + /* + * If the request set has a producer callback, the RPC must be + * sent straight away + */ ptlrpc_send_new_req(req); } EXPORT_SYMBOL(ptlrpc_set_add_req); @@ -1022,9 +978,7 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0); spin_lock(&set->set_new_req_lock); - /* - * The set takes over the caller's request reference. - */ + /* The set takes over the caller's request reference. */ req->rq_set = set; req->rq_queued_time = cfs_time_current(); list_add_tail(&req->rq_set_chain, &set->set_new_requests); @@ -1035,9 +989,11 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc, if (count == 1) { wake_up(&set->set_waitq); - /* XXX: It maybe unnecessary to wakeup all the partners. But to + /* + * XXX: It maybe unnecessary to wakeup all the partners. But to * guarantee the async RPC can be processed ASAP, we have - * no other better choice. It maybe fixed in future. */ + * no other better choice. It maybe fixed in future. + */ for (i = 0; i < pc->pc_npartners; i++) wake_up(&pc->pc_partners[i]->pc_set->set_waitq); } @@ -1125,8 +1081,10 @@ static int ptlrpc_console_allow(struct ptlrpc_request *req) LASSERT(req->rq_reqmsg != NULL); opc = lustre_msg_get_opc(req->rq_reqmsg); - /* Suppress particular reconnect errors which are to be expected. No - * errors are suppressed for the initial connection on an import */ + /* + * Suppress particular reconnect errors which are to be expected. No + * errors are suppressed for the initial connection on an import + */ if ((lustre_handle_is_used(&req->rq_import->imp_remote_handle)) && (opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)) { @@ -1155,6 +1113,7 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { struct obd_import *imp = req->rq_import; __u32 opc = lustre_msg_get_opc(req->rq_reqmsg); + if (ptlrpc_console_allow(req)) LCONSOLE_ERROR_MSG(0x011, "%s: Communicating with %s, operation %s failed with %d.\n", imp->imp_obd->obd_name, @@ -1164,12 +1123,11 @@ static int ptlrpc_check_status(struct ptlrpc_request *req) return err < 0 ? 
err : -EINVAL; } - if (err < 0) { + if (err < 0) DEBUG_REQ(D_INFO, req, "status is %d", err); - } else if (err > 0) { + else if (err > 0) /* XXX: translate this error from net to host */ DEBUG_REQ(D_INFO, req, "status is %d", err); - } return err; } @@ -1206,7 +1164,7 @@ static int after_reply(struct ptlrpc_request *req) struct obd_import *imp = req->rq_import; struct obd_device *obd = req->rq_import->imp_obd; int rc; - struct timeval work_start; + struct timespec64 work_start; long timediff; LASSERT(obd != NULL); @@ -1221,10 +1179,11 @@ static int after_reply(struct ptlrpc_request *req) } sptlrpc_cli_free_repbuf(req); - /* Pass the required reply buffer size (include - * space for early reply). - * NB: no need to roundup because alloc_repbuf - * will roundup it */ + /* + * Pass the required reply buffer size (include space for early + * reply). NB: no need to round up because alloc_repbuf will + * round it up + */ req->rq_replen = req->rq_nob_received; req->rq_nob_received = 0; spin_lock(&req->rq_lock); @@ -1243,9 +1202,7 @@ static int after_reply(struct ptlrpc_request *req) return rc; } - /* - * Security layer unwrap might ask resend this request. - */ + /* Security layer unwrap might ask resend this request. */ if (req->rq_resend) return 0; @@ -1256,7 +1213,7 @@ static int after_reply(struct ptlrpc_request *req) /* retry indefinitely on EINPROGRESS */ if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS && ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) { - time_t now = get_seconds(); + time64_t now = ktime_get_real_seconds(); DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS"); spin_lock(&req->rq_lock); @@ -1266,18 +1223,19 @@ static int after_reply(struct ptlrpc_request *req) /* allocate new xid to avoid reply reconstruction */ if (!req->rq_bulk) { - /* new xid is already allocated for bulk in - * ptlrpc_check_set() */ + /* new xid is already allocated for bulk in ptlrpc_check_set() */ req->rq_xid = ptlrpc_next_xid(); DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS"); } /* Readjust the timeout for current conditions */ ptlrpc_at_set_req_timeout(req); - /* delay resend to give a chance to the server to get ready. + /* + * delay resend to give a chance to the server to get ready. * The delay is increased by 1s on every resend and is capped to * the current request timeout (i.e. obd_timeout if AT is off, - * or AT service time x 125% + 5s, see at_est2timeout) */ + * or AT service time x 125% + 5s, see at_est2timeout) + */ if (req->rq_nr_resend > req->rq_timeout) req->rq_sent = now + req->rq_timeout; else @@ -1286,8 +1244,9 @@ static int after_reply(struct ptlrpc_request *req) return 0; } - do_gettimeofday(&work_start); - timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL); + ktime_get_real_ts64(&work_start); + timediff = (work_start.tv_sec - req->rq_arrival_time.tv_sec) * USEC_PER_SEC + + (work_start.tv_nsec - req->rq_arrival_time.tv_nsec) / NSEC_PER_USEC; if (obd->obd_svc_stats != NULL) { lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR, timediff); @@ -1332,9 +1291,7 @@ static int after_reply(struct ptlrpc_request *req) ldlm_cli_update_pool(req); } - /* - * Store transno in reqmsg for replay. - */ + /* Store transno in reqmsg for replay. 
*/ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) { req->rq_transno = lustre_msg_get_transno(req->rq_repmsg); lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno); @@ -1350,22 +1307,22 @@ static int after_reply(struct ptlrpc_request *req) (req->rq_transno > lustre_msg_get_last_committed(req->rq_repmsg) || req->rq_replay)) { - /** version recovery */ + /* version recovery */ ptlrpc_save_versions(req); ptlrpc_retain_replayable_request(req, imp); } else if (req->rq_commit_cb != NULL && list_empty(&req->rq_replay_list)) { - /* NB: don't call rq_commit_cb if it's already on + /* + * NB: don't call rq_commit_cb if it's already on * rq_replay_list, ptlrpc_free_committed() will call - * it later, see LU-3618 for details */ + * it later, see LU-3618 for details + */ spin_unlock(&imp->imp_lock); req->rq_commit_cb(req); spin_lock(&imp->imp_lock); } - /* - * Replay-enabled imports return commit-status information. - */ + /* Replay-enabled imports return commit-status information. */ if (lustre_msg_get_last_committed(req->rq_repmsg)) { imp->imp_peer_committed_transno = lustre_msg_get_last_committed(req->rq_repmsg); @@ -1404,7 +1361,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req) int rc; LASSERT(req->rq_phase == RQ_PHASE_NEW); - if (req->rq_sent && (req->rq_sent > get_seconds()) && + if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) && (!req->rq_generation_set || req->rq_import_generation == imp->imp_generation)) return 0; @@ -1484,8 +1441,10 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set) remaining = atomic_read(&set->set_remaining); - /* populate the ->set_requests list with requests until we - * reach the maximum number of RPCs in flight for this set */ + /* + * populate the ->set_requests list with requests until we + * reach the maximum number of RPCs in flight for this set + */ while (atomic_read(&set->set_remaining) < set->set_max_inflight) { rc = set->set_producer(set, set->set_producer_arg); if (rc == -ENOENT) { @@ -1525,7 +1484,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) int unregistered = 0; int rc = 0; - /* This schedule point is mainly for the ptlrpcd caller of this + /* + * This schedule point is mainly for the ptlrpcd caller of this * function. Most ptlrpc sets are not long-lived and unbounded * in length, but at the least the set used by the ptlrpcd is. * Since the processing time is unbounded, we need to insert an @@ -1544,7 +1504,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) /* delayed resend - skip */ if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend && - req->rq_sent > get_seconds()) + req->rq_sent > ktime_get_real_seconds()) continue; if (!(req->rq_phase == RQ_PHASE_RPC || @@ -1584,8 +1544,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) OBD_FAIL_ONCE); } - /* - * Move to next phase if reply was successfully + /* Move to next phase if reply was successfully * unlinked. */ ptlrpc_rqphase_move(req, req->rq_next_phase); @@ -1599,15 +1558,11 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (req->rq_phase == RQ_PHASE_INTERPRET) goto interpret; - /* - * Note that this also will start async reply unlink. - */ + /* Note that this also will start async reply unlink. */ if (req->rq_net_err && !req->rq_timedout) { ptlrpc_expire_one_request(req, 1); - /* - * Check if we still need to wait for unlink. - */ + /* Check if we still need to wait for unlink. 
*/ if (ptlrpc_client_recv_or_unlink(req) || ptlrpc_client_bulk_active(req)) continue; @@ -1632,7 +1587,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) goto interpret; } - /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr + /* + * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr * so it sets rq_intr regardless of individual rpc * timeouts. The synchronous IO waiting path sets * rq_intr irrespective of whether ptlrpcd @@ -1659,8 +1615,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_lock(&imp->imp_lock); if (ptlrpc_import_delay_req(imp, req, &status)) { - /* put on delay list - only if we wait - * recovery finished - before send */ + /* + * put on delay list - only if we wait + * recovery finished - before send + */ list_del_init(&req->rq_list); list_add_tail(&req->rq_list, &imp-> @@ -1696,8 +1654,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); if (req->rq_timedout || req->rq_resend) { - /* This is re-sending anyways, - * let's mark req as resend. */ + /* This is re-sending anyway, let's mark req as resend. */ spin_lock(&req->rq_lock); req->rq_resend = 1; spin_unlock(&req->rq_lock); @@ -1775,8 +1732,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) spin_unlock(&req->rq_lock); - /* unlink from net because we are going to - * swab in-place of reply buffer */ + /* + * unlink from net because we are going to + * swab in-place of reply buffer + */ unregistered = ptlrpc_unregister_reply(req, 1); if (!unregistered) continue; @@ -1785,7 +1744,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) if (req->rq_resend) continue; - /* If there is no bulk associated with this request, + /* + * If there is no bulk associated with this request, * then we're done and should let the interpreter * process the reply. Similarly if the RPC returned * an error, and therefore the bulk will never arrive. @@ -1803,10 +1763,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) continue; if (req->rq_bulk->bd_failure) { - /* The RPC reply arrived OK, but the bulk screwed + /* + * The RPC reply arrived OK, but the bulk screwed * up! Dead weird since the server told us the RPC * was good after getting the REPLY for her GET or - * the ACK for her PUT. */ + * the ACK for her PUT. + */ DEBUG_REQ(D_ERROR, req, "bulk transfer failed"); req->rq_status = -EIO; } @@ -1816,8 +1778,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set) interpret: LASSERT(req->rq_phase == RQ_PHASE_INTERPRET); - /* This moves to "unregistering" phase we need to wait for - * reply unlink. */ + /* + * This moves to "unregistering" phase we need to wait for + * reply unlink. + */ if (!unregistered && !ptlrpc_unregister_reply(req, 1)) { /* start async bulk unlink too */ ptlrpc_unregister_bulk(req, 1); @@ -1827,8 +1791,7 @@ interpret: if (!ptlrpc_unregister_bulk(req, 1)) continue; - /* When calling interpret receiving already should be - * finished. */ + /* When calling interpret receive should already be finished. */ LASSERT(!req->rq_receiving_reply); ptlrpc_req_interpret(env, req, req->rq_status); @@ -1847,10 +1810,12 @@ interpret: lustre_msg_get_opc(req->rq_reqmsg)); spin_lock(&imp->imp_lock); - /* Request already may be not on sending or delaying list. This + /* + * Request already may be not on sending or delaying list. 
This * may happen in the case of marking it erroneous for the case * ptlrpc_import_delay_req(req, status) find it impossible to - * allow sending this rpc and returns *status != 0. */ + * allow sending this rpc and returns *status != 0. + */ if (!list_empty(&req->rq_list)) { list_del_init(&req->rq_list); atomic_dec(&imp->imp_inflight); @@ -1865,8 +1830,10 @@ interpret: if (ptlrpc_set_producer(set) > 0) force_timer_recalc = 1; - /* free the request that has just been completed - * in order not to pollute set->set_requests */ + /* + * free the request that has just been completed + * in order not to pollute set->set_requests + */ list_del_init(&req->rq_set_chain); spin_lock(&req->rq_lock); req->rq_set = NULL; @@ -1882,8 +1849,10 @@ interpret: } } - /* move completed request at the head of list so it's easier for - * caller to find them */ + /* + * move completed request at the head of list so it's easier for + * caller to find them + */ list_splice(&comp_reqs, &set->set_requests); /* If we hit an error, we want to recover promptly. */ @@ -1905,14 +1874,13 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) req->rq_timedout = 1; spin_unlock(&req->rq_lock); - DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent "CFS_DURATION_T - "/real "CFS_DURATION_T"]", + DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]", req->rq_net_err ? "failed due to network error" : ((req->rq_real_sent == 0 || - time_before((unsigned long)req->rq_real_sent, (unsigned long)req->rq_sent) || - cfs_time_aftereq(req->rq_real_sent, req->rq_deadline)) ? + req->rq_real_sent < req->rq_sent || + req->rq_real_sent >= req->rq_deadline) ? "timed out for sent delay" : "timed out for slow reply"), - req->rq_sent, req->rq_real_sent); + (s64)req->rq_sent, (s64)req->rq_real_sent); if (imp != NULL && obd_debug_peer_on_timeout) LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer); @@ -1934,8 +1902,10 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) if (imp->imp_dlm_fake) return 1; - /* If this request is for recovery or other primordial tasks, - * then error it out here. */ + /* + * If this request is for recovery or other primordial tasks, + * then error it out here. + */ if (req->rq_ctx_init || req->rq_ctx_fini || req->rq_send_state != LUSTRE_IMP_FULL || imp->imp_obd->obd_no_recov) { @@ -1949,8 +1919,10 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink) return 1; } - /* if a request can't be resent we can't wait for an answer after - the timeout */ + /* + * if a request can't be resent we can't wait for an answer after + * the timeout + */ if (ptlrpc_no_resend(req)) { DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:"); rc = 1; @@ -1970,13 +1942,11 @@ int ptlrpc_expired_set(void *data) { struct ptlrpc_request_set *set = data; struct list_head *tmp; - time_t now = get_seconds(); + time64_t now = ktime_get_real_seconds(); LASSERT(set != NULL); - /* - * A timeout expired. See which reqs it applies to... - */ + /* A timeout expired. See which reqs it applies to... */ list_for_each(tmp, &set->set_requests) { struct ptlrpc_request *req = list_entry(tmp, struct ptlrpc_request, @@ -1996,8 +1966,10 @@ int ptlrpc_expired_set(void *data) req->rq_deadline > now) /* not expired */ continue; - /* Deal with this guy. Do it asynchronously to not block - * ptlrpcd thread. */ + /* + * Deal with this guy. Do it asynchronously to not block + * ptlrpcd thread. 
+ */ ptlrpc_expire_one_request(req, 1); } @@ -2053,31 +2025,25 @@ EXPORT_SYMBOL(ptlrpc_interrupted_set); int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set) { struct list_head *tmp; - time_t now = get_seconds(); + time64_t now = ktime_get_real_seconds(); int timeout = 0; struct ptlrpc_request *req; - int deadline; + time64_t deadline; list_for_each(tmp, &set->set_requests) { req = list_entry(tmp, struct ptlrpc_request, rq_set_chain); - /* - * Request in-flight? - */ + /* Request in-flight? */ if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) || (req->rq_phase == RQ_PHASE_BULK) || (req->rq_phase == RQ_PHASE_NEW))) continue; - /* - * Already timed out. - */ + /* Already timed out. */ if (req->rq_timedout) continue; - /* - * Waiting for ctx. - */ + /* Waiting for ctx. */ if (req->rq_wait_ctx) continue; @@ -2126,8 +2092,10 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) do { timeout = ptlrpc_set_next_timeout(set); - /* wait until all complete, interrupted, or an in-flight - * req times out */ + /* + * wait until all complete, interrupted, or an in-flight + * req times out + */ CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n", set, timeout); @@ -2152,18 +2120,22 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi); - /* LU-769 - if we ignored the signal because it was already + /* + * LU-769 - if we ignored the signal because it was already * pending when we started, we need to handle it now or we risk - * it being ignored forever */ + * it being ignored forever + */ if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr && cfs_signal_pending()) { sigset_t blocked_sigs = cfs_block_sigsinv(LUSTRE_FATAL_SIGS); - /* In fact we only interrupt for the "fatal" signals + /* + * In fact we only interrupt for the "fatal" signals * like SIGINT or SIGKILL. We still ignore less * important signals since ptlrpc set is not easily - * reentrant from userspace again */ + * reentrant from userspace again + */ if (cfs_signal_pending()) ptlrpc_interrupted_set(set); cfs_restore_sigs(blocked_sigs); @@ -2171,13 +2143,15 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT); - /* -EINTR => all requests have been flagged rq_intr so next + /* + * -EINTR => all requests have been flagged rq_intr so next * check completes. * -ETIMEDOUT => someone timed out. When all reqs have * timed out, signals are enabled allowing completion with * EINTR. * I don't really care if we go once more round the loop in - * the error cases -eeb. */ + * the error cases -eeb. + */ if (rc == 0 && atomic_read(&set->set_remaining) == 0) { list_for_each(tmp, &set->set_requests) { req = list_entry(tmp, struct ptlrpc_request, @@ -2243,8 +2217,10 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) req_capsule_fini(&request->rq_pill); - /* We must take it off the imp_replay_list first. Otherwise, we'll set - * request->rq_reqmsg to NULL while osc_close is dereferencing it. */ + /* + * We must take it off the imp_replay_list first. Otherwise, we'll set + * request->rq_reqmsg to NULL while osc_close is dereferencing it. + */ if (request->rq_import != NULL) { if (!locked) spin_lock(&request->rq_import->imp_lock); @@ -2285,18 +2261,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked) ptlrpc_request_cache_free(request); } -static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked); -/** - * Drop one request reference. 
Must be called with import imp_lock held. - * When reference count drops to zero, request is freed. - */ -void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request) -{ - assert_spin_locked(&request->rq_import->imp_lock); - (void)__ptlrpc_req_finished(request, 1); -} -EXPORT_SYMBOL(ptlrpc_req_finished_with_imp_lock); - /** * Helper function * Drops one reference count for request \a request. @@ -2357,40 +2321,28 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) wait_queue_head_t *wq; struct l_wait_info lwi; - /* - * Might sleep. - */ + /* Might sleep. */ LASSERT(!in_interrupt()); - /* - * Let's setup deadline for reply unlink. - */ + /* Let's setup deadline for reply unlink. */ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) && async && request->rq_reply_deadline == 0) - request->rq_reply_deadline = get_seconds()+LONG_UNLINK; + request->rq_reply_deadline = ktime_get_real_seconds()+LONG_UNLINK; - /* - * Nothing left to do. - */ + /* Nothing left to do. */ if (!ptlrpc_client_recv_or_unlink(request)) return 1; LNetMDUnlink(request->rq_reply_md_h); - /* - * Let's check it once again. - */ + /* Let's check it once again. */ if (!ptlrpc_client_recv_or_unlink(request)) return 1; - /* - * Move to "Unregistering" phase as reply was not unlinked yet. - */ + /* Move to "Unregistering" phase as reply was not unlinked yet. */ ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING); - /* - * Do not wait for unlink to finish. - */ + /* Do not wait for unlink to finish. */ if (async) return 0; @@ -2405,8 +2357,10 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async) wq = &request->rq_reply_waitq; for (;;) { - /* Network access will complete in finite time but the HUGE - * timeout lets us CWARN for visibility of sluggish NALs */ + /* + * Network access will complete in finite time but the HUGE + * timeout lets us CWARN for visibility of sluggish NALs + */ lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK), cfs_time_seconds(1), NULL, NULL); rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request), @@ -2538,11 +2492,6 @@ free_req: } } -void ptlrpc_cleanup_client(struct obd_import *imp) -{ -} -EXPORT_SYMBOL(ptlrpc_cleanup_client); - /** * Schedule previously sent request for resend. * For bulk requests we assign new xid (to avoid problems with @@ -2554,8 +2503,10 @@ void ptlrpc_resend_req(struct ptlrpc_request *req) DEBUG_REQ(D_HA, req, "going to resend"); spin_lock(&req->rq_lock); - /* Request got reply but linked to the import list still. - Let ptlrpc_check_set() to process it. */ + /* + * Request got reply but linked to the import list still. + * Let ptlrpc_check_set() to process it. + */ if (ptlrpc_client_replied(req)) { spin_unlock(&req->rq_lock); DEBUG_REQ(D_HA, req, "it has reply, so skip it"); @@ -2581,20 +2532,6 @@ void ptlrpc_resend_req(struct ptlrpc_request *req) } EXPORT_SYMBOL(ptlrpc_resend_req); -/* XXX: this function and rq_status are currently unused */ -void ptlrpc_restart_req(struct ptlrpc_request *req) -{ - DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request"); - req->rq_status = -ERESTARTSYS; - - spin_lock(&req->rq_lock); - req->rq_restart = 1; - req->rq_timedout = 0; - ptlrpc_client_wake_req(req); - spin_unlock(&req->rq_lock); -} -EXPORT_SYMBOL(ptlrpc_restart_req); - /** * Grab additional reference on a request \a req */ @@ -2621,8 +2558,10 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, LBUG(); } - /* clear this for new requests that were resent as well - as resent replayed requests. 
*/ + /* + * clear this for new requests that were resent as well + * as resent replayed requests. + */ lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); /* don't re-add requests that have been replayed */ @@ -2639,7 +2578,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req, list_entry(tmp, struct ptlrpc_request, rq_replay_list); - /* We may have duplicate transnos if we create and then + /* + * We may have duplicate transnos if we create and then * open a file, or for closes retained if to match creating * opens, so use req->rq_xid as a secondary key. * (See bugs 684, 685, and 428.) @@ -2824,8 +2764,10 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) /* Readjust the timeout for current conditions */ ptlrpc_at_set_req_timeout(req); - /* Tell server the net_latency, so the server can calculate how long - * it should wait for next replay */ + /* + * Tell server the net_latency, so the server can calculate how long + * it should wait for next replay + */ lustre_msg_set_service_time(req->rq_reqmsg, ptlrpc_at_get_net_latency(req)); DEBUG_REQ(D_HA, req, "REPLAY"); @@ -2833,7 +2775,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req) atomic_inc(&req->rq_import->imp_replay_inflight); ptlrpc_request_addref(req); /* ptlrpcd needs a ref */ - ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); + ptlrpcd_add_req(req); return 0; } EXPORT_SYMBOL(ptlrpc_replay_req); @@ -2845,13 +2787,15 @@ void ptlrpc_abort_inflight(struct obd_import *imp) { struct list_head *tmp, *n; - /* Make sure that no new requests get processed for this import. + /* + * Make sure that no new requests get processed for this import. * ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing * this flag and then putting requests on sending_list or delayed_list. */ spin_lock(&imp->imp_lock); - /* XXX locking? Maybe we should remove each request with the list + /* + * XXX locking? Maybe we should remove each request with the list * locked? Also, how do we know if the requests on the list are * being freed at this time? */ @@ -2885,8 +2829,10 @@ void ptlrpc_abort_inflight(struct obd_import *imp) spin_unlock(&req->rq_lock); } - /* Last chance to free reqs left on the replay list, but we - * will still leak reqs that haven't committed. */ + /* + * Last chance to free reqs left on the replay list, but we + * will still leak reqs that haven't committed. 
+ */ if (imp->imp_replayable) ptlrpc_free_committed(imp); @@ -2942,7 +2888,7 @@ static spinlock_t ptlrpc_last_xid_lock; #define YEAR_2004 (1ULL << 30) void ptlrpc_init_xid(void) { - time_t now = get_seconds(); + time64_t now = ktime_get_real_seconds(); spin_lock_init(&ptlrpc_last_xid_lock); if (now < YEAR_2004) { @@ -2954,7 +2900,7 @@ void ptlrpc_init_xid(void) } /* Always need to be aligned to a power-of-two for multi-bulk BRW */ - CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0); + CLASSERT(((PTLRPC_BULK_OPS_COUNT - 1) & PTLRPC_BULK_OPS_COUNT) == 0); ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK; } @@ -3031,7 +2977,7 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req) { /* re-initialize the req */ req->rq_timeout = obd_timeout; - req->rq_sent = get_seconds(); + req->rq_sent = ktime_get_real_seconds(); req->rq_deadline = req->rq_sent + req->rq_timeout; req->rq_reply_deadline = req->rq_deadline; req->rq_phase = RQ_PHASE_INTERPRET; @@ -3039,7 +2985,7 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req) req->rq_xid = ptlrpc_next_xid(); req->rq_import_generation = req->rq_import->imp_generation; - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); } static int work_interpreter(const struct lu_env *env, diff --git a/drivers/staging/lustre/lustre/ptlrpc/connection.c b/drivers/staging/lustre/lustre/ptlrpc/connection.c index ffe36e222..da1f0b1ac 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/connection.c +++ b/drivers/staging/lustre/lustre/ptlrpc/connection.c @@ -42,7 +42,7 @@ #include "ptlrpc_internal.h" static struct cfs_hash *conn_hash; -static cfs_hash_ops_t conn_hash_ops; +static struct cfs_hash_ops conn_hash_ops; struct ptlrpc_connection * ptlrpc_connection_get(lnet_process_id_t peer, lnet_nid_t self, @@ -173,7 +173,7 @@ conn_keycmp(const void *key, struct hlist_node *hnode) const lnet_process_id_t *conn_key; LASSERT(key != NULL); - conn_key = (lnet_process_id_t *)key; + conn_key = key; conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash); return conn_key->nid == conn->c_peer.nid && @@ -230,12 +230,12 @@ conn_exit(struct cfs_hash *hs, struct hlist_node *hnode) kfree(conn); } -static cfs_hash_ops_t conn_hash_ops = { +static struct cfs_hash_ops conn_hash_ops = { .hs_hash = conn_hashfn, .hs_keycmp = conn_keycmp, - .hs_key = conn_key, + .hs_key = conn_key, .hs_object = conn_object, - .hs_get = conn_get, + .hs_get = conn_get, .hs_put_locked = conn_put_locked, .hs_exit = conn_exit, }; diff --git a/drivers/staging/lustre/lustre/ptlrpc/events.c b/drivers/staging/lustre/lustre/ptlrpc/events.c index c8ef9e578..9c2fd34e2 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/events.c +++ b/drivers/staging/lustre/lustre/ptlrpc/events.c @@ -64,7 +64,7 @@ void request_out_callback(lnet_event_t *ev) sptlrpc_request_out_callback(req); spin_lock(&req->rq_lock); - req->rq_real_sent = get_seconds(); + req->rq_real_sent = ktime_get_real_seconds(); if (ev->unlinked) req->rq_req_unlink = 0; @@ -158,7 +158,7 @@ void reply_in_callback(lnet_event_t *ev) ev->mlength, ev->offset, req->rq_replen); } - req->rq_import->imp_last_reply_time = get_seconds(); + req->rq_import->imp_last_reply_time = ktime_get_real_seconds(); out_wake: /* NB don't unlock till after wakeup; req can disappear under us @@ -246,7 +246,7 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt, struct ptlrpc_request *req) { __u64 sec = req->rq_arrival_time.tv_sec; - __u32 usec = req->rq_arrival_time.tv_usec >> 4; /* usec / 16 */ + __u32 usec = req->rq_arrival_time.tv_nsec / 
NSEC_PER_USEC / 16; /* usec / 16 */ __u64 new_seq; /* set sequence ID for request and add it to history list, @@ -327,7 +327,7 @@ void request_in_callback(lnet_event_t *ev) req->rq_reqbuf = ev->md.start + ev->offset; if (ev->type == LNET_EVENT_PUT && ev->status == 0) req->rq_reqdata_len = ev->mlength; - do_gettimeofday(&req->rq_arrival_time); + ktime_get_real_ts64(&req->rq_arrival_time); req->rq_peer = ev->initiator; req->rq_self = ev->target.nid; req->rq_rqbd = rqbd; @@ -415,7 +415,6 @@ void reply_out_callback(lnet_event_t *ev) } } - static void ptlrpc_master_callback(lnet_event_t *ev) { struct ptlrpc_cb_id *cbid = ev->md.user_ptr; @@ -521,7 +520,7 @@ static void ptlrpc_ni_fini(void) /* notreached */ } -lnet_pid_t ptl_get_pid(void) +static lnet_pid_t ptl_get_pid(void) { lnet_pid_t pid; @@ -560,7 +559,6 @@ static int ptlrpc_ni_init(void) return -ENOMEM; } - int ptlrpc_init_portals(void) { int rc = ptlrpc_ni_init(); diff --git a/drivers/staging/lustre/lustre/ptlrpc/import.c b/drivers/staging/lustre/lustre/ptlrpc/import.c index 1eae3896c..bfa410f7e 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/import.c +++ b/drivers/staging/lustre/lustre/ptlrpc/import.c @@ -79,7 +79,7 @@ static void __import_set_state(struct obd_import *imp, imp->imp_state = state; imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state; imp->imp_state_hist[imp->imp_state_hist_idx].ish_time = - get_seconds(); + ktime_get_real_seconds(); imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) % IMP_STATE_HIST_LEN; } @@ -103,7 +103,6 @@ do { \ spin_unlock(&imp->imp_lock); \ } while (0) - static int ptlrpc_connect_interpret(const struct lu_env *env, struct ptlrpc_request *request, void *data, int rc); @@ -128,7 +127,8 @@ int ptlrpc_init_import(struct obd_import *imp) EXPORT_SYMBOL(ptlrpc_init_import); #define UUID_STR "_UUID" -void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len) +static void deuuidify(char *uuid, const char *prefix, char **uuid_start, + int *uuid_len) { *uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix)) ? uuid : uuid + strlen(prefix); @@ -142,7 +142,6 @@ void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len) UUID_STR, strlen(UUID_STR))) *uuid_len -= strlen(UUID_STR); } -EXPORT_SYMBOL(deuuidify); /** * Returns true if import was FULL, false if import was already not @@ -200,12 +199,15 @@ int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt) return rc; } -/* Must be called with imp_lock held! */ -static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp) +/* + * This acts as a barrier; all existing requests are rejected, and + * no new requests will be accepted until the import is valid again. + */ +void ptlrpc_deactivate_import(struct obd_import *imp) { - assert_spin_locked(&imp->imp_lock); - CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd)); + + spin_lock(&imp->imp_lock); imp->imp_invalid = 1; imp->imp_generation++; spin_unlock(&imp->imp_lock); @@ -213,20 +215,10 @@ static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp) ptlrpc_abort_inflight(imp); obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE); } - -/* - * This acts as a barrier; all existing requests are rejected, and - * no new requests will be accepted until the import is valid again. 
- */ -void ptlrpc_deactivate_import(struct obd_import *imp) -{ - spin_lock(&imp->imp_lock); - ptlrpc_deactivate_and_unlock_import(imp); -} EXPORT_SYMBOL(ptlrpc_deactivate_import); static unsigned int -ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now) +ptlrpc_inflight_deadline(struct ptlrpc_request *req, time64_t now) { long dl; @@ -251,7 +243,7 @@ ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now) static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp) { - time_t now = get_seconds(); + time64_t now = ktime_get_real_seconds(); struct list_head *tmp, *n; struct ptlrpc_request *req; unsigned int timeout = 0; @@ -461,6 +453,7 @@ int ptlrpc_reconnect_import(struct obd_import *imp) if (atomic_read(&imp->imp_inval_count) > 0) { int rc; struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL); + rc = l_wait_event(imp->imp_recovery_waitq, (atomic_read(&imp->imp_inval_count) == 0), &lwi); @@ -542,6 +535,7 @@ static int import_select_connection(struct obd_import *imp) trying to reconnect on it.) */ if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item)) { struct adaptive_timeout *at = &imp->imp_at.iat_net_latency; + if (at_get(at) < CONNECTION_SWITCH_MAX) { at_measured(at, at_get(at) + CONNECTION_SWITCH_INC); if (at_get(at) > CONNECTION_SWITCH_MAX) @@ -749,12 +743,11 @@ int ptlrpc_connect_import(struct obd_import *imp) DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)", request->rq_timeout); - ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(request); rc = 0; out: - if (rc != 0) { + if (rc != 0) IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON); - } return rc; } @@ -906,7 +899,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env, } /* Determine what recovery state to move the import to. */ - if (MSG_CONNECT_RECONNECT & msg_flags) { + if (msg_flags & MSG_CONNECT_RECONNECT) { memset(&old_hdl, 0, sizeof(old_hdl)); if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg), sizeof(old_hdl))) { @@ -931,7 +924,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env, * eviction. 
If it is in recovery - we are safe to * participate since we can reestablish all of our state * with server again */ - if ((MSG_CONNECT_RECOVERING & msg_flags)) { + if ((msg_flags & MSG_CONNECT_RECOVERING)) { CDEBUG(level, "%s@%s changed server handle from %#llx to %#llx but is still in recovery\n", obd2cli_tgt(imp->imp_obd), imp->imp_connection->c_remote_uuid.uuid, @@ -948,11 +941,10 @@ static int ptlrpc_connect_interpret(const struct lu_env *env, request->rq_repmsg)->cookie); } - imp->imp_remote_handle = *lustre_msg_get_handle(request->rq_repmsg); - if (!(MSG_CONNECT_RECOVERING & msg_flags)) { + if (!(msg_flags & MSG_CONNECT_RECOVERING)) { IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED); rc = 0; goto finish; @@ -968,7 +960,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env, CDEBUG(D_HA, "%s: reconnected but import is invalid; marking evicted\n", imp->imp_obd->obd_name); IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED); - } else if (MSG_CONNECT_RECOVERING & msg_flags) { + } else if (msg_flags & MSG_CONNECT_RECOVERING) { CDEBUG(D_HA, "%s: reconnected to %s during replay\n", imp->imp_obd->obd_name, obd2cli_tgt(imp->imp_obd)); @@ -981,7 +973,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env, } else { IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER); } - } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) { + } else if ((msg_flags & MSG_CONNECT_RECOVERING) && !imp->imp_invalid) { LASSERT(imp->imp_replayable); imp->imp_remote_handle = *lustre_msg_get_handle(request->rq_repmsg); @@ -1264,7 +1256,7 @@ static int signal_completed_replay(struct obd_import *imp) req->rq_timeout *= 3; req->rq_interpret_reply = completed_replay_interpret; - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); return 0; } @@ -1511,16 +1503,6 @@ out: } EXPORT_SYMBOL(ptlrpc_disconnect_import); -void ptlrpc_cleanup_imp(struct obd_import *imp) -{ - spin_lock(&imp->imp_lock); - IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED); - imp->imp_generation++; - spin_unlock(&imp->imp_lock); - ptlrpc_abort_inflight(imp); -} -EXPORT_SYMBOL(ptlrpc_cleanup_imp); - /* Adaptive Timeout utils */ extern unsigned int at_min, at_max, at_history; @@ -1531,12 +1513,12 @@ extern unsigned int at_min, at_max, at_history; int at_measured(struct adaptive_timeout *at, unsigned int val) { unsigned int old = at->at_current; - time_t now = get_seconds(); - time_t binlimit = max_t(time_t, at_history / AT_BINS, 1); + time64_t now = ktime_get_real_seconds(); + long binlimit = max_t(long, at_history / AT_BINS, 1); LASSERT(at); CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n", - val, at, now - at->at_binstart, at->at_current, + val, at, (long)(now - at->at_binstart), at->at_current, at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]); if (val == 0) @@ -1561,7 +1543,7 @@ int at_measured(struct adaptive_timeout *at, unsigned int val) int i, shift; unsigned int maxv = val; /* move bins over */ - shift = (now - at->at_binstart) / binlimit; + shift = (u32)(now - at->at_binstart) / binlimit; LASSERT(shift > 0); for (i = AT_BINS - 1; i >= 0; i--) { if (i >= shift) { diff --git a/drivers/staging/lustre/lustre/ptlrpc/layout.c b/drivers/staging/lustre/lustre/ptlrpc/layout.c index d14c20008..d7c4f4780 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/layout.c +++ b/drivers/staging/lustre/lustre/ptlrpc/layout.c @@ -404,6 +404,7 @@ static const struct req_msg_field *ldlm_intent_layout_client[] = { &RMF_LAYOUT_INTENT, &RMF_EADATA /* for new layout to be set up */ }; + static const struct req_msg_field 
*ldlm_intent_open_server[] = { &RMF_PTLRPC_BODY, &RMF_DLM_REP, @@ -578,7 +579,6 @@ static const struct req_msg_field *ost_destroy_client[] = { &RMF_CAPA1 }; - static const struct req_msg_field *ost_brw_client[] = { &RMF_PTLRPC_BODY, &RMF_OST_BODY, @@ -789,17 +789,17 @@ enum rmf_flags { /** * The field is a string, must be NUL-terminated. */ - RMF_F_STRING = 1 << 0, + RMF_F_STRING = BIT(0), /** * The field's buffer size need not match the declared \a rmf_size. */ - RMF_F_NO_SIZE_CHECK = 1 << 1, + RMF_F_NO_SIZE_CHECK = BIT(1), /** * The field's buffer size must be a whole multiple of the declared \a * rmf_size and the \a rmf_swabber function must work on the declared \a * rmf_size worth of bytes. */ - RMF_F_STRUCT_ARRAY = 1 << 2 + RMF_F_STRUCT_ARRAY = BIT(2) }; struct req_capsule; @@ -1679,7 +1679,7 @@ EXPORT_SYMBOL(req_layout_fini); * req_capsule_msg_size(). The \a rc_area information is used by. * ptlrpc_request_set_replen(). */ -void req_capsule_init_area(struct req_capsule *pill) +static void req_capsule_init_area(struct req_capsule *pill) { int i; @@ -1688,7 +1688,6 @@ void req_capsule_init_area(struct req_capsule *pill) pill->rc_area[RCL_SERVER][i] = -1; } } -EXPORT_SYMBOL(req_capsule_init_area); /** * Initialize a pill. @@ -1996,55 +1995,6 @@ static void *__req_capsule_get(struct req_capsule *pill, } /** - * Dump a request and/or reply - */ -static void __req_capsule_dump(struct req_capsule *pill, enum req_location loc) -{ - const struct req_format *fmt; - const struct req_msg_field *field; - int len; - int i; - - fmt = pill->rc_fmt; - - DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP\n"); - for (i = 0; i < fmt->rf_fields[loc].nr; ++i) { - field = FMT_FIELD(fmt, loc, i); - if (field->rmf_dumper == NULL) { - /* - * FIXME Add a default hex dumper for fields that don't - * have a specific dumper - */ - len = req_capsule_get_size(pill, field, loc); - CDEBUG(D_RPCTRACE, "Field %s has no dumper function; field size is %d\n", - field->rmf_name, len); - } else { - /* It's the dumping side-effect that we're interested in */ - (void) __req_capsule_get(pill, field, loc, NULL, 1); - } - } - CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n"); -} - -/** - * Dump a request. - */ -void req_capsule_client_dump(struct req_capsule *pill) -{ - __req_capsule_dump(pill, RCL_CLIENT); -} -EXPORT_SYMBOL(req_capsule_client_dump); - -/** - * Dump a reply - */ -void req_capsule_server_dump(struct req_capsule *pill) -{ - __req_capsule_dump(pill, RCL_SERVER); -} -EXPORT_SYMBOL(req_capsule_server_dump); - -/** * Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request * buffer corresponding to the given RMF (\a field) of a \a pill. */ @@ -2136,21 +2086,6 @@ void *req_capsule_server_sized_swab_get(struct req_capsule *pill, EXPORT_SYMBOL(req_capsule_server_sized_swab_get); /** - * Returns the buffer of a \a pill corresponding to the given \a field from the - * request (if the caller is executing on the server-side) or reply (if the - * caller is executing on the client-side). - * - * This function convenient for use is code that could be executed on the - * client and server alike. - */ -const void *req_capsule_other_get(struct req_capsule *pill, - const struct req_msg_field *field) -{ - return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, 0); -} -EXPORT_SYMBOL(req_capsule_other_get); - -/** * Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a * field of the given \a pill. 
* @@ -2324,9 +2259,9 @@ EXPORT_SYMBOL(req_capsule_has_field); * Returns a non-zero value if the given \a field is present in the given \a * pill's PTLRPC request or reply (\a loc), else it returns 0. */ -int req_capsule_field_present(const struct req_capsule *pill, - const struct req_msg_field *field, - enum req_location loc) +static int req_capsule_field_present(const struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc) { int offset; @@ -2336,7 +2271,6 @@ int req_capsule_field_present(const struct req_capsule *pill, offset = __req_capsule_offset(pill, field, loc); return lustre_msg_bufcount(__req_msg(pill, loc)) > offset; } -EXPORT_SYMBOL(req_capsule_field_present); /** * This function shrinks the size of the _buffer_ of the \a pill's PTLRPC @@ -2376,67 +2310,5 @@ void req_capsule_shrink(struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_shrink); -int req_capsule_server_grow(struct req_capsule *pill, - const struct req_msg_field *field, - unsigned int newlen) -{ - struct ptlrpc_reply_state *rs = pill->rc_req->rq_reply_state, *nrs; - char *from, *to; - int offset, len, rc; - - LASSERT(pill->rc_fmt != NULL); - LASSERT(__req_format_is_sane(pill->rc_fmt)); - LASSERT(req_capsule_has_field(pill, field, RCL_SERVER)); - LASSERT(req_capsule_field_present(pill, field, RCL_SERVER)); - - len = req_capsule_get_size(pill, field, RCL_SERVER); - offset = __req_capsule_offset(pill, field, RCL_SERVER); - if (pill->rc_req->rq_repbuf_len >= - lustre_packed_msg_size(pill->rc_req->rq_repmsg) - len + newlen) - CERROR("Inplace repack might be done\n"); - - pill->rc_req->rq_reply_state = NULL; - req_capsule_set_size(pill, field, RCL_SERVER, newlen); - rc = req_capsule_server_pack(pill); - if (rc) { - /* put old rs back, the caller will decide what to do */ - pill->rc_req->rq_reply_state = rs; - return rc; - } - nrs = pill->rc_req->rq_reply_state; - /* Now we need only buffers, copy first chunk */ - to = lustre_msg_buf(nrs->rs_msg, 0, 0); - from = lustre_msg_buf(rs->rs_msg, 0, 0); - len = (char *)lustre_msg_buf(rs->rs_msg, offset, 0) - from; - memcpy(to, from, len); - /* check if we have tail and copy it too */ - if (rs->rs_msg->lm_bufcount > offset + 1) { - to = lustre_msg_buf(nrs->rs_msg, offset + 1, 0); - from = lustre_msg_buf(rs->rs_msg, offset + 1, 0); - offset = rs->rs_msg->lm_bufcount - 1; - len = (char *)lustre_msg_buf(rs->rs_msg, offset, 0) + - cfs_size_round(rs->rs_msg->lm_buflens[offset]) - from; - memcpy(to, from, len); - } - /* drop old reply if everything is fine */ - if (rs->rs_difficult) { - /* copy rs data */ - int i; - - nrs->rs_difficult = 1; - nrs->rs_no_ack = rs->rs_no_ack; - for (i = 0; i < rs->rs_nlocks; i++) { - nrs->rs_locks[i] = rs->rs_locks[i]; - nrs->rs_modes[i] = rs->rs_modes[i]; - nrs->rs_nlocks++; - } - rs->rs_nlocks = 0; - rs->rs_difficult = 0; - rs->rs_no_ack = 0; - } - ptlrpc_rs_decref(rs); - return 0; -} -EXPORT_SYMBOL(req_capsule_server_grow); /* __REQ_LAYOUT_USER__ */ #endif diff --git a/drivers/staging/lustre/lustre/ptlrpc/llog_client.c b/drivers/staging/lustre/lustre/ptlrpc/llog_client.c index 1c701e0a0..5122205cb 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/llog_client.c +++ b/drivers/staging/lustre/lustre/ptlrpc/llog_client.c @@ -118,6 +118,7 @@ static int llog_client_open(const struct lu_env *env, if (name) { char *tmp; + tmp = req_capsule_client_sized_get(&req->rq_pill, &RMF_NAME, strlen(name) + 1); LASSERT(tmp); @@ -142,41 +143,6 @@ out: return rc; } -static int llog_client_destroy(const struct lu_env *env, - struct llog_handle 
*loghandle) -{ - struct obd_import *imp; - struct ptlrpc_request *req = NULL; - struct llogd_body *body; - int rc; - - LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp); - req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_DESTROY, - LUSTRE_LOG_VERSION, - LLOG_ORIGIN_HANDLE_DESTROY); - if (req == NULL) { - rc = -ENOMEM; - goto err_exit; - } - - body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY); - body->lgd_logid = loghandle->lgh_id; - body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags; - - if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN)) - CERROR("%s: wrong llog flags %x\n", imp->imp_obd->obd_name, - body->lgd_llh_flags); - - ptlrpc_request_set_replen(req); - rc = ptlrpc_queue_wait(req); - - ptlrpc_req_finished(req); -err_exit: - LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp); - return rc; -} - - static int llog_client_next_block(const struct lu_env *env, struct llog_handle *loghandle, int *cur_idx, int next_idx, @@ -360,7 +326,6 @@ struct llog_operations llog_client_ops = { .lop_prev_block = llog_client_prev_block, .lop_read_header = llog_client_read_header, .lop_open = llog_client_open, - .lop_destroy = llog_client_destroy, .lop_close = llog_client_close, }; EXPORT_SYMBOL(llog_client_ops); diff --git a/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c b/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c index 53f9af1f2..afab0dee7 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c +++ b/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c @@ -35,7 +35,6 @@ */ #define DEBUG_SUBSYSTEM S_CLASS - #include "../include/obd_support.h" #include "../include/obd.h" #include "../include/lprocfs_status.h" @@ -44,7 +43,6 @@ #include "../include/obd_class.h" #include "ptlrpc_internal.h" - static struct ll_rpc_opcode { __u32 opcode; const char *opname; @@ -54,7 +52,7 @@ static struct ll_rpc_opcode { { OST_SETATTR, "ost_setattr" }, { OST_READ, "ost_read" }, { OST_WRITE, "ost_write" }, - { OST_CREATE , "ost_create" }, + { OST_CREATE, "ost_create" }, { OST_DESTROY, "ost_destroy" }, { OST_GET_INFO, "ost_get_info" }, { OST_CONNECT, "ost_connect" }, @@ -166,6 +164,7 @@ const char *ll_opcode2str(__u32 opcode) * ptlrpc_internal.h needs to be modified. */ __u32 offset = opcode_offset(opcode); + LASSERTF(offset < LUSTRE_MAX_OPCODES, "offset %u >= LUSTRE_MAX_OPCODES %u\n", offset, LUSTRE_MAX_OPCODES); @@ -239,6 +238,7 @@ ptlrpc_ldebugfs_register(struct dentry *root, char *dir, } for (i = 0; i < LUSTRE_MAX_OPCODES; i++) { __u32 opcode = ll_rpc_opcode_table[i].opcode; + lprocfs_counter_init(svc_stats, EXTRA_MAX_OPCODES + i, svc_counter_config, ll_opcode2str(opcode), "usec"); @@ -270,6 +270,7 @@ ptlrpc_lprocfs_req_history_len_seq_show(struct seq_file *m, void *v) seq_printf(m, "%d\n", total); return 0; } + LPROC_SEQ_FOPS_RO(ptlrpc_lprocfs_req_history_len); static int @@ -322,8 +323,8 @@ ptlrpc_lprocfs_req_history_max_seq_write(struct file *file, return count; } -LPROC_SEQ_FOPS(ptlrpc_lprocfs_req_history_max); +LPROC_SEQ_FOPS(ptlrpc_lprocfs_req_history_max); static ssize_t threads_min_show(struct kobject *kobj, struct attribute *attr, char *buf) @@ -420,7 +421,6 @@ LUSTRE_RW_ATTR(threads_max); * \addtogoup nrs * @{ */ -extern struct nrs_core nrs_core; /** * Translates \e ptlrpc_nrs_pol_state values to human-readable strings. 
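/*
 * Aside (not part of the patch): ll_opcode2str() in the lproc_ptlrpc.c
 * hunk above only works if opcode_offset() maps every wire opcode to the
 * matching row of ll_rpc_opcode_table[]; the code LASSERTs the bound, and
 * the nearby comment warns that ptlrpc_internal.h must be updated in step
 * when opcodes change. A minimal standalone C sketch of that invariant,
 * using hypothetical opcodes and an identity offset function (Lustre's
 * real opcode_offset() folds several disjoint opcode ranges into one
 * dense index):
 */
#include <assert.h>
#include <stdio.h>

struct rpc_opcode {
	unsigned int opcode;
	const char *opname;
};

static const struct rpc_opcode opcode_table[] = {
	{ 0, "ost_reply" },
	{ 1, "ost_getattr" },
	{ 2, "ost_setattr" },
};

#define MAX_OPCODES (sizeof(opcode_table) / sizeof(opcode_table[0]))

static const char *opcode2str(unsigned int opcode)
{
	/* Identity map here; the real code computes a dense offset. */
	unsigned int offset = opcode;

	assert(offset < MAX_OPCODES);
	/* Extra illustrative check: the table row must match the
	 * opcode numbering, or lookups silently return wrong names. */
	assert(opcode_table[offset].opcode == opcode);
	return opcode_table[offset].opname;
}

int main(void)
{
	printf("%s\n", opcode2str(1));	/* prints "ost_getattr" */
	return 0;
}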
@@ -453,7 +453,7 @@ static const char *nrs_state2str(enum ptlrpc_nrs_pol_state state) * \param[in] policy The policy * \param[out] info Holds returned status information */ -void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy, +static void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy, struct ptlrpc_nrs_pol_info *info) { LASSERT(policy != NULL); @@ -714,6 +714,7 @@ out: return rc < 0 ? rc : count; } + LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs); /** @} nrs */ @@ -891,36 +892,6 @@ ptlrpc_lprocfs_svc_req_history_next(struct seq_file *s, return NULL; } -/* common ost/mdt so_req_printer */ -void target_print_req(void *seq_file, struct ptlrpc_request *req) -{ - /* Called holding srv_lock with irqs disabled. - * Print specific req contents and a newline. - * CAVEAT EMPTOR: check request message length before printing!!! - * You might have received any old crap so you must be just as - * careful here as the service's request parser!!! */ - struct seq_file *sf = seq_file; - - switch (req->rq_phase) { - case RQ_PHASE_NEW: - /* still awaiting a service thread's attention, or rejected - * because the generic request message didn't unpack */ - seq_printf(sf, "<not swabbed>\n"); - break; - case RQ_PHASE_INTERPRET: - /* being handled, so basic msg swabbed, and opc is valid - * but racing with mds_handle() */ - case RQ_PHASE_COMPLETE: - /* been handled by mds_handle() reply state possibly still - * volatile */ - seq_printf(sf, "opc %d\n", lustre_msg_get_opc(req->rq_reqmsg)); - break; - default: - DEBUG_REQ(D_ERROR, req, "bad phase %d", req->rq_phase); - } -} -EXPORT_SYMBOL(target_print_req); - static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter) { struct ptlrpc_service *svc = s->private; @@ -938,23 +909,26 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter) rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, srhi->srhi_seq); if (rc == 0) { + char nidstr[LNET_NIDSTR_SIZE]; + req = srhi->srhi_req; + libcfs_nid2str_r(req->rq_self, nidstr, sizeof(nidstr)); /* Print common req fields. * CAVEAT EMPTOR: we're racing with the service handler * here. The request could contain any old crap, so you * must be just as careful as the service's request * parser. Currently I only print stuff here I know is OK * to look at coz it was set up in request_in_callback()!!! 
*/ - seq_printf(s, "%lld:%s:%s:x%llu:%d:%s:%ld:%lds(%+lds) ", - req->rq_history_seq, libcfs_nid2str(req->rq_self), + seq_printf(s, "%lld:%s:%s:x%llu:%d:%s:%lld:%lds(%+lds) ", + req->rq_history_seq, nidstr, libcfs_id2str(req->rq_peer), req->rq_xid, req->rq_reqlen, ptlrpc_rqphase2str(req), - req->rq_arrival_time.tv_sec, - req->rq_sent - req->rq_arrival_time.tv_sec, - req->rq_sent - req->rq_deadline); + (s64)req->rq_arrival_time.tv_sec, + (long)(req->rq_sent - req->rq_arrival_time.tv_sec), + (long)(req->rq_sent - req->rq_deadline)); if (svc->srv_ops.so_req_printer == NULL) - seq_printf(s, "\n"); + seq_putc(s, '\n'); else svc->srv_ops.so_req_printer(s, srhi->srhi_req); } @@ -990,7 +964,7 @@ static int ptlrpc_lprocfs_timeouts_seq_show(struct seq_file *m, void *n) struct ptlrpc_service *svc = m->private; struct ptlrpc_service_part *svcpt; struct dhms ts; - time_t worstt; + time64_t worstt; unsigned int cur; unsigned int worst; int i; @@ -1005,17 +979,18 @@ static int ptlrpc_lprocfs_timeouts_seq_show(struct seq_file *m, void *n) cur = at_get(&svcpt->scp_at_estimate); worst = svcpt->scp_at_estimate.at_worst_ever; worstt = svcpt->scp_at_estimate.at_worst_time; - s2dhms(&ts, get_seconds() - worstt); + s2dhms(&ts, ktime_get_real_seconds() - worstt); - seq_printf(m, "%10s : cur %3u worst %3u (at %ld, " + seq_printf(m, "%10s : cur %3u worst %3u (at %lld, " DHMS_FMT" ago) ", "service", - cur, worst, worstt, DHMS_VARS(&ts)); + cur, worst, (s64)worstt, DHMS_VARS(&ts)); lprocfs_at_hist_helper(m, &svcpt->scp_at_estimate); } return 0; } + LPROC_SEQ_FOPS_RO(ptlrpc_lprocfs_timeouts); static ssize_t high_priority_ratio_show(struct kobject *kobj, @@ -1208,55 +1183,6 @@ void ptlrpc_lprocfs_unregister_obd(struct obd_device *obd) } EXPORT_SYMBOL(ptlrpc_lprocfs_unregister_obd); - -#define BUFLEN (UUID_MAX + 5) - -int lprocfs_wr_evict_client(struct file *file, const char __user *buffer, - size_t count, loff_t *off) -{ - struct obd_device *obd = ((struct seq_file *)file->private_data)->private; - char *kbuf; - char *tmpbuf; - - kbuf = kzalloc(BUFLEN, GFP_NOFS); - if (!kbuf) - return -ENOMEM; - - /* - * OBD_ALLOC() will zero kbuf, but we only copy BUFLEN - 1 - * bytes into kbuf, to ensure that the string is NUL-terminated. - * UUID_MAX should include a trailing NUL already. - */ - if (copy_from_user(kbuf, buffer, - min_t(unsigned long, BUFLEN - 1, count))) { - count = -EFAULT; - goto out; - } - tmpbuf = cfs_firststr(kbuf, min_t(unsigned long, BUFLEN - 1, count)); - /* Kludge code(deadlock situation): the lprocfs lock has been held - * since the client is evicted by writing client's - * uuid/nid to procfs "evict_client" entry. However, - * obd_export_evict_by_uuid() will call ldebugfs_remove() to destroy - * the proc entries under the being destroyed export{}, so I have - * to drop the lock at first here. 
- * - jay, jxiong@clusterfs.com */ - class_incref(obd, __func__, current); - - if (strncmp(tmpbuf, "nid:", 4) == 0) - obd_export_evict_by_nid(obd, tmpbuf + 4); - else if (strncmp(tmpbuf, "uuid:", 5) == 0) - obd_export_evict_by_uuid(obd, tmpbuf + 5); - else - obd_export_evict_by_uuid(obd, tmpbuf); - - class_decref(obd, __func__, current); - -out: - kfree(kbuf); - return count; -} -EXPORT_SYMBOL(lprocfs_wr_evict_client); - #undef BUFLEN int lprocfs_wr_ping(struct file *file, const char __user *buffer, @@ -1266,7 +1192,10 @@ int lprocfs_wr_ping(struct file *file, const char __user *buffer, struct ptlrpc_request *req; int rc; - LPROCFS_CLIMP_CHECK(obd); + rc = lprocfs_climp_check(obd); + if (rc) + return rc; + req = ptlrpc_prep_ping(obd->u.cli.cl_import); LPROCFS_CLIMP_EXIT(obd); if (req == NULL) @@ -1328,7 +1257,7 @@ int lprocfs_wr_import(struct file *file, const char __user *buffer, *ptr = 0; do_reconn = 0; ptr += strlen("::"); - inst = simple_strtol(ptr, &endptr, 10); + inst = simple_strtoul(ptr, &endptr, 10); if (*endptr) { CERROR("config: wrong instance # %s\n", ptr); } else if (inst != imp->imp_connect_data.ocd_instance) { @@ -1355,8 +1284,12 @@ int lprocfs_rd_pinger_recov(struct seq_file *m, void *n) { struct obd_device *obd = m->private; struct obd_import *imp = obd->u.cli.cl_import; + int rc; + + rc = lprocfs_climp_check(obd); + if (rc) + return rc; - LPROCFS_CLIMP_CHECK(obd); seq_printf(m, "%d\n", !imp->imp_no_pinger_recover); LPROCFS_CLIMP_EXIT(obd); @@ -1379,7 +1312,10 @@ int lprocfs_wr_pinger_recov(struct file *file, const char __user *buffer, if (val != 0 && val != 1) return -ERANGE; - LPROCFS_CLIMP_CHECK(obd); + rc = lprocfs_climp_check(obd); + if (rc) + return rc; + spin_lock(&imp->imp_lock); imp->imp_no_pinger_recover = !val; spin_unlock(&imp->imp_lock); diff --git a/drivers/staging/lustre/lustre/ptlrpc/niobuf.c b/drivers/staging/lustre/lustre/ptlrpc/niobuf.c index 92c746b44..09ddeef6b 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/niobuf.c +++ b/drivers/staging/lustre/lustre/ptlrpc/niobuf.c @@ -106,12 +106,11 @@ static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count) LNetMDUnlink(bd_mds[i]); } - /** * Register bulk at the sender for later transfer. * Returns 0 on success or error code. */ -int ptlrpc_register_bulk(struct ptlrpc_request *req) +static int ptlrpc_register_bulk(struct ptlrpc_request *req) { struct ptlrpc_bulk_desc *desc = req->rq_bulk; lnet_process_id_t peer; @@ -232,7 +231,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req) return 0; } -EXPORT_SYMBOL(ptlrpc_register_bulk); /** * Disconnect a bulk desc from the network. Idempotent. Not @@ -252,7 +250,7 @@ int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async) /* Let's setup deadline for reply unlink. 
*/ if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) && async && req->rq_bulk_deadline == 0) - req->rq_bulk_deadline = get_seconds() + LONG_UNLINK; + req->rq_bulk_deadline = ktime_get_real_seconds() + LONG_UNLINK; if (ptlrpc_client_bulk_active(req) == 0) /* completed or */ return 1; /* never registered */ @@ -303,7 +301,7 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags) { struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; struct ptlrpc_service *svc = svcpt->scp_service; - int service_time = max_t(int, get_seconds() - + int service_time = max_t(int, ktime_get_real_seconds() - req->rq_arrival_time.tv_sec, 1); if (!(flags & PTLRPC_REPLY_EARLY) && @@ -328,8 +326,7 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags) /* Report service time estimate for future client reqs, but report 0 * (to be ignored by client) if it's a error reply during recovery. * (bz15815) */ - if (req->rq_type == PTL_RPC_MSG_ERR && - (req->rq_export == NULL || req->rq_export->exp_obd->obd_recovering)) + if (req->rq_type == PTL_RPC_MSG_ERR && !req->rq_export) lustre_msg_set_timeout(req->rq_repmsg, 0); else lustre_msg_set_timeout(req->rq_repmsg, @@ -337,9 +334,8 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags) if (req->rq_reqmsg && !(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) { - CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%d:%x/%x len=%d\n", + CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%x/%x len=%d\n", flags, lustre_msg_get_flags(req->rq_reqmsg), - lustre_msg_is_v1(req->rq_reqmsg), lustre_msg_get_magic(req->rq_reqmsg), lustre_msg_get_magic(req->rq_repmsg), req->rq_replen); } @@ -422,7 +418,7 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags) if (unlikely(rc)) goto out; - req->rq_sent = get_seconds(); + req->rq_sent = ktime_get_real_seconds(); rc = ptl_send_buf(&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len, (rs->rs_difficult && !rs->rs_no_ack) ? @@ -601,7 +597,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) /* Manage remote for early replies */ reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT | LNET_MD_MANAGE_REMOTE | - LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */; + LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */ reply_md.user_ptr = &request->rq_reply_cbid; reply_md.eq_handle = ptlrpc_eq_h; @@ -633,8 +629,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply) OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5); - do_gettimeofday(&request->rq_arrival_time); - request->rq_sent = get_seconds(); + ktime_get_real_ts64(&request->rq_arrival_time); + request->rq_sent = ktime_get_real_seconds(); /* We give the server rq_timeout secs to process the req, and add the network latency for our local timeout. */ request->rq_deadline = request->rq_sent + request->rq_timeout + diff --git a/drivers/staging/lustre/lustre/ptlrpc/nrs.c b/drivers/staging/lustre/lustre/ptlrpc/nrs.c index d37cdd5ac..7044e1ff6 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/nrs.c +++ b/drivers/staging/lustre/lustre/ptlrpc/nrs.c @@ -49,7 +49,6 @@ /* XXX: This is just for liblustre. Remove the #if defined directive when the * "cfs_" prefix is dropped from cfs_list_head. */ -extern struct list_head ptlrpc_all_services; /** * NRS core object. 
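/*
 * Aside (not part of the patch): the niobuf.c hunks above keep the same
 * timeout arithmetic while moving it to 64-bit time: the client deadline
 * is rq_sent + rq_timeout plus the network latency, and the measured
 * service time is clamped to at least one second before it feeds the
 * adaptive estimator. A standalone sketch of both computations, with
 * int64_t standing in for time64_t and hypothetical field names:
 */
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

struct rpc_times {
	int64_t sent;		/* like rq_sent: seconds at send time */
	int64_t arrival;	/* like rq_arrival_time.tv_sec */
	int64_t timeout;	/* like rq_timeout: server's processing budget */
	int64_t latency;	/* estimated network latency */
};

static int64_t client_deadline(const struct rpc_times *t)
{
	/* The server gets rq_timeout seconds to process the request;
	 * the client waits that long plus the network latency. */
	return t->sent + t->timeout + t->latency;
}

static int64_t measured_service_time(const struct rpc_times *t, int64_t now)
{
	/* Clamp to >= 1, as ptlrpc_at_set_reply() does, so even an
	 * instant reply contributes a non-zero sample. */
	int64_t st = now - t->arrival;

	return st > 1 ? st : 1;
}

int main(void)
{
	struct rpc_times t = {
		.sent = 1000, .arrival = 1001, .timeout = 30, .latency = 2,
	};

	printf("deadline=%" PRId64 " service=%" PRId64 "\n",
	       client_deadline(&t), measured_service_time(&t, 1004));
	return 0;
}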
@@ -478,7 +477,6 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs, * * \param resp the resource hierarchy that is being released * - * \see ptlrpc_nrs_req_hp_move() * \see ptlrpc_nrs_req_finalize() */ static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp) @@ -1113,7 +1111,7 @@ again: * \retval -ve error * \retval 0 success */ -int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) +static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf) { struct ptlrpc_service *svc; struct ptlrpc_nrs_pol_desc *desc; @@ -1249,71 +1247,6 @@ fail: return rc; } -EXPORT_SYMBOL(ptlrpc_nrs_policy_register); - -/** - * Unregisters a previously registered policy with NRS core. All instances of - * the policy on all NRS heads of all supported services are removed. - * - * N.B. This function should only be called from a module's exit() function. - * Although it can be used for policies that ship alongside NRS core, the - * function is primarily intended for policies that register externally, - * from other modules. - * - * \param[in] conf configuration information for the policy to unregister - * - * \retval -ve error - * \retval 0 success - */ -int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf) -{ - struct ptlrpc_nrs_pol_desc *desc; - int rc; - - LASSERT(conf != NULL); - - if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) { - CERROR("Unable to unregister a fallback policy, unless the PTLRPC service is stopping.\n"); - return -EPERM; - } - - conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0'; - - mutex_lock(&nrs_core.nrs_mutex); - - desc = nrs_policy_find_desc_locked(conf->nc_name); - if (desc == NULL) { - CERROR("Failing to unregister NRS policy %s which has not been registered with NRS core!\n", - conf->nc_name); - rc = -ENOENT; - goto not_exist; - } - - mutex_lock(&ptlrpc_all_services_mutex); - - rc = nrs_policy_unregister_locked(desc); - if (rc < 0) { - if (rc == -EBUSY) - CERROR("Please first stop policy %s on all service partitions and then retry to unregister the policy.\n", - conf->nc_name); - goto fail; - } - - CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n", - conf->nc_name); - - list_del(&desc->pd_list); - kfree(desc); - -fail: - mutex_unlock(&ptlrpc_all_services_mutex); - -not_exist: - mutex_unlock(&nrs_core.nrs_mutex); - - return rc; -} -EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister); /** * Setup NRS heads on all service partitions of service \a svc, and register @@ -1554,22 +1487,6 @@ ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp, } /** - * Dequeues request \a req from the policy it has been enqueued on. - * - * \param[in] req the request - */ -void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req) -{ - struct ptlrpc_nrs_policy *policy = nrs_request_policy(&req->rq_nrq); - - policy->pol_desc->pd_ops->op_req_dequeue(policy, &req->rq_nrq); - - req->rq_nrq.nr_enqueued = 0; - - nrs_request_removed(policy); -} - -/** * Returns whether there are any requests currently enqueued on any of the * policies of service partition's \a svcpt NRS head specified by \a hp. Should * be called while holding ptlrpc_service_part::scp_req_lock to get a reliable @@ -1590,48 +1507,6 @@ bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp) }; /** - * Moves request \a req from the regular to the high-priority NRS head. 
- * - * \param[in] req the request to move - */ -void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req) -{ - struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; - struct ptlrpc_nrs_request *nrq = &req->rq_nrq; - struct ptlrpc_nrs_resource *res1[NRS_RES_MAX]; - struct ptlrpc_nrs_resource *res2[NRS_RES_MAX]; - - /** - * Obtain the high-priority NRS head resources. - */ - nrs_resource_get_safe(nrs_svcpt2nrs(svcpt, true), nrq, res1, true); - - spin_lock(&svcpt->scp_req_lock); - - if (!ptlrpc_nrs_req_can_move(req)) - goto out; - - ptlrpc_nrs_req_del_nolock(req); - - memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0])); - memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0])); - - ptlrpc_nrs_hpreq_add_nolock(req); - - memcpy(res1, res2, NRS_RES_MAX * sizeof(res1[0])); -out: - spin_unlock(&svcpt->scp_req_lock); - - /** - * Release either the regular NRS head resources if we moved the - * request, or the high-priority NRS head resources if we took a - * reference earlier in this function and ptlrpc_nrs_req_can_move() - * returned false. - */ - nrs_resource_put_safe(res1); -} - -/** * Carries out a control operation \a opc on the policy identified by the * human-readable \a name, on either all partitions, or only on the first * partition of service \a svc. @@ -1698,7 +1573,6 @@ out: return rc; } - /* ptlrpc/nrs_fifo.c */ extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo; @@ -1720,7 +1594,6 @@ int ptlrpc_nrs_init(void) if (rc != 0) goto fail; - return rc; fail: /** diff --git a/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c b/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c index 84937ad90..f3cb5184f 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c +++ b/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c @@ -94,32 +94,11 @@ int ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout, } EXPORT_SYMBOL(ptlrpc_buf_need_swab); -static inline int lustre_msg_check_version_v2(struct lustre_msg_v2 *msg, - __u32 version) -{ - __u32 ver = lustre_msg_get_version(msg); - return (ver & LUSTRE_VERSION_MASK) != version; -} - -int lustre_msg_check_version(struct lustre_msg *msg, __u32 version) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - CERROR("msg v1 not supported - please upgrade you system\n"); - return -EINVAL; - case LUSTRE_MSG_MAGIC_V2: - return lustre_msg_check_version_v2(msg, version); - default: - CERROR("incorrect message magic: %08x\n", msg->lm_magic); - return 0; - } -} -EXPORT_SYMBOL(lustre_msg_check_version); - /* early reply size */ int lustre_msg_early_size(void) { static int size; + if (!size) { /* Always reply old ptlrpc_body_v2 to keep interoperability * with the old client (< 2.3) which doesn't have pb_jobid @@ -129,6 +108,7 @@ int lustre_msg_early_size(void) * client. 
*/ __u32 pblen = sizeof(struct ptlrpc_body_v2); + size = lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, &pblen); } return size; @@ -209,6 +189,7 @@ void lustre_init_msg_v2(struct lustre_msg_v2 *msg, int count, __u32 *lens, ptr = (char *)msg + lustre_msg_hdr_size_v2(count); for (i = 0; i < count; i++) { char *tmp = bufs[i]; + LOGL(tmp, lens[i], ptr); } } @@ -433,14 +414,15 @@ void *lustre_msg_buf(struct lustre_msg *m, int n, int min_size) case LUSTRE_MSG_MAGIC_V2: return lustre_msg_buf_v2(m, n, min_size); default: - LASSERTF(0, "incorrect message magic: %08x(msg:%p)\n", m->lm_magic, m); + LASSERTF(0, "incorrect message magic: %08x (msg:%p)\n", + m->lm_magic, m); return NULL; } } EXPORT_SYMBOL(lustre_msg_buf); -int lustre_shrink_msg_v2(struct lustre_msg_v2 *msg, int segment, - unsigned int newlen, int move_data) +static int lustre_shrink_msg_v2(struct lustre_msg_v2 *msg, int segment, + unsigned int newlen, int move_data) { char *tail = NULL, *newpos; int tail_len = 0, n; @@ -593,6 +575,7 @@ EXPORT_SYMBOL(__lustre_unpack_msg); int ptlrpc_unpack_req_msg(struct ptlrpc_request *req, int len) { int rc; + rc = __lustre_unpack_msg(req->rq_reqmsg, len); if (rc == 1) { lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF); @@ -605,6 +588,7 @@ EXPORT_SYMBOL(ptlrpc_unpack_req_msg); int ptlrpc_unpack_rep_msg(struct ptlrpc_request *req, int len) { int rc; + rc = __lustre_unpack_msg(req->rq_repmsg, len); if (rc == 1) { lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF); @@ -692,28 +676,6 @@ int lustre_msg_buflen(struct lustre_msg *m, int n) } EXPORT_SYMBOL(lustre_msg_buflen); -static inline void -lustre_msg_set_buflen_v2(struct lustre_msg_v2 *m, int n, int len) -{ - if (n >= m->lm_bufcount) - LBUG(); - - m->lm_buflens[n] = len; -} - -void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len) -{ - switch (m->lm_magic) { - case LUSTRE_MSG_MAGIC_V2: - lustre_msg_set_buflen_v2(m, n, len); - return; - default: - LASSERTF(0, "incorrect message magic: %08x\n", m->lm_magic); - } -} - -EXPORT_SYMBOL(lustre_msg_set_buflen); - /* NB return the bufcount for lustre_msg_v2 format, so if message is packed * in V1 format, the result is one bigger. (add struct ptlrpc_body). 
*/ int lustre_msg_bufcount(struct lustre_msg *m) @@ -802,14 +764,11 @@ static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg) __u32 lustre_msghdr_get_flags(struct lustre_msg *msg) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - case LUSTRE_MSG_MAGIC_V1_SWABBED: - return 0; case LUSTRE_MSG_MAGIC_V2: /* already in host endian */ return msg->lm_flags; default: - LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); + CERROR("incorrect message magic: %08x\n", msg->lm_magic); return 0; } } @@ -818,8 +777,6 @@ EXPORT_SYMBOL(lustre_msghdr_get_flags); void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return; case LUSTRE_MSG_MAGIC_V2: msg->lm_flags = flags; return; @@ -833,12 +790,13 @@ __u32 lustre_msg_get_flags(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - if (!pb) { - CERROR("invalid msg %p: no ptlrpc body!\n", msg); - return 0; - } - return pb->pb_flags; + + if (pb) + return pb->pb_flags; + + CERROR("invalid msg %p: no ptlrpc body!\n", msg); } + /* no break */ default: /* flags might be printed in debug code while message * uninitialized */ @@ -852,6 +810,7 @@ void lustre_msg_add_flags(struct lustre_msg *msg, int flags) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_flags |= flags; return; @@ -867,6 +826,7 @@ void lustre_msg_set_flags(struct lustre_msg *msg, int flags) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_flags = flags; return; @@ -882,8 +842,9 @@ void lustre_msg_clear_flags(struct lustre_msg *msg, int flags) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); - pb->pb_flags &= ~(MSG_GEN_FLAG_MASK & flags); + pb->pb_flags &= ~(flags & MSG_GEN_FLAG_MASK); return; } default: @@ -897,12 +858,13 @@ __u32 lustre_msg_get_op_flags(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - if (!pb) { - CERROR("invalid msg %p: no ptlrpc body!\n", msg); - return 0; - } - return pb->pb_op_flags; + + if (pb) + return pb->pb_op_flags; + + CERROR("invalid msg %p: no ptlrpc body!\n", msg); } + /* no break */ default: return 0; } @@ -914,21 +876,7 @@ void lustre_msg_add_op_flags(struct lustre_msg *msg, int flags) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); - pb->pb_op_flags |= flags; - return; - } - default: - LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); - } -} -EXPORT_SYMBOL(lustre_msg_add_op_flags); -void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_op_flags |= flags; return; @@ -937,13 +885,14 @@ void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags) LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); } } -EXPORT_SYMBOL(lustre_msg_set_op_flags); +EXPORT_SYMBOL(lustre_msg_add_op_flags); struct lustre_handle 
*lustre_msg_get_handle(struct lustre_msg *msg) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return NULL; @@ -962,6 +911,7 @@ __u32 lustre_msg_get_type(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return PTL_RPC_MSG_ERR; @@ -975,29 +925,12 @@ __u32 lustre_msg_get_type(struct lustre_msg *msg) } EXPORT_SYMBOL(lustre_msg_get_type); -__u32 lustre_msg_get_version(struct lustre_msg *msg) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - if (!pb) { - CERROR("invalid msg %p: no ptlrpc body!\n", msg); - return 0; - } - return pb->pb_version; - } - default: - CERROR("incorrect message magic: %08x\n", msg->lm_magic); - return 0; - } -} -EXPORT_SYMBOL(lustre_msg_get_version); - void lustre_msg_add_version(struct lustre_msg *msg, int version) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_version |= version; return; @@ -1013,6 +946,7 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1020,36 +954,19 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg) return pb->pb_opc; } default: - CERROR("incorrect message magic: %08x(msg:%p)\n", msg->lm_magic, msg); - LBUG(); + CERROR("incorrect message magic: %08x (msg:%p)\n", + msg->lm_magic, msg); return 0; } } EXPORT_SYMBOL(lustre_msg_get_opc); -__u64 lustre_msg_get_last_xid(struct lustre_msg *msg) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - if (!pb) { - CERROR("invalid msg %p: no ptlrpc body!\n", msg); - return 0; - } - return pb->pb_last_xid; - } - default: - CERROR("incorrect message magic: %08x\n", msg->lm_magic); - return 0; - } -} -EXPORT_SYMBOL(lustre_msg_get_last_xid); - __u64 lustre_msg_get_last_committed(struct lustre_msg *msg) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1066,10 +983,9 @@ EXPORT_SYMBOL(lustre_msg_get_last_committed); __u64 *lustre_msg_get_versions(struct lustre_msg *msg) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return NULL; case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return NULL; @@ -1088,6 +1004,7 @@ __u64 lustre_msg_get_transno(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1106,12 +1023,13 @@ int lustre_msg_get_status(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - if (!pb) { - CERROR("invalid msg %p: no ptlrpc body!\n", msg); - return -EINVAL; - } - return pb->pb_status; + + if (pb) + return pb->pb_status; + + CERROR("invalid msg %p: no ptlrpc body!\n", msg); } + /* no break */ default: /* status might be printed in debug 
code while message * uninitialized */ @@ -1125,6 +1043,7 @@ __u64 lustre_msg_get_slv(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return -EINVAL; @@ -1138,12 +1057,12 @@ __u64 lustre_msg_get_slv(struct lustre_msg *msg) } EXPORT_SYMBOL(lustre_msg_get_slv); - void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return; @@ -1163,6 +1082,7 @@ __u32 lustre_msg_get_limit(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return -EINVAL; @@ -1176,12 +1096,12 @@ __u32 lustre_msg_get_limit(struct lustre_msg *msg) } EXPORT_SYMBOL(lustre_msg_get_limit); - void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit) { switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return; @@ -1201,6 +1121,7 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1214,18 +1135,6 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg) } EXPORT_SYMBOL(lustre_msg_get_conn_cnt); -int lustre_msg_is_v1(struct lustre_msg *msg) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - case LUSTRE_MSG_MAGIC_V1_SWABBED: - return 1; - default: - return 0; - } -} -EXPORT_SYMBOL(lustre_msg_is_v1); - __u32 lustre_msg_get_magic(struct lustre_msg *msg) { switch (msg->lm_magic) { @@ -1241,11 +1150,9 @@ EXPORT_SYMBOL(lustre_msg_get_magic); __u32 lustre_msg_get_timeout(struct lustre_msg *msg) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - case LUSTRE_MSG_MAGIC_V1_SWABBED: - return 0; case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1255,18 +1162,16 @@ __u32 lustre_msg_get_timeout(struct lustre_msg *msg) } default: CERROR("incorrect message magic: %08x\n", msg->lm_magic); - return 0; + return -EPROTO; } } __u32 lustre_msg_get_service_time(struct lustre_msg *msg) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - case LUSTRE_MSG_MAGIC_V1_SWABBED: - return 0; case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + if (!pb) { CERROR("invalid msg %p: no ptlrpc body!\n", msg); return 0; @@ -1280,28 +1185,6 @@ __u32 lustre_msg_get_service_time(struct lustre_msg *msg) } } -char *lustre_msg_get_jobid(struct lustre_msg *msg) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - case LUSTRE_MSG_MAGIC_V1_SWABBED: - return NULL; - case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb = - lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, - sizeof(struct ptlrpc_body)); - if (!pb) - return NULL; - - return pb->pb_jobid; - } - default: - CERROR("incorrect message magic: %08x\n", msg->lm_magic); - return NULL; - } -} -EXPORT_SYMBOL(lustre_msg_get_jobid); - __u32 lustre_msg_get_cksum(struct lustre_msg *msg) { switch (msg->lm_magic) { @@ -1320,6 +1203,7 @@ __u32 lustre_msg_calc_cksum(struct lustre_msg *msg) struct ptlrpc_body *pb = 
lustre_msg_ptlrpc_body(msg); __u32 crc; unsigned int hsize = 4; + cfs_crypto_hash_digest(CFS_HASH_ALG_CRC32, (unsigned char *)pb, lustre_msg_buflen(msg, MSG_PTLRPC_BODY_OFF), NULL, 0, (unsigned char *)&crc, &hsize); @@ -1336,6 +1220,7 @@ void lustre_msg_set_handle(struct lustre_msg *msg, struct lustre_handle *handle) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_handle = *handle; return; @@ -1351,6 +1236,7 @@ void lustre_msg_set_type(struct lustre_msg *msg, __u32 type) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_type = type; return; @@ -1366,6 +1252,7 @@ void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_opc = opc; return; @@ -1376,43 +1263,12 @@ void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc) } EXPORT_SYMBOL(lustre_msg_set_opc); -void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); - pb->pb_last_xid = last_xid; - return; - } - default: - LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); - } -} -EXPORT_SYMBOL(lustre_msg_set_last_xid); - -void lustre_msg_set_last_committed(struct lustre_msg *msg, __u64 last_committed) -{ - switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V2: { - struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); - LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); - pb->pb_last_committed = last_committed; - return; - } - default: - LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic); - } -} -EXPORT_SYMBOL(lustre_msg_set_last_committed); - void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return; case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_pre_versions[0] = versions[0]; pb->pb_pre_versions[1] = versions[1]; @@ -1431,6 +1287,7 @@ void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_transno = transno; return; @@ -1446,6 +1303,7 @@ void lustre_msg_set_status(struct lustre_msg *msg, __u32 status) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_status = status; return; @@ -1461,6 +1319,7 @@ void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt) switch (msg->lm_magic) { case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_conn_cnt = conn_cnt; return; @@ -1474,10 +1333,9 @@ EXPORT_SYMBOL(lustre_msg_set_conn_cnt); void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return; case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, 
"invalid msg %p: no ptlrpc body!\n", msg); pb->pb_timeout = timeout; return; @@ -1490,10 +1348,9 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout) void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return; case LUSTRE_MSG_MAGIC_V2: { struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg); + LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg); pb->pb_service_time = service_time; return; @@ -1506,8 +1363,6 @@ void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time) void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return; case LUSTRE_MSG_MAGIC_V2: { __u32 opc = lustre_msg_get_opc(msg); struct ptlrpc_body *pb; @@ -1537,8 +1392,6 @@ EXPORT_SYMBOL(lustre_msg_set_jobid); void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum) { switch (msg->lm_magic) { - case LUSTRE_MSG_MAGIC_V1: - return; case LUSTRE_MSG_MAGIC_V2: msg->lm_cksum = cksum; return; @@ -1547,7 +1400,6 @@ void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum) } } - void ptlrpc_request_set_replen(struct ptlrpc_request *req) { int count = req_capsule_filled_sizes(&req->rq_pill, RCL_SERVER); @@ -1559,14 +1411,6 @@ void ptlrpc_request_set_replen(struct ptlrpc_request *req) } EXPORT_SYMBOL(ptlrpc_request_set_replen); -void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, __u32 *lens) -{ - req->rq_replen = lustre_msg_size(req->rq_reqmsg->lm_magic, count, lens); - if (req->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2) - req->rq_reqmsg->lm_repsize = req->rq_replen; -} -EXPORT_SYMBOL(ptlrpc_req_set_repsize); - /** * Send a remote set_info_async. * @@ -1690,7 +1534,7 @@ void lustre_swab_connect(struct obd_connect_data *ocd) CLASSERT(offsetof(typeof(*ocd), paddingF) != 0); } -void lustre_swab_obdo(struct obdo *o) +static void lustre_swab_obdo(struct obdo *o) { __swab64s(&o->o_valid); lustre_swab_ost_id(&o->o_oi); @@ -1722,7 +1566,6 @@ void lustre_swab_obdo(struct obdo *o) CLASSERT(offsetof(typeof(*o), o_padding_6) != 0); } -EXPORT_SYMBOL(lustre_swab_obdo); void lustre_swab_obd_statfs(struct obd_statfs *os) { @@ -1874,6 +1717,7 @@ EXPORT_SYMBOL(lustre_swab_mdt_ioepoch); void lustre_swab_mgs_target_info(struct mgs_target_info *mti) { int i; + __swab32s(&mti->mti_lustre_ver); __swab32s(&mti->mti_stripe_index); __swab32s(&mti->mti_config_ver); @@ -1979,7 +1823,7 @@ void lustre_swab_fid2path(struct getinfo_fid2path *gf) } EXPORT_SYMBOL(lustre_swab_fid2path); -void lustre_swab_fiemap_extent(struct ll_fiemap_extent *fm_extent) +static void lustre_swab_fiemap_extent(struct ll_fiemap_extent *fm_extent) { __swab64s(&fm_extent->fe_logical); __swab64s(&fm_extent->fe_physical); @@ -2018,15 +1862,6 @@ void lustre_swab_idx_info(struct idx_info *ii) __swab16s(&ii->ii_recsize); } -void lustre_swab_lip_header(struct lu_idxpage *lip) -{ - /* swab header */ - __swab32s(&lip->lip_magic); - __swab16s(&lip->lip_flags); - __swab16s(&lip->lip_nr); -} -EXPORT_SYMBOL(lustre_swab_lip_header); - void lustre_swab_mdt_rec_reint (struct mdt_rec_reint *rr) { __swab32s(&rr->rr_opcode); @@ -2069,46 +1904,6 @@ void lustre_swab_lov_desc(struct lov_desc *ld) } EXPORT_SYMBOL(lustre_swab_lov_desc); -void lustre_swab_lmv_desc(struct lmv_desc *ld) -{ - __swab32s(&ld->ld_tgt_count); - __swab32s(&ld->ld_active_tgt_count); - __swab32s(&ld->ld_default_stripe_count); - __swab32s(&ld->ld_pattern); - __swab64s(&ld->ld_default_hash_size); - __swab32s(&ld->ld_qos_maxage); - /* 
uuid endian insensitive */ -} - -void lustre_swab_lmv_stripe_md(struct lmv_stripe_md *mea) -{ - __swab32s(&mea->mea_magic); - __swab32s(&mea->mea_count); - __swab32s(&mea->mea_master); - CLASSERT(offsetof(typeof(*mea), mea_padding) != 0); -} - -void lustre_swab_lmv_user_md(struct lmv_user_md *lum) -{ - int i; - - __swab32s(&lum->lum_magic); - __swab32s(&lum->lum_stripe_count); - __swab32s(&lum->lum_stripe_offset); - __swab32s(&lum->lum_hash_type); - __swab32s(&lum->lum_type); - CLASSERT(offsetof(typeof(*lum), lum_padding1) != 0); - CLASSERT(offsetof(typeof(*lum), lum_padding2) != 0); - CLASSERT(offsetof(typeof(*lum), lum_padding3) != 0); - - for (i = 0; i < lum->lum_stripe_count; i++) { - __swab32s(&lum->lum_objects[i].lum_mds); - lustre_swab_lu_fid(&lum->lum_objects[i].lum_fid); - } - -} -EXPORT_SYMBOL(lustre_swab_lmv_user_md); - static void print_lum(struct lov_user_md *lum) { CDEBUG(D_OTHER, "lov_user_md %p:\n", lum); @@ -2179,16 +1974,15 @@ void lustre_swab_lov_user_md_objects(struct lov_user_ost_data *lod, } EXPORT_SYMBOL(lustre_swab_lov_user_md_objects); -void lustre_swab_ldlm_res_id(struct ldlm_res_id *id) +static void lustre_swab_ldlm_res_id(struct ldlm_res_id *id) { int i; for (i = 0; i < RES_NAME_SIZE; i++) __swab64s(&id->name[i]); } -EXPORT_SYMBOL(lustre_swab_ldlm_res_id); -void lustre_swab_ldlm_policy_data(ldlm_wire_policy_data_t *d) +static void lustre_swab_ldlm_policy_data(ldlm_wire_policy_data_t *d) { /* the lock data is a union and the first two fields are always an * extent so it's ok to process an LDLM_EXTENT and LDLM_FLOCK lock @@ -2199,7 +1993,6 @@ void lustre_swab_ldlm_policy_data(ldlm_wire_policy_data_t *d) __swab64s(&d->l_flock.lfw_owner); __swab32s(&d->l_flock.lfw_pid); } -EXPORT_SYMBOL(lustre_swab_ldlm_policy_data); void lustre_swab_ldlm_intent(struct ldlm_intent *i) { @@ -2207,22 +2000,20 @@ void lustre_swab_ldlm_intent(struct ldlm_intent *i) } EXPORT_SYMBOL(lustre_swab_ldlm_intent); -void lustre_swab_ldlm_resource_desc(struct ldlm_resource_desc *r) +static void lustre_swab_ldlm_resource_desc(struct ldlm_resource_desc *r) { __swab32s(&r->lr_type); CLASSERT(offsetof(typeof(*r), lr_padding) != 0); lustre_swab_ldlm_res_id(&r->lr_name); } -EXPORT_SYMBOL(lustre_swab_ldlm_resource_desc); -void lustre_swab_ldlm_lock_desc(struct ldlm_lock_desc *l) +static void lustre_swab_ldlm_lock_desc(struct ldlm_lock_desc *l) { lustre_swab_ldlm_resource_desc(&l->l_resource); __swab32s(&l->l_req_mode); __swab32s(&l->l_granted_mode); lustre_swab_ldlm_policy_data(&l->l_policy_data); } -EXPORT_SYMBOL(lustre_swab_ldlm_lock_desc); void lustre_swab_ldlm_request(struct ldlm_request *rq) { @@ -2271,7 +2062,7 @@ void dump_rniobuf(struct niobuf_remote *nb) } EXPORT_SYMBOL(dump_rniobuf); -void dump_obdo(struct obdo *oa) +static void dump_obdo(struct obdo *oa) { __u32 valid = oa->o_valid; @@ -2332,7 +2123,6 @@ void dump_obdo(struct obdo *oa) if (valid & OBD_MD_FLCOOKIE) CDEBUG(D_RPCTRACE, "obdo: o_lcookie = (llog_cookie dumping not yet implemented)\n"); } -EXPORT_SYMBOL(dump_obdo); void dump_ost_body(struct ost_body *ob) { @@ -2394,7 +2184,7 @@ void _debug_req(struct ptlrpc_request *req, va_start(args, fmt); libcfs_debug_vmsg2(msgdata, fmt, args, - " req@%p x%llu/t%lld(%lld) o%d->%s@%s:%d/%d lens %d/%d e %d to %d dl " CFS_TIME_T " ref %d fl " REQ_FLAGS_FMT "/%x/%x rc %d/%d\n", + " req@%p x%llu/t%lld(%lld) o%d->%s@%s:%d/%d lens %d/%d e %d to %lld dl %lld ref %d fl " REQ_FLAGS_FMT "/%x/%x rc %d/%d\n", req, req->rq_xid, req->rq_transno, req_ok ? lustre_msg_get_transno(req->rq_reqmsg) : 0, req_ok ? 
lustre_msg_get_opc(req->rq_reqmsg) : -1, @@ -2406,8 +2196,8 @@ void _debug_req(struct ptlrpc_request *req, libcfs_nid2str(nid), req->rq_request_portal, req->rq_reply_portal, req->rq_reqlen, req->rq_replen, - req->rq_early_count, req->rq_timedout, - req->rq_deadline, + req->rq_early_count, (s64)req->rq_timedout, + (s64)req->rq_deadline, atomic_read(&req->rq_refcount), DEBUG_REQ_FLAGS(req), req_ok ? lustre_msg_get_flags(req->rq_reqmsg) : -1, @@ -2431,14 +2221,6 @@ void lustre_swab_lustre_capa(struct lustre_capa *c) } EXPORT_SYMBOL(lustre_swab_lustre_capa); -void lustre_swab_lustre_capa_key(struct lustre_capa_key *k) -{ - __swab64s(&k->lk_seq); - __swab32s(&k->lk_keyid); - CLASSERT(offsetof(typeof(*k), lk_padding) != 0); -} -EXPORT_SYMBOL(lustre_swab_lustre_capa_key); - void lustre_swab_hsm_user_state(struct hsm_user_state *state) { __swab32s(&state->hus_states); @@ -2455,7 +2237,7 @@ void lustre_swab_hsm_state_set(struct hsm_state_set *hss) } EXPORT_SYMBOL(lustre_swab_hsm_state_set); -void lustre_swab_hsm_extent(struct hsm_extent *extent) +static void lustre_swab_hsm_extent(struct hsm_extent *extent) { __swab64s(&extent->offset); __swab64s(&extent->length); diff --git a/drivers/staging/lustre/lustre/ptlrpc/pers.c b/drivers/staging/lustre/lustre/ptlrpc/pers.c index e1334c24e..2a2a9fb65 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/pers.c +++ b/drivers/staging/lustre/lustre/ptlrpc/pers.c @@ -42,7 +42,6 @@ #include "ptlrpc_internal.h" - void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc, int mdidx) { diff --git a/drivers/staging/lustre/lustre/ptlrpc/pinger.c b/drivers/staging/lustre/lustre/ptlrpc/pinger.c index f8edb791a..5c719f175 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/pinger.c +++ b/drivers/staging/lustre/lustre/ptlrpc/pinger.c @@ -44,20 +44,10 @@ #include "../include/obd_class.h" #include "ptlrpc_internal.h" -static int suppress_pings; -module_param(suppress_pings, int, 0644); -MODULE_PARM_DESC(suppress_pings, "Suppress pings"); - struct mutex pinger_mutex; static LIST_HEAD(pinger_imports); static struct list_head timeout_list = LIST_HEAD_INIT(timeout_list); -int ptlrpc_pinger_suppress_pings(void) -{ - return suppress_pings; -} -EXPORT_SYMBOL(ptlrpc_pinger_suppress_pings); - struct ptlrpc_request * ptlrpc_prep_ping(struct obd_import *imp) { @@ -105,7 +95,7 @@ static int ptlrpc_ping(struct obd_import *imp) DEBUG_REQ(D_INFO, req, "pinging %s->%s", imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd)); - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req); return 0; } @@ -113,6 +103,7 @@ static int ptlrpc_ping(struct obd_import *imp) static void ptlrpc_update_next_ping(struct obd_import *imp, int soon) { int time = soon ? PING_INTERVAL_SHORT : PING_INTERVAL; + if (imp->imp_state == LUSTRE_IMP_DISCON) { int dtime = max_t(int, CONNECTION_SWITCH_MIN, AT_OFF ? 
0 : @@ -122,11 +113,6 @@ static void ptlrpc_update_next_ping(struct obd_import *imp, int soon) imp->imp_next_ping = cfs_time_shift(time); } -void ptlrpc_ping_import_soon(struct obd_import *imp) -{ - imp->imp_next_ping = cfs_time_current(); -} - static inline int imp_is_deactive(struct obd_import *imp) { return (imp->imp_deactive || @@ -150,6 +136,7 @@ static long pinger_check_timeout(unsigned long time) mutex_lock(&pinger_mutex); list_for_each_entry(item, &timeout_list, ti_chain) { int ti_timeout = item->ti_timeout; + if (timeout > ti_timeout) timeout = ti_timeout; break; @@ -234,7 +221,7 @@ static void ptlrpc_pinger_process_import(struct obd_import *imp, static int ptlrpc_pinger_main(void *arg) { - struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg; + struct ptlrpc_thread *thread = arg; /* Record that the thread is running */ thread_set_flags(thread, SVC_RUNNING); @@ -266,8 +253,6 @@ static int ptlrpc_pinger_main(void *arg) ptlrpc_update_next_ping(imp, 0); } mutex_unlock(&pinger_mutex); - /* update memory usage info */ - obd_update_maxusage(); /* Wait until the next ping time, or until we're stopped. */ time_to_next_wake = pinger_check_timeout(this_ping); @@ -277,8 +262,8 @@ static int ptlrpc_pinger_main(void *arg) next ping time to next_ping + .01 sec, which means we will SKIP the next ping at next_ping, and the ping will get sent 2 timeouts from now! Beware. */ - CDEBUG(D_INFO, "next wakeup in "CFS_DURATION_T" (" - CFS_TIME_T")\n", time_to_next_wake, + CDEBUG(D_INFO, "next wakeup in " CFS_DURATION_T " (%ld)\n", + time_to_next_wake, cfs_time_add(this_ping, cfs_time_seconds(PING_INTERVAL))); if (time_to_next_wake > 0) { @@ -327,13 +312,10 @@ int ptlrpc_start_pinger(void) l_wait_event(pinger_thread.t_ctl_waitq, thread_is_running(&pinger_thread), &lwi); - if (suppress_pings) - CWARN("Pings will be suppressed at the request of the administrator. The configuration shall meet the additional requirements described in the manual. (Search for the \"suppress_pings\" kernel module parameter.)\n"); - return 0; } -int ptlrpc_pinger_remove_timeouts(void); +static int ptlrpc_pinger_remove_timeouts(void); int ptlrpc_stop_pinger(void) { @@ -517,7 +499,7 @@ int ptlrpc_del_timeout_client(struct list_head *obd_list, } EXPORT_SYMBOL(ptlrpc_del_timeout_client); -int ptlrpc_pinger_remove_timeouts(void) +static int ptlrpc_pinger_remove_timeouts(void) { struct timeout_item *item, *tmp; @@ -536,139 +518,3 @@ void ptlrpc_pinger_wake_up(void) thread_add_flags(&pinger_thread, SVC_EVENT); wake_up(&pinger_thread.t_ctl_waitq); } - -/* Ping evictor thread */ -#define PET_READY 1 -#define PET_TERMINATE 2 - -static int pet_refcount; -static int pet_state; -static wait_queue_head_t pet_waitq; -static LIST_HEAD(pet_list); -static DEFINE_SPINLOCK(pet_lock); - -int ping_evictor_wake(struct obd_export *exp) -{ - struct obd_device *obd; - - spin_lock(&pet_lock); - if (pet_state != PET_READY) { - /* eventually the new obd will call here again. 
*/ - spin_unlock(&pet_lock); - return 1; - } - - obd = class_exp2obd(exp); - if (list_empty(&obd->obd_evict_list)) { - class_incref(obd, "evictor", obd); - list_add(&obd->obd_evict_list, &pet_list); - } - spin_unlock(&pet_lock); - - wake_up(&pet_waitq); - return 0; -} - -static int ping_evictor_main(void *arg) -{ - struct obd_device *obd; - struct obd_export *exp; - struct l_wait_info lwi = { 0 }; - time_t expire_time; - - unshare_fs_struct(); - - CDEBUG(D_HA, "Starting Ping Evictor\n"); - pet_state = PET_READY; - while (1) { - l_wait_event(pet_waitq, (!list_empty(&pet_list)) || - (pet_state == PET_TERMINATE), &lwi); - - /* loop until all obd's will be removed */ - if ((pet_state == PET_TERMINATE) && list_empty(&pet_list)) - break; - - /* we only get here if pet_exp != NULL, and the end of this - * loop is the only place which sets it NULL again, so lock - * is not strictly necessary. */ - spin_lock(&pet_lock); - obd = list_entry(pet_list.next, struct obd_device, - obd_evict_list); - spin_unlock(&pet_lock); - - expire_time = get_seconds() - PING_EVICT_TIMEOUT; - - CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n", - obd->obd_name, expire_time); - - /* Exports can't be deleted out of the list while we hold - * the obd lock (class_unlink_export), which means we can't - * lose the last ref on the export. If they've already been - * removed from the list, we won't find them here. */ - spin_lock(&obd->obd_dev_lock); - while (!list_empty(&obd->obd_exports_timed)) { - exp = list_entry(obd->obd_exports_timed.next, - struct obd_export, - exp_obd_chain_timed); - if (expire_time > exp->exp_last_request_time) { - class_export_get(exp); - spin_unlock(&obd->obd_dev_lock); - LCONSOLE_WARN("%s: haven't heard from client %s (at %s) in %ld seconds. I think it's dead, and I am evicting it. 
exp %p, cur %ld expire %ld last %ld\n", - obd->obd_name, - obd_uuid2str(&exp->exp_client_uuid), - obd_export_nid2str(exp), - (long)(get_seconds() - - exp->exp_last_request_time), - exp, (long)get_seconds(), - (long)expire_time, - (long)exp->exp_last_request_time); - CDEBUG(D_HA, "Last request was at %ld\n", - exp->exp_last_request_time); - class_fail_export(exp); - class_export_put(exp); - spin_lock(&obd->obd_dev_lock); - } else { - /* List is sorted, so everyone below is ok */ - break; - } - } - spin_unlock(&obd->obd_dev_lock); - - spin_lock(&pet_lock); - list_del_init(&obd->obd_evict_list); - spin_unlock(&pet_lock); - - class_decref(obd, "evictor", obd); - } - CDEBUG(D_HA, "Exiting Ping Evictor\n"); - - return 0; -} - -void ping_evictor_start(void) -{ - struct task_struct *task; - - if (++pet_refcount > 1) - return; - - init_waitqueue_head(&pet_waitq); - - task = kthread_run(ping_evictor_main, NULL, "ll_evictor"); - if (IS_ERR(task)) { - pet_refcount--; - CERROR("Cannot start ping evictor thread: %ld\n", - PTR_ERR(task)); - } -} -EXPORT_SYMBOL(ping_evictor_start); - -void ping_evictor_stop(void) -{ - if (--pet_refcount > 0) - return; - - pet_state = PET_TERMINATE; - wake_up(&pet_waitq); -} -EXPORT_SYMBOL(ping_evictor_stop); diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h index 6dc3998dc..ab6c4580f 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h +++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h @@ -47,10 +47,14 @@ struct ldlm_res_id; struct ptlrpc_request_set; extern int test_req_buffer_pressure; extern struct mutex ptlrpc_all_services_mutex; +extern struct list_head ptlrpc_all_services; + +extern struct mutex ptlrpcd_mutex; +extern struct mutex pinger_mutex; int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait); /* ptlrpcd.c */ -int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc); +int ptlrpcd_start(struct ptlrpcd_ctl *pc); /* client.c */ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw, @@ -110,6 +114,8 @@ struct nrs_core { }; +extern struct nrs_core nrs_core; + int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc); void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc); @@ -131,13 +137,6 @@ ptlrpc_nrs_req_get_nolock(struct ptlrpc_service_part *svcpt, bool hp, return ptlrpc_nrs_req_get_nolock0(svcpt, hp, false, force); } -static inline struct ptlrpc_request * -ptlrpc_nrs_req_peek_nolock(struct ptlrpc_service_part *svcpt, bool hp) -{ - return ptlrpc_nrs_req_get_nolock0(svcpt, hp, true, false); -} - -void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req); bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp); int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc, @@ -243,8 +242,6 @@ int ptlrpc_stop_pinger(void); void ptlrpc_pinger_sending_on_import(struct obd_import *imp); void ptlrpc_pinger_commit_expected(struct obd_import *imp); void ptlrpc_pinger_wake_up(void); -void ptlrpc_ping_import_soon(struct obd_import *imp); -int ping_evictor_wake(struct obd_export *exp); /* sec_null.c */ int sptlrpc_null_init(void); @@ -298,6 +295,6 @@ static inline void tgt_mod_exit(void) static inline void ptlrpc_reqset_put(struct ptlrpc_request_set *set) { if (atomic_dec_and_test(&set->set_refcount)) - OBD_FREE_PTR(set); + kfree(set); } #endif /* PTLRPC_INTERNAL_H */ diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c 
index ae99180d6..9deeb2441 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c +++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c @@ -36,7 +36,6 @@ #define DEBUG_SUBSYSTEM S_RPC - #include "../include/obd_support.h" #include "../include/obd_class.h" #include "../include/lustre_net.h" @@ -48,8 +47,6 @@ extern spinlock_t ptlrpc_last_xid_lock; #if RS_DEBUG extern spinlock_t ptlrpc_rs_debug_lock; #endif -extern struct mutex pinger_mutex; -extern struct mutex ptlrpcd_mutex; static int __init ptlrpc_init(void) { @@ -143,7 +140,8 @@ cleanup: ptlrpc_hr_fini(); req_layout_fini(); /* Fall through */ - default: ; + default: + ; } return rc; diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c index 17cc81d50..ce036a1ac 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c +++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c @@ -67,22 +67,94 @@ #include "ptlrpc_internal.h" +/* One of these per CPT. */ struct ptlrpcd { int pd_size; int pd_index; + int pd_cpt; + int pd_cursor; int pd_nthreads; - struct ptlrpcd_ctl pd_thread_rcv; + int pd_groupsize; struct ptlrpcd_ctl pd_threads[0]; }; +/* + * max_ptlrpcds is obsolete, but retained to ensure that the kernel + * module will load on a system where it has been tuned. + * A value other than 0 implies it was tuned, in which case the value + * is used to derive a setting for ptlrpcd_per_cpt_max. + */ static int max_ptlrpcds; module_param(max_ptlrpcds, int, 0644); MODULE_PARM_DESC(max_ptlrpcds, "Max ptlrpcd thread count to be started."); -static int ptlrpcd_bind_policy = PDB_POLICY_PAIR; +/* + * ptlrpcd_bind_policy is obsolete, but retained to ensure that + * the kernel module will load on a system where it has been tuned. + * A value other than 0 implies it was tuned, in which case the value + * is used to derive a setting for ptlrpcd_partner_group_size. + */ +static int ptlrpcd_bind_policy; module_param(ptlrpcd_bind_policy, int, 0644); -MODULE_PARM_DESC(ptlrpcd_bind_policy, "Ptlrpcd threads binding mode."); -static struct ptlrpcd *ptlrpcds; +MODULE_PARM_DESC(ptlrpcd_bind_policy, + "Ptlrpcd threads binding mode (obsolete)."); + +/* + * ptlrpcd_per_cpt_max: The maximum number of ptlrpcd threads to run + * in a CPT. + */ +static int ptlrpcd_per_cpt_max; +module_param(ptlrpcd_per_cpt_max, int, 0644); +MODULE_PARM_DESC(ptlrpcd_per_cpt_max, + "Max ptlrpcd thread count to be started per cpt."); + +/* + * ptlrpcd_partner_group_size: The desired number of threads in each + * ptlrpcd partner thread group. Default is 2, corresponding to the + * old PDB_POLICY_PAIR. A negative value makes all ptlrpcd threads in + * a CPT partners of each other. + */ +static int ptlrpcd_partner_group_size; +module_param(ptlrpcd_partner_group_size, int, 0644); +MODULE_PARM_DESC(ptlrpcd_partner_group_size, + "Number of ptlrpcd threads in a partner group."); + +/* + * ptlrpcd_cpts: A CPT string describing the CPU partitions that + * ptlrpcd threads should run on. Used to make ptlrpcd threads run on + * a subset of all CPTs. + * + * ptlrpcd_cpts=2 + * ptlrpcd_cpts=[2] + * run ptlrpcd threads only on CPT 2. + * + * ptlrpcd_cpts=0-3 + * ptlrpcd_cpts=[0-3] + * run ptlrpcd threads on CPTs 0, 1, 2, and 3. + * + * ptlrpcd_cpts=[0-3,5,7] + * run ptlrpcd threads on CPTS 0, 1, 2, 3, 5, and 7. 
+ */ +static char *ptlrpcd_cpts; +module_param(ptlrpcd_cpts, charp, 0644); +MODULE_PARM_DESC(ptlrpcd_cpts, + "CPU partitions ptlrpcd threads should run in"); + +/* ptlrpcds_cpt_idx maps cpt numbers to an index in the ptlrpcds array. */ +static int *ptlrpcds_cpt_idx; + +/* ptlrpcds_num is the number of entries in the ptlrpcds array. */ +static int ptlrpcds_num; +static struct ptlrpcd **ptlrpcds; + +/* + * In addition to the regular thread pool above, there is a single + * global recovery thread. Recovery isn't critical for performance, + * and doesn't block, but must always be able to proceed, and it is + * possible that all normal ptlrpcd threads are blocked. Hence the + * need for a dedicated thread. + */ +static struct ptlrpcd_ctl ptlrpcd_rcv; struct mutex ptlrpcd_mutex; static int ptlrpcd_users; @@ -98,88 +170,30 @@ void ptlrpcd_wake(struct ptlrpc_request *req) EXPORT_SYMBOL(ptlrpcd_wake); static struct ptlrpcd_ctl * -ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index) +ptlrpcd_select_pc(struct ptlrpc_request *req) { - int idx = 0; + struct ptlrpcd *pd; + int cpt; + int idx; if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL) - return &ptlrpcds->pd_thread_rcv; - - switch (policy) { - case PDL_POLICY_SAME: - idx = smp_processor_id() % ptlrpcds->pd_nthreads; - break; - case PDL_POLICY_LOCAL: - /* Before CPU partition patches available, process it the same - * as "PDL_POLICY_ROUND". */ -# ifdef CFS_CPU_MODE_NUMA -# warning "fix this code to use new CPU partition APIs" -# endif - /* Fall through to PDL_POLICY_ROUND until the CPU - * CPU partition patches are available. */ - index = -1; - case PDL_POLICY_PREFERRED: - if (index >= 0 && index < num_online_cpus()) { - idx = index % ptlrpcds->pd_nthreads; - break; - } - /* Fall through to PDL_POLICY_ROUND for bad index. */ - default: - /* Fall through to PDL_POLICY_ROUND for unknown policy. */ - case PDL_POLICY_ROUND: - /* We do not care whether it is strict load balance. */ - idx = ptlrpcds->pd_index + 1; - if (idx == smp_processor_id()) - idx++; - idx %= ptlrpcds->pd_nthreads; - ptlrpcds->pd_index = idx; - break; - } - - return &ptlrpcds->pd_threads[idx]; -} - -/** - * Move all request from an existing request set to the ptlrpcd queue. - * All requests from the set must be in phase RQ_PHASE_NEW. - */ -void ptlrpcd_add_rqset(struct ptlrpc_request_set *set) -{ - struct list_head *tmp, *pos; - struct ptlrpcd_ctl *pc; - struct ptlrpc_request_set *new; - int count, i; + return &ptlrpcd_rcv; - pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1); - new = pc->pc_set; + cpt = cfs_cpt_current(cfs_cpt_table, 1); + if (!ptlrpcds_cpt_idx) + idx = cpt; + else + idx = ptlrpcds_cpt_idx[cpt]; + pd = ptlrpcds[idx]; - list_for_each_safe(pos, tmp, &set->set_requests) { - struct ptlrpc_request *req = - list_entry(pos, struct ptlrpc_request, - rq_set_chain); - - LASSERT(req->rq_phase == RQ_PHASE_NEW); - req->rq_set = new; - req->rq_queued_time = cfs_time_current(); - } + /* We do not care whether it is strict load balance. */ + idx = pd->pd_cursor; + if (++idx == pd->pd_nthreads) + idx = 0; + pd->pd_cursor = idx; - spin_lock(&new->set_new_req_lock); - list_splice_init(&set->set_requests, &new->set_new_requests); - i = atomic_read(&set->set_remaining); - count = atomic_add_return(i, &new->set_new_count); - atomic_set(&set->set_remaining, 0); - spin_unlock(&new->set_new_req_lock); - if (count == i) { - wake_up(&new->set_waitq); - - /* XXX: It maybe unnecessary to wakeup all the partners. 
But to - * guarantee the async RPC can be processed ASAP, we have - * no other better choice. It maybe fixed in future. */ - for (i = 0; i < pc->pc_npartners; i++) - wake_up(&pc->pc_partners[i]->pc_set->set_waitq); - } + return &pd->pd_threads[idx]; } -EXPORT_SYMBOL(ptlrpcd_add_rqset); /** * Return transferred RPCs count. @@ -212,7 +226,7 @@ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des, * Requests that are added to the ptlrpcd queue are sent via * ptlrpcd_check->ptlrpc_check_set(). */ -void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx) +void ptlrpcd_add_req(struct ptlrpc_request *req) { struct ptlrpcd_ctl *pc; @@ -242,7 +256,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx) spin_unlock(&req->rq_lock); } - pc = ptlrpcd_select_pc(req, policy, idx); + pc = ptlrpcd_select_pc(req); DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]", req, pc->pc_name, pc->pc_index); @@ -372,25 +386,29 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc) static int ptlrpcd(void *arg) { struct ptlrpcd_ctl *pc = arg; - struct ptlrpc_request_set *set = pc->pc_set; + struct ptlrpc_request_set *set; struct lu_env env = { .le_ses = NULL }; - int rc, exit = 0; + int rc = 0; + int exit = 0; unshare_fs_struct(); -#if defined(CONFIG_SMP) - if (test_bit(LIOD_BIND, &pc->pc_flags)) { - int index = pc->pc_index; - - if (index >= 0 && index < num_possible_cpus()) { - while (!cpu_online(index)) { - if (++index >= num_possible_cpus()) - index = 0; - } - set_cpus_allowed_ptr(current, - cpumask_of_node(cpu_to_node(index))); - } + if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0) + CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt); + + /* + * Allocate the request set after the thread has been bound + * above. This is safe because no requests will be queued + * until all ptlrpcd threads have confirmed that they have + * successfully started. + */ + set = ptlrpc_prep_set(); + if (!set) { + rc = -ENOMEM; + goto failed; } -#endif + spin_lock(&pc->pc_lock); + pc->pc_set = set; + spin_unlock(&pc->pc_lock); /* * XXX So far only "client" ptlrpcd uses an environment. In * the future, ptlrpcd thread (or a thread-set) has to given @@ -398,10 +416,10 @@ static int ptlrpcd(void *arg) */ rc = lu_context_init(&env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF); - complete(&pc->pc_starting); - if (rc != 0) - return rc; + goto failed; + + complete(&pc->pc_starting); /* * This mainloop strongly resembles ptlrpc_set_wait() except that our @@ -447,174 +465,97 @@ static int ptlrpcd(void *arg) complete(&pc->pc_finishing); return 0; +failed: + pc->pc_error = rc; + complete(&pc->pc_starting); + return rc; } -/* XXX: We want multiple CPU cores to share the async RPC load. So we start many - * ptlrpcd threads. We also want to reduce the ptlrpcd overhead caused by - * data transfer cross-CPU cores. So we bind ptlrpcd thread to specified - * CPU core. But binding all ptlrpcd threads maybe cause response delay - * because of some CPU core(s) busy with other loads. - * - * For example: "ls -l", some async RPCs for statahead are assigned to - * ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite busy - * with other non-ptlrpcd, like "ls -l" itself (we want to the "ls -l" - * thread, statahead thread, and ptlrpcd thread can run in parallel), under - * such case, the statahead async RPCs can not be processed in time, it is - * unexpected. If ptlrpcd_0 can be re-scheduled on other CPU core, it may - * be better. 
But it breaks former data transfer policy. - * - * So we shouldn't be blind for avoiding the data transfer. We make some - * compromise: divide the ptlrpcd threads pool into two parts. One part is - * for bound mode, each ptlrpcd thread in this part is bound to some CPU - * core. The other part is for free mode, all the ptlrpcd threads in the - * part can be scheduled on any CPU core. We specify some partnership - * between bound mode ptlrpcd thread(s) and free mode ptlrpcd thread(s), - * and the async RPC load within the partners are shared. +static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt) +{ + pc->pc_index = index; + pc->pc_cpt = cpt; + init_completion(&pc->pc_starting); + init_completion(&pc->pc_finishing); + spin_lock_init(&pc->pc_lock); + + if (index < 0) { + /* Recovery thread. */ + snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv"); + } else { + /* Regular thread. */ + snprintf(pc->pc_name, sizeof(pc->pc_name), + "ptlrpcd_%02d_%02d", cpt, index); + } +} + +/* XXX: We want multiple CPU cores to share the async RPC load. So we + * start many ptlrpcd threads. We also want to reduce the ptlrpcd + * overhead caused by data transfer cross-CPU cores. So we bind + * all ptlrpcd threads to a CPT, in the expectation that CPTs + * will be defined in a way that matches these boundaries. Within + * a CPT a ptlrpcd thread can be scheduled on any available core. * - * It can partly avoid data transfer cross-CPU (if the bound mode ptlrpcd - * thread can be scheduled in time), and try to guarantee the async RPC - * processed ASAP (as long as the free mode ptlrpcd thread can be scheduled - * on any CPU core). + * Each ptlrpcd thread has its own request queue. This can cause + * response delay if the thread is already busy. To help with + * this we define partner threads: these are other threads bound + * to the same CPT which will check for work in each other's + * request queues if they have no work to do. * - * As for how to specify the partnership between bound mode ptlrpcd - * thread(s) and free mode ptlrpcd thread(s), the simplest way is to use - * <free bound> pair. In future, we can specify some more complex - * partnership based on the patches for CPU partition. But before such - * patches are available, we prefer to use the simplest one. + * The desired number of partner threads can be tuned by setting + * ptlrpcd_partner_group_size. The default is to create pairs of + * partner threads. 
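The grouping arithmetic is simple: a thread's group is the contiguous groupsize-wide slice of the CPT's thread array that contains it. A minimal sketch mirroring ptlrpcd_partners() below — example_partners() is an illustrative helper, not part of the patch:

static void example_partners(int index, int groupsize, int nthreads)
{
	int first = index - index % groupsize;
	int i;

	/* e.g. groupsize 2, index 5: first = 4, so the group is { 4, 5 } */
	for (i = first; i < first + groupsize && i < nthreads; i++)
		if (i != index)
			pr_info("thread %d partners thread %d\n", index, i);
}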
*/ -# ifdef CFS_CPU_MODE_NUMA -# warning "fix ptlrpcd_bind() to use new CPU partition APIs" -# endif -static int ptlrpcd_bind(int index, int max) +static int ptlrpcd_partners(struct ptlrpcd *pd, int index) { struct ptlrpcd_ctl *pc; + struct ptlrpcd_ctl **ppc; + int first; + int i; int rc = 0; -#if defined(CONFIG_NUMA) - cpumask_t mask; -#endif + int size; - LASSERT(index <= max - 1); - pc = &ptlrpcds->pd_threads[index]; - switch (ptlrpcd_bind_policy) { - case PDB_POLICY_NONE: - pc->pc_npartners = -1; - break; - case PDB_POLICY_FULL: + LASSERT(index >= 0 && index < pd->pd_nthreads); + pc = &pd->pd_threads[index]; + pc->pc_npartners = pd->pd_groupsize - 1; + + if (pc->pc_npartners <= 0) + goto out; + + size = sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners; + pc->pc_partners = kzalloc_node(size, GFP_NOFS, + cfs_cpt_spread_node(cfs_cpt_table, + pc->pc_cpt)); + if (!pc->pc_partners) { pc->pc_npartners = 0; - set_bit(LIOD_BIND, &pc->pc_flags); - break; - case PDB_POLICY_PAIR: - LASSERT(max % 2 == 0); - pc->pc_npartners = 1; - break; - case PDB_POLICY_NEIGHBOR: -#if defined(CONFIG_NUMA) - { - int i; - cpumask_copy(&mask, cpumask_of_node(cpu_to_node(index))); - for (i = max; i < num_online_cpus(); i++) - cpumask_clear_cpu(i, &mask); - pc->pc_npartners = cpumask_weight(&mask) - 1; - set_bit(LIOD_BIND, &pc->pc_flags); - } -#else - LASSERT(max >= 3); - pc->pc_npartners = 2; -#endif - break; - default: - CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy); - rc = -EINVAL; + rc = -ENOMEM; + goto out; } - if (rc == 0 && pc->pc_npartners > 0) { - pc->pc_partners = kcalloc(pc->pc_npartners, - sizeof(struct ptlrpcd_ctl *), - GFP_NOFS); - if (pc->pc_partners == NULL) { - pc->pc_npartners = 0; - rc = -ENOMEM; - } else { - switch (ptlrpcd_bind_policy) { - case PDB_POLICY_PAIR: - if (index & 0x1) { - set_bit(LIOD_BIND, &pc->pc_flags); - pc->pc_partners[0] = &ptlrpcds-> - pd_threads[index - 1]; - ptlrpcds->pd_threads[index - 1]. - pc_partners[0] = pc; - } - break; - case PDB_POLICY_NEIGHBOR: -#if defined(CONFIG_NUMA) - { - struct ptlrpcd_ctl *ppc; - int i, pidx; - /* partners are cores in the same NUMA node. - * setup partnership only with ptlrpcd threads - * that are already initialized - */ - for (pidx = 0, i = 0; i < index; i++) { - if (cpumask_test_cpu(i, &mask)) { - ppc = &ptlrpcds->pd_threads[i]; - pc->pc_partners[pidx++] = ppc; - ppc->pc_partners[ppc-> - pc_npartners++] = pc; - } - } - /* adjust number of partners to the number - * of partnership really setup */ - pc->pc_npartners = pidx; - } -#else - if (index & 0x1) - set_bit(LIOD_BIND, &pc->pc_flags); - if (index > 0) { - pc->pc_partners[0] = &ptlrpcds-> - pd_threads[index - 1]; - ptlrpcds->pd_threads[index - 1]. - pc_partners[1] = pc; - if (index == max - 1) { - pc->pc_partners[1] = - &ptlrpcds->pd_threads[0]; - ptlrpcds->pd_threads[0]. - pc_partners[0] = pc; - } - } -#endif - break; - } - } + first = index - index % pd->pd_groupsize; + ppc = pc->pc_partners; + for (i = first; i < first + pd->pd_groupsize; i++) { + if (i != index) + *ppc++ = &pd->pd_threads[i]; } - +out: return rc; } - -int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc) +int ptlrpcd_start(struct ptlrpcd_ctl *pc) { - int rc; + struct task_struct *task; + int rc = 0; /* * Do not allow start second thread for one pc. 
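Queue selection in ptlrpcd_select_pc() above then reduces to a wrapping cursor over that thread array. A minimal sketch — locking omitted, and the real code explicitly tolerates races here since strict load balance is not required:

static int example_next_thread(int cursor, int nthreads)
{
	if (++cursor == nthreads)
		cursor = 0;	/* wrap around: simple round-robin */
	return cursor;
}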
*/ if (test_and_set_bit(LIOD_START, &pc->pc_flags)) { CWARN("Starting second thread (%s) for same pc %p\n", - name, pc); + pc->pc_name, pc); return 0; } - pc->pc_index = index; - init_completion(&pc->pc_starting); - init_completion(&pc->pc_finishing); - spin_lock_init(&pc->pc_lock); - strlcpy(pc->pc_name, name, sizeof(pc->pc_name)); - pc->pc_set = ptlrpc_prep_set(); - if (pc->pc_set == NULL) { - rc = -ENOMEM; - goto out; - } - /* * So far only "client" ptlrpcd uses an environment. In the future, * ptlrpcd thread (or a thread-set) has to be given an argument, @@ -622,29 +563,21 @@ int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc) */ rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER); if (rc != 0) - goto out_set; + goto out; - { - struct task_struct *task; - if (index >= 0) { - rc = ptlrpcd_bind(index, max); - if (rc < 0) - goto out_env; - } + task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name); + if (IS_ERR(task)) { + rc = PTR_ERR(task); + goto out_set; + } - task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name); - if (IS_ERR(task)) { - rc = PTR_ERR(task); - goto out_env; - } + wait_for_completion(&pc->pc_starting); + rc = pc->pc_error; + if (rc != 0) + goto out_set; - wait_for_completion(&pc->pc_starting); - } return 0; -out_env: - lu_context_fini(&pc->pc_env.le_ctx); - out_set: if (pc->pc_set != NULL) { struct ptlrpc_request_set *set = pc->pc_set; @@ -654,7 +587,7 @@ out_set: spin_unlock(&pc->pc_lock); ptlrpc_set_destroy(set); } - clear_bit(LIOD_BIND, &pc->pc_flags); + lu_context_fini(&pc->pc_env.le_ctx); out: clear_bit(LIOD_START, &pc->pc_flags); @@ -694,7 +627,6 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc) clear_bit(LIOD_START, &pc->pc_flags); clear_bit(LIOD_STOP, &pc->pc_flags); clear_bit(LIOD_FORCE, &pc->pc_flags); - clear_bit(LIOD_BIND, &pc->pc_flags); out: if (pc->pc_npartners > 0) { @@ -704,88 +636,262 @@ out: pc->pc_partners = NULL; } pc->pc_npartners = 0; + pc->pc_error = 0; } static void ptlrpcd_fini(void) { int i; + int j; if (ptlrpcds != NULL) { - for (i = 0; i < ptlrpcds->pd_nthreads; i++) - ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0); - for (i = 0; i < ptlrpcds->pd_nthreads; i++) - ptlrpcd_free(&ptlrpcds->pd_threads[i]); - ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0); - ptlrpcd_free(&ptlrpcds->pd_thread_rcv); + for (i = 0; i < ptlrpcds_num; i++) { + if (!ptlrpcds[i]) + break; + for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++) + ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0); + for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++) + ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]); + kfree(ptlrpcds[i]); + ptlrpcds[i] = NULL; + } kfree(ptlrpcds); - ptlrpcds = NULL; } + ptlrpcds_num = 0; + + ptlrpcd_stop(&ptlrpcd_rcv, 0); + ptlrpcd_free(&ptlrpcd_rcv); + + kfree(ptlrpcds_cpt_idx); + ptlrpcds_cpt_idx = NULL; } static int ptlrpcd_init(void) { - int nthreads = num_online_cpus(); - char name[16]; - int size, i = -1, j, rc = 0; - - if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads) - nthreads = max_ptlrpcds; - if (nthreads < 2) - nthreads = 2; - if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR) - ptlrpcd_bind_policy = PDB_POLICY_PAIR; - else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR) - nthreads &= ~1; /* make sure it is even */ - - size = offsetof(struct ptlrpcd, pd_threads[nthreads]); - ptlrpcds = kzalloc(size, GFP_NOFS); + int nthreads; + int groupsize; + int size; + int i; + int j; + int rc = 0; + struct cfs_cpt_table *cptable; + __u32 *cpts = NULL; + int ncpts; + int cpt; + struct ptlrpcd *pd; + + /* + * Determine the CPTs 
that ptlrpcd threads will run on. + */ + cptable = cfs_cpt_table; + ncpts = cfs_cpt_number(cptable); + if (ptlrpcd_cpts) { + struct cfs_expr_list *el; + + size = ncpts * sizeof(ptlrpcds_cpt_idx[0]); + ptlrpcds_cpt_idx = kzalloc(size, GFP_KERNEL); + if (!ptlrpcds_cpt_idx) { + rc = -ENOMEM; + goto out; + } + + rc = cfs_expr_list_parse(ptlrpcd_cpts, + strlen(ptlrpcd_cpts), + 0, ncpts - 1, &el); + + if (rc != 0) { + CERROR("ptlrpcd_cpts: invalid CPT pattern string: %s", + ptlrpcd_cpts); + rc = -EINVAL; + goto out; + } + + rc = cfs_expr_list_values(el, ncpts, &cpts); + cfs_expr_list_free(el); + if (rc <= 0) { + CERROR("ptlrpcd_cpts: failed to parse CPT array %s: %d\n", + ptlrpcd_cpts, rc); + if (rc == 0) + rc = -EINVAL; + goto out; + } + + /* + * Create the cpt-to-index map. When there is no match + * in the cpt table, pick a cpt at random. This could + * be changed to take the topology of the system into + * account. + */ + for (cpt = 0; cpt < ncpts; cpt++) { + for (i = 0; i < rc; i++) + if (cpts[i] == cpt) + break; + if (i >= rc) + i = cpt % rc; + ptlrpcds_cpt_idx[cpt] = i; + } + + cfs_expr_list_values_free(cpts, rc); + ncpts = rc; + } + ptlrpcds_num = ncpts; + + size = ncpts * sizeof(ptlrpcds[0]); + ptlrpcds = kzalloc(size, GFP_KERNEL); if (!ptlrpcds) { rc = -ENOMEM; goto out; } - snprintf(name, sizeof(name), "ptlrpcd_rcv"); - set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags); - rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv); + /* + * The max_ptlrpcds parameter is obsolete, but do something + * sane if it has been tuned, and complain if + * ptlrpcd_per_cpt_max has also been tuned. + */ + if (max_ptlrpcds != 0) { + CWARN("max_ptlrpcds is obsolete.\n"); + if (ptlrpcd_per_cpt_max == 0) { + ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts; + /* Round up if there is a remainder. */ + if (max_ptlrpcds % ncpts != 0) + ptlrpcd_per_cpt_max++; + CWARN("Setting ptlrpcd_per_cpt_max = %d\n", + ptlrpcd_per_cpt_max); + } else { + CWARN("ptlrpd_per_cpt_max is also set!\n"); + } + } + + /* + * The ptlrpcd_bind_policy parameter is obsolete, but do + * something sane if it has been tuned, and complain if + * ptlrpcd_partner_group_size is also tuned. + */ + if (ptlrpcd_bind_policy != 0) { + CWARN("ptlrpcd_bind_policy is obsolete.\n"); + if (ptlrpcd_partner_group_size == 0) { + switch (ptlrpcd_bind_policy) { + case 1: /* PDB_POLICY_NONE */ + case 2: /* PDB_POLICY_FULL */ + ptlrpcd_partner_group_size = 1; + break; + case 3: /* PDB_POLICY_PAIR */ + ptlrpcd_partner_group_size = 2; + break; + case 4: /* PDB_POLICY_NEIGHBOR */ +#ifdef CONFIG_NUMA + ptlrpcd_partner_group_size = -1; /* CPT */ +#else + ptlrpcd_partner_group_size = 3; /* Triplets */ +#endif + break; + default: /* Illegal value, use the default. */ + ptlrpcd_partner_group_size = 2; + break; + } + CWARN("Setting ptlrpcd_partner_group_size = %d\n", + ptlrpcd_partner_group_size); + } else { + CWARN("ptlrpcd_partner_group_size is also set!\n"); + } + } + + if (ptlrpcd_partner_group_size == 0) + ptlrpcd_partner_group_size = 2; + else if (ptlrpcd_partner_group_size < 0) + ptlrpcd_partner_group_size = -1; + else if (ptlrpcd_per_cpt_max > 0 && + ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max) + ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max; + + /* + * Start the recovery thread first. + */ + set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags); + ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY); + rc = ptlrpcd_start(&ptlrpcd_rcv); if (rc < 0) goto out; - /* XXX: We start nthreads ptlrpc daemons. 
Each of them can process any - * non-recovery async RPC to improve overall async RPC efficiency. - * - * But there are some issues with async I/O RPCs and async non-I/O - * RPCs processed in the same set under some cases. The ptlrpcd may - * be blocked by some async I/O RPC(s), then will cause other async - * non-I/O RPC(s) can not be processed in time. - * - * Maybe we should distinguish blocked async RPCs from non-blocked - * async RPCs, and process them in different ptlrpcd sets to avoid - * unnecessary dependency. But how to distribute async RPCs load - * among all the ptlrpc daemons becomes another trouble. */ - for (i = 0; i < nthreads; i++) { - snprintf(name, sizeof(name), "ptlrpcd_%d", i); - rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]); - if (rc < 0) + for (i = 0; i < ncpts; i++) { + if (!cpts) + cpt = i; + else + cpt = cpts[i]; + + nthreads = cfs_cpt_weight(cptable, cpt); + if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads) + nthreads = ptlrpcd_per_cpt_max; + if (nthreads < 2) + nthreads = 2; + + if (ptlrpcd_partner_group_size <= 0) { + groupsize = nthreads; + } else if (nthreads <= ptlrpcd_partner_group_size) { + groupsize = nthreads; + } else { + groupsize = ptlrpcd_partner_group_size; + if (nthreads % groupsize != 0) + nthreads += groupsize - (nthreads % groupsize); + } + + size = offsetof(struct ptlrpcd, pd_threads[nthreads]); + pd = kzalloc_node(size, GFP_NOFS, + cfs_cpt_spread_node(cfs_cpt_table, cpt)); + if (!pd) { + rc = -ENOMEM; goto out; - } + } + pd->pd_size = size; + pd->pd_index = i; + pd->pd_cpt = cpt; + pd->pd_cursor = 0; + pd->pd_nthreads = nthreads; + pd->pd_groupsize = groupsize; + ptlrpcds[i] = pd; - ptlrpcds->pd_size = size; - ptlrpcds->pd_index = 0; - ptlrpcds->pd_nthreads = nthreads; + /* + * The ptlrpcd threads in a partner group can access + * each other's struct ptlrpcd_ctl, so these must be + * initialized before any thread is started. + */ + for (j = 0; j < nthreads; j++) { + ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt); + rc = ptlrpcd_partners(pd, j); + if (rc < 0) + goto out; + } -out: - if (rc != 0 && ptlrpcds != NULL) { - for (j = 0; j <= i; j++) - ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0); - for (j = 0; j <= i; j++) - ptlrpcd_free(&ptlrpcds->pd_threads[j]); - ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0); - ptlrpcd_free(&ptlrpcds->pd_thread_rcv); - kfree(ptlrpcds); - ptlrpcds = NULL; + /* XXX: We start nthreads ptlrpc daemons. + * Each of them can process any non-recovery + * async RPC to improve overall async RPC + * efficiency. + * + * But there are some issues with async I/O RPCs + * and async non-I/O RPCs processed in the same + * set under some cases. The ptlrpcd may be + * blocked by some async I/O RPC(s), then will + * cause other async non-I/O RPC(s) can not be + * processed in time. + * + * Maybe we should distinguish blocked async RPCs + * from non-blocked async RPCs, and process them + * in different ptlrpcd sets to avoid unnecessary + * dependency. But how to distribute async RPCs + * load among all the ptlrpc daemons becomes + * another trouble. 
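Note that the per-CPT sizing above rounds the thread count up to a whole number of partner groups, so no group is left short. A sketch of that arithmetic — example_round_up() is illustrative:

static int example_round_up(int nthreads, int groupsize)
{
	/* e.g. nthreads = 6, groupsize = 4 rounds up to 8 */
	if (nthreads % groupsize != 0)
		nthreads += groupsize - (nthreads % groupsize);
	return nthreads;
}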
+ */ + for (j = 0; j < nthreads; j++) { + rc = ptlrpcd_start(&pd->pd_threads[j]); + if (rc < 0) + goto out; + } } +out: + if (rc != 0) + ptlrpcd_fini(); - return 0; + return rc; } int ptlrpcd_addref(void) diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec.c b/drivers/staging/lustre/lustre/ptlrpc/sec.c index b9821db22..39f5261c9 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec.c @@ -227,7 +227,7 @@ char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize) } EXPORT_SYMBOL(sptlrpc_flavor2name); -char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize) +static char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize) { buf[0] = '\0'; @@ -244,7 +244,6 @@ char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize) return buf; } -EXPORT_SYMBOL(sptlrpc_secflags2str); /************************************************** * client context APIs * @@ -297,53 +296,13 @@ void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync) } EXPORT_SYMBOL(sptlrpc_cli_ctx_put); -/** - * Expire the client context immediately. - * - * \pre Caller must hold at least 1 reference on the \a ctx. - */ -void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx) -{ - LASSERT(ctx->cc_ops->force_die); - ctx->cc_ops->force_die(ctx, 0); -} -EXPORT_SYMBOL(sptlrpc_cli_ctx_expire); - -/** - * To wake up the threads who are waiting for this client context. Called - * after some status change happened on \a ctx. - */ -void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx) -{ - struct ptlrpc_request *req, *next; - - spin_lock(&ctx->cc_lock); - list_for_each_entry_safe(req, next, &ctx->cc_req_list, - rq_ctx_chain) { - list_del_init(&req->rq_ctx_chain); - ptlrpc_client_wake_req(req); - } - spin_unlock(&ctx->cc_lock); -} -EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup); - -int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize) -{ - LASSERT(ctx->cc_ops); - - if (ctx->cc_ops->display == NULL) - return 0; - - return ctx->cc_ops->display(ctx, buf, bufsize); -} - static int import_sec_check_expire(struct obd_import *imp) { int adapt = 0; spin_lock(&imp->imp_lock); if (imp->imp_sec_expire && - imp->imp_sec_expire < get_seconds()) { + imp->imp_sec_expire < ktime_get_real_seconds()) { adapt = 1; imp->imp_sec_expire = 0; } @@ -510,7 +469,7 @@ int sptlrpc_req_ctx_switch(struct ptlrpc_request *req, * \note a request must have a context, to keep other parts of code happy. * In any case of failure during the switching, we must restore the old one. 
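A condensed sketch of the restore-on-failure pattern this comment describes — assume sptlrpc_req_get_ctx() attaches a fresh context; example_replace_dead_ctx() is illustration, not the exact function body:

static int example_replace_dead_ctx(struct ptlrpc_request *req)
{
	struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
	int rc;

	rc = sptlrpc_req_get_ctx(req);		/* try to attach a new ctx */
	if (unlikely(rc)) {
		req->rq_cli_ctx = oldctx;	/* failed: restore the old one */
		return rc;
	}
	sptlrpc_cli_ctx_put(oldctx, 1);		/* succeeded: drop the dead ctx */
	return 0;
}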
*/ -int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req) +static int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req) { struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx; struct ptlrpc_cli_ctx *newctx; @@ -563,7 +522,6 @@ int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req) sptlrpc_cli_ctx_put(oldctx, 1); return 0; } -EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx); static int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx) @@ -1229,12 +1187,6 @@ static void sec_cop_destroy_sec(struct ptlrpc_sec *sec) sptlrpc_policy_put(policy); } -void sptlrpc_sec_destroy(struct ptlrpc_sec *sec) -{ - sec_cop_destroy_sec(sec); -} -EXPORT_SYMBOL(sptlrpc_sec_destroy); - static void sptlrpc_sec_kill(struct ptlrpc_sec *sec) { LASSERT_ATOMIC_POS(&sec->ps_refcount); @@ -1246,14 +1198,13 @@ static void sptlrpc_sec_kill(struct ptlrpc_sec *sec) } } -struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec) +static struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec) { if (sec) atomic_inc(&sec->ps_refcount); return sec; } -EXPORT_SYMBOL(sptlrpc_sec_get); void sptlrpc_sec_put(struct ptlrpc_sec *sec) { @@ -1507,13 +1458,6 @@ static void import_flush_ctx_common(struct obd_import *imp, sptlrpc_sec_put(sec); } -void sptlrpc_import_flush_root_ctx(struct obd_import *imp) -{ - /* it's important to use grace mode, see explain in - * sptlrpc_req_refresh_ctx() */ - import_flush_ctx_common(imp, 0, 1, 1); -} - void sptlrpc_import_flush_my_ctx(struct obd_import *imp) { import_flush_ctx_common(imp, from_kuid(&init_user_ns, current_uid()), @@ -1697,18 +1641,8 @@ void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req) req->rq_repmsg = NULL; } -int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp, - struct ptlrpc_cli_ctx *ctx) -{ - struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy; - - if (!policy->sp_cops->install_rctx) - return 0; - return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx); -} - -int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp, - struct ptlrpc_svc_ctx *ctx) +static int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp, + struct ptlrpc_svc_ctx *ctx) { struct ptlrpc_sec_policy *policy = ctx->sc_policy; @@ -1779,7 +1713,7 @@ int sptlrpc_target_export_check(struct obd_export *exp, exp->exp_flvr_old[1] = exp->exp_flvr_old[0]; exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0]; exp->exp_flvr_old[0] = exp->exp_flvr; - exp->exp_flvr_expire[0] = get_seconds() + + exp->exp_flvr_expire[0] = ktime_get_real_seconds() + EXP_FLVR_UPDATE_EXPIRE; exp->exp_flvr = flavor; @@ -1853,14 +1787,14 @@ int sptlrpc_target_export_check(struct obd_export *exp, } if (exp->exp_flvr_expire[0]) { - if (exp->exp_flvr_expire[0] >= get_seconds()) { + if (exp->exp_flvr_expire[0] >= ktime_get_real_seconds()) { if (flavor_allowed(&exp->exp_flvr_old[0], req)) { - CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the middle one (" CFS_DURATION_T ")\n", exp, + CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the middle one (%lld)\n", exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc, - exp->exp_flvr_expire[0] - - get_seconds()); + (s64)(exp->exp_flvr_expire[0] - + ktime_get_real_seconds())); spin_unlock(&exp->exp_lock); return 0; } @@ -1877,15 +1811,15 @@ int sptlrpc_target_export_check(struct obd_export *exp, /* now it doesn't match the current flavor, the only chance we can * accept it is match the old flavors which is not expired. 
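The expiry checks in this hunk follow the y2038-safe pattern used throughout the patch: stamps are kept as time64_t and compared against ktime_get_real_seconds() rather than the deprecated get_seconds(). A minimal sketch — example_expired() is illustrative:

#include <linux/types.h>
#include <linux/timekeeping.h>

/* A stamp of 0 means "never set"; otherwise compare 64-bit seconds. */
static bool example_expired(time64_t expire)
{
	return expire != 0 && expire < ktime_get_real_seconds();
}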
*/ if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) { - if (exp->exp_flvr_expire[1] >= get_seconds()) { + if (exp->exp_flvr_expire[1] >= ktime_get_real_seconds()) { if (flavor_allowed(&exp->exp_flvr_old[1], req)) { - CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the oldest one (" CFS_DURATION_T ")\n", + CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the oldest one (%lld)\n", exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc, - exp->exp_flvr_expire[1] - - get_seconds()); + (s64)(exp->exp_flvr_expire[1] - + ktime_get_real_seconds())); spin_unlock(&exp->exp_lock); return 0; } @@ -1905,7 +1839,7 @@ int sptlrpc_target_export_check(struct obd_export *exp, spin_unlock(&exp->exp_lock); - CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with unauthorized flavor %x, expect %x|%x(%+ld)|%x(%+ld)\n", + CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with unauthorized flavor %x, expect %x|%x(%+lld)|%x(%+lld)\n", exp, exp->exp_obd->obd_name, req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini, req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_auth_usr_ost, @@ -1913,56 +1847,14 @@ int sptlrpc_target_export_check(struct obd_export *exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_expire[0] ? - (unsigned long) (exp->exp_flvr_expire[0] - - get_seconds()) : 0, + (s64)(exp->exp_flvr_expire[0] - ktime_get_real_seconds()) : 0, exp->exp_flvr_old[1].sf_rpc, exp->exp_flvr_expire[1] ? - (unsigned long) (exp->exp_flvr_expire[1] - - get_seconds()) : 0); + (s64)(exp->exp_flvr_expire[1] - ktime_get_real_seconds()) : 0); return -EACCES; } EXPORT_SYMBOL(sptlrpc_target_export_check); -void sptlrpc_target_update_exp_flavor(struct obd_device *obd, - struct sptlrpc_rule_set *rset) -{ - struct obd_export *exp; - struct sptlrpc_flavor new_flvr; - - LASSERT(obd); - - spin_lock(&obd->obd_dev_lock); - - list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) { - if (exp->exp_connection == NULL) - continue; - - /* note if this export had just been updated flavor - * (exp_flvr_changed == 1), this will override the - * previous one. 
*/ - spin_lock(&exp->exp_lock); - sptlrpc_target_choose_flavor(rset, exp->exp_sp_peer, - exp->exp_connection->c_peer.nid, - &new_flvr); - if (exp->exp_flvr_changed || - !flavor_equal(&new_flvr, &exp->exp_flvr)) { - exp->exp_flvr_old[1] = new_flvr; - exp->exp_flvr_expire[1] = 0; - exp->exp_flvr_changed = 1; - exp->exp_flvr_adapt = 1; - - CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n", - exp, sptlrpc_part2name(exp->exp_sp_peer), - exp->exp_flvr.sf_rpc, - exp->exp_flvr_old[1].sf_rpc); - } - spin_unlock(&exp->exp_lock); - } - - spin_unlock(&obd->obd_dev_lock); -} -EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor); - static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc) { /* peer's claim is unreliable unless gss is being used */ @@ -2090,6 +1982,7 @@ int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req, int msglen) rc = policy->sp_sops->alloc_rs(req, msglen); if (unlikely(rc == -ENOMEM)) { struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; + if (svcpt->scp_service->srv_max_reply_size < msglen + sizeof(struct ptlrpc_reply_state)) { /* Just return failure if the size is too big */ @@ -2185,19 +2078,6 @@ void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req) req->rq_svc_ctx = NULL; } -void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req) -{ - struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx; - - if (ctx == NULL) - return; - - LASSERT_ATOMIC_POS(&ctx->sc_refcount); - if (ctx->sc_policy->sp_sops->invalidate_ctx) - ctx->sc_policy->sp_sops->invalidate_ctx(ctx); -} -EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate); - /**************************************** * bulk security * ****************************************/ @@ -2285,7 +2165,6 @@ int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req, } EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write); - /**************************************** * user descriptor helpers * ****************************************/ @@ -2382,14 +2261,14 @@ EXPORT_SYMBOL(sec2target_str); /* * return true if the bulk data is protected */ -int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr) +bool sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr) { switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) { case SPTLRPC_BULK_SVC_INTG: case SPTLRPC_BULK_SVC_PRIV: - return 1; + return true; default: - return 0; + return false; } } EXPORT_SYMBOL(sptlrpc_flavor_has_bulk); diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c b/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c index 2ee3e8b2e..cd8a9987f 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c @@ -58,7 +58,6 @@ * bulk encryption page pools * ****************************************/ - #define POINTERS_PER_PAGE (PAGE_CACHE_SIZE / sizeof(void *)) #define PAGES_PER_POOL (POINTERS_PER_PAGE) @@ -92,8 +91,8 @@ static struct ptlrpc_enc_page_pool { unsigned long epp_idle_idx; /* last shrink time due to mem tight */ - long epp_last_shrink; - long epp_last_access; + time64_t epp_last_shrink; + time64_t epp_last_access; /* * in-pool pages bookkeeping @@ -145,7 +144,7 @@ int sptlrpc_proc_enc_pool_seq_show(struct seq_file *m, void *v) "cache missing: %lu\n" "low free mark: %lu\n" "max waitqueue depth: %u\n" - "max wait time: " CFS_TIME_T "/%u\n", + "max wait time: %ld/%u\n", totalram_pages, PAGES_PER_POOL, page_pools.epp_max_pages, @@ -153,8 +152,8 @@ int sptlrpc_proc_enc_pool_seq_show(struct seq_file *m, void *v) page_pools.epp_total_pages, page_pools.epp_free_pages, page_pools.epp_idle_idx, - get_seconds() - page_pools.epp_last_shrink, - 
get_seconds() - page_pools.epp_last_access, + (long)(ktime_get_seconds() - page_pools.epp_last_shrink), + (long)(ktime_get_seconds() - page_pools.epp_last_access), page_pools.epp_st_max_pages, page_pools.epp_st_grows, page_pools.epp_st_grow_fails, @@ -226,7 +225,7 @@ static unsigned long enc_pools_shrink_count(struct shrinker *s, * if no pool access for a long time, we consider it's fully idle. * a little race here is fine. */ - if (unlikely(get_seconds() - page_pools.epp_last_access > + if (unlikely(ktime_get_seconds() - page_pools.epp_last_access > CACHE_QUIESCENT_PERIOD)) { spin_lock(&page_pools.epp_lock); page_pools.epp_idle_idx = IDLE_IDX_MAX; @@ -253,7 +252,7 @@ static unsigned long enc_pools_shrink_scan(struct shrinker *s, (long)sc->nr_to_scan, page_pools.epp_free_pages); page_pools.epp_st_shrinks++; - page_pools.epp_last_shrink = get_seconds(); + page_pools.epp_last_shrink = ktime_get_seconds(); } spin_unlock(&page_pools.epp_lock); @@ -261,7 +260,7 @@ static unsigned long enc_pools_shrink_scan(struct shrinker *s, * if no pool access for a long time, we consider it's fully idle. * a little race here is fine. */ - if (unlikely(get_seconds() - page_pools.epp_last_access > + if (unlikely(ktime_get_seconds() - page_pools.epp_last_access > CACHE_QUIESCENT_PERIOD)) { spin_lock(&page_pools.epp_lock); page_pools.epp_idle_idx = IDLE_IDX_MAX; @@ -302,150 +301,6 @@ static unsigned long enc_pools_cleanup(struct page ***pools, int npools) return cleaned; } -/* - * merge @npools pointed by @pools which contains @npages new pages - * into current pools. - * - * we have options to avoid most memory copy with some tricks. but we choose - * the simplest way to avoid complexity. It's not frequently called. - */ -static void enc_pools_insert(struct page ***pools, int npools, int npages) -{ - int freeslot; - int op_idx, np_idx, og_idx, ng_idx; - int cur_npools, end_npools; - - LASSERT(npages > 0); - LASSERT(page_pools.epp_total_pages+npages <= page_pools.epp_max_pages); - LASSERT(npages_to_npools(npages) == npools); - LASSERT(page_pools.epp_growing); - - spin_lock(&page_pools.epp_lock); - - /* - * (1) fill all the free slots of current pools. - */ - /* free slots are those left by rent pages, and the extra ones with - * index >= total_pages, locate at the tail of last pool. */ - freeslot = page_pools.epp_total_pages % PAGES_PER_POOL; - if (freeslot != 0) - freeslot = PAGES_PER_POOL - freeslot; - freeslot += page_pools.epp_total_pages - page_pools.epp_free_pages; - - op_idx = page_pools.epp_free_pages / PAGES_PER_POOL; - og_idx = page_pools.epp_free_pages % PAGES_PER_POOL; - np_idx = npools - 1; - ng_idx = (npages - 1) % PAGES_PER_POOL; - - while (freeslot) { - LASSERT(page_pools.epp_pools[op_idx][og_idx] == NULL); - LASSERT(pools[np_idx][ng_idx] != NULL); - - page_pools.epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx]; - pools[np_idx][ng_idx] = NULL; - - freeslot--; - - if (++og_idx == PAGES_PER_POOL) { - op_idx++; - og_idx = 0; - } - if (--ng_idx < 0) { - if (np_idx == 0) - break; - np_idx--; - ng_idx = PAGES_PER_POOL - 1; - } - } - - /* - * (2) add pools if needed. 
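The free-slot computation in step (1) above deserves a worked example: free slots are the unused tail of the last pool plus the slots vacated by rented pages. Numbers here are hypothetical and example_free_slots() is illustration only:

static int example_free_slots(int total, int free, int pages_per_pool)
{
	int slack = total % pages_per_pool;

	if (slack != 0)
		slack = pages_per_pool - slack;
	/* e.g. pages_per_pool = 1024, total = 1500, free = 1400:
	 * tail slack = 1024 - 476 = 548, rented = 100, result = 648
	 */
	return slack + (total - free);
}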
- */ - cur_npools = (page_pools.epp_total_pages + PAGES_PER_POOL - 1) / - PAGES_PER_POOL; - end_npools = (page_pools.epp_total_pages + npages + PAGES_PER_POOL - 1) - / PAGES_PER_POOL; - LASSERT(end_npools <= page_pools.epp_max_pools); - - np_idx = 0; - while (cur_npools < end_npools) { - LASSERT(page_pools.epp_pools[cur_npools] == NULL); - LASSERT(np_idx < npools); - LASSERT(pools[np_idx] != NULL); - - page_pools.epp_pools[cur_npools++] = pools[np_idx]; - pools[np_idx++] = NULL; - } - - page_pools.epp_total_pages += npages; - page_pools.epp_free_pages += npages; - page_pools.epp_st_lowfree = page_pools.epp_free_pages; - - if (page_pools.epp_total_pages > page_pools.epp_st_max_pages) - page_pools.epp_st_max_pages = page_pools.epp_total_pages; - - CDEBUG(D_SEC, "add %d pages to total %lu\n", npages, - page_pools.epp_total_pages); - - spin_unlock(&page_pools.epp_lock); -} - -static int enc_pools_add_pages(int npages) -{ - static DEFINE_MUTEX(add_pages_mutex); - struct page ***pools; - int npools, alloced = 0; - int i, j, rc = -ENOMEM; - - if (npages < PTLRPC_MAX_BRW_PAGES) - npages = PTLRPC_MAX_BRW_PAGES; - - mutex_lock(&add_pages_mutex); - - if (npages + page_pools.epp_total_pages > page_pools.epp_max_pages) - npages = page_pools.epp_max_pages - page_pools.epp_total_pages; - LASSERT(npages > 0); - - page_pools.epp_st_grows++; - - npools = npages_to_npools(npages); - pools = kcalloc(npools, sizeof(*pools), GFP_NOFS); - if (pools == NULL) - goto out; - - for (i = 0; i < npools; i++) { - pools[i] = kzalloc(PAGE_CACHE_SIZE, GFP_NOFS); - if (!pools[i]) - goto out_pools; - - for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) { - pools[i][j] = alloc_page(GFP_NOFS | - __GFP_HIGHMEM); - if (pools[i][j] == NULL) - goto out_pools; - - alloced++; - } - } - LASSERT(alloced == npages); - - enc_pools_insert(pools, npools, npages); - CDEBUG(D_SEC, "added %d pages into pools\n", npages); - rc = 0; - -out_pools: - enc_pools_cleanup(pools, npools); - kfree(pools); -out: - if (rc) { - page_pools.epp_st_grow_fails++; - CERROR("Failed to allocate %d enc pages\n", npages); - } - - mutex_unlock(&add_pages_mutex); - return rc; -} - static inline void enc_pools_wakeup(void) { assert_spin_locked(&page_pools.epp_lock); @@ -457,156 +312,6 @@ static inline void enc_pools_wakeup(void) } } -static int enc_pools_should_grow(int page_needed, long now) -{ - /* don't grow if someone else is growing the pools right now, - * or the pools has reached its full capacity - */ - if (page_pools.epp_growing || - page_pools.epp_total_pages == page_pools.epp_max_pages) - return 0; - - /* if total pages is not enough, we need to grow */ - if (page_pools.epp_total_pages < page_needed) - return 1; - - /* - * we wanted to return 0 here if there was a shrink just happened - * moment ago, but this may cause deadlock if both client and ost - * live on single node. - */ -#if 0 - if (now - page_pools.epp_last_shrink < 2) - return 0; -#endif - - /* - * here we perhaps need consider other factors like wait queue - * length, idle index, etc. ? - */ - - /* grow the pools in any other cases */ - return 1; -} - -/* - * we allocate the requested pages atomically. 
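The allocator below walks the pool array with a plain div/mod split of the free-page count; sketched here with a hypothetical helper:

static void example_page_index(int free_pages, int pages_per_pool,
			       int *p_idx, int *g_idx)
{
	/* e.g. 1400 free pages, 1024 per pool: pool 1, slot 376 */
	*p_idx = free_pages / pages_per_pool;
	*g_idx = free_pages % pages_per_pool;
}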
- */ -int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc) -{ - wait_queue_t waitlink; - unsigned long this_idle = -1; - unsigned long tick = 0; - long now; - int p_idx, g_idx; - int i; - - LASSERT(desc->bd_iov_count > 0); - LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages); - - /* resent bulk, enc iov might have been allocated previously */ - if (desc->bd_enc_iov != NULL) - return 0; - - desc->bd_enc_iov = kcalloc(desc->bd_iov_count, - sizeof(*desc->bd_enc_iov), GFP_NOFS); - if (desc->bd_enc_iov == NULL) - return -ENOMEM; - - spin_lock(&page_pools.epp_lock); - - page_pools.epp_st_access++; -again: - if (unlikely(page_pools.epp_free_pages < desc->bd_iov_count)) { - if (tick == 0) - tick = cfs_time_current(); - - now = get_seconds(); - - page_pools.epp_st_missings++; - page_pools.epp_pages_short += desc->bd_iov_count; - - if (enc_pools_should_grow(desc->bd_iov_count, now)) { - page_pools.epp_growing = 1; - - spin_unlock(&page_pools.epp_lock); - enc_pools_add_pages(page_pools.epp_pages_short / 2); - spin_lock(&page_pools.epp_lock); - - page_pools.epp_growing = 0; - - enc_pools_wakeup(); - } else { - if (++page_pools.epp_waitqlen > - page_pools.epp_st_max_wqlen) - page_pools.epp_st_max_wqlen = - page_pools.epp_waitqlen; - - set_current_state(TASK_UNINTERRUPTIBLE); - init_waitqueue_entry(&waitlink, current); - add_wait_queue(&page_pools.epp_waitq, &waitlink); - - spin_unlock(&page_pools.epp_lock); - schedule(); - remove_wait_queue(&page_pools.epp_waitq, &waitlink); - LASSERT(page_pools.epp_waitqlen > 0); - spin_lock(&page_pools.epp_lock); - page_pools.epp_waitqlen--; - } - - LASSERT(page_pools.epp_pages_short >= desc->bd_iov_count); - page_pools.epp_pages_short -= desc->bd_iov_count; - - this_idle = 0; - goto again; - } - - /* record max wait time */ - if (unlikely(tick != 0)) { - tick = cfs_time_current() - tick; - if (tick > page_pools.epp_st_max_wait) - page_pools.epp_st_max_wait = tick; - } - - /* proceed with rest of allocation */ - page_pools.epp_free_pages -= desc->bd_iov_count; - - p_idx = page_pools.epp_free_pages / PAGES_PER_POOL; - g_idx = page_pools.epp_free_pages % PAGES_PER_POOL; - - for (i = 0; i < desc->bd_iov_count; i++) { - LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL); - desc->bd_enc_iov[i].kiov_page = - page_pools.epp_pools[p_idx][g_idx]; - page_pools.epp_pools[p_idx][g_idx] = NULL; - - if (++g_idx == PAGES_PER_POOL) { - p_idx++; - g_idx = 0; - } - } - - if (page_pools.epp_free_pages < page_pools.epp_st_lowfree) - page_pools.epp_st_lowfree = page_pools.epp_free_pages; - - /* - * new idle index = (old * weight + new) / (weight + 1) - */ - if (this_idle == -1) { - this_idle = page_pools.epp_free_pages * IDLE_IDX_MAX / - page_pools.epp_total_pages; - } - page_pools.epp_idle_idx = (page_pools.epp_idle_idx * IDLE_IDX_WEIGHT + - this_idle) / - (IDLE_IDX_WEIGHT + 1); - - page_pools.epp_last_access = get_seconds(); - - spin_unlock(&page_pools.epp_lock); - return 0; -} -EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages); - void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) { int p_idx, g_idx; @@ -651,41 +356,6 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc) } EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages); -/* - * we don't do much stuff for add_user/del_user anymore, except adding some - * initial pages in add_user() if current pools are empty, rest would be - * handled by the pools's self-adaption. 
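The idle index maintained by the code above is a weighted moving average, new = (old * weight + sample) / (weight + 1). A one-line sketch — example_idle_idx() and the sample numbers are illustrative:

static unsigned long example_idle_idx(unsigned long old,
				      unsigned long sample,
				      unsigned long weight)
{
	/* e.g. old = 40, sample = 100, weight = 3: (120 + 100) / 4 = 55 */
	return (old * weight + sample) / (weight + 1);
}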
- */ -int sptlrpc_enc_pool_add_user(void) -{ - int need_grow = 0; - - spin_lock(&page_pools.epp_lock); - if (page_pools.epp_growing == 0 && page_pools.epp_total_pages == 0) { - page_pools.epp_growing = 1; - need_grow = 1; - } - spin_unlock(&page_pools.epp_lock); - - if (need_grow) { - enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES + - PTLRPC_MAX_BRW_PAGES); - - spin_lock(&page_pools.epp_lock); - page_pools.epp_growing = 0; - enc_pools_wakeup(); - spin_unlock(&page_pools.epp_lock); - } - return 0; -} -EXPORT_SYMBOL(sptlrpc_enc_pool_add_user); - -int sptlrpc_enc_pool_del_user(void) -{ - return 0; -} -EXPORT_SYMBOL(sptlrpc_enc_pool_del_user); - static inline void enc_pools_alloc(void) { LASSERT(page_pools.epp_max_pools); @@ -725,8 +395,8 @@ int sptlrpc_enc_pool_init(void) page_pools.epp_growing = 0; page_pools.epp_idle_idx = 0; - page_pools.epp_last_shrink = get_seconds(); - page_pools.epp_last_access = get_seconds(); + page_pools.epp_last_shrink = ktime_get_seconds(); + page_pools.epp_last_access = ktime_get_seconds(); spin_lock_init(&page_pools.epp_lock); page_pools.epp_total_pages = 0; @@ -768,8 +438,7 @@ void sptlrpc_enc_pool_fini(void) if (page_pools.epp_st_access > 0) { CDEBUG(D_SEC, - "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait " - CFS_TIME_T"/%d\n", + "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait %ld/%d\n", page_pools.epp_st_max_pages, page_pools.epp_st_grows, page_pools.epp_st_grow_fails, page_pools.epp_st_shrinks, page_pools.epp_st_access, @@ -778,7 +447,6 @@ void sptlrpc_enc_pool_fini(void) } } - static int cfs_hash_alg_id[] = { [BULK_HASH_ALG_NULL] = CFS_HASH_ALG_NULL, [BULK_HASH_ALG_ADLER32] = CFS_HASH_ALG_ADLER32, @@ -789,6 +457,7 @@ static int cfs_hash_alg_id[] = { [BULK_HASH_ALG_SHA384] = CFS_HASH_ALG_SHA384, [BULK_HASH_ALG_SHA512] = CFS_HASH_ALG_SHA512, }; + const char *sptlrpc_get_hash_name(__u8 hash_alg) { return cfs_crypto_hash_name(cfs_hash_alg_id[hash_alg]); @@ -871,8 +540,7 @@ int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg, memcpy(buf, hashbuf, buflen); } else { bufsize = buflen; - err = cfs_crypto_hash_final(hdesc, (unsigned char *)buf, - &bufsize); + err = cfs_crypto_hash_final(hdesc, buf, &bufsize); } if (err) diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_config.c b/drivers/staging/lustre/lustre/ptlrpc/sec_config.c index e7f2f3332..7ff948fe1 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec_config.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec_config.c @@ -48,27 +48,6 @@ #include "ptlrpc_internal.h" -const char *sptlrpc_part2name(enum lustre_sec_part part) -{ - switch (part) { - case LUSTRE_SP_CLI: - return "cli"; - case LUSTRE_SP_MDT: - return "mdt"; - case LUSTRE_SP_OST: - return "ost"; - case LUSTRE_SP_MGC: - return "mgc"; - case LUSTRE_SP_MGS: - return "mgs"; - case LUSTRE_SP_ANY: - return "any"; - default: - return "err"; - } -} -EXPORT_SYMBOL(sptlrpc_part2name); - enum lustre_sec_part sptlrpc_target_sec_part(struct obd_device *obd) { const char *type = obd->obd_type->typ_name; @@ -180,7 +159,7 @@ static void sptlrpc_rule_init(struct sptlrpc_rule *rule) /* * format: network[.direction]=flavor */ -int sptlrpc_parse_rule(char *param, struct sptlrpc_rule *rule) +static int sptlrpc_parse_rule(char *param, struct sptlrpc_rule *rule) { char *flavor, *dir; int rc; @@ -234,9 +213,8 @@ int sptlrpc_parse_rule(char *param, struct sptlrpc_rule *rule) return 0; } -EXPORT_SYMBOL(sptlrpc_parse_rule); -void sptlrpc_rule_set_free(struct 
sptlrpc_rule_set *rset) +static void sptlrpc_rule_set_free(struct sptlrpc_rule_set *rset) { LASSERT(rset->srs_nslot || (rset->srs_nrule == 0 && rset->srs_rules == NULL)); @@ -246,12 +224,11 @@ void sptlrpc_rule_set_free(struct sptlrpc_rule_set *rset) sptlrpc_rule_set_init(rset); } } -EXPORT_SYMBOL(sptlrpc_rule_set_free); /* * return 0 if the rule set could accommodate one more rule. */ -int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset) +static int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset) { struct sptlrpc_rule *rules; int nslot; @@ -280,22 +257,24 @@ int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset) rset->srs_nslot = nslot; return 0; } -EXPORT_SYMBOL(sptlrpc_rule_set_expand); static inline int rule_spec_dir(struct sptlrpc_rule *rule) { return (rule->sr_from != LUSTRE_SP_ANY || rule->sr_to != LUSTRE_SP_ANY); } + static inline int rule_spec_net(struct sptlrpc_rule *rule) { return (rule->sr_netid != LNET_NIDNET(LNET_NID_ANY)); } + static inline int rule_match_dir(struct sptlrpc_rule *r1, struct sptlrpc_rule *r2) { return (r1->sr_from == r2->sr_from && r1->sr_to == r2->sr_to); } + static inline int rule_match_net(struct sptlrpc_rule *r1, struct sptlrpc_rule *r2) { @@ -306,8 +285,8 @@ static inline int rule_match_net(struct sptlrpc_rule *r1, * merge @rule into @rset. * the @rset slots might be expanded. */ -int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset, - struct sptlrpc_rule *rule) +static int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset, + struct sptlrpc_rule *rule) { struct sptlrpc_rule *p = rset->srs_rules; int spec_dir, spec_net; @@ -391,17 +370,16 @@ int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset, return 0; } -EXPORT_SYMBOL(sptlrpc_rule_set_merge); /** * given from/to/nid, determine a matching flavor in ruleset. * return 1 if a match found, otherwise return 0. */ -int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset, - enum lustre_sec_part from, - enum lustre_sec_part to, - lnet_nid_t nid, - struct sptlrpc_flavor *sf) +static int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset, + enum lustre_sec_part from, + enum lustre_sec_part to, + lnet_nid_t nid, + struct sptlrpc_flavor *sf) { struct sptlrpc_rule *r; int n; @@ -428,20 +406,6 @@ int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset, return 0; } -EXPORT_SYMBOL(sptlrpc_rule_set_choose); - -void sptlrpc_rule_set_dump(struct sptlrpc_rule_set *rset) -{ - struct sptlrpc_rule *r; - int n; - - for (n = 0; n < rset->srs_nrule; n++) { - r = &rset->srs_rules[n]; - CDEBUG(D_SEC, "<%02d> from %x to %x, net %x, rpc %x\n", n, - r->sr_from, r->sr_to, r->sr_netid, r->sr_flvr.sf_rpc); - } -} -EXPORT_SYMBOL(sptlrpc_rule_set_dump); /********************************** * sptlrpc configuration support * @@ -836,20 +800,6 @@ out: flavor_set_flags(sf, from, to, 1); } -/** - * called by target devices, determine the expected flavor from - * certain peer (from, nid). 
- */ -void sptlrpc_target_choose_flavor(struct sptlrpc_rule_set *rset, - enum lustre_sec_part from, - lnet_nid_t nid, - struct sptlrpc_flavor *sf) -{ - if (sptlrpc_rule_set_choose(rset, from, LUSTRE_SP_ANY, nid, sf) == 0) - get_default_flavor(sf); -} -EXPORT_SYMBOL(sptlrpc_target_choose_flavor); - #define SEC_ADAPT_DELAY (10) /** @@ -871,7 +821,7 @@ void sptlrpc_conf_client_adapt(struct obd_device *obd) if (imp) { spin_lock(&imp->imp_lock); if (imp->imp_sec) - imp->imp_sec_expire = get_seconds() + + imp->imp_sec_expire = ktime_get_real_seconds() + SEC_ADAPT_DELAY; spin_unlock(&imp->imp_lock); } diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c b/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c index cdad608bd..6e58d5f95 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c @@ -51,7 +51,6 @@ #define SEC_GC_INTERVAL (30 * 60) - static struct mutex sec_gc_mutex; static LIST_HEAD(sec_gc_list); static spinlock_t sec_gc_list_lock; @@ -62,14 +61,13 @@ static spinlock_t sec_gc_ctx_list_lock; static struct ptlrpc_thread sec_gc_thread; static atomic_t sec_gc_wait_del = ATOMIC_INIT(0); - void sptlrpc_gc_add_sec(struct ptlrpc_sec *sec) { LASSERT(sec->ps_policy->sp_cops->gc_ctx); LASSERT(sec->ps_gc_interval > 0); LASSERT(list_empty(&sec->ps_gc_list)); - sec->ps_gc_next = get_seconds() + sec->ps_gc_interval; + sec->ps_gc_next = ktime_get_real_seconds() + sec->ps_gc_interval; spin_lock(&sec_gc_list_lock); list_add_tail(&sec_gc_list, &sec->ps_gc_list); @@ -103,21 +101,6 @@ void sptlrpc_gc_del_sec(struct ptlrpc_sec *sec) } EXPORT_SYMBOL(sptlrpc_gc_del_sec); -void sptlrpc_gc_add_ctx(struct ptlrpc_cli_ctx *ctx) -{ - LASSERT(list_empty(&ctx->cc_gc_chain)); - - CDEBUG(D_SEC, "hand over ctx %p(%u->%s)\n", - ctx, ctx->cc_vcred.vc_uid, sec2target_str(ctx->cc_sec)); - spin_lock(&sec_gc_ctx_list_lock); - list_add(&ctx->cc_gc_chain, &sec_gc_ctx_list); - spin_unlock(&sec_gc_ctx_list_lock); - - thread_add_flags(&sec_gc_thread, SVC_SIGNAL); - wake_up(&sec_gc_thread.t_ctl_waitq); -} -EXPORT_SYMBOL(sptlrpc_gc_add_ctx); - static void sec_process_ctx_list(void) { struct ptlrpc_cli_ctx *ctx; @@ -154,16 +137,16 @@ static void sec_do_gc(struct ptlrpc_sec *sec) CDEBUG(D_SEC, "check on sec %p(%s)\n", sec, sec->ps_policy->sp_name); - if (cfs_time_after(sec->ps_gc_next, get_seconds())) + if (sec->ps_gc_next > ktime_get_real_seconds()) return; sec->ps_policy->sp_cops->gc_ctx(sec); - sec->ps_gc_next = get_seconds() + sec->ps_gc_interval; + sec->ps_gc_next = ktime_get_real_seconds() + sec->ps_gc_interval; } static int sec_gc_main(void *arg) { - struct ptlrpc_thread *thread = (struct ptlrpc_thread *) arg; + struct ptlrpc_thread *thread = arg; struct l_wait_info lwi; unshare_fs_struct(); diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c b/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c index 68fcac14b..bda9a77af 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c @@ -98,14 +98,15 @@ static int sptlrpc_info_lprocfs_seq_show(struct seq_file *seq, void *v) atomic_read(&sec->ps_refcount)); seq_printf(seq, "nctx: %d\n", atomic_read(&sec->ps_nctx)); seq_printf(seq, "gc internal %ld\n", sec->ps_gc_interval); - seq_printf(seq, "gc next %ld\n", + seq_printf(seq, "gc next %lld\n", sec->ps_gc_interval ? 
- sec->ps_gc_next - get_seconds() : 0); + (s64)(sec->ps_gc_next - ktime_get_real_seconds()) : 0ll); sptlrpc_sec_put(sec); out: return 0; } + LPROC_SEQ_FOPS_RO(sptlrpc_info_lprocfs); static int sptlrpc_ctxs_lprocfs_seq_show(struct seq_file *seq, void *v) @@ -130,6 +131,7 @@ static int sptlrpc_ctxs_lprocfs_seq_show(struct seq_file *seq, void *v) out: return 0; } + LPROC_SEQ_FOPS_RO(sptlrpc_ctxs_lprocfs); int sptlrpc_lprocfs_cliobd_attach(struct obd_device *dev) diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_null.c b/drivers/staging/lustre/lustre/ptlrpc/sec_null.c index ce1c563d0..ebfa6092b 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec_null.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec_null.c @@ -40,13 +40,14 @@ #define DEBUG_SUBSYSTEM S_SEC - #include "../include/obd_support.h" #include "../include/obd_cksum.h" #include "../include/obd_class.h" #include "../include/lustre_net.h" #include "../include/lustre_sec.h" +#include "ptlrpc_internal.h" + static struct ptlrpc_sec_policy null_policy; static struct ptlrpc_sec null_sec; static struct ptlrpc_cli_ctx null_cli_ctx; @@ -82,6 +83,7 @@ int null_ctx_sign(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req) if (!req->rq_import->imp_dlm_fake) { struct obd_device *obd = req->rq_import->imp_obd; + null_encode_sec_part(req->rq_reqbuf, obd->u.cli.cl_sp_me); } diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c b/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c index a243db60f..f448b4567 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c +++ b/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c @@ -40,12 +40,12 @@ #define DEBUG_SUBSYSTEM S_SEC - #include "../include/obd_support.h" #include "../include/obd_cksum.h" #include "../include/obd_class.h" #include "../include/lustre_net.h" #include "../include/lustre_sec.h" +#include "ptlrpc_internal.h" struct plain_sec { struct ptlrpc_sec pls_base; diff --git a/drivers/staging/lustre/lustre/ptlrpc/service.c b/drivers/staging/lustre/lustre/ptlrpc/service.c index 003344ccf..f45898f17 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/service.c +++ b/drivers/staging/lustre/lustre/ptlrpc/service.c @@ -58,7 +58,6 @@ MODULE_PARM_DESC(at_early_margin, "How soon before an RPC deadline to send an ea module_param(at_extra, int, 0644); MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply"); - /* forward ref */ static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt); static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req); @@ -86,8 +85,10 @@ ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt) rqbd->rqbd_cbid.cbid_fn = request_in_callback; rqbd->rqbd_cbid.cbid_arg = rqbd; INIT_LIST_HEAD(&rqbd->rqbd_reqs); - OBD_CPT_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_cptable, - svcpt->scp_cpt, svc->srv_buf_size); + rqbd->rqbd_buffer = libcfs_kvzalloc_cpt(svc->srv_cptable, + svcpt->scp_cpt, + svc->srv_buf_size, + GFP_KERNEL); if (rqbd->rqbd_buffer == NULL) { kfree(rqbd); return NULL; @@ -141,7 +142,6 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post) svcpt->scp_rqbd_allocating++; spin_unlock(&svcpt->scp_lock); - for (i = 0; i < svc->srv_nbuf_per_group; i++) { /* NB: another thread might have recycled enough rqbds, we * need to make sure it wouldn't over-allocate, see LU-1212. */ @@ -177,33 +177,6 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post) return rc; } -/** - * Part of Rep-Ack logic. - * Puts a lock and its mode into reply state associated to request reply. 
- */ -void -ptlrpc_save_lock(struct ptlrpc_request *req, - struct lustre_handle *lock, int mode, int no_ack) -{ - struct ptlrpc_reply_state *rs = req->rq_reply_state; - int idx; - - LASSERT(rs != NULL); - LASSERT(rs->rs_nlocks < RS_MAX_LOCKS); - - if (req->rq_export->exp_disconnected) { - ldlm_lock_decref(lock, mode); - } else { - idx = rs->rs_nlocks++; - rs->rs_locks[idx] = *lock; - rs->rs_modes[idx] = mode; - rs->rs_difficult = 1; - rs->rs_no_ack = !!no_ack; - } -} -EXPORT_SYMBOL(ptlrpc_save_lock); - - struct ptlrpc_hr_partition; struct ptlrpc_hr_thread { @@ -244,32 +217,10 @@ struct ptlrpc_hr_service { struct ptlrpc_hr_partition **hr_partitions; }; -struct rs_batch { - struct list_head rsb_replies; - unsigned int rsb_n_replies; - struct ptlrpc_service_part *rsb_svcpt; -}; - /** reply handling service. */ static struct ptlrpc_hr_service ptlrpc_hr; /** - * maximum number of replies scheduled in one batch - */ -#define MAX_SCHEDULED 256 - -/** - * Initialize a reply batch. - * - * \param b batch - */ -static void rs_batch_init(struct rs_batch *b) -{ - memset(b, 0, sizeof(*b)); - INIT_LIST_HEAD(&b->rsb_replies); -} - -/** * Choose an hr thread to dispatch requests to. */ static struct ptlrpc_hr_thread * @@ -295,76 +246,6 @@ ptlrpc_hr_select(struct ptlrpc_service_part *svcpt) } /** - * Dispatch all replies accumulated in the batch to one from - * dedicated reply handling threads. - * - * \param b batch - */ -static void rs_batch_dispatch(struct rs_batch *b) -{ - if (b->rsb_n_replies != 0) { - struct ptlrpc_hr_thread *hrt; - - hrt = ptlrpc_hr_select(b->rsb_svcpt); - - spin_lock(&hrt->hrt_lock); - list_splice_init(&b->rsb_replies, &hrt->hrt_queue); - spin_unlock(&hrt->hrt_lock); - - wake_up(&hrt->hrt_waitq); - b->rsb_n_replies = 0; - } -} - -/** - * Add a reply to a batch. - * Add one reply object to a batch, schedule batched replies if overload. - * - * \param b batch - * \param rs reply - */ -static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs) -{ - struct ptlrpc_service_part *svcpt = rs->rs_svcpt; - - if (svcpt != b->rsb_svcpt || b->rsb_n_replies >= MAX_SCHEDULED) { - if (b->rsb_svcpt != NULL) { - rs_batch_dispatch(b); - spin_unlock(&b->rsb_svcpt->scp_rep_lock); - } - spin_lock(&svcpt->scp_rep_lock); - b->rsb_svcpt = svcpt; - } - spin_lock(&rs->rs_lock); - rs->rs_scheduled_ever = 1; - if (rs->rs_scheduled == 0) { - list_move(&rs->rs_list, &b->rsb_replies); - rs->rs_scheduled = 1; - b->rsb_n_replies++; - } - rs->rs_committed = 1; - spin_unlock(&rs->rs_lock); -} - -/** - * Reply batch finalization. - * Dispatch remaining replies from the batch - * and release remaining spinlock. - * - * \param b batch - */ -static void rs_batch_fini(struct rs_batch *b) -{ - if (b->rsb_svcpt != NULL) { - rs_batch_dispatch(b); - spin_unlock(&b->rsb_svcpt->scp_rep_lock); - } -} - -#define DECLARE_RS_BATCH(b) struct rs_batch b - - -/** * Put reply state into a queue for processing because we received * ACK from the client */ @@ -401,32 +282,6 @@ ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs) } EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply); -void ptlrpc_commit_replies(struct obd_export *exp) -{ - struct ptlrpc_reply_state *rs, *nxt; - DECLARE_RS_BATCH(batch); - - rs_batch_init(&batch); - /* Find any replies that have been committed and get their service - * to attend to complete them. */ - - /* CAVEAT EMPTOR: spinlock ordering!!! 
*/ - spin_lock(&exp->exp_uncommitted_replies_lock); - list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies, - rs_obd_list) { - LASSERT(rs->rs_difficult); - /* VBR: per-export last_committed */ - LASSERT(rs->rs_export); - if (rs->rs_transno <= exp->exp_last_committed) { - list_del_init(&rs->rs_obd_list); - rs_batch_add(&batch, rs); - } - } - spin_unlock(&exp->exp_uncommitted_replies_lock); - rs_batch_fini(&batch); -} -EXPORT_SYMBOL(ptlrpc_commit_replies); - static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt) { @@ -647,7 +502,9 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc, if (array->paa_reqs_count == NULL) goto free_reqs_array; - cfs_timer_init(&svcpt->scp_at_timer, ptlrpc_at_timer, svcpt); + setup_timer(&svcpt->scp_at_timer, ptlrpc_at_timer, + (unsigned long)svcpt); + /* At SOW, service time should be quick; 10s seems generous. If client * timeout is less than this, we'll be sending an early reply. */ at_init(&svcpt->scp_at_estimate, 10, 0); @@ -856,7 +713,7 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req) * drop a reference count of the request. if it reaches 0, we either * put it into history list, or free it immediately. */ -void ptlrpc_server_drop_request(struct ptlrpc_request *req) +static void ptlrpc_server_drop_request(struct ptlrpc_request *req) { struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd; struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt; @@ -960,35 +817,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req) } } -/** Change request export and move hp request from old export to new */ -void ptlrpc_request_change_export(struct ptlrpc_request *req, - struct obd_export *export) -{ - if (req->rq_export != NULL) { - if (!list_empty(&req->rq_exp_list)) { - /* remove rq_exp_list from last export */ - spin_lock_bh(&req->rq_export->exp_rpc_lock); - list_del_init(&req->rq_exp_list); - spin_unlock_bh(&req->rq_export->exp_rpc_lock); - - /* export has one reference already, so it`s safe to - * add req to export queue here and get another - * reference for request later */ - spin_lock_bh(&export->exp_rpc_lock); - list_add(&req->rq_exp_list, &export->exp_hp_rpcs); - spin_unlock_bh(&export->exp_rpc_lock); - } - class_export_rpc_dec(req->rq_export); - class_export_put(req->rq_export); - } - - /* request takes one export refcount */ - req->rq_export = class_export_get(export); - class_export_rpc_inc(export); - - return; -} - /** * to finish a request: stop sending more early replies, and release * the request. @@ -1025,82 +853,6 @@ static void ptlrpc_server_finish_active_request( } /** - * This function makes sure dead exports are evicted in a timely manner. - * This function is only called when some export receives a message (i.e., - * the network is up.) - */ -static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay) -{ - struct obd_export *oldest_exp; - time_t oldest_time, new_time; - - LASSERT(exp); - - /* Compensate for slow machines, etc, by faking our request time - into the future. Although this can break the strict time-ordering - of the list, we can be really lazy here - we don't have to evict - at the exact right moment. Eventually, all silent exports - will make it to the top of the list. */ - - /* Do not pay attention on 1sec or smaller renewals. 
*/ - new_time = get_seconds() + extra_delay; - if (exp->exp_last_request_time + 1 /*second */ >= new_time) - return; - - exp->exp_last_request_time = new_time; - - /* exports may get disconnected from the chain even though the - export has references, so we must keep the spin lock while - manipulating the lists */ - spin_lock(&exp->exp_obd->obd_dev_lock); - - if (list_empty(&exp->exp_obd_chain_timed)) { - /* this one is not timed */ - spin_unlock(&exp->exp_obd->obd_dev_lock); - return; - } - - list_move_tail(&exp->exp_obd_chain_timed, - &exp->exp_obd->obd_exports_timed); - - oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next, - struct obd_export, exp_obd_chain_timed); - oldest_time = oldest_exp->exp_last_request_time; - spin_unlock(&exp->exp_obd->obd_dev_lock); - - if (exp->exp_obd->obd_recovering) { - /* be nice to everyone during recovery */ - return; - } - - /* Note - racing to start/reset the obd_eviction timer is safe */ - if (exp->exp_obd->obd_eviction_timer == 0) { - /* Check if the oldest entry is expired. */ - if (get_seconds() > (oldest_time + PING_EVICT_TIMEOUT + - extra_delay)) { - /* We need a second timer, in case the net was down and - * it just came back. Since the pinger may skip every - * other PING_INTERVAL (see note in ptlrpc_pinger_main), - * we better wait for 3. */ - exp->exp_obd->obd_eviction_timer = - get_seconds() + 3 * PING_INTERVAL; - CDEBUG(D_HA, "%s: Think about evicting %s from "CFS_TIME_T"\n", - exp->exp_obd->obd_name, - obd_export_nid2str(oldest_exp), oldest_time); - } - } else { - if (get_seconds() > - (exp->exp_obd->obd_eviction_timer + extra_delay)) { - /* The evictor won't evict anyone who we've heard from - * recently, so we don't have to check before we start - * it. */ - if (!ping_evictor_wake(exp)) - exp->exp_obd->obd_eviction_timer = 0; - } - } -} - -/** * Sanity check request \a req. * Return 0 if all is ok, error code otherwise. */ @@ -1126,18 +878,16 @@ static int ptlrpc_check_req(struct ptlrpc_request *req) req, (obd != NULL) ? 
obd->obd_name : "unknown"); rc = -ENODEV; } else if (lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_REPLAY | MSG_REQ_REPLAY_DONE) && - !obd->obd_recovering) { - DEBUG_REQ(D_ERROR, req, - "Invalid replay without recovery"); - class_fail_export(req->rq_export); - rc = -ENODEV; - } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 && - !obd->obd_recovering) { - DEBUG_REQ(D_ERROR, req, "Invalid req with transno %llu without recovery", - lustre_msg_get_transno(req->rq_reqmsg)); - class_fail_export(req->rq_export); - rc = -ENODEV; + (MSG_REPLAY | MSG_REQ_REPLAY_DONE)) { + DEBUG_REQ(D_ERROR, req, "Invalid replay without recovery"); + class_fail_export(req->rq_export); + rc = -ENODEV; + } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0) { + DEBUG_REQ(D_ERROR, req, + "Invalid req with transno %llu without recovery", + lustre_msg_get_transno(req->rq_reqmsg)); + class_fail_export(req->rq_export); + rc = -ENODEV; } if (unlikely(rc < 0)) { @@ -1153,17 +903,17 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt) __s32 next; if (array->paa_count == 0) { - cfs_timer_disarm(&svcpt->scp_at_timer); + del_timer(&svcpt->scp_at_timer); return; } /* Set timer for closest deadline */ - next = (__s32)(array->paa_deadline - get_seconds() - + next = (__s32)(array->paa_deadline - ktime_get_real_seconds() - at_early_margin); if (next <= 0) { ptlrpc_at_timer((unsigned long)svcpt); } else { - cfs_timer_arm(&svcpt->scp_at_timer, cfs_time_shift(next)); + mod_timer(&svcpt->scp_at_timer, cfs_time_shift(next)); CDEBUG(D_INFO, "armed %s at %+ds\n", svcpt->scp_service->srv_name, next); } @@ -1189,7 +939,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req) spin_lock(&svcpt->scp_at_lock); LASSERT(list_empty(&req->rq_timed_list)); - index = (unsigned long)req->rq_deadline % array->paa_size; + div_u64_rem(req->rq_deadline, array->paa_size, &index); if (array->paa_reqs_count[index] > 0) { /* latest rpcs will have the latest deadlines in the list, * so search backward. */ @@ -1248,8 +998,8 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req) struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt; struct ptlrpc_request *reqcopy; struct lustre_msg *reqmsg; - long olddl = req->rq_deadline - get_seconds(); - time_t newdl; + long olddl = req->rq_deadline - ktime_get_real_seconds(); + time64_t newdl; int rc; /* deadline is when the client expects us to reply, margin is the @@ -1276,36 +1026,22 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req) return -ENOSYS; } - if (req->rq_export && - lustre_msg_get_flags(req->rq_reqmsg) & - (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) { - /* During recovery, we don't want to send too many early - * replies, but on the other hand we want to make sure the - * client has enough time to resend if the rpc is lost. So - * during the recovery period send at least 4 early replies, - * spacing them every at_extra if we can. at_estimate should - * always equal this fixed value during recovery. 
*/ - at_measured(&svcpt->scp_at_estimate, min(at_extra, - req->rq_export->exp_obd->obd_recovery_timeout / 4)); - } else { - /* Fake our processing time into the future to ask the clients - * for some extra amount of time */ - at_measured(&svcpt->scp_at_estimate, at_extra + - get_seconds() - - req->rq_arrival_time.tv_sec); - - /* Check to see if we've actually increased the deadline - - * we may be past adaptive_max */ - if (req->rq_deadline >= req->rq_arrival_time.tv_sec + - at_get(&svcpt->scp_at_estimate)) { - DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%ld/%ld), not sending early reply\n", - olddl, req->rq_arrival_time.tv_sec + - at_get(&svcpt->scp_at_estimate) - - get_seconds()); - return -ETIMEDOUT; - } + /* Fake our processing time into the future to ask the clients + * for some extra amount of time */ + at_measured(&svcpt->scp_at_estimate, at_extra + + ktime_get_real_seconds() - req->rq_arrival_time.tv_sec); + + /* Check to see if we've actually increased the deadline - + * we may be past adaptive_max */ + if (req->rq_deadline >= req->rq_arrival_time.tv_sec + + at_get(&svcpt->scp_at_estimate)) { + DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%ld/%lld), not sending early reply\n", + olddl, req->rq_arrival_time.tv_sec + + at_get(&svcpt->scp_at_estimate) - + ktime_get_real_seconds()); + return -ETIMEDOUT; } - newdl = get_seconds() + at_get(&svcpt->scp_at_estimate); + newdl = ktime_get_real_seconds() + at_get(&svcpt->scp_at_estimate); reqcopy = ptlrpc_request_cache_alloc(GFP_NOFS); if (reqcopy == NULL) @@ -1388,8 +1124,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt) struct ptlrpc_request *rq, *n; struct list_head work_list; __u32 index, count; - time_t deadline; - time_t now = get_seconds(); + time64_t deadline; + time64_t now = ktime_get_real_seconds(); long delay; int first, counter = 0; @@ -1419,7 +1155,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt) server will take. Send early replies to everyone expiring soon. */ INIT_LIST_HEAD(&work_list); deadline = -1; - index = (unsigned long)array->paa_deadline % array->paa_size; + div_u64_rem(array->paa_deadline, array->paa_size, &index); count = array->paa_count; while (count > 0) { count -= array->paa_reqs_count[index]; @@ -1461,7 +1197,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt) chance to send early replies */ LCONSOLE_WARN("%s: This server is not able to keep up with request traffic (cpu-bound).\n", svcpt->scp_service->srv_name); - CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=" CFS_DURATION_T "(jiff)\n", + CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%ld(jiff)\n", counter, svcpt->scp_nreqs_incoming, svcpt->scp_nreqs_active, at_get(&svcpt->scp_at_estimate), delay); @@ -1546,30 +1282,6 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req) } } -static int ptlrpc_hpreq_check(struct ptlrpc_request *req) -{ - return 1; -} - -static struct ptlrpc_hpreq_ops ptlrpc_hpreq_common = { - .hpreq_check = ptlrpc_hpreq_check, -}; - -/* Hi-Priority RPC check by RPC operation code. */ -int ptlrpc_hpreq_handler(struct ptlrpc_request *req) -{ - int opc = lustre_msg_get_opc(req->rq_reqmsg); - - /* Check for export to let only reconnects for not yet evicted - * export to become a HP rpc. 
*/ - if ((req->rq_export != NULL) && - (opc == OBD_PING || opc == MDS_CONNECT || opc == OST_CONNECT)) - req->rq_ops = &ptlrpc_hpreq_common; - - return 0; -} -EXPORT_SYMBOL(ptlrpc_hpreq_handler); - static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt, struct ptlrpc_request *req) { @@ -1638,6 +1350,7 @@ static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt, bool force) { int running = svcpt->scp_nthrs_running; + if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL && CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) { /* leave just 1 thread for normal RPCs */ @@ -1828,14 +1541,13 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt, if (rc) goto err_req; - ptlrpc_update_export_timer(req->rq_export, 0); } /* req_in handling should/must be fast */ - if (get_seconds() - req->rq_arrival_time.tv_sec > 5) + if (ktime_get_real_seconds() - req->rq_arrival_time.tv_sec > 5) DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s", - cfs_time_sub(get_seconds(), - req->rq_arrival_time.tv_sec)); + (long)(ktime_get_real_seconds() - + req->rq_arrival_time.tv_sec)); /* Set rpc server deadline and add it to the timed list */ deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) & @@ -1876,9 +1588,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt, { struct ptlrpc_service *svc = svcpt->scp_service; struct ptlrpc_request *request; - struct timeval work_start; - struct timeval work_end; - long timediff; + struct timespec64 work_start; + struct timespec64 work_end; + struct timespec64 timediff; + struct timespec64 arrived; + unsigned long timediff_usecs; + unsigned long arrived_usecs; int rc; int fail_opc = 0; @@ -1901,12 +1616,13 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt, if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG)) libcfs_debug_dumplog(); - do_gettimeofday(&work_start); - timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time, - NULL); + ktime_get_real_ts64(&work_start); + timediff = timespec64_sub(work_start, request->rq_arrival_time); + timediff_usecs = timediff.tv_sec * USEC_PER_SEC + + timediff.tv_nsec / NSEC_PER_USEC; if (likely(svc->srv_stats != NULL)) { lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR, - timediff); + timediff_usecs); lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR, svcpt->scp_nreqs_incoming); lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR, @@ -1933,18 +1649,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt, if (likely(request->rq_export)) { if (unlikely(ptlrpc_check_req(request))) goto put_conn; - ptlrpc_update_export_timer(request->rq_export, timediff >> 19); } /* Discard requests queued for longer than the deadline. The deadline is increased if we send an early reply. 
*/ - if (get_seconds() > request->rq_deadline) { + if (ktime_get_real_seconds() > request->rq_deadline) { DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s: deadline " CFS_DURATION_T ":" CFS_DURATION_T "s ago\n", libcfs_id2str(request->rq_peer), - cfs_time_sub(request->rq_deadline, - request->rq_arrival_time.tv_sec), - cfs_time_sub(get_seconds(), - request->rq_deadline)); + (long)(request->rq_deadline - + request->rq_arrival_time.tv_sec), + (long)(ktime_get_real_seconds() - + request->rq_deadline)); goto put_conn; } @@ -1969,19 +1684,22 @@ put_conn: lu_context_exit(&request->rq_session); lu_context_fini(&request->rq_session); - if (unlikely(get_seconds() > request->rq_deadline)) { + if (unlikely(ktime_get_real_seconds() > request->rq_deadline)) { DEBUG_REQ(D_WARNING, request, - "Request took longer than estimated (" - CFS_DURATION_T":"CFS_DURATION_T - "s); client may timeout.", - cfs_time_sub(request->rq_deadline, - request->rq_arrival_time.tv_sec), - cfs_time_sub(get_seconds(), - request->rq_deadline)); - } - - do_gettimeofday(&work_end); - timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + "Request took longer than estimated (%lld:%llds); " + "client may timeout.", + (s64)request->rq_deadline - + request->rq_arrival_time.tv_sec, + (s64)ktime_get_real_seconds() - request->rq_deadline); + } + + ktime_get_real_ts64(&work_end); + timediff = timespec64_sub(work_end, work_start); + timediff_usecs = timediff.tv_sec * USEC_PER_SEC + + timediff.tv_nsec / NSEC_PER_USEC; + arrived = timespec64_sub(work_end, request->rq_arrival_time); + arrived_usecs = arrived.tv_sec * USEC_PER_SEC + + arrived.tv_nsec / NSEC_PER_USEC; CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d Request processed in %ldus (%ldus total) trans %llu rc %d/%d\n", current_comm(), (request->rq_export ? @@ -1992,8 +1710,8 @@ put_conn: request->rq_xid, libcfs_id2str(request->rq_peer), lustre_msg_get_opc(request->rq_reqmsg), - timediff, - cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL), + timediff_usecs, + arrived_usecs, (request->rq_repmsg ? 
lustre_msg_get_transno(request->rq_repmsg) : request->rq_transno), @@ -2003,20 +1721,20 @@ put_conn: if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) { __u32 op = lustre_msg_get_opc(request->rq_reqmsg); int opc = opcode_offset(op); + if (opc > 0 && !(op == LDLM_ENQUEUE || op == MDS_REINT)) { LASSERT(opc < LUSTRE_MAX_OPCODES); lprocfs_counter_add(svc->srv_stats, opc + EXTRA_MAX_OPCODES, - timediff); + timediff_usecs); } } if (unlikely(request->rq_early_count)) { DEBUG_REQ(D_ADAPTTO, request, - "sent %d early replies before finishing in " - CFS_DURATION_T"s", + "sent %d early replies before finishing in %llds", request->rq_early_count, - cfs_time_sub(work_end.tv_sec, - request->rq_arrival_time.tv_sec)); + (s64)work_end.tv_sec - + request->rq_arrival_time.tv_sec); } out_req: @@ -2128,7 +1846,6 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs) return 1; } - static void ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt) { @@ -2155,7 +1872,7 @@ ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt) static int ptlrpc_retry_rqbds(void *arg) { - struct ptlrpc_service_part *svcpt = (struct ptlrpc_service_part *)arg; + struct ptlrpc_service_part *svcpt = arg; svcpt->scp_rqbd_timeout = 0; return -ETIMEDOUT; @@ -2262,7 +1979,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt, */ static int ptlrpc_main(void *arg) { - struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg; + struct ptlrpc_thread *thread = arg; struct ptlrpc_service_part *svcpt = thread->t_svcpt; struct ptlrpc_service *svc = svcpt->scp_service; struct ptlrpc_reply_state *rs; @@ -2464,7 +2181,7 @@ static int hrt_dont_sleep(struct ptlrpc_hr_thread *hrt, */ static int ptlrpc_hr_main(void *arg) { - struct ptlrpc_hr_thread *hrt = (struct ptlrpc_hr_thread *)arg; + struct ptlrpc_hr_thread *hrt = arg; struct ptlrpc_hr_partition *hrp = hrt->hrt_partition; LIST_HEAD (replies); char threadname[20]; @@ -2538,6 +2255,7 @@ static int ptlrpc_start_hr_threads(void) for (j = 0; j < hrp->hrp_nthrs; j++) { struct ptlrpc_hr_thread *hrt = &hrp->hrp_thrs[j]; + rc = PTR_ERR(kthread_run(ptlrpc_hr_main, &hrp->hrp_thrs[j], "ptlrpc_hr%02d_%03d", @@ -2609,7 +2327,7 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt) /** * Stops all threads of a particular service \a svc */ -void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) +static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) { struct ptlrpc_service_part *svcpt; int i; @@ -2619,7 +2337,6 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc) ptlrpc_svcpt_stop_threads(svcpt); } } -EXPORT_SYMBOL(ptlrpc_stop_all_threads); int ptlrpc_start_threads(struct ptlrpc_service *svc) { @@ -2833,7 +2550,6 @@ void ptlrpc_hr_fini(void) ptlrpc_hr.hr_partitions = NULL; } - /** * Wait until all already scheduled replies are processed. */ @@ -2862,7 +2578,7 @@ ptlrpc_service_del_atimer(struct ptlrpc_service *svc) /* early disarm AT timer... */ ptlrpc_service_for_each_part(svcpt, i, svc) { if (svcpt->scp_service != NULL) - cfs_timer_disarm(&svcpt->scp_at_timer); + del_timer(&svcpt->scp_at_timer); } } @@ -3002,7 +2718,7 @@ ptlrpc_service_free(struct ptlrpc_service *svc) break; /* In case somebody rearmed this in the meantime */ - cfs_timer_disarm(&svcpt->scp_at_timer); + del_timer(&svcpt->scp_at_timer); array = &svcpt->scp_at_array; kfree(array->paa_reqs_array); @@ -3045,61 +2761,3 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service) return 0; } EXPORT_SYMBOL(ptlrpc_unregister_service); - -/** - * Returns 0 if the service is healthy. 
- * - * Right now, it just checks to make sure that requests aren't languishing - * in the queue. We'll use this health check to govern whether a node needs - * to be shot, so it's intentionally non-aggressive. */ -static int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt) -{ - struct ptlrpc_request *request = NULL; - struct timeval right_now; - long timediff; - - do_gettimeofday(&right_now); - - spin_lock(&svcpt->scp_req_lock); - /* How long has the next entry been waiting? */ - if (ptlrpc_server_high_pending(svcpt, true)) - request = ptlrpc_nrs_req_peek_nolock(svcpt, true); - else if (ptlrpc_server_normal_pending(svcpt, true)) - request = ptlrpc_nrs_req_peek_nolock(svcpt, false); - - if (request == NULL) { - spin_unlock(&svcpt->scp_req_lock); - return 0; - } - - timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL); - spin_unlock(&svcpt->scp_req_lock); - - if ((timediff / ONE_MILLION) > - (AT_OFF ? obd_timeout * 3 / 2 : at_max)) { - CERROR("%s: unhealthy - request has been waiting %lds\n", - svcpt->scp_service->srv_name, timediff / ONE_MILLION); - return -1; - } - - return 0; -} - -int -ptlrpc_service_health_check(struct ptlrpc_service *svc) -{ - struct ptlrpc_service_part *svcpt; - int i; - - if (svc == NULL) - return 0; - - ptlrpc_service_for_each_part(svcpt, i, svc) { - int rc = ptlrpc_svcpt_health_check(svcpt); - - if (rc != 0) - return rc; - } - return 0; -} -EXPORT_SYMBOL(ptlrpc_service_health_check); diff --git a/drivers/staging/lustre/lustre/ptlrpc/wiretest.c b/drivers/staging/lustre/lustre/ptlrpc/wiretest.c index d6d92046c..40f720ca3 100644 --- a/drivers/staging/lustre/lustre/ptlrpc/wiretest.c +++ b/drivers/staging/lustre/lustre/ptlrpc/wiretest.c @@ -43,6 +43,8 @@ #include "../include/obd_class.h" #include "../include/lustre_net.h" #include "../include/lustre_disk.h" +#include "ptlrpc_internal.h" + void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' @@ -636,12 +638,8 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct lustre_msg_v2, lm_buflens[0])); LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_buflens[0]) == 4, "found %lld\n", (long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_buflens[0])); - LASSERTF(LUSTRE_MSG_MAGIC_V1 == 0x0BD00BD0, "found 0x%.8x\n", - LUSTRE_MSG_MAGIC_V1); LASSERTF(LUSTRE_MSG_MAGIC_V2 == 0x0BD00BD3, "found 0x%.8x\n", LUSTRE_MSG_MAGIC_V2); - LASSERTF(LUSTRE_MSG_MAGIC_V1_SWABBED == 0xD00BD00B, "found 0x%.8x\n", - LUSTRE_MSG_MAGIC_V1_SWABBED); LASSERTF(LUSTRE_MSG_MAGIC_V2_SWABBED == 0xD30BD00B, "found 0x%.8x\n", LUSTRE_MSG_MAGIC_V2_SWABBED); |
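
The bulk of this commit replaces the overflow-prone get_seconds()/time_t pair with ktime_get_real_seconds()/time64_t, which keeps working past 2038 on 32-bit hosts, and casts differences to s64 (printed with %lld) rather than long. A minimal sketch of that idiom (not part of the patch itself); the ps_gc_next/ps_gc_interval field names mirror the sec_gc.c hunks above, while gc_state, gc_due() and gc_rearm() are illustrative names only:

#include <linux/timekeeping.h>	/* ktime_get_real_seconds() */
#include <linux/time64.h>	/* time64_t */
#include <linux/types.h>	/* bool */

struct gc_state {
	time64_t ps_gc_next;	/* absolute time of the next GC pass */
	long ps_gc_interval;	/* seconds between passes */
};

static bool gc_due(const struct gc_state *st)
{
	/* plain comparison replaces the old cfs_time_after() wrapper */
	return ktime_get_real_seconds() >= st->ps_gc_next;
}

static void gc_rearm(struct gc_state *st)
{
	st->ps_gc_next = ktime_get_real_seconds() + st->ps_gc_interval;
}

When such a value is printed, as in the sec_lproc.c seq_printf() above, the difference is cast to s64 and formatted with %lld so the same format string is correct on both 32- and 64-bit builds.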
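
service.c also drops the libcfs timer wrappers (cfs_timer_init/arm/disarm) in favor of the plain kernel timer API of this era: setup_timer(), mod_timer() and del_timer(). A sketch of that conversion (again, not part of the patch), assuming the 4.4 callback convention of an unsigned long cookie; my_part and my_at_timer_cb stand in for the real svcpt/ptlrpc_at_timer pair:

#include <linux/timer.h>
#include <linux/jiffies.h>

struct my_part {
	struct timer_list at_timer;
};

/* 4.4-era timer callbacks receive their cookie as an unsigned long */
static void my_at_timer_cb(unsigned long data)
{
	struct my_part *part = (struct my_part *)data;

	/* ... kick the adaptive-timeout check for this partition ... */
	(void)part;
}

static void my_part_init(struct my_part *part)
{
	/* replaces cfs_timer_init(&timer, fn, arg) */
	setup_timer(&part->at_timer, my_at_timer_cb, (unsigned long)part);
}

static void my_part_arm(struct my_part *part, int secs)
{
	/* cfs_time_shift(secs) amounts to jiffies + secs in ticks, so
	 * this matches the mod_timer(..., cfs_time_shift(next)) call
	 * retained in the diff above */
	mod_timer(&part->at_timer, jiffies + secs * HZ);
}

static void my_part_stop(struct my_part *part)
{
	del_timer(&part->at_timer);	/* replaces cfs_timer_disarm() */
}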
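
The AT-array index computation likewise changes from a cast-and-modulo, (unsigned long)req->rq_deadline % array->paa_size, to div_u64_rem(). With the deadline now 64 bits wide, a plain % would need 64-bit division helpers that 32-bit kernels do not provide; div_u64_rem() returns the quotient and stores the 32-bit remainder through its third argument. A self-contained sketch, where at_array_index is a hypothetical helper:

#include <linux/math64.h>	/* div_u64_rem() */
#include <linux/time64.h>	/* time64_t */

static u32 at_array_index(time64_t deadline, u32 array_size)
{
	u32 index;

	/* remainder of deadline / array_size, safe on 32-bit builds */
	div_u64_rem(deadline, array_size, &index);
	return index;
}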
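
Finally, request-duration accounting moves from struct timeval (do_gettimeofday()/cfs_timeval_sub()) to struct timespec64 (ktime_get_real_ts64()/timespec64_sub()), with the microsecond conversion spelled out. A sketch of the computation that timediff_usecs and arrived_usecs perform in ptlrpc_server_handle_request() above; elapsed_usecs is an illustrative name:

#include <linux/time64.h>	/* timespec64, timespec64_sub() */

static unsigned long elapsed_usecs(struct timespec64 start,
				   struct timespec64 end)
{
	struct timespec64 diff = timespec64_sub(end, start);

	return diff.tv_sec * USEC_PER_SEC + diff.tv_nsec / NSEC_PER_USEC;
}

The caller samples both endpoints with ktime_get_real_ts64(), exactly as the rewritten handler does with work_start and work_end.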