Commit b9315c75 authored by Trond Myklebust's avatar Trond Myklebust

RPC: patch by Chuck Lever to make the number of RPC slots a tunable parameter.

     This is wanted in order to allow the NFS client to send more requests before
     it has to block and wait for replies.
     This is mainly useful if you have a WAN and want to ensure that the bandwidth
     is being used efficiently.
parent 9c9ff1bc
...@@ -47,11 +47,8 @@ ...@@ -47,11 +47,8 @@
* their needs. People that do NFS over a slow network, might for * their needs. People that do NFS over a slow network, might for
* instance want to reduce it to something closer to 1 for improved * instance want to reduce it to something closer to 1 for improved
* interactive response. * interactive response.
*
* For the moment, though, we instead set it to RPC_MAXREQS, which
* is the maximum number of simultaneous RPC requests on the wire.
*/ */
#define NFS_MAX_READAHEAD RPC_MAXREQS #define NFS_MAX_READAHEAD (RPC_DEF_SLOT_TABLE - 1)
static void nfs_invalidate_inode(struct inode *); static void nfs_invalidate_inode(struct inode *);
static int nfs_update_inode(struct inode *, struct nfs_fattr *, unsigned long); static int nfs_update_inode(struct inode *, struct nfs_fattr *, unsigned long);
......
...@@ -92,6 +92,8 @@ enum { ...@@ -92,6 +92,8 @@ enum {
CTL_NFSDEBUG, CTL_NFSDEBUG,
CTL_NFSDDEBUG, CTL_NFSDDEBUG,
CTL_NLMDEBUG, CTL_NLMDEBUG,
CTL_SLOTTABLE_UDP,
CTL_SLOTTABLE_TCP,
}; };
#endif /* _LINUX_SUNRPC_DEBUG_H_ */ #endif /* _LINUX_SUNRPC_DEBUG_H_ */
...@@ -28,16 +28,18 @@ ...@@ -28,16 +28,18 @@
* *
* Upper procedures may check whether a request would block waiting for * Upper procedures may check whether a request would block waiting for
* a free RPC slot by using the RPC_CONGESTED() macro. * a free RPC slot by using the RPC_CONGESTED() macro.
*
* Note: on machines with low memory we should probably use a smaller
* MAXREQS value: At 32 outstanding reqs with 8 megs of RAM, fragment
* reassembly will frequently run out of memory.
*/ */
#define RPC_MAXCONG (16) extern unsigned int xprt_udp_slot_table_entries;
#define RPC_MAXREQS RPC_MAXCONG extern unsigned int xprt_tcp_slot_table_entries;
#define RPC_CWNDSCALE (256)
#define RPC_MAXCWND (RPC_MAXCONG * RPC_CWNDSCALE) #define RPC_MIN_SLOT_TABLE (2U)
#define RPC_DEF_SLOT_TABLE (16U)
#define RPC_MAX_SLOT_TABLE (128U)
#define RPC_CWNDSHIFT (8U)
#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT)
#define RPC_INITCWND RPC_CWNDSCALE #define RPC_INITCWND RPC_CWNDSCALE
#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT)
#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd) #define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
/* Default timeout values */ /* Default timeout values */
...@@ -92,7 +94,6 @@ struct rpc_rqst { ...@@ -92,7 +94,6 @@ struct rpc_rqst {
*/ */
struct rpc_task * rq_task; /* RPC task data */ struct rpc_task * rq_task; /* RPC task data */
__u32 rq_xid; /* request XID */ __u32 rq_xid; /* request XID */
struct rpc_rqst * rq_next; /* free list */
int rq_cong; /* has incremented xprt->cong */ int rq_cong; /* has incremented xprt->cong */
int rq_received; /* receive completed */ int rq_received; /* receive completed */
u32 rq_seqno; /* gss seq no. used on req. */ u32 rq_seqno; /* gss seq no. used on req. */
...@@ -145,8 +146,9 @@ struct rpc_xprt { ...@@ -145,8 +146,9 @@ struct rpc_xprt {
struct rpc_wait_queue resend; /* requests waiting to resend */ struct rpc_wait_queue resend; /* requests waiting to resend */
struct rpc_wait_queue pending; /* requests in flight */ struct rpc_wait_queue pending; /* requests in flight */
struct rpc_wait_queue backlog; /* waiting for slot */ struct rpc_wait_queue backlog; /* waiting for slot */
struct rpc_rqst * free; /* free slots */ struct list_head free; /* free slots */
struct rpc_rqst slot[RPC_MAXREQS]; struct rpc_rqst * slot; /* slot table storage */
unsigned int max_reqs; /* total slots */
unsigned long sockstate; /* Socket state */ unsigned long sockstate; /* Socket state */
unsigned char shutdown : 1, /* being shut down */ unsigned char shutdown : 1, /* being shut down */
nocong : 1, /* no congestion control */ nocong : 1, /* no congestion control */
......
...@@ -63,6 +63,8 @@ EXPORT_SYMBOL(rpc_mkpipe); ...@@ -63,6 +63,8 @@ EXPORT_SYMBOL(rpc_mkpipe);
EXPORT_SYMBOL(xprt_create_proto); EXPORT_SYMBOL(xprt_create_proto);
EXPORT_SYMBOL(xprt_destroy); EXPORT_SYMBOL(xprt_destroy);
EXPORT_SYMBOL(xprt_set_timeout); EXPORT_SYMBOL(xprt_set_timeout);
EXPORT_SYMBOL(xprt_udp_slot_table_entries);
EXPORT_SYMBOL(xprt_tcp_slot_table_entries);
/* Client credential cache */ /* Client credential cache */
EXPORT_SYMBOL(rpcauth_register); EXPORT_SYMBOL(rpcauth_register);
......
/* /*
* linux/net/sunrpc/sysctl.c * linux/net/sunrpc/sysctl.c
* *
* Sysctl interface to sunrpc module. This is for debugging only now. * Sysctl interface to sunrpc module.
* *
* I would prefer to register the sunrpc table below sys/net, but that's * I would prefer to register the sunrpc table below sys/net, but that's
* impossible at the moment. * impossible at the moment.
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <linux/sunrpc/types.h> #include <linux/sunrpc/types.h>
#include <linux/sunrpc/sched.h> #include <linux/sunrpc/sched.h>
#include <linux/sunrpc/stats.h> #include <linux/sunrpc/stats.h>
#include <linux/sunrpc/xprt.h>
/* /*
* Declare the debug flags here * Declare the debug flags here
...@@ -117,6 +118,9 @@ proc_dodebug(ctl_table *table, int write, struct file *file, ...@@ -117,6 +118,9 @@ proc_dodebug(ctl_table *table, int write, struct file *file,
return 0; return 0;
} }
static unsigned int min_slot_table_size = RPC_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPC_MAX_SLOT_TABLE;
static ctl_table debug_table[] = { static ctl_table debug_table[] = {
{ {
.ctl_name = CTL_RPCDEBUG, .ctl_name = CTL_RPCDEBUG,
...@@ -150,6 +154,28 @@ static ctl_table debug_table[] = { ...@@ -150,6 +154,28 @@ static ctl_table debug_table[] = {
.mode = 0644, .mode = 0644,
.proc_handler = &proc_dodebug .proc_handler = &proc_dodebug
}, },
{
.ctl_name = CTL_SLOTTABLE_UDP,
.procname = "udp_slot_table_entries",
.data = &xprt_udp_slot_table_entries,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = &proc_dointvec_minmax,
.strategy = &sysctl_intvec,
.extra1 = &min_slot_table_size,
.extra2 = &max_slot_table_size
},
{
.ctl_name = CTL_SLOTTABLE_TCP,
.procname = "tcp_slot_table_entries",
.data = &xprt_tcp_slot_table_entries,
.maxlen = sizeof(unsigned int),
.mode = 0644,
.proc_handler = &proc_dointvec_minmax,
.strategy = &sysctl_intvec,
.extra1 = &min_slot_table_size,
.extra2 = &max_slot_table_size
},
{ .ctl_name = 0 } { .ctl_name = 0 }
}; };
......
...@@ -338,8 +338,8 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result) ...@@ -338,8 +338,8 @@ xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
/* The (cwnd >> 1) term makes sure /* The (cwnd >> 1) term makes sure
* the result gets rounded properly. */ * the result gets rounded properly. */
cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
if (cwnd > RPC_MAXCWND) if (cwnd > RPC_MAXCWND(xprt))
cwnd = RPC_MAXCWND; cwnd = RPC_MAXCWND(xprt);
__xprt_lock_write_next(xprt); __xprt_lock_write_next(xprt);
} else if (result == -ETIMEDOUT) { } else if (result == -ETIMEDOUT) {
cwnd >>= 1; cwnd >>= 1;
...@@ -1304,10 +1304,9 @@ do_xprt_reserve(struct rpc_task *task) ...@@ -1304,10 +1304,9 @@ do_xprt_reserve(struct rpc_task *task)
task->tk_status = 0; task->tk_status = 0;
if (task->tk_rqstp) if (task->tk_rqstp)
return; return;
if (xprt->free) { if (!list_empty(&xprt->free)) {
struct rpc_rqst *req = xprt->free; struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
xprt->free = req->rq_next; list_del_init(&req->rq_list);
req->rq_next = NULL;
task->tk_rqstp = req; task->tk_rqstp = req;
xprt_request_init(task, xprt); xprt_request_init(task, xprt);
return; return;
...@@ -1343,7 +1342,6 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) ...@@ -1343,7 +1342,6 @@ xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
req->rq_task = task; req->rq_task = task;
req->rq_xprt = xprt; req->rq_xprt = xprt;
req->rq_xid = xprt_alloc_xid(xprt); req->rq_xid = xprt_alloc_xid(xprt);
INIT_LIST_HEAD(&req->rq_list);
dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
req, req->rq_xid); req, req->rq_xid);
} }
...@@ -1374,9 +1372,7 @@ xprt_release(struct rpc_task *task) ...@@ -1374,9 +1372,7 @@ xprt_release(struct rpc_task *task)
dprintk("RPC: %4d release request %p\n", task->tk_pid, req); dprintk("RPC: %4d release request %p\n", task->tk_pid, req);
spin_lock(&xprt->xprt_lock); spin_lock(&xprt->xprt_lock);
req->rq_next = xprt->free; list_add(&req->rq_list, &xprt->free);
xprt->free = req;
xprt_clear_backlog(xprt); xprt_clear_backlog(xprt);
spin_unlock(&xprt->xprt_lock); spin_unlock(&xprt->xprt_lock);
} }
...@@ -1407,6 +1403,9 @@ xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr) ...@@ -1407,6 +1403,9 @@ xprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr)
to->to_exponential = 0; to->to_exponential = 0;
} }
unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;
unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;
/* /*
* Initialize an RPC client * Initialize an RPC client
*/ */
...@@ -1414,21 +1413,33 @@ static struct rpc_xprt * ...@@ -1414,21 +1413,33 @@ static struct rpc_xprt *
xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
{ {
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
unsigned int entries;
size_t slot_table_size;
struct rpc_rqst *req; struct rpc_rqst *req;
int i;
dprintk("RPC: setting up %s transport...\n", dprintk("RPC: setting up %s transport...\n",
proto == IPPROTO_UDP? "UDP" : "TCP"); proto == IPPROTO_UDP? "UDP" : "TCP");
entries = (proto == IPPROTO_TCP)?
xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries;
if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL) if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */ memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */
xprt->max_reqs = entries;
slot_table_size = entries * sizeof(xprt->slot[0]);
xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);
if (xprt->slot == NULL) {
kfree(xprt);
return ERR_PTR(-ENOMEM);
}
memset(xprt->slot, 0, slot_table_size);
xprt->addr = *ap; xprt->addr = *ap;
xprt->prot = proto; xprt->prot = proto;
xprt->stream = (proto == IPPROTO_TCP)? 1 : 0; xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;
if (xprt->stream) { if (xprt->stream) {
xprt->cwnd = RPC_MAXCWND; xprt->cwnd = RPC_MAXCWND(xprt);
xprt->nocong = 1; xprt->nocong = 1;
} else } else
xprt->cwnd = RPC_INITCWND; xprt->cwnd = RPC_INITCWND;
...@@ -1436,6 +1447,7 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) ...@@ -1436,6 +1447,7 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
spin_lock_init(&xprt->xprt_lock); spin_lock_init(&xprt->xprt_lock);
init_waitqueue_head(&xprt->cong_wait); init_waitqueue_head(&xprt->cong_wait);
INIT_LIST_HEAD(&xprt->free);
INIT_LIST_HEAD(&xprt->recv); INIT_LIST_HEAD(&xprt->recv);
INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt); INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);
INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt); INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);
...@@ -1458,17 +1470,16 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to) ...@@ -1458,17 +1470,16 @@ xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to)
INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog"); INIT_RPC_WAITQ(&xprt->backlog, "xprt_backlog");
/* initialize free list */ /* initialize free list */
for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++) for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)
req->rq_next = req + 1; list_add(&req->rq_list, &xprt->free);
req->rq_next = NULL;
xprt->free = xprt->slot;
xprt_init_xid(xprt); xprt_init_xid(xprt);
/* Check whether we want to use a reserved port */ /* Check whether we want to use a reserved port */
xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0; xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
dprintk("RPC: created transport %p\n", xprt); dprintk("RPC: created transport %p with %u slots\n", xprt,
xprt->max_reqs);
return xprt; return xprt;
} }
...@@ -1547,11 +1558,11 @@ xprt_sock_setbufsize(struct rpc_xprt *xprt) ...@@ -1547,11 +1558,11 @@ xprt_sock_setbufsize(struct rpc_xprt *xprt)
return; return;
if (xprt->rcvsize) { if (xprt->rcvsize) {
sk->sk_userlocks |= SOCK_RCVBUF_LOCK; sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
sk->sk_rcvbuf = xprt->rcvsize * RPC_MAXCONG * 2; sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs * 2;
} }
if (xprt->sndsize) { if (xprt->sndsize) {
sk->sk_userlocks |= SOCK_SNDBUF_LOCK; sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
sk->sk_sndbuf = xprt->sndsize * RPC_MAXCONG * 2; sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;
sk->sk_write_space(sk); sk->sk_write_space(sk);
} }
} }
...@@ -1640,6 +1651,7 @@ xprt_destroy(struct rpc_xprt *xprt) ...@@ -1640,6 +1651,7 @@ xprt_destroy(struct rpc_xprt *xprt)
dprintk("RPC: destroying transport %p\n", xprt); dprintk("RPC: destroying transport %p\n", xprt);
xprt_shutdown(xprt); xprt_shutdown(xprt);
xprt_close(xprt); xprt_close(xprt);
kfree(xprt->slot);
kfree(xprt); kfree(xprt);
return 0; return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment