Commit 7402a4fe authored by Trond Myklebust's avatar Trond Myklebust

SUNRPC: Fix up backchannel slot table accounting

Add a per-transport maximum limit in the socket case, and add
helpers to allow the NFSv4 code to discover that limit.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
parent 9f98effc
...@@ -8380,6 +8380,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args, ...@@ -8380,6 +8380,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
{ {
unsigned int max_rqst_sz, max_resp_sz; unsigned int max_rqst_sz, max_resp_sz;
unsigned int max_bc_payload = rpc_max_bc_payload(clnt); unsigned int max_bc_payload = rpc_max_bc_payload(clnt);
unsigned int max_bc_slots = rpc_num_bc_slots(clnt);
max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead; max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead;
max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead; max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead;
...@@ -8402,6 +8403,8 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args, ...@@ -8402,6 +8403,8 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
args->bc_attrs.max_resp_sz_cached = 0; args->bc_attrs.max_resp_sz_cached = 0;
args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS; args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1); args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1);
if (args->bc_attrs.max_reqs > max_bc_slots)
args->bc_attrs.max_reqs = max_bc_slots;
dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u " dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n", "max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
......
...@@ -43,6 +43,7 @@ void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs); ...@@ -43,6 +43,7 @@ void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs); int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs); void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
void xprt_free_bc_rqst(struct rpc_rqst *req); void xprt_free_bc_rqst(struct rpc_rqst *req);
unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt);
/* /*
* Determine if a shared backchannel is in use * Determine if a shared backchannel is in use
......
...@@ -194,6 +194,7 @@ void rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int); ...@@ -194,6 +194,7 @@ void rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int);
struct net * rpc_net_ns(struct rpc_clnt *); struct net * rpc_net_ns(struct rpc_clnt *);
size_t rpc_max_payload(struct rpc_clnt *); size_t rpc_max_payload(struct rpc_clnt *);
size_t rpc_max_bc_payload(struct rpc_clnt *); size_t rpc_max_bc_payload(struct rpc_clnt *);
unsigned int rpc_num_bc_slots(struct rpc_clnt *);
void rpc_force_rebind(struct rpc_clnt *); void rpc_force_rebind(struct rpc_clnt *);
size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t); size_t rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t); const char *rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
......
...@@ -158,6 +158,7 @@ struct rpc_xprt_ops { ...@@ -158,6 +158,7 @@ struct rpc_xprt_ops {
int (*bc_setup)(struct rpc_xprt *xprt, int (*bc_setup)(struct rpc_xprt *xprt,
unsigned int min_reqs); unsigned int min_reqs);
size_t (*bc_maxpayload)(struct rpc_xprt *xprt); size_t (*bc_maxpayload)(struct rpc_xprt *xprt);
unsigned int (*bc_num_slots)(struct rpc_xprt *xprt);
void (*bc_free_rqst)(struct rpc_rqst *rqst); void (*bc_free_rqst)(struct rpc_rqst *rqst);
void (*bc_destroy)(struct rpc_xprt *xprt, void (*bc_destroy)(struct rpc_xprt *xprt,
unsigned int max_reqs); unsigned int max_reqs);
...@@ -251,8 +252,9 @@ struct rpc_xprt { ...@@ -251,8 +252,9 @@ struct rpc_xprt {
#if defined(CONFIG_SUNRPC_BACKCHANNEL) #if defined(CONFIG_SUNRPC_BACKCHANNEL)
struct svc_serv *bc_serv; /* The RPC service which will */ struct svc_serv *bc_serv; /* The RPC service which will */
/* process the callback */ /* process the callback */
int bc_alloc_count; /* Total number of preallocs */ unsigned int bc_alloc_max;
atomic_t bc_free_slots; unsigned int bc_alloc_count; /* Total number of preallocs */
atomic_t bc_slot_count; /* Number of allocated slots */
spinlock_t bc_pa_lock; /* Protects the preallocated spinlock_t bc_pa_lock; /* Protects the preallocated
* items */ * items */
struct list_head bc_pa_list; /* List of preallocated struct list_head bc_pa_list; /* List of preallocated
......
...@@ -31,25 +31,20 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ...@@ -31,25 +31,20 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#define RPCDBG_FACILITY RPCDBG_TRANS #define RPCDBG_FACILITY RPCDBG_TRANS
#endif #endif
#define BC_MAX_SLOTS 64U
unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
return BC_MAX_SLOTS;
}
/* /*
* Helper routines that track the number of preallocation elements * Helper routines that track the number of preallocation elements
* on the transport. * on the transport.
*/ */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt) static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{ {
return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots); return xprt->bc_alloc_count < xprt->bc_alloc_max;
}
static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
{
atomic_add(n, &xprt->bc_free_slots);
xprt->bc_alloc_count += n;
}
static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
{
atomic_sub(n, &xprt->bc_free_slots);
return xprt->bc_alloc_count -= n;
} }
/* /*
...@@ -145,6 +140,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) ...@@ -145,6 +140,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
dprintk("RPC: setup backchannel transport\n"); dprintk("RPC: setup backchannel transport\n");
if (min_reqs > BC_MAX_SLOTS)
min_reqs = BC_MAX_SLOTS;
/* /*
* We use a temporary list to keep track of the preallocated * We use a temporary list to keep track of the preallocated
* buffers. Once we're done building the list we splice it * buffers. Once we're done building the list we splice it
...@@ -172,7 +170,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs) ...@@ -172,7 +170,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
*/ */
spin_lock(&xprt->bc_pa_lock); spin_lock(&xprt->bc_pa_lock);
list_splice(&tmp_list, &xprt->bc_pa_list); list_splice(&tmp_list, &xprt->bc_pa_list);
xprt_inc_alloc_count(xprt, min_reqs); xprt->bc_alloc_count += min_reqs;
xprt->bc_alloc_max += min_reqs;
atomic_add(min_reqs, &xprt->bc_slot_count);
spin_unlock(&xprt->bc_pa_lock); spin_unlock(&xprt->bc_pa_lock);
dprintk("RPC: setup backchannel transport done\n"); dprintk("RPC: setup backchannel transport done\n");
...@@ -220,11 +220,13 @@ void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs) ...@@ -220,11 +220,13 @@ void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
goto out; goto out;
spin_lock_bh(&xprt->bc_pa_lock); spin_lock_bh(&xprt->bc_pa_lock);
xprt_dec_alloc_count(xprt, max_reqs); xprt->bc_alloc_max -= max_reqs;
list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) { list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
dprintk("RPC: req=%p\n", req); dprintk("RPC: req=%p\n", req);
list_del(&req->rq_bc_pa_list); list_del(&req->rq_bc_pa_list);
xprt_free_allocation(req); xprt_free_allocation(req);
xprt->bc_alloc_count--;
atomic_dec(&xprt->bc_slot_count);
if (--max_reqs == 0) if (--max_reqs == 0)
break; break;
} }
...@@ -241,13 +243,14 @@ static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid, ...@@ -241,13 +243,14 @@ static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
struct rpc_rqst *req = NULL; struct rpc_rqst *req = NULL;
dprintk("RPC: allocate a backchannel request\n"); dprintk("RPC: allocate a backchannel request\n");
if (atomic_read(&xprt->bc_free_slots) <= 0)
goto not_found;
if (list_empty(&xprt->bc_pa_list)) { if (list_empty(&xprt->bc_pa_list)) {
if (!new) if (!new)
goto not_found; goto not_found;
if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
goto not_found;
list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list); list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
xprt->bc_alloc_count++; xprt->bc_alloc_count++;
atomic_inc(&xprt->bc_slot_count);
} }
req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst, req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
rq_bc_pa_list); rq_bc_pa_list);
...@@ -291,6 +294,7 @@ void xprt_free_bc_rqst(struct rpc_rqst *req) ...@@ -291,6 +294,7 @@ void xprt_free_bc_rqst(struct rpc_rqst *req)
if (xprt_need_to_requeue(xprt)) { if (xprt_need_to_requeue(xprt)) {
list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list); list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
xprt->bc_alloc_count++; xprt->bc_alloc_count++;
atomic_inc(&xprt->bc_slot_count);
req = NULL; req = NULL;
} }
spin_unlock_bh(&xprt->bc_pa_lock); spin_unlock_bh(&xprt->bc_pa_lock);
...@@ -357,7 +361,7 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied) ...@@ -357,7 +361,7 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
spin_lock(&xprt->bc_pa_lock); spin_lock(&xprt->bc_pa_lock);
list_del(&req->rq_bc_pa_list); list_del(&req->rq_bc_pa_list);
xprt_dec_alloc_count(xprt, 1); xprt->bc_alloc_count--;
spin_unlock(&xprt->bc_pa_lock); spin_unlock(&xprt->bc_pa_lock);
req->rq_private_buf.len = copied; req->rq_private_buf.len = copied;
......
...@@ -1526,6 +1526,19 @@ size_t rpc_max_bc_payload(struct rpc_clnt *clnt) ...@@ -1526,6 +1526,19 @@ size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
} }
EXPORT_SYMBOL_GPL(rpc_max_bc_payload); EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
{
struct rpc_xprt *xprt;
unsigned int ret;
rcu_read_lock();
xprt = rcu_dereference(clnt->cl_xprt);
ret = xprt->ops->bc_num_slots(xprt);
rcu_read_unlock();
return ret;
}
EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
/** /**
* rpc_force_rebind - force transport to check that remote port is unchanged * rpc_force_rebind - force transport to check that remote port is unchanged
* @clnt: client to rebind * @clnt: client to rebind
......
...@@ -1595,7 +1595,7 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req, ...@@ -1595,7 +1595,7 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
/* Parse and execute the bc call */ /* Parse and execute the bc call */
proc_error = svc_process_common(rqstp, argv, resv); proc_error = svc_process_common(rqstp, argv, resv);
atomic_inc(&req->rq_xprt->bc_free_slots); atomic_dec(&req->rq_xprt->bc_slot_count);
if (!proc_error) { if (!proc_error) {
/* Processing error: drop the request */ /* Processing error: drop the request */
xprt_free_bc_request(req); xprt_free_bc_request(req);
......
...@@ -52,6 +52,13 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt) ...@@ -52,6 +52,13 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
return maxmsg - RPCRDMA_HDRLEN_MIN; return maxmsg - RPCRDMA_HDRLEN_MIN;
} }
unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
return r_xprt->rx_buf.rb_bc_srv_max_requests;
}
static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst) static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
{ {
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
......
...@@ -812,6 +812,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = { ...@@ -812,6 +812,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
#if defined(CONFIG_SUNRPC_BACKCHANNEL) #if defined(CONFIG_SUNRPC_BACKCHANNEL)
.bc_setup = xprt_rdma_bc_setup, .bc_setup = xprt_rdma_bc_setup,
.bc_maxpayload = xprt_rdma_bc_maxpayload, .bc_maxpayload = xprt_rdma_bc_maxpayload,
.bc_num_slots = xprt_rdma_bc_max_slots,
.bc_free_rqst = xprt_rdma_bc_free_rqst, .bc_free_rqst = xprt_rdma_bc_free_rqst,
.bc_destroy = xprt_rdma_bc_destroy, .bc_destroy = xprt_rdma_bc_destroy,
#endif #endif
......
...@@ -605,6 +605,7 @@ void xprt_rdma_cleanup(void); ...@@ -605,6 +605,7 @@ void xprt_rdma_cleanup(void);
#if defined(CONFIG_SUNRPC_BACKCHANNEL) #if defined(CONFIG_SUNRPC_BACKCHANNEL)
int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *); size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *);
int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst); int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst);
......
...@@ -2788,6 +2788,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = { ...@@ -2788,6 +2788,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
#ifdef CONFIG_SUNRPC_BACKCHANNEL #ifdef CONFIG_SUNRPC_BACKCHANNEL
.bc_setup = xprt_setup_bc, .bc_setup = xprt_setup_bc,
.bc_maxpayload = xs_tcp_bc_maxpayload, .bc_maxpayload = xs_tcp_bc_maxpayload,
.bc_num_slots = xprt_bc_max_slots,
.bc_free_rqst = xprt_free_bc_rqst, .bc_free_rqst = xprt_free_bc_rqst,
.bc_destroy = xprt_destroy_bc, .bc_destroy = xprt_destroy_bc,
#endif #endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment