Commit 0ab11523 authored by Chuck Lever's avatar Chuck Lever Committed by Anna Schumaker

xprtrdma: Wake RPCs directly in rpcrdma_wc_send path

Eliminate a context switch in the path that handles RPC wake-ups
when a Receive completion has to wait for a Send completion.
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent d8099fed
...@@ -511,6 +511,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req, ...@@ -511,6 +511,16 @@ rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
return 0; return 0;
} }
static void rpcrdma_sendctx_done(struct kref *kref)
{
struct rpcrdma_req *req =
container_of(kref, struct rpcrdma_req, rl_kref);
struct rpcrdma_rep *rep = req->rl_reply;
rpcrdma_complete_rqst(rep);
rep->rr_rxprt->rx_stats.reply_waits_for_send++;
}
/** /**
* rpcrdma_sendctx_unmap - DMA-unmap Send buffer * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
* @sc: sendctx containing SGEs to unmap * @sc: sendctx containing SGEs to unmap
...@@ -520,6 +530,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ...@@ -520,6 +530,9 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
{ {
struct ib_sge *sge; struct ib_sge *sge;
if (!sc->sc_unmap_count)
return;
/* The first two SGEs contain the transport header and /* The first two SGEs contain the transport header and
* the inline buffer. These are always left mapped so * the inline buffer. These are always left mapped so
* they can be cheaply re-used. * they can be cheaply re-used.
...@@ -529,9 +542,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc) ...@@ -529,9 +542,7 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length, ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
DMA_TO_DEVICE); DMA_TO_DEVICE);
if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
&sc->sc_req->rl_flags))
wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
} }
/* Prepare an SGE for the RPC-over-RDMA transport header. /* Prepare an SGE for the RPC-over-RDMA transport header.
...@@ -666,7 +677,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt, ...@@ -666,7 +677,7 @@ static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
out: out:
sc->sc_wr.num_sge += sge_no; sc->sc_wr.num_sge += sge_no;
if (sc->sc_unmap_count) if (sc->sc_unmap_count)
__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); kref_get(&req->rl_kref);
return true; return true;
out_regbuf: out_regbuf:
...@@ -708,7 +719,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt, ...@@ -708,7 +719,7 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
req->rl_sendctx->sc_wr.num_sge = 0; req->rl_sendctx->sc_wr.num_sge = 0;
req->rl_sendctx->sc_unmap_count = 0; req->rl_sendctx->sc_unmap_count = 0;
req->rl_sendctx->sc_req = req; req->rl_sendctx->sc_req = req;
__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags); kref_init(&req->rl_kref);
ret = -EIO; ret = -EIO;
if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen)) if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
...@@ -1268,36 +1279,12 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep) ...@@ -1268,36 +1279,12 @@ void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
goto out; goto out;
} }
/* Ensure that any DMA mapped pages associated with static void rpcrdma_reply_done(struct kref *kref)
* the Send of the RPC Call have been unmapped before
* allowing the RPC to complete. This protects argument
* memory not controlled by the RPC client from being
* re-used before we're done with it.
*/
static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req)
{ {
if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) { struct rpcrdma_req *req =
r_xprt->rx_stats.reply_waits_for_send++; container_of(kref, struct rpcrdma_req, rl_kref);
out_of_line_wait_on_bit(&req->rl_flags,
RPCRDMA_REQ_F_TX_RESOURCES,
bit_wait,
TASK_UNINTERRUPTIBLE);
}
}
/** rpcrdma_complete_rqst(req->rl_reply);
* rpcrdma_release_rqst - Release hardware resources
* @r_xprt: controlling transport instance
* @req: request with resources to release
*
*/
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
{
if (!list_empty(&req->rl_registered))
frwr_unmap_sync(r_xprt, req);
rpcrdma_release_tx(r_xprt, req);
} }
/** /**
...@@ -1367,13 +1354,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) ...@@ -1367,13 +1354,11 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE) if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
frwr_reminv(rep, &req->rl_registered); frwr_reminv(rep, &req->rl_registered);
if (!list_empty(&req->rl_registered)) { if (!list_empty(&req->rl_registered))
frwr_unmap_async(r_xprt, req); frwr_unmap_async(r_xprt, req);
/* LocalInv completion will complete the RPC */ /* LocalInv completion will complete the RPC */
} else { else
rpcrdma_release_tx(r_xprt, req); kref_put(&req->rl_kref, rpcrdma_reply_done);
rpcrdma_complete_rqst(rep);
}
return; return;
out_badversion: out_badversion:
......
...@@ -618,8 +618,16 @@ xprt_rdma_free(struct rpc_task *task) ...@@ -618,8 +618,16 @@ xprt_rdma_free(struct rpc_task *task)
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
rpcrdma_release_rqst(r_xprt, req);
trace_xprtrdma_op_free(task, req); trace_xprtrdma_op_free(task, req);
if (!list_empty(&req->rl_registered))
frwr_unmap_sync(r_xprt, req);
/* XXX: If the RPC is completing because of a signal and
* not because a reply was received, we ought to ensure
* that the Send completion has fired, so that memory
* involved with the Send is not still visible to the NIC.
*/
} }
/** /**
......
...@@ -1462,8 +1462,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia, ...@@ -1462,8 +1462,7 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr; struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
int rc; int rc;
if (!ep->rep_send_count || if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
send_wr->send_flags |= IB_SEND_SIGNALED; send_wr->send_flags |= IB_SEND_SIGNALED;
ep->rep_send_count = ep->rep_send_batch; ep->rep_send_count = ep->rep_send_batch;
} else { } else {
......
...@@ -45,6 +45,7 @@ ...@@ -45,6 +45,7 @@
#include <linux/wait.h> /* wait_queue_head_t, etc */ #include <linux/wait.h> /* wait_queue_head_t, etc */
#include <linux/spinlock.h> /* spinlock_t, etc */ #include <linux/spinlock.h> /* spinlock_t, etc */
#include <linux/atomic.h> /* atomic_t, etc */ #include <linux/atomic.h> /* atomic_t, etc */
#include <linux/kref.h> /* struct kref */
#include <linux/workqueue.h> /* struct work_struct */ #include <linux/workqueue.h> /* struct work_struct */
#include <rdma/rdma_cm.h> /* RDMA connection api */ #include <rdma/rdma_cm.h> /* RDMA connection api */
...@@ -329,17 +330,12 @@ struct rpcrdma_req { ...@@ -329,17 +330,12 @@ struct rpcrdma_req {
struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */ struct rpcrdma_regbuf *rl_recvbuf; /* rq_rcv_buf */
struct list_head rl_all; struct list_head rl_all;
unsigned long rl_flags; struct kref rl_kref;
struct list_head rl_registered; /* registered segments */ struct list_head rl_registered; /* registered segments */
struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
}; };
/* rl_flags */
enum {
RPCRDMA_REQ_F_TX_RESOURCES,
};
static inline struct rpcrdma_req * static inline struct rpcrdma_req *
rpcr_to_rdmar(const struct rpc_rqst *rqst) rpcr_to_rdmar(const struct rpc_rqst *rqst)
{ {
...@@ -584,8 +580,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst); ...@@ -584,8 +580,6 @@ int rpcrdma_marshal_req(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst);
void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *); void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
void rpcrdma_complete_rqst(struct rpcrdma_rep *rep); void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
void rpcrdma_reply_handler(struct rpcrdma_rep *rep); void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
struct rpcrdma_req *req);
static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len) static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment