Commit 9f73fdbc authored by Trond Myklebust, committed by Linus Torvalds

[PATCH] Teach RPC client to send pages rather than iovecs.

Stop rpciod from deadlocking against itself in map_new_virtual() on HIGHMEM
systems. The RPC client currently has to keep all pages that are scheduled for
transmission kmap()ed into an iovec for the entire duration of the call.
We only actually need to kmap() the pages while making the (non-blocking)
call to sock_sendmsg().

NOTE: When transmitting several pages in one RPC call, sock_sendmsg()
requires us to kmap() *all* of those pages at the same time. This leaves room
for deadlocks between rpciod and any other process that also kmaps more
than one page at a time.
For the TCP case we can solve this later by converting to TCP_CORK+sendpage().

include/linux/sunrpc/xdr.h
   Introduce 'struct xdr_buf' in order to allow RPC layer to handle
   pages directly.

include/linux/sunrpc/xprt.h:
   Convert the RPC client send-buffer to the new format.

net/sunrpc/clnt.c
   Initialize the new format RPC send-buffer.

net/sunrpc/sunrpc_syms.c
   Export xdr_encode_pages()

net/sunrpc/xdr.c
   xdr_kmap() kmap()+copy a struct xdr_buf into an iovec array.
   xdr_kunmap() clean up after xdr_kmap().
   xdr_encode_pages() used to inline pages for transmission.

net/sunrpc/xprt.c
   xprt_sendmsg() needs to kmap() the pages into an iovec for transmission.

include/linux/nfs_xdr.h
   struct nfs_writeargs transmits full page information.
   Convert nfs_rpc_ops->write() to send pages.

fs/nfs/write.c
   Adapt to new format nfs_writeargs / nfs_rpc_ops->write()

fs/nfs/proc.c
   Convert nfs_proc_write().

fs/nfs/nfs2xdr.c
   Convert nfs_xdr_writeargs()

fs/nfs/nfs3proc.c
   Convert nfs3_proc_write().

fs/nfs/nfs3xdr.c
   Convert nfs3_xdr_writeargs()

Cheers,
   Trond
parent b2521b9c
......@@ -24,9 +24,6 @@
#include <linux/nfs2.h>
#include <linux/nfs_fs.h>
/* Uncomment this to support servers requiring longword lengths */
#define NFS_PAD_WRITES 1
#define NFSDBG_FACILITY NFSDBG_XDR
/* #define NFS_PARANOIA 1 */
......@@ -300,46 +297,19 @@ nfs_xdr_readres(struct rpc_rqst *req, u32 *p, struct nfs_readres *res)
/*
 * Encode the arguments of an NFSv2 WRITE call.
 *
 * The file handle, the offsets and the counts are XDR-encoded into the
 * head of the request's send buffer.  The payload is not copied: the
 * caller's page array is attached to the send buffer by
 * xdr_encode_pages(), which also accounts for the payload length.
 *
 * @req:  RPC request whose send buffer (rq_snd_buf) is being filled in
 * @p:    current encode position inside the head of the send buffer
 * @args: WRITE arguments (file handle, offset, count, pages, pgbase)
 *
 * Returns 0.
 */
static int
nfs_xdr_writeargs(struct rpc_rqst *req, u32 *p, struct nfs_writeargs *args)
{
	struct xdr_buf *sndbuf = &req->rq_snd_buf;
	/* The offset is sent as a 32-bit quantity; truncate the 64-bit value. */
	u32 offset = (u32)args->offset;
	u32 count = args->count;

	p = xdr_encode_fhandle(p, args->fh);
	/*
	 * The offset and the count are each sent twice.
	 * NOTE(review): presumably these are the beginoffset/offset and
	 * totalcount/data-length words of the NFSv2 WRITE arguments --
	 * confirm against RFC 1094.
	 */
	*p++ = htonl(offset);
	*p++ = htonl(offset);
	*p++ = htonl(count);
	*p++ = htonl(count);

	/* Record how much of the head was actually used. */
	sndbuf->len = xdr_adjust_iovec(sndbuf->head, p);

	/* Attach the page array carrying the data to be written. */
	xdr_encode_pages(sndbuf, args->pages, args->pgbase, count);
	return 0;
}
......
......@@ -230,16 +230,17 @@ nfs3_proc_read(struct inode *inode, struct rpc_cred *cred,
static int
nfs3_proc_write(struct inode *inode, struct rpc_cred *cred,
struct nfs_fattr *fattr, int flags,
loff_t offset, unsigned int count,
void *buffer, struct nfs_writeverf *verf)
unsigned int base, unsigned int count,
struct page *page, struct nfs_writeverf *verf)
{
u64 offset = page_offset(page) + base;
struct nfs_writeargs arg = {
fh: NFS_FH(inode),
offset: offset,
count: count,
stable: NFS_FILE_SYNC,
nriov: 1,
iov: {{buffer, count}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0}}
pgbase: base,
pages: &page
};
struct nfs_writeres res = {
fattr: fattr,
......
......@@ -22,9 +22,6 @@
#include <linux/nfs3.h>
#include <linux/nfs_fs.h>
/* Uncomment this to support servers requiring longword lengths */
#define NFS_PAD_WRITES 1
#define NFSDBG_FACILITY NFSDBG_XDR
/* Mapping from NFS error code to "errno" error code. */
......@@ -388,7 +385,7 @@ nfs3_xdr_readargs(struct rpc_rqst *req, u32 *p, struct nfs_readargs *args)
static int
nfs3_xdr_writeargs(struct rpc_rqst *req, u32 *p, struct nfs_writeargs *args)
{
unsigned int nr;
struct xdr_buf *sndbuf = &req->rq_snd_buf;
u32 count = args->count;
p = xdr_encode_fhandle(p, args->fh);
......@@ -396,37 +393,10 @@ nfs3_xdr_writeargs(struct rpc_rqst *req, u32 *p, struct nfs_writeargs *args)
*p++ = htonl(count);
*p++ = htonl(args->stable);
*p++ = htonl(count);
req->rq_slen = xdr_adjust_iovec(req->rq_svec, p);
/* Get the number of buffers in the send iovec */
nr = args->nriov;
if (nr+2 > MAX_IOVEC) {
printk(KERN_ERR "NFS: Bad number of iov's in xdr_writeargs\n");
return -EINVAL;
}
/* Copy the iovec */
memcpy(req->rq_svec + 1, args->iov, nr * sizeof(struct iovec));
#ifdef NFS_PAD_WRITES
/*
* Some old servers require that the message length
* be a multiple of 4, so we pad it here if needed.
*/
if (count & 3) {
struct iovec *iov = req->rq_svec + nr + 1;
int pad = 4 - (count & 3);
iov->iov_base = (void *) "\0\0\0";
iov->iov_len = pad;
count += pad;
nr++;
}
#endif
req->rq_slen += count;
req->rq_snr += nr;
sndbuf->len = xdr_adjust_iovec(sndbuf->head, p);
/* Copy the page array */
xdr_encode_pages(sndbuf, args->pages, args->pgbase, count);
return 0;
}
......
......@@ -173,16 +173,17 @@ nfs_proc_read(struct inode *inode, struct rpc_cred *cred,
static int
nfs_proc_write(struct inode *inode, struct rpc_cred *cred,
struct nfs_fattr *fattr, int how,
loff_t offset, unsigned int count,
void *buffer, struct nfs_writeverf *verf)
unsigned int base, unsigned int count,
struct page *page, struct nfs_writeverf *verf)
{
u64 offset = page_offset(page) + base;
struct nfs_writeargs arg = {
fh: NFS_FH(inode),
offset: offset,
count: count,
stable: NFS_FILE_SYNC,
nriov: 1,
iov: {{buffer, count}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0}, {0,0}}
pgbase: base,
pages: &page
};
struct nfs_writeres res = {
fattr: fattr,
......
......@@ -77,6 +77,7 @@ struct nfs_write_data {
struct nfs_fattr fattr;
struct nfs_writeverf verf;
struct list_head pages; /* Coalesced requests we wish to flush */
struct page *pagevec[NFS_WRITE_MAXIOV];
};
/*
......@@ -105,6 +106,7 @@ static __inline__ struct nfs_write_data *nfs_writedata_alloc(void)
if (p) {
memset(p, 0, sizeof(*p));
INIT_LIST_HEAD(&p->pages);
p->args.pages = p->pagevec;
}
return p;
}
......@@ -164,7 +166,6 @@ nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page,
(long long)NFS_FILEID(inode),
count, (long long)(page_offset(page) + offset));
buffer = kmap(page) + offset;
base = page_offset(page) + offset;
flags = ((IS_SWAPFILE(inode)) ? NFS_RW_SWAP : 0) | NFS_RW_SYNC;
......@@ -174,7 +175,7 @@ nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page,
wsize = count;
result = NFS_PROTO(inode)->write(inode, cred, &fattr, flags,
base, wsize, buffer, &verf);
offset, wsize, page, &verf);
nfs_write_attributes(inode, &fattr);
if (result < 0) {
......@@ -187,7 +188,8 @@ nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page,
wsize, result);
refresh = 1;
buffer += wsize;
base += wsize;
base += wsize;
offset += wsize;
written += wsize;
count -= wsize;
/*
......@@ -202,7 +204,6 @@ nfs_writepage_sync(struct file *file, struct inode *inode, struct page *page,
ClearPageError(page);
io_error:
kunmap(page);
if (cred)
put_rpccred(cred);
......@@ -857,29 +858,27 @@ static void
nfs_write_rpcsetup(struct list_head *head, struct nfs_write_data *data)
{
struct nfs_page *req;
struct iovec *iov;
struct page **pages;
unsigned int count;
/* Set up the RPC argument and reply structs
* NB: take care not to mess about with data->commit et al. */
iov = data->args.iov;
pages = data->args.pages;
count = 0;
while (!list_empty(head)) {
struct nfs_page *req = nfs_list_entry(head->next);
nfs_list_remove_request(req);
nfs_list_add_request(req, &data->pages);
iov->iov_base = kmap(req->wb_page) + req->wb_offset;
iov->iov_len = req->wb_bytes;
*pages++ = req->wb_page;
count += req->wb_bytes;
iov++;
data->args.nriov++;
}
req = nfs_list_entry(data->pages.next);
data->inode = req->wb_inode;
data->cred = req->wb_cred;
data->args.fh = NFS_FH(req->wb_inode);
data->args.offset = page_offset(req->wb_page) + req->wb_offset;
data->args.pgbase = req->wb_offset;
data->args.count = count;
data->res.fattr = &data->fattr;
data->res.count = count;
......@@ -945,11 +944,12 @@ nfs_flush_one(struct list_head *head, struct inode *inode, int how)
msg.rpc_resp = &data->res;
msg.rpc_cred = data->cred;
dprintk("NFS: %4d initiated write call (req %s/%Ld count %d nriov %d)\n",
task->tk_pid,
dprintk("NFS: %4d initiated write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
task->tk_pid,
inode->i_sb->s_id,
(long long)NFS_FILEID(inode),
data->args.count, data->args.nriov);
(unsigned int)data->args.count,
(unsigned long long)data->args.offset);
rpc_clnt_sigmask(clnt, &oldset);
rpc_call_setup(task, &msg, 0);
......@@ -1061,8 +1061,6 @@ nfs_writeback_done(struct rpc_task *task)
nfs_list_remove_request(req);
page = req->wb_page;
kunmap(page);
dprintk("NFS: write (%s/%Ld %d@%Ld)",
req->wb_inode->i_sb->s_id,
(long long)NFS_FILEID(req->wb_inode),
......
......@@ -87,8 +87,8 @@ struct nfs_writeargs {
__u64 offset;
__u32 count;
enum nfs3_stable_how stable;
unsigned int nriov;
struct iovec iov[NFS_WRITE_MAXIOV];
unsigned int pgbase;
struct page ** pages;
};
struct nfs_writeverf {
......@@ -329,8 +329,8 @@ struct nfs_rpc_ops {
void *buffer, int *eofp);
int (*write) (struct inode *, struct rpc_cred *,
struct nfs_fattr *,
int, loff_t, unsigned int,
void *buffer, struct nfs_writeverf *verfp);
int, unsigned int, unsigned int,
struct page *, struct nfs_writeverf *verfp);
int (*commit) (struct inode *, struct nfs_fattr *,
unsigned long, unsigned int);
int (*create) (struct inode *, struct qstr *, struct iattr *,
......
......@@ -34,6 +34,31 @@ struct xdr_netobj {
*/
typedef int (*kxdrproc_t)(void *rqstp, u32 *data, void *obj);
/*
 * Client-side XDR message buffer used for transmission/reception.
 *
 * A message is described by three consecutive pieces:
 *   - head:  a linear buffer holding the RPC headers (and, for short
 *            messages, the data payload as well),
 *   - pages: an array of pages carrying bulk data,
 *   - tail:  a linear buffer appended after the page data.  Its main
 *            use is to hold the padding needed to satisfy the
 *            int_32-alignment requirements in RFC1832.
 *
 * For the future, we might want to string several of these together
 * in a list if anybody wants to make use of NFSv4 COMPOUND
 * operations and/or has a need for scatter/gather involving pages.
 */
struct xdr_buf {
	struct iovec	head[1];	/* RPC header + non-page data */
	struct iovec	tail[1];	/* Appended after page data */

	struct page **	pages;		/* Array of contiguous pages */
	unsigned int	page_base;	/* Start of page data */
	unsigned int	page_len;	/* Length of page data */

	unsigned int	len;		/* Total length of data */
};
/*
* pre-xdr'ed macros.
*/
......@@ -68,6 +93,9 @@ u32 * xdr_encode_netobj(u32 *p, const struct xdr_netobj *);
u32 * xdr_decode_netobj(u32 *p, struct xdr_netobj *);
u32 * xdr_decode_netobj_fixed(u32 *p, void *obj, unsigned int len);
void xdr_encode_pages(struct xdr_buf *, struct page **, unsigned int,
unsigned int);
/*
* Decode 64bit quantities (NFSv3 support)
*/
......@@ -99,6 +127,12 @@ xdr_adjust_iovec(struct iovec *iov, u32 *p)
void xdr_shift_iovec(struct iovec *, int, size_t);
void xdr_zero_iovec(struct iovec *, int, size_t);
/*
* XDR buffer helper functions
*/
extern int xdr_kmap(struct iovec *, struct xdr_buf *, unsigned int);
extern void xdr_kunmap(struct xdr_buf *, unsigned int);
#endif /* __KERNEL__ */
#endif /* _SUNRPC_XDR_H_ */
......@@ -13,6 +13,7 @@
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/sunrpc/sched.h>
#include <linux/sunrpc/xdr.h>
/*
* Maximum number of iov's we use.
......@@ -87,7 +88,7 @@ struct rpc_rqst {
*/
struct rpc_xprt * rq_xprt; /* RPC client */
struct rpc_timeout rq_timeout; /* timeout parms */
struct rpc_iov rq_snd_buf; /* send buffer */
struct xdr_buf rq_snd_buf; /* send buffer */
struct rpc_iov rq_rcv_buf; /* recv buffer */
/*
......@@ -113,9 +114,8 @@ struct rpc_rqst {
unsigned long rq_xtime; /* when transmitted */
#endif
};
#define rq_svec rq_snd_buf.io_vec
#define rq_snr rq_snd_buf.io_nr
#define rq_slen rq_snd_buf.io_len
#define rq_svec rq_snd_buf.head
#define rq_slen rq_snd_buf.len
#define rq_rvec rq_rcv_buf.io_vec
#define rq_rnr rq_rcv_buf.io_nr
#define rq_rlen rq_rcv_buf.io_len
......
......@@ -461,6 +461,7 @@ call_encode(struct rpc_task *task)
{
struct rpc_clnt *clnt = task->tk_client;
struct rpc_rqst *req = task->tk_rqstp;
struct xdr_buf *sndbuf = &req->rq_snd_buf;
unsigned int bufsiz;
kxdrproc_t encode;
int status;
......@@ -473,10 +474,11 @@ call_encode(struct rpc_task *task)
/* Default buffer setup */
bufsiz = rpcproc_bufsiz(clnt, task->tk_msg.rpc_proc)+RPC_SLACK_SPACE;
req->rq_svec[0].iov_base = (void *)task->tk_buffer;
req->rq_svec[0].iov_len = bufsiz;
req->rq_slen = 0;
req->rq_snr = 1;
sndbuf->head[0].iov_base = (void *)task->tk_buffer;
sndbuf->head[0].iov_len = bufsiz;
sndbuf->tail[0].iov_len = 0;
sndbuf->page_len = 0;
sndbuf->len = 0;
req->rq_rvec[0].iov_base = (void *)((char *)task->tk_buffer + bufsiz);
req->rq_rvec[0].iov_len = bufsiz;
req->rq_rlen = bufsiz;
......
......@@ -96,6 +96,7 @@ EXPORT_SYMBOL(xdr_decode_string);
EXPORT_SYMBOL(xdr_decode_string_inplace);
EXPORT_SYMBOL(xdr_decode_netobj);
EXPORT_SYMBOL(xdr_encode_netobj);
EXPORT_SYMBOL(xdr_encode_pages);
EXPORT_SYMBOL(xdr_shift_iovec);
EXPORT_SYMBOL(xdr_zero_iovec);
......
......@@ -10,6 +10,8 @@
#include <linux/socket.h>
#include <linux/string.h>
#include <linux/kernel.h>
#include <linux/pagemap.h>
#include <linux/errno.h>
#include <linux/in.h>
#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/msg_prot.h>
......@@ -99,6 +101,25 @@ xdr_decode_string_inplace(u32 *p, char **sp, int *lenp, int maxlen)
return p + XDR_QUADLEN(len);
}
/*
 * Attach a page array to an XDR buffer as its bulk data.
 *
 * @xdr:   buffer to update
 * @pages: page array holding the data
 * @base:  offset of the first data byte within the page array
 * @len:   number of data bytes
 *
 * If @len is not a multiple of 4, the tail iovec is pointed at a block
 * of zero bytes that rounds the data up to the next 4-byte boundary.
 * The total buffer length grows by @len plus any such padding.
 */
void
xdr_encode_pages(struct xdr_buf *xdr, struct page **pages, unsigned int base,
		 unsigned int len)
{
	/* Bytes needed to reach 4-byte alignment; 0 when already aligned. */
	unsigned int padding = (4 - (len & 3)) & 3;

	xdr->pages = pages;
	xdr->page_base = base;
	xdr->page_len = len;

	if (padding != 0) {
		xdr->tail[0].iov_base = (void *) "\0\0\0";
		xdr->tail[0].iov_len = padding;
	}

	xdr->len += len + padding;
}
/*
* Realign the iovec if the server missed out some reply elements
......@@ -148,3 +169,85 @@ void xdr_zero_iovec(struct iovec *iov, int nr, size_t n)
}
}
}
/*
 * Map a struct xdr_buf into an iovec array.
 *
 * Fills iov_base[] with the part of the message that lies beyond the first
 * 'base' bytes: at most one entry for the head, one entry per page of page
 * data, and one entry for the tail.  Every page that gets an entry is
 * kmap()ed here.
 *
 * Returns the number of iovec entries written.
 *
 * NOTE(review): nothing in this function bounds the number of entries
 * written; presumably the caller sizes iov_base[] for the worst case --
 * confirm.
 */
int xdr_kmap(struct iovec *iov_base, struct xdr_buf *xdr, unsigned int base)
{
struct iovec *iov = iov_base;
struct page **ppage = xdr->pages;
unsigned int len, pglen = xdr->page_len;
/* Head: emit whatever lies beyond 'base', else make 'base' relative
 * to the data that follows the head. */
len = xdr->head[0].iov_len;
if (base < len) {
iov->iov_len = len - base;
iov->iov_base = (char *)xdr->head[0].iov_base + base;
iov++;
base = 0;
} else
base -= len;
/* No page data at all: go straight to the tail. */
if (pglen == 0)
goto map_tail;
/* Page data entirely skipped: make 'base' relative to the tail. */
if (base >= pglen) {
base -= pglen;
goto map_tail;
}
/* Shorten the page data by the skipped bytes, fold in the buffer's own
 * starting offset, then split the result into a page index and an offset
 * within that page. */
if (base || xdr->page_base) {
pglen -= base;
base += xdr->page_base;
ppage += base >> PAGE_CACHE_SHIFT;
base &= ~PAGE_CACHE_MASK;
}
/* One iovec entry per page.  Only the first entry can start at a
 * non-zero offset, and an entry is clipped to the bytes remaining. */
do {
len = PAGE_CACHE_SIZE;
iov->iov_base = kmap(*ppage);
if (base) {
iov->iov_base += base;
len -= base;
base = 0;
}
if (pglen < len)
len = pglen;
iov->iov_len = len;
iov++;
ppage++;
} while ((pglen -= len) != 0);
map_tail:
/* 'base' is non-zero here only when the skip point lies beyond the head
 * and the page data.
 * NOTE(review): there is no check that base is smaller than the tail
 * length; presumably callers never skip the whole message -- confirm. */
if (xdr->tail[0].iov_len) {
iov->iov_len = xdr->tail[0].iov_len - base;
iov->iov_base = (char *)xdr->tail[0].iov_base + base;
iov++;
}
return (iov - iov_base);
}
void xdr_kunmap(struct xdr_buf *xdr, unsigned int base)
{
struct page **ppage = xdr->pages;
unsigned int pglen = xdr->page_len;
if (!pglen)
return;
if (base > xdr->head[0].iov_len)
base -= xdr->head[0].iov_len;
else
base = 0;
if (base >= pglen)
return;
if (base || xdr->page_base) {
pglen -= base;
ppage += base >> PAGE_CACHE_SHIFT;
}
for (;;) {
flush_dcache_page(*ppage);
flush_page_to_ram(*ppage);
kunmap(*ppage);
if (pglen <= PAGE_CACHE_SIZE)
break;
pglen -= PAGE_CACHE_SIZE;
ppage++;
}
}
......@@ -211,13 +211,11 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
struct socket *sock = xprt->sock;
struct msghdr msg;
struct xdr_buf *xdr = &req->rq_snd_buf;
struct iovec niv[MAX_IOVEC];
unsigned int niov, slen, skip;
mm_segment_t oldfs;
int result;
int slen = req->rq_slen - req->rq_bytes_sent;
struct iovec niv[MAX_IOVEC];
if (slen <= 0)
return 0;
if (!sock)
return -ENOTCONN;
......@@ -226,22 +224,25 @@ xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
req->rq_svec->iov_base,
req->rq_svec->iov_len);
/* Dont repeat bytes */
skip = req->rq_bytes_sent;
slen = xdr->len - skip;
niov = xdr_kmap(niv, xdr, skip);
msg.msg_flags = MSG_DONTWAIT|MSG_NOSIGNAL;
msg.msg_iov = req->rq_svec;
msg.msg_iovlen = req->rq_snr;
msg.msg_iov = niv;
msg.msg_iovlen = niov;
msg.msg_name = (struct sockaddr *) &xprt->addr;
msg.msg_namelen = sizeof(xprt->addr);
msg.msg_control = NULL;
msg.msg_controllen = 0;
/* Dont repeat bytes */
if (req->rq_bytes_sent)
xprt_move_iov(&msg, niv, req->rq_bytes_sent);
oldfs = get_fs(); set_fs(get_ds());
result = sock_sendmsg(sock, &msg, slen);
set_fs(oldfs);
xdr_kunmap(xdr, skip);
dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);
if (result >= 0)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment