Commit bfcdbd79 authored by Trond Myklebust

Subject: [PATCH] NFS: Use parallel read operations to do direct read requests

 The initial implementation of NFS direct reads was entirely synchronous.
 The direct read logic issued one NFS READ operation at a time, and waited
 for the server's reply before issuing the next one.  For large direct
 read requests, this is unnecessarily slow.

 This patch changes the NFS direct read path to dispatch NFS READ operations
 for a single direct read request in parallel and wait for them once.  The
 direct read path is still synchronous in nature, but because the NFS READ
 operations are going in parallel, the completion wait should be much shorter.
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <trond.myklebust@fys.uio.no>
parent e5a9af8f
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
* 08 Jul 2002 Version for 2.4.19, with bug fixes --trondmy * 08 Jul 2002 Version for 2.4.19, with bug fixes --trondmy
* 08 Jun 2003 Port to 2.5 APIs --cel * 08 Jun 2003 Port to 2.5 APIs --cel
* 31 Mar 2004 Handle direct I/O without VFS support --cel * 31 Mar 2004 Handle direct I/O without VFS support --cel
* 15 Sep 2004 Parallel async reads --cel
* *
*/ */
...@@ -43,6 +44,7 @@ ...@@ -43,6 +44,7 @@
#include <linux/smp_lock.h> #include <linux/smp_lock.h>
#include <linux/file.h> #include <linux/file.h>
#include <linux/pagemap.h> #include <linux/pagemap.h>
#include <linux/kref.h>
#include <linux/nfs_fs.h> #include <linux/nfs_fs.h>
#include <linux/nfs_page.h> #include <linux/nfs_page.h>
...@@ -50,10 +52,27 @@ ...@@ -50,10 +52,27 @@
#include <asm/system.h> #include <asm/system.h>
#include <asm/uaccess.h> #include <asm/uaccess.h>
#include <asm/atomic.h>
#define NFSDBG_FACILITY NFSDBG_VFS #define NFSDBG_FACILITY NFSDBG_VFS
#define MAX_DIRECTIO_SIZE (4096UL << PAGE_SHIFT) #define MAX_DIRECTIO_SIZE (4096UL << PAGE_SHIFT)
static kmem_cache_t *nfs_direct_cachep;
/*
 * State for one direct read request: the set of asynchronous NFS READ
 * operations dispatched in parallel for a single iov segment, which the
 * issuer waits on until every one of them has completed.
 */
struct nfs_direct_req {
struct kref kref; /* reference count; last put frees back to nfs_direct_cachep */
struct list_head list; /* pre-allocated nfs_read_data structs, one per rsize chunk */
wait_queue_head_t wait; /* issuer sleeps here until 'complete' reaches zero */
struct page ** pages; /* pinned user pages backing the destination buffer */
unsigned int npages; /* count of pages */
atomic_t complete, /* outstanding i/os we're still waiting for */
count, /* bytes actually processed */
error; /* any reported error */
};
/** /**
* nfs_get_user_pages - find and set up pages underlying user's buffer * nfs_get_user_pages - find and set up pages underlying user's buffer
...@@ -70,7 +89,8 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, ...@@ -70,7 +89,8 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size,
unsigned long page_count; unsigned long page_count;
size_t array_size; size_t array_size;
/* set an arbitrary limit to prevent arithmetic overflow */ /* set an arbitrary limit to prevent type overflow */
/* XXX: this can probably be as large as INT_MAX */
if (size > MAX_DIRECTIO_SIZE) if (size > MAX_DIRECTIO_SIZE)
return -EFBIG; return -EFBIG;
...@@ -92,6 +112,8 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size, ...@@ -92,6 +112,8 @@ nfs_get_user_pages(int rw, unsigned long user_addr, size_t size,
/** /**
* nfs_free_user_pages - tear down page struct array * nfs_free_user_pages - tear down page struct array
* @pages: array of page struct pointers underlying target buffer * @pages: array of page struct pointers underlying target buffer
* @npages: number of pages in the array
* @do_dirty: dirty the pages as we release them
*/ */
static void static void
nfs_free_user_pages(struct page **pages, int npages, int do_dirty) nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
...@@ -106,77 +128,231 @@ nfs_free_user_pages(struct page **pages, int npages, int do_dirty) ...@@ -106,77 +128,231 @@ nfs_free_user_pages(struct page **pages, int npages, int do_dirty)
} }
/** /**
* nfs_direct_read_seg - Read in one iov segment. Generate separate * nfs_direct_req_release - release nfs_direct_req structure for direct read
* read RPCs for each "rsize" bytes. * @kref: kref object embedded in an nfs_direct_req structure
* @inode: target inode *
* @ctx: target file open context
* user_addr: starting address of this segment of user's buffer
* count: size of this segment
* file_offset: offset in file to begin the operation
* @pages: array of addresses of page structs defining user's buffer
* nr_pages: size of pages array
*/ */
static int static void nfs_direct_req_release(struct kref *kref)
nfs_direct_read_seg(struct inode *inode, struct nfs_open_context *ctx,
unsigned long user_addr, size_t count, loff_t file_offset,
struct page **pages, int nr_pages)
{ {
const unsigned int rsize = NFS_SERVER(inode)->rsize; struct nfs_direct_req *dreq = container_of(kref, struct nfs_direct_req, kref);
int tot_bytes = 0; kmem_cache_free(nfs_direct_cachep, dreq);
int curpage = 0; }
struct nfs_read_data *rdata;
rdata = nfs_readdata_alloc(); /**
if (!rdata) * nfs_direct_read_alloc - allocate nfs_read_data structures for direct read
return -ENOMEM; * @count: count of bytes for the read request
* @rsize: local rsize setting
*
* Note we also set the number of requests we have in the dreq when we are
* done. This prevents races with I/O completion so we will always wait
* until all requests have been dispatched and completed.
*/
static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, unsigned int rsize)
{
struct list_head *list;
struct nfs_direct_req *dreq;
unsigned int reads = 0;
dreq = kmem_cache_alloc(nfs_direct_cachep, SLAB_KERNEL);
if (!dreq)
return NULL;
kref_init(&dreq->kref);
init_waitqueue_head(&dreq->wait);
INIT_LIST_HEAD(&dreq->list);
atomic_set(&dreq->count, 0);
atomic_set(&dreq->error, 0);
list = &dreq->list;
for(;;) {
struct nfs_read_data *data = nfs_readdata_alloc();
if (unlikely(!data)) {
while (!list_empty(list)) {
data = list_entry(list->next,
struct nfs_read_data, pages);
list_del(&data->pages);
nfs_readdata_free(data);
}
kref_put(&dreq->kref, nfs_direct_req_release);
return NULL;
}
memset(rdata, 0, sizeof(*rdata)); INIT_LIST_HEAD(&data->pages);
rdata->inode = inode; list_add(&data->pages, list);
rdata->cred = ctx->cred;
rdata->args.fh = NFS_FH(inode);
rdata->args.context = ctx;
rdata->res.fattr = &rdata->fattr;
rdata->args.pgbase = user_addr & ~PAGE_MASK; data->req = (struct nfs_page *) dreq;
rdata->args.offset = file_offset; reads++;
do { if (nbytes <= rsize)
int result; break;
nbytes -= rsize;
}
kref_get(&dreq->kref);
atomic_set(&dreq->complete, reads);
return dreq;
}
/**
* nfs_direct_read_result - handle a read reply for a direct read request
* @data: address of NFS READ operation control block
* @status: status of this NFS READ operation
*
* We must hold a reference to all the pages in this direct read request
* until the RPCs complete. This could be long *after* we are woken up in
* nfs_direct_read_wait (for instance, if someone hits ^C on a slow server).
*/
static void nfs_direct_read_result(struct nfs_read_data *data, int status)
{
/* data->req was overloaded in nfs_direct_read_alloc to carry the dreq */
struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
if (likely(status >= 0))
/* fold the bytes returned by this READ into the request total */
atomic_add(data->res.count, &dreq->count);
else
/* NOTE(review): a later error overwrites an earlier one; only the
 * last reported status survives to nfs_direct_read_wait */
atomic_set(&dreq->error, status);
/* last outstanding READ: release the pinned pages (dirtying them, since
 * server data landed there) and wake the waiter; drop the I/O-side ref */
if (unlikely(atomic_dec_and_test(&dreq->complete))) {
nfs_free_user_pages(dreq->pages, dreq->npages, 1);
wake_up(&dreq->wait);
kref_put(&dreq->kref, nfs_direct_req_release);
}
}
rdata->args.count = count; /**
if (rdata->args.count > rsize) * nfs_direct_read_schedule - dispatch NFS READ operations for a direct read
rdata->args.count = rsize; * @dreq: address of nfs_direct_req struct for this request
rdata->args.pages = &pages[curpage]; * @inode: target inode
* @ctx: target file open context
* @user_addr: starting address of this segment of user's buffer
* @count: size of this segment
* @file_offset: offset in file to begin the operation
*
* For each nfs_read_data struct that was allocated on the list, dispatch
* an NFS READ operation
*/
static void nfs_direct_read_schedule(struct nfs_direct_req *dreq,
struct inode *inode, struct nfs_open_context *ctx,
unsigned long user_addr, size_t count, loff_t file_offset)
{
struct list_head *list = &dreq->list;
struct page **pages = dreq->pages;
unsigned int curpage, pgbase;
unsigned int rsize = NFS_SERVER(inode)->rsize;
dprintk("NFS: direct read: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n", curpage = 0;
rdata->args.count, (long long) rdata->args.offset, pgbase = user_addr & ~PAGE_MASK;
user_addr + tot_bytes, rdata->args.pgbase, curpage); do {
struct nfs_read_data *data;
unsigned int bytes;
bytes = rsize;
if (count < rsize)
bytes = count;
data = list_entry(list->next, struct nfs_read_data, pages);
list_del_init(&data->pages);
data->inode = inode;
data->cred = ctx->cred;
data->args.fh = NFS_FH(inode);
data->args.context = ctx;
data->args.offset = file_offset;
data->args.pgbase = pgbase;
data->args.pages = &pages[curpage];
data->args.count = bytes;
data->res.fattr = &data->fattr;
data->res.eof = 0;
data->res.count = bytes;
NFS_PROTO(inode)->read_setup(data);
data->task.tk_cookie = (unsigned long) inode;
data->task.tk_calldata = data;
data->task.tk_release = nfs_readdata_release;
data->complete = nfs_direct_read_result;
lock_kernel(); lock_kernel();
result = NFS_PROTO(inode)->read(rdata); rpc_execute(&data->task);
unlock_kernel(); unlock_kernel();
if (result <= 0) { dfprintk(VFS, "NFS: %4d initiated direct read call (req %s/%Ld, %u bytes @ offset %Lu)\n",
if (tot_bytes > 0) data->task.tk_pid,
break; inode->i_sb->s_id,
if (result == -EISDIR) (long long)NFS_FILEID(inode),
result = -EINVAL; bytes,
nfs_readdata_free(rdata); (unsigned long long)data->args.offset);
return result;
}
tot_bytes += result; file_offset += bytes;
if (rdata->res.eof) pgbase += bytes;
break; curpage += pgbase >> PAGE_SHIFT;
pgbase &= ~PAGE_MASK;
rdata->args.offset += result; count -= bytes;
rdata->args.pgbase += result;
curpage += rdata->args.pgbase >> PAGE_SHIFT;
rdata->args.pgbase &= ~PAGE_MASK;
count -= result;
} while (count != 0); } while (count != 0);
}
nfs_readdata_free(rdata); /**
return tot_bytes; * nfs_direct_read_wait - wait for I/O completion for direct reads
* @dreq: request on which we are to wait
* @intr: whether or not this wait can be interrupted
*
* Collects and returns the final error value/byte-count.
*/
static ssize_t nfs_direct_read_wait(struct nfs_direct_req *dreq, int intr)
{
/* becomes -ERESTARTSYS if an interruptible sleep is broken by a signal */
int result = 0;
if (intr) {
result = wait_event_interruptible(dreq->wait,
(atomic_read(&dreq->complete) == 0));
} else {
wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0));
}
/* precedence: signal, then any RPC error, then the processed byte count */
if (!result)
result = atomic_read(&dreq->error);
if (!result)
result = atomic_read(&dreq->count);
/* drop the issuer's reference; the completion path holds its own */
kref_put(&dreq->kref, nfs_direct_req_release);
return (ssize_t) result;
}
/**
* nfs_direct_read_seg - Read in one iov segment. Generate separate
* read RPCs for each "rsize" bytes.
* @inode: target inode
* @ctx: target file open context
* @user_addr: starting address of this segment of user's buffer
* @count: size of this segment
* @file_offset: offset in file to begin the operation
* @pages: array of addresses of page structs defining user's buffer
* @nr_pages: number of pages in the array
*
*/
static ssize_t nfs_direct_read_seg(struct inode *inode,
struct nfs_open_context *ctx, unsigned long user_addr,
size_t count, loff_t file_offset, struct page **pages,
unsigned int nr_pages)
{
ssize_t result;
sigset_t oldset;
struct rpc_clnt *clnt = NFS_CLIENT(inode);
struct nfs_direct_req *dreq;
/* pre-allocate one nfs_read_data per rsize-sized chunk up front */
dreq = nfs_direct_read_alloc(count, NFS_SERVER(inode)->rsize);
if (!dreq)
return -ENOMEM;
/* the completion path frees these pages once all READs have replied */
dreq->pages = pages;
dreq->npages = nr_pages;
/* NOTE(review): rpc_clnt_sigmask presumably masks signals while RPCs
 * are in flight, honouring the client's intr setting — confirm */
rpc_clnt_sigmask(clnt, &oldset);
nfs_direct_read_schedule(dreq, inode, ctx, user_addr, count,
file_offset);
result = nfs_direct_read_wait(dreq, clnt->cl_intr);
rpc_clnt_sigunmask(clnt, &oldset);
return result;
} }
/** /**
...@@ -218,8 +394,6 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx, ...@@ -218,8 +394,6 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx,
result = nfs_direct_read_seg(inode, ctx, user_addr, size, result = nfs_direct_read_seg(inode, ctx, user_addr, size,
file_offset, pages, page_count); file_offset, pages, page_count);
nfs_free_user_pages(pages, page_count, 1);
if (result <= 0) { if (result <= 0) {
if (tot_bytes > 0) if (tot_bytes > 0)
break; break;
...@@ -245,14 +419,15 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx, ...@@ -245,14 +419,15 @@ nfs_direct_read(struct inode *inode, struct nfs_open_context *ctx,
* @pages: array of addresses of page structs defining user's buffer * @pages: array of addresses of page structs defining user's buffer
* nr_pages: size of pages array * nr_pages: size of pages array
*/ */
static int static ssize_t nfs_direct_write_seg(struct inode *inode,
nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, struct nfs_open_context *ctx, unsigned long user_addr,
unsigned long user_addr, size_t count, loff_t file_offset, size_t count, loff_t file_offset, struct page **pages,
struct page **pages, int nr_pages) int nr_pages)
{ {
const unsigned int wsize = NFS_SERVER(inode)->wsize; const unsigned int wsize = NFS_SERVER(inode)->wsize;
size_t request; size_t request;
int curpage, need_commit, result, tot_bytes; int curpage, need_commit;
ssize_t result, tot_bytes;
struct nfs_writeverf first_verf; struct nfs_writeverf first_verf;
struct nfs_write_data *wdata; struct nfs_write_data *wdata;
...@@ -362,9 +537,9 @@ nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, ...@@ -362,9 +537,9 @@ nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx,
* that non-direct readers might access, so they will pick up these * that non-direct readers might access, so they will pick up these
* writes immediately. * writes immediately.
*/ */
static int nfs_direct_write(struct inode *inode, struct nfs_open_context *ctx, static ssize_t nfs_direct_write(struct inode *inode,
const struct iovec *iov, loff_t file_offset, struct nfs_open_context *ctx, const struct iovec *iov,
unsigned long nr_segs) loff_t file_offset, unsigned long nr_segs)
{ {
ssize_t tot_bytes = 0; ssize_t tot_bytes = 0;
unsigned long seg = 0; unsigned long seg = 0;
...@@ -607,3 +782,21 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count, ...@@ -607,3 +782,21 @@ nfs_file_direct_write(struct kiocb *iocb, const char __user *buf, size_t count,
out: out:
return retval; return retval;
} }
/*
 * nfs_init_directcache - create the slab cache for nfs_direct_req structures
 *
 * Returns 0 on success or -ENOMEM if the cache could not be created.
 * Called once from init_nfs_fs().
 */
int nfs_init_directcache(void)
{
nfs_direct_cachep = kmem_cache_create("nfs_direct_cache",
sizeof(struct nfs_direct_req),
0, SLAB_RECLAIM_ACCOUNT,
NULL, NULL);
if (nfs_direct_cachep == NULL)
return -ENOMEM;
return 0;
}
/*
 * nfs_destroy_directcache - tear down the nfs_direct_req slab cache
 *
 * Logs a warning if the cache still held live objects (i.e. a leak).
 * Called from exit_nfs_fs() and from the init_nfs_fs() error path.
 */
void nfs_destroy_directcache(void)
{
if (kmem_cache_destroy(nfs_direct_cachep))
printk(KERN_INFO "nfs_direct_cache: not all structures were freed\n");
}
...@@ -968,7 +968,7 @@ __nfs_revalidate_inode(struct nfs_server *server, struct inode *inode) ...@@ -968,7 +968,7 @@ __nfs_revalidate_inode(struct nfs_server *server, struct inode *inode)
/* Protect against RPC races by saving the change attribute */ /* Protect against RPC races by saving the change attribute */
verifier = nfs_save_change_attribute(inode); verifier = nfs_save_change_attribute(inode);
status = NFS_PROTO(inode)->getattr(server, NFS_FH(inode), &fattr); status = NFS_PROTO(inode)->getattr(server, NFS_FH(inode), &fattr);
if (status) { if (status != 0) {
dfprintk(PAGECACHE, "nfs_revalidate_inode: (%s/%Ld) getattr failed, error=%d\n", dfprintk(PAGECACHE, "nfs_revalidate_inode: (%s/%Ld) getattr failed, error=%d\n",
inode->i_sb->s_id, inode->i_sb->s_id,
(long long)NFS_FILEID(inode), status); (long long)NFS_FILEID(inode), status);
...@@ -1828,9 +1828,13 @@ static struct file_system_type nfs4_fs_type = { ...@@ -1828,9 +1828,13 @@ static struct file_system_type nfs4_fs_type = {
extern int nfs_init_nfspagecache(void); extern int nfs_init_nfspagecache(void);
extern void nfs_destroy_nfspagecache(void); extern void nfs_destroy_nfspagecache(void);
extern int nfs_init_readpagecache(void); extern int nfs_init_readpagecache(void);
extern int nfs_destroy_readpagecache(void); extern void nfs_destroy_readpagecache(void);
extern int nfs_init_writepagecache(void); extern int nfs_init_writepagecache(void);
extern int nfs_destroy_writepagecache(void); extern void nfs_destroy_writepagecache(void);
#ifdef CONFIG_NFS_DIRECTIO
extern int nfs_init_directcache(void);
extern void nfs_destroy_directcache(void);
#endif
static kmem_cache_t * nfs_inode_cachep; static kmem_cache_t * nfs_inode_cachep;
...@@ -1911,6 +1915,12 @@ static int __init init_nfs_fs(void) ...@@ -1911,6 +1915,12 @@ static int __init init_nfs_fs(void)
if (err) if (err)
goto out1; goto out1;
#ifdef CONFIG_NFS_DIRECTIO
err = nfs_init_directcache();
if (err)
goto out0;
#endif
#ifdef CONFIG_PROC_FS #ifdef CONFIG_PROC_FS
rpc_proc_register(&nfs_rpcstat); rpc_proc_register(&nfs_rpcstat);
#endif #endif
...@@ -1921,8 +1931,14 @@ static int __init init_nfs_fs(void) ...@@ -1921,8 +1931,14 @@ static int __init init_nfs_fs(void)
goto out; goto out;
return 0; return 0;
out: out:
#ifdef CONFIG_PROC_FS
rpc_proc_unregister("nfs"); rpc_proc_unregister("nfs");
#endif
nfs_destroy_writepagecache(); nfs_destroy_writepagecache();
#ifdef CONFIG_NFS_DIRECTIO
out0:
nfs_destroy_directcache();
#endif
out1: out1:
nfs_destroy_readpagecache(); nfs_destroy_readpagecache();
out2: out2:
...@@ -1935,6 +1951,9 @@ static int __init init_nfs_fs(void) ...@@ -1935,6 +1951,9 @@ static int __init init_nfs_fs(void)
static void __exit exit_nfs_fs(void) static void __exit exit_nfs_fs(void)
{ {
#ifdef CONFIG_NFS_DIRECTIO
nfs_destroy_directcache();
#endif
nfs_destroy_writepagecache(); nfs_destroy_writepagecache();
nfs_destroy_readpagecache(); nfs_destroy_readpagecache();
nfs_destroy_inodecache(); nfs_destroy_inodecache();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment