Commit 43042b90 authored by Chuck Lever's avatar Chuck Lever

svcrdma: Reduce Receive doorbell rate

This is similar to commit e340c2d6 ("xprtrdma: Reduce the
doorbell rate (Receive)") which added Receive batching to the
client.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent c6226ff9
...@@ -104,6 +104,7 @@ struct svcxprt_rdma { ...@@ -104,6 +104,7 @@ struct svcxprt_rdma {
wait_queue_head_t sc_send_wait; /* SQ exhaustion waitlist */ wait_queue_head_t sc_send_wait; /* SQ exhaustion waitlist */
unsigned long sc_flags; unsigned long sc_flags;
u32 sc_pending_recvs;
struct list_head sc_read_complete_q; struct list_head sc_read_complete_q;
struct work_struct sc_work; struct work_struct sc_work;
......
...@@ -266,33 +266,46 @@ void svc_rdma_release_rqst(struct svc_rqst *rqstp) ...@@ -266,33 +266,46 @@ void svc_rdma_release_rqst(struct svc_rqst *rqstp)
svc_rdma_recv_ctxt_put(rdma, ctxt); svc_rdma_recv_ctxt_put(rdma, ctxt);
} }
static int __svc_rdma_post_recv(struct svcxprt_rdma *rdma, static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
struct svc_rdma_recv_ctxt *ctxt) unsigned int wanted, bool temp)
{ {
const struct ib_recv_wr *bad_wr = NULL;
struct svc_rdma_recv_ctxt *ctxt;
struct ib_recv_wr *recv_chain;
int ret; int ret;
recv_chain = NULL;
while (wanted--) {
ctxt = svc_rdma_recv_ctxt_get(rdma);
if (!ctxt)
break;
trace_svcrdma_post_recv(ctxt); trace_svcrdma_post_recv(ctxt);
ret = ib_post_recv(rdma->sc_qp, &ctxt->rc_recv_wr, NULL); ctxt->rc_temp = temp;
ctxt->rc_recv_wr.next = recv_chain;
recv_chain = &ctxt->rc_recv_wr;
rdma->sc_pending_recvs++;
}
if (!recv_chain)
return false;
ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
if (ret) if (ret)
goto err_post; goto err_post;
return 0; return true;
err_post: err_post:
trace_svcrdma_rq_post_err(rdma, ret); while (bad_wr) {
ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
rc_recv_wr);
bad_wr = bad_wr->next;
svc_rdma_recv_ctxt_put(rdma, ctxt); svc_rdma_recv_ctxt_put(rdma, ctxt);
return ret; }
}
static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) trace_svcrdma_rq_post_err(rdma, ret);
return 0; /* Since we're destroying the xprt, no need to reset
ctxt = svc_rdma_recv_ctxt_get(rdma); * sc_pending_recvs. */
if (!ctxt) return false;
return -ENOMEM;
return __svc_rdma_post_recv(rdma, ctxt);
} }
/** /**
...@@ -303,20 +316,7 @@ static int svc_rdma_post_recv(struct svcxprt_rdma *rdma) ...@@ -303,20 +316,7 @@ static int svc_rdma_post_recv(struct svcxprt_rdma *rdma)
*/ */
bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{ {
struct svc_rdma_recv_ctxt *ctxt; return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests, true);
unsigned int i;
int ret;
for (i = 0; i < rdma->sc_max_requests; i++) {
ctxt = svc_rdma_recv_ctxt_get(rdma);
if (!ctxt)
return false;
ctxt->rc_temp = true;
ret = __svc_rdma_post_recv(rdma, ctxt);
if (ret)
return false;
}
return true;
} }
/** /**
...@@ -324,8 +324,6 @@ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma) ...@@ -324,8 +324,6 @@ bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
* @cq: Completion Queue context * @cq: Completion Queue context
* @wc: Work Completion object * @wc: Work Completion object
* *
* NB: The svc_xprt/svcxprt_rdma is pinned whenever it's possible that
* the Receive completion handler could be running.
*/ */
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{ {
...@@ -333,6 +331,8 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) ...@@ -333,6 +331,8 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
struct ib_cqe *cqe = wc->wr_cqe; struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_recv_ctxt *ctxt; struct svc_rdma_recv_ctxt *ctxt;
rdma->sc_pending_recvs--;
/* WARNING: Only wc->wr_cqe and wc->status are reliable */ /* WARNING: Only wc->wr_cqe and wc->status are reliable */
ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe); ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);
...@@ -340,9 +340,6 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) ...@@ -340,9 +340,6 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
if (wc->status != IB_WC_SUCCESS) if (wc->status != IB_WC_SUCCESS)
goto flushed; goto flushed;
if (svc_rdma_post_recv(rdma))
goto post_err;
/* All wc fields are now known to be valid */ /* All wc fields are now known to be valid */
ctxt->rc_byte_len = wc->byte_len; ctxt->rc_byte_len = wc->byte_len;
ib_dma_sync_single_for_cpu(rdma->sc_pd->device, ib_dma_sync_single_for_cpu(rdma->sc_pd->device,
...@@ -356,11 +353,18 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc) ...@@ -356,11 +353,18 @@ static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
spin_unlock(&rdma->sc_rq_dto_lock); spin_unlock(&rdma->sc_rq_dto_lock);
if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags)) if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
svc_xprt_enqueue(&rdma->sc_xprt); svc_xprt_enqueue(&rdma->sc_xprt);
if (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) &&
rdma->sc_pending_recvs < rdma->sc_max_requests)
if (!svc_rdma_refresh_recvs(rdma, RPCRDMA_MAX_RECV_BATCH,
false))
goto post_err;
return; return;
flushed: flushed:
post_err:
svc_rdma_recv_ctxt_put(rdma, ctxt); svc_rdma_recv_ctxt_put(rdma, ctxt);
post_err:
set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags); set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
svc_xprt_enqueue(&rdma->sc_xprt); svc_xprt_enqueue(&rdma->sc_xprt);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment