summaryrefslogtreecommitdiff
path: root/drivers/staging/lustre/lustre/ptlrpc
diff options
context:
space:
mode:
authorAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-01-20 14:01:31 -0300
committerAndré Fabian Silva Delgado <emulatorman@parabola.nu>2016-01-20 14:01:31 -0300
commitb4b7ff4b08e691656c9d77c758fc355833128ac0 (patch)
tree82fcb00e6b918026dc9f2d1f05ed8eee83874cc0 /drivers/staging/lustre/lustre/ptlrpc
parent35acfa0fc609f2a2cd95cef4a6a9c3a5c38f1778 (diff)
Linux-libre 4.4-gnupck-4.4-gnu
Diffstat (limited to 'drivers/staging/lustre/lustre/ptlrpc')
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/client.c512
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/connection.c10
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/events.c12
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/import.c70
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/layout.c144
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/llog_client.c37
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c134
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/niobuf.c22
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/nrs.c129
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/pack_generic.c362
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/pers.c1
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/pinger.c170
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h19
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c6
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c740
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec.c165
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c358
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec_config.c78
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec_gc.c25
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c6
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec_null.c4
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/sec_plain.c2
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/service.c526
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/wiretest.c6
24 files changed, 987 insertions, 2551 deletions
diff --git a/drivers/staging/lustre/lustre/ptlrpc/client.c b/drivers/staging/lustre/lustre/ptlrpc/client.c
index c83a34a01..a9f1bf536 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/client.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/client.c
@@ -72,9 +72,11 @@ struct ptlrpc_connection *ptlrpc_uuid_to_connection(struct obd_uuid *uuid)
lnet_process_id_t peer;
int err;
- /* ptlrpc_uuid_to_peer() initializes its 2nd parameter
- * before accessing its values. */
- /* coverity[uninit_use_in_call] */
+ /*
+ * ptlrpc_uuid_to_peer() initializes its 2nd parameter
+ * before accessing its values.
+ * coverity[uninit_use_in_call]
+ */
err = ptlrpc_uuid_to_peer(uuid, &peer, &self);
if (err != 0) {
CNETERR("cannot find peer %s!\n", uuid->uuid);
@@ -117,8 +119,10 @@ struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
desc->bd_md_count = 0;
LASSERT(max_brw > 0);
desc->bd_md_max_brw = min(max_brw, PTLRPC_BULK_OPS_COUNT);
- /* PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
- * node. Negotiated ocd_brw_size will always be <= this number. */
+ /*
+ * PTLRPC_BULK_OPS_COUNT is the compile-time transfer limit for this
+ * node. Negotiated ocd_brw_size will always be <= this number.
+ */
for (i = 0; i < PTLRPC_BULK_OPS_COUNT; i++)
LNetInvalidateHandle(&desc->bd_mds[i]);
@@ -223,8 +227,9 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
LASSERT(req->rq_import);
if (AT_OFF) {
- /* non-AT settings */
- /**
+ /*
+ * non-AT settings
+ *
* \a imp_server_timeout means this is reverse import and
* we send (currently only) ASTs to the client and cannot afford
* to wait too long for the reply, otherwise the other client
@@ -240,11 +245,15 @@ void ptlrpc_at_set_req_timeout(struct ptlrpc_request *req)
serv_est = at_get(&at->iat_service_estimate[idx]);
req->rq_timeout = at_est2timeout(serv_est);
}
- /* We could get even fancier here, using history to predict increased
- loading... */
+ /*
+ * We could get even fancier here, using history to predict increased
+ * loading...
+ */
- /* Let the server know what this RPC timeout is by putting it in the
- reqmsg*/
+ /*
+ * Let the server know what this RPC timeout is by putting it in the
+ * reqmsg
+ */
lustre_msg_set_timeout(req->rq_reqmsg, req->rq_timeout);
}
EXPORT_SYMBOL(ptlrpc_at_set_req_timeout);
@@ -261,8 +270,10 @@ static void ptlrpc_at_adj_service(struct ptlrpc_request *req,
at = &req->rq_import->imp_at;
idx = import_at_get_index(req->rq_import, req->rq_request_portal);
- /* max service estimates are tracked on the server side,
- so just keep minimal history here */
+ /*
+ * max service estimates are tracked on the server side,
+ * so just keep minimal history here
+ */
oldse = at_measured(&at->iat_service_estimate[idx], serv_est);
if (oldse != 0)
CDEBUG(D_ADAPTTO, "The RPC service estimate for %s ptl %d has changed from %d to %d\n",
@@ -282,12 +293,13 @@ static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
{
unsigned int nl, oldnl;
struct imp_at *at;
- time_t now = get_seconds();
+ time64_t now = ktime_get_real_seconds();
LASSERT(req->rq_import);
if (service_time > now - req->rq_sent + 3) {
- /* bz16408, however, this can also happen if early reply
+ /*
+ * bz16408, however, this can also happen if early reply
* is lost and client RPC is expired and resent, early reply
* or reply of original RPC can still be fit in reply buffer
* of resent RPC, now client is measuring time from the
@@ -298,7 +310,7 @@ static void ptlrpc_at_adj_net_latency(struct ptlrpc_request *req,
D_ADAPTTO : D_WARNING,
"Reported service time %u > total measured time "
CFS_DURATION_T"\n", service_time,
- cfs_time_sub(now, req->rq_sent));
+ (long)(now - req->rq_sent));
return;
}
@@ -343,7 +355,7 @@ static int unpack_reply(struct ptlrpc_request *req)
static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
{
struct ptlrpc_request *early_req;
- time_t olddl;
+ time64_t olddl;
int rc;
req->rq_early = 0;
@@ -376,16 +388,18 @@ static int ptlrpc_at_recv_early_reply(struct ptlrpc_request *req)
spin_lock(&req->rq_lock);
olddl = req->rq_deadline;
- /* server assumes it now has rq_timeout from when it sent the
- * early reply, so client should give it at least that long. */
- req->rq_deadline = get_seconds() + req->rq_timeout +
+ /*
+ * server assumes it now has rq_timeout from when it sent the
+ * early reply, so client should give it at least that long.
+ */
+ req->rq_deadline = ktime_get_real_seconds() + req->rq_timeout +
ptlrpc_at_get_net_latency(req);
DEBUG_REQ(D_ADAPTTO, req,
- "Early reply #%d, new deadline in " CFS_DURATION_T "s (" CFS_DURATION_T "s)",
+ "Early reply #%d, new deadline in %lds (%lds)",
req->rq_early_count,
- cfs_time_sub(req->rq_deadline, get_seconds()),
- cfs_time_sub(req->rq_deadline, olddl));
+ (long)(req->rq_deadline - ktime_get_real_seconds()),
+ (long)(req->rq_deadline - olddl));
return rc;
}
@@ -409,13 +423,13 @@ struct ptlrpc_request *ptlrpc_request_cache_alloc(gfp_t flags)
{
struct ptlrpc_request *req;
- OBD_SLAB_ALLOC_PTR_GFP(req, request_cache, flags);
+ req = kmem_cache_alloc(request_cache, flags | __GFP_ZERO);
return req;
}
void ptlrpc_request_cache_free(struct ptlrpc_request *req)
{
- OBD_SLAB_FREE_PTR(req, request_cache);
+ kmem_cache_free(request_cache, req);
}
/**
@@ -446,7 +460,7 @@ EXPORT_SYMBOL(ptlrpc_free_rq_pool);
/**
* Allocates, initializes and adds \a num_rq requests to the pool \a pool
*/
-void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
+int ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
{
int i;
int size = 1;
@@ -468,11 +482,11 @@ void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
spin_unlock(&pool->prp_lock);
req = ptlrpc_request_cache_alloc(GFP_NOFS);
if (!req)
- return;
+ return i;
msg = libcfs_kvzalloc(size, GFP_NOFS);
if (!msg) {
ptlrpc_request_cache_free(req);
- return;
+ return i;
}
req->rq_reqbuf = msg;
req->rq_reqbuf_len = size;
@@ -481,6 +495,7 @@ void ptlrpc_add_rqs_to_pool(struct ptlrpc_request_pool *pool, int num_rq)
list_add_tail(&req->rq_list, &pool->prp_req_list);
}
spin_unlock(&pool->prp_lock);
+ return num_rq;
}
EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
@@ -494,7 +509,7 @@ EXPORT_SYMBOL(ptlrpc_add_rqs_to_pool);
*/
struct ptlrpc_request_pool *
ptlrpc_init_rq_pool(int num_rq, int msgsize,
- void (*populate_pool)(struct ptlrpc_request_pool *, int))
+ int (*populate_pool)(struct ptlrpc_request_pool *, int))
{
struct ptlrpc_request_pool *pool;
@@ -502,8 +517,10 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
if (!pool)
return NULL;
- /* Request next power of two for the allocation, because internally
- kernel would do exactly this */
+ /*
+ * Request next power of two for the allocation, because internally
+ * kernel would do exactly this
+ */
spin_lock_init(&pool->prp_lock);
INIT_LIST_HEAD(&pool->prp_req_list);
@@ -512,11 +529,6 @@ ptlrpc_init_rq_pool(int num_rq, int msgsize,
populate_pool(pool, num_rq);
- if (list_empty(&pool->prp_req_list)) {
- /* have not allocated a single request for the pool */
- kfree(pool);
- pool = NULL;
- }
return pool;
}
EXPORT_SYMBOL(ptlrpc_init_rq_pool);
@@ -535,10 +547,12 @@ ptlrpc_prep_req_from_pool(struct ptlrpc_request_pool *pool)
spin_lock(&pool->prp_lock);
- /* See if we have anything in a pool, and bail out if nothing,
+ /*
+ * See if we have anything in a pool, and bail out if nothing,
* in writeout path, where this matters, this is safe to do, because
* nothing is lost in this case, and when some in-flight requests
- * complete, this code will be called again. */
+ * complete, this code will be called again.
+ */
if (unlikely(list_empty(&pool->prp_req_list))) {
spin_unlock(&pool->prp_lock);
return NULL;
@@ -664,11 +678,13 @@ int ptlrpc_request_pack(struct ptlrpc_request *request,
__u32 version, int opcode)
{
int rc;
+
rc = ptlrpc_request_bufs_pack(request, version, opcode, NULL, NULL);
if (rc)
return rc;
- /* For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
+ /*
+ * For some old 1.8 clients (< 1.8.7), they will LASSERT the size of
* ptlrpc_body sent from server equal to local ptlrpc_body size, so we
* have to send old ptlrpc_body to keep interoperability with these
* clients.
@@ -700,13 +716,12 @@ static inline
struct ptlrpc_request *__ptlrpc_request_alloc(struct obd_import *imp,
struct ptlrpc_request_pool *pool)
{
- struct ptlrpc_request *request = NULL;
+ struct ptlrpc_request *request;
- if (pool)
- request = ptlrpc_prep_req_from_pool(pool);
+ request = ptlrpc_request_cache_alloc(GFP_NOFS);
- if (!request)
- request = ptlrpc_request_cache_alloc(GFP_NOFS);
+ if (!request && pool)
+ request = ptlrpc_prep_req_from_pool(pool);
if (request) {
LASSERTF((unsigned long)imp > 0x1000, "%p", imp);
@@ -807,56 +822,17 @@ struct ptlrpc_request *ptlrpc_request_alloc_pack(struct obd_import *imp,
EXPORT_SYMBOL(ptlrpc_request_alloc_pack);
/**
- * Prepare request (fetched from pool \a pool if not NULL) on import \a imp
- * for operation \a opcode. Request would contain \a count buffers.
- * Sizes of buffers are described in array \a lengths and buffers themselves
- * are provided by a pointer \a bufs.
- * Returns prepared request structure pointer or NULL on error.
- */
-struct ptlrpc_request *
-ptlrpc_prep_req_pool(struct obd_import *imp,
- __u32 version, int opcode,
- int count, __u32 *lengths, char **bufs,
- struct ptlrpc_request_pool *pool)
-{
- struct ptlrpc_request *request;
- int rc;
-
- request = __ptlrpc_request_alloc(imp, pool);
- if (!request)
- return NULL;
-
- rc = __ptlrpc_request_bufs_pack(request, version, opcode, count,
- lengths, bufs, NULL);
- if (rc) {
- ptlrpc_request_free(request);
- request = NULL;
- }
- return request;
-}
-EXPORT_SYMBOL(ptlrpc_prep_req_pool);
-
-/**
- * Same as ptlrpc_prep_req_pool, but without pool
- */
-struct ptlrpc_request *
-ptlrpc_prep_req(struct obd_import *imp, __u32 version, int opcode, int count,
- __u32 *lengths, char **bufs)
-{
- return ptlrpc_prep_req_pool(imp, version, opcode, count, lengths, bufs,
- NULL);
-}
-EXPORT_SYMBOL(ptlrpc_prep_req);
-
-/**
- * Allocate and initialize new request set structure.
+ * Allocate and initialize new request set structure on the current CPT.
* Returns a pointer to the newly allocated set structure or NULL on error.
*/
struct ptlrpc_request_set *ptlrpc_prep_set(void)
{
struct ptlrpc_request_set *set;
+ int cpt;
- set = kzalloc(sizeof(*set), GFP_NOFS);
+ cpt = cfs_cpt_current(cfs_cpt_table, 0);
+ set = kzalloc_node(sizeof(*set), GFP_NOFS,
+ cfs_cpt_spread_node(cfs_cpt_table, cpt));
if (!set)
return NULL;
atomic_set(&set->set_refcount, 1);
@@ -961,28 +937,6 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set)
EXPORT_SYMBOL(ptlrpc_set_destroy);
/**
- * Add a callback function \a fn to the set.
- * This function would be called when all requests on this set are completed.
- * The function will be passed \a data argument.
- */
-int ptlrpc_set_add_cb(struct ptlrpc_request_set *set,
- set_interpreter_func fn, void *data)
-{
- struct ptlrpc_set_cbdata *cbdata;
-
- cbdata = kzalloc(sizeof(*cbdata), GFP_NOFS);
- if (!cbdata)
- return -ENOMEM;
-
- cbdata->psc_interpret = fn;
- cbdata->psc_data = data;
- list_add_tail(&cbdata->psc_item, &set->set_cblist);
-
- return 0;
-}
-EXPORT_SYMBOL(ptlrpc_set_add_cb);
-
-/**
* Add a new request to the general purpose request set.
* Assumes request reference from the caller.
*/
@@ -1001,8 +955,10 @@ void ptlrpc_set_add_req(struct ptlrpc_request_set *set,
lustre_msg_set_jobid(req->rq_reqmsg, NULL);
if (set->set_producer != NULL)
- /* If the request set has a producer callback, the RPC must be
- * sent straight away */
+ /*
+ * If the request set has a producer callback, the RPC must be
+ * sent straight away
+ */
ptlrpc_send_new_req(req);
}
EXPORT_SYMBOL(ptlrpc_set_add_req);
@@ -1022,9 +978,7 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
LASSERT(test_bit(LIOD_STOP, &pc->pc_flags) == 0);
spin_lock(&set->set_new_req_lock);
- /*
- * The set takes over the caller's request reference.
- */
+ /* The set takes over the caller's request reference. */
req->rq_set = set;
req->rq_queued_time = cfs_time_current();
list_add_tail(&req->rq_set_chain, &set->set_new_requests);
@@ -1035,9 +989,11 @@ void ptlrpc_set_add_new_req(struct ptlrpcd_ctl *pc,
if (count == 1) {
wake_up(&set->set_waitq);
- /* XXX: It maybe unnecessary to wakeup all the partners. But to
+ /*
+ * XXX: It maybe unnecessary to wakeup all the partners. But to
* guarantee the async RPC can be processed ASAP, we have
- * no other better choice. It maybe fixed in future. */
+ * no other better choice. It maybe fixed in future.
+ */
for (i = 0; i < pc->pc_npartners; i++)
wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
}
@@ -1125,8 +1081,10 @@ static int ptlrpc_console_allow(struct ptlrpc_request *req)
LASSERT(req->rq_reqmsg != NULL);
opc = lustre_msg_get_opc(req->rq_reqmsg);
- /* Suppress particular reconnect errors which are to be expected. No
- * errors are suppressed for the initial connection on an import */
+ /*
+ * Suppress particular reconnect errors which are to be expected. No
+ * errors are suppressed for the initial connection on an import
+ */
if ((lustre_handle_is_used(&req->rq_import->imp_remote_handle)) &&
(opc == OST_CONNECT || opc == MDS_CONNECT || opc == MGS_CONNECT)) {
@@ -1155,6 +1113,7 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) {
struct obd_import *imp = req->rq_import;
__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
if (ptlrpc_console_allow(req))
LCONSOLE_ERROR_MSG(0x011, "%s: Communicating with %s, operation %s failed with %d.\n",
imp->imp_obd->obd_name,
@@ -1164,12 +1123,11 @@ static int ptlrpc_check_status(struct ptlrpc_request *req)
return err < 0 ? err : -EINVAL;
}
- if (err < 0) {
+ if (err < 0)
DEBUG_REQ(D_INFO, req, "status is %d", err);
- } else if (err > 0) {
+ else if (err > 0)
/* XXX: translate this error from net to host */
DEBUG_REQ(D_INFO, req, "status is %d", err);
- }
return err;
}
@@ -1206,7 +1164,7 @@ static int after_reply(struct ptlrpc_request *req)
struct obd_import *imp = req->rq_import;
struct obd_device *obd = req->rq_import->imp_obd;
int rc;
- struct timeval work_start;
+ struct timespec64 work_start;
long timediff;
LASSERT(obd != NULL);
@@ -1221,10 +1179,11 @@ static int after_reply(struct ptlrpc_request *req)
}
sptlrpc_cli_free_repbuf(req);
- /* Pass the required reply buffer size (include
- * space for early reply).
- * NB: no need to roundup because alloc_repbuf
- * will roundup it */
+ /*
+ * Pass the required reply buffer size (include space for early
+ * reply). NB: no need to round up because alloc_repbuf will
+ * round it up
+ */
req->rq_replen = req->rq_nob_received;
req->rq_nob_received = 0;
spin_lock(&req->rq_lock);
@@ -1243,9 +1202,7 @@ static int after_reply(struct ptlrpc_request *req)
return rc;
}
- /*
- * Security layer unwrap might ask resend this request.
- */
+ /* Security layer unwrap might ask resend this request. */
if (req->rq_resend)
return 0;
@@ -1256,7 +1213,7 @@ static int after_reply(struct ptlrpc_request *req)
/* retry indefinitely on EINPROGRESS */
if (lustre_msg_get_status(req->rq_repmsg) == -EINPROGRESS &&
ptlrpc_no_resend(req) == 0 && !req->rq_no_retry_einprogress) {
- time_t now = get_seconds();
+ time64_t now = ktime_get_real_seconds();
DEBUG_REQ(D_RPCTRACE, req, "Resending request on EINPROGRESS");
spin_lock(&req->rq_lock);
@@ -1266,18 +1223,19 @@ static int after_reply(struct ptlrpc_request *req)
/* allocate new xid to avoid reply reconstruction */
if (!req->rq_bulk) {
- /* new xid is already allocated for bulk in
- * ptlrpc_check_set() */
+ /* new xid is already allocated for bulk in ptlrpc_check_set() */
req->rq_xid = ptlrpc_next_xid();
DEBUG_REQ(D_RPCTRACE, req, "Allocating new xid for resend on EINPROGRESS");
}
/* Readjust the timeout for current conditions */
ptlrpc_at_set_req_timeout(req);
- /* delay resend to give a chance to the server to get ready.
+ /*
+ * delay resend to give a chance to the server to get ready.
* The delay is increased by 1s on every resend and is capped to
* the current request timeout (i.e. obd_timeout if AT is off,
- * or AT service time x 125% + 5s, see at_est2timeout) */
+ * or AT service time x 125% + 5s, see at_est2timeout)
+ */
if (req->rq_nr_resend > req->rq_timeout)
req->rq_sent = now + req->rq_timeout;
else
@@ -1286,8 +1244,9 @@ static int after_reply(struct ptlrpc_request *req)
return 0;
}
- do_gettimeofday(&work_start);
- timediff = cfs_timeval_sub(&work_start, &req->rq_arrival_time, NULL);
+ ktime_get_real_ts64(&work_start);
+ timediff = (work_start.tv_sec - req->rq_arrival_time.tv_sec) * USEC_PER_SEC +
+ (work_start.tv_nsec - req->rq_arrival_time.tv_nsec) / NSEC_PER_USEC;
if (obd->obd_svc_stats != NULL) {
lprocfs_counter_add(obd->obd_svc_stats, PTLRPC_REQWAIT_CNTR,
timediff);
@@ -1332,9 +1291,7 @@ static int after_reply(struct ptlrpc_request *req)
ldlm_cli_update_pool(req);
}
- /*
- * Store transno in reqmsg for replay.
- */
+ /* Store transno in reqmsg for replay. */
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)) {
req->rq_transno = lustre_msg_get_transno(req->rq_repmsg);
lustre_msg_set_transno(req->rq_reqmsg, req->rq_transno);
@@ -1350,22 +1307,22 @@ static int after_reply(struct ptlrpc_request *req)
(req->rq_transno >
lustre_msg_get_last_committed(req->rq_repmsg) ||
req->rq_replay)) {
- /** version recovery */
+ /* version recovery */
ptlrpc_save_versions(req);
ptlrpc_retain_replayable_request(req, imp);
} else if (req->rq_commit_cb != NULL &&
list_empty(&req->rq_replay_list)) {
- /* NB: don't call rq_commit_cb if it's already on
+ /*
+ * NB: don't call rq_commit_cb if it's already on
* rq_replay_list, ptlrpc_free_committed() will call
- * it later, see LU-3618 for details */
+ * it later, see LU-3618 for details
+ */
spin_unlock(&imp->imp_lock);
req->rq_commit_cb(req);
spin_lock(&imp->imp_lock);
}
- /*
- * Replay-enabled imports return commit-status information.
- */
+ /* Replay-enabled imports return commit-status information. */
if (lustre_msg_get_last_committed(req->rq_repmsg)) {
imp->imp_peer_committed_transno =
lustre_msg_get_last_committed(req->rq_repmsg);
@@ -1404,7 +1361,7 @@ static int ptlrpc_send_new_req(struct ptlrpc_request *req)
int rc;
LASSERT(req->rq_phase == RQ_PHASE_NEW);
- if (req->rq_sent && (req->rq_sent > get_seconds()) &&
+ if (req->rq_sent && (req->rq_sent > ktime_get_real_seconds()) &&
(!req->rq_generation_set ||
req->rq_import_generation == imp->imp_generation))
return 0;
@@ -1484,8 +1441,10 @@ static inline int ptlrpc_set_producer(struct ptlrpc_request_set *set)
remaining = atomic_read(&set->set_remaining);
- /* populate the ->set_requests list with requests until we
- * reach the maximum number of RPCs in flight for this set */
+ /*
+ * populate the ->set_requests list with requests until we
+ * reach the maximum number of RPCs in flight for this set
+ */
while (atomic_read(&set->set_remaining) < set->set_max_inflight) {
rc = set->set_producer(set, set->set_producer_arg);
if (rc == -ENOENT) {
@@ -1525,7 +1484,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
int unregistered = 0;
int rc = 0;
- /* This schedule point is mainly for the ptlrpcd caller of this
+ /*
+ * This schedule point is mainly for the ptlrpcd caller of this
* function. Most ptlrpc sets are not long-lived and unbounded
* in length, but at the least the set used by the ptlrpcd is.
* Since the processing time is unbounded, we need to insert an
@@ -1544,7 +1504,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
/* delayed resend - skip */
if (req->rq_phase == RQ_PHASE_RPC && req->rq_resend &&
- req->rq_sent > get_seconds())
+ req->rq_sent > ktime_get_real_seconds())
continue;
if (!(req->rq_phase == RQ_PHASE_RPC ||
@@ -1584,8 +1544,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
OBD_FAIL_ONCE);
}
- /*
- * Move to next phase if reply was successfully
+ /* Move to next phase if reply was successfully
* unlinked.
*/
ptlrpc_rqphase_move(req, req->rq_next_phase);
@@ -1599,15 +1558,11 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
if (req->rq_phase == RQ_PHASE_INTERPRET)
goto interpret;
- /*
- * Note that this also will start async reply unlink.
- */
+ /* Note that this also will start async reply unlink. */
if (req->rq_net_err && !req->rq_timedout) {
ptlrpc_expire_one_request(req, 1);
- /*
- * Check if we still need to wait for unlink.
- */
+ /* Check if we still need to wait for unlink. */
if (ptlrpc_client_recv_or_unlink(req) ||
ptlrpc_client_bulk_active(req))
continue;
@@ -1632,7 +1587,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
goto interpret;
}
- /* ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
+ /*
+ * ptlrpc_set_wait->l_wait_event sets lwi_allow_intr
* so it sets rq_intr regardless of individual rpc
* timeouts. The synchronous IO waiting path sets
* rq_intr irrespective of whether ptlrpcd
@@ -1659,8 +1615,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
spin_lock(&imp->imp_lock);
if (ptlrpc_import_delay_req(imp, req,
&status)) {
- /* put on delay list - only if we wait
- * recovery finished - before send */
+ /*
+ * put on delay list - only if we wait
+ * recovery finished - before send
+ */
list_del_init(&req->rq_list);
list_add_tail(&req->rq_list,
&imp->
@@ -1696,8 +1654,7 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
spin_unlock(&req->rq_lock);
if (req->rq_timedout || req->rq_resend) {
- /* This is re-sending anyways,
- * let's mark req as resend. */
+ /* This is re-sending anyway, let's mark req as resend. */
spin_lock(&req->rq_lock);
req->rq_resend = 1;
spin_unlock(&req->rq_lock);
@@ -1775,8 +1732,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
spin_unlock(&req->rq_lock);
- /* unlink from net because we are going to
- * swab in-place of reply buffer */
+ /*
+ * unlink from net because we are going to
+ * swab in-place of reply buffer
+ */
unregistered = ptlrpc_unregister_reply(req, 1);
if (!unregistered)
continue;
@@ -1785,7 +1744,8 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
if (req->rq_resend)
continue;
- /* If there is no bulk associated with this request,
+ /*
+ * If there is no bulk associated with this request,
* then we're done and should let the interpreter
* process the reply. Similarly if the RPC returned
* an error, and therefore the bulk will never arrive.
@@ -1803,10 +1763,12 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
continue;
if (req->rq_bulk->bd_failure) {
- /* The RPC reply arrived OK, but the bulk screwed
+ /*
+ * The RPC reply arrived OK, but the bulk screwed
* up! Dead weird since the server told us the RPC
* was good after getting the REPLY for her GET or
- * the ACK for her PUT. */
+ * the ACK for her PUT.
+ */
DEBUG_REQ(D_ERROR, req, "bulk transfer failed");
req->rq_status = -EIO;
}
@@ -1816,8 +1778,10 @@ int ptlrpc_check_set(const struct lu_env *env, struct ptlrpc_request_set *set)
interpret:
LASSERT(req->rq_phase == RQ_PHASE_INTERPRET);
- /* This moves to "unregistering" phase we need to wait for
- * reply unlink. */
+ /*
+ * This moves to "unregistering" phase we need to wait for
+ * reply unlink.
+ */
if (!unregistered && !ptlrpc_unregister_reply(req, 1)) {
/* start async bulk unlink too */
ptlrpc_unregister_bulk(req, 1);
@@ -1827,8 +1791,7 @@ interpret:
if (!ptlrpc_unregister_bulk(req, 1))
continue;
- /* When calling interpret receiving already should be
- * finished. */
+ /* When calling interpret receive should already be finished. */
LASSERT(!req->rq_receiving_reply);
ptlrpc_req_interpret(env, req, req->rq_status);
@@ -1847,10 +1810,12 @@ interpret:
lustre_msg_get_opc(req->rq_reqmsg));
spin_lock(&imp->imp_lock);
- /* Request already may be not on sending or delaying list. This
+ /*
+ * Request already may be not on sending or delaying list. This
* may happen in the case of marking it erroneous for the case
* ptlrpc_import_delay_req(req, status) find it impossible to
- * allow sending this rpc and returns *status != 0. */
+ * allow sending this rpc and returns *status != 0.
+ */
if (!list_empty(&req->rq_list)) {
list_del_init(&req->rq_list);
atomic_dec(&imp->imp_inflight);
@@ -1865,8 +1830,10 @@ interpret:
if (ptlrpc_set_producer(set) > 0)
force_timer_recalc = 1;
- /* free the request that has just been completed
- * in order not to pollute set->set_requests */
+ /*
+ * free the request that has just been completed
+ * in order not to pollute set->set_requests
+ */
list_del_init(&req->rq_set_chain);
spin_lock(&req->rq_lock);
req->rq_set = NULL;
@@ -1882,8 +1849,10 @@ interpret:
}
}
- /* move completed request at the head of list so it's easier for
- * caller to find them */
+ /*
+ * move completed request at the head of list so it's easier for
+ * caller to find them
+ */
list_splice(&comp_reqs, &set->set_requests);
/* If we hit an error, we want to recover promptly. */
@@ -1905,14 +1874,13 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
req->rq_timedout = 1;
spin_unlock(&req->rq_lock);
- DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent "CFS_DURATION_T
- "/real "CFS_DURATION_T"]",
+ DEBUG_REQ(D_WARNING, req, "Request sent has %s: [sent %lld/real %lld]",
req->rq_net_err ? "failed due to network error" :
((req->rq_real_sent == 0 ||
- time_before((unsigned long)req->rq_real_sent, (unsigned long)req->rq_sent) ||
- cfs_time_aftereq(req->rq_real_sent, req->rq_deadline)) ?
+ req->rq_real_sent < req->rq_sent ||
+ req->rq_real_sent >= req->rq_deadline) ?
"timed out for sent delay" : "timed out for slow reply"),
- req->rq_sent, req->rq_real_sent);
+ (s64)req->rq_sent, (s64)req->rq_real_sent);
if (imp != NULL && obd_debug_peer_on_timeout)
LNetCtl(IOC_LIBCFS_DEBUG_PEER, &imp->imp_connection->c_peer);
@@ -1934,8 +1902,10 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
if (imp->imp_dlm_fake)
return 1;
- /* If this request is for recovery or other primordial tasks,
- * then error it out here. */
+ /*
+ * If this request is for recovery or other primordial tasks,
+ * then error it out here.
+ */
if (req->rq_ctx_init || req->rq_ctx_fini ||
req->rq_send_state != LUSTRE_IMP_FULL ||
imp->imp_obd->obd_no_recov) {
@@ -1949,8 +1919,10 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req, int async_unlink)
return 1;
}
- /* if a request can't be resent we can't wait for an answer after
- the timeout */
+ /*
+ * if a request can't be resent we can't wait for an answer after
+ * the timeout
+ */
if (ptlrpc_no_resend(req)) {
DEBUG_REQ(D_RPCTRACE, req, "TIMEOUT-NORESEND:");
rc = 1;
@@ -1970,13 +1942,11 @@ int ptlrpc_expired_set(void *data)
{
struct ptlrpc_request_set *set = data;
struct list_head *tmp;
- time_t now = get_seconds();
+ time64_t now = ktime_get_real_seconds();
LASSERT(set != NULL);
- /*
- * A timeout expired. See which reqs it applies to...
- */
+ /* A timeout expired. See which reqs it applies to... */
list_for_each(tmp, &set->set_requests) {
struct ptlrpc_request *req =
list_entry(tmp, struct ptlrpc_request,
@@ -1996,8 +1966,10 @@ int ptlrpc_expired_set(void *data)
req->rq_deadline > now) /* not expired */
continue;
- /* Deal with this guy. Do it asynchronously to not block
- * ptlrpcd thread. */
+ /*
+ * Deal with this guy. Do it asynchronously to not block
+ * ptlrpcd thread.
+ */
ptlrpc_expire_one_request(req, 1);
}
@@ -2053,31 +2025,25 @@ EXPORT_SYMBOL(ptlrpc_interrupted_set);
int ptlrpc_set_next_timeout(struct ptlrpc_request_set *set)
{
struct list_head *tmp;
- time_t now = get_seconds();
+ time64_t now = ktime_get_real_seconds();
int timeout = 0;
struct ptlrpc_request *req;
- int deadline;
+ time64_t deadline;
list_for_each(tmp, &set->set_requests) {
req = list_entry(tmp, struct ptlrpc_request, rq_set_chain);
- /*
- * Request in-flight?
- */
+ /* Request in-flight? */
if (!(((req->rq_phase == RQ_PHASE_RPC) && !req->rq_waiting) ||
(req->rq_phase == RQ_PHASE_BULK) ||
(req->rq_phase == RQ_PHASE_NEW)))
continue;
- /*
- * Already timed out.
- */
+ /* Already timed out. */
if (req->rq_timedout)
continue;
- /*
- * Waiting for ctx.
- */
+ /* Waiting for ctx. */
if (req->rq_wait_ctx)
continue;
@@ -2126,8 +2092,10 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
do {
timeout = ptlrpc_set_next_timeout(set);
- /* wait until all complete, interrupted, or an in-flight
- * req times out */
+ /*
+ * wait until all complete, interrupted, or an in-flight
+ * req times out
+ */
CDEBUG(D_RPCTRACE, "set %p going to sleep for %d seconds\n",
set, timeout);
@@ -2152,18 +2120,22 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
rc = l_wait_event(set->set_waitq, ptlrpc_check_set(NULL, set), &lwi);
- /* LU-769 - if we ignored the signal because it was already
+ /*
+ * LU-769 - if we ignored the signal because it was already
* pending when we started, we need to handle it now or we risk
- * it being ignored forever */
+ * it being ignored forever
+ */
if (rc == -ETIMEDOUT && !lwi.lwi_allow_intr &&
cfs_signal_pending()) {
sigset_t blocked_sigs =
cfs_block_sigsinv(LUSTRE_FATAL_SIGS);
- /* In fact we only interrupt for the "fatal" signals
+ /*
+ * In fact we only interrupt for the "fatal" signals
* like SIGINT or SIGKILL. We still ignore less
* important signals since ptlrpc set is not easily
- * reentrant from userspace again */
+ * reentrant from userspace again
+ */
if (cfs_signal_pending())
ptlrpc_interrupted_set(set);
cfs_restore_sigs(blocked_sigs);
@@ -2171,13 +2143,15 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set)
LASSERT(rc == 0 || rc == -EINTR || rc == -ETIMEDOUT);
- /* -EINTR => all requests have been flagged rq_intr so next
+ /*
+ * -EINTR => all requests have been flagged rq_intr so next
* check completes.
* -ETIMEDOUT => someone timed out. When all reqs have
* timed out, signals are enabled allowing completion with
* EINTR.
* I don't really care if we go once more round the loop in
- * the error cases -eeb. */
+ * the error cases -eeb.
+ */
if (rc == 0 && atomic_read(&set->set_remaining) == 0) {
list_for_each(tmp, &set->set_requests) {
req = list_entry(tmp, struct ptlrpc_request,
@@ -2243,8 +2217,10 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
req_capsule_fini(&request->rq_pill);
- /* We must take it off the imp_replay_list first. Otherwise, we'll set
- * request->rq_reqmsg to NULL while osc_close is dereferencing it. */
+ /*
+ * We must take it off the imp_replay_list first. Otherwise, we'll set
+ * request->rq_reqmsg to NULL while osc_close is dereferencing it.
+ */
if (request->rq_import != NULL) {
if (!locked)
spin_lock(&request->rq_import->imp_lock);
@@ -2285,18 +2261,6 @@ static void __ptlrpc_free_req(struct ptlrpc_request *request, int locked)
ptlrpc_request_cache_free(request);
}
-static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked);
-/**
- * Drop one request reference. Must be called with import imp_lock held.
- * When reference count drops to zero, request is freed.
- */
-void ptlrpc_req_finished_with_imp_lock(struct ptlrpc_request *request)
-{
- assert_spin_locked(&request->rq_import->imp_lock);
- (void)__ptlrpc_req_finished(request, 1);
-}
-EXPORT_SYMBOL(ptlrpc_req_finished_with_imp_lock);
-
/**
* Helper function
* Drops one reference count for request \a request.
@@ -2357,40 +2321,28 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
wait_queue_head_t *wq;
struct l_wait_info lwi;
- /*
- * Might sleep.
- */
+ /* Might sleep. */
LASSERT(!in_interrupt());
- /*
- * Let's setup deadline for reply unlink.
- */
+ /* Let's setup deadline for reply unlink. */
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_REPL_UNLINK) &&
async && request->rq_reply_deadline == 0)
- request->rq_reply_deadline = get_seconds()+LONG_UNLINK;
+ request->rq_reply_deadline = ktime_get_real_seconds()+LONG_UNLINK;
- /*
- * Nothing left to do.
- */
+ /* Nothing left to do. */
if (!ptlrpc_client_recv_or_unlink(request))
return 1;
LNetMDUnlink(request->rq_reply_md_h);
- /*
- * Let's check it once again.
- */
+ /* Let's check it once again. */
if (!ptlrpc_client_recv_or_unlink(request))
return 1;
- /*
- * Move to "Unregistering" phase as reply was not unlinked yet.
- */
+ /* Move to "Unregistering" phase as reply was not unlinked yet. */
ptlrpc_rqphase_move(request, RQ_PHASE_UNREGISTERING);
- /*
- * Do not wait for unlink to finish.
- */
+ /* Do not wait for unlink to finish. */
if (async)
return 0;
@@ -2405,8 +2357,10 @@ int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async)
wq = &request->rq_reply_waitq;
for (;;) {
- /* Network access will complete in finite time but the HUGE
- * timeout lets us CWARN for visibility of sluggish NALs */
+ /*
+ * Network access will complete in finite time but the HUGE
+ * timeout lets us CWARN for visibility of sluggish NALs
+ */
lwi = LWI_TIMEOUT_INTERVAL(cfs_time_seconds(LONG_UNLINK),
cfs_time_seconds(1), NULL, NULL);
rc = l_wait_event(*wq, !ptlrpc_client_recv_or_unlink(request),
@@ -2538,11 +2492,6 @@ free_req:
}
}
-void ptlrpc_cleanup_client(struct obd_import *imp)
-{
-}
-EXPORT_SYMBOL(ptlrpc_cleanup_client);
-
/**
* Schedule previously sent request for resend.
* For bulk requests we assign new xid (to avoid problems with
@@ -2554,8 +2503,10 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
DEBUG_REQ(D_HA, req, "going to resend");
spin_lock(&req->rq_lock);
- /* Request got reply but linked to the import list still.
- Let ptlrpc_check_set() to process it. */
+ /*
+ * Request got reply but linked to the import list still.
+ * Let ptlrpc_check_set() to process it.
+ */
if (ptlrpc_client_replied(req)) {
spin_unlock(&req->rq_lock);
DEBUG_REQ(D_HA, req, "it has reply, so skip it");
@@ -2581,20 +2532,6 @@ void ptlrpc_resend_req(struct ptlrpc_request *req)
}
EXPORT_SYMBOL(ptlrpc_resend_req);
-/* XXX: this function and rq_status are currently unused */
-void ptlrpc_restart_req(struct ptlrpc_request *req)
-{
- DEBUG_REQ(D_HA, req, "restarting (possibly-)completed request");
- req->rq_status = -ERESTARTSYS;
-
- spin_lock(&req->rq_lock);
- req->rq_restart = 1;
- req->rq_timedout = 0;
- ptlrpc_client_wake_req(req);
- spin_unlock(&req->rq_lock);
-}
-EXPORT_SYMBOL(ptlrpc_restart_req);
-
/**
* Grab additional reference on a request \a req
*/
@@ -2621,8 +2558,10 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
LBUG();
}
- /* clear this for new requests that were resent as well
- as resent replayed requests. */
+ /*
+ * clear this for new requests that were resent as well
+ * as resent replayed requests.
+ */
lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
/* don't re-add requests that have been replayed */
@@ -2639,7 +2578,8 @@ void ptlrpc_retain_replayable_request(struct ptlrpc_request *req,
list_entry(tmp, struct ptlrpc_request,
rq_replay_list);
- /* We may have duplicate transnos if we create and then
+ /*
+ * We may have duplicate transnos if we create and then
* open a file, or for closes retained if to match creating
* opens, so use req->rq_xid as a secondary key.
* (See bugs 684, 685, and 428.)
@@ -2824,8 +2764,10 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
/* Readjust the timeout for current conditions */
ptlrpc_at_set_req_timeout(req);
- /* Tell server the net_latency, so the server can calculate how long
- * it should wait for next replay */
+ /*
+ * Tell server the net_latency, so the server can calculate how long
+ * it should wait for next replay
+ */
lustre_msg_set_service_time(req->rq_reqmsg,
ptlrpc_at_get_net_latency(req));
DEBUG_REQ(D_HA, req, "REPLAY");
@@ -2833,7 +2775,7 @@ int ptlrpc_replay_req(struct ptlrpc_request *req)
atomic_inc(&req->rq_import->imp_replay_inflight);
ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
- ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ ptlrpcd_add_req(req);
return 0;
}
EXPORT_SYMBOL(ptlrpc_replay_req);
@@ -2845,13 +2787,15 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
{
struct list_head *tmp, *n;
- /* Make sure that no new requests get processed for this import.
+ /*
+ * Make sure that no new requests get processed for this import.
* ptlrpc_{queue,set}_wait must (and does) hold imp_lock while testing
* this flag and then putting requests on sending_list or delayed_list.
*/
spin_lock(&imp->imp_lock);
- /* XXX locking? Maybe we should remove each request with the list
+ /*
+ * XXX locking? Maybe we should remove each request with the list
* locked? Also, how do we know if the requests on the list are
* being freed at this time?
*/
@@ -2885,8 +2829,10 @@ void ptlrpc_abort_inflight(struct obd_import *imp)
spin_unlock(&req->rq_lock);
}
- /* Last chance to free reqs left on the replay list, but we
- * will still leak reqs that haven't committed. */
+ /*
+ * Last chance to free reqs left on the replay list, but we
+ * will still leak reqs that haven't committed.
+ */
if (imp->imp_replayable)
ptlrpc_free_committed(imp);
@@ -2942,7 +2888,7 @@ static spinlock_t ptlrpc_last_xid_lock;
#define YEAR_2004 (1ULL << 30)
void ptlrpc_init_xid(void)
{
- time_t now = get_seconds();
+ time64_t now = ktime_get_real_seconds();
spin_lock_init(&ptlrpc_last_xid_lock);
if (now < YEAR_2004) {
@@ -2954,7 +2900,7 @@ void ptlrpc_init_xid(void)
}
/* Always need to be aligned to a power-of-two for multi-bulk BRW */
- CLASSERT((PTLRPC_BULK_OPS_COUNT & (PTLRPC_BULK_OPS_COUNT - 1)) == 0);
+ CLASSERT(((PTLRPC_BULK_OPS_COUNT - 1) & PTLRPC_BULK_OPS_COUNT) == 0);
ptlrpc_last_xid &= PTLRPC_BULK_OPS_MASK;
}
@@ -3031,7 +2977,7 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
{
/* re-initialize the req */
req->rq_timeout = obd_timeout;
- req->rq_sent = get_seconds();
+ req->rq_sent = ktime_get_real_seconds();
req->rq_deadline = req->rq_sent + req->rq_timeout;
req->rq_reply_deadline = req->rq_deadline;
req->rq_phase = RQ_PHASE_INTERPRET;
@@ -3039,7 +2985,7 @@ static void ptlrpcd_add_work_req(struct ptlrpc_request *req)
req->rq_xid = ptlrpc_next_xid();
req->rq_import_generation = req->rq_import->imp_generation;
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
}
static int work_interpreter(const struct lu_env *env,
diff --git a/drivers/staging/lustre/lustre/ptlrpc/connection.c b/drivers/staging/lustre/lustre/ptlrpc/connection.c
index ffe36e222..da1f0b1ac 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/connection.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/connection.c
@@ -42,7 +42,7 @@
#include "ptlrpc_internal.h"
static struct cfs_hash *conn_hash;
-static cfs_hash_ops_t conn_hash_ops;
+static struct cfs_hash_ops conn_hash_ops;
struct ptlrpc_connection *
ptlrpc_connection_get(lnet_process_id_t peer, lnet_nid_t self,
@@ -173,7 +173,7 @@ conn_keycmp(const void *key, struct hlist_node *hnode)
const lnet_process_id_t *conn_key;
LASSERT(key != NULL);
- conn_key = (lnet_process_id_t *)key;
+ conn_key = key;
conn = hlist_entry(hnode, struct ptlrpc_connection, c_hash);
return conn_key->nid == conn->c_peer.nid &&
@@ -230,12 +230,12 @@ conn_exit(struct cfs_hash *hs, struct hlist_node *hnode)
kfree(conn);
}
-static cfs_hash_ops_t conn_hash_ops = {
+static struct cfs_hash_ops conn_hash_ops = {
.hs_hash = conn_hashfn,
.hs_keycmp = conn_keycmp,
- .hs_key = conn_key,
+ .hs_key = conn_key,
.hs_object = conn_object,
- .hs_get = conn_get,
+ .hs_get = conn_get,
.hs_put_locked = conn_put_locked,
.hs_exit = conn_exit,
};
diff --git a/drivers/staging/lustre/lustre/ptlrpc/events.c b/drivers/staging/lustre/lustre/ptlrpc/events.c
index c8ef9e578..9c2fd34e2 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/events.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/events.c
@@ -64,7 +64,7 @@ void request_out_callback(lnet_event_t *ev)
sptlrpc_request_out_callback(req);
spin_lock(&req->rq_lock);
- req->rq_real_sent = get_seconds();
+ req->rq_real_sent = ktime_get_real_seconds();
if (ev->unlinked)
req->rq_req_unlink = 0;
@@ -158,7 +158,7 @@ void reply_in_callback(lnet_event_t *ev)
ev->mlength, ev->offset, req->rq_replen);
}
- req->rq_import->imp_last_reply_time = get_seconds();
+ req->rq_import->imp_last_reply_time = ktime_get_real_seconds();
out_wake:
/* NB don't unlock till after wakeup; req can disappear under us
@@ -246,7 +246,7 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
__u64 sec = req->rq_arrival_time.tv_sec;
- __u32 usec = req->rq_arrival_time.tv_usec >> 4; /* usec / 16 */
+ __u32 usec = req->rq_arrival_time.tv_nsec / NSEC_PER_USEC / 16; /* usec / 16 */
__u64 new_seq;
/* set sequence ID for request and add it to history list,
@@ -327,7 +327,7 @@ void request_in_callback(lnet_event_t *ev)
req->rq_reqbuf = ev->md.start + ev->offset;
if (ev->type == LNET_EVENT_PUT && ev->status == 0)
req->rq_reqdata_len = ev->mlength;
- do_gettimeofday(&req->rq_arrival_time);
+ ktime_get_real_ts64(&req->rq_arrival_time);
req->rq_peer = ev->initiator;
req->rq_self = ev->target.nid;
req->rq_rqbd = rqbd;
@@ -415,7 +415,6 @@ void reply_out_callback(lnet_event_t *ev)
}
}
-
static void ptlrpc_master_callback(lnet_event_t *ev)
{
struct ptlrpc_cb_id *cbid = ev->md.user_ptr;
@@ -521,7 +520,7 @@ static void ptlrpc_ni_fini(void)
/* notreached */
}
-lnet_pid_t ptl_get_pid(void)
+static lnet_pid_t ptl_get_pid(void)
{
lnet_pid_t pid;
@@ -560,7 +559,6 @@ static int ptlrpc_ni_init(void)
return -ENOMEM;
}
-
int ptlrpc_init_portals(void)
{
int rc = ptlrpc_ni_init();
diff --git a/drivers/staging/lustre/lustre/ptlrpc/import.c b/drivers/staging/lustre/lustre/ptlrpc/import.c
index 1eae3896c..bfa410f7e 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/import.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/import.c
@@ -79,7 +79,7 @@ static void __import_set_state(struct obd_import *imp,
imp->imp_state = state;
imp->imp_state_hist[imp->imp_state_hist_idx].ish_state = state;
imp->imp_state_hist[imp->imp_state_hist_idx].ish_time =
- get_seconds();
+ ktime_get_real_seconds();
imp->imp_state_hist_idx = (imp->imp_state_hist_idx + 1) %
IMP_STATE_HIST_LEN;
}
@@ -103,7 +103,6 @@ do { \
spin_unlock(&imp->imp_lock); \
} while (0)
-
static int ptlrpc_connect_interpret(const struct lu_env *env,
struct ptlrpc_request *request,
void *data, int rc);
@@ -128,7 +127,8 @@ int ptlrpc_init_import(struct obd_import *imp)
EXPORT_SYMBOL(ptlrpc_init_import);
#define UUID_STR "_UUID"
-void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
+static void deuuidify(char *uuid, const char *prefix, char **uuid_start,
+ int *uuid_len)
{
*uuid_start = !prefix || strncmp(uuid, prefix, strlen(prefix))
? uuid : uuid + strlen(prefix);
@@ -142,7 +142,6 @@ void deuuidify(char *uuid, const char *prefix, char **uuid_start, int *uuid_len)
UUID_STR, strlen(UUID_STR)))
*uuid_len -= strlen(UUID_STR);
}
-EXPORT_SYMBOL(deuuidify);
/**
* Returns true if import was FULL, false if import was already not
@@ -200,12 +199,15 @@ int ptlrpc_set_import_discon(struct obd_import *imp, __u32 conn_cnt)
return rc;
}
-/* Must be called with imp_lock held! */
-static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
+/*
+ * This acts as a barrier; all existing requests are rejected, and
+ * no new requests will be accepted until the import is valid again.
+ */
+void ptlrpc_deactivate_import(struct obd_import *imp)
{
- assert_spin_locked(&imp->imp_lock);
-
CDEBUG(D_HA, "setting import %s INVALID\n", obd2cli_tgt(imp->imp_obd));
+
+ spin_lock(&imp->imp_lock);
imp->imp_invalid = 1;
imp->imp_generation++;
spin_unlock(&imp->imp_lock);
@@ -213,20 +215,10 @@ static void ptlrpc_deactivate_and_unlock_import(struct obd_import *imp)
ptlrpc_abort_inflight(imp);
obd_import_event(imp->imp_obd, imp, IMP_EVENT_INACTIVE);
}
-
-/*
- * This acts as a barrier; all existing requests are rejected, and
- * no new requests will be accepted until the import is valid again.
- */
-void ptlrpc_deactivate_import(struct obd_import *imp)
-{
- spin_lock(&imp->imp_lock);
- ptlrpc_deactivate_and_unlock_import(imp);
-}
EXPORT_SYMBOL(ptlrpc_deactivate_import);
static unsigned int
-ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
+ptlrpc_inflight_deadline(struct ptlrpc_request *req, time64_t now)
{
long dl;
@@ -251,7 +243,7 @@ ptlrpc_inflight_deadline(struct ptlrpc_request *req, time_t now)
static unsigned int ptlrpc_inflight_timeout(struct obd_import *imp)
{
- time_t now = get_seconds();
+ time64_t now = ktime_get_real_seconds();
struct list_head *tmp, *n;
struct ptlrpc_request *req;
unsigned int timeout = 0;
@@ -461,6 +453,7 @@ int ptlrpc_reconnect_import(struct obd_import *imp)
if (atomic_read(&imp->imp_inval_count) > 0) {
int rc;
struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+
rc = l_wait_event(imp->imp_recovery_waitq,
(atomic_read(&imp->imp_inval_count) == 0),
&lwi);
@@ -542,6 +535,7 @@ static int import_select_connection(struct obd_import *imp)
trying to reconnect on it.) */
if (tried_all && (imp->imp_conn_list.next == &imp_conn->oic_item)) {
struct adaptive_timeout *at = &imp->imp_at.iat_net_latency;
+
if (at_get(at) < CONNECTION_SWITCH_MAX) {
at_measured(at, at_get(at) + CONNECTION_SWITCH_INC);
if (at_get(at) > CONNECTION_SWITCH_MAX)
@@ -749,12 +743,11 @@ int ptlrpc_connect_import(struct obd_import *imp)
DEBUG_REQ(D_RPCTRACE, request, "(re)connect request (timeout %d)",
request->rq_timeout);
- ptlrpcd_add_req(request, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(request);
rc = 0;
out:
- if (rc != 0) {
+ if (rc != 0)
IMPORT_SET_STATE(imp, LUSTRE_IMP_DISCON);
- }
return rc;
}
@@ -906,7 +899,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
}
/* Determine what recovery state to move the import to. */
- if (MSG_CONNECT_RECONNECT & msg_flags) {
+ if (msg_flags & MSG_CONNECT_RECONNECT) {
memset(&old_hdl, 0, sizeof(old_hdl));
if (!memcmp(&old_hdl, lustre_msg_get_handle(request->rq_repmsg),
sizeof(old_hdl))) {
@@ -931,7 +924,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
* eviction. If it is in recovery - we are safe to
* participate since we can reestablish all of our state
* with server again */
- if ((MSG_CONNECT_RECOVERING & msg_flags)) {
+ if ((msg_flags & MSG_CONNECT_RECOVERING)) {
CDEBUG(level, "%s@%s changed server handle from %#llx to %#llx but is still in recovery\n",
obd2cli_tgt(imp->imp_obd),
imp->imp_connection->c_remote_uuid.uuid,
@@ -948,11 +941,10 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
request->rq_repmsg)->cookie);
}
-
imp->imp_remote_handle =
*lustre_msg_get_handle(request->rq_repmsg);
- if (!(MSG_CONNECT_RECOVERING & msg_flags)) {
+ if (!(msg_flags & MSG_CONNECT_RECOVERING)) {
IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
rc = 0;
goto finish;
@@ -968,7 +960,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
CDEBUG(D_HA, "%s: reconnected but import is invalid; marking evicted\n",
imp->imp_obd->obd_name);
IMPORT_SET_STATE(imp, LUSTRE_IMP_EVICTED);
- } else if (MSG_CONNECT_RECOVERING & msg_flags) {
+ } else if (msg_flags & MSG_CONNECT_RECOVERING) {
CDEBUG(D_HA, "%s: reconnected to %s during replay\n",
imp->imp_obd->obd_name,
obd2cli_tgt(imp->imp_obd));
@@ -981,7 +973,7 @@ static int ptlrpc_connect_interpret(const struct lu_env *env,
} else {
IMPORT_SET_STATE(imp, LUSTRE_IMP_RECOVER);
}
- } else if ((MSG_CONNECT_RECOVERING & msg_flags) && !imp->imp_invalid) {
+ } else if ((msg_flags & MSG_CONNECT_RECOVERING) && !imp->imp_invalid) {
LASSERT(imp->imp_replayable);
imp->imp_remote_handle =
*lustre_msg_get_handle(request->rq_repmsg);
@@ -1264,7 +1256,7 @@ static int signal_completed_replay(struct obd_import *imp)
req->rq_timeout *= 3;
req->rq_interpret_reply = completed_replay_interpret;
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
return 0;
}
@@ -1511,16 +1503,6 @@ out:
}
EXPORT_SYMBOL(ptlrpc_disconnect_import);
-void ptlrpc_cleanup_imp(struct obd_import *imp)
-{
- spin_lock(&imp->imp_lock);
- IMPORT_SET_STATE_NOLOCK(imp, LUSTRE_IMP_CLOSED);
- imp->imp_generation++;
- spin_unlock(&imp->imp_lock);
- ptlrpc_abort_inflight(imp);
-}
-EXPORT_SYMBOL(ptlrpc_cleanup_imp);
-
/* Adaptive Timeout utils */
extern unsigned int at_min, at_max, at_history;
@@ -1531,12 +1513,12 @@ extern unsigned int at_min, at_max, at_history;
int at_measured(struct adaptive_timeout *at, unsigned int val)
{
unsigned int old = at->at_current;
- time_t now = get_seconds();
- time_t binlimit = max_t(time_t, at_history / AT_BINS, 1);
+ time64_t now = ktime_get_real_seconds();
+ long binlimit = max_t(long, at_history / AT_BINS, 1);
LASSERT(at);
CDEBUG(D_OTHER, "add %u to %p time=%lu v=%u (%u %u %u %u)\n",
- val, at, now - at->at_binstart, at->at_current,
+ val, at, (long)(now - at->at_binstart), at->at_current,
at->at_hist[0], at->at_hist[1], at->at_hist[2], at->at_hist[3]);
if (val == 0)
@@ -1561,7 +1543,7 @@ int at_measured(struct adaptive_timeout *at, unsigned int val)
int i, shift;
unsigned int maxv = val;
/* move bins over */
- shift = (now - at->at_binstart) / binlimit;
+ shift = (u32)(now - at->at_binstart) / binlimit;
LASSERT(shift > 0);
for (i = AT_BINS - 1; i >= 0; i--) {
if (i >= shift) {
diff --git a/drivers/staging/lustre/lustre/ptlrpc/layout.c b/drivers/staging/lustre/lustre/ptlrpc/layout.c
index d14c20008..d7c4f4780 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/layout.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/layout.c
@@ -404,6 +404,7 @@ static const struct req_msg_field *ldlm_intent_layout_client[] = {
&RMF_LAYOUT_INTENT,
&RMF_EADATA /* for new layout to be set up */
};
+
static const struct req_msg_field *ldlm_intent_open_server[] = {
&RMF_PTLRPC_BODY,
&RMF_DLM_REP,
@@ -578,7 +579,6 @@ static const struct req_msg_field *ost_destroy_client[] = {
&RMF_CAPA1
};
-
static const struct req_msg_field *ost_brw_client[] = {
&RMF_PTLRPC_BODY,
&RMF_OST_BODY,
@@ -789,17 +789,17 @@ enum rmf_flags {
/**
* The field is a string, must be NUL-terminated.
*/
- RMF_F_STRING = 1 << 0,
+ RMF_F_STRING = BIT(0),
/**
* The field's buffer size need not match the declared \a rmf_size.
*/
- RMF_F_NO_SIZE_CHECK = 1 << 1,
+ RMF_F_NO_SIZE_CHECK = BIT(1),
/**
* The field's buffer size must be a whole multiple of the declared \a
* rmf_size and the \a rmf_swabber function must work on the declared \a
* rmf_size worth of bytes.
*/
- RMF_F_STRUCT_ARRAY = 1 << 2
+ RMF_F_STRUCT_ARRAY = BIT(2)
};
struct req_capsule;
@@ -1679,7 +1679,7 @@ EXPORT_SYMBOL(req_layout_fini);
* req_capsule_msg_size(). The \a rc_area information is used by.
* ptlrpc_request_set_replen().
*/
-void req_capsule_init_area(struct req_capsule *pill)
+static void req_capsule_init_area(struct req_capsule *pill)
{
int i;
@@ -1688,7 +1688,6 @@ void req_capsule_init_area(struct req_capsule *pill)
pill->rc_area[RCL_SERVER][i] = -1;
}
}
-EXPORT_SYMBOL(req_capsule_init_area);
/**
* Initialize a pill.
@@ -1996,55 +1995,6 @@ static void *__req_capsule_get(struct req_capsule *pill,
}
/**
- * Dump a request and/or reply
- */
-static void __req_capsule_dump(struct req_capsule *pill, enum req_location loc)
-{
- const struct req_format *fmt;
- const struct req_msg_field *field;
- int len;
- int i;
-
- fmt = pill->rc_fmt;
-
- DEBUG_REQ(D_RPCTRACE, pill->rc_req, "BEGIN REQ CAPSULE DUMP\n");
- for (i = 0; i < fmt->rf_fields[loc].nr; ++i) {
- field = FMT_FIELD(fmt, loc, i);
- if (field->rmf_dumper == NULL) {
- /*
- * FIXME Add a default hex dumper for fields that don't
- * have a specific dumper
- */
- len = req_capsule_get_size(pill, field, loc);
- CDEBUG(D_RPCTRACE, "Field %s has no dumper function; field size is %d\n",
- field->rmf_name, len);
- } else {
- /* It's the dumping side-effect that we're interested in */
- (void) __req_capsule_get(pill, field, loc, NULL, 1);
- }
- }
- CDEBUG(D_RPCTRACE, "END REQ CAPSULE DUMP\n");
-}
-
-/**
- * Dump a request.
- */
-void req_capsule_client_dump(struct req_capsule *pill)
-{
- __req_capsule_dump(pill, RCL_CLIENT);
-}
-EXPORT_SYMBOL(req_capsule_client_dump);
-
-/**
- * Dump a reply
- */
-void req_capsule_server_dump(struct req_capsule *pill)
-{
- __req_capsule_dump(pill, RCL_SERVER);
-}
-EXPORT_SYMBOL(req_capsule_server_dump);
-
-/**
* Trivial wrapper around __req_capsule_get(), that returns the PTLRPC request
* buffer corresponding to the given RMF (\a field) of a \a pill.
*/
@@ -2136,21 +2086,6 @@ void *req_capsule_server_sized_swab_get(struct req_capsule *pill,
EXPORT_SYMBOL(req_capsule_server_sized_swab_get);
/**
- * Returns the buffer of a \a pill corresponding to the given \a field from the
- * request (if the caller is executing on the server-side) or reply (if the
- * caller is executing on the client-side).
- *
- * This function convenient for use is code that could be executed on the
- * client and server alike.
- */
-const void *req_capsule_other_get(struct req_capsule *pill,
- const struct req_msg_field *field)
-{
- return __req_capsule_get(pill, field, pill->rc_loc ^ 1, NULL, 0);
-}
-EXPORT_SYMBOL(req_capsule_other_get);
-
-/**
* Set the size of the PTLRPC request/reply (\a loc) buffer for the given \a
* field of the given \a pill.
*
@@ -2324,9 +2259,9 @@ EXPORT_SYMBOL(req_capsule_has_field);
* Returns a non-zero value if the given \a field is present in the given \a
* pill's PTLRPC request or reply (\a loc), else it returns 0.
*/
-int req_capsule_field_present(const struct req_capsule *pill,
- const struct req_msg_field *field,
- enum req_location loc)
+static int req_capsule_field_present(const struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc)
{
int offset;
@@ -2336,7 +2271,6 @@ int req_capsule_field_present(const struct req_capsule *pill,
offset = __req_capsule_offset(pill, field, loc);
return lustre_msg_bufcount(__req_msg(pill, loc)) > offset;
}
-EXPORT_SYMBOL(req_capsule_field_present);
/**
* This function shrinks the size of the _buffer_ of the \a pill's PTLRPC
@@ -2376,67 +2310,5 @@ void req_capsule_shrink(struct req_capsule *pill,
}
EXPORT_SYMBOL(req_capsule_shrink);
-int req_capsule_server_grow(struct req_capsule *pill,
- const struct req_msg_field *field,
- unsigned int newlen)
-{
- struct ptlrpc_reply_state *rs = pill->rc_req->rq_reply_state, *nrs;
- char *from, *to;
- int offset, len, rc;
-
- LASSERT(pill->rc_fmt != NULL);
- LASSERT(__req_format_is_sane(pill->rc_fmt));
- LASSERT(req_capsule_has_field(pill, field, RCL_SERVER));
- LASSERT(req_capsule_field_present(pill, field, RCL_SERVER));
-
- len = req_capsule_get_size(pill, field, RCL_SERVER);
- offset = __req_capsule_offset(pill, field, RCL_SERVER);
- if (pill->rc_req->rq_repbuf_len >=
- lustre_packed_msg_size(pill->rc_req->rq_repmsg) - len + newlen)
- CERROR("Inplace repack might be done\n");
-
- pill->rc_req->rq_reply_state = NULL;
- req_capsule_set_size(pill, field, RCL_SERVER, newlen);
- rc = req_capsule_server_pack(pill);
- if (rc) {
- /* put old rs back, the caller will decide what to do */
- pill->rc_req->rq_reply_state = rs;
- return rc;
- }
- nrs = pill->rc_req->rq_reply_state;
- /* Now we need only buffers, copy first chunk */
- to = lustre_msg_buf(nrs->rs_msg, 0, 0);
- from = lustre_msg_buf(rs->rs_msg, 0, 0);
- len = (char *)lustre_msg_buf(rs->rs_msg, offset, 0) - from;
- memcpy(to, from, len);
- /* check if we have tail and copy it too */
- if (rs->rs_msg->lm_bufcount > offset + 1) {
- to = lustre_msg_buf(nrs->rs_msg, offset + 1, 0);
- from = lustre_msg_buf(rs->rs_msg, offset + 1, 0);
- offset = rs->rs_msg->lm_bufcount - 1;
- len = (char *)lustre_msg_buf(rs->rs_msg, offset, 0) +
- cfs_size_round(rs->rs_msg->lm_buflens[offset]) - from;
- memcpy(to, from, len);
- }
- /* drop old reply if everything is fine */
- if (rs->rs_difficult) {
- /* copy rs data */
- int i;
-
- nrs->rs_difficult = 1;
- nrs->rs_no_ack = rs->rs_no_ack;
- for (i = 0; i < rs->rs_nlocks; i++) {
- nrs->rs_locks[i] = rs->rs_locks[i];
- nrs->rs_modes[i] = rs->rs_modes[i];
- nrs->rs_nlocks++;
- }
- rs->rs_nlocks = 0;
- rs->rs_difficult = 0;
- rs->rs_no_ack = 0;
- }
- ptlrpc_rs_decref(rs);
- return 0;
-}
-EXPORT_SYMBOL(req_capsule_server_grow);
/* __REQ_LAYOUT_USER__ */
#endif
diff --git a/drivers/staging/lustre/lustre/ptlrpc/llog_client.c b/drivers/staging/lustre/lustre/ptlrpc/llog_client.c
index 1c701e0a0..5122205cb 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/llog_client.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/llog_client.c
@@ -118,6 +118,7 @@ static int llog_client_open(const struct lu_env *env,
if (name) {
char *tmp;
+
tmp = req_capsule_client_sized_get(&req->rq_pill, &RMF_NAME,
strlen(name) + 1);
LASSERT(tmp);
@@ -142,41 +143,6 @@ out:
return rc;
}
-static int llog_client_destroy(const struct lu_env *env,
- struct llog_handle *loghandle)
-{
- struct obd_import *imp;
- struct ptlrpc_request *req = NULL;
- struct llogd_body *body;
- int rc;
-
- LLOG_CLIENT_ENTRY(loghandle->lgh_ctxt, imp);
- req = ptlrpc_request_alloc_pack(imp, &RQF_LLOG_ORIGIN_HANDLE_DESTROY,
- LUSTRE_LOG_VERSION,
- LLOG_ORIGIN_HANDLE_DESTROY);
- if (req == NULL) {
- rc = -ENOMEM;
- goto err_exit;
- }
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_BODY);
- body->lgd_logid = loghandle->lgh_id;
- body->lgd_llh_flags = loghandle->lgh_hdr->llh_flags;
-
- if (!(body->lgd_llh_flags & LLOG_F_IS_PLAIN))
- CERROR("%s: wrong llog flags %x\n", imp->imp_obd->obd_name,
- body->lgd_llh_flags);
-
- ptlrpc_request_set_replen(req);
- rc = ptlrpc_queue_wait(req);
-
- ptlrpc_req_finished(req);
-err_exit:
- LLOG_CLIENT_EXIT(loghandle->lgh_ctxt, imp);
- return rc;
-}
-
-
static int llog_client_next_block(const struct lu_env *env,
struct llog_handle *loghandle,
int *cur_idx, int next_idx,
@@ -360,7 +326,6 @@ struct llog_operations llog_client_ops = {
.lop_prev_block = llog_client_prev_block,
.lop_read_header = llog_client_read_header,
.lop_open = llog_client_open,
- .lop_destroy = llog_client_destroy,
.lop_close = llog_client_close,
};
EXPORT_SYMBOL(llog_client_ops);
diff --git a/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c b/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c
index 53f9af1f2..afab0dee7 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/lproc_ptlrpc.c
@@ -35,7 +35,6 @@
*/
#define DEBUG_SUBSYSTEM S_CLASS
-
#include "../include/obd_support.h"
#include "../include/obd.h"
#include "../include/lprocfs_status.h"
@@ -44,7 +43,6 @@
#include "../include/obd_class.h"
#include "ptlrpc_internal.h"
-
static struct ll_rpc_opcode {
__u32 opcode;
const char *opname;
@@ -54,7 +52,7 @@ static struct ll_rpc_opcode {
{ OST_SETATTR, "ost_setattr" },
{ OST_READ, "ost_read" },
{ OST_WRITE, "ost_write" },
- { OST_CREATE , "ost_create" },
+ { OST_CREATE, "ost_create" },
{ OST_DESTROY, "ost_destroy" },
{ OST_GET_INFO, "ost_get_info" },
{ OST_CONNECT, "ost_connect" },
@@ -166,6 +164,7 @@ const char *ll_opcode2str(__u32 opcode)
* ptlrpc_internal.h needs to be modified.
*/
__u32 offset = opcode_offset(opcode);
+
LASSERTF(offset < LUSTRE_MAX_OPCODES,
"offset %u >= LUSTRE_MAX_OPCODES %u\n",
offset, LUSTRE_MAX_OPCODES);
@@ -239,6 +238,7 @@ ptlrpc_ldebugfs_register(struct dentry *root, char *dir,
}
for (i = 0; i < LUSTRE_MAX_OPCODES; i++) {
__u32 opcode = ll_rpc_opcode_table[i].opcode;
+
lprocfs_counter_init(svc_stats,
EXTRA_MAX_OPCODES + i, svc_counter_config,
ll_opcode2str(opcode), "usec");
@@ -270,6 +270,7 @@ ptlrpc_lprocfs_req_history_len_seq_show(struct seq_file *m, void *v)
seq_printf(m, "%d\n", total);
return 0;
}
+
LPROC_SEQ_FOPS_RO(ptlrpc_lprocfs_req_history_len);
static int
@@ -322,8 +323,8 @@ ptlrpc_lprocfs_req_history_max_seq_write(struct file *file,
return count;
}
-LPROC_SEQ_FOPS(ptlrpc_lprocfs_req_history_max);
+LPROC_SEQ_FOPS(ptlrpc_lprocfs_req_history_max);
static ssize_t threads_min_show(struct kobject *kobj, struct attribute *attr,
char *buf)
@@ -420,7 +421,6 @@ LUSTRE_RW_ATTR(threads_max);
* \addtogoup nrs
* @{
*/
-extern struct nrs_core nrs_core;
/**
* Translates \e ptlrpc_nrs_pol_state values to human-readable strings.
@@ -453,7 +453,7 @@ static const char *nrs_state2str(enum ptlrpc_nrs_pol_state state)
* \param[in] policy The policy
* \param[out] info Holds returned status information
*/
-void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
+static void nrs_policy_get_info_locked(struct ptlrpc_nrs_policy *policy,
struct ptlrpc_nrs_pol_info *info)
{
LASSERT(policy != NULL);
@@ -714,6 +714,7 @@ out:
return rc < 0 ? rc : count;
}
+
LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs);
/** @} nrs */
@@ -891,36 +892,6 @@ ptlrpc_lprocfs_svc_req_history_next(struct seq_file *s,
return NULL;
}
-/* common ost/mdt so_req_printer */
-void target_print_req(void *seq_file, struct ptlrpc_request *req)
-{
- /* Called holding srv_lock with irqs disabled.
- * Print specific req contents and a newline.
- * CAVEAT EMPTOR: check request message length before printing!!!
- * You might have received any old crap so you must be just as
- * careful here as the service's request parser!!! */
- struct seq_file *sf = seq_file;
-
- switch (req->rq_phase) {
- case RQ_PHASE_NEW:
- /* still awaiting a service thread's attention, or rejected
- * because the generic request message didn't unpack */
- seq_printf(sf, "<not swabbed>\n");
- break;
- case RQ_PHASE_INTERPRET:
- /* being handled, so basic msg swabbed, and opc is valid
- * but racing with mds_handle() */
- case RQ_PHASE_COMPLETE:
- /* been handled by mds_handle() reply state possibly still
- * volatile */
- seq_printf(sf, "opc %d\n", lustre_msg_get_opc(req->rq_reqmsg));
- break;
- default:
- DEBUG_REQ(D_ERROR, req, "bad phase %d", req->rq_phase);
- }
-}
-EXPORT_SYMBOL(target_print_req);
-
static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
{
struct ptlrpc_service *svc = s->private;
@@ -938,23 +909,26 @@ static int ptlrpc_lprocfs_svc_req_history_show(struct seq_file *s, void *iter)
rc = ptlrpc_lprocfs_svc_req_history_seek(svcpt, srhi, srhi->srhi_seq);
if (rc == 0) {
+ char nidstr[LNET_NIDSTR_SIZE];
+
req = srhi->srhi_req;
+ libcfs_nid2str_r(req->rq_self, nidstr, sizeof(nidstr));
/* Print common req fields.
* CAVEAT EMPTOR: we're racing with the service handler
* here. The request could contain any old crap, so you
* must be just as careful as the service's request
* parser. Currently I only print stuff here I know is OK
* to look at coz it was set up in request_in_callback()!!! */
- seq_printf(s, "%lld:%s:%s:x%llu:%d:%s:%ld:%lds(%+lds) ",
- req->rq_history_seq, libcfs_nid2str(req->rq_self),
+ seq_printf(s, "%lld:%s:%s:x%llu:%d:%s:%lld:%lds(%+lds) ",
+ req->rq_history_seq, nidstr,
libcfs_id2str(req->rq_peer), req->rq_xid,
req->rq_reqlen, ptlrpc_rqphase2str(req),
- req->rq_arrival_time.tv_sec,
- req->rq_sent - req->rq_arrival_time.tv_sec,
- req->rq_sent - req->rq_deadline);
+ (s64)req->rq_arrival_time.tv_sec,
+ (long)(req->rq_sent - req->rq_arrival_time.tv_sec),
+ (long)(req->rq_sent - req->rq_deadline));
if (svc->srv_ops.so_req_printer == NULL)
- seq_printf(s, "\n");
+ seq_putc(s, '\n');
else
svc->srv_ops.so_req_printer(s, srhi->srhi_req);
}
@@ -990,7 +964,7 @@ static int ptlrpc_lprocfs_timeouts_seq_show(struct seq_file *m, void *n)
struct ptlrpc_service *svc = m->private;
struct ptlrpc_service_part *svcpt;
struct dhms ts;
- time_t worstt;
+ time64_t worstt;
unsigned int cur;
unsigned int worst;
int i;
@@ -1005,17 +979,18 @@ static int ptlrpc_lprocfs_timeouts_seq_show(struct seq_file *m, void *n)
cur = at_get(&svcpt->scp_at_estimate);
worst = svcpt->scp_at_estimate.at_worst_ever;
worstt = svcpt->scp_at_estimate.at_worst_time;
- s2dhms(&ts, get_seconds() - worstt);
+ s2dhms(&ts, ktime_get_real_seconds() - worstt);
- seq_printf(m, "%10s : cur %3u worst %3u (at %ld, "
+ seq_printf(m, "%10s : cur %3u worst %3u (at %lld, "
DHMS_FMT" ago) ", "service",
- cur, worst, worstt, DHMS_VARS(&ts));
+ cur, worst, (s64)worstt, DHMS_VARS(&ts));
lprocfs_at_hist_helper(m, &svcpt->scp_at_estimate);
}
return 0;
}
+
LPROC_SEQ_FOPS_RO(ptlrpc_lprocfs_timeouts);
static ssize_t high_priority_ratio_show(struct kobject *kobj,
@@ -1208,55 +1183,6 @@ void ptlrpc_lprocfs_unregister_obd(struct obd_device *obd)
}
EXPORT_SYMBOL(ptlrpc_lprocfs_unregister_obd);
-
-#define BUFLEN (UUID_MAX + 5)
-
-int lprocfs_wr_evict_client(struct file *file, const char __user *buffer,
- size_t count, loff_t *off)
-{
- struct obd_device *obd = ((struct seq_file *)file->private_data)->private;
- char *kbuf;
- char *tmpbuf;
-
- kbuf = kzalloc(BUFLEN, GFP_NOFS);
- if (!kbuf)
- return -ENOMEM;
-
- /*
- * OBD_ALLOC() will zero kbuf, but we only copy BUFLEN - 1
- * bytes into kbuf, to ensure that the string is NUL-terminated.
- * UUID_MAX should include a trailing NUL already.
- */
- if (copy_from_user(kbuf, buffer,
- min_t(unsigned long, BUFLEN - 1, count))) {
- count = -EFAULT;
- goto out;
- }
- tmpbuf = cfs_firststr(kbuf, min_t(unsigned long, BUFLEN - 1, count));
- /* Kludge code(deadlock situation): the lprocfs lock has been held
- * since the client is evicted by writing client's
- * uuid/nid to procfs "evict_client" entry. However,
- * obd_export_evict_by_uuid() will call ldebugfs_remove() to destroy
- * the proc entries under the being destroyed export{}, so I have
- * to drop the lock at first here.
- * - jay, jxiong@clusterfs.com */
- class_incref(obd, __func__, current);
-
- if (strncmp(tmpbuf, "nid:", 4) == 0)
- obd_export_evict_by_nid(obd, tmpbuf + 4);
- else if (strncmp(tmpbuf, "uuid:", 5) == 0)
- obd_export_evict_by_uuid(obd, tmpbuf + 5);
- else
- obd_export_evict_by_uuid(obd, tmpbuf);
-
- class_decref(obd, __func__, current);
-
-out:
- kfree(kbuf);
- return count;
-}
-EXPORT_SYMBOL(lprocfs_wr_evict_client);
-
#undef BUFLEN
int lprocfs_wr_ping(struct file *file, const char __user *buffer,
@@ -1266,7 +1192,10 @@ int lprocfs_wr_ping(struct file *file, const char __user *buffer,
struct ptlrpc_request *req;
int rc;
- LPROCFS_CLIMP_CHECK(obd);
+ rc = lprocfs_climp_check(obd);
+ if (rc)
+ return rc;
+
req = ptlrpc_prep_ping(obd->u.cli.cl_import);
LPROCFS_CLIMP_EXIT(obd);
if (req == NULL)
@@ -1328,7 +1257,7 @@ int lprocfs_wr_import(struct file *file, const char __user *buffer,
*ptr = 0;
do_reconn = 0;
ptr += strlen("::");
- inst = simple_strtol(ptr, &endptr, 10);
+ inst = simple_strtoul(ptr, &endptr, 10);
if (*endptr) {
CERROR("config: wrong instance # %s\n", ptr);
} else if (inst != imp->imp_connect_data.ocd_instance) {
@@ -1355,8 +1284,12 @@ int lprocfs_rd_pinger_recov(struct seq_file *m, void *n)
{
struct obd_device *obd = m->private;
struct obd_import *imp = obd->u.cli.cl_import;
+ int rc;
+
+ rc = lprocfs_climp_check(obd);
+ if (rc)
+ return rc;
- LPROCFS_CLIMP_CHECK(obd);
seq_printf(m, "%d\n", !imp->imp_no_pinger_recover);
LPROCFS_CLIMP_EXIT(obd);
@@ -1379,7 +1312,10 @@ int lprocfs_wr_pinger_recov(struct file *file, const char __user *buffer,
if (val != 0 && val != 1)
return -ERANGE;
- LPROCFS_CLIMP_CHECK(obd);
+ rc = lprocfs_climp_check(obd);
+ if (rc)
+ return rc;
+
spin_lock(&imp->imp_lock);
imp->imp_no_pinger_recover = !val;
spin_unlock(&imp->imp_lock);
diff --git a/drivers/staging/lustre/lustre/ptlrpc/niobuf.c b/drivers/staging/lustre/lustre/ptlrpc/niobuf.c
index 92c746b44..09ddeef6b 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/niobuf.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/niobuf.c
@@ -106,12 +106,11 @@ static void mdunlink_iterate_helper(lnet_handle_md_t *bd_mds, int count)
LNetMDUnlink(bd_mds[i]);
}
-
/**
* Register bulk at the sender for later transfer.
* Returns 0 on success or error code.
*/
-int ptlrpc_register_bulk(struct ptlrpc_request *req)
+static int ptlrpc_register_bulk(struct ptlrpc_request *req)
{
struct ptlrpc_bulk_desc *desc = req->rq_bulk;
lnet_process_id_t peer;
@@ -232,7 +231,6 @@ int ptlrpc_register_bulk(struct ptlrpc_request *req)
return 0;
}
-EXPORT_SYMBOL(ptlrpc_register_bulk);
/**
* Disconnect a bulk desc from the network. Idempotent. Not
@@ -252,7 +250,7 @@ int ptlrpc_unregister_bulk(struct ptlrpc_request *req, int async)
/* Let's setup deadline for reply unlink. */
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_LONG_BULK_UNLINK) &&
async && req->rq_bulk_deadline == 0)
- req->rq_bulk_deadline = get_seconds() + LONG_UNLINK;
+ req->rq_bulk_deadline = ktime_get_real_seconds() + LONG_UNLINK;
if (ptlrpc_client_bulk_active(req) == 0) /* completed or */
return 1; /* never registered */
@@ -303,7 +301,7 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
{
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
- int service_time = max_t(int, get_seconds() -
+ int service_time = max_t(int, ktime_get_real_seconds() -
req->rq_arrival_time.tv_sec, 1);
if (!(flags & PTLRPC_REPLY_EARLY) &&
@@ -328,8 +326,7 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
/* Report service time estimate for future client reqs, but report 0
* (to be ignored by client) if it's a error reply during recovery.
* (bz15815) */
- if (req->rq_type == PTL_RPC_MSG_ERR &&
- (req->rq_export == NULL || req->rq_export->exp_obd->obd_recovering))
+ if (req->rq_type == PTL_RPC_MSG_ERR && !req->rq_export)
lustre_msg_set_timeout(req->rq_repmsg, 0);
else
lustre_msg_set_timeout(req->rq_repmsg,
@@ -337,9 +334,8 @@ static void ptlrpc_at_set_reply(struct ptlrpc_request *req, int flags)
if (req->rq_reqmsg &&
!(lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
- CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%d:%x/%x len=%d\n",
+ CDEBUG(D_ADAPTTO, "No early reply support: flags=%#x req_flags=%#x magic=%x/%x len=%d\n",
flags, lustre_msg_get_flags(req->rq_reqmsg),
- lustre_msg_is_v1(req->rq_reqmsg),
lustre_msg_get_magic(req->rq_reqmsg),
lustre_msg_get_magic(req->rq_repmsg), req->rq_replen);
}
@@ -422,7 +418,7 @@ int ptlrpc_send_reply(struct ptlrpc_request *req, int flags)
if (unlikely(rc))
goto out;
- req->rq_sent = get_seconds();
+ req->rq_sent = ktime_get_real_seconds();
rc = ptl_send_buf(&rs->rs_md_h, rs->rs_repbuf, rs->rs_repdata_len,
(rs->rs_difficult && !rs->rs_no_ack) ?
@@ -601,7 +597,7 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
/* Manage remote for early replies */
reply_md.options = PTLRPC_MD_OPTIONS | LNET_MD_OP_PUT |
LNET_MD_MANAGE_REMOTE |
- LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */;
+ LNET_MD_TRUNCATE; /* allow to make EOVERFLOW error */
reply_md.user_ptr = &request->rq_reply_cbid;
reply_md.eq_handle = ptlrpc_eq_h;
@@ -633,8 +629,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_DELAY_SEND, request->rq_timeout + 5);
- do_gettimeofday(&request->rq_arrival_time);
- request->rq_sent = get_seconds();
+ ktime_get_real_ts64(&request->rq_arrival_time);
+ request->rq_sent = ktime_get_real_seconds();
/* We give the server rq_timeout secs to process the req, and
add the network latency for our local timeout. */
request->rq_deadline = request->rq_sent + request->rq_timeout +
diff --git a/drivers/staging/lustre/lustre/ptlrpc/nrs.c b/drivers/staging/lustre/lustre/ptlrpc/nrs.c
index d37cdd5ac..7044e1ff6 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/nrs.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/nrs.c
@@ -49,7 +49,6 @@
/* XXX: This is just for liblustre. Remove the #if defined directive when the
* "cfs_" prefix is dropped from cfs_list_head. */
-extern struct list_head ptlrpc_all_services;
/**
* NRS core object.
@@ -478,7 +477,6 @@ static void nrs_resource_get_safe(struct ptlrpc_nrs *nrs,
*
* \param resp the resource hierarchy that is being released
*
- * \see ptlrpc_nrs_req_hp_move()
* \see ptlrpc_nrs_req_finalize()
*/
static void nrs_resource_put_safe(struct ptlrpc_nrs_resource **resp)
@@ -1113,7 +1111,7 @@ again:
* \retval -ve error
* \retval 0 success
*/
-int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
+static int ptlrpc_nrs_policy_register(struct ptlrpc_nrs_pol_conf *conf)
{
struct ptlrpc_service *svc;
struct ptlrpc_nrs_pol_desc *desc;
@@ -1249,71 +1247,6 @@ fail:
return rc;
}
-EXPORT_SYMBOL(ptlrpc_nrs_policy_register);
-
-/**
- * Unregisters a previously registered policy with NRS core. All instances of
- * the policy on all NRS heads of all supported services are removed.
- *
- * N.B. This function should only be called from a module's exit() function.
- * Although it can be used for policies that ship alongside NRS core, the
- * function is primarily intended for policies that register externally,
- * from other modules.
- *
- * \param[in] conf configuration information for the policy to unregister
- *
- * \retval -ve error
- * \retval 0 success
- */
-int ptlrpc_nrs_policy_unregister(struct ptlrpc_nrs_pol_conf *conf)
-{
- struct ptlrpc_nrs_pol_desc *desc;
- int rc;
-
- LASSERT(conf != NULL);
-
- if (conf->nc_flags & PTLRPC_NRS_FL_FALLBACK) {
- CERROR("Unable to unregister a fallback policy, unless the PTLRPC service is stopping.\n");
- return -EPERM;
- }
-
- conf->nc_name[NRS_POL_NAME_MAX - 1] = '\0';
-
- mutex_lock(&nrs_core.nrs_mutex);
-
- desc = nrs_policy_find_desc_locked(conf->nc_name);
- if (desc == NULL) {
- CERROR("Failing to unregister NRS policy %s which has not been registered with NRS core!\n",
- conf->nc_name);
- rc = -ENOENT;
- goto not_exist;
- }
-
- mutex_lock(&ptlrpc_all_services_mutex);
-
- rc = nrs_policy_unregister_locked(desc);
- if (rc < 0) {
- if (rc == -EBUSY)
- CERROR("Please first stop policy %s on all service partitions and then retry to unregister the policy.\n",
- conf->nc_name);
- goto fail;
- }
-
- CDEBUG(D_INFO, "Unregistering policy %s from NRS core.\n",
- conf->nc_name);
-
- list_del(&desc->pd_list);
- kfree(desc);
-
-fail:
- mutex_unlock(&ptlrpc_all_services_mutex);
-
-not_exist:
- mutex_unlock(&nrs_core.nrs_mutex);
-
- return rc;
-}
-EXPORT_SYMBOL(ptlrpc_nrs_policy_unregister);
/**
* Setup NRS heads on all service partitions of service \a svc, and register
@@ -1554,22 +1487,6 @@ ptlrpc_nrs_req_get_nolock0(struct ptlrpc_service_part *svcpt, bool hp,
}
/**
- * Dequeues request \a req from the policy it has been enqueued on.
- *
- * \param[in] req the request
- */
-void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req)
-{
- struct ptlrpc_nrs_policy *policy = nrs_request_policy(&req->rq_nrq);
-
- policy->pol_desc->pd_ops->op_req_dequeue(policy, &req->rq_nrq);
-
- req->rq_nrq.nr_enqueued = 0;
-
- nrs_request_removed(policy);
-}
-
-/**
* Returns whether there are any requests currently enqueued on any of the
* policies of service partition's \a svcpt NRS head specified by \a hp. Should
* be called while holding ptlrpc_service_part::scp_req_lock to get a reliable
@@ -1590,48 +1507,6 @@ bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp)
};
/**
- * Moves request \a req from the regular to the high-priority NRS head.
- *
- * \param[in] req the request to move
- */
-void ptlrpc_nrs_req_hp_move(struct ptlrpc_request *req)
-{
- struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
- struct ptlrpc_nrs_request *nrq = &req->rq_nrq;
- struct ptlrpc_nrs_resource *res1[NRS_RES_MAX];
- struct ptlrpc_nrs_resource *res2[NRS_RES_MAX];
-
- /**
- * Obtain the high-priority NRS head resources.
- */
- nrs_resource_get_safe(nrs_svcpt2nrs(svcpt, true), nrq, res1, true);
-
- spin_lock(&svcpt->scp_req_lock);
-
- if (!ptlrpc_nrs_req_can_move(req))
- goto out;
-
- ptlrpc_nrs_req_del_nolock(req);
-
- memcpy(res2, nrq->nr_res_ptrs, NRS_RES_MAX * sizeof(res2[0]));
- memcpy(nrq->nr_res_ptrs, res1, NRS_RES_MAX * sizeof(res1[0]));
-
- ptlrpc_nrs_hpreq_add_nolock(req);
-
- memcpy(res1, res2, NRS_RES_MAX * sizeof(res1[0]));
-out:
- spin_unlock(&svcpt->scp_req_lock);
-
- /**
- * Release either the regular NRS head resources if we moved the
- * request, or the high-priority NRS head resources if we took a
- * reference earlier in this function and ptlrpc_nrs_req_can_move()
- * returned false.
- */
- nrs_resource_put_safe(res1);
-}
-
-/**
* Carries out a control operation \a opc on the policy identified by the
* human-readable \a name, on either all partitions, or only on the first
* partition of service \a svc.
@@ -1698,7 +1573,6 @@ out:
return rc;
}
-
/* ptlrpc/nrs_fifo.c */
extern struct ptlrpc_nrs_pol_conf nrs_conf_fifo;
@@ -1720,7 +1594,6 @@ int ptlrpc_nrs_init(void)
if (rc != 0)
goto fail;
-
return rc;
fail:
/**
diff --git a/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c b/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c
index 84937ad90..f3cb5184f 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/pack_generic.c
@@ -94,32 +94,11 @@ int ptlrpc_buf_need_swab(struct ptlrpc_request *req, const int inout,
}
EXPORT_SYMBOL(ptlrpc_buf_need_swab);
-static inline int lustre_msg_check_version_v2(struct lustre_msg_v2 *msg,
- __u32 version)
-{
- __u32 ver = lustre_msg_get_version(msg);
- return (ver & LUSTRE_VERSION_MASK) != version;
-}
-
-int lustre_msg_check_version(struct lustre_msg *msg, __u32 version)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- CERROR("msg v1 not supported - please upgrade you system\n");
- return -EINVAL;
- case LUSTRE_MSG_MAGIC_V2:
- return lustre_msg_check_version_v2(msg, version);
- default:
- CERROR("incorrect message magic: %08x\n", msg->lm_magic);
- return 0;
- }
-}
-EXPORT_SYMBOL(lustre_msg_check_version);
-
/* early reply size */
int lustre_msg_early_size(void)
{
static int size;
+
if (!size) {
/* Always reply old ptlrpc_body_v2 to keep interoperability
* with the old client (< 2.3) which doesn't have pb_jobid
@@ -129,6 +108,7 @@ int lustre_msg_early_size(void)
* client.
*/
__u32 pblen = sizeof(struct ptlrpc_body_v2);
+
size = lustre_msg_size(LUSTRE_MSG_MAGIC_V2, 1, &pblen);
}
return size;
@@ -209,6 +189,7 @@ void lustre_init_msg_v2(struct lustre_msg_v2 *msg, int count, __u32 *lens,
ptr = (char *)msg + lustre_msg_hdr_size_v2(count);
for (i = 0; i < count; i++) {
char *tmp = bufs[i];
+
LOGL(tmp, lens[i], ptr);
}
}
@@ -433,14 +414,15 @@ void *lustre_msg_buf(struct lustre_msg *m, int n, int min_size)
case LUSTRE_MSG_MAGIC_V2:
return lustre_msg_buf_v2(m, n, min_size);
default:
- LASSERTF(0, "incorrect message magic: %08x(msg:%p)\n", m->lm_magic, m);
+ LASSERTF(0, "incorrect message magic: %08x (msg:%p)\n",
+ m->lm_magic, m);
return NULL;
}
}
EXPORT_SYMBOL(lustre_msg_buf);
-int lustre_shrink_msg_v2(struct lustre_msg_v2 *msg, int segment,
- unsigned int newlen, int move_data)
+static int lustre_shrink_msg_v2(struct lustre_msg_v2 *msg, int segment,
+ unsigned int newlen, int move_data)
{
char *tail = NULL, *newpos;
int tail_len = 0, n;
@@ -593,6 +575,7 @@ EXPORT_SYMBOL(__lustre_unpack_msg);
int ptlrpc_unpack_req_msg(struct ptlrpc_request *req, int len)
{
int rc;
+
rc = __lustre_unpack_msg(req->rq_reqmsg, len);
if (rc == 1) {
lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
@@ -605,6 +588,7 @@ EXPORT_SYMBOL(ptlrpc_unpack_req_msg);
int ptlrpc_unpack_rep_msg(struct ptlrpc_request *req, int len)
{
int rc;
+
rc = __lustre_unpack_msg(req->rq_repmsg, len);
if (rc == 1) {
lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
@@ -692,28 +676,6 @@ int lustre_msg_buflen(struct lustre_msg *m, int n)
}
EXPORT_SYMBOL(lustre_msg_buflen);
-static inline void
-lustre_msg_set_buflen_v2(struct lustre_msg_v2 *m, int n, int len)
-{
- if (n >= m->lm_bufcount)
- LBUG();
-
- m->lm_buflens[n] = len;
-}
-
-void lustre_msg_set_buflen(struct lustre_msg *m, int n, int len)
-{
- switch (m->lm_magic) {
- case LUSTRE_MSG_MAGIC_V2:
- lustre_msg_set_buflen_v2(m, n, len);
- return;
- default:
- LASSERTF(0, "incorrect message magic: %08x\n", m->lm_magic);
- }
-}
-
-EXPORT_SYMBOL(lustre_msg_set_buflen);
-
/* NB return the bufcount for lustre_msg_v2 format, so if message is packed
* in V1 format, the result is one bigger. (add struct ptlrpc_body). */
int lustre_msg_bufcount(struct lustre_msg *m)
@@ -802,14 +764,11 @@ static inline struct ptlrpc_body *lustre_msg_ptlrpc_body(struct lustre_msg *msg)
__u32 lustre_msghdr_get_flags(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- case LUSTRE_MSG_MAGIC_V1_SWABBED:
- return 0;
case LUSTRE_MSG_MAGIC_V2:
/* already in host endian */
return msg->lm_flags;
default:
- LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
+ CERROR("incorrect message magic: %08x\n", msg->lm_magic);
return 0;
}
}
@@ -818,8 +777,6 @@ EXPORT_SYMBOL(lustre_msghdr_get_flags);
void lustre_msghdr_set_flags(struct lustre_msg *msg, __u32 flags)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return;
case LUSTRE_MSG_MAGIC_V2:
msg->lm_flags = flags;
return;
@@ -833,12 +790,13 @@ __u32 lustre_msg_get_flags(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- if (!pb) {
- CERROR("invalid msg %p: no ptlrpc body!\n", msg);
- return 0;
- }
- return pb->pb_flags;
+
+ if (pb)
+ return pb->pb_flags;
+
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
}
+ /* no break */
default:
/* flags might be printed in debug code while message
* uninitialized */
@@ -852,6 +810,7 @@ void lustre_msg_add_flags(struct lustre_msg *msg, int flags)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_flags |= flags;
return;
@@ -867,6 +826,7 @@ void lustre_msg_set_flags(struct lustre_msg *msg, int flags)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_flags = flags;
return;
@@ -882,8 +842,9 @@ void lustre_msg_clear_flags(struct lustre_msg *msg, int flags)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
- pb->pb_flags &= ~(MSG_GEN_FLAG_MASK & flags);
+ pb->pb_flags &= ~(flags & MSG_GEN_FLAG_MASK);
return;
}
default:
@@ -897,12 +858,13 @@ __u32 lustre_msg_get_op_flags(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- if (!pb) {
- CERROR("invalid msg %p: no ptlrpc body!\n", msg);
- return 0;
- }
- return pb->pb_op_flags;
+
+ if (pb)
+ return pb->pb_op_flags;
+
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
}
+ /* no break */
default:
return 0;
}
@@ -914,21 +876,7 @@ void lustre_msg_add_op_flags(struct lustre_msg *msg, int flags)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
- pb->pb_op_flags |= flags;
- return;
- }
- default:
- LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
- }
-}
-EXPORT_SYMBOL(lustre_msg_add_op_flags);
-void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_op_flags |= flags;
return;
@@ -937,13 +885,14 @@ void lustre_msg_set_op_flags(struct lustre_msg *msg, int flags)
LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
}
}
-EXPORT_SYMBOL(lustre_msg_set_op_flags);
+EXPORT_SYMBOL(lustre_msg_add_op_flags);
struct lustre_handle *lustre_msg_get_handle(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return NULL;
@@ -962,6 +911,7 @@ __u32 lustre_msg_get_type(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return PTL_RPC_MSG_ERR;
@@ -975,29 +925,12 @@ __u32 lustre_msg_get_type(struct lustre_msg *msg)
}
EXPORT_SYMBOL(lustre_msg_get_type);
-__u32 lustre_msg_get_version(struct lustre_msg *msg)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- if (!pb) {
- CERROR("invalid msg %p: no ptlrpc body!\n", msg);
- return 0;
- }
- return pb->pb_version;
- }
- default:
- CERROR("incorrect message magic: %08x\n", msg->lm_magic);
- return 0;
- }
-}
-EXPORT_SYMBOL(lustre_msg_get_version);
-
void lustre_msg_add_version(struct lustre_msg *msg, int version)
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_version |= version;
return;
@@ -1013,6 +946,7 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
@@ -1020,36 +954,19 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg)
return pb->pb_opc;
}
default:
- CERROR("incorrect message magic: %08x(msg:%p)\n", msg->lm_magic, msg);
- LBUG();
+ CERROR("incorrect message magic: %08x (msg:%p)\n",
+ msg->lm_magic, msg);
return 0;
}
}
EXPORT_SYMBOL(lustre_msg_get_opc);
-__u64 lustre_msg_get_last_xid(struct lustre_msg *msg)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- if (!pb) {
- CERROR("invalid msg %p: no ptlrpc body!\n", msg);
- return 0;
- }
- return pb->pb_last_xid;
- }
- default:
- CERROR("incorrect message magic: %08x\n", msg->lm_magic);
- return 0;
- }
-}
-EXPORT_SYMBOL(lustre_msg_get_last_xid);
-
__u64 lustre_msg_get_last_committed(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
@@ -1066,10 +983,9 @@ EXPORT_SYMBOL(lustre_msg_get_last_committed);
__u64 *lustre_msg_get_versions(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return NULL;
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return NULL;
@@ -1088,6 +1004,7 @@ __u64 lustre_msg_get_transno(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
@@ -1106,12 +1023,13 @@ int lustre_msg_get_status(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- if (!pb) {
- CERROR("invalid msg %p: no ptlrpc body!\n", msg);
- return -EINVAL;
- }
- return pb->pb_status;
+
+ if (pb)
+ return pb->pb_status;
+
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
}
+ /* no break */
default:
/* status might be printed in debug code while message
* uninitialized */
@@ -1125,6 +1043,7 @@ __u64 lustre_msg_get_slv(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return -EINVAL;
@@ -1138,12 +1057,12 @@ __u64 lustre_msg_get_slv(struct lustre_msg *msg)
}
EXPORT_SYMBOL(lustre_msg_get_slv);
-
void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv)
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return;
@@ -1163,6 +1082,7 @@ __u32 lustre_msg_get_limit(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return -EINVAL;
@@ -1176,12 +1096,12 @@ __u32 lustre_msg_get_limit(struct lustre_msg *msg)
}
EXPORT_SYMBOL(lustre_msg_get_limit);
-
void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit)
{
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return;
@@ -1201,6 +1121,7 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
@@ -1214,18 +1135,6 @@ __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg)
}
EXPORT_SYMBOL(lustre_msg_get_conn_cnt);
-int lustre_msg_is_v1(struct lustre_msg *msg)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- case LUSTRE_MSG_MAGIC_V1_SWABBED:
- return 1;
- default:
- return 0;
- }
-}
-EXPORT_SYMBOL(lustre_msg_is_v1);
-
__u32 lustre_msg_get_magic(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
@@ -1241,11 +1150,9 @@ EXPORT_SYMBOL(lustre_msg_get_magic);
__u32 lustre_msg_get_timeout(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- case LUSTRE_MSG_MAGIC_V1_SWABBED:
- return 0;
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
@@ -1255,18 +1162,16 @@ __u32 lustre_msg_get_timeout(struct lustre_msg *msg)
}
default:
CERROR("incorrect message magic: %08x\n", msg->lm_magic);
- return 0;
+ return -EPROTO;
}
}
__u32 lustre_msg_get_service_time(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- case LUSTRE_MSG_MAGIC_V1_SWABBED:
- return 0;
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
if (!pb) {
CERROR("invalid msg %p: no ptlrpc body!\n", msg);
return 0;
@@ -1280,28 +1185,6 @@ __u32 lustre_msg_get_service_time(struct lustre_msg *msg)
}
}
-char *lustre_msg_get_jobid(struct lustre_msg *msg)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- case LUSTRE_MSG_MAGIC_V1_SWABBED:
- return NULL;
- case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb =
- lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF,
- sizeof(struct ptlrpc_body));
- if (!pb)
- return NULL;
-
- return pb->pb_jobid;
- }
- default:
- CERROR("incorrect message magic: %08x\n", msg->lm_magic);
- return NULL;
- }
-}
-EXPORT_SYMBOL(lustre_msg_get_jobid);
-
__u32 lustre_msg_get_cksum(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
@@ -1320,6 +1203,7 @@ __u32 lustre_msg_calc_cksum(struct lustre_msg *msg)
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
__u32 crc;
unsigned int hsize = 4;
+
cfs_crypto_hash_digest(CFS_HASH_ALG_CRC32, (unsigned char *)pb,
lustre_msg_buflen(msg, MSG_PTLRPC_BODY_OFF),
NULL, 0, (unsigned char *)&crc, &hsize);
@@ -1336,6 +1220,7 @@ void lustre_msg_set_handle(struct lustre_msg *msg, struct lustre_handle *handle)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_handle = *handle;
return;
@@ -1351,6 +1236,7 @@ void lustre_msg_set_type(struct lustre_msg *msg, __u32 type)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_type = type;
return;
@@ -1366,6 +1252,7 @@ void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_opc = opc;
return;
@@ -1376,43 +1263,12 @@ void lustre_msg_set_opc(struct lustre_msg *msg, __u32 opc)
}
EXPORT_SYMBOL(lustre_msg_set_opc);
-void lustre_msg_set_last_xid(struct lustre_msg *msg, __u64 last_xid)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
- pb->pb_last_xid = last_xid;
- return;
- }
- default:
- LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
- }
-}
-EXPORT_SYMBOL(lustre_msg_set_last_xid);
-
-void lustre_msg_set_last_committed(struct lustre_msg *msg, __u64 last_committed)
-{
- switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V2: {
- struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
- LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
- pb->pb_last_committed = last_committed;
- return;
- }
- default:
- LASSERTF(0, "incorrect message magic: %08x\n", msg->lm_magic);
- }
-}
-EXPORT_SYMBOL(lustre_msg_set_last_committed);
-
void lustre_msg_set_versions(struct lustre_msg *msg, __u64 *versions)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return;
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_pre_versions[0] = versions[0];
pb->pb_pre_versions[1] = versions[1];
@@ -1431,6 +1287,7 @@ void lustre_msg_set_transno(struct lustre_msg *msg, __u64 transno)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_transno = transno;
return;
@@ -1446,6 +1303,7 @@ void lustre_msg_set_status(struct lustre_msg *msg, __u32 status)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_status = status;
return;
@@ -1461,6 +1319,7 @@ void lustre_msg_set_conn_cnt(struct lustre_msg *msg, __u32 conn_cnt)
switch (msg->lm_magic) {
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_conn_cnt = conn_cnt;
return;
@@ -1474,10 +1333,9 @@ EXPORT_SYMBOL(lustre_msg_set_conn_cnt);
void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return;
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_timeout = timeout;
return;
@@ -1490,10 +1348,9 @@ void lustre_msg_set_timeout(struct lustre_msg *msg, __u32 timeout)
void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return;
case LUSTRE_MSG_MAGIC_V2: {
struct ptlrpc_body *pb = lustre_msg_ptlrpc_body(msg);
+
LASSERTF(pb, "invalid msg %p: no ptlrpc body!\n", msg);
pb->pb_service_time = service_time;
return;
@@ -1506,8 +1363,6 @@ void lustre_msg_set_service_time(struct lustre_msg *msg, __u32 service_time)
void lustre_msg_set_jobid(struct lustre_msg *msg, char *jobid)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return;
case LUSTRE_MSG_MAGIC_V2: {
__u32 opc = lustre_msg_get_opc(msg);
struct ptlrpc_body *pb;
@@ -1537,8 +1392,6 @@ EXPORT_SYMBOL(lustre_msg_set_jobid);
void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum)
{
switch (msg->lm_magic) {
- case LUSTRE_MSG_MAGIC_V1:
- return;
case LUSTRE_MSG_MAGIC_V2:
msg->lm_cksum = cksum;
return;
@@ -1547,7 +1400,6 @@ void lustre_msg_set_cksum(struct lustre_msg *msg, __u32 cksum)
}
}
-
void ptlrpc_request_set_replen(struct ptlrpc_request *req)
{
int count = req_capsule_filled_sizes(&req->rq_pill, RCL_SERVER);
@@ -1559,14 +1411,6 @@ void ptlrpc_request_set_replen(struct ptlrpc_request *req)
}
EXPORT_SYMBOL(ptlrpc_request_set_replen);
-void ptlrpc_req_set_repsize(struct ptlrpc_request *req, int count, __u32 *lens)
-{
- req->rq_replen = lustre_msg_size(req->rq_reqmsg->lm_magic, count, lens);
- if (req->rq_reqmsg->lm_magic == LUSTRE_MSG_MAGIC_V2)
- req->rq_reqmsg->lm_repsize = req->rq_replen;
-}
-EXPORT_SYMBOL(ptlrpc_req_set_repsize);
-
/**
* Send a remote set_info_async.
*
@@ -1690,7 +1534,7 @@ void lustre_swab_connect(struct obd_connect_data *ocd)
CLASSERT(offsetof(typeof(*ocd), paddingF) != 0);
}
-void lustre_swab_obdo(struct obdo *o)
+static void lustre_swab_obdo(struct obdo *o)
{
__swab64s(&o->o_valid);
lustre_swab_ost_id(&o->o_oi);
@@ -1722,7 +1566,6 @@ void lustre_swab_obdo(struct obdo *o)
CLASSERT(offsetof(typeof(*o), o_padding_6) != 0);
}
-EXPORT_SYMBOL(lustre_swab_obdo);
void lustre_swab_obd_statfs(struct obd_statfs *os)
{
@@ -1874,6 +1717,7 @@ EXPORT_SYMBOL(lustre_swab_mdt_ioepoch);
void lustre_swab_mgs_target_info(struct mgs_target_info *mti)
{
int i;
+
__swab32s(&mti->mti_lustre_ver);
__swab32s(&mti->mti_stripe_index);
__swab32s(&mti->mti_config_ver);
@@ -1979,7 +1823,7 @@ void lustre_swab_fid2path(struct getinfo_fid2path *gf)
}
EXPORT_SYMBOL(lustre_swab_fid2path);
-void lustre_swab_fiemap_extent(struct ll_fiemap_extent *fm_extent)
+static void lustre_swab_fiemap_extent(struct ll_fiemap_extent *fm_extent)
{
__swab64s(&fm_extent->fe_logical);
__swab64s(&fm_extent->fe_physical);
@@ -2018,15 +1862,6 @@ void lustre_swab_idx_info(struct idx_info *ii)
__swab16s(&ii->ii_recsize);
}
-void lustre_swab_lip_header(struct lu_idxpage *lip)
-{
- /* swab header */
- __swab32s(&lip->lip_magic);
- __swab16s(&lip->lip_flags);
- __swab16s(&lip->lip_nr);
-}
-EXPORT_SYMBOL(lustre_swab_lip_header);
-
void lustre_swab_mdt_rec_reint (struct mdt_rec_reint *rr)
{
__swab32s(&rr->rr_opcode);
@@ -2069,46 +1904,6 @@ void lustre_swab_lov_desc(struct lov_desc *ld)
}
EXPORT_SYMBOL(lustre_swab_lov_desc);
-void lustre_swab_lmv_desc(struct lmv_desc *ld)
-{
- __swab32s(&ld->ld_tgt_count);
- __swab32s(&ld->ld_active_tgt_count);
- __swab32s(&ld->ld_default_stripe_count);
- __swab32s(&ld->ld_pattern);
- __swab64s(&ld->ld_default_hash_size);
- __swab32s(&ld->ld_qos_maxage);
- /* uuid endian insensitive */
-}
-
-void lustre_swab_lmv_stripe_md(struct lmv_stripe_md *mea)
-{
- __swab32s(&mea->mea_magic);
- __swab32s(&mea->mea_count);
- __swab32s(&mea->mea_master);
- CLASSERT(offsetof(typeof(*mea), mea_padding) != 0);
-}
-
-void lustre_swab_lmv_user_md(struct lmv_user_md *lum)
-{
- int i;
-
- __swab32s(&lum->lum_magic);
- __swab32s(&lum->lum_stripe_count);
- __swab32s(&lum->lum_stripe_offset);
- __swab32s(&lum->lum_hash_type);
- __swab32s(&lum->lum_type);
- CLASSERT(offsetof(typeof(*lum), lum_padding1) != 0);
- CLASSERT(offsetof(typeof(*lum), lum_padding2) != 0);
- CLASSERT(offsetof(typeof(*lum), lum_padding3) != 0);
-
- for (i = 0; i < lum->lum_stripe_count; i++) {
- __swab32s(&lum->lum_objects[i].lum_mds);
- lustre_swab_lu_fid(&lum->lum_objects[i].lum_fid);
- }
-
-}
-EXPORT_SYMBOL(lustre_swab_lmv_user_md);
-
static void print_lum(struct lov_user_md *lum)
{
CDEBUG(D_OTHER, "lov_user_md %p:\n", lum);
@@ -2179,16 +1974,15 @@ void lustre_swab_lov_user_md_objects(struct lov_user_ost_data *lod,
}
EXPORT_SYMBOL(lustre_swab_lov_user_md_objects);
-void lustre_swab_ldlm_res_id(struct ldlm_res_id *id)
+static void lustre_swab_ldlm_res_id(struct ldlm_res_id *id)
{
int i;
for (i = 0; i < RES_NAME_SIZE; i++)
__swab64s(&id->name[i]);
}
-EXPORT_SYMBOL(lustre_swab_ldlm_res_id);
-void lustre_swab_ldlm_policy_data(ldlm_wire_policy_data_t *d)
+static void lustre_swab_ldlm_policy_data(ldlm_wire_policy_data_t *d)
{
/* the lock data is a union and the first two fields are always an
* extent so it's ok to process an LDLM_EXTENT and LDLM_FLOCK lock
@@ -2199,7 +1993,6 @@ void lustre_swab_ldlm_policy_data(ldlm_wire_policy_data_t *d)
__swab64s(&d->l_flock.lfw_owner);
__swab32s(&d->l_flock.lfw_pid);
}
-EXPORT_SYMBOL(lustre_swab_ldlm_policy_data);
void lustre_swab_ldlm_intent(struct ldlm_intent *i)
{
@@ -2207,22 +2000,20 @@ void lustre_swab_ldlm_intent(struct ldlm_intent *i)
}
EXPORT_SYMBOL(lustre_swab_ldlm_intent);
-void lustre_swab_ldlm_resource_desc(struct ldlm_resource_desc *r)
+static void lustre_swab_ldlm_resource_desc(struct ldlm_resource_desc *r)
{
__swab32s(&r->lr_type);
CLASSERT(offsetof(typeof(*r), lr_padding) != 0);
lustre_swab_ldlm_res_id(&r->lr_name);
}
-EXPORT_SYMBOL(lustre_swab_ldlm_resource_desc);
-void lustre_swab_ldlm_lock_desc(struct ldlm_lock_desc *l)
+static void lustre_swab_ldlm_lock_desc(struct ldlm_lock_desc *l)
{
lustre_swab_ldlm_resource_desc(&l->l_resource);
__swab32s(&l->l_req_mode);
__swab32s(&l->l_granted_mode);
lustre_swab_ldlm_policy_data(&l->l_policy_data);
}
-EXPORT_SYMBOL(lustre_swab_ldlm_lock_desc);
void lustre_swab_ldlm_request(struct ldlm_request *rq)
{
@@ -2271,7 +2062,7 @@ void dump_rniobuf(struct niobuf_remote *nb)
}
EXPORT_SYMBOL(dump_rniobuf);
-void dump_obdo(struct obdo *oa)
+static void dump_obdo(struct obdo *oa)
{
__u32 valid = oa->o_valid;
@@ -2332,7 +2123,6 @@ void dump_obdo(struct obdo *oa)
if (valid & OBD_MD_FLCOOKIE)
CDEBUG(D_RPCTRACE, "obdo: o_lcookie = (llog_cookie dumping not yet implemented)\n");
}
-EXPORT_SYMBOL(dump_obdo);
void dump_ost_body(struct ost_body *ob)
{
@@ -2394,7 +2184,7 @@ void _debug_req(struct ptlrpc_request *req,
va_start(args, fmt);
libcfs_debug_vmsg2(msgdata, fmt, args,
- " req@%p x%llu/t%lld(%lld) o%d->%s@%s:%d/%d lens %d/%d e %d to %d dl " CFS_TIME_T " ref %d fl " REQ_FLAGS_FMT "/%x/%x rc %d/%d\n",
+ " req@%p x%llu/t%lld(%lld) o%d->%s@%s:%d/%d lens %d/%d e %d to %lld dl %lld ref %d fl " REQ_FLAGS_FMT "/%x/%x rc %d/%d\n",
req, req->rq_xid, req->rq_transno,
req_ok ? lustre_msg_get_transno(req->rq_reqmsg) : 0,
req_ok ? lustre_msg_get_opc(req->rq_reqmsg) : -1,
@@ -2406,8 +2196,8 @@ void _debug_req(struct ptlrpc_request *req,
libcfs_nid2str(nid),
req->rq_request_portal, req->rq_reply_portal,
req->rq_reqlen, req->rq_replen,
- req->rq_early_count, req->rq_timedout,
- req->rq_deadline,
+ req->rq_early_count, (s64)req->rq_timedout,
+ (s64)req->rq_deadline,
atomic_read(&req->rq_refcount),
DEBUG_REQ_FLAGS(req),
req_ok ? lustre_msg_get_flags(req->rq_reqmsg) : -1,
@@ -2431,14 +2221,6 @@ void lustre_swab_lustre_capa(struct lustre_capa *c)
}
EXPORT_SYMBOL(lustre_swab_lustre_capa);
-void lustre_swab_lustre_capa_key(struct lustre_capa_key *k)
-{
- __swab64s(&k->lk_seq);
- __swab32s(&k->lk_keyid);
- CLASSERT(offsetof(typeof(*k), lk_padding) != 0);
-}
-EXPORT_SYMBOL(lustre_swab_lustre_capa_key);
-
void lustre_swab_hsm_user_state(struct hsm_user_state *state)
{
__swab32s(&state->hus_states);
@@ -2455,7 +2237,7 @@ void lustre_swab_hsm_state_set(struct hsm_state_set *hss)
}
EXPORT_SYMBOL(lustre_swab_hsm_state_set);
-void lustre_swab_hsm_extent(struct hsm_extent *extent)
+static void lustre_swab_hsm_extent(struct hsm_extent *extent)
{
__swab64s(&extent->offset);
__swab64s(&extent->length);
diff --git a/drivers/staging/lustre/lustre/ptlrpc/pers.c b/drivers/staging/lustre/lustre/ptlrpc/pers.c
index e1334c24e..2a2a9fb65 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/pers.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/pers.c
@@ -42,7 +42,6 @@
#include "ptlrpc_internal.h"
-
void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc,
int mdidx)
{
diff --git a/drivers/staging/lustre/lustre/ptlrpc/pinger.c b/drivers/staging/lustre/lustre/ptlrpc/pinger.c
index f8edb791a..5c719f175 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/pinger.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/pinger.c
@@ -44,20 +44,10 @@
#include "../include/obd_class.h"
#include "ptlrpc_internal.h"
-static int suppress_pings;
-module_param(suppress_pings, int, 0644);
-MODULE_PARM_DESC(suppress_pings, "Suppress pings");
-
struct mutex pinger_mutex;
static LIST_HEAD(pinger_imports);
static struct list_head timeout_list = LIST_HEAD_INIT(timeout_list);
-int ptlrpc_pinger_suppress_pings(void)
-{
- return suppress_pings;
-}
-EXPORT_SYMBOL(ptlrpc_pinger_suppress_pings);
-
struct ptlrpc_request *
ptlrpc_prep_ping(struct obd_import *imp)
{
@@ -105,7 +95,7 @@ static int ptlrpc_ping(struct obd_import *imp)
DEBUG_REQ(D_INFO, req, "pinging %s->%s",
imp->imp_obd->obd_uuid.uuid, obd2cli_tgt(imp->imp_obd));
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req);
return 0;
}
@@ -113,6 +103,7 @@ static int ptlrpc_ping(struct obd_import *imp)
static void ptlrpc_update_next_ping(struct obd_import *imp, int soon)
{
int time = soon ? PING_INTERVAL_SHORT : PING_INTERVAL;
+
if (imp->imp_state == LUSTRE_IMP_DISCON) {
int dtime = max_t(int, CONNECTION_SWITCH_MIN,
AT_OFF ? 0 :
@@ -122,11 +113,6 @@ static void ptlrpc_update_next_ping(struct obd_import *imp, int soon)
imp->imp_next_ping = cfs_time_shift(time);
}
-void ptlrpc_ping_import_soon(struct obd_import *imp)
-{
- imp->imp_next_ping = cfs_time_current();
-}
-
static inline int imp_is_deactive(struct obd_import *imp)
{
return (imp->imp_deactive ||
@@ -150,6 +136,7 @@ static long pinger_check_timeout(unsigned long time)
mutex_lock(&pinger_mutex);
list_for_each_entry(item, &timeout_list, ti_chain) {
int ti_timeout = item->ti_timeout;
+
if (timeout > ti_timeout)
timeout = ti_timeout;
break;
@@ -234,7 +221,7 @@ static void ptlrpc_pinger_process_import(struct obd_import *imp,
static int ptlrpc_pinger_main(void *arg)
{
- struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
+ struct ptlrpc_thread *thread = arg;
/* Record that the thread is running */
thread_set_flags(thread, SVC_RUNNING);
@@ -266,8 +253,6 @@ static int ptlrpc_pinger_main(void *arg)
ptlrpc_update_next_ping(imp, 0);
}
mutex_unlock(&pinger_mutex);
- /* update memory usage info */
- obd_update_maxusage();
/* Wait until the next ping time, or until we're stopped. */
time_to_next_wake = pinger_check_timeout(this_ping);
@@ -277,8 +262,8 @@ static int ptlrpc_pinger_main(void *arg)
next ping time to next_ping + .01 sec, which means
we will SKIP the next ping at next_ping, and the
ping will get sent 2 timeouts from now! Beware. */
- CDEBUG(D_INFO, "next wakeup in "CFS_DURATION_T" ("
- CFS_TIME_T")\n", time_to_next_wake,
+ CDEBUG(D_INFO, "next wakeup in " CFS_DURATION_T " (%ld)\n",
+ time_to_next_wake,
cfs_time_add(this_ping,
cfs_time_seconds(PING_INTERVAL)));
if (time_to_next_wake > 0) {
@@ -327,13 +312,10 @@ int ptlrpc_start_pinger(void)
l_wait_event(pinger_thread.t_ctl_waitq,
thread_is_running(&pinger_thread), &lwi);
- if (suppress_pings)
- CWARN("Pings will be suppressed at the request of the administrator. The configuration shall meet the additional requirements described in the manual. (Search for the \"suppress_pings\" kernel module parameter.)\n");
-
return 0;
}
-int ptlrpc_pinger_remove_timeouts(void);
+static int ptlrpc_pinger_remove_timeouts(void);
int ptlrpc_stop_pinger(void)
{
@@ -517,7 +499,7 @@ int ptlrpc_del_timeout_client(struct list_head *obd_list,
}
EXPORT_SYMBOL(ptlrpc_del_timeout_client);
-int ptlrpc_pinger_remove_timeouts(void)
+static int ptlrpc_pinger_remove_timeouts(void)
{
struct timeout_item *item, *tmp;
@@ -536,139 +518,3 @@ void ptlrpc_pinger_wake_up(void)
thread_add_flags(&pinger_thread, SVC_EVENT);
wake_up(&pinger_thread.t_ctl_waitq);
}
-
-/* Ping evictor thread */
-#define PET_READY 1
-#define PET_TERMINATE 2
-
-static int pet_refcount;
-static int pet_state;
-static wait_queue_head_t pet_waitq;
-static LIST_HEAD(pet_list);
-static DEFINE_SPINLOCK(pet_lock);
-
-int ping_evictor_wake(struct obd_export *exp)
-{
- struct obd_device *obd;
-
- spin_lock(&pet_lock);
- if (pet_state != PET_READY) {
- /* eventually the new obd will call here again. */
- spin_unlock(&pet_lock);
- return 1;
- }
-
- obd = class_exp2obd(exp);
- if (list_empty(&obd->obd_evict_list)) {
- class_incref(obd, "evictor", obd);
- list_add(&obd->obd_evict_list, &pet_list);
- }
- spin_unlock(&pet_lock);
-
- wake_up(&pet_waitq);
- return 0;
-}
-
-static int ping_evictor_main(void *arg)
-{
- struct obd_device *obd;
- struct obd_export *exp;
- struct l_wait_info lwi = { 0 };
- time_t expire_time;
-
- unshare_fs_struct();
-
- CDEBUG(D_HA, "Starting Ping Evictor\n");
- pet_state = PET_READY;
- while (1) {
- l_wait_event(pet_waitq, (!list_empty(&pet_list)) ||
- (pet_state == PET_TERMINATE), &lwi);
-
- /* loop until all obd's will be removed */
- if ((pet_state == PET_TERMINATE) && list_empty(&pet_list))
- break;
-
- /* we only get here if pet_exp != NULL, and the end of this
- * loop is the only place which sets it NULL again, so lock
- * is not strictly necessary. */
- spin_lock(&pet_lock);
- obd = list_entry(pet_list.next, struct obd_device,
- obd_evict_list);
- spin_unlock(&pet_lock);
-
- expire_time = get_seconds() - PING_EVICT_TIMEOUT;
-
- CDEBUG(D_HA, "evicting all exports of obd %s older than %ld\n",
- obd->obd_name, expire_time);
-
- /* Exports can't be deleted out of the list while we hold
- * the obd lock (class_unlink_export), which means we can't
- * lose the last ref on the export. If they've already been
- * removed from the list, we won't find them here. */
- spin_lock(&obd->obd_dev_lock);
- while (!list_empty(&obd->obd_exports_timed)) {
- exp = list_entry(obd->obd_exports_timed.next,
- struct obd_export,
- exp_obd_chain_timed);
- if (expire_time > exp->exp_last_request_time) {
- class_export_get(exp);
- spin_unlock(&obd->obd_dev_lock);
- LCONSOLE_WARN("%s: haven't heard from client %s (at %s) in %ld seconds. I think it's dead, and I am evicting it. exp %p, cur %ld expire %ld last %ld\n",
- obd->obd_name,
- obd_uuid2str(&exp->exp_client_uuid),
- obd_export_nid2str(exp),
- (long)(get_seconds() -
- exp->exp_last_request_time),
- exp, (long)get_seconds(),
- (long)expire_time,
- (long)exp->exp_last_request_time);
- CDEBUG(D_HA, "Last request was at %ld\n",
- exp->exp_last_request_time);
- class_fail_export(exp);
- class_export_put(exp);
- spin_lock(&obd->obd_dev_lock);
- } else {
- /* List is sorted, so everyone below is ok */
- break;
- }
- }
- spin_unlock(&obd->obd_dev_lock);
-
- spin_lock(&pet_lock);
- list_del_init(&obd->obd_evict_list);
- spin_unlock(&pet_lock);
-
- class_decref(obd, "evictor", obd);
- }
- CDEBUG(D_HA, "Exiting Ping Evictor\n");
-
- return 0;
-}
-
-void ping_evictor_start(void)
-{
- struct task_struct *task;
-
- if (++pet_refcount > 1)
- return;
-
- init_waitqueue_head(&pet_waitq);
-
- task = kthread_run(ping_evictor_main, NULL, "ll_evictor");
- if (IS_ERR(task)) {
- pet_refcount--;
- CERROR("Cannot start ping evictor thread: %ld\n",
- PTR_ERR(task));
- }
-}
-EXPORT_SYMBOL(ping_evictor_start);
-
-void ping_evictor_stop(void)
-{
- if (--pet_refcount > 0)
- return;
-
- pet_state = PET_TERMINATE;
- wake_up(&pet_waitq);
-}
-EXPORT_SYMBOL(ping_evictor_stop);
diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h
index 6dc3998dc..ab6c4580f 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h
+++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_internal.h
@@ -47,10 +47,14 @@ struct ldlm_res_id;
struct ptlrpc_request_set;
extern int test_req_buffer_pressure;
extern struct mutex ptlrpc_all_services_mutex;
+extern struct list_head ptlrpc_all_services;
+
+extern struct mutex ptlrpcd_mutex;
+extern struct mutex pinger_mutex;
int ptlrpc_start_thread(struct ptlrpc_service_part *svcpt, int wait);
/* ptlrpcd.c */
-int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc);
+int ptlrpcd_start(struct ptlrpcd_ctl *pc);
/* client.c */
struct ptlrpc_bulk_desc *ptlrpc_new_bulk(unsigned npages, unsigned max_brw,
@@ -110,6 +114,8 @@ struct nrs_core {
};
+extern struct nrs_core nrs_core;
+
int ptlrpc_service_nrs_setup(struct ptlrpc_service *svc);
void ptlrpc_service_nrs_cleanup(struct ptlrpc_service *svc);
@@ -131,13 +137,6 @@ ptlrpc_nrs_req_get_nolock(struct ptlrpc_service_part *svcpt, bool hp,
return ptlrpc_nrs_req_get_nolock0(svcpt, hp, false, force);
}
-static inline struct ptlrpc_request *
-ptlrpc_nrs_req_peek_nolock(struct ptlrpc_service_part *svcpt, bool hp)
-{
- return ptlrpc_nrs_req_get_nolock0(svcpt, hp, true, false);
-}
-
-void ptlrpc_nrs_req_del_nolock(struct ptlrpc_request *req);
bool ptlrpc_nrs_req_pending_nolock(struct ptlrpc_service_part *svcpt, bool hp);
int ptlrpc_nrs_policy_control(const struct ptlrpc_service *svc,
@@ -243,8 +242,6 @@ int ptlrpc_stop_pinger(void);
void ptlrpc_pinger_sending_on_import(struct obd_import *imp);
void ptlrpc_pinger_commit_expected(struct obd_import *imp);
void ptlrpc_pinger_wake_up(void);
-void ptlrpc_ping_import_soon(struct obd_import *imp);
-int ping_evictor_wake(struct obd_export *exp);
/* sec_null.c */
int sptlrpc_null_init(void);
@@ -298,6 +295,6 @@ static inline void tgt_mod_exit(void)
static inline void ptlrpc_reqset_put(struct ptlrpc_request_set *set)
{
if (atomic_dec_and_test(&set->set_refcount))
- OBD_FREE_PTR(set);
+ kfree(set);
}
#endif /* PTLRPC_INTERNAL_H */
diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c
index ae99180d6..9deeb2441 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpc_module.c
@@ -36,7 +36,6 @@
#define DEBUG_SUBSYSTEM S_RPC
-
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_net.h"
@@ -48,8 +47,6 @@ extern spinlock_t ptlrpc_last_xid_lock;
#if RS_DEBUG
extern spinlock_t ptlrpc_rs_debug_lock;
#endif
-extern struct mutex pinger_mutex;
-extern struct mutex ptlrpcd_mutex;
static int __init ptlrpc_init(void)
{
@@ -143,7 +140,8 @@ cleanup:
ptlrpc_hr_fini();
req_layout_fini();
/* Fall through */
- default: ;
+ default:
+ ;
}
return rc;
diff --git a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
index 17cc81d50..ce036a1ac 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
@@ -67,22 +67,94 @@
#include "ptlrpc_internal.h"
+/* One of these per CPT. */
struct ptlrpcd {
int pd_size;
int pd_index;
+ int pd_cpt;
+ int pd_cursor;
int pd_nthreads;
- struct ptlrpcd_ctl pd_thread_rcv;
+ int pd_groupsize;
struct ptlrpcd_ctl pd_threads[0];
};
+/*
+ * max_ptlrpcds is obsolete, but retained to ensure that the kernel
+ * module will load on a system where it has been tuned.
+ * A value other than 0 implies it was tuned, in which case the value
+ * is used to derive a setting for ptlrpcd_per_cpt_max.
+ */
static int max_ptlrpcds;
module_param(max_ptlrpcds, int, 0644);
MODULE_PARM_DESC(max_ptlrpcds, "Max ptlrpcd thread count to be started.");
-static int ptlrpcd_bind_policy = PDB_POLICY_PAIR;
+/*
+ * ptlrpcd_bind_policy is obsolete, but retained to ensure that
+ * the kernel module will load on a system where it has been tuned.
+ * A value other than 0 implies it was tuned, in which case the value
+ * is used to derive a setting for ptlrpcd_partner_group_size.
+ */
+static int ptlrpcd_bind_policy;
module_param(ptlrpcd_bind_policy, int, 0644);
-MODULE_PARM_DESC(ptlrpcd_bind_policy, "Ptlrpcd threads binding mode.");
-static struct ptlrpcd *ptlrpcds;
+MODULE_PARM_DESC(ptlrpcd_bind_policy,
+ "Ptlrpcd threads binding mode (obsolete).");
+
+/*
+ * ptlrpcd_per_cpt_max: The maximum number of ptlrpcd threads to run
+ * in a CPT.
+ */
+static int ptlrpcd_per_cpt_max;
+module_param(ptlrpcd_per_cpt_max, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_per_cpt_max,
+ "Max ptlrpcd thread count to be started per cpt.");
+
+/*
+ * ptlrpcd_partner_group_size: The desired number of threads in each
+ * ptlrpcd partner thread group. Default is 2, corresponding to the
+ * old PDB_POLICY_PAIR. A negative value makes all ptlrpcd threads in
+ * a CPT partners of each other.
+ */
+static int ptlrpcd_partner_group_size;
+module_param(ptlrpcd_partner_group_size, int, 0644);
+MODULE_PARM_DESC(ptlrpcd_partner_group_size,
+ "Number of ptlrpcd threads in a partner group.");
+
+/*
+ * ptlrpcd_cpts: A CPT string describing the CPU partitions that
+ * ptlrpcd threads should run on. Used to make ptlrpcd threads run on
+ * a subset of all CPTs.
+ *
+ * ptlrpcd_cpts=2
+ * ptlrpcd_cpts=[2]
+ * run ptlrpcd threads only on CPT 2.
+ *
+ * ptlrpcd_cpts=0-3
+ * ptlrpcd_cpts=[0-3]
+ * run ptlrpcd threads on CPTs 0, 1, 2, and 3.
+ *
+ * ptlrpcd_cpts=[0-3,5,7]
+ * run ptlrpcd threads on CPTS 0, 1, 2, 3, 5, and 7.
+ */
+static char *ptlrpcd_cpts;
+module_param(ptlrpcd_cpts, charp, 0644);
+MODULE_PARM_DESC(ptlrpcd_cpts,
+ "CPU partitions ptlrpcd threads should run in");
+
+/* ptlrpcds_cpt_idx maps cpt numbers to an index in the ptlrpcds array. */
+static int *ptlrpcds_cpt_idx;
+
+/* ptlrpcds_num is the number of entries in the ptlrpcds array. */
+static int ptlrpcds_num;
+static struct ptlrpcd **ptlrpcds;
+
+/*
+ * In addition to the regular thread pool above, there is a single
+ * global recovery thread. Recovery isn't critical for performance,
+ * and doesn't block, but must always be able to proceed, and it is
+ * possible that all normal ptlrpcd threads are blocked. Hence the
+ * need for a dedicated thread.
+ */
+static struct ptlrpcd_ctl ptlrpcd_rcv;
struct mutex ptlrpcd_mutex;
static int ptlrpcd_users;
@@ -98,88 +170,30 @@ void ptlrpcd_wake(struct ptlrpc_request *req)
EXPORT_SYMBOL(ptlrpcd_wake);
static struct ptlrpcd_ctl *
-ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index)
+ptlrpcd_select_pc(struct ptlrpc_request *req)
{
- int idx = 0;
+ struct ptlrpcd *pd;
+ int cpt;
+ int idx;
if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
- return &ptlrpcds->pd_thread_rcv;
-
- switch (policy) {
- case PDL_POLICY_SAME:
- idx = smp_processor_id() % ptlrpcds->pd_nthreads;
- break;
- case PDL_POLICY_LOCAL:
- /* Before CPU partition patches available, process it the same
- * as "PDL_POLICY_ROUND". */
-# ifdef CFS_CPU_MODE_NUMA
-# warning "fix this code to use new CPU partition APIs"
-# endif
- /* Fall through to PDL_POLICY_ROUND until the CPU
- * CPU partition patches are available. */
- index = -1;
- case PDL_POLICY_PREFERRED:
- if (index >= 0 && index < num_online_cpus()) {
- idx = index % ptlrpcds->pd_nthreads;
- break;
- }
- /* Fall through to PDL_POLICY_ROUND for bad index. */
- default:
- /* Fall through to PDL_POLICY_ROUND for unknown policy. */
- case PDL_POLICY_ROUND:
- /* We do not care whether it is strict load balance. */
- idx = ptlrpcds->pd_index + 1;
- if (idx == smp_processor_id())
- idx++;
- idx %= ptlrpcds->pd_nthreads;
- ptlrpcds->pd_index = idx;
- break;
- }
-
- return &ptlrpcds->pd_threads[idx];
-}
-
-/**
- * Move all request from an existing request set to the ptlrpcd queue.
- * All requests from the set must be in phase RQ_PHASE_NEW.
- */
-void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
-{
- struct list_head *tmp, *pos;
- struct ptlrpcd_ctl *pc;
- struct ptlrpc_request_set *new;
- int count, i;
+ return &ptlrpcd_rcv;
- pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1);
- new = pc->pc_set;
+ cpt = cfs_cpt_current(cfs_cpt_table, 1);
+ if (!ptlrpcds_cpt_idx)
+ idx = cpt;
+ else
+ idx = ptlrpcds_cpt_idx[cpt];
+ pd = ptlrpcds[idx];
- list_for_each_safe(pos, tmp, &set->set_requests) {
- struct ptlrpc_request *req =
- list_entry(pos, struct ptlrpc_request,
- rq_set_chain);
-
- LASSERT(req->rq_phase == RQ_PHASE_NEW);
- req->rq_set = new;
- req->rq_queued_time = cfs_time_current();
- }
+ /* We do not care whether it is strict load balance. */
+ idx = pd->pd_cursor;
+ if (++idx == pd->pd_nthreads)
+ idx = 0;
+ pd->pd_cursor = idx;
- spin_lock(&new->set_new_req_lock);
- list_splice_init(&set->set_requests, &new->set_new_requests);
- i = atomic_read(&set->set_remaining);
- count = atomic_add_return(i, &new->set_new_count);
- atomic_set(&set->set_remaining, 0);
- spin_unlock(&new->set_new_req_lock);
- if (count == i) {
- wake_up(&new->set_waitq);
-
- /* XXX: It maybe unnecessary to wakeup all the partners. But to
- * guarantee the async RPC can be processed ASAP, we have
- * no other better choice. It maybe fixed in future. */
- for (i = 0; i < pc->pc_npartners; i++)
- wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
- }
+ return &pd->pd_threads[idx];
}
-EXPORT_SYMBOL(ptlrpcd_add_rqset);
/**
* Return transferred RPCs count.
@@ -212,7 +226,7 @@ static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
* Requests that are added to the ptlrpcd queue are sent via
* ptlrpcd_check->ptlrpc_check_set().
*/
-void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
+void ptlrpcd_add_req(struct ptlrpc_request *req)
{
struct ptlrpcd_ctl *pc;
@@ -242,7 +256,7 @@ void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
spin_unlock(&req->rq_lock);
}
- pc = ptlrpcd_select_pc(req, policy, idx);
+ pc = ptlrpcd_select_pc(req);
DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
req, pc->pc_name, pc->pc_index);
@@ -372,25 +386,29 @@ static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
static int ptlrpcd(void *arg)
{
struct ptlrpcd_ctl *pc = arg;
- struct ptlrpc_request_set *set = pc->pc_set;
+ struct ptlrpc_request_set *set;
struct lu_env env = { .le_ses = NULL };
- int rc, exit = 0;
+ int rc = 0;
+ int exit = 0;
unshare_fs_struct();
-#if defined(CONFIG_SMP)
- if (test_bit(LIOD_BIND, &pc->pc_flags)) {
- int index = pc->pc_index;
-
- if (index >= 0 && index < num_possible_cpus()) {
- while (!cpu_online(index)) {
- if (++index >= num_possible_cpus())
- index = 0;
- }
- set_cpus_allowed_ptr(current,
- cpumask_of_node(cpu_to_node(index)));
- }
+ if (cfs_cpt_bind(cfs_cpt_table, pc->pc_cpt) != 0)
+ CWARN("Failed to bind %s on CPT %d\n", pc->pc_name, pc->pc_cpt);
+
+ /*
+ * Allocate the request set after the thread has been bound
+ * above. This is safe because no requests will be queued
+ * until all ptlrpcd threads have confirmed that they have
+ * successfully started.
+ */
+ set = ptlrpc_prep_set();
+ if (!set) {
+ rc = -ENOMEM;
+ goto failed;
}
-#endif
+ spin_lock(&pc->pc_lock);
+ pc->pc_set = set;
+ spin_unlock(&pc->pc_lock);
/*
* XXX So far only "client" ptlrpcd uses an environment. In
* the future, ptlrpcd thread (or a thread-set) has to given
@@ -398,10 +416,10 @@ static int ptlrpcd(void *arg)
*/
rc = lu_context_init(&env.le_ctx,
LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
- complete(&pc->pc_starting);
-
if (rc != 0)
- return rc;
+ goto failed;
+
+ complete(&pc->pc_starting);
/*
* This mainloop strongly resembles ptlrpc_set_wait() except that our
@@ -447,174 +465,97 @@ static int ptlrpcd(void *arg)
complete(&pc->pc_finishing);
return 0;
+failed:
+ pc->pc_error = rc;
+ complete(&pc->pc_starting);
+ return rc;
}
-/* XXX: We want multiple CPU cores to share the async RPC load. So we start many
- * ptlrpcd threads. We also want to reduce the ptlrpcd overhead caused by
- * data transfer cross-CPU cores. So we bind ptlrpcd thread to specified
- * CPU core. But binding all ptlrpcd threads maybe cause response delay
- * because of some CPU core(s) busy with other loads.
- *
- * For example: "ls -l", some async RPCs for statahead are assigned to
- * ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite busy
- * with other non-ptlrpcd, like "ls -l" itself (we want to the "ls -l"
- * thread, statahead thread, and ptlrpcd thread can run in parallel), under
- * such case, the statahead async RPCs can not be processed in time, it is
- * unexpected. If ptlrpcd_0 can be re-scheduled on other CPU core, it may
- * be better. But it breaks former data transfer policy.
- *
- * So we shouldn't be blind for avoiding the data transfer. We make some
- * compromise: divide the ptlrpcd threads pool into two parts. One part is
- * for bound mode, each ptlrpcd thread in this part is bound to some CPU
- * core. The other part is for free mode, all the ptlrpcd threads in the
- * part can be scheduled on any CPU core. We specify some partnership
- * between bound mode ptlrpcd thread(s) and free mode ptlrpcd thread(s),
- * and the async RPC load within the partners are shared.
+static void ptlrpcd_ctl_init(struct ptlrpcd_ctl *pc, int index, int cpt)
+{
+ pc->pc_index = index;
+ pc->pc_cpt = cpt;
+ init_completion(&pc->pc_starting);
+ init_completion(&pc->pc_finishing);
+ spin_lock_init(&pc->pc_lock);
+
+ if (index < 0) {
+ /* Recovery thread. */
+ snprintf(pc->pc_name, sizeof(pc->pc_name), "ptlrpcd_rcv");
+ } else {
+ /* Regular thread. */
+ snprintf(pc->pc_name, sizeof(pc->pc_name),
+ "ptlrpcd_%02d_%02d", cpt, index);
+ }
+}
+
+/* XXX: We want multiple CPU cores to share the async RPC load. So we
+ * start many ptlrpcd threads. We also want to reduce the ptlrpcd
+ * overhead caused by data transfer cross-CPU cores. So we bind
+ * all ptlrpcd threads to a CPT, in the expectation that CPTs
+ * will be defined in a way that matches these boundaries. Within
+ * a CPT a ptlrpcd thread can be scheduled on any available core.
*
- * It can partly avoid data transfer cross-CPU (if the bound mode ptlrpcd
- * thread can be scheduled in time), and try to guarantee the async RPC
- * processed ASAP (as long as the free mode ptlrpcd thread can be scheduled
- * on any CPU core).
+ * Each ptlrpcd thread has its own request queue. This can cause
+ * response delay if the thread is already busy. To help with
+ * this we define partner threads: these are other threads bound
+ * to the same CPT which will check for work in each other's
+ * request queues if they have no work to do.
*
- * As for how to specify the partnership between bound mode ptlrpcd
- * thread(s) and free mode ptlrpcd thread(s), the simplest way is to use
- * <free bound> pair. In future, we can specify some more complex
- * partnership based on the patches for CPU partition. But before such
- * patches are available, we prefer to use the simplest one.
+ * The desired number of partner threads can be tuned by setting
+ * ptlrpcd_partner_group_size. The default is to create pairs of
+ * partner threads.
*/
-# ifdef CFS_CPU_MODE_NUMA
-# warning "fix ptlrpcd_bind() to use new CPU partition APIs"
-# endif
-static int ptlrpcd_bind(int index, int max)
+static int ptlrpcd_partners(struct ptlrpcd *pd, int index)
{
struct ptlrpcd_ctl *pc;
+ struct ptlrpcd_ctl **ppc;
+ int first;
+ int i;
int rc = 0;
-#if defined(CONFIG_NUMA)
- cpumask_t mask;
-#endif
+ int size;
- LASSERT(index <= max - 1);
- pc = &ptlrpcds->pd_threads[index];
- switch (ptlrpcd_bind_policy) {
- case PDB_POLICY_NONE:
- pc->pc_npartners = -1;
- break;
- case PDB_POLICY_FULL:
+ LASSERT(index >= 0 && index < pd->pd_nthreads);
+ pc = &pd->pd_threads[index];
+ pc->pc_npartners = pd->pd_groupsize - 1;
+
+ if (pc->pc_npartners <= 0)
+ goto out;
+
+ size = sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners;
+ pc->pc_partners = kzalloc_node(size, GFP_NOFS,
+ cfs_cpt_spread_node(cfs_cpt_table,
+ pc->pc_cpt));
+ if (!pc->pc_partners) {
pc->pc_npartners = 0;
- set_bit(LIOD_BIND, &pc->pc_flags);
- break;
- case PDB_POLICY_PAIR:
- LASSERT(max % 2 == 0);
- pc->pc_npartners = 1;
- break;
- case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA)
- {
- int i;
- cpumask_copy(&mask, cpumask_of_node(cpu_to_node(index)));
- for (i = max; i < num_online_cpus(); i++)
- cpumask_clear_cpu(i, &mask);
- pc->pc_npartners = cpumask_weight(&mask) - 1;
- set_bit(LIOD_BIND, &pc->pc_flags);
- }
-#else
- LASSERT(max >= 3);
- pc->pc_npartners = 2;
-#endif
- break;
- default:
- CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy);
- rc = -EINVAL;
+ rc = -ENOMEM;
+ goto out;
}
- if (rc == 0 && pc->pc_npartners > 0) {
- pc->pc_partners = kcalloc(pc->pc_npartners,
- sizeof(struct ptlrpcd_ctl *),
- GFP_NOFS);
- if (pc->pc_partners == NULL) {
- pc->pc_npartners = 0;
- rc = -ENOMEM;
- } else {
- switch (ptlrpcd_bind_policy) {
- case PDB_POLICY_PAIR:
- if (index & 0x1) {
- set_bit(LIOD_BIND, &pc->pc_flags);
- pc->pc_partners[0] = &ptlrpcds->
- pd_threads[index - 1];
- ptlrpcds->pd_threads[index - 1].
- pc_partners[0] = pc;
- }
- break;
- case PDB_POLICY_NEIGHBOR:
-#if defined(CONFIG_NUMA)
- {
- struct ptlrpcd_ctl *ppc;
- int i, pidx;
- /* partners are cores in the same NUMA node.
- * setup partnership only with ptlrpcd threads
- * that are already initialized
- */
- for (pidx = 0, i = 0; i < index; i++) {
- if (cpumask_test_cpu(i, &mask)) {
- ppc = &ptlrpcds->pd_threads[i];
- pc->pc_partners[pidx++] = ppc;
- ppc->pc_partners[ppc->
- pc_npartners++] = pc;
- }
- }
- /* adjust number of partners to the number
- * of partnership really setup */
- pc->pc_npartners = pidx;
- }
-#else
- if (index & 0x1)
- set_bit(LIOD_BIND, &pc->pc_flags);
- if (index > 0) {
- pc->pc_partners[0] = &ptlrpcds->
- pd_threads[index - 1];
- ptlrpcds->pd_threads[index - 1].
- pc_partners[1] = pc;
- if (index == max - 1) {
- pc->pc_partners[1] =
- &ptlrpcds->pd_threads[0];
- ptlrpcds->pd_threads[0].
- pc_partners[0] = pc;
- }
- }
-#endif
- break;
- }
- }
+ first = index - index % pd->pd_groupsize;
+ ppc = pc->pc_partners;
+ for (i = first; i < first + pd->pd_groupsize; i++) {
+ if (i != index)
+ *ppc++ = &pd->pd_threads[i];
}
-
+out:
return rc;
}
-
-int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
+int ptlrpcd_start(struct ptlrpcd_ctl *pc)
{
- int rc;
+ struct task_struct *task;
+ int rc = 0;
/*
* Do not allow start second thread for one pc.
*/
if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
CWARN("Starting second thread (%s) for same pc %p\n",
- name, pc);
+ pc->pc_name, pc);
return 0;
}
- pc->pc_index = index;
- init_completion(&pc->pc_starting);
- init_completion(&pc->pc_finishing);
- spin_lock_init(&pc->pc_lock);
- strlcpy(pc->pc_name, name, sizeof(pc->pc_name));
- pc->pc_set = ptlrpc_prep_set();
- if (pc->pc_set == NULL) {
- rc = -ENOMEM;
- goto out;
- }
-
/*
* So far only "client" ptlrpcd uses an environment. In the future,
* ptlrpcd thread (or a thread-set) has to be given an argument,
@@ -622,29 +563,21 @@ int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
*/
rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
if (rc != 0)
- goto out_set;
+ goto out;
- {
- struct task_struct *task;
- if (index >= 0) {
- rc = ptlrpcd_bind(index, max);
- if (rc < 0)
- goto out_env;
- }
+ task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
+ if (IS_ERR(task)) {
+ rc = PTR_ERR(task);
+ goto out_set;
+ }
- task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
- if (IS_ERR(task)) {
- rc = PTR_ERR(task);
- goto out_env;
- }
+ wait_for_completion(&pc->pc_starting);
+ rc = pc->pc_error;
+ if (rc != 0)
+ goto out_set;
- wait_for_completion(&pc->pc_starting);
- }
return 0;
-out_env:
- lu_context_fini(&pc->pc_env.le_ctx);
-
out_set:
if (pc->pc_set != NULL) {
struct ptlrpc_request_set *set = pc->pc_set;
@@ -654,7 +587,7 @@ out_set:
spin_unlock(&pc->pc_lock);
ptlrpc_set_destroy(set);
}
- clear_bit(LIOD_BIND, &pc->pc_flags);
+ lu_context_fini(&pc->pc_env.le_ctx);
out:
clear_bit(LIOD_START, &pc->pc_flags);
@@ -694,7 +627,6 @@ void ptlrpcd_free(struct ptlrpcd_ctl *pc)
clear_bit(LIOD_START, &pc->pc_flags);
clear_bit(LIOD_STOP, &pc->pc_flags);
clear_bit(LIOD_FORCE, &pc->pc_flags);
- clear_bit(LIOD_BIND, &pc->pc_flags);
out:
if (pc->pc_npartners > 0) {
@@ -704,88 +636,262 @@ out:
pc->pc_partners = NULL;
}
pc->pc_npartners = 0;
+ pc->pc_error = 0;
}
static void ptlrpcd_fini(void)
{
int i;
+ int j;
if (ptlrpcds != NULL) {
- for (i = 0; i < ptlrpcds->pd_nthreads; i++)
- ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
- for (i = 0; i < ptlrpcds->pd_nthreads; i++)
- ptlrpcd_free(&ptlrpcds->pd_threads[i]);
- ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
- ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
+ for (i = 0; i < ptlrpcds_num; i++) {
+ if (!ptlrpcds[i])
+ break;
+ for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+ ptlrpcd_stop(&ptlrpcds[i]->pd_threads[j], 0);
+ for (j = 0; j < ptlrpcds[i]->pd_nthreads; j++)
+ ptlrpcd_free(&ptlrpcds[i]->pd_threads[j]);
+ kfree(ptlrpcds[i]);
+ ptlrpcds[i] = NULL;
+ }
kfree(ptlrpcds);
- ptlrpcds = NULL;
}
+ ptlrpcds_num = 0;
+
+ ptlrpcd_stop(&ptlrpcd_rcv, 0);
+ ptlrpcd_free(&ptlrpcd_rcv);
+
+ kfree(ptlrpcds_cpt_idx);
+ ptlrpcds_cpt_idx = NULL;
}
static int ptlrpcd_init(void)
{
- int nthreads = num_online_cpus();
- char name[16];
- int size, i = -1, j, rc = 0;
-
- if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
- nthreads = max_ptlrpcds;
- if (nthreads < 2)
- nthreads = 2;
- if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
- ptlrpcd_bind_policy = PDB_POLICY_PAIR;
- else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
- nthreads &= ~1; /* make sure it is even */
-
- size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
- ptlrpcds = kzalloc(size, GFP_NOFS);
+ int nthreads;
+ int groupsize;
+ int size;
+ int i;
+ int j;
+ int rc = 0;
+ struct cfs_cpt_table *cptable;
+ __u32 *cpts = NULL;
+ int ncpts;
+ int cpt;
+ struct ptlrpcd *pd;
+
+ /*
+ * Determine the CPTs that ptlrpcd threads will run on.
+ */
+ cptable = cfs_cpt_table;
+ ncpts = cfs_cpt_number(cptable);
+ if (ptlrpcd_cpts) {
+ struct cfs_expr_list *el;
+
+ size = ncpts * sizeof(ptlrpcds_cpt_idx[0]);
+ ptlrpcds_cpt_idx = kzalloc(size, GFP_KERNEL);
+ if (!ptlrpcds_cpt_idx) {
+ rc = -ENOMEM;
+ goto out;
+ }
+
+ rc = cfs_expr_list_parse(ptlrpcd_cpts,
+ strlen(ptlrpcd_cpts),
+ 0, ncpts - 1, &el);
+
+ if (rc != 0) {
+ CERROR("ptlrpcd_cpts: invalid CPT pattern string: %s",
+ ptlrpcd_cpts);
+ rc = -EINVAL;
+ goto out;
+ }
+
+ rc = cfs_expr_list_values(el, ncpts, &cpts);
+ cfs_expr_list_free(el);
+ if (rc <= 0) {
+ CERROR("ptlrpcd_cpts: failed to parse CPT array %s: %d\n",
+ ptlrpcd_cpts, rc);
+ if (rc == 0)
+ rc = -EINVAL;
+ goto out;
+ }
+
+ /*
+ * Create the cpt-to-index map. When there is no match
+ * in the cpt table, pick a cpt at random. This could
+ * be changed to take the topology of the system into
+ * account.
+ */
+ for (cpt = 0; cpt < ncpts; cpt++) {
+ for (i = 0; i < rc; i++)
+ if (cpts[i] == cpt)
+ break;
+ if (i >= rc)
+ i = cpt % rc;
+ ptlrpcds_cpt_idx[cpt] = i;
+ }
+
+ cfs_expr_list_values_free(cpts, rc);
+ ncpts = rc;
+ }
+ ptlrpcds_num = ncpts;
+
+ size = ncpts * sizeof(ptlrpcds[0]);
+ ptlrpcds = kzalloc(size, GFP_KERNEL);
if (!ptlrpcds) {
rc = -ENOMEM;
goto out;
}
- snprintf(name, sizeof(name), "ptlrpcd_rcv");
- set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
- rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
+ /*
+ * The max_ptlrpcds parameter is obsolete, but do something
+ * sane if it has been tuned, and complain if
+ * ptlrpcd_per_cpt_max has also been tuned.
+ */
+ if (max_ptlrpcds != 0) {
+ CWARN("max_ptlrpcds is obsolete.\n");
+ if (ptlrpcd_per_cpt_max == 0) {
+ ptlrpcd_per_cpt_max = max_ptlrpcds / ncpts;
+ /* Round up if there is a remainder. */
+ if (max_ptlrpcds % ncpts != 0)
+ ptlrpcd_per_cpt_max++;
+ CWARN("Setting ptlrpcd_per_cpt_max = %d\n",
+ ptlrpcd_per_cpt_max);
+ } else {
+ CWARN("ptlrpd_per_cpt_max is also set!\n");
+ }
+ }
+
+ /*
+ * The ptlrpcd_bind_policy parameter is obsolete, but do
+ * something sane if it has been tuned, and complain if
+ * ptlrpcd_partner_group_size is also tuned.
+ */
+ if (ptlrpcd_bind_policy != 0) {
+ CWARN("ptlrpcd_bind_policy is obsolete.\n");
+ if (ptlrpcd_partner_group_size == 0) {
+ switch (ptlrpcd_bind_policy) {
+ case 1: /* PDB_POLICY_NONE */
+ case 2: /* PDB_POLICY_FULL */
+ ptlrpcd_partner_group_size = 1;
+ break;
+ case 3: /* PDB_POLICY_PAIR */
+ ptlrpcd_partner_group_size = 2;
+ break;
+ case 4: /* PDB_POLICY_NEIGHBOR */
+#ifdef CONFIG_NUMA
+ ptlrpcd_partner_group_size = -1; /* CPT */
+#else
+ ptlrpcd_partner_group_size = 3; /* Triplets */
+#endif
+ break;
+ default: /* Illegal value, use the default. */
+ ptlrpcd_partner_group_size = 2;
+ break;
+ }
+ CWARN("Setting ptlrpcd_partner_group_size = %d\n",
+ ptlrpcd_partner_group_size);
+ } else {
+ CWARN("ptlrpcd_partner_group_size is also set!\n");
+ }
+ }
+
+ if (ptlrpcd_partner_group_size == 0)
+ ptlrpcd_partner_group_size = 2;
+ else if (ptlrpcd_partner_group_size < 0)
+ ptlrpcd_partner_group_size = -1;
+ else if (ptlrpcd_per_cpt_max > 0 &&
+ ptlrpcd_partner_group_size > ptlrpcd_per_cpt_max)
+ ptlrpcd_partner_group_size = ptlrpcd_per_cpt_max;
+
+ /*
+ * Start the recovery thread first.
+ */
+ set_bit(LIOD_RECOVERY, &ptlrpcd_rcv.pc_flags);
+ ptlrpcd_ctl_init(&ptlrpcd_rcv, -1, CFS_CPT_ANY);
+ rc = ptlrpcd_start(&ptlrpcd_rcv);
if (rc < 0)
goto out;
- /* XXX: We start nthreads ptlrpc daemons. Each of them can process any
- * non-recovery async RPC to improve overall async RPC efficiency.
- *
- * But there are some issues with async I/O RPCs and async non-I/O
- * RPCs processed in the same set under some cases. The ptlrpcd may
- * be blocked by some async I/O RPC(s), then will cause other async
- * non-I/O RPC(s) can not be processed in time.
- *
- * Maybe we should distinguish blocked async RPCs from non-blocked
- * async RPCs, and process them in different ptlrpcd sets to avoid
- * unnecessary dependency. But how to distribute async RPCs load
- * among all the ptlrpc daemons becomes another trouble. */
- for (i = 0; i < nthreads; i++) {
- snprintf(name, sizeof(name), "ptlrpcd_%d", i);
- rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
- if (rc < 0)
+ for (i = 0; i < ncpts; i++) {
+ if (!cpts)
+ cpt = i;
+ else
+ cpt = cpts[i];
+
+ nthreads = cfs_cpt_weight(cptable, cpt);
+ if (ptlrpcd_per_cpt_max > 0 && ptlrpcd_per_cpt_max < nthreads)
+ nthreads = ptlrpcd_per_cpt_max;
+ if (nthreads < 2)
+ nthreads = 2;
+
+ if (ptlrpcd_partner_group_size <= 0) {
+ groupsize = nthreads;
+ } else if (nthreads <= ptlrpcd_partner_group_size) {
+ groupsize = nthreads;
+ } else {
+ groupsize = ptlrpcd_partner_group_size;
+ if (nthreads % groupsize != 0)
+ nthreads += groupsize - (nthreads % groupsize);
+ }
+
+ size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
+ pd = kzalloc_node(size, GFP_NOFS,
+ cfs_cpt_spread_node(cfs_cpt_table, cpt));
+ if (!pd) {
+ rc = -ENOMEM;
goto out;
- }
+ }
+ pd->pd_size = size;
+ pd->pd_index = i;
+ pd->pd_cpt = cpt;
+ pd->pd_cursor = 0;
+ pd->pd_nthreads = nthreads;
+ pd->pd_groupsize = groupsize;
+ ptlrpcds[i] = pd;
- ptlrpcds->pd_size = size;
- ptlrpcds->pd_index = 0;
- ptlrpcds->pd_nthreads = nthreads;
+ /*
+ * The ptlrpcd threads in a partner group can access
+ * each other's struct ptlrpcd_ctl, so these must be
+ * initialized before any thread is started.
+ */
+ for (j = 0; j < nthreads; j++) {
+ ptlrpcd_ctl_init(&pd->pd_threads[j], j, cpt);
+ rc = ptlrpcd_partners(pd, j);
+ if (rc < 0)
+ goto out;
+ }
-out:
- if (rc != 0 && ptlrpcds != NULL) {
- for (j = 0; j <= i; j++)
- ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
- for (j = 0; j <= i; j++)
- ptlrpcd_free(&ptlrpcds->pd_threads[j]);
- ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
- ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
- kfree(ptlrpcds);
- ptlrpcds = NULL;
+ /* XXX: We start nthreads ptlrpc daemons.
+ * Each of them can process any non-recovery
+ * async RPC to improve overall async RPC
+ * efficiency.
+ *
+ * But there are some issues with async I/O RPCs
+ * and async non-I/O RPCs processed in the same
+ * set under some cases. The ptlrpcd may be
+ * blocked by some async I/O RPC(s), then will
+ * cause other async non-I/O RPC(s) can not be
+ * processed in time.
+ *
+ * Maybe we should distinguish blocked async RPCs
+ * from non-blocked async RPCs, and process them
+ * in different ptlrpcd sets to avoid unnecessary
+ * dependency. But how to distribute async RPCs
+ * load among all the ptlrpc daemons becomes
+ * another trouble.
+ */
+ for (j = 0; j < nthreads; j++) {
+ rc = ptlrpcd_start(&pd->pd_threads[j]);
+ if (rc < 0)
+ goto out;
+ }
}
+out:
+ if (rc != 0)
+ ptlrpcd_fini();
- return 0;
+ return rc;
}
int ptlrpcd_addref(void)
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec.c b/drivers/staging/lustre/lustre/ptlrpc/sec.c
index b9821db22..39f5261c9 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec.c
@@ -227,7 +227,7 @@ char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
}
EXPORT_SYMBOL(sptlrpc_flavor2name);
-char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
+static char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
{
buf[0] = '\0';
@@ -244,7 +244,6 @@ char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
return buf;
}
-EXPORT_SYMBOL(sptlrpc_secflags2str);
/**************************************************
* client context APIs *
@@ -297,53 +296,13 @@ void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
}
EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
-/**
- * Expire the client context immediately.
- *
- * \pre Caller must hold at least 1 reference on the \a ctx.
- */
-void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
-{
- LASSERT(ctx->cc_ops->force_die);
- ctx->cc_ops->force_die(ctx, 0);
-}
-EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
-
-/**
- * To wake up the threads who are waiting for this client context. Called
- * after some status change happened on \a ctx.
- */
-void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
-{
- struct ptlrpc_request *req, *next;
-
- spin_lock(&ctx->cc_lock);
- list_for_each_entry_safe(req, next, &ctx->cc_req_list,
- rq_ctx_chain) {
- list_del_init(&req->rq_ctx_chain);
- ptlrpc_client_wake_req(req);
- }
- spin_unlock(&ctx->cc_lock);
-}
-EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
-
-int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
-{
- LASSERT(ctx->cc_ops);
-
- if (ctx->cc_ops->display == NULL)
- return 0;
-
- return ctx->cc_ops->display(ctx, buf, bufsize);
-}
-
static int import_sec_check_expire(struct obd_import *imp)
{
int adapt = 0;
spin_lock(&imp->imp_lock);
if (imp->imp_sec_expire &&
- imp->imp_sec_expire < get_seconds()) {
+ imp->imp_sec_expire < ktime_get_real_seconds()) {
adapt = 1;
imp->imp_sec_expire = 0;
}
@@ -510,7 +469,7 @@ int sptlrpc_req_ctx_switch(struct ptlrpc_request *req,
* \note a request must have a context, to keep other parts of code happy.
* In any case of failure during the switching, we must restore the old one.
*/
-int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
+static int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
{
struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
struct ptlrpc_cli_ctx *newctx;
@@ -563,7 +522,6 @@ int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
sptlrpc_cli_ctx_put(oldctx, 1);
return 0;
}
-EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
static
int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
@@ -1229,12 +1187,6 @@ static void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
sptlrpc_policy_put(policy);
}
-void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
-{
- sec_cop_destroy_sec(sec);
-}
-EXPORT_SYMBOL(sptlrpc_sec_destroy);
-
static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
{
LASSERT_ATOMIC_POS(&sec->ps_refcount);
@@ -1246,14 +1198,13 @@ static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
}
}
-struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
+static struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
{
if (sec)
atomic_inc(&sec->ps_refcount);
return sec;
}
-EXPORT_SYMBOL(sptlrpc_sec_get);
void sptlrpc_sec_put(struct ptlrpc_sec *sec)
{
@@ -1507,13 +1458,6 @@ static void import_flush_ctx_common(struct obd_import *imp,
sptlrpc_sec_put(sec);
}
-void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
-{
- /* it's important to use grace mode, see explain in
- * sptlrpc_req_refresh_ctx() */
- import_flush_ctx_common(imp, 0, 1, 1);
-}
-
void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
{
import_flush_ctx_common(imp, from_kuid(&init_user_ns, current_uid()),
@@ -1697,18 +1641,8 @@ void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
req->rq_repmsg = NULL;
}
-int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
- struct ptlrpc_cli_ctx *ctx)
-{
- struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
-
- if (!policy->sp_cops->install_rctx)
- return 0;
- return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
-}
-
-int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
- struct ptlrpc_svc_ctx *ctx)
+static int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
+ struct ptlrpc_svc_ctx *ctx)
{
struct ptlrpc_sec_policy *policy = ctx->sc_policy;
@@ -1779,7 +1713,7 @@ int sptlrpc_target_export_check(struct obd_export *exp,
exp->exp_flvr_old[1] = exp->exp_flvr_old[0];
exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0];
exp->exp_flvr_old[0] = exp->exp_flvr;
- exp->exp_flvr_expire[0] = get_seconds() +
+ exp->exp_flvr_expire[0] = ktime_get_real_seconds() +
EXP_FLVR_UPDATE_EXPIRE;
exp->exp_flvr = flavor;
@@ -1853,14 +1787,14 @@ int sptlrpc_target_export_check(struct obd_export *exp,
}
if (exp->exp_flvr_expire[0]) {
- if (exp->exp_flvr_expire[0] >= get_seconds()) {
+ if (exp->exp_flvr_expire[0] >= ktime_get_real_seconds()) {
if (flavor_allowed(&exp->exp_flvr_old[0], req)) {
- CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the middle one (" CFS_DURATION_T ")\n", exp,
+ CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the middle one (%lld)\n", exp,
exp->exp_flvr.sf_rpc,
exp->exp_flvr_old[0].sf_rpc,
exp->exp_flvr_old[1].sf_rpc,
- exp->exp_flvr_expire[0] -
- get_seconds());
+ (s64)(exp->exp_flvr_expire[0] -
+ ktime_get_real_seconds()));
spin_unlock(&exp->exp_lock);
return 0;
}
@@ -1877,15 +1811,15 @@ int sptlrpc_target_export_check(struct obd_export *exp,
/* now it doesn't match the current flavor, the only chance we can
* accept it is match the old flavors which is not expired. */
if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) {
- if (exp->exp_flvr_expire[1] >= get_seconds()) {
+ if (exp->exp_flvr_expire[1] >= ktime_get_real_seconds()) {
if (flavor_allowed(&exp->exp_flvr_old[1], req)) {
- CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the oldest one (" CFS_DURATION_T ")\n",
+ CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the oldest one (%lld)\n",
exp,
exp->exp_flvr.sf_rpc,
exp->exp_flvr_old[0].sf_rpc,
exp->exp_flvr_old[1].sf_rpc,
- exp->exp_flvr_expire[1] -
- get_seconds());
+ (s64)(exp->exp_flvr_expire[1] -
+ ktime_get_real_seconds()));
spin_unlock(&exp->exp_lock);
return 0;
}
@@ -1905,7 +1839,7 @@ int sptlrpc_target_export_check(struct obd_export *exp,
spin_unlock(&exp->exp_lock);
- CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with unauthorized flavor %x, expect %x|%x(%+ld)|%x(%+ld)\n",
+ CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with unauthorized flavor %x, expect %x|%x(%+lld)|%x(%+lld)\n",
exp, exp->exp_obd->obd_name,
req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini,
req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_auth_usr_ost,
@@ -1913,56 +1847,14 @@ int sptlrpc_target_export_check(struct obd_export *exp,
exp->exp_flvr.sf_rpc,
exp->exp_flvr_old[0].sf_rpc,
exp->exp_flvr_expire[0] ?
- (unsigned long) (exp->exp_flvr_expire[0] -
- get_seconds()) : 0,
+ (s64)(exp->exp_flvr_expire[0] - ktime_get_real_seconds()) : 0,
exp->exp_flvr_old[1].sf_rpc,
exp->exp_flvr_expire[1] ?
- (unsigned long) (exp->exp_flvr_expire[1] -
- get_seconds()) : 0);
+ (s64)(exp->exp_flvr_expire[1] - ktime_get_real_seconds()) : 0);
return -EACCES;
}
EXPORT_SYMBOL(sptlrpc_target_export_check);
-void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
- struct sptlrpc_rule_set *rset)
-{
- struct obd_export *exp;
- struct sptlrpc_flavor new_flvr;
-
- LASSERT(obd);
-
- spin_lock(&obd->obd_dev_lock);
-
- list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
- if (exp->exp_connection == NULL)
- continue;
-
- /* note if this export had just been updated flavor
- * (exp_flvr_changed == 1), this will override the
- * previous one. */
- spin_lock(&exp->exp_lock);
- sptlrpc_target_choose_flavor(rset, exp->exp_sp_peer,
- exp->exp_connection->c_peer.nid,
- &new_flvr);
- if (exp->exp_flvr_changed ||
- !flavor_equal(&new_flvr, &exp->exp_flvr)) {
- exp->exp_flvr_old[1] = new_flvr;
- exp->exp_flvr_expire[1] = 0;
- exp->exp_flvr_changed = 1;
- exp->exp_flvr_adapt = 1;
-
- CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n",
- exp, sptlrpc_part2name(exp->exp_sp_peer),
- exp->exp_flvr.sf_rpc,
- exp->exp_flvr_old[1].sf_rpc);
- }
- spin_unlock(&exp->exp_lock);
- }
-
- spin_unlock(&obd->obd_dev_lock);
-}
-EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor);
-
static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
{
/* peer's claim is unreliable unless gss is being used */
@@ -2090,6 +1982,7 @@ int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
rc = policy->sp_sops->alloc_rs(req, msglen);
if (unlikely(rc == -ENOMEM)) {
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+
if (svcpt->scp_service->srv_max_reply_size <
msglen + sizeof(struct ptlrpc_reply_state)) {
/* Just return failure if the size is too big */
@@ -2185,19 +2078,6 @@ void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
req->rq_svc_ctx = NULL;
}
-void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
-{
- struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
-
- if (ctx == NULL)
- return;
-
- LASSERT_ATOMIC_POS(&ctx->sc_refcount);
- if (ctx->sc_policy->sp_sops->invalidate_ctx)
- ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
-}
-EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
-
/****************************************
* bulk security *
****************************************/
@@ -2285,7 +2165,6 @@ int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
}
EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
-
/****************************************
* user descriptor helpers *
****************************************/
@@ -2382,14 +2261,14 @@ EXPORT_SYMBOL(sec2target_str);
/*
* return true if the bulk data is protected
*/
-int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
+bool sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
{
switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
case SPTLRPC_BULK_SVC_INTG:
case SPTLRPC_BULK_SVC_PRIV:
- return 1;
+ return true;
default:
- return 0;
+ return false;
}
}
EXPORT_SYMBOL(sptlrpc_flavor_has_bulk);
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c b/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c
index 2ee3e8b2e..cd8a9987f 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec_bulk.c
@@ -58,7 +58,6 @@
* bulk encryption page pools *
****************************************/
-
#define POINTERS_PER_PAGE (PAGE_CACHE_SIZE / sizeof(void *))
#define PAGES_PER_POOL (POINTERS_PER_PAGE)
@@ -92,8 +91,8 @@ static struct ptlrpc_enc_page_pool {
unsigned long epp_idle_idx;
/* last shrink time due to mem tight */
- long epp_last_shrink;
- long epp_last_access;
+ time64_t epp_last_shrink;
+ time64_t epp_last_access;
/*
* in-pool pages bookkeeping
@@ -145,7 +144,7 @@ int sptlrpc_proc_enc_pool_seq_show(struct seq_file *m, void *v)
"cache missing: %lu\n"
"low free mark: %lu\n"
"max waitqueue depth: %u\n"
- "max wait time: " CFS_TIME_T "/%u\n",
+ "max wait time: %ld/%u\n",
totalram_pages,
PAGES_PER_POOL,
page_pools.epp_max_pages,
@@ -153,8 +152,8 @@ int sptlrpc_proc_enc_pool_seq_show(struct seq_file *m, void *v)
page_pools.epp_total_pages,
page_pools.epp_free_pages,
page_pools.epp_idle_idx,
- get_seconds() - page_pools.epp_last_shrink,
- get_seconds() - page_pools.epp_last_access,
+ (long)(ktime_get_seconds() - page_pools.epp_last_shrink),
+ (long)(ktime_get_seconds() - page_pools.epp_last_access),
page_pools.epp_st_max_pages,
page_pools.epp_st_grows,
page_pools.epp_st_grow_fails,
@@ -226,7 +225,7 @@ static unsigned long enc_pools_shrink_count(struct shrinker *s,
* if no pool access for a long time, we consider it's fully idle.
* a little race here is fine.
*/
- if (unlikely(get_seconds() - page_pools.epp_last_access >
+ if (unlikely(ktime_get_seconds() - page_pools.epp_last_access >
CACHE_QUIESCENT_PERIOD)) {
spin_lock(&page_pools.epp_lock);
page_pools.epp_idle_idx = IDLE_IDX_MAX;
@@ -253,7 +252,7 @@ static unsigned long enc_pools_shrink_scan(struct shrinker *s,
(long)sc->nr_to_scan, page_pools.epp_free_pages);
page_pools.epp_st_shrinks++;
- page_pools.epp_last_shrink = get_seconds();
+ page_pools.epp_last_shrink = ktime_get_seconds();
}
spin_unlock(&page_pools.epp_lock);
@@ -261,7 +260,7 @@ static unsigned long enc_pools_shrink_scan(struct shrinker *s,
* if no pool access for a long time, we consider it's fully idle.
* a little race here is fine.
*/
- if (unlikely(get_seconds() - page_pools.epp_last_access >
+ if (unlikely(ktime_get_seconds() - page_pools.epp_last_access >
CACHE_QUIESCENT_PERIOD)) {
spin_lock(&page_pools.epp_lock);
page_pools.epp_idle_idx = IDLE_IDX_MAX;
@@ -302,150 +301,6 @@ static unsigned long enc_pools_cleanup(struct page ***pools, int npools)
return cleaned;
}
-/*
- * merge @npools pointed by @pools which contains @npages new pages
- * into current pools.
- *
- * we have options to avoid most memory copy with some tricks. but we choose
- * the simplest way to avoid complexity. It's not frequently called.
- */
-static void enc_pools_insert(struct page ***pools, int npools, int npages)
-{
- int freeslot;
- int op_idx, np_idx, og_idx, ng_idx;
- int cur_npools, end_npools;
-
- LASSERT(npages > 0);
- LASSERT(page_pools.epp_total_pages+npages <= page_pools.epp_max_pages);
- LASSERT(npages_to_npools(npages) == npools);
- LASSERT(page_pools.epp_growing);
-
- spin_lock(&page_pools.epp_lock);
-
- /*
- * (1) fill all the free slots of current pools.
- */
- /* free slots are those left by rent pages, and the extra ones with
- * index >= total_pages, locate at the tail of last pool. */
- freeslot = page_pools.epp_total_pages % PAGES_PER_POOL;
- if (freeslot != 0)
- freeslot = PAGES_PER_POOL - freeslot;
- freeslot += page_pools.epp_total_pages - page_pools.epp_free_pages;
-
- op_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
- og_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
- np_idx = npools - 1;
- ng_idx = (npages - 1) % PAGES_PER_POOL;
-
- while (freeslot) {
- LASSERT(page_pools.epp_pools[op_idx][og_idx] == NULL);
- LASSERT(pools[np_idx][ng_idx] != NULL);
-
- page_pools.epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
- pools[np_idx][ng_idx] = NULL;
-
- freeslot--;
-
- if (++og_idx == PAGES_PER_POOL) {
- op_idx++;
- og_idx = 0;
- }
- if (--ng_idx < 0) {
- if (np_idx == 0)
- break;
- np_idx--;
- ng_idx = PAGES_PER_POOL - 1;
- }
- }
-
- /*
- * (2) add pools if needed.
- */
- cur_npools = (page_pools.epp_total_pages + PAGES_PER_POOL - 1) /
- PAGES_PER_POOL;
- end_npools = (page_pools.epp_total_pages + npages + PAGES_PER_POOL - 1)
- / PAGES_PER_POOL;
- LASSERT(end_npools <= page_pools.epp_max_pools);
-
- np_idx = 0;
- while (cur_npools < end_npools) {
- LASSERT(page_pools.epp_pools[cur_npools] == NULL);
- LASSERT(np_idx < npools);
- LASSERT(pools[np_idx] != NULL);
-
- page_pools.epp_pools[cur_npools++] = pools[np_idx];
- pools[np_idx++] = NULL;
- }
-
- page_pools.epp_total_pages += npages;
- page_pools.epp_free_pages += npages;
- page_pools.epp_st_lowfree = page_pools.epp_free_pages;
-
- if (page_pools.epp_total_pages > page_pools.epp_st_max_pages)
- page_pools.epp_st_max_pages = page_pools.epp_total_pages;
-
- CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
- page_pools.epp_total_pages);
-
- spin_unlock(&page_pools.epp_lock);
-}
-
-static int enc_pools_add_pages(int npages)
-{
- static DEFINE_MUTEX(add_pages_mutex);
- struct page ***pools;
- int npools, alloced = 0;
- int i, j, rc = -ENOMEM;
-
- if (npages < PTLRPC_MAX_BRW_PAGES)
- npages = PTLRPC_MAX_BRW_PAGES;
-
- mutex_lock(&add_pages_mutex);
-
- if (npages + page_pools.epp_total_pages > page_pools.epp_max_pages)
- npages = page_pools.epp_max_pages - page_pools.epp_total_pages;
- LASSERT(npages > 0);
-
- page_pools.epp_st_grows++;
-
- npools = npages_to_npools(npages);
- pools = kcalloc(npools, sizeof(*pools), GFP_NOFS);
- if (pools == NULL)
- goto out;
-
- for (i = 0; i < npools; i++) {
- pools[i] = kzalloc(PAGE_CACHE_SIZE, GFP_NOFS);
- if (!pools[i])
- goto out_pools;
-
- for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
- pools[i][j] = alloc_page(GFP_NOFS |
- __GFP_HIGHMEM);
- if (pools[i][j] == NULL)
- goto out_pools;
-
- alloced++;
- }
- }
- LASSERT(alloced == npages);
-
- enc_pools_insert(pools, npools, npages);
- CDEBUG(D_SEC, "added %d pages into pools\n", npages);
- rc = 0;
-
-out_pools:
- enc_pools_cleanup(pools, npools);
- kfree(pools);
-out:
- if (rc) {
- page_pools.epp_st_grow_fails++;
- CERROR("Failed to allocate %d enc pages\n", npages);
- }
-
- mutex_unlock(&add_pages_mutex);
- return rc;
-}
-
static inline void enc_pools_wakeup(void)
{
assert_spin_locked(&page_pools.epp_lock);
@@ -457,156 +312,6 @@ static inline void enc_pools_wakeup(void)
}
}
-static int enc_pools_should_grow(int page_needed, long now)
-{
- /* don't grow if someone else is growing the pools right now,
- * or the pools has reached its full capacity
- */
- if (page_pools.epp_growing ||
- page_pools.epp_total_pages == page_pools.epp_max_pages)
- return 0;
-
- /* if total pages is not enough, we need to grow */
- if (page_pools.epp_total_pages < page_needed)
- return 1;
-
- /*
- * we wanted to return 0 here if there was a shrink just happened
- * moment ago, but this may cause deadlock if both client and ost
- * live on single node.
- */
-#if 0
- if (now - page_pools.epp_last_shrink < 2)
- return 0;
-#endif
-
- /*
- * here we perhaps need consider other factors like wait queue
- * length, idle index, etc. ?
- */
-
- /* grow the pools in any other cases */
- return 1;
-}
-
-/*
- * we allocate the requested pages atomically.
- */
-int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
-{
- wait_queue_t waitlink;
- unsigned long this_idle = -1;
- unsigned long tick = 0;
- long now;
- int p_idx, g_idx;
- int i;
-
- LASSERT(desc->bd_iov_count > 0);
- LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
-
- /* resent bulk, enc iov might have been allocated previously */
- if (desc->bd_enc_iov != NULL)
- return 0;
-
- desc->bd_enc_iov = kcalloc(desc->bd_iov_count,
- sizeof(*desc->bd_enc_iov), GFP_NOFS);
- if (desc->bd_enc_iov == NULL)
- return -ENOMEM;
-
- spin_lock(&page_pools.epp_lock);
-
- page_pools.epp_st_access++;
-again:
- if (unlikely(page_pools.epp_free_pages < desc->bd_iov_count)) {
- if (tick == 0)
- tick = cfs_time_current();
-
- now = get_seconds();
-
- page_pools.epp_st_missings++;
- page_pools.epp_pages_short += desc->bd_iov_count;
-
- if (enc_pools_should_grow(desc->bd_iov_count, now)) {
- page_pools.epp_growing = 1;
-
- spin_unlock(&page_pools.epp_lock);
- enc_pools_add_pages(page_pools.epp_pages_short / 2);
- spin_lock(&page_pools.epp_lock);
-
- page_pools.epp_growing = 0;
-
- enc_pools_wakeup();
- } else {
- if (++page_pools.epp_waitqlen >
- page_pools.epp_st_max_wqlen)
- page_pools.epp_st_max_wqlen =
- page_pools.epp_waitqlen;
-
- set_current_state(TASK_UNINTERRUPTIBLE);
- init_waitqueue_entry(&waitlink, current);
- add_wait_queue(&page_pools.epp_waitq, &waitlink);
-
- spin_unlock(&page_pools.epp_lock);
- schedule();
- remove_wait_queue(&page_pools.epp_waitq, &waitlink);
- LASSERT(page_pools.epp_waitqlen > 0);
- spin_lock(&page_pools.epp_lock);
- page_pools.epp_waitqlen--;
- }
-
- LASSERT(page_pools.epp_pages_short >= desc->bd_iov_count);
- page_pools.epp_pages_short -= desc->bd_iov_count;
-
- this_idle = 0;
- goto again;
- }
-
- /* record max wait time */
- if (unlikely(tick != 0)) {
- tick = cfs_time_current() - tick;
- if (tick > page_pools.epp_st_max_wait)
- page_pools.epp_st_max_wait = tick;
- }
-
- /* proceed with rest of allocation */
- page_pools.epp_free_pages -= desc->bd_iov_count;
-
- p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
- g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
-
- for (i = 0; i < desc->bd_iov_count; i++) {
- LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
- desc->bd_enc_iov[i].kiov_page =
- page_pools.epp_pools[p_idx][g_idx];
- page_pools.epp_pools[p_idx][g_idx] = NULL;
-
- if (++g_idx == PAGES_PER_POOL) {
- p_idx++;
- g_idx = 0;
- }
- }
-
- if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
- page_pools.epp_st_lowfree = page_pools.epp_free_pages;
-
- /*
- * new idle index = (old * weight + new) / (weight + 1)
- */
- if (this_idle == -1) {
- this_idle = page_pools.epp_free_pages * IDLE_IDX_MAX /
- page_pools.epp_total_pages;
- }
- page_pools.epp_idle_idx = (page_pools.epp_idle_idx * IDLE_IDX_WEIGHT +
- this_idle) /
- (IDLE_IDX_WEIGHT + 1);
-
- page_pools.epp_last_access = get_seconds();
-
- spin_unlock(&page_pools.epp_lock);
- return 0;
-}
-EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
-
void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
{
int p_idx, g_idx;
@@ -651,41 +356,6 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
}
EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages);
-/*
- * we don't do much stuff for add_user/del_user anymore, except adding some
- * initial pages in add_user() if current pools are empty, rest would be
- * handled by the pools's self-adaption.
- */
-int sptlrpc_enc_pool_add_user(void)
-{
- int need_grow = 0;
-
- spin_lock(&page_pools.epp_lock);
- if (page_pools.epp_growing == 0 && page_pools.epp_total_pages == 0) {
- page_pools.epp_growing = 1;
- need_grow = 1;
- }
- spin_unlock(&page_pools.epp_lock);
-
- if (need_grow) {
- enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES +
- PTLRPC_MAX_BRW_PAGES);
-
- spin_lock(&page_pools.epp_lock);
- page_pools.epp_growing = 0;
- enc_pools_wakeup();
- spin_unlock(&page_pools.epp_lock);
- }
- return 0;
-}
-EXPORT_SYMBOL(sptlrpc_enc_pool_add_user);
-
-int sptlrpc_enc_pool_del_user(void)
-{
- return 0;
-}
-EXPORT_SYMBOL(sptlrpc_enc_pool_del_user);
-
static inline void enc_pools_alloc(void)
{
LASSERT(page_pools.epp_max_pools);
@@ -725,8 +395,8 @@ int sptlrpc_enc_pool_init(void)
page_pools.epp_growing = 0;
page_pools.epp_idle_idx = 0;
- page_pools.epp_last_shrink = get_seconds();
- page_pools.epp_last_access = get_seconds();
+ page_pools.epp_last_shrink = ktime_get_seconds();
+ page_pools.epp_last_access = ktime_get_seconds();
spin_lock_init(&page_pools.epp_lock);
page_pools.epp_total_pages = 0;
@@ -768,8 +438,7 @@ void sptlrpc_enc_pool_fini(void)
if (page_pools.epp_st_access > 0) {
CDEBUG(D_SEC,
- "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait "
- CFS_TIME_T"/%d\n",
+ "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait %ld/%d\n",
page_pools.epp_st_max_pages, page_pools.epp_st_grows,
page_pools.epp_st_grow_fails,
page_pools.epp_st_shrinks, page_pools.epp_st_access,
@@ -778,7 +447,6 @@ void sptlrpc_enc_pool_fini(void)
}
}
-
static int cfs_hash_alg_id[] = {
[BULK_HASH_ALG_NULL] = CFS_HASH_ALG_NULL,
[BULK_HASH_ALG_ADLER32] = CFS_HASH_ALG_ADLER32,
@@ -789,6 +457,7 @@ static int cfs_hash_alg_id[] = {
[BULK_HASH_ALG_SHA384] = CFS_HASH_ALG_SHA384,
[BULK_HASH_ALG_SHA512] = CFS_HASH_ALG_SHA512,
};
+
const char *sptlrpc_get_hash_name(__u8 hash_alg)
{
return cfs_crypto_hash_name(cfs_hash_alg_id[hash_alg]);
@@ -871,8 +540,7 @@ int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
memcpy(buf, hashbuf, buflen);
} else {
bufsize = buflen;
- err = cfs_crypto_hash_final(hdesc, (unsigned char *)buf,
- &bufsize);
+ err = cfs_crypto_hash_final(hdesc, buf, &bufsize);
}
if (err)
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_config.c b/drivers/staging/lustre/lustre/ptlrpc/sec_config.c
index e7f2f3332..7ff948fe1 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec_config.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec_config.c
@@ -48,27 +48,6 @@
#include "ptlrpc_internal.h"
-const char *sptlrpc_part2name(enum lustre_sec_part part)
-{
- switch (part) {
- case LUSTRE_SP_CLI:
- return "cli";
- case LUSTRE_SP_MDT:
- return "mdt";
- case LUSTRE_SP_OST:
- return "ost";
- case LUSTRE_SP_MGC:
- return "mgc";
- case LUSTRE_SP_MGS:
- return "mgs";
- case LUSTRE_SP_ANY:
- return "any";
- default:
- return "err";
- }
-}
-EXPORT_SYMBOL(sptlrpc_part2name);
-
enum lustre_sec_part sptlrpc_target_sec_part(struct obd_device *obd)
{
const char *type = obd->obd_type->typ_name;
@@ -180,7 +159,7 @@ static void sptlrpc_rule_init(struct sptlrpc_rule *rule)
/*
* format: network[.direction]=flavor
*/
-int sptlrpc_parse_rule(char *param, struct sptlrpc_rule *rule)
+static int sptlrpc_parse_rule(char *param, struct sptlrpc_rule *rule)
{
char *flavor, *dir;
int rc;
@@ -234,9 +213,8 @@ int sptlrpc_parse_rule(char *param, struct sptlrpc_rule *rule)
return 0;
}
-EXPORT_SYMBOL(sptlrpc_parse_rule);
-void sptlrpc_rule_set_free(struct sptlrpc_rule_set *rset)
+static void sptlrpc_rule_set_free(struct sptlrpc_rule_set *rset)
{
LASSERT(rset->srs_nslot ||
(rset->srs_nrule == 0 && rset->srs_rules == NULL));
@@ -246,12 +224,11 @@ void sptlrpc_rule_set_free(struct sptlrpc_rule_set *rset)
sptlrpc_rule_set_init(rset);
}
}
-EXPORT_SYMBOL(sptlrpc_rule_set_free);
/*
* return 0 if the rule set could accommodate one more rule.
*/
-int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset)
+static int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset)
{
struct sptlrpc_rule *rules;
int nslot;
@@ -280,22 +257,24 @@ int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset)
rset->srs_nslot = nslot;
return 0;
}
-EXPORT_SYMBOL(sptlrpc_rule_set_expand);
static inline int rule_spec_dir(struct sptlrpc_rule *rule)
{
return (rule->sr_from != LUSTRE_SP_ANY ||
rule->sr_to != LUSTRE_SP_ANY);
}
+
static inline int rule_spec_net(struct sptlrpc_rule *rule)
{
return (rule->sr_netid != LNET_NIDNET(LNET_NID_ANY));
}
+
static inline int rule_match_dir(struct sptlrpc_rule *r1,
struct sptlrpc_rule *r2)
{
return (r1->sr_from == r2->sr_from && r1->sr_to == r2->sr_to);
}
+
static inline int rule_match_net(struct sptlrpc_rule *r1,
struct sptlrpc_rule *r2)
{
@@ -306,8 +285,8 @@ static inline int rule_match_net(struct sptlrpc_rule *r1,
* merge @rule into @rset.
* the @rset slots might be expanded.
*/
-int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset,
- struct sptlrpc_rule *rule)
+static int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset,
+ struct sptlrpc_rule *rule)
{
struct sptlrpc_rule *p = rset->srs_rules;
int spec_dir, spec_net;
@@ -391,17 +370,16 @@ int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset,
return 0;
}
-EXPORT_SYMBOL(sptlrpc_rule_set_merge);
/**
* given from/to/nid, determine a matching flavor in ruleset.
* return 1 if a match found, otherwise return 0.
*/
-int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset,
- enum lustre_sec_part from,
- enum lustre_sec_part to,
- lnet_nid_t nid,
- struct sptlrpc_flavor *sf)
+static int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset,
+ enum lustre_sec_part from,
+ enum lustre_sec_part to,
+ lnet_nid_t nid,
+ struct sptlrpc_flavor *sf)
{
struct sptlrpc_rule *r;
int n;
@@ -428,20 +406,6 @@ int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset,
return 0;
}
-EXPORT_SYMBOL(sptlrpc_rule_set_choose);
-
-void sptlrpc_rule_set_dump(struct sptlrpc_rule_set *rset)
-{
- struct sptlrpc_rule *r;
- int n;
-
- for (n = 0; n < rset->srs_nrule; n++) {
- r = &rset->srs_rules[n];
- CDEBUG(D_SEC, "<%02d> from %x to %x, net %x, rpc %x\n", n,
- r->sr_from, r->sr_to, r->sr_netid, r->sr_flvr.sf_rpc);
- }
-}
-EXPORT_SYMBOL(sptlrpc_rule_set_dump);
/**********************************
* sptlrpc configuration support *
@@ -836,20 +800,6 @@ out:
flavor_set_flags(sf, from, to, 1);
}
-/**
- * called by target devices, determine the expected flavor from
- * certain peer (from, nid).
- */
-void sptlrpc_target_choose_flavor(struct sptlrpc_rule_set *rset,
- enum lustre_sec_part from,
- lnet_nid_t nid,
- struct sptlrpc_flavor *sf)
-{
- if (sptlrpc_rule_set_choose(rset, from, LUSTRE_SP_ANY, nid, sf) == 0)
- get_default_flavor(sf);
-}
-EXPORT_SYMBOL(sptlrpc_target_choose_flavor);
-
#define SEC_ADAPT_DELAY (10)
/**
@@ -871,7 +821,7 @@ void sptlrpc_conf_client_adapt(struct obd_device *obd)
if (imp) {
spin_lock(&imp->imp_lock);
if (imp->imp_sec)
- imp->imp_sec_expire = get_seconds() +
+ imp->imp_sec_expire = ktime_get_real_seconds() +
SEC_ADAPT_DELAY;
spin_unlock(&imp->imp_lock);
}
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c b/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c
index cdad608bd..6e58d5f95 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec_gc.c
@@ -51,7 +51,6 @@
#define SEC_GC_INTERVAL (30 * 60)
-
static struct mutex sec_gc_mutex;
static LIST_HEAD(sec_gc_list);
static spinlock_t sec_gc_list_lock;
@@ -62,14 +61,13 @@ static spinlock_t sec_gc_ctx_list_lock;
static struct ptlrpc_thread sec_gc_thread;
static atomic_t sec_gc_wait_del = ATOMIC_INIT(0);
-
void sptlrpc_gc_add_sec(struct ptlrpc_sec *sec)
{
LASSERT(sec->ps_policy->sp_cops->gc_ctx);
LASSERT(sec->ps_gc_interval > 0);
LASSERT(list_empty(&sec->ps_gc_list));
- sec->ps_gc_next = get_seconds() + sec->ps_gc_interval;
+ sec->ps_gc_next = ktime_get_real_seconds() + sec->ps_gc_interval;
spin_lock(&sec_gc_list_lock);
list_add_tail(&sec_gc_list, &sec->ps_gc_list);
@@ -103,21 +101,6 @@ void sptlrpc_gc_del_sec(struct ptlrpc_sec *sec)
}
EXPORT_SYMBOL(sptlrpc_gc_del_sec);
-void sptlrpc_gc_add_ctx(struct ptlrpc_cli_ctx *ctx)
-{
- LASSERT(list_empty(&ctx->cc_gc_chain));
-
- CDEBUG(D_SEC, "hand over ctx %p(%u->%s)\n",
- ctx, ctx->cc_vcred.vc_uid, sec2target_str(ctx->cc_sec));
- spin_lock(&sec_gc_ctx_list_lock);
- list_add(&ctx->cc_gc_chain, &sec_gc_ctx_list);
- spin_unlock(&sec_gc_ctx_list_lock);
-
- thread_add_flags(&sec_gc_thread, SVC_SIGNAL);
- wake_up(&sec_gc_thread.t_ctl_waitq);
-}
-EXPORT_SYMBOL(sptlrpc_gc_add_ctx);
-
static void sec_process_ctx_list(void)
{
struct ptlrpc_cli_ctx *ctx;
@@ -154,16 +137,16 @@ static void sec_do_gc(struct ptlrpc_sec *sec)
CDEBUG(D_SEC, "check on sec %p(%s)\n", sec, sec->ps_policy->sp_name);
- if (cfs_time_after(sec->ps_gc_next, get_seconds()))
+ if (sec->ps_gc_next > ktime_get_real_seconds())
return;
sec->ps_policy->sp_cops->gc_ctx(sec);
- sec->ps_gc_next = get_seconds() + sec->ps_gc_interval;
+ sec->ps_gc_next = ktime_get_real_seconds() + sec->ps_gc_interval;
}
static int sec_gc_main(void *arg)
{
- struct ptlrpc_thread *thread = (struct ptlrpc_thread *) arg;
+ struct ptlrpc_thread *thread = arg;
struct l_wait_info lwi;
unshare_fs_struct();
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c b/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c
index 68fcac14b..bda9a77af 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec_lproc.c
@@ -98,14 +98,15 @@ static int sptlrpc_info_lprocfs_seq_show(struct seq_file *seq, void *v)
atomic_read(&sec->ps_refcount));
seq_printf(seq, "nctx: %d\n", atomic_read(&sec->ps_nctx));
seq_printf(seq, "gc internal %ld\n", sec->ps_gc_interval);
- seq_printf(seq, "gc next %ld\n",
+ seq_printf(seq, "gc next %lld\n",
sec->ps_gc_interval ?
- sec->ps_gc_next - get_seconds() : 0);
+ (s64)(sec->ps_gc_next - ktime_get_real_seconds()) : 0ll);
sptlrpc_sec_put(sec);
out:
return 0;
}
+
LPROC_SEQ_FOPS_RO(sptlrpc_info_lprocfs);
static int sptlrpc_ctxs_lprocfs_seq_show(struct seq_file *seq, void *v)
@@ -130,6 +131,7 @@ static int sptlrpc_ctxs_lprocfs_seq_show(struct seq_file *seq, void *v)
out:
return 0;
}
+
LPROC_SEQ_FOPS_RO(sptlrpc_ctxs_lprocfs);
int sptlrpc_lprocfs_cliobd_attach(struct obd_device *dev)
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_null.c b/drivers/staging/lustre/lustre/ptlrpc/sec_null.c
index ce1c563d0..ebfa6092b 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec_null.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec_null.c
@@ -40,13 +40,14 @@
#define DEBUG_SUBSYSTEM S_SEC
-
#include "../include/obd_support.h"
#include "../include/obd_cksum.h"
#include "../include/obd_class.h"
#include "../include/lustre_net.h"
#include "../include/lustre_sec.h"
+#include "ptlrpc_internal.h"
+
static struct ptlrpc_sec_policy null_policy;
static struct ptlrpc_sec null_sec;
static struct ptlrpc_cli_ctx null_cli_ctx;
@@ -82,6 +83,7 @@ int null_ctx_sign(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
if (!req->rq_import->imp_dlm_fake) {
struct obd_device *obd = req->rq_import->imp_obd;
+
null_encode_sec_part(req->rq_reqbuf,
obd->u.cli.cl_sp_me);
}
diff --git a/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c b/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c
index a243db60f..f448b4567 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/sec_plain.c
@@ -40,12 +40,12 @@
#define DEBUG_SUBSYSTEM S_SEC
-
#include "../include/obd_support.h"
#include "../include/obd_cksum.h"
#include "../include/obd_class.h"
#include "../include/lustre_net.h"
#include "../include/lustre_sec.h"
+#include "ptlrpc_internal.h"
struct plain_sec {
struct ptlrpc_sec pls_base;
diff --git a/drivers/staging/lustre/lustre/ptlrpc/service.c b/drivers/staging/lustre/lustre/ptlrpc/service.c
index 003344ccf..f45898f17 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/service.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/service.c
@@ -58,7 +58,6 @@ MODULE_PARM_DESC(at_early_margin, "How soon before an RPC deadline to send an ea
module_param(at_extra, int, 0644);
MODULE_PARM_DESC(at_extra, "How much extra time to give with each early reply");
-
/* forward ref */
static int ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt);
static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req);
@@ -86,8 +85,10 @@ ptlrpc_alloc_rqbd(struct ptlrpc_service_part *svcpt)
rqbd->rqbd_cbid.cbid_fn = request_in_callback;
rqbd->rqbd_cbid.cbid_arg = rqbd;
INIT_LIST_HEAD(&rqbd->rqbd_reqs);
- OBD_CPT_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_cptable,
- svcpt->scp_cpt, svc->srv_buf_size);
+ rqbd->rqbd_buffer = libcfs_kvzalloc_cpt(svc->srv_cptable,
+ svcpt->scp_cpt,
+ svc->srv_buf_size,
+ GFP_KERNEL);
if (rqbd->rqbd_buffer == NULL) {
kfree(rqbd);
return NULL;
@@ -141,7 +142,6 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
svcpt->scp_rqbd_allocating++;
spin_unlock(&svcpt->scp_lock);
-
for (i = 0; i < svc->srv_nbuf_per_group; i++) {
/* NB: another thread might have recycled enough rqbds, we
* need to make sure it wouldn't over-allocate, see LU-1212. */
@@ -177,33 +177,6 @@ ptlrpc_grow_req_bufs(struct ptlrpc_service_part *svcpt, int post)
return rc;
}
-/**
- * Part of Rep-Ack logic.
- * Puts a lock and its mode into reply state associated to request reply.
- */
-void
-ptlrpc_save_lock(struct ptlrpc_request *req,
- struct lustre_handle *lock, int mode, int no_ack)
-{
- struct ptlrpc_reply_state *rs = req->rq_reply_state;
- int idx;
-
- LASSERT(rs != NULL);
- LASSERT(rs->rs_nlocks < RS_MAX_LOCKS);
-
- if (req->rq_export->exp_disconnected) {
- ldlm_lock_decref(lock, mode);
- } else {
- idx = rs->rs_nlocks++;
- rs->rs_locks[idx] = *lock;
- rs->rs_modes[idx] = mode;
- rs->rs_difficult = 1;
- rs->rs_no_ack = !!no_ack;
- }
-}
-EXPORT_SYMBOL(ptlrpc_save_lock);
-
-
struct ptlrpc_hr_partition;
struct ptlrpc_hr_thread {
@@ -244,32 +217,10 @@ struct ptlrpc_hr_service {
struct ptlrpc_hr_partition **hr_partitions;
};
-struct rs_batch {
- struct list_head rsb_replies;
- unsigned int rsb_n_replies;
- struct ptlrpc_service_part *rsb_svcpt;
-};
-
/** reply handling service. */
static struct ptlrpc_hr_service ptlrpc_hr;
/**
- * maximum number of replies scheduled in one batch
- */
-#define MAX_SCHEDULED 256
-
-/**
- * Initialize a reply batch.
- *
- * \param b batch
- */
-static void rs_batch_init(struct rs_batch *b)
-{
- memset(b, 0, sizeof(*b));
- INIT_LIST_HEAD(&b->rsb_replies);
-}
-
-/**
* Choose an hr thread to dispatch requests to.
*/
static struct ptlrpc_hr_thread *
@@ -295,76 +246,6 @@ ptlrpc_hr_select(struct ptlrpc_service_part *svcpt)
}
/**
- * Dispatch all replies accumulated in the batch to one from
- * dedicated reply handling threads.
- *
- * \param b batch
- */
-static void rs_batch_dispatch(struct rs_batch *b)
-{
- if (b->rsb_n_replies != 0) {
- struct ptlrpc_hr_thread *hrt;
-
- hrt = ptlrpc_hr_select(b->rsb_svcpt);
-
- spin_lock(&hrt->hrt_lock);
- list_splice_init(&b->rsb_replies, &hrt->hrt_queue);
- spin_unlock(&hrt->hrt_lock);
-
- wake_up(&hrt->hrt_waitq);
- b->rsb_n_replies = 0;
- }
-}
-
-/**
- * Add a reply to a batch.
- * Add one reply object to a batch, schedule batched replies if overload.
- *
- * \param b batch
- * \param rs reply
- */
-static void rs_batch_add(struct rs_batch *b, struct ptlrpc_reply_state *rs)
-{
- struct ptlrpc_service_part *svcpt = rs->rs_svcpt;
-
- if (svcpt != b->rsb_svcpt || b->rsb_n_replies >= MAX_SCHEDULED) {
- if (b->rsb_svcpt != NULL) {
- rs_batch_dispatch(b);
- spin_unlock(&b->rsb_svcpt->scp_rep_lock);
- }
- spin_lock(&svcpt->scp_rep_lock);
- b->rsb_svcpt = svcpt;
- }
- spin_lock(&rs->rs_lock);
- rs->rs_scheduled_ever = 1;
- if (rs->rs_scheduled == 0) {
- list_move(&rs->rs_list, &b->rsb_replies);
- rs->rs_scheduled = 1;
- b->rsb_n_replies++;
- }
- rs->rs_committed = 1;
- spin_unlock(&rs->rs_lock);
-}
-
-/**
- * Reply batch finalization.
- * Dispatch remaining replies from the batch
- * and release remaining spinlock.
- *
- * \param b batch
- */
-static void rs_batch_fini(struct rs_batch *b)
-{
- if (b->rsb_svcpt != NULL) {
- rs_batch_dispatch(b);
- spin_unlock(&b->rsb_svcpt->scp_rep_lock);
- }
-}
-
-#define DECLARE_RS_BATCH(b) struct rs_batch b
-
-
-/**
* Put reply state into a queue for processing because we received
* ACK from the client
*/
@@ -401,32 +282,6 @@ ptlrpc_schedule_difficult_reply(struct ptlrpc_reply_state *rs)
}
EXPORT_SYMBOL(ptlrpc_schedule_difficult_reply);
-void ptlrpc_commit_replies(struct obd_export *exp)
-{
- struct ptlrpc_reply_state *rs, *nxt;
- DECLARE_RS_BATCH(batch);
-
- rs_batch_init(&batch);
- /* Find any replies that have been committed and get their service
- * to attend to complete them. */
-
- /* CAVEAT EMPTOR: spinlock ordering!!! */
- spin_lock(&exp->exp_uncommitted_replies_lock);
- list_for_each_entry_safe(rs, nxt, &exp->exp_uncommitted_replies,
- rs_obd_list) {
- LASSERT(rs->rs_difficult);
- /* VBR: per-export last_committed */
- LASSERT(rs->rs_export);
- if (rs->rs_transno <= exp->exp_last_committed) {
- list_del_init(&rs->rs_obd_list);
- rs_batch_add(&batch, rs);
- }
- }
- spin_unlock(&exp->exp_uncommitted_replies_lock);
- rs_batch_fini(&batch);
-}
-EXPORT_SYMBOL(ptlrpc_commit_replies);
-
static int
ptlrpc_server_post_idle_rqbds(struct ptlrpc_service_part *svcpt)
{
@@ -647,7 +502,9 @@ ptlrpc_service_part_init(struct ptlrpc_service *svc,
if (array->paa_reqs_count == NULL)
goto free_reqs_array;
- cfs_timer_init(&svcpt->scp_at_timer, ptlrpc_at_timer, svcpt);
+ setup_timer(&svcpt->scp_at_timer, ptlrpc_at_timer,
+ (unsigned long)svcpt);
+
/* At SOW, service time should be quick; 10s seems generous. If client
* timeout is less than this, we'll be sending an early reply. */
at_init(&svcpt->scp_at_estimate, 10, 0);
@@ -856,7 +713,7 @@ static void ptlrpc_server_free_request(struct ptlrpc_request *req)
* drop a reference count of the request. if it reaches 0, we either
* put it into history list, or free it immediately.
*/
-void ptlrpc_server_drop_request(struct ptlrpc_request *req)
+static void ptlrpc_server_drop_request(struct ptlrpc_request *req)
{
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
@@ -960,35 +817,6 @@ void ptlrpc_server_drop_request(struct ptlrpc_request *req)
}
}
-/** Change request export and move hp request from old export to new */
-void ptlrpc_request_change_export(struct ptlrpc_request *req,
- struct obd_export *export)
-{
- if (req->rq_export != NULL) {
- if (!list_empty(&req->rq_exp_list)) {
- /* remove rq_exp_list from last export */
- spin_lock_bh(&req->rq_export->exp_rpc_lock);
- list_del_init(&req->rq_exp_list);
- spin_unlock_bh(&req->rq_export->exp_rpc_lock);
-
- /* export has one reference already, so it`s safe to
- * add req to export queue here and get another
- * reference for request later */
- spin_lock_bh(&export->exp_rpc_lock);
- list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
- spin_unlock_bh(&export->exp_rpc_lock);
- }
- class_export_rpc_dec(req->rq_export);
- class_export_put(req->rq_export);
- }
-
- /* request takes one export refcount */
- req->rq_export = class_export_get(export);
- class_export_rpc_inc(export);
-
- return;
-}
-
/**
* to finish a request: stop sending more early replies, and release
* the request.
@@ -1025,82 +853,6 @@ static void ptlrpc_server_finish_active_request(
}
/**
- * This function makes sure dead exports are evicted in a timely manner.
- * This function is only called when some export receives a message (i.e.,
- * the network is up.)
- */
-static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
-{
- struct obd_export *oldest_exp;
- time_t oldest_time, new_time;
-
- LASSERT(exp);
-
- /* Compensate for slow machines, etc, by faking our request time
- into the future. Although this can break the strict time-ordering
- of the list, we can be really lazy here - we don't have to evict
- at the exact right moment. Eventually, all silent exports
- will make it to the top of the list. */
-
- /* Do not pay attention on 1sec or smaller renewals. */
- new_time = get_seconds() + extra_delay;
- if (exp->exp_last_request_time + 1 /*second */ >= new_time)
- return;
-
- exp->exp_last_request_time = new_time;
-
- /* exports may get disconnected from the chain even though the
- export has references, so we must keep the spin lock while
- manipulating the lists */
- spin_lock(&exp->exp_obd->obd_dev_lock);
-
- if (list_empty(&exp->exp_obd_chain_timed)) {
- /* this one is not timed */
- spin_unlock(&exp->exp_obd->obd_dev_lock);
- return;
- }
-
- list_move_tail(&exp->exp_obd_chain_timed,
- &exp->exp_obd->obd_exports_timed);
-
- oldest_exp = list_entry(exp->exp_obd->obd_exports_timed.next,
- struct obd_export, exp_obd_chain_timed);
- oldest_time = oldest_exp->exp_last_request_time;
- spin_unlock(&exp->exp_obd->obd_dev_lock);
-
- if (exp->exp_obd->obd_recovering) {
- /* be nice to everyone during recovery */
- return;
- }
-
- /* Note - racing to start/reset the obd_eviction timer is safe */
- if (exp->exp_obd->obd_eviction_timer == 0) {
- /* Check if the oldest entry is expired. */
- if (get_seconds() > (oldest_time + PING_EVICT_TIMEOUT +
- extra_delay)) {
- /* We need a second timer, in case the net was down and
- * it just came back. Since the pinger may skip every
- * other PING_INTERVAL (see note in ptlrpc_pinger_main),
- * we better wait for 3. */
- exp->exp_obd->obd_eviction_timer =
- get_seconds() + 3 * PING_INTERVAL;
- CDEBUG(D_HA, "%s: Think about evicting %s from "CFS_TIME_T"\n",
- exp->exp_obd->obd_name,
- obd_export_nid2str(oldest_exp), oldest_time);
- }
- } else {
- if (get_seconds() >
- (exp->exp_obd->obd_eviction_timer + extra_delay)) {
- /* The evictor won't evict anyone who we've heard from
- * recently, so we don't have to check before we start
- * it. */
- if (!ping_evictor_wake(exp))
- exp->exp_obd->obd_eviction_timer = 0;
- }
- }
-}
-
-/**
* Sanity check request \a req.
* Return 0 if all is ok, error code otherwise.
*/
@@ -1126,18 +878,16 @@ static int ptlrpc_check_req(struct ptlrpc_request *req)
req, (obd != NULL) ? obd->obd_name : "unknown");
rc = -ENODEV;
} else if (lustre_msg_get_flags(req->rq_reqmsg) &
- (MSG_REPLAY | MSG_REQ_REPLAY_DONE) &&
- !obd->obd_recovering) {
- DEBUG_REQ(D_ERROR, req,
- "Invalid replay without recovery");
- class_fail_export(req->rq_export);
- rc = -ENODEV;
- } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
- !obd->obd_recovering) {
- DEBUG_REQ(D_ERROR, req, "Invalid req with transno %llu without recovery",
- lustre_msg_get_transno(req->rq_reqmsg));
- class_fail_export(req->rq_export);
- rc = -ENODEV;
+ (MSG_REPLAY | MSG_REQ_REPLAY_DONE)) {
+ DEBUG_REQ(D_ERROR, req, "Invalid replay without recovery");
+ class_fail_export(req->rq_export);
+ rc = -ENODEV;
+ } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
+ DEBUG_REQ(D_ERROR, req,
+ "Invalid req with transno %llu without recovery",
+ lustre_msg_get_transno(req->rq_reqmsg));
+ class_fail_export(req->rq_export);
+ rc = -ENODEV;
}
if (unlikely(rc < 0)) {
@@ -1153,17 +903,17 @@ static void ptlrpc_at_set_timer(struct ptlrpc_service_part *svcpt)
__s32 next;
if (array->paa_count == 0) {
- cfs_timer_disarm(&svcpt->scp_at_timer);
+ del_timer(&svcpt->scp_at_timer);
return;
}
/* Set timer for closest deadline */
- next = (__s32)(array->paa_deadline - get_seconds() -
+ next = (__s32)(array->paa_deadline - ktime_get_real_seconds() -
at_early_margin);
if (next <= 0) {
ptlrpc_at_timer((unsigned long)svcpt);
} else {
- cfs_timer_arm(&svcpt->scp_at_timer, cfs_time_shift(next));
+ mod_timer(&svcpt->scp_at_timer, cfs_time_shift(next));
CDEBUG(D_INFO, "armed %s at %+ds\n",
svcpt->scp_service->srv_name, next);
}
@@ -1189,7 +939,7 @@ static int ptlrpc_at_add_timed(struct ptlrpc_request *req)
spin_lock(&svcpt->scp_at_lock);
LASSERT(list_empty(&req->rq_timed_list));
- index = (unsigned long)req->rq_deadline % array->paa_size;
+ div_u64_rem(req->rq_deadline, array->paa_size, &index);
if (array->paa_reqs_count[index] > 0) {
/* latest rpcs will have the latest deadlines in the list,
* so search backward. */
@@ -1248,8 +998,8 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
struct ptlrpc_request *reqcopy;
struct lustre_msg *reqmsg;
- long olddl = req->rq_deadline - get_seconds();
- time_t newdl;
+ long olddl = req->rq_deadline - ktime_get_real_seconds();
+ time64_t newdl;
int rc;
/* deadline is when the client expects us to reply, margin is the
@@ -1276,36 +1026,22 @@ static int ptlrpc_at_send_early_reply(struct ptlrpc_request *req)
return -ENOSYS;
}
- if (req->rq_export &&
- lustre_msg_get_flags(req->rq_reqmsg) &
- (MSG_REPLAY | MSG_REQ_REPLAY_DONE | MSG_LOCK_REPLAY_DONE)) {
- /* During recovery, we don't want to send too many early
- * replies, but on the other hand we want to make sure the
- * client has enough time to resend if the rpc is lost. So
- * during the recovery period send at least 4 early replies,
- * spacing them every at_extra if we can. at_estimate should
- * always equal this fixed value during recovery. */
- at_measured(&svcpt->scp_at_estimate, min(at_extra,
- req->rq_export->exp_obd->obd_recovery_timeout / 4));
- } else {
- /* Fake our processing time into the future to ask the clients
- * for some extra amount of time */
- at_measured(&svcpt->scp_at_estimate, at_extra +
- get_seconds() -
- req->rq_arrival_time.tv_sec);
-
- /* Check to see if we've actually increased the deadline -
- * we may be past adaptive_max */
- if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate)) {
- DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%ld/%ld), not sending early reply\n",
- olddl, req->rq_arrival_time.tv_sec +
- at_get(&svcpt->scp_at_estimate) -
- get_seconds());
- return -ETIMEDOUT;
- }
+ /* Fake our processing time into the future to ask the clients
+ * for some extra amount of time */
+ at_measured(&svcpt->scp_at_estimate, at_extra +
+ ktime_get_real_seconds() - req->rq_arrival_time.tv_sec);
+
+ /* Check to see if we've actually increased the deadline -
+ * we may be past adaptive_max */
+ if (req->rq_deadline >= req->rq_arrival_time.tv_sec +
+ at_get(&svcpt->scp_at_estimate)) {
+ DEBUG_REQ(D_WARNING, req, "Couldn't add any time (%ld/%lld), not sending early reply\n",
+ olddl, req->rq_arrival_time.tv_sec +
+ at_get(&svcpt->scp_at_estimate) -
+ ktime_get_real_seconds());
+ return -ETIMEDOUT;
}
- newdl = get_seconds() + at_get(&svcpt->scp_at_estimate);
+ newdl = ktime_get_real_seconds() + at_get(&svcpt->scp_at_estimate);
reqcopy = ptlrpc_request_cache_alloc(GFP_NOFS);
if (reqcopy == NULL)
@@ -1388,8 +1124,8 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
struct ptlrpc_request *rq, *n;
struct list_head work_list;
__u32 index, count;
- time_t deadline;
- time_t now = get_seconds();
+ time64_t deadline;
+ time64_t now = ktime_get_real_seconds();
long delay;
int first, counter = 0;
@@ -1419,7 +1155,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
server will take. Send early replies to everyone expiring soon. */
INIT_LIST_HEAD(&work_list);
deadline = -1;
- index = (unsigned long)array->paa_deadline % array->paa_size;
+ div_u64_rem(array->paa_deadline, array->paa_size, &index);
count = array->paa_count;
while (count > 0) {
count -= array->paa_reqs_count[index];
@@ -1461,7 +1197,7 @@ static int ptlrpc_at_check_timed(struct ptlrpc_service_part *svcpt)
chance to send early replies */
LCONSOLE_WARN("%s: This server is not able to keep up with request traffic (cpu-bound).\n",
svcpt->scp_service->srv_name);
- CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=" CFS_DURATION_T "(jiff)\n",
+ CWARN("earlyQ=%d reqQ=%d recA=%d, svcEst=%d, delay=%ld(jiff)\n",
counter, svcpt->scp_nreqs_incoming,
svcpt->scp_nreqs_active,
at_get(&svcpt->scp_at_estimate), delay);
@@ -1546,30 +1282,6 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
}
}
-static int ptlrpc_hpreq_check(struct ptlrpc_request *req)
-{
- return 1;
-}
-
-static struct ptlrpc_hpreq_ops ptlrpc_hpreq_common = {
- .hpreq_check = ptlrpc_hpreq_check,
-};
-
-/* Hi-Priority RPC check by RPC operation code. */
-int ptlrpc_hpreq_handler(struct ptlrpc_request *req)
-{
- int opc = lustre_msg_get_opc(req->rq_reqmsg);
-
- /* Check for export to let only reconnects for not yet evicted
- * export to become a HP rpc. */
- if ((req->rq_export != NULL) &&
- (opc == OBD_PING || opc == MDS_CONNECT || opc == OST_CONNECT))
- req->rq_ops = &ptlrpc_hpreq_common;
-
- return 0;
-}
-EXPORT_SYMBOL(ptlrpc_hpreq_handler);
-
static int ptlrpc_server_request_add(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
@@ -1638,6 +1350,7 @@ static bool ptlrpc_server_allow_normal(struct ptlrpc_service_part *svcpt,
bool force)
{
int running = svcpt->scp_nthrs_running;
+
if (unlikely(svcpt->scp_service->srv_req_portal == MDS_REQUEST_PORTAL &&
CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CANCEL_RESEND))) {
/* leave just 1 thread for normal RPCs */
@@ -1828,14 +1541,13 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service_part *svcpt,
if (rc)
goto err_req;
- ptlrpc_update_export_timer(req->rq_export, 0);
}
/* req_in handling should/must be fast */
- if (get_seconds() - req->rq_arrival_time.tv_sec > 5)
+ if (ktime_get_real_seconds() - req->rq_arrival_time.tv_sec > 5)
DEBUG_REQ(D_WARNING, req, "Slow req_in handling "CFS_DURATION_T"s",
- cfs_time_sub(get_seconds(),
- req->rq_arrival_time.tv_sec));
+ (long)(ktime_get_real_seconds() -
+ req->rq_arrival_time.tv_sec));
/* Set rpc server deadline and add it to the timed list */
deadline = (lustre_msghdr_get_flags(req->rq_reqmsg) &
@@ -1876,9 +1588,12 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
{
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_request *request;
- struct timeval work_start;
- struct timeval work_end;
- long timediff;
+ struct timespec64 work_start;
+ struct timespec64 work_end;
+ struct timespec64 timediff;
+ struct timespec64 arrived;
+ unsigned long timediff_usecs;
+ unsigned long arrived_usecs;
int rc;
int fail_opc = 0;
@@ -1901,12 +1616,13 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DUMP_LOG))
libcfs_debug_dumplog();
- do_gettimeofday(&work_start);
- timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,
- NULL);
+ ktime_get_real_ts64(&work_start);
+ timediff = timespec64_sub(work_start, request->rq_arrival_time);
+ timediff_usecs = timediff.tv_sec * USEC_PER_SEC +
+ timediff.tv_nsec / NSEC_PER_USEC;
if (likely(svc->srv_stats != NULL)) {
lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
- timediff);
+ timediff_usecs);
lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
svcpt->scp_nreqs_incoming);
lprocfs_counter_add(svc->srv_stats, PTLRPC_REQACTIVE_CNTR,
@@ -1933,18 +1649,17 @@ ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
if (likely(request->rq_export)) {
if (unlikely(ptlrpc_check_req(request)))
goto put_conn;
- ptlrpc_update_export_timer(request->rq_export, timediff >> 19);
}
/* Discard requests queued for longer than the deadline.
The deadline is increased if we send an early reply. */
- if (get_seconds() > request->rq_deadline) {
+ if (ktime_get_real_seconds() > request->rq_deadline) {
DEBUG_REQ(D_ERROR, request, "Dropping timed-out request from %s: deadline " CFS_DURATION_T ":" CFS_DURATION_T "s ago\n",
libcfs_id2str(request->rq_peer),
- cfs_time_sub(request->rq_deadline,
- request->rq_arrival_time.tv_sec),
- cfs_time_sub(get_seconds(),
- request->rq_deadline));
+ (long)(request->rq_deadline -
+ request->rq_arrival_time.tv_sec),
+ (long)(ktime_get_real_seconds() -
+ request->rq_deadline));
goto put_conn;
}
@@ -1969,19 +1684,22 @@ put_conn:
lu_context_exit(&request->rq_session);
lu_context_fini(&request->rq_session);
- if (unlikely(get_seconds() > request->rq_deadline)) {
+ if (unlikely(ktime_get_real_seconds() > request->rq_deadline)) {
DEBUG_REQ(D_WARNING, request,
- "Request took longer than estimated ("
- CFS_DURATION_T":"CFS_DURATION_T
- "s); client may timeout.",
- cfs_time_sub(request->rq_deadline,
- request->rq_arrival_time.tv_sec),
- cfs_time_sub(get_seconds(),
- request->rq_deadline));
- }
-
- do_gettimeofday(&work_end);
- timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
+ "Request took longer than estimated (%lld:%llds); "
+ "client may timeout.",
+ (s64)request->rq_deadline -
+ request->rq_arrival_time.tv_sec,
+ (s64)ktime_get_real_seconds() - request->rq_deadline);
+ }
+
+ ktime_get_real_ts64(&work_end);
+ timediff = timespec64_sub(work_end, work_start);
+ timediff_usecs = timediff.tv_sec * USEC_PER_SEC +
+ timediff.tv_nsec / NSEC_PER_USEC;
+ arrived = timespec64_sub(work_end, request->rq_arrival_time);
+ arrived_usecs = arrived.tv_sec * USEC_PER_SEC +
+ arrived.tv_nsec / NSEC_PER_USEC;
CDEBUG(D_RPCTRACE, "Handled RPC pname:cluuid+ref:pid:xid:nid:opc %s:%s+%d:%d:x%llu:%s:%d Request processed in %ldus (%ldus total) trans %llu rc %d/%d\n",
current_comm(),
(request->rq_export ?
@@ -1992,8 +1710,8 @@ put_conn:
request->rq_xid,
libcfs_id2str(request->rq_peer),
lustre_msg_get_opc(request->rq_reqmsg),
- timediff,
- cfs_timeval_sub(&work_end, &request->rq_arrival_time, NULL),
+ timediff_usecs,
+ arrived_usecs,
(request->rq_repmsg ?
lustre_msg_get_transno(request->rq_repmsg) :
request->rq_transno),
@@ -2003,20 +1721,20 @@ put_conn:
if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) {
__u32 op = lustre_msg_get_opc(request->rq_reqmsg);
int opc = opcode_offset(op);
+
if (opc > 0 && !(op == LDLM_ENQUEUE || op == MDS_REINT)) {
LASSERT(opc < LUSTRE_MAX_OPCODES);
lprocfs_counter_add(svc->srv_stats,
opc + EXTRA_MAX_OPCODES,
- timediff);
+ timediff_usecs);
}
}
if (unlikely(request->rq_early_count)) {
DEBUG_REQ(D_ADAPTTO, request,
- "sent %d early replies before finishing in "
- CFS_DURATION_T"s",
+ "sent %d early replies before finishing in %llds",
request->rq_early_count,
- cfs_time_sub(work_end.tv_sec,
- request->rq_arrival_time.tv_sec));
+ (s64)work_end.tv_sec -
+ request->rq_arrival_time.tv_sec);
}
out_req:
@@ -2128,7 +1846,6 @@ ptlrpc_handle_rs(struct ptlrpc_reply_state *rs)
return 1;
}
-
static void
ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
{
@@ -2155,7 +1872,7 @@ ptlrpc_check_rqbd_pool(struct ptlrpc_service_part *svcpt)
static int
ptlrpc_retry_rqbds(void *arg)
{
- struct ptlrpc_service_part *svcpt = (struct ptlrpc_service_part *)arg;
+ struct ptlrpc_service_part *svcpt = arg;
svcpt->scp_rqbd_timeout = 0;
return -ETIMEDOUT;
@@ -2262,7 +1979,7 @@ ptlrpc_wait_event(struct ptlrpc_service_part *svcpt,
*/
static int ptlrpc_main(void *arg)
{
- struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
+ struct ptlrpc_thread *thread = arg;
struct ptlrpc_service_part *svcpt = thread->t_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
struct ptlrpc_reply_state *rs;
@@ -2464,7 +2181,7 @@ static int hrt_dont_sleep(struct ptlrpc_hr_thread *hrt,
*/
static int ptlrpc_hr_main(void *arg)
{
- struct ptlrpc_hr_thread *hrt = (struct ptlrpc_hr_thread *)arg;
+ struct ptlrpc_hr_thread *hrt = arg;
struct ptlrpc_hr_partition *hrp = hrt->hrt_partition;
LIST_HEAD (replies);
char threadname[20];
@@ -2538,6 +2255,7 @@ static int ptlrpc_start_hr_threads(void)
for (j = 0; j < hrp->hrp_nthrs; j++) {
struct ptlrpc_hr_thread *hrt = &hrp->hrp_thrs[j];
+
rc = PTR_ERR(kthread_run(ptlrpc_hr_main,
&hrp->hrp_thrs[j],
"ptlrpc_hr%02d_%03d",
@@ -2609,7 +2327,7 @@ static void ptlrpc_svcpt_stop_threads(struct ptlrpc_service_part *svcpt)
/**
* Stops all threads of a particular service \a svc
*/
-void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+static void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
{
struct ptlrpc_service_part *svcpt;
int i;
@@ -2619,7 +2337,6 @@ void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
ptlrpc_svcpt_stop_threads(svcpt);
}
}
-EXPORT_SYMBOL(ptlrpc_stop_all_threads);
int ptlrpc_start_threads(struct ptlrpc_service *svc)
{
@@ -2833,7 +2550,6 @@ void ptlrpc_hr_fini(void)
ptlrpc_hr.hr_partitions = NULL;
}
-
/**
* Wait until all already scheduled replies are processed.
*/
@@ -2862,7 +2578,7 @@ ptlrpc_service_del_atimer(struct ptlrpc_service *svc)
/* early disarm AT timer... */
ptlrpc_service_for_each_part(svcpt, i, svc) {
if (svcpt->scp_service != NULL)
- cfs_timer_disarm(&svcpt->scp_at_timer);
+ del_timer(&svcpt->scp_at_timer);
}
}
@@ -3002,7 +2718,7 @@ ptlrpc_service_free(struct ptlrpc_service *svc)
break;
/* In case somebody rearmed this in the meantime */
- cfs_timer_disarm(&svcpt->scp_at_timer);
+ del_timer(&svcpt->scp_at_timer);
array = &svcpt->scp_at_array;
kfree(array->paa_reqs_array);
@@ -3045,61 +2761,3 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
return 0;
}
EXPORT_SYMBOL(ptlrpc_unregister_service);
-
-/**
- * Returns 0 if the service is healthy.
- *
- * Right now, it just checks to make sure that requests aren't languishing
- * in the queue. We'll use this health check to govern whether a node needs
- * to be shot, so it's intentionally non-aggressive. */
-static int ptlrpc_svcpt_health_check(struct ptlrpc_service_part *svcpt)
-{
- struct ptlrpc_request *request = NULL;
- struct timeval right_now;
- long timediff;
-
- do_gettimeofday(&right_now);
-
- spin_lock(&svcpt->scp_req_lock);
- /* How long has the next entry been waiting? */
- if (ptlrpc_server_high_pending(svcpt, true))
- request = ptlrpc_nrs_req_peek_nolock(svcpt, true);
- else if (ptlrpc_server_normal_pending(svcpt, true))
- request = ptlrpc_nrs_req_peek_nolock(svcpt, false);
-
- if (request == NULL) {
- spin_unlock(&svcpt->scp_req_lock);
- return 0;
- }
-
- timediff = cfs_timeval_sub(&right_now, &request->rq_arrival_time, NULL);
- spin_unlock(&svcpt->scp_req_lock);
-
- if ((timediff / ONE_MILLION) >
- (AT_OFF ? obd_timeout * 3 / 2 : at_max)) {
- CERROR("%s: unhealthy - request has been waiting %lds\n",
- svcpt->scp_service->srv_name, timediff / ONE_MILLION);
- return -1;
- }
-
- return 0;
-}
-
-int
-ptlrpc_service_health_check(struct ptlrpc_service *svc)
-{
- struct ptlrpc_service_part *svcpt;
- int i;
-
- if (svc == NULL)
- return 0;
-
- ptlrpc_service_for_each_part(svcpt, i, svc) {
- int rc = ptlrpc_svcpt_health_check(svcpt);
-
- if (rc != 0)
- return rc;
- }
- return 0;
-}
-EXPORT_SYMBOL(ptlrpc_service_health_check);
diff --git a/drivers/staging/lustre/lustre/ptlrpc/wiretest.c b/drivers/staging/lustre/lustre/ptlrpc/wiretest.c
index d6d92046c..40f720ca3 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/wiretest.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/wiretest.c
@@ -43,6 +43,8 @@
#include "../include/obd_class.h"
#include "../include/lustre_net.h"
#include "../include/lustre_disk.h"
+#include "ptlrpc_internal.h"
+
void lustre_assert_wire_constants(void)
{
/* Wire protocol assertions generated by 'wirecheck'
@@ -636,12 +638,8 @@ void lustre_assert_wire_constants(void)
(long long)(int)offsetof(struct lustre_msg_v2, lm_buflens[0]));
LASSERTF((int)sizeof(((struct lustre_msg_v2 *)0)->lm_buflens[0]) == 4, "found %lld\n",
(long long)(int)sizeof(((struct lustre_msg_v2 *)0)->lm_buflens[0]));
- LASSERTF(LUSTRE_MSG_MAGIC_V1 == 0x0BD00BD0, "found 0x%.8x\n",
- LUSTRE_MSG_MAGIC_V1);
LASSERTF(LUSTRE_MSG_MAGIC_V2 == 0x0BD00BD3, "found 0x%.8x\n",
LUSTRE_MSG_MAGIC_V2);
- LASSERTF(LUSTRE_MSG_MAGIC_V1_SWABBED == 0xD00BD00B, "found 0x%.8x\n",
- LUSTRE_MSG_MAGIC_V1_SWABBED);
LASSERTF(LUSTRE_MSG_MAGIC_V2_SWABBED == 0xD30BD00B, "found 0x%.8x\n",
LUSTRE_MSG_MAGIC_V2_SWABBED);