Commit 4201c746 authored by Chuck Lever's avatar Chuck Lever Committed by J. Bruce Fields

svcrdma: Introduce svc_rdma_send_ctxt

svc_rdma_op_ctxt's are pre-allocated and maintained on a per-xprt
free list. This eliminates the overhead of calling kmalloc / kfree,
both of which grab a globally shared lock that disables interrupts.
Introduce a replacement to svc_rdma_op_ctxt's that is built
especially for the svcrdma Send path.

Subsequent patches will take advantage of this new structure by
allocating real resources which are then cached in these objects.
The allocations are freed when the transport is torn down.

I've renamed the structure so that static type checking can be used
to ensure that uses of op_ctxt and send_ctxt are not confused. As an
additional clean up, structure fields are renamed to conform with
kernel coding conventions.

Additional clean ups:
- Handle svc_rdma_send_ctxt_get allocation failure at each call
  site, rather than pre-allocating and hoping we guessed correctly
- All send_ctxt_put call-sites request page freeing, so remove
  the @free_pages argument
- All send_ctxt_put call-sites unmap SGEs, so fold that into
  svc_rdma_send_ctxt_put
Signed-off-by: default avatarChuck Lever <chuck.lever@oracle.com>
Signed-off-by: default avatarJ. Bruce Fields <bfields@redhat.com>
parent 23262790
...@@ -109,8 +109,8 @@ struct svcxprt_rdma { ...@@ -109,8 +109,8 @@ struct svcxprt_rdma {
struct ib_pd *sc_pd; struct ib_pd *sc_pd;
spinlock_t sc_ctxt_lock; spinlock_t sc_send_lock;
struct list_head sc_ctxts; struct list_head sc_send_ctxts;
int sc_ctxt_used; int sc_ctxt_used;
spinlock_t sc_rw_ctxt_lock; spinlock_t sc_rw_ctxt_lock;
struct list_head sc_rw_ctxts; struct list_head sc_rw_ctxts;
...@@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt { ...@@ -158,6 +158,19 @@ struct svc_rdma_recv_ctxt {
struct page *rc_pages[RPCSVC_MAXPAGES]; struct page *rc_pages[RPCSVC_MAXPAGES];
}; };
enum {
RPCRDMA_MAX_SGES = 1 + (RPCRDMA_MAX_INLINE_THRESH / PAGE_SIZE),
};
struct svc_rdma_send_ctxt {
struct list_head sc_list;
struct ib_send_wr sc_send_wr;
struct ib_cqe sc_cqe;
int sc_page_count;
struct page *sc_pages[RPCSVC_MAXPAGES];
struct ib_sge sc_sges[RPCRDMA_MAX_SGES];
};
/* svc_rdma_backchannel.c */ /* svc_rdma_backchannel.c */
extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
__be32 *rdma_resp, __be32 *rdma_resp,
...@@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, ...@@ -183,24 +196,22 @@ extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
struct xdr_buf *xdr); struct xdr_buf *xdr);
/* svc_rdma_sendto.c */ /* svc_rdma_sendto.c */
extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
extern struct svc_rdma_send_ctxt *
svc_rdma_send_ctxt_get(struct svcxprt_rdma *rdma);
extern void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt);
extern int svc_rdma_send(struct svcxprt_rdma *rdma, struct ib_send_wr *wr);
extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma, extern int svc_rdma_map_reply_hdr(struct svcxprt_rdma *rdma,
struct svc_rdma_op_ctxt *ctxt, struct svc_rdma_send_ctxt *ctxt,
__be32 *rdma_resp, unsigned int len); __be32 *rdma_resp, unsigned int len);
extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma, extern int svc_rdma_post_send_wr(struct svcxprt_rdma *rdma,
struct svc_rdma_op_ctxt *ctxt, struct svc_rdma_send_ctxt *ctxt,
u32 inv_rkey); u32 inv_rkey);
extern int svc_rdma_sendto(struct svc_rqst *); extern int svc_rdma_sendto(struct svc_rqst *);
/* svc_rdma_transport.c */ /* svc_rdma_transport.c */
extern void svc_rdma_wc_send(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_reg(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_read(struct ib_cq *, struct ib_wc *);
extern void svc_rdma_wc_inv(struct ib_cq *, struct ib_wc *);
extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *); extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
extern struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *);
extern void svc_rdma_put_context(struct svc_rdma_op_ctxt *, int);
extern void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt);
extern void svc_sq_reap(struct svcxprt_rdma *); extern void svc_sq_reap(struct svcxprt_rdma *);
extern void svc_rq_reap(struct svcxprt_rdma *); extern void svc_rq_reap(struct svcxprt_rdma *);
extern void svc_rdma_prep_reply_hdr(struct svc_rqst *); extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
......
// SPDX-License-Identifier: GPL-2.0 // SPDX-License-Identifier: GPL-2.0
/* /*
* Copyright (c) 2015 Oracle. All rights reserved. * Copyright (c) 2015-2018 Oracle. All rights reserved.
* *
* Support for backward direction RPCs on RPC/RDMA (server-side). * Support for backward direction RPCs on RPC/RDMA (server-side).
*/ */
...@@ -117,10 +117,14 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, ...@@ -117,10 +117,14 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
struct rpc_rqst *rqst) struct rpc_rqst *rqst)
{ {
struct svc_rdma_op_ctxt *ctxt; struct svc_rdma_send_ctxt *ctxt;
int ret; int ret;
ctxt = svc_rdma_get_context(rdma); ctxt = svc_rdma_send_ctxt_get(rdma);
if (!ctxt) {
ret = -ENOMEM;
goto out_err;
}
/* rpcrdma_bc_send_request builds the transport header and /* rpcrdma_bc_send_request builds the transport header and
* the backchannel RPC message in the same buffer. Thus only * the backchannel RPC message in the same buffer. Thus only
...@@ -144,8 +148,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma, ...@@ -144,8 +148,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
return ret; return ret;
out_unmap: out_unmap:
svc_rdma_unmap_dma(ctxt); svc_rdma_send_ctxt_put(rdma, ctxt);
svc_rdma_put_context(ctxt, 1);
ret = -EIO; ret = -EIO;
goto out_err; goto out_err;
} }
......
...@@ -601,7 +601,7 @@ static void rdma_read_complete(struct svc_rqst *rqstp, ...@@ -601,7 +601,7 @@ static void rdma_read_complete(struct svc_rqst *rqstp,
static void svc_rdma_send_error(struct svcxprt_rdma *xprt, static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
__be32 *rdma_argp, int status) __be32 *rdma_argp, int status)
{ {
struct svc_rdma_op_ctxt *ctxt; struct svc_rdma_send_ctxt *ctxt;
__be32 *p, *err_msgp; __be32 *p, *err_msgp;
unsigned int length; unsigned int length;
struct page *page; struct page *page;
...@@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, ...@@ -631,7 +631,10 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
length = (unsigned long)p - (unsigned long)err_msgp; length = (unsigned long)p - (unsigned long)err_msgp;
/* Map transport header; no RPC message payload */ /* Map transport header; no RPC message payload */
ctxt = svc_rdma_get_context(xprt); ctxt = svc_rdma_send_ctxt_get(xprt);
if (!ctxt)
return;
ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length); ret = svc_rdma_map_reply_hdr(xprt, ctxt, err_msgp, length);
if (ret) { if (ret) {
dprintk("svcrdma: Error %d mapping send for protocol error\n", dprintk("svcrdma: Error %d mapping send for protocol error\n",
...@@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt, ...@@ -640,10 +643,8 @@ static void svc_rdma_send_error(struct svcxprt_rdma *xprt,
} }
ret = svc_rdma_post_send_wr(xprt, ctxt, 0); ret = svc_rdma_post_send_wr(xprt, ctxt, 0);
if (ret) { if (ret)
svc_rdma_unmap_dma(ctxt); svc_rdma_send_ctxt_put(xprt, ctxt);
svc_rdma_put_context(ctxt, 1);
}
} }
/* By convention, backchannel calls arrive via rdma_msg type /* By convention, backchannel calls arrive via rdma_msg type
......
This diff is collapsed.
// SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
/* /*
* Copyright (c) 2015-2018 Oracle. All rights reserved.
* Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved. * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
* Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved. * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
* *
...@@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt) ...@@ -157,114 +158,6 @@ static void svc_rdma_bc_free(struct svc_xprt *xprt)
} }
#endif /* CONFIG_SUNRPC_BACKCHANNEL */ #endif /* CONFIG_SUNRPC_BACKCHANNEL */
static struct svc_rdma_op_ctxt *alloc_ctxt(struct svcxprt_rdma *xprt,
gfp_t flags)
{
struct svc_rdma_op_ctxt *ctxt;
ctxt = kmalloc(sizeof(*ctxt), flags);
if (ctxt) {
ctxt->xprt = xprt;
INIT_LIST_HEAD(&ctxt->list);
}
return ctxt;
}
static bool svc_rdma_prealloc_ctxts(struct svcxprt_rdma *xprt)
{
unsigned int i;
i = xprt->sc_sq_depth;
while (i--) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = alloc_ctxt(xprt, GFP_KERNEL);
if (!ctxt) {
dprintk("svcrdma: No memory for RDMA ctxt\n");
return false;
}
list_add(&ctxt->list, &xprt->sc_ctxts);
}
return true;
}
struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
{
struct svc_rdma_op_ctxt *ctxt = NULL;
spin_lock(&xprt->sc_ctxt_lock);
xprt->sc_ctxt_used++;
if (list_empty(&xprt->sc_ctxts))
goto out_empty;
ctxt = list_first_entry(&xprt->sc_ctxts,
struct svc_rdma_op_ctxt, list);
list_del(&ctxt->list);
spin_unlock(&xprt->sc_ctxt_lock);
out:
ctxt->count = 0;
ctxt->mapped_sges = 0;
return ctxt;
out_empty:
/* Either pre-allocation missed the mark, or send
* queue accounting is broken.
*/
spin_unlock(&xprt->sc_ctxt_lock);
ctxt = alloc_ctxt(xprt, GFP_NOIO);
if (ctxt)
goto out;
spin_lock(&xprt->sc_ctxt_lock);
xprt->sc_ctxt_used--;
spin_unlock(&xprt->sc_ctxt_lock);
WARN_ONCE(1, "svcrdma: empty RDMA ctxt list?\n");
return NULL;
}
void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
{
struct svcxprt_rdma *xprt = ctxt->xprt;
struct ib_device *device = xprt->sc_cm_id->device;
unsigned int i;
for (i = 0; i < ctxt->mapped_sges; i++)
ib_dma_unmap_page(device,
ctxt->sge[i].addr,
ctxt->sge[i].length,
ctxt->direction);
ctxt->mapped_sges = 0;
}
void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
{
struct svcxprt_rdma *xprt = ctxt->xprt;
int i;
if (free_pages)
for (i = 0; i < ctxt->count; i++)
put_page(ctxt->pages[i]);
spin_lock(&xprt->sc_ctxt_lock);
xprt->sc_ctxt_used--;
list_add(&ctxt->list, &xprt->sc_ctxts);
spin_unlock(&xprt->sc_ctxt_lock);
}
static void svc_rdma_destroy_ctxts(struct svcxprt_rdma *xprt)
{
while (!list_empty(&xprt->sc_ctxts)) {
struct svc_rdma_op_ctxt *ctxt;
ctxt = list_first_entry(&xprt->sc_ctxts,
struct svc_rdma_op_ctxt, list);
list_del(&ctxt->list);
kfree(ctxt);
}
}
/* QP event handler */ /* QP event handler */
static void qp_event_handler(struct ib_event *event, void *context) static void qp_event_handler(struct ib_event *event, void *context)
{ {
...@@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_event *event, void *context) ...@@ -292,39 +185,6 @@ static void qp_event_handler(struct ib_event *event, void *context)
} }
} }
/**
* svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: completion queue
* @wc: completed WR
*
*/
void svc_rdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
{
struct svcxprt_rdma *xprt = cq->cq_context;
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_op_ctxt *ctxt;
trace_svcrdma_wc_send(wc);
atomic_inc(&xprt->sc_sq_avail);
wake_up(&xprt->sc_send_wait);
ctxt = container_of(cqe, struct svc_rdma_op_ctxt, cqe);
svc_rdma_unmap_dma(ctxt);
svc_rdma_put_context(ctxt, 1);
if (unlikely(wc->status != IB_WC_SUCCESS)) {
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
svc_xprt_enqueue(&xprt->sc_xprt);
if (wc->status != IB_WC_WR_FLUSH_ERR)
pr_err("svcrdma: Send: %s (%u/0x%x)\n",
ib_wc_status_msg(wc->status),
wc->status, wc->vendor_err);
}
svc_xprt_put(&xprt->sc_xprt);
}
static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
struct net *net) struct net *net)
{ {
...@@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv, ...@@ -338,14 +198,14 @@ static struct svcxprt_rdma *svc_rdma_create_xprt(struct svc_serv *serv,
INIT_LIST_HEAD(&cma_xprt->sc_accept_q); INIT_LIST_HEAD(&cma_xprt->sc_accept_q);
INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q); INIT_LIST_HEAD(&cma_xprt->sc_rq_dto_q);
INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q); INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
INIT_LIST_HEAD(&cma_xprt->sc_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_send_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_recv_ctxts);
INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts); INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
init_waitqueue_head(&cma_xprt->sc_send_wait); init_waitqueue_head(&cma_xprt->sc_send_wait);
spin_lock_init(&cma_xprt->sc_lock); spin_lock_init(&cma_xprt->sc_lock);
spin_lock_init(&cma_xprt->sc_rq_dto_lock); spin_lock_init(&cma_xprt->sc_rq_dto_lock);
spin_lock_init(&cma_xprt->sc_ctxt_lock); spin_lock_init(&cma_xprt->sc_send_lock);
spin_lock_init(&cma_xprt->sc_recv_lock); spin_lock_init(&cma_xprt->sc_recv_lock);
spin_lock_init(&cma_xprt->sc_rw_ctxt_lock); spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
...@@ -640,9 +500,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -640,9 +500,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
} }
atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth); atomic_set(&newxprt->sc_sq_avail, newxprt->sc_sq_depth);
if (!svc_rdma_prealloc_ctxts(newxprt))
goto errout;
newxprt->sc_pd = ib_alloc_pd(dev, 0); newxprt->sc_pd = ib_alloc_pd(dev, 0);
if (IS_ERR(newxprt->sc_pd)) { if (IS_ERR(newxprt->sc_pd)) {
dprintk("svcrdma: error creating PD for connect request\n"); dprintk("svcrdma: error creating PD for connect request\n");
...@@ -794,11 +651,6 @@ static void __svc_rdma_free(struct work_struct *work) ...@@ -794,11 +651,6 @@ static void __svc_rdma_free(struct work_struct *work)
svc_rdma_flush_recv_queues(rdma); svc_rdma_flush_recv_queues(rdma);
/* Warn if we leaked a resource or under-referenced */
if (rdma->sc_ctxt_used != 0)
pr_err("svcrdma: ctxt still in use? (%d)\n",
rdma->sc_ctxt_used);
/* Final put of backchannel client transport */ /* Final put of backchannel client transport */
if (xprt->xpt_bc_xprt) { if (xprt->xpt_bc_xprt) {
xprt_put(xprt->xpt_bc_xprt); xprt_put(xprt->xpt_bc_xprt);
...@@ -806,7 +658,7 @@ static void __svc_rdma_free(struct work_struct *work) ...@@ -806,7 +658,7 @@ static void __svc_rdma_free(struct work_struct *work)
} }
svc_rdma_destroy_rw_ctxts(rdma); svc_rdma_destroy_rw_ctxts(rdma);
svc_rdma_destroy_ctxts(rdma); svc_rdma_send_ctxts_destroy(rdma);
svc_rdma_recv_ctxts_destroy(rdma); svc_rdma_recv_ctxts_destroy(rdma);
/* Destroy the QP if present (not a listener) */ /* Destroy the QP if present (not a listener) */
...@@ -860,52 +712,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp) ...@@ -860,52 +712,3 @@ static void svc_rdma_secure_port(struct svc_rqst *rqstp)
static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt) static void svc_rdma_kill_temp_xprt(struct svc_xprt *xprt)
{ {
} }
int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
{
struct ib_send_wr *bad_wr, *n_wr;
int wr_count;
int i;
int ret;
if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
return -ENOTCONN;
wr_count = 1;
for (n_wr = wr->next; n_wr; n_wr = n_wr->next)
wr_count++;
/* If the SQ is full, wait until an SQ entry is available */
while (1) {
if ((atomic_sub_return(wr_count, &xprt->sc_sq_avail) < 0)) {
atomic_inc(&rdma_stat_sq_starve);
trace_svcrdma_sq_full(xprt);
atomic_add(wr_count, &xprt->sc_sq_avail);
wait_event(xprt->sc_send_wait,
atomic_read(&xprt->sc_sq_avail) > wr_count);
if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
return -ENOTCONN;
trace_svcrdma_sq_retry(xprt);
continue;
}
/* Take a transport ref for each WR posted */
for (i = 0; i < wr_count; i++)
svc_xprt_get(&xprt->sc_xprt);
/* Bump used SQ WR count and post */
ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
trace_svcrdma_post_send(wr, ret);
if (ret) {
set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
for (i = 0; i < wr_count; i ++)
svc_xprt_put(&xprt->sc_xprt);
dprintk("svcrdma: failed to post SQ WR rc=%d\n", ret);
dprintk(" sc_sq_avail=%d, sc_sq_depth=%d\n",
atomic_read(&xprt->sc_sq_avail),
xprt->sc_sq_depth);
wake_up(&xprt->sc_send_wait);
}
break;
}
return ret;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment