Commit be99bb11 authored by Chuck Lever, committed by J. Bruce Fields

svcrdma: Use new CQ API for RPC-over-RDMA server send CQs

Calling ib_poll_cq() to sort through WCs during a completion is a
common pattern amongst RDMA consumers. Since commit 14d3a3b2
("IB: add a proper completion queue abstraction"), WC sorting can
be handled by the IB core.

By converting to this new API, svcrdma is made a better neighbor to
other RDMA consumers, as it allows the core to schedule the delivery
of completions more fairly amongst all active consumers.

This new API also aims each completion at a function that is
specific to the WR's opcode. Thus the ctxt->wr_op field and the
switch in process_context is replaced by a set of methods that
handle each completion type.

Because each ib_cqe carries a pointer to a completion method, the
core can now post operations on a consumer's QP, and handle the
completions itself.

The server's rdma_stat_sq_poll and rdma_stat_sq_prod metrics are no
longer updated.

As a clean up, the cq_event_handler, the dto_tasklet, and all
associated locking is removed, as they are no longer referenced or
used.
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
parent 8bd5ba86
...@@ -76,8 +76,9 @@ struct svc_rdma_op_ctxt { ...@@ -76,8 +76,9 @@ struct svc_rdma_op_ctxt {
int hdr_count; int hdr_count;
struct xdr_buf arg; struct xdr_buf arg;
struct ib_cqe cqe; struct ib_cqe cqe;
struct ib_cqe reg_cqe;
struct ib_cqe inv_cqe;
struct list_head dto_q; struct list_head dto_q;
enum ib_wr_opcode wr_op;
enum ib_wc_status wc_status; enum ib_wc_status wc_status;
u32 byte_len; u32 byte_len;
u32 position; u32 position;
...@@ -175,7 +176,6 @@ struct svcxprt_rdma { ...@@ -175,7 +176,6 @@ struct svcxprt_rdma {
struct work_struct sc_work; struct work_struct sc_work;
}; };
/* sc_flags */ /* sc_flags */
#define RDMAXPRT_SQ_PENDING 2
#define RDMAXPRT_CONN_PENDING 3 #define RDMAXPRT_CONN_PENDING 3
#define RPCRDMA_LISTEN_BACKLOG 10 #define RPCRDMA_LISTEN_BACKLOG 10
...@@ -232,6 +232,11 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *, ...@@ -232,6 +232,11 @@ extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
int); int);
/* svc_rdma_transport.c */ /* svc_rdma_transport.c */
extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_write(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *); extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t); extern int svc_rdma_post_recv(struct svcxprt_rdma *, gfp_t);
extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t); extern int svc_rdma_repost_recv(struct svcxprt_rdma *, gfp_t);
......
...@@ -119,7 +119,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, ...@@ -119,7 +119,6 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
ctxt->pages[0] = virt_to_page(rqst->rq_buffer); ctxt->pages[0] = virt_to_page(rqst->rq_buffer);
ctxt->count = 1; ctxt->count = 1;
ctxt->wr_op = IB_WR_SEND;
ctxt->direction = DMA_TO_DEVICE; ctxt->direction = DMA_TO_DEVICE;
ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey; ctxt->sge[0].lkey = rdma->sc_pd->local_dma_lkey;
ctxt->sge[0].length = sndbuf->len; ctxt->sge[0].length = sndbuf->len;
...@@ -133,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, ...@@ -133,7 +132,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
atomic_inc(&rdma->sc_dma_used); atomic_inc(&rdma->sc_dma_used);
memset(&send_wr, 0, sizeof(send_wr)); memset(&send_wr, 0, sizeof(send_wr));
send_wr.wr_id = (unsigned long)ctxt; ctxt->cqe.done = svc_rdma_wc_send;
send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge; send_wr.sg_list = ctxt->sge;
send_wr.num_sge = 1; send_wr.num_sge = 1;
send_wr.opcode = IB_WR_SEND; send_wr.opcode = IB_WR_SEND;
......
...@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt, ...@@ -180,9 +180,9 @@ int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags); clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
memset(&read_wr, 0, sizeof(read_wr)); memset(&read_wr, 0, sizeof(read_wr));
read_wr.wr.wr_id = (unsigned long)ctxt; ctxt->cqe.done = svc_rdma_wc_read;
read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.opcode = IB_WR_RDMA_READ; read_wr.wr.opcode = IB_WR_RDMA_READ;
ctxt->wr_op = read_wr.wr.opcode;
read_wr.wr.send_flags = IB_SEND_SIGNALED; read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle; read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset; read_wr.remote_addr = rs_offset;
...@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, ...@@ -299,8 +299,9 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
ctxt->read_hdr = head; ctxt->read_hdr = head;
/* Prepare REG WR */ /* Prepare REG WR */
ctxt->reg_cqe.done = svc_rdma_wc_reg;
reg_wr.wr.wr_cqe = &ctxt->reg_cqe;
reg_wr.wr.opcode = IB_WR_REG_MR; reg_wr.wr.opcode = IB_WR_REG_MR;
reg_wr.wr.wr_id = 0;
reg_wr.wr.send_flags = IB_SEND_SIGNALED; reg_wr.wr.send_flags = IB_SEND_SIGNALED;
reg_wr.wr.num_sge = 0; reg_wr.wr.num_sge = 0;
reg_wr.mr = frmr->mr; reg_wr.mr = frmr->mr;
...@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, ...@@ -310,6 +311,8 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
/* Prepare RDMA_READ */ /* Prepare RDMA_READ */
memset(&read_wr, 0, sizeof(read_wr)); memset(&read_wr, 0, sizeof(read_wr));
ctxt->cqe.done = svc_rdma_wc_read;
read_wr.wr.wr_cqe = &ctxt->cqe;
read_wr.wr.send_flags = IB_SEND_SIGNALED; read_wr.wr.send_flags = IB_SEND_SIGNALED;
read_wr.rkey = rs_handle; read_wr.rkey = rs_handle;
read_wr.remote_addr = rs_offset; read_wr.remote_addr = rs_offset;
...@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt, ...@@ -317,19 +320,18 @@ int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
read_wr.wr.num_sge = 1; read_wr.wr.num_sge = 1;
if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) { if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV; read_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
read_wr.wr.wr_id = (unsigned long)ctxt;
read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey; read_wr.wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
} else { } else {
read_wr.wr.opcode = IB_WR_RDMA_READ; read_wr.wr.opcode = IB_WR_RDMA_READ;
read_wr.wr.next = &inv_wr; read_wr.wr.next = &inv_wr;
/* Prepare invalidate */ /* Prepare invalidate */
memset(&inv_wr, 0, sizeof(inv_wr)); memset(&inv_wr, 0, sizeof(inv_wr));
inv_wr.wr_id = (unsigned long)ctxt; ctxt->inv_cqe.done = svc_rdma_wc_inv;
inv_wr.wr_cqe = &ctxt->inv_cqe;
inv_wr.opcode = IB_WR_LOCAL_INV; inv_wr.opcode = IB_WR_LOCAL_INV;
inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE; inv_wr.send_flags = IB_SEND_SIGNALED | IB_SEND_FENCE;
inv_wr.ex.invalidate_rkey = frmr->mr->lkey; inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
} }
ctxt->wr_op = read_wr.wr.opcode;
/* Post the chain */ /* Post the chain */
ret = svc_rdma_send(xprt, &reg_wr.wr); ret = svc_rdma_send(xprt, &reg_wr.wr);
......
...@@ -297,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp, ...@@ -297,8 +297,8 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
/* Prepare WRITE WR */ /* Prepare WRITE WR */
memset(&write_wr, 0, sizeof write_wr); memset(&write_wr, 0, sizeof write_wr);
ctxt->wr_op = IB_WR_RDMA_WRITE; ctxt->cqe.done = svc_rdma_wc_write;
write_wr.wr.wr_id = (unsigned long)ctxt; write_wr.wr.wr_cqe = &ctxt->cqe;
write_wr.wr.sg_list = &sge[0]; write_wr.wr.sg_list = &sge[0];
write_wr.wr.num_sge = sge_no; write_wr.wr.num_sge = sge_no;
write_wr.wr.opcode = IB_WR_RDMA_WRITE; write_wr.wr.opcode = IB_WR_RDMA_WRITE;
...@@ -549,8 +549,8 @@ static int send_reply(struct svcxprt_rdma *rdma, ...@@ -549,8 +549,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
goto err; goto err;
} }
memset(&send_wr, 0, sizeof send_wr); memset(&send_wr, 0, sizeof send_wr);
ctxt->wr_op = IB_WR_SEND; ctxt->cqe.done = svc_rdma_wc_send;
send_wr.wr_id = (unsigned long)ctxt; send_wr.wr_cqe = &ctxt->cqe;
send_wr.sg_list = ctxt->sge; send_wr.sg_list = ctxt->sge;
send_wr.num_sge = sge_no; send_wr.num_sge = sge_no;
send_wr.opcode = IB_WR_SEND; send_wr.opcode = IB_WR_SEND;
...@@ -698,8 +698,8 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, ...@@ -698,8 +698,8 @@ void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
/* Prepare SEND WR */ /* Prepare SEND WR */
memset(&err_wr, 0, sizeof(err_wr)); memset(&err_wr, 0, sizeof(err_wr));
ctxt->wr_op = IB_WR_SEND; ctxt->cqe.done = svc_rdma_wc_send;
err_wr.wr_id = (unsigned long)ctxt; err_wr.wr_cqe = &ctxt->cqe;
err_wr.sg_list = ctxt->sge; err_wr.sg_list = ctxt->sge;
err_wr.num_sge = 1; err_wr.num_sge = 1;
err_wr.opcode = IB_WR_SEND; err_wr.opcode = IB_WR_SEND;
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment