Commit a937693a authored by Chuck Lever's avatar Chuck Lever

svcrdma: Add back svcxprt_rdma::sc_read_complete_q

Having an nfsd thread waiting for an RDMA Read completion is
problematic if the Read responder (ie, the client) stops responding.
We need to go back to handling RDMA Reads by allowing the nfsd
thread to return to the svc scheduler, then waking a second thread
finish the RPC message once the Read completion fires.

As a next step, add a list_head upon which completed Reads are queued.
A subsequent patch will make use of this queue.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
parent 4d9d69db
......@@ -98,6 +98,7 @@ struct svcxprt_rdma {
u32 sc_pending_recvs;
u32 sc_recv_batch;
struct list_head sc_rq_dto_q;
struct list_head sc_read_complete_q;
spinlock_t sc_rq_dto_lock;
struct ib_qp *sc_qp;
struct ib_cq *sc_rq_cq;
......
......@@ -382,6 +382,10 @@ void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
{
struct svc_rdma_recv_ctxt *ctxt;
while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_read_complete_q))) {
list_del(&ctxt->rc_list);
svc_rdma_recv_ctxt_put(rdma, ctxt);
}
while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
list_del(&ctxt->rc_list);
svc_rdma_recv_ctxt_put(rdma, ctxt);
......@@ -763,6 +767,30 @@ static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
return true;
}
static noinline void svc_rdma_read_complete(struct svc_rqst *rqstp,
struct svc_rdma_recv_ctxt *ctxt)
{
int i;
/* Transfer the Read chunk pages into @rqstp.rq_pages, replacing
* the rq_pages that were already allocated for this rqstp.
*/
release_pages(rqstp->rq_respages, ctxt->rc_page_count);
for (i = 0; i < ctxt->rc_page_count; i++)
rqstp->rq_pages[i] = ctxt->rc_pages[i];
/* Update @rqstp's result send buffer to start after the
* last page in the RDMA Read payload.
*/
rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count];
rqstp->rq_next_page = rqstp->rq_respages + 1;
/* Prevent svc_rdma_recv_ctxt_put() from releasing the
* pages in ctxt::rc_pages a second time.
*/
ctxt->rc_page_count = 0;
}
/**
* svc_rdma_recvfrom - Receive an RPC call
* @rqstp: request structure into which to receive an RPC Call
......@@ -807,8 +835,14 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
rqstp->rq_xprt_ctxt = NULL;
ctxt = NULL;
spin_lock(&rdma_xprt->sc_rq_dto_lock);
ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_read_complete_q);
if (ctxt) {
list_del(&ctxt->rc_list);
spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
svc_rdma_read_complete(rqstp, ctxt);
goto complete;
}
ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
if (ctxt)
list_del(&ctxt->rc_list);
......@@ -846,6 +880,7 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
goto out_readfail;
}
complete:
rqstp->rq_xprt_ctxt = ctxt;
rqstp->rq_prot = IPPROTO_MAX;
svc_xprt_copy_addrs(rqstp, xprt);
......
......@@ -137,6 +137,7 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
svc_xprt_init(net, &svc_rdma_class, &cma_xprt->sc_xprt, serv);
INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
init_llist_head(&cma_xprt->sc_send_ctxts);
init_llist_head(&cma_xprt->sc_recv_ctxts);
init_llist_head(&cma_xprt->sc_rw_ctxts);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment