Commit 2fad6592 authored by Chuck Lever, committed by Anna Schumaker

xprtrdma: Wait on empty sendctx queue

Currently, when the sendctx queue is exhausted during marshaling, the
RPC/RDMA transport places the RPC task on the delayq, which forces a
wait for HZ >> 2 before the marshal and send is retried.

With this change, the transport now places such an RPC task on the
pending queue, and wakes it just as soon as more sendctxs become
available. This typically takes less than a millisecond, and the
write_space waking mechanism is less deadlock-prone.
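The wake-up discipline is a one-shot flag: the getter marks the queue empty, and the completion path atomically clears the mark and wakes the waiter only if it was set. Below is a minimal userspace sketch of that flag-and-wake pattern; names such as queue_empty, get_ctx, put_ctx, and wake_waiter are illustrative stand-ins, not the xprtrdma code itself.

	#include <stdatomic.h>
	#include <stdbool.h>
	#include <stdio.h>

	static atomic_bool queue_empty;

	/* Stand-in for xprt_write_space(). */
	static void wake_waiter(void)
	{
		printf("waking pending sender\n");
	}

	/* Consumer path, cf. rpcrdma_sendctx_get_locked(). */
	static void *get_ctx(void *ctx_or_null)
	{
		if (!ctx_or_null)
			atomic_store(&queue_empty, true);	/* cf. set_bit() */
		return ctx_or_null;
	}

	/* Completion path, cf. rpcrdma_sendctx_put_locked(). */
	static void put_ctx(void)
	{
		/* cf. test_and_clear_bit(): atomically reads and clears */
		if (atomic_exchange(&queue_empty, false))
			wake_waiter();
	}

	int main(void)
	{
		get_ctx(NULL);	/* queue exhausted: flag is set */
		put_ctx();	/* a sendctx is returned: waiter is woken once */
		put_ctx();	/* flag already clear: no spurious wake-up */
		return 0;
	}

Because the test-and-clear is atomic, concurrent completions race to issue the wake-up but only one wins, so the waiter is not woken repeatedly while the queue refills.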

Moreover, the waiting RPC task is holding the transport's write
lock, which blocks the transport from sending RPCs. Therefore faster
recovery from sendctx queue exhaustion is desirable.

Cf. commit 5804891455d5 ("xprtrdma: ->send_request returns -EAGAIN
when there are no free MRs").
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
parent ed3aa742
@@ -695,7 +695,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 {
 	req->rl_sendctx = rpcrdma_sendctx_get_locked(&r_xprt->rx_buf);
 	if (!req->rl_sendctx)
-		return -ENOBUFS;
+		return -EAGAIN;
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
 	req->rl_sendctx->sc_req = req;
@@ -878,6 +878,7 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
 		sc->sc_xprt = r_xprt;
 		buf->rb_sc_ctxs[i] = sc;
 	}
+	buf->rb_flags = 0;
 
 	return 0;
@@ -935,7 +936,7 @@ struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_buffer *buf)
 	 * completions recently. This is a sign the Send Queue is
 	 * backing up. Cause the caller to pause and try again.
 	 */
-	dprintk("RPC:       %s: empty sendctx queue\n", __func__);
+	set_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags);
 	r_xprt = container_of(buf, struct rpcrdma_xprt, rx_buf);
 	r_xprt->rx_stats.empty_sendctx_q++;
 	return NULL;
@@ -970,6 +971,11 @@ rpcrdma_sendctx_put_locked(struct rpcrdma_sendctx *sc)
 
 	/* Paired with READ_ONCE */
 	smp_store_release(&buf->rb_sc_tail, next_tail);
+
+	if (test_and_clear_bit(RPCRDMA_BUF_F_EMPTY_SCQ, &buf->rb_flags)) {
+		smp_mb__after_atomic();
+		xprt_write_space(&sc->sc_xprt->rx_xprt);
+	}
 }
 
 static void
@@ -400,6 +400,7 @@ struct rpcrdma_buffer {
 	spinlock_t		rb_lock;	/* protect buf lists */
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
+	unsigned long		rb_flags;
 	u32			rb_max_requests;
 	u32			rb_credits;	/* most recent credit grant */
 	int			rb_posted_receives;
@@ -417,6 +418,11 @@ struct rpcrdma_buffer {
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
+/* rb_flags */
+enum {
+	RPCRDMA_BUF_F_EMPTY_SCQ = 0,
+};
+
 /*
  * Internal structure for transport instance creation. This
  * exists primarily for modularity.