Commit e4eb42ce authored by Chuck Lever, committed by J. Bruce Fields

svcrdma: Remove BH-disabled spin locking in svc_rdma_send()

svcrdma's current SQ accounting algorithm takes sc_lock and disables
bottom-halves while posting all RDMA Read, Write, and Send WRs.

This is relatively heavyweight serialization. And note that Write and
Send are already fully serialized by the xpt_mutex.

Using a single atomic_t should be all that is necessary to guarantee
that ib_post_send() is called only when there is enough space on the
send queue. This is what the other RDMA-enabled storage targets do.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent 5fdca653
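
The change is easiest to read as a reserve/release counter. Below is a minimal user-space sketch of that pattern, assuming C11 atomics; the names sq_avail, reserve_sqes, and sqe_completed are illustrative only, while the kernel keeps the counter in xprt->sc_sq_avail, reserves in svc_rdma_send(), and releases in the send completion handler.

/*
 * Sketch of the SQ accounting pattern this patch adopts.
 * Illustrative names; not the kernel source.
 */
#include <stdatomic.h>
#include <stdbool.h>

static atomic_int sq_avail = 16;	/* would be initialized to the SQ depth */

/* Post path: claim wr_count send queue entries with one atomic op. */
static bool reserve_sqes(int wr_count)
{
	if (atomic_fetch_sub(&sq_avail, wr_count) - wr_count < 0) {
		/* Queue full: return the claim; the caller then sleeps
		 * on a waitqueue and retries (wait_event in the kernel).
		 */
		atomic_fetch_add(&sq_avail, wr_count);
		return false;
	}
	return true;	/* now safe to call ib_post_send() */
}

/* Completion path: hand the entry back and wake any waiter. */
static void sqe_completed(void)
{
	atomic_fetch_add(&sq_avail, 1);
	/* wake_up(&xprt->sc_send_wait) in the kernel */
}

int main(void)
{
	if (reserve_sqes(1))	/* reserve, post, then complete */
		sqe_completed();
	return 0;
}

Because the reply path is already serialized by xpt_mutex, this single counter is the only synchronization the post path needs; no spin lock or BH disabling is involved.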
@@ -139,7 +139,7 @@ struct svcxprt_rdma {
 	int                  sc_max_sge_rd;	/* max sge for read target */
 	bool                 sc_snd_w_inv;	/* OK to use Send With Invalidate */
 
-	atomic_t             sc_sq_count;	/* Number of SQ WR on queue */
+	atomic_t             sc_sq_avail;	/* SQEs ready to be consumed */
 	unsigned int         sc_sq_depth;	/* Depth of SQ */
 	unsigned int         sc_rq_depth;	/* Depth of RQ */
 	u32                  sc_max_requests;	/* Forward credits */
@@ -594,7 +594,12 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 		goto err0;
 	inline_bytes = rqstp->rq_res.len;
 
-	/* Create the RDMA response header */
+	/* Create the RDMA response header. xprt->xpt_mutex,
+	 * acquired in svc_send(), serializes RPC replies. The
+	 * code path below that inserts the credit grant value
+	 * into each transport header runs only inside this
+	 * critical section.
+	 */
 	ret = -ENOMEM;
 	res_page = alloc_page(GFP_KERNEL);
 	if (!res_page)
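
The new comment leans on the generic sunrpc send path: svc_send() in net/sunrpc/svc_xprt.c takes xpt_mutex around the transport's sendto method, so only one reply, and therefore only one credit-grant insertion, is in flight per connection. A heavily abridged sketch of that caller (not the verbatim source; error handling and length bookkeeping omitted):

/* Abridged sketch of svc_send(); the real function does more. */
int svc_send(struct svc_rqst *rqstp)
{
	struct svc_xprt *xprt = rqstp->rq_xprt;
	int len;

	/* xpt_mutex serializes all replies on this transport. */
	mutex_lock(&xprt->xpt_mutex);
	if (test_bit(XPT_DEAD, &xprt->xpt_flags) ||
	    test_bit(XPT_CLOSE, &xprt->xpt_flags))
		len = -ENOTCONN;
	else
		len = xprt->xpt_ops->xpo_sendto(rqstp);	/* svc_rdma_sendto() */
	mutex_unlock(&xprt->xpt_mutex);
	return len;
}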
@@ -434,7 +434,7 @@ static void svc_rdma_send_wc_common(struct svcxprt_rdma *xprt,
 		goto err;
 
 out:
-	atomic_dec(&xprt->sc_sq_count);
+	atomic_inc(&xprt->sc_sq_avail);
 	wake_up(&xprt->sc_send_wait);
 	return;
@@ -1008,6 +1008,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 	newxprt->sc_rq_depth = newxprt->sc_max_requests +
 			       newxprt->sc_max_bc_requests;
 	newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_rq_depth;
+	atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
 
 	if (!svc_rdma_prealloc_ctxts(newxprt))
 		goto errout;
@@ -1333,15 +1334,13 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 	/* If the SQ is full, wait until an SQ entry is available */
 	while (1) {
-		spin_lock_bh(&xprt->sc_lock);
-		if (xprt->sc_sq_depth < atomic_read(&xprt->sc_sq_count) + wr_count) {
-			spin_unlock_bh(&xprt->sc_lock);
+		if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
 			atomic_inc(&rdma_stat_sq_starve);
 
 			/* Wait until SQ WR available if SQ still full */
+			atomic_add(wr_count, &xprt->sc_sq_avail);
 			wait_event(xprt->sc_send_wait,
-				   atomic_read(&xprt->sc_sq_count) <
-				   xprt->sc_sq_depth);
+				   atomic_read(&xprt->sc_sq_avail) > wr_count);
 			if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
 				return -ENOTCONN;
 			continue;
@@ -1351,21 +1350,17 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
 			svc_xprt_get(&xprt->sc_xprt);
 
 		/* Bump used SQ WR count and post */
-		atomic_add(wr_count, &xprt->sc_sq_count);
 		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
 		if (ret) {
 			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-			atomic_sub(wr_count, &xprt->sc_sq_count);
 			for (i = 0; i < wr_count; i ++)
 				svc_xprt_put(&xprt->sc_xprt);
-			dprintk("svcrdma: failed to post SQ WR rc=%d, "
-				"sc_sq_count=%d, sc_sq_depth=%d\n",
-				ret, atomic_read(&xprt->sc_sq_count),
-				xprt->sc_sq_depth);
-		}
-		spin_unlock_bh(&xprt->sc_lock);
-		if (ret)
+			dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
+			dprintk("    sc_sq_avail=%d, sc_sq_depth=%d\n",
+				atomic_read(&xprt->sc_sq_avail),
+				xprt->sc_sq_depth);
 			wake_up(&xprt->sc_send_wait);
+		}
 		break;
 	}
 	return ret;