Commit b541dd55 authored by Chuck Lever's avatar Chuck Lever

svcrdma: Eliminate allocation of recv_ctxt objects in backchannel

The svc_rdma_recv_ctxt free list uses a lockless list to avoid the
need for a spin lock in the fast path. llist_del_first(), which is
used by svc_rdma_recv_ctxt_get(), requires serialization, however,
when there are multiple list producers that are unserialized.

I mistakenly thought there was only one caller of
svc_rdma_recv_ctxt_get() (svc_rdma_refresh_recvs()), thus explicit
serialization would not be necessary. But there is another caller:
svc_rdma_bc_sendto(), and these two are not serialized against each
other. I haven't seen ill effects that I could directly ascribe to
a lack of serialization. It's just an observation based on code
audit.

When DMA-mapping before sending a Reply, the passed-in struct
svc_rdma_recv_ctxt is used only for its write and reply PCLs. These
are currently always empty in the backchannel case. So, instead of
passing a full svc_rdma_recv_ctxt object to
svc_rdma_map_reply_msg(), let's pass in just the Write and Reply
PCLs.

This change makes it unnecessary for the backchannel to acquire a
dummy svc_rdma_recv_ctxt object when sending an RPC Call. The need
for svc_rdma_recv_ctxt free list serialization is now completely
avoided.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 52e89100
......@@ -200,7 +200,8 @@ extern int svc_rdma_send(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt);
extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
const struct svc_rdma_recv_ctxt *rctxt,
const struct svc_rdma_pcl *write_pcl,
const struct svc_rdma_pcl *reply_pcl,
const struct xdr_buf *xdr);
extern void svc_rdma_send_error_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
......
......@@ -76,15 +76,12 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
struct rpc_rqst *rqst,
struct svc_rdma_send_ctxt *sctxt)
{
struct svc_rdma_recv_ctxt *rctxt;
struct svc_rdma_pcl empty_pcl;
int ret;
rctxt = svc_rdma_recv_ctxt_get(rdma);
if (!rctxt)
return -EIO;
ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
svc_rdma_recv_ctxt_put(rdma, rctxt);
pcl_init(&empty_pcl);
ret = svc_rdma_map_reply_msg(rdma, sctxt, &empty_pcl, &empty_pcl,
&rqst->rq_snd_buf);
if (ret < 0)
return -EIO;
......
......@@ -653,7 +653,7 @@ static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
* svc_rdma_pull_up_needed - Determine whether to use pull-up
* @rdma: controlling transport
* @sctxt: send_ctxt for the Send WR
* @rctxt: Write and Reply chunks provided by client
* @write_pcl: Write chunk list provided by client
* @xdr: xdr_buf containing RPC message to transmit
*
* Returns:
......@@ -662,7 +662,7 @@ static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
*/
static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
const struct svc_rdma_send_ctxt *sctxt,
const struct svc_rdma_recv_ctxt *rctxt,
const struct svc_rdma_pcl *write_pcl,
const struct xdr_buf *xdr)
{
/* Resources needed for the transport header */
......@@ -672,7 +672,7 @@ static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
};
int ret;
ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
ret = pcl_process_nonpayloads(write_pcl, xdr,
svc_rdma_xb_count_sges, &args);
if (ret < 0)
return false;
......@@ -728,7 +728,7 @@ static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
* svc_rdma_pull_up_reply_msg - Copy Reply into a single buffer
* @rdma: controlling transport
* @sctxt: send_ctxt for the Send WR; xprt hdr is already prepared
* @rctxt: Write and Reply chunks provided by client
* @write_pcl: Write chunk list provided by client
* @xdr: prepared xdr_buf containing RPC message
*
* The device is not capable of sending the reply directly.
......@@ -743,7 +743,7 @@ static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
*/
static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
const struct svc_rdma_recv_ctxt *rctxt,
const struct svc_rdma_pcl *write_pcl,
const struct xdr_buf *xdr)
{
struct svc_rdma_pullup_data args = {
......@@ -751,7 +751,7 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
};
int ret;
ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
ret = pcl_process_nonpayloads(write_pcl, xdr,
svc_rdma_xb_linearize, &args);
if (ret < 0)
return ret;
......@@ -764,7 +764,8 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
/* svc_rdma_map_reply_msg - DMA map the buffer holding RPC message
* @rdma: controlling transport
* @sctxt: send_ctxt for the Send WR
* @rctxt: Write and Reply chunks provided by client
* @write_pcl: Write chunk list provided by client
* @reply_pcl: Reply chunk provided by client
* @xdr: prepared xdr_buf containing RPC message
*
* Returns:
......@@ -776,7 +777,8 @@ static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
*/
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *sctxt,
const struct svc_rdma_recv_ctxt *rctxt,
const struct svc_rdma_pcl *write_pcl,
const struct svc_rdma_pcl *reply_pcl,
const struct xdr_buf *xdr)
{
struct svc_rdma_map_data args = {
......@@ -789,18 +791,18 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
/* If there is a Reply chunk, nothing follows the transport
* header, and we're done here.
* header, so there is nothing to map.
*/
if (!pcl_is_empty(&rctxt->rc_reply_pcl))
if (!pcl_is_empty(reply_pcl))
return 0;
/* For pull-up, svc_rdma_send() will sync the transport header.
* No additional DMA mapping is necessary.
*/
if (svc_rdma_pull_up_needed(rdma, sctxt, rctxt, xdr))
return svc_rdma_pull_up_reply_msg(rdma, sctxt, rctxt, xdr);
if (svc_rdma_pull_up_needed(rdma, sctxt, write_pcl, xdr))
return svc_rdma_pull_up_reply_msg(rdma, sctxt, write_pcl, xdr);
return pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
return pcl_process_nonpayloads(write_pcl, xdr,
svc_rdma_xb_dma_map, &args);
}
......@@ -848,7 +850,8 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
{
int ret;
ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqstp->rq_res);
ret = svc_rdma_map_reply_msg(rdma, sctxt, &rctxt->rc_write_pcl,
&rctxt->rc_reply_pcl, &rqstp->rq_res);
if (ret < 0)
return ret;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment