Commit 8d38de65 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Make FRWR send queue entry accounting more accurate

Verbs providers may perform house-keeping on the Send Queue during
each signaled send completion. It is necessary therefore for a verbs
consumer (like xprtrdma) to occasionally force a signaled send
completion if it runs unsignaled most of the time.

xprtrdma does not require signaled completions for Send or FastReg
Work Requests, but does signal some LocalInv Work Requests. To
ensure that Send Queue house-keeping can run before the Send Queue
is more than half-consumed, xprtrdma forces a signaled completion
on occasion by counting the number of Send Queue Entries it
consumes. It currently does this by counting each ib_post_send as
one Entry.
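
For reference, the pre-patch per-post accounting in rpcrdma_ep_post()
is built from a pair of macros; the following is condensed from the
lines this patch removes (see the diff below):

	#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
	#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)

	if (DECR_CQCOUNT(ep) > 0)
		send_wr->send_flags = 0;
	else { /* Provider must take a send completion every now and then */
		INIT_CQCOUNT(ep);
		send_wr->send_flags = IB_SEND_SIGNALED;
	}
	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);

Exactly one Entry is subtracted per call, no matter how many Work
Requests the caller actually hands to the provider.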

Commit c9918ff5 ("xprtrdma: Add ro_unmap_sync method for FRWR")
introduced the ability for frwr_op_unmap_sync to post more than one
Work Request with a single post_send. Thus the underlying assumption
of one Send Queue Entry per ib_post_send is no longer true.
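
A condensed sketch of that path as it looks with this patch applied,
pieced together from the frwr_op_unmap_sync hunks below (the
prev->next chaining and the final post are paraphrased rather than
quoted from the diff, and error handling is omitted):

	count = 0;
	invalidate_wrs = pos = prev = NULL;
	list_for_each_entry(mw, &req->rl_registered, mw_list) {
		pos = __frwr_prepare_linv_wr(mw); /* one LocalInv WR per MR */
		count++;                          /* each WR consumes an SQE */
		if (!invalidate_wrs)
			invalidate_wrs = pos;
		else
			prev->next = pos;         /* link WRs into one chain */
		prev = pos;
	}
	/* one ib_post_send(), but "count" Send Queue Entries consumed */
	rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
	rc = ib_post_send(ia->ri_id->qp, invalidate_wrs, &bad_wr);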

Also, FastReg Work Requests are currently never signaled. They
should be signaled once in a while, just as Send is, to keep the
accounting of consumed SQEs accurate.

While we're here, convert the CQCOUNT macros to the currently
preferred kernel coding style, which is inline functions.
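
To make the new arithmetic concrete, here is a worked example with
purely illustrative numbers (assuming cap.max_send_wr is 128; the
helpers are the ones added in the diff below):

	/* rpcrdma_ep_create(): rep_cqinit = 128/2 - 1 = 63 */
	rpcrdma_init_cqcount(ep, 0);              /* rep_cqcount = 63 */

	/* frwr_op_unmap_sync() chains, say, 5 LocalInv WRs */
	rpcrdma_init_cqcount(&r_xprt->rx_ep, 5);  /* rep_cqcount = 58 */

	/* each later Send or FastReg post decrements rep_cqcount;
	 * when it reaches zero, the count is reset and the WR is
	 * sent with IB_SEND_SIGNALED
	 */
	rpcrdma_set_signaled(ep, send_wr);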

Fixes: c9918ff5 ("xprtrdma: Add ro_unmap_sync method for FRWR")
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent 62aee0e3
@@ -421,7 +421,7 @@ frwr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
 		   IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 		   IB_ACCESS_REMOTE_READ;
 
-	DECR_CQCOUNT(&r_xprt->rx_ep);
+	rpcrdma_set_signaled(&r_xprt->rx_ep, &reg_wr->wr);
 	rc = ib_post_send(ia->ri_id->qp, &reg_wr->wr, &bad_wr);
 	if (rc)
 		goto out_senderr;
@@ -486,7 +486,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_mw *mw, *tmp;
 	struct rpcrdma_frmr *f;
-	int rc;
+	int count, rc;
 
 	dprintk("RPC: %s: req %p\n", __func__, req);
@@ -496,6 +496,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	 * a single ib_post_send() call.
 	 */
 	f = NULL;
+	count = 0;
 	invalidate_wrs = pos = prev = NULL;
 	list_for_each_entry(mw, &req->rl_registered, mw_list) {
 		if ((rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) &&
@@ -505,6 +506,7 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 		}
 
 		pos = __frwr_prepare_linv_wr(mw);
+		count++;
 
 		if (!invalidate_wrs)
 			invalidate_wrs = pos;
@@ -523,7 +525,12 @@ frwr_op_unmap_sync(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	f->fr_invwr.send_flags = IB_SEND_SIGNALED;
 	f->fr_cqe.done = frwr_wc_localinv_wake;
 	reinit_completion(&f->fr_linv_done);
-	INIT_CQCOUNT(&r_xprt->rx_ep);
+
+	/* Initialize CQ count, since there is always a signaled
+	 * WR being posted here. The new cqcount depends on how
+	 * many SQEs are about to be consumed.
+	 */
+	rpcrdma_init_cqcount(&r_xprt->rx_ep, count);
 
 	/* Transport disconnect drains the receive CQ before it
 	 * replaces the QP. The RPC reply handler won't call us
...
@@ -532,7 +532,7 @@ rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 	ep->rep_cqinit = ep->rep_attr.cap.max_send_wr/2 - 1;
 	if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;	/* always signal? */
-	INIT_CQCOUNT(ep);
+	rpcrdma_init_cqcount(ep, 0);
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
@@ -1311,13 +1311,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	dprintk("RPC: %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);
 
-	if (DECR_CQCOUNT(ep) > 0)
-		send_wr->send_flags = 0;
-	else { /* Provider must take a send completion every now and then */
-		INIT_CQCOUNT(ep);
-		send_wr->send_flags = IB_SEND_SIGNALED;
-	}
-
+	rpcrdma_set_signaled(ep, send_wr);
 	rc = ib_post_send(ia->ri_id->qp, send_wr, &send_wr_fail);
 	if (rc)
 		goto out_postsend_err;
...
@@ -95,8 +95,24 @@ struct rpcrdma_ep {
 	struct delayed_work	rep_connect_worker;
 };
 
-#define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
-#define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
+static inline void
+rpcrdma_init_cqcount(struct rpcrdma_ep *ep, int count)
+{
+	atomic_set(&ep->rep_cqcount, ep->rep_cqinit - count);
+}
+
+/* To update send queue accounting, provider must take a
+ * send completion every now and then.
+ */
+static inline void
+rpcrdma_set_signaled(struct rpcrdma_ep *ep, struct ib_send_wr *send_wr)
+{
+	send_wr->send_flags = 0;
+	if (unlikely(atomic_sub_return(1, &ep->rep_cqcount) <= 0)) {
+		rpcrdma_init_cqcount(ep, 0);
+		send_wr->send_flags = IB_SEND_SIGNALED;
+	}
+}
 
 /* Pre-allocate extra Work Requests for handling backward receives
  * and sends. This is a fixed value because the Work Queues are
...