summaryrefslogtreecommitdiff
path: root/drivers/staging/lustre/lustre/ptlrpc/events.c
diff options
context:
space:
mode:
Diffstat (limited to 'drivers/staging/lustre/lustre/ptlrpc/events.c')
-rw-r--r--drivers/staging/lustre/lustre/ptlrpc/events.c68
1 files changed, 37 insertions, 31 deletions
diff --git a/drivers/staging/lustre/lustre/ptlrpc/events.c b/drivers/staging/lustre/lustre/ptlrpc/events.c
index 990156986..47be21ac9 100644
--- a/drivers/staging/lustre/lustre/ptlrpc/events.c
+++ b/drivers/staging/lustre/lustre/ptlrpc/events.c
@@ -71,7 +71,8 @@ void request_out_callback(lnet_event_t *ev)
if (ev->type == LNET_EVENT_UNLINK || ev->status != 0) {
/* Failed send: make it seem like the reply timed out, just
- * like failing sends in client.c does currently... */
+ * like failing sends in client.c does currently...
+ */
req->rq_net_err = 1;
ptlrpc_client_wake_req(req);
@@ -95,7 +96,8 @@ void reply_in_callback(lnet_event_t *ev)
LASSERT(ev->md.start == req->rq_repbuf);
LASSERT(ev->offset + ev->mlength <= req->rq_repbuf_len);
/* We've set LNET_MD_MANAGE_REMOTE for all outgoing requests
- for adaptive timeouts' early reply. */
+ * for adaptive timeouts' early reply.
+ */
LASSERT((ev->md.options & LNET_MD_MANAGE_REMOTE) != 0);
spin_lock(&req->rq_lock);
@@ -151,7 +153,8 @@ void reply_in_callback(lnet_event_t *ev)
req->rq_reply_off = ev->offset;
req->rq_nob_received = ev->mlength;
/* LNetMDUnlink can't be called under the LNET_LOCK,
- so we must unlink in ptlrpc_unregister_reply */
+ * so we must unlink in ptlrpc_unregister_reply
+ */
DEBUG_REQ(D_INFO, req,
"reply in flags=%x mlen=%u offset=%d replen=%d",
lustre_msg_get_flags(req->rq_reqmsg),
@@ -162,7 +165,8 @@ void reply_in_callback(lnet_event_t *ev)
out_wake:
/* NB don't unlock till after wakeup; req can disappear under us
- * since we don't have our own ref */
+ * since we don't have our own ref
+ */
ptlrpc_client_wake_req(req);
spin_unlock(&req->rq_lock);
}
@@ -213,7 +217,8 @@ void client_bulk_callback(lnet_event_t *ev)
desc->bd_failure = 1;
/* NB don't unlock till after wakeup; desc can disappear under us
- * otherwise */
+ * otherwise
+ */
if (desc->bd_md_count == 0)
ptlrpc_client_wake_req(desc->bd_req);
@@ -250,7 +255,8 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
__u64 new_seq;
/* set sequence ID for request and add it to history list,
- * it must be called with hold svcpt::scp_lock */
+ * it must be called with hold svcpt::scp_lock
+ */
new_seq = (sec << REQS_SEC_SHIFT) |
(usec << REQS_USEC_SHIFT) |
@@ -258,7 +264,8 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
if (new_seq > svcpt->scp_hist_seq) {
/* This handles the initial case of scp_hist_seq == 0 or
- * we just jumped into a new time window */
+ * we just jumped into a new time window
+ */
svcpt->scp_hist_seq = new_seq;
} else {
LASSERT(REQS_SEQ_SHIFT(svcpt) < REQS_USEC_SHIFT);
@@ -266,7 +273,8 @@ static void ptlrpc_req_add_history(struct ptlrpc_service_part *svcpt,
* however, it's possible that we used up all bits for
* sequence and jumped into the next usec bucket (future time),
* then we hope there will be less RPCs per bucket at some
- * point, and sequence will catch up again */
+ * point, and sequence will catch up again
+ */
svcpt->scp_hist_seq += (1U << REQS_SEQ_SHIFT(svcpt));
new_seq = svcpt->scp_hist_seq;
}
@@ -302,7 +310,8 @@ void request_in_callback(lnet_event_t *ev)
* request buffer we can use the request object embedded in
* rqbd. Note that if we failed to allocate a request,
* we'd have to re-post the rqbd, which we can't do in this
- * context. */
+ * context.
+ */
req = &rqbd->rqbd_req;
memset(req, 0, sizeof(*req));
} else {
@@ -312,7 +321,7 @@ void request_in_callback(lnet_event_t *ev)
return;
}
req = ptlrpc_request_cache_alloc(GFP_ATOMIC);
- if (req == NULL) {
+ if (!req) {
CERROR("Can't allocate incoming request descriptor: Dropping %s RPC from %s\n",
service->srv_name,
libcfs_id2str(ev->initiator));
@@ -322,7 +331,8 @@ void request_in_callback(lnet_event_t *ev)
/* NB we ABSOLUTELY RELY on req being zeroed, so pointers are NULL,
* flags are reset and scalars are zero. We only set the message
- * size to non-zero if this was a successful receive. */
+ * size to non-zero if this was a successful receive.
+ */
req->rq_xid = ev->match_bits;
req->rq_reqbuf = ev->md.start + ev->offset;
if (ev->type == LNET_EVENT_PUT && ev->status == 0)
@@ -352,7 +362,8 @@ void request_in_callback(lnet_event_t *ev)
svcpt->scp_nrqbds_posted);
/* Normally, don't complain about 0 buffers posted; LNET won't
- * drop incoming reqs since we set the portal lazy */
+ * drop incoming reqs since we set the portal lazy
+ */
if (test_req_buffer_pressure &&
ev->type != LNET_EVENT_UNLINK &&
svcpt->scp_nrqbds_posted == 0)
@@ -369,7 +380,8 @@ void request_in_callback(lnet_event_t *ev)
svcpt->scp_nreqs_incoming++;
/* NB everything can disappear under us once the request
- * has been queued and we unlock, so do the wake now... */
+ * has been queued and we unlock, so do the wake now...
+ */
wake_up(&svcpt->scp_waitq);
spin_unlock(&svcpt->scp_lock);
@@ -390,7 +402,8 @@ void reply_out_callback(lnet_event_t *ev)
if (!rs->rs_difficult) {
/* 'Easy' replies have no further processing so I drop the
- * net's ref on 'rs' */
+ * net's ref on 'rs'
+ */
LASSERT(ev->unlinked);
ptlrpc_rs_decref(rs);
return;
@@ -400,7 +413,8 @@ void reply_out_callback(lnet_event_t *ev)
if (ev->unlinked) {
/* Last network callback. The net's ref on 'rs' stays put
- * until ptlrpc_handle_rs() is done with it */
+ * until ptlrpc_handle_rs() is done with it
+ */
spin_lock(&svcpt->scp_rep_lock);
spin_lock(&rs->rs_lock);
@@ -438,15 +452,12 @@ int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
__u32 best_order = 0;
int count = 0;
int rc = -ENOENT;
- int portals_compatibility;
int dist;
__u32 order;
lnet_nid_t dst_nid;
lnet_nid_t src_nid;
- portals_compatibility = LNetCtl(IOC_LIBCFS_PORTALS_COMPATIBILITY, NULL);
-
- peer->pid = LUSTRE_SRV_LNET_PID;
+ peer->pid = LNET_PID_LUSTRE;
/* Choose the matching UUID that's closest */
while (lustre_uuid_to_peer(uuid->uuid, &dst_nid, count++) == 0) {
@@ -466,14 +477,6 @@ int ptlrpc_uuid_to_peer(struct obd_uuid *uuid,
best_dist = dist;
best_order = order;
- if (portals_compatibility > 1) {
- /* Strong portals compatibility: Zero the nid's
- * NET, so if I'm reading new config logs, or
- * getting configured by (new) lconf I can
- * still talk to old servers. */
- dst_nid = LNET_MKNID(0, LNET_NIDADDR(dst_nid));
- src_nid = LNET_MKNID(0, LNET_NIDADDR(src_nid));
- }
peer->nid = dst_nid;
*self = src_nid;
rc = 0;
@@ -494,7 +497,8 @@ static void ptlrpc_ni_fini(void)
/* Wait for the event queue to become idle since there may still be
* messages in flight with pending events (i.e. the fire-and-forget
* messages == client requests and "non-difficult" server
- * replies */
+ * replies
+ */
for (retries = 0;; retries++) {
rc = LNetEQFree(ptlrpc_eq_h);
@@ -524,7 +528,7 @@ static lnet_pid_t ptl_get_pid(void)
{
lnet_pid_t pid;
- pid = LUSTRE_SRV_LNET_PID;
+ pid = LNET_PID_LUSTRE;
return pid;
}
@@ -544,11 +548,13 @@ static int ptlrpc_ni_init(void)
}
/* CAVEAT EMPTOR: how we process portals events is _radically_
- * different depending on... */
+ * different depending on...
+ */
/* kernel LNet calls our master callback when there are new event,
* because we are guaranteed to get every event via callback,
* so we just set EQ size to 0 to avoid overhead of serializing
- * enqueue/dequeue operations in LNet. */
+ * enqueue/dequeue operations in LNet.
+ */
rc = LNetEQAlloc(0, ptlrpc_master_callback, &ptlrpc_eq_h);
if (rc == 0)
return 0;