Commit 97bce634 authored by Chuck Lever, committed by J. Bruce Fields

svcrdma: Optimize the logic that selects the R_key to invalidate

o Select the R_key to invalidate while the CPU cache still contains
  the received RPC Call transport header, rather than waiting until
  we're about to send the RPC Reply.

o Choose Send With Invalidate if there is exactly one distinct R_key
  in the received transport header. If there is more than one, the
  client would have to perform local invalidation anyway, after it has
  already waited for the remote invalidation to complete. (A standalone
  sketch of this selection rule follows below.)
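
As a standalone illustration of that selection rule, here is a minimal
userspace sketch. The helper name and the flat rkeys[] array are
illustrative only; the patch itself walks the XDR-encoded chunk lists
in place rather than gathering the R_keys into an array:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: returns true and stores the R_key in *out when
 * every element of rkeys[] carries the same value; bails out on the
 * first mismatch, mirroring the early returns in the patch.
 */
static bool single_distinct_rkey(const uint32_t *rkeys, size_t n,
                                 uint32_t *out)
{
        bool seen = false;
        uint32_t candidate = 0;

        for (size_t i = 0; i < n; i++) {
                if (!seen) {
                        candidate = rkeys[i];
                        seen = true;
                } else if (rkeys[i] != candidate) {
                        return false;   /* two distinct R_keys: plain Send */
                }
        }
        if (!seen)
                return false;           /* no chunks, nothing to invalidate */
        *out = candidate;
        return true;
}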
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent b493fd31
include/linux/sunrpc/svc_rdma.h
@@ -135,6 +135,7 @@ struct svc_rdma_recv_ctxt {
 	u32			rc_byte_len;
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
+	u32			rc_inv_rkey;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
...
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -485,6 +485,68 @@ static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end)
 	return p;
 }
 
+/* RPC-over-RDMA Version One private extension: Remote Invalidation.
+ * Responder's choice: requester signals it can handle Send With
+ * Invalidate, and responder chooses one R_key to invalidate.
+ *
+ * If there is exactly one distinct R_key in the received transport
+ * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
+ *
+ * Perform this operation while the received transport header is
+ * still in the CPU cache.
+ */
+static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
+				  struct svc_rdma_recv_ctxt *ctxt)
+{
+	__be32 inv_rkey, *p;
+	u32 i, segcount;
+
+	ctxt->rc_inv_rkey = 0;
+
+	if (!rdma->sc_snd_w_inv)
+		return;
+
+	inv_rkey = xdr_zero;
+	p = ctxt->rc_recv_buf;
+	p += rpcrdma_fixed_maxsz;
+
+	/* Read list */
+	while (*p++ != xdr_zero) {
+		p++;	/* position */
+		if (inv_rkey == xdr_zero)
+			inv_rkey = *p;
+		else if (inv_rkey != *p)
+			return;
+		p += 4;
+	}
+
+	/* Write list */
+	while (*p++ != xdr_zero) {
+		segcount = be32_to_cpup(p++);
+		for (i = 0; i < segcount; i++) {
+			if (inv_rkey == xdr_zero)
+				inv_rkey = *p;
+			else if (inv_rkey != *p)
+				return;
+			p += 4;
+		}
+	}
+
+	/* Reply chunk */
+	if (*p++ != xdr_zero) {
+		segcount = be32_to_cpup(p++);
+		for (i = 0; i < segcount; i++) {
+			if (inv_rkey == xdr_zero)
+				inv_rkey = *p;
+			else if (inv_rkey != *p)
+				return;
+			p += 4;
+		}
+	}
+
+	ctxt->rc_inv_rkey = be32_to_cpu(inv_rkey);
+}
+
 /* On entry, xdr->head[0].iov_base points to first byte in the
  * RPC-over-RDMA header.
  *
@@ -746,6 +808,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
 		return ret;
 	}
+	svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
 
 	p += rpcrdma_fixed_maxsz;
 	if (*p != xdr_zero)
...
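
A note on the pointer arithmetic in svc_rdma_get_inv_rkey() above: each
read segment on the wire is a one-word "item present" discriminator, a
position word, and a handle/length/offset triple, so the loop consumes
the discriminator, skips the position with p++, reads the R_key at *p,
and then steps p += 4 past handle, length, and the two-word offset to
reach the next discriminator. A rough sketch of that layout, with
struct definitions that are illustrative rather than taken from the
kernel tree:

#include <stdint.h>

/* One plain RDMA segment: four XDR words. */
struct rdma_segment {
        uint32_t handle;        /* the R_key examined at *p */
        uint32_t length;
        uint64_t offset;        /* 64-bit offset: two XDR words */
};

/* A read-list entry prepends a position word to the segment. Write
 * chunks and the Reply chunk omit the position and instead carry a
 * leading segment count, which the Write list and Reply chunk loops
 * read with be32_to_cpup(p++).
 */
struct read_segment {
        uint32_t position;      /* skipped with p++ */
        struct rdma_segment seg; /* p += 4 then lands on the next
                                  * discriminator */
};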
net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -484,32 +484,6 @@ static void svc_rdma_get_write_arrays(__be32 *rdma_argp,
 		*reply = NULL;
 }
 
-/* RPC-over-RDMA Version One private extension: Remote Invalidation.
- * Responder's choice: requester signals it can handle Send With
- * Invalidate, and responder chooses one rkey to invalidate.
- *
- * Find a candidate rkey to invalidate when sending a reply. Picks the
- * first R_key it finds in the chunk lists.
- *
- * Returns zero if RPC's chunk lists are empty.
- */
-static u32 svc_rdma_get_inv_rkey(__be32 *rdma_argp,
-				 __be32 *wr_lst, __be32 *rp_ch)
-{
-	__be32 *p;
-
-	p = rdma_argp + rpcrdma_fixed_maxsz;
-	if (*p != xdr_zero)
-		p += 2;
-	else if (wr_lst && be32_to_cpup(wr_lst + 1))
-		p = wr_lst + 2;
-	else if (rp_ch && be32_to_cpup(rp_ch + 1))
-		p = rp_ch + 2;
-	else
-		return 0;
-	return be32_to_cpup(p);
-}
-
 static int svc_rdma_dma_map_page(struct svcxprt_rdma *rdma,
 				 struct svc_rdma_send_ctxt *ctxt,
 				 struct page *page,
@@ -672,7 +646,7 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  *
  * RDMA Send is the last step of transmitting an RPC reply. Pages
  * involved in the earlier RDMA Writes are here transferred out
- * of the rqstp and into the ctxt's page array. These pages are
+ * of the rqstp and into the sctxt's page array. These pages are
  * DMA unmapped by each Write completion, but the subsequent Send
  * completion finally releases these pages.
  *
@@ -680,32 +654,31 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
  * - The Reply's transport header will never be larger than a page.
  */
 static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
-				   struct svc_rdma_send_ctxt *ctxt,
-				   __be32 *rdma_argp,
+				   struct svc_rdma_send_ctxt *sctxt,
+				   struct svc_rdma_recv_ctxt *rctxt,
 				   struct svc_rqst *rqstp,
 				   __be32 *wr_lst, __be32 *rp_ch)
 {
 	int ret;
 
 	if (!rp_ch) {
-		ret = svc_rdma_map_reply_msg(rdma, ctxt,
+		ret = svc_rdma_map_reply_msg(rdma, sctxt,
 					     &rqstp->rq_res, wr_lst);
 		if (ret < 0)
 			return ret;
 	}
 
-	svc_rdma_save_io_pages(rqstp, ctxt);
+	svc_rdma_save_io_pages(rqstp, sctxt);
 
-	ctxt->sc_send_wr.opcode = IB_WR_SEND;
-	if (rdma->sc_snd_w_inv) {
-		ctxt->sc_send_wr.ex.invalidate_rkey =
-			svc_rdma_get_inv_rkey(rdma_argp, wr_lst, rp_ch);
-		if (ctxt->sc_send_wr.ex.invalidate_rkey)
-			ctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+	if (rctxt->rc_inv_rkey) {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND_WITH_INV;
+		sctxt->sc_send_wr.ex.invalidate_rkey = rctxt->rc_inv_rkey;
+	} else {
+		sctxt->sc_send_wr.opcode = IB_WR_SEND;
 	}
 	dprintk("svcrdma: posting Send WR with %u sge(s)\n",
-		ctxt->sc_send_wr.num_sge);
-	return svc_rdma_send(rdma, &ctxt->sc_send_wr);
+		sctxt->sc_send_wr.num_sge);
+	return svc_rdma_send(rdma, &sctxt->sc_send_wr);
 }
 
 /* Given the client-provided Write and Reply chunks, the server was not
@@ -809,7 +782,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	}
 	svc_rdma_sync_reply_hdr(rdma, sctxt, svc_rdma_reply_hdr_len(rdma_resp));
 
-	ret = svc_rdma_send_reply_msg(rdma, sctxt, rdma_argp, rqstp,
+	ret = svc_rdma_send_reply_msg(rdma, sctxt, rctxt, rqstp,
 				      wr_lst, rp_ch);
 	if (ret < 0)
 		goto err1;
...
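
For completeness, the client-side benefit the description alludes to:
on its receive completion, a requester can tell whether the responder's
Send also invalidated one of its R_keys, and skip the local
invalidation for that MR. The helper below is hypothetical, but the
struct ib_wc fields are the real kernel verbs API:

#include <rdma/ib_verbs.h>

/* Returns true when the Send With Invalidate that delivered this
 * completion already invalidated the given R_key, so the requester
 * need not post a local invalidation work request for it.
 */
static bool rkey_already_invalidated(const struct ib_wc *wc, u32 rkey)
{
	return (wc->wc_flags & IB_WC_WITH_INVALIDATE) &&
	       wc->ex.invalidate_rkey == rkey;
}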