Commit 02107148, authored by Chuck Lever, committed by Trond Myklebust

SUNRPC: switchable buffer allocation

 Add RPC client transport switch support for replacing buffer management
 on a per-transport basis.

 In the current IPv4 socket transport implementation, RPC buffers are
 allocated as needed for each RPC message that is sent.  Some transport
 implementations may choose to use pre-allocated buffers for encoding,
 sending, receiving, and unmarshalling RPC messages, however.  For
 transports capable of direct data placement, the buffers can be carved
 out of a pre-registered area of memory rather than from a slab cache.

 Test-plan:
 Millions of fsx operations.  Performance characterization with "sio" and
 "iozone".  Use oprofile and other tools to look for significant regression
 in CPU utilization.
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
parent 03c21733
...@@ -52,8 +52,6 @@ struct rpc_task { ...@@ -52,8 +52,6 @@ struct rpc_task {
* RPC call state * RPC call state
*/ */
struct rpc_message tk_msg; /* RPC call info */ struct rpc_message tk_msg; /* RPC call info */
__u32 * tk_buffer; /* XDR buffer */
size_t tk_bufsize;
__u8 tk_garb_retry; __u8 tk_garb_retry;
__u8 tk_cred_retry; __u8 tk_cred_retry;
...@@ -268,6 +266,7 @@ struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *); ...@@ -268,6 +266,7 @@ struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
void rpc_wake_up_status(struct rpc_wait_queue *, int); void rpc_wake_up_status(struct rpc_wait_queue *, int);
void rpc_delay(struct rpc_task *, unsigned long); void rpc_delay(struct rpc_task *, unsigned long);
void * rpc_malloc(struct rpc_task *, size_t); void * rpc_malloc(struct rpc_task *, size_t);
void rpc_free(struct rpc_task *);
int rpciod_up(void); int rpciod_up(void);
void rpciod_down(void); void rpciod_down(void);
void rpciod_wake_up(void); void rpciod_wake_up(void);
......
...@@ -79,21 +79,19 @@ struct rpc_rqst { ...@@ -79,21 +79,19 @@ struct rpc_rqst {
void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */ void (*rq_release_snd_buf)(struct rpc_rqst *); /* release rq_enc_pages */
struct list_head rq_list; struct list_head rq_list;
__u32 * rq_buffer; /* XDR encode buffer */
size_t rq_bufsize;
struct xdr_buf rq_private_buf; /* The receive buffer struct xdr_buf rq_private_buf; /* The receive buffer
* used in the softirq. * used in the softirq.
*/ */
unsigned long rq_majortimeo; /* major timeout alarm */ unsigned long rq_majortimeo; /* major timeout alarm */
unsigned long rq_timeout; /* Current timeout value */ unsigned long rq_timeout; /* Current timeout value */
unsigned int rq_retries; /* # of retries */ unsigned int rq_retries; /* # of retries */
/*
* For authentication (e.g. auth_des)
*/
u32 rq_creddata[2];
/* /*
* Partial send handling * Partial send handling
*/ */
u32 rq_bytes_sent; /* Bytes we have sent */ u32 rq_bytes_sent; /* Bytes we have sent */
unsigned long rq_xtime; /* when transmitted */ unsigned long rq_xtime; /* when transmitted */
...@@ -107,6 +105,8 @@ struct rpc_xprt_ops { ...@@ -107,6 +105,8 @@ struct rpc_xprt_ops {
int (*reserve_xprt)(struct rpc_task *task); int (*reserve_xprt)(struct rpc_task *task);
void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task); void (*release_xprt)(struct rpc_xprt *xprt, struct rpc_task *task);
void (*connect)(struct rpc_task *task); void (*connect)(struct rpc_task *task);
void * (*buf_alloc)(struct rpc_task *task, size_t size);
void (*buf_free)(struct rpc_task *task);
int (*send_request)(struct rpc_task *task); int (*send_request)(struct rpc_task *task);
void (*set_retrans_timeout)(struct rpc_task *task); void (*set_retrans_timeout)(struct rpc_task *task);
void (*timer)(struct rpc_task *task); void (*timer)(struct rpc_task *task);
......
...@@ -644,24 +644,26 @@ call_reserveresult(struct rpc_task *task) ...@@ -644,24 +644,26 @@ call_reserveresult(struct rpc_task *task)
/* /*
* 2. Allocate the buffer. For details, see sched.c:rpc_malloc. * 2. Allocate the buffer. For details, see sched.c:rpc_malloc.
* (Note: buffer memory is freed in rpc_task_release). * (Note: buffer memory is freed in xprt_release).
*/ */
static void static void
call_allocate(struct rpc_task *task) call_allocate(struct rpc_task *task)
{ {
struct rpc_rqst *req = task->tk_rqstp;
struct rpc_xprt *xprt = task->tk_xprt;
unsigned int bufsiz; unsigned int bufsiz;
dprintk("RPC: %4d call_allocate (status %d)\n", dprintk("RPC: %4d call_allocate (status %d)\n",
task->tk_pid, task->tk_status); task->tk_pid, task->tk_status);
task->tk_action = call_bind; task->tk_action = call_bind;
if (task->tk_buffer) if (req->rq_buffer)
return; return;
/* FIXME: compute buffer requirements more exactly using /* FIXME: compute buffer requirements more exactly using
* auth->au_wslack */ * auth->au_wslack */
bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE; bufsiz = task->tk_msg.rpc_proc->p_bufsiz + RPC_SLACK_SPACE;
if (rpc_malloc(task, bufsiz << 1) != NULL) if (xprt->ops->buf_alloc(task, bufsiz << 1) != NULL)
return; return;
printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task); printk(KERN_INFO "RPC: buffer allocation failed for task %p\n", task);
...@@ -704,14 +706,14 @@ call_encode(struct rpc_task *task) ...@@ -704,14 +706,14 @@ call_encode(struct rpc_task *task)
task->tk_pid, task->tk_status); task->tk_pid, task->tk_status);
/* Default buffer setup */ /* Default buffer setup */
bufsiz = task->tk_bufsize >> 1; bufsiz = req->rq_bufsize >> 1;
sndbuf->head[0].iov_base = (void *)task->tk_buffer; sndbuf->head[0].iov_base = (void *)req->rq_buffer;
sndbuf->head[0].iov_len = bufsiz; sndbuf->head[0].iov_len = bufsiz;
sndbuf->tail[0].iov_len = 0; sndbuf->tail[0].iov_len = 0;
sndbuf->page_len = 0; sndbuf->page_len = 0;
sndbuf->len = 0; sndbuf->len = 0;
sndbuf->buflen = bufsiz; sndbuf->buflen = bufsiz;
rcvbuf->head[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz); rcvbuf->head[0].iov_base = (void *)((char *)req->rq_buffer + bufsiz);
rcvbuf->head[0].iov_len = bufsiz; rcvbuf->head[0].iov_len = bufsiz;
rcvbuf->tail[0].iov_len = 0; rcvbuf->tail[0].iov_len = 0;
rcvbuf->page_len = 0; rcvbuf->page_len = 0;
......
...@@ -41,8 +41,6 @@ static mempool_t *rpc_buffer_mempool __read_mostly; ...@@ -41,8 +41,6 @@ static mempool_t *rpc_buffer_mempool __read_mostly;
static void __rpc_default_timer(struct rpc_task *task); static void __rpc_default_timer(struct rpc_task *task);
static void rpciod_killall(void); static void rpciod_killall(void);
static void rpc_free(struct rpc_task *task);
static void rpc_async_schedule(void *); static void rpc_async_schedule(void *);
/* /*
...@@ -599,7 +597,6 @@ void rpc_exit_task(struct rpc_task *task) ...@@ -599,7 +597,6 @@ void rpc_exit_task(struct rpc_task *task)
WARN_ON(RPC_ASSASSINATED(task)); WARN_ON(RPC_ASSASSINATED(task));
/* Always release the RPC slot and buffer memory */ /* Always release the RPC slot and buffer memory */
xprt_release(task); xprt_release(task);
rpc_free(task);
} }
} }
} }
...@@ -724,17 +721,19 @@ static void rpc_async_schedule(void *arg) ...@@ -724,17 +721,19 @@ static void rpc_async_schedule(void *arg)
__rpc_execute((struct rpc_task *)arg); __rpc_execute((struct rpc_task *)arg);
} }
/* /**
* Allocate memory for RPC purposes. * rpc_malloc - allocate an RPC buffer
* @task: RPC task that will use this buffer
* @size: requested byte size
* *
* We try to ensure that some NFS reads and writes can always proceed * We try to ensure that some NFS reads and writes can always proceed
* by using a mempool when allocating 'small' buffers. * by using a mempool when allocating 'small' buffers.
* In order to avoid memory starvation triggering more writebacks of * In order to avoid memory starvation triggering more writebacks of
* NFS requests, we use GFP_NOFS rather than GFP_KERNEL. * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
*/ */
void * void * rpc_malloc(struct rpc_task *task, size_t size)
rpc_malloc(struct rpc_task *task, size_t size)
{ {
struct rpc_rqst *req = task->tk_rqstp;
gfp_t gfp; gfp_t gfp;
if (task->tk_flags & RPC_TASK_SWAPPER) if (task->tk_flags & RPC_TASK_SWAPPER)
...@@ -743,27 +742,33 @@ rpc_malloc(struct rpc_task *task, size_t size) ...@@ -743,27 +742,33 @@ rpc_malloc(struct rpc_task *task, size_t size)
gfp = GFP_NOFS; gfp = GFP_NOFS;
if (size > RPC_BUFFER_MAXSIZE) { if (size > RPC_BUFFER_MAXSIZE) {
task->tk_buffer = kmalloc(size, gfp); req->rq_buffer = kmalloc(size, gfp);
if (task->tk_buffer) if (req->rq_buffer)
task->tk_bufsize = size; req->rq_bufsize = size;
} else { } else {
task->tk_buffer = mempool_alloc(rpc_buffer_mempool, gfp); req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
if (task->tk_buffer) if (req->rq_buffer)
task->tk_bufsize = RPC_BUFFER_MAXSIZE; req->rq_bufsize = RPC_BUFFER_MAXSIZE;
} }
return task->tk_buffer; return req->rq_buffer;
} }
static void /**
rpc_free(struct rpc_task *task) * rpc_free - free buffer allocated via rpc_malloc
* @task: RPC task with a buffer to be freed
*
*/
void rpc_free(struct rpc_task *task)
{ {
if (task->tk_buffer) { struct rpc_rqst *req = task->tk_rqstp;
if (task->tk_bufsize == RPC_BUFFER_MAXSIZE)
mempool_free(task->tk_buffer, rpc_buffer_mempool); if (req->rq_buffer) {
if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
mempool_free(req->rq_buffer, rpc_buffer_mempool);
else else
kfree(task->tk_buffer); kfree(req->rq_buffer);
task->tk_buffer = NULL; req->rq_buffer = NULL;
task->tk_bufsize = 0; req->rq_bufsize = 0;
} }
} }
...@@ -887,7 +892,6 @@ void rpc_release_task(struct rpc_task *task) ...@@ -887,7 +892,6 @@ void rpc_release_task(struct rpc_task *task)
xprt_release(task); xprt_release(task);
if (task->tk_msg.rpc_cred) if (task->tk_msg.rpc_cred)
rpcauth_unbindcred(task); rpcauth_unbindcred(task);
rpc_free(task);
if (task->tk_client) { if (task->tk_client) {
rpc_release_client(task->tk_client); rpc_release_client(task->tk_client);
task->tk_client = NULL; task->tk_client = NULL;
......
...@@ -838,6 +838,8 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt) ...@@ -838,6 +838,8 @@ static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
req->rq_timeout = xprt->timeout.to_initval; req->rq_timeout = xprt->timeout.to_initval;
req->rq_task = task; req->rq_task = task;
req->rq_xprt = xprt; req->rq_xprt = xprt;
req->rq_buffer = NULL;
req->rq_bufsize = 0;
req->rq_xid = xprt_alloc_xid(xprt); req->rq_xid = xprt_alloc_xid(xprt);
req->rq_release_snd_buf = NULL; req->rq_release_snd_buf = NULL;
dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,
...@@ -867,6 +869,7 @@ void xprt_release(struct rpc_task *task) ...@@ -867,6 +869,7 @@ void xprt_release(struct rpc_task *task)
mod_timer(&xprt->timer, mod_timer(&xprt->timer,
xprt->last_used + xprt->idle_timeout); xprt->last_used + xprt->idle_timeout);
spin_unlock_bh(&xprt->transport_lock); spin_unlock_bh(&xprt->transport_lock);
xprt->ops->buf_free(task);
task->tk_rqstp = NULL; task->tk_rqstp = NULL;
if (req->rq_release_snd_buf) if (req->rq_release_snd_buf)
req->rq_release_snd_buf(req); req->rq_release_snd_buf(req);
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <linux/udp.h> #include <linux/udp.h>
#include <linux/tcp.h> #include <linux/tcp.h>
#include <linux/sunrpc/clnt.h> #include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/sched.h>
#include <linux/file.h> #include <linux/file.h>
#include <net/sock.h> #include <net/sock.h>
...@@ -1161,6 +1162,8 @@ static struct rpc_xprt_ops xs_udp_ops = { ...@@ -1161,6 +1162,8 @@ static struct rpc_xprt_ops xs_udp_ops = {
.reserve_xprt = xprt_reserve_xprt_cong, .reserve_xprt = xprt_reserve_xprt_cong,
.release_xprt = xprt_release_xprt_cong, .release_xprt = xprt_release_xprt_cong,
.connect = xs_connect, .connect = xs_connect,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_udp_send_request, .send_request = xs_udp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_rtt, .set_retrans_timeout = xprt_set_retrans_timeout_rtt,
.timer = xs_udp_timer, .timer = xs_udp_timer,
...@@ -1173,6 +1176,8 @@ static struct rpc_xprt_ops xs_tcp_ops = { ...@@ -1173,6 +1176,8 @@ static struct rpc_xprt_ops xs_tcp_ops = {
.reserve_xprt = xprt_reserve_xprt, .reserve_xprt = xprt_reserve_xprt,
.release_xprt = xprt_release_xprt, .release_xprt = xprt_release_xprt,
.connect = xs_connect, .connect = xs_connect,
.buf_alloc = rpc_malloc,
.buf_free = rpc_free,
.send_request = xs_tcp_send_request, .send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def, .set_retrans_timeout = xprt_set_retrans_timeout_def,
.close = xs_close, .close = xs_close,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment