summaryrefslogtreecommitdiff
path: root/net/sunrpc/xprtrdma/rpc_rdma.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/sunrpc/xprtrdma/rpc_rdma.c')
-rw-r--r--net/sunrpc/xprtrdma/rpc_rdma.c197
1 files changed, 101 insertions, 96 deletions
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 84ea37dae..bc8bd6577 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -71,6 +71,67 @@ static const char transfertypes[][12] = {
};
#endif
+/* The client can send a request inline as long as the RPCRDMA header
+ * plus the RPC call fit under the transport's inline limit. If the
+ * combined call message size exceeds that limit, the client must use
+ * the read chunk list for this operation.
+ */
+static bool rpcrdma_args_inline(struct rpc_rqst *rqst)
+{
+ unsigned int callsize = RPCRDMA_HDRLEN_MIN + rqst->rq_snd_buf.len;
+
+ return callsize <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+}
+
+/* The client can't know how large the actual reply will be. Thus it
+ * plans for the largest possible reply for that particular ULP
+ * operation. If the maximum combined reply message size exceeds that
+ * limit, the client must provide a write list or a reply chunk for
+ * this request.
+ */
+static bool rpcrdma_results_inline(struct rpc_rqst *rqst)
+{
+ unsigned int repsize = RPCRDMA_HDRLEN_MIN + rqst->rq_rcv_buf.buflen;
+
+ return repsize <= RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+}
+
+static int
+rpcrdma_tail_pullup(struct xdr_buf *buf)
+{
+ size_t tlen = buf->tail[0].iov_len;
+ size_t skip = tlen & 3;
+
+ /* Do not include the tail if it is only an XDR pad */
+ if (tlen < 4)
+ return 0;
+
+ /* xdr_write_pages() adds a pad at the beginning of the tail
+ * if the content in "buf->pages" is unaligned. Force the
+ * tail's actual content to land at the next XDR position
+ * after the head instead.
+ */
+ if (skip) {
+ unsigned char *src, *dst;
+ unsigned int count;
+
+ src = buf->tail[0].iov_base;
+ dst = buf->head[0].iov_base;
+ dst += buf->head[0].iov_len;
+
+ src += skip;
+ tlen -= skip;
+
+ dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
+ __func__, skip, dst, src, tlen);
+
+ for (count = tlen; count; count--)
+ *dst++ = *src++;
+ }
+
+ return tlen;
+}
+
/*
* Chunk assembly from upper layer xdr_buf.
*
@@ -122,6 +183,10 @@ rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
if (len && n == nsegs)
return -EIO;
+ /* When encoding the read list, the tail is always sent inline */
+ if (type == rpcrdma_readch)
+ return n;
+
if (xdrbuf->tail[0].iov_len) {
/* the rpcrdma protocol allows us to omit any trailing
* xdr pad bytes, saving the server an RDMA operation. */
@@ -297,8 +362,7 @@ out:
* pre-registered memory buffer for this request. For small amounts
* of data, this is efficient. The cutoff value is tunable.
*/
-static int
-rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
+static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
int i, npages, curlen;
int copy_len;
@@ -310,16 +374,9 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
destp = rqst->rq_svec[0].iov_base;
curlen = rqst->rq_svec[0].iov_len;
destp += curlen;
- /*
- * Do optional padding where it makes sense. Alignment of write
- * payload can help the server, if our setting is accurate.
- */
- pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
- if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
- pad = 0; /* don't pad this request */
- dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n",
- __func__, pad, destp, rqst->rq_slen, curlen);
+ dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
+ __func__, destp, rqst->rq_slen, curlen);
copy_len = rqst->rq_snd_buf.page_len;
@@ -355,7 +412,6 @@ rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
page_base = 0;
}
/* header now contains entire send message */
- return pad;
}
/*
@@ -380,7 +436,7 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
char *base;
- size_t rpclen, padlen;
+ size_t rpclen;
ssize_t hdrlen;
enum rpcrdma_chunktype rtype, wtype;
struct rpcrdma_msg *headerp;
@@ -402,28 +458,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
/*
* Chunks needed for results?
*
+ * o Read ops return data as write chunk(s), header as inline.
* o If the expected result is under the inline threshold, all ops
- * return as inline (but see later).
+ * return as inline.
* o Large non-read ops return as a single reply chunk.
- * o Large read ops return data as write chunk(s), header as inline.
- *
- * Note: the NFS code sending down multiple result segments implies
- * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
- */
-
- /*
- * This code can handle read chunks, write chunks OR reply
- * chunks -- only one type. If the request is too big to fit
- * inline, then we will choose read chunks. If the request is
- * a READ, then use write chunks to separate the file data
- * into pages; otherwise use reply chunks.
*/
- if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
- wtype = rpcrdma_noch;
- else if (rqst->rq_rcv_buf.page_len == 0)
- wtype = rpcrdma_replych;
- else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
+ if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
wtype = rpcrdma_writech;
+ else if (rpcrdma_results_inline(rqst))
+ wtype = rpcrdma_noch;
else
wtype = rpcrdma_replych;
@@ -432,21 +475,25 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*
* o If the total request is under the inline threshold, all ops
* are sent as inline.
- * o Large non-write ops are sent with the entire message as a
- * single read chunk (protocol 0-position special case).
* o Large write ops transmit data as read chunk(s), header as
* inline.
+ * o Large non-write ops are sent with the entire message as a
+ * single read chunk (protocol 0-position special case).
*
- * Note: the NFS code sending down multiple argument segments
- * implies the op is a write.
- * TBD check NFSv4 setacl
+ * This assumes that the upper layer does not present a request
+ * that both has a data payload, and whose non-data arguments
+ * by themselves are larger than the inline threshold.
*/
- if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
+ if (rpcrdma_args_inline(rqst)) {
rtype = rpcrdma_noch;
- else if (rqst->rq_snd_buf.page_len == 0)
- rtype = rpcrdma_areadch;
- else
+ } else if (rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
+ } else {
+ r_xprt->rx_stats.nomsg_call_count++;
+ headerp->rm_type = htonl(RDMA_NOMSG);
+ rtype = rpcrdma_areadch;
+ rpclen = 0;
+ }
/* The following simplification is not true forever */
if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
@@ -458,7 +505,6 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
}
hdrlen = RPCRDMA_HDRLEN_MIN;
- padlen = 0;
/*
* Pull up any extra send data into the preregistered buffer.
@@ -467,45 +513,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
*/
if (rtype == rpcrdma_noch) {
- padlen = rpcrdma_inline_pullup(rqst,
- RPCRDMA_INLINE_PAD_VALUE(rqst));
-
- if (padlen) {
- headerp->rm_type = rdma_msgp;
- headerp->rm_body.rm_padded.rm_align =
- cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
- headerp->rm_body.rm_padded.rm_thresh =
- cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
- headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
- headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
- headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
- hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
- if (wtype != rpcrdma_noch) {
- dprintk("RPC: %s: invalid chunk list\n",
- __func__);
- return -EIO;
- }
- } else {
- headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
- headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
- /* new length after pullup */
- rpclen = rqst->rq_svec[0].iov_len;
- /*
- * Currently we try to not actually use read inline.
- * Reply chunks have the desirable property that
- * they land, packed, directly in the target buffers
- * without headers, so they require no fixup. The
- * additional RDMA Write op sends the same amount
- * of data, streams on-the-wire and adds no overhead
- * on receive. Therefore, we request a reply chunk
- * for non-writes wherever feasible and efficient.
- */
- if (wtype == rpcrdma_noch)
- wtype = rpcrdma_replych;
- }
- }
+ rpcrdma_inline_pullup(rqst);
+ headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
+ headerp->rm_body.rm_nochunks.rm_empty[2] = xdr_zero;
+ /* new length after pullup */
+ rpclen = rqst->rq_svec[0].iov_len;
+ } else if (rtype == rpcrdma_readch)
+ rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
if (rtype != rpcrdma_noch) {
hdrlen = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
headerp, rtype);
@@ -518,9 +534,9 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
if (hdrlen < 0)
return hdrlen;
- dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd padlen %zd"
+ dprintk("RPC: %s: %s: hdrlen %zd rpclen %zd"
" headerp 0x%p base 0x%p lkey 0x%x\n",
- __func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+ __func__, transfertypes[wtype], hdrlen, rpclen,
headerp, base, rdmab_lkey(req->rl_rdmabuf));
/*
@@ -534,26 +550,15 @@ rpcrdma_marshal_req(struct rpc_rqst *rqst)
req->rl_send_iov[0].length = hdrlen;
req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
+ req->rl_niovs = 1;
+ if (rtype == rpcrdma_areadch)
+ return 0;
+
req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
req->rl_send_iov[1].length = rpclen;
req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
req->rl_niovs = 2;
-
- if (padlen) {
- struct rpcrdma_ep *ep = &r_xprt->rx_ep;
-
- req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
- req->rl_send_iov[2].length = padlen;
- req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
-
- req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
- req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
- req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
-
- req->rl_niovs = 4;
- }
-
return 0;
}