Commit 7e19d190 authored by Trond Myklebust's avatar Trond Myklebust

NFSv2/v3/v4: Add support for asynchronous reads even if rsize<PAGE_CACHE_SIZE.

parent 6c2689a5
......@@ -128,6 +128,7 @@ nfs_direct_read_seg(struct inode *inode, struct file *file,
.inode = inode,
.args = {
.fh = NFS_FH(inode),
.lockowner = current->files,
},
.res = {
.fattr = &rdata.fattr,
......
......@@ -729,11 +729,10 @@ nfs3_read_done(struct rpc_task *task)
}
static void
nfs3_proc_read_setup(struct nfs_read_data *data, unsigned int count)
nfs3_proc_read_setup(struct nfs_read_data *data)
{
struct rpc_task *task = &data->task;
struct inode *inode = data->inode;
struct nfs_page *req;
int flags;
struct rpc_message msg = {
.rpc_proc = &nfs3_procedures[NFS3PROC_READ],
......@@ -742,26 +741,12 @@ nfs3_proc_read_setup(struct nfs_read_data *data, unsigned int count)
.rpc_cred = data->cred,
};
req = nfs_list_entry(data->pages.next);
data->args.fh = NFS_FH(inode);
data->args.offset = req_offset(req);
data->args.pgbase = req->wb_pgbase;
data->args.pages = data->pagevec;
data->args.count = count;
data->res.fattr = &data->fattr;
data->res.count = count;
data->res.eof = 0;
/* N.B. Do we need to test? Never called for swapfile inode */
flags = RPC_TASK_ASYNC | (IS_SWAPFILE(inode)? NFS_RPC_SWAPFLAGS : 0);
/* Finalize the task. */
rpc_init_task(task, NFS_CLIENT(inode), nfs3_read_done, flags);
task->tk_calldata = data;
/* Release requests */
task->tk_release = nfs_readdata_release;
rpc_call_setup(&data->task, &msg, 0);
rpc_call_setup(task, &msg, 0);
}
static void
......
......@@ -1096,10 +1096,10 @@ nfs4_proc_read(struct nfs_read_data *rdata, struct file *filp)
if (filp) {
struct nfs4_state *state;
state = (struct nfs4_state *)filp->private_data;
nfs4_copy_stateid(&rdata->args.stateid, state, rdata->lockowner);
rdata->args.state = state;
msg.rpc_cred = state->owner->so_cred;
} else {
memcpy(&rdata->args.stateid, &zero_stateid, sizeof(rdata->args.stateid));
rdata->args.state = NULL;
msg.rpc_cred = NFS_I(inode)->mm_cred;
}
......@@ -1509,20 +1509,6 @@ nfs4_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle,
return nfs4_map_errors(nfs4_call_compound(&compound, NULL, 0));
}
static void
nfs4_restart_read(struct rpc_task *task)
{
struct nfs_read_data *data = (struct nfs_read_data *)task->tk_calldata;
struct nfs_page *req;
rpc_restart_call(task);
req = nfs_list_entry(data->pages.next);
if (req->wb_state)
nfs4_copy_stateid(&data->args.stateid, req->wb_state, req->wb_lockowner);
else
memcpy(&data->args.stateid, &zero_stateid, sizeof(data->args.stateid));
}
static void
nfs4_read_done(struct rpc_task *task)
{
......@@ -1530,7 +1516,7 @@ nfs4_read_done(struct rpc_task *task)
struct inode *inode = data->inode;
if (nfs4_async_handle_error(task, NFS_SERVER(inode)) == -EAGAIN) {
task->tk_action = nfs4_restart_read;
rpc_restart_call(task);
return;
}
if (task->tk_status > 0)
......@@ -1540,7 +1526,7 @@ nfs4_read_done(struct rpc_task *task)
}
static void
nfs4_proc_read_setup(struct nfs_read_data *data, unsigned int count)
nfs4_proc_read_setup(struct nfs_read_data *data)
{
struct rpc_task *task = &data->task;
struct rpc_message msg = {
......@@ -1550,34 +1536,15 @@ nfs4_proc_read_setup(struct nfs_read_data *data, unsigned int count)
.rpc_cred = data->cred,
};
struct inode *inode = data->inode;
struct nfs_page *req = nfs_list_entry(data->pages.next);
int flags;
data->args.fh = NFS_FH(inode);
data->args.offset = req_offset(req);
data->args.pgbase = req->wb_pgbase;
data->args.pages = data->pagevec;
data->args.count = count;
data->res.fattr = &data->fattr;
data->res.count = count;
data->res.eof = 0;
data->timestamp = jiffies;
data->lockowner = req->wb_lockowner;
if (req->wb_state)
nfs4_copy_stateid(&data->args.stateid, req->wb_state, req->wb_lockowner);
else
memcpy(&data->args.stateid, &zero_stateid, sizeof(data->args.stateid));
/* N.B. Do we need to test? Never called for swapfile inode */
flags = RPC_TASK_ASYNC | (IS_SWAPFILE(inode)? NFS_RPC_SWAPFLAGS : 0);
/* Finalize the task. */
rpc_init_task(task, NFS_CLIENT(inode), nfs4_read_done, flags);
task->tk_calldata = data;
/* Release requests */
task->tk_release = nfs_readdata_release;
rpc_call_setup(task, &msg, 0);
}
......
......@@ -868,14 +868,32 @@ encode_putrootfh(struct xdr_stream *xdr)
return 0;
}
static void
encode_stateid(struct xdr_stream *xdr, struct nfs4_state *state, fl_owner_t lockowner)
{
extern nfs4_stateid zero_stateid;
nfs4_stateid stateid;
uint32_t *p;
RESERVE_SPACE(16);
if (state != NULL) {
nfs4_copy_stateid(&stateid, state, lockowner);
WRITEMEM(stateid.data, sizeof(stateid.data));
} else
WRITEMEM(zero_stateid.data, sizeof(zero_stateid.data));
}
static int
encode_read(struct xdr_stream *xdr, struct nfs_readargs *args)
{
uint32_t *p;
RESERVE_SPACE(32);
RESERVE_SPACE(4);
WRITE32(OP_READ);
WRITEMEM(args->stateid.data, sizeof(args->stateid.data));
encode_stateid(xdr, args->state, args->lockowner);
RESERVE_SPACE(12);
WRITE64(args->offset);
WRITE32(args->count);
......
......@@ -559,11 +559,10 @@ nfs_read_done(struct rpc_task *task)
}
static void
nfs_proc_read_setup(struct nfs_read_data *data, unsigned int count)
nfs_proc_read_setup(struct nfs_read_data *data)
{
struct rpc_task *task = &data->task;
struct inode *inode = data->inode;
struct nfs_page *req;
int flags;
struct rpc_message msg = {
.rpc_proc = &nfs_procedures[NFSPROC_READ],
......@@ -572,26 +571,12 @@ nfs_proc_read_setup(struct nfs_read_data *data, unsigned int count)
.rpc_cred = data->cred,
};
req = nfs_list_entry(data->pages.next);
data->args.fh = NFS_FH(inode);
data->args.offset = req_offset(req);
data->args.pgbase = req->wb_pgbase;
data->args.pages = data->pagevec;
data->args.count = count;
data->res.fattr = &data->fattr;
data->res.count = count;
data->res.eof = 0;
/* N.B. Do we need to test? Never called for swapfile inode */
flags = RPC_TASK_ASYNC | (IS_SWAPFILE(inode)? NFS_RPC_SWAPFLAGS : 0);
/* Finalize the task. */
rpc_init_task(task, NFS_CLIENT(inode), nfs_read_done, flags);
task->tk_calldata = data;
/* Release requests */
task->tk_release = nfs_readdata_release;
rpc_call_setup(&data->task, &msg, 0);
rpc_call_setup(task, &msg, 0);
}
static void
......
......@@ -35,6 +35,8 @@
#define NFSDBG_FACILITY NFSDBG_PAGECACHE
static int nfs_pagein_one(struct list_head *, struct inode *);
static void nfs_readpage_result_partial(struct nfs_read_data *, int);
static void nfs_readpage_result_full(struct nfs_read_data *, int);
static kmem_cache_t *nfs_rdata_cachep;
static mempool_t *nfs_rdata_mempool;
......@@ -57,12 +59,37 @@ static __inline__ void nfs_readdata_free(struct nfs_read_data *p)
mempool_free(p, nfs_rdata_mempool);
}
void nfs_readdata_release(struct rpc_task *task)
static void nfs_readdata_release(struct rpc_task *task)
{
struct nfs_read_data *data = (struct nfs_read_data *)task->tk_calldata;
nfs_readdata_free(data);
}
static
unsigned int nfs_page_length(struct inode *inode, struct page *page)
{
loff_t i_size = i_size_read(inode);
unsigned long idx;
if (i_size <= 0)
return 0;
idx = (i_size - 1) >> PAGE_CACHE_SHIFT;
if (page->index > idx)
return 0;
if (page->index != idx)
return PAGE_CACHE_SIZE;
return 1 + ((i_size - 1) & (PAGE_CACHE_SIZE - 1));
}
static
int nfs_return_empty_page(struct page *page)
{
memclear_highpage_flush(page, 0, PAGE_CACHE_SIZE);
SetPageUptodate(page);
unlock_page(page);
return 0;
}
/*
* Read a page synchronously.
*/
......@@ -78,6 +105,7 @@ nfs_readpage_sync(struct file *file, struct inode *inode, struct page *page)
.inode = inode,
.args = {
.fh = NFS_FH(inode),
.lockowner = current->files,
.pages = &page,
.pgbase = 0UL,
.count = rsize,
......@@ -146,89 +174,208 @@ nfs_readpage_async(struct file *file, struct inode *inode, struct page *page)
{
LIST_HEAD(one_request);
struct nfs_page *new;
unsigned int len;
new = nfs_create_request(file, inode, page, 0, PAGE_CACHE_SIZE);
len = nfs_page_length(inode, page);
if (len == 0)
return nfs_return_empty_page(page);
new = nfs_create_request(file, inode, page, 0, len);
if (IS_ERR(new)) {
unlock_page(page);
return PTR_ERR(new);
}
if (len < PAGE_CACHE_SIZE)
memclear_highpage_flush(page, len, PAGE_CACHE_SIZE - len);
nfs_lock_request(new);
nfs_list_add_request(new, &one_request);
nfs_pagein_one(&one_request, inode);
return 0;
}
static void nfs_readpage_release(struct nfs_page *req)
{
unlock_page(req->wb_page);
nfs_clear_request(req);
nfs_release_request(req);
nfs_unlock_request(req);
dprintk("NFS: read done (%s/%Ld %d@%Ld)\n",
req->wb_inode->i_sb->s_id,
(long long)NFS_FILEID(req->wb_inode),
req->wb_bytes,
(long long)req_offset(req));
}
/*
* Set up the NFS read request struct
*/
static void
nfs_read_rpcsetup(struct list_head *head, struct nfs_read_data *data)
static void nfs_read_rpcsetup(struct nfs_page *req, struct nfs_read_data *data,
unsigned int count, unsigned int offset)
{
struct inode *inode;
struct nfs_page *req;
struct page **pages;
unsigned int count;
pages = data->pagevec;
count = 0;
while (!list_empty(head)) {
req = nfs_list_entry(head->next);
nfs_list_remove_request(req);
nfs_list_add_request(req, &data->pages);
*pages++ = req->wb_page;
count += req->wb_bytes;
}
req = nfs_list_entry(data->pages.next);
data->req = req;
data->inode = inode = req->wb_inode;
data->cred = req->wb_cred;
NFS_PROTO(inode)->read_setup(data, count);
data->args.fh = NFS_FH(inode);
data->args.offset = req_offset(req) + offset;
data->args.pgbase = req->wb_pgbase + offset;
data->args.pages = data->pagevec;
data->args.count = count;
data->args.lockowner = req->wb_lockowner;
data->args.state = req->wb_state;
data->res.fattr = &data->fattr;
data->res.count = count;
data->res.eof = 0;
NFS_PROTO(inode)->read_setup(data);
dprintk("NFS: %4d initiated read call (req %s/%Ld, %u bytes @ offset %Lu.\n",
data->task.tk_calldata = data;
/* Release requests */
data->task.tk_release = nfs_readdata_release;
dprintk("NFS: %4d initiated read call (req %s/%Ld, %u bytes @ offset %Lu)\n",
data->task.tk_pid,
inode->i_sb->s_id,
(long long)NFS_FILEID(inode),
count,
(unsigned long long)req_offset(req));
(unsigned long long)data->args.offset);
}
static void
nfs_async_read_error(struct list_head *head)
{
struct nfs_page *req;
struct page *page;
while (!list_empty(head)) {
req = nfs_list_entry(head->next);
page = req->wb_page;
nfs_list_remove_request(req);
SetPageError(page);
unlock_page(page);
nfs_clear_request(req);
nfs_release_request(req);
nfs_unlock_request(req);
SetPageError(req->wb_page);
nfs_readpage_release(req);
}
}
static int
nfs_pagein_one(struct list_head *head, struct inode *inode)
/*
* Start an async read operation
*/
static void nfs_execute_read(struct nfs_read_data *data)
{
struct rpc_clnt *clnt = NFS_CLIENT(inode);
struct nfs_read_data *data;
struct rpc_clnt *clnt = NFS_CLIENT(data->inode);
sigset_t oldset;
data = nfs_readdata_alloc();
if (!data)
goto out_bad;
nfs_read_rpcsetup(head, data);
/* Start the async call */
rpc_clnt_sigmask(clnt, &oldset);
lock_kernel();
rpc_execute(&data->task);
unlock_kernel();
rpc_clnt_sigunmask(clnt, &oldset);
}
/*
* Generate multiple requests to fill a single page.
*
* We optimize to reduce the number of read operations on the wire. If we
* detect that we're reading a page, or an area of a page, that is past the
* end of file, we do not generate NFS read operations but just clear the
* parts of the page that would have come back zero from the server anyway.
*
* We rely on the cached value of i_size to make this determination; another
* client can fill pages on the server past our cached end-of-file, but we
* won't see the new data until our attribute cache is updated. This is more
* or less conventional NFS client behavior.
*/
static int nfs_pagein_multi(struct list_head *head, struct inode *inode)
{
struct nfs_page *req = nfs_list_entry(head->next);
struct page *page = req->wb_page;
struct nfs_read_data *data;
unsigned int rsize = NFS_SERVER(inode)->rsize;
unsigned int nbytes, offset;
int requests = 0;
LIST_HEAD(list);
nfs_list_remove_request(req);
nbytes = req->wb_bytes;
for(;;) {
data = nfs_readdata_alloc();
if (!data)
goto out_bad;
list_add(&data->pages, &list);
requests++;
if (nbytes <= rsize)
break;
nbytes -= rsize;
}
atomic_set(&req->wb_complete, requests);
ClearPageError(page);
offset = 0;
nbytes = req->wb_bytes;
do {
data = list_entry(list.next, struct nfs_read_data, pages);
list_del_init(&data->pages);
data->pagevec[0] = page;
data->complete = nfs_readpage_result_partial;
if (nbytes > rsize) {
nfs_read_rpcsetup(req, data, rsize, offset);
offset += rsize;
nbytes -= rsize;
} else {
nfs_read_rpcsetup(req, data, nbytes, offset);
nbytes = 0;
}
nfs_execute_read(data);
} while (nbytes != 0);
return 0;
out_bad:
while (!list_empty(&list)) {
data = list_entry(list.next, struct nfs_read_data, pages);
list_del(&data->pages);
nfs_readdata_free(data);
}
SetPageError(page);
nfs_readpage_release(req);
return -ENOMEM;
}
static int nfs_pagein_one(struct list_head *head, struct inode *inode)
{
struct nfs_page *req;
struct page **pages;
struct nfs_read_data *data;
unsigned int count;
if (NFS_SERVER(inode)->rsize < PAGE_CACHE_SIZE)
return nfs_pagein_multi(head, inode);
data = nfs_readdata_alloc();
if (!data)
goto out_bad;
pages = data->pagevec;
count = 0;
while (!list_empty(head)) {
req = nfs_list_entry(head->next);
nfs_list_remove_request(req);
nfs_list_add_request(req, &data->pages);
ClearPageError(req->wb_page);
*pages++ = req->wb_page;
count += req->wb_bytes;
}
req = nfs_list_entry(data->pages.next);
data->complete = nfs_readpage_result_full;
nfs_read_rpcsetup(req, data, count, 0);
nfs_execute_read(data);
return 0;
out_bad:
nfs_async_read_error(head);
......@@ -257,56 +404,85 @@ nfs_pagein_list(struct list_head *head, int rpages)
return error;
}
/*
* Handle a read reply that fills part of a page.
*/
static void nfs_readpage_result_partial(struct nfs_read_data *data, int status)
{
struct nfs_page *req = data->req;
struct page *page = req->wb_page;
if (status >= 0) {
unsigned int request = data->args.count;
unsigned int result = data->res.count;
if (result < request) {
memclear_highpage_flush(page,
data->args.pgbase + result,
request - result);
if (!data->res.eof)
SetPageError(page);
}
} else
SetPageError(page);
if (atomic_dec_and_test(&req->wb_complete)) {
if (!PageError(page))
SetPageUptodate(page);
nfs_readpage_release(req);
}
}
/*
* This is the callback from RPC telling us whether a reply was
* received or some error occurred (timeout or socket shutdown).
*/
void
nfs_readpage_result(struct rpc_task *task)
static void nfs_readpage_result_full(struct nfs_read_data *data, int status)
{
struct nfs_read_data *data = (struct nfs_read_data *)task->tk_calldata;
unsigned int count = data->res.count;
dprintk("NFS: %4d nfs_readpage_result, (status %d)\n",
task->tk_pid, task->tk_status);
NFS_FLAGS(data->inode) |= NFS_INO_INVALID_ATIME;
while (!list_empty(&data->pages)) {
struct nfs_page *req = nfs_list_entry(data->pages.next);
struct page *page = req->wb_page;
nfs_list_remove_request(req);
if (task->tk_status >= 0) {
if (status >= 0) {
if (count < PAGE_CACHE_SIZE) {
if (count < req->wb_bytes)
memclear_highpage_flush(page,
req->wb_pgbase + count,
req->wb_bytes - count);
if (!data->res.eof)
SetPageError(page);
count = 0;
} else
count -= PAGE_CACHE_SIZE;
SetPageUptodate(page);
} else
SetPageError(page);
unlock_page(page);
dprintk("NFS: read (%s/%Ld %d@%Ld)\n",
req->wb_inode->i_sb->s_id,
(long long)NFS_FILEID(req->wb_inode),
req->wb_bytes,
(long long)req_offset(req));
nfs_clear_request(req);
nfs_release_request(req);
nfs_unlock_request(req);
nfs_readpage_release(req);
}
}
/*
* This is the callback from RPC telling us whether a reply was
* received or some error occurred (timeout or socket shutdown).
*/
void nfs_readpage_result(struct rpc_task *task)
{
struct nfs_read_data *data = (struct nfs_read_data *)task->tk_calldata;
int status = task->tk_status;
dprintk("NFS: %4d nfs_readpage_result, (status %d)\n",
task->tk_pid, status);
NFS_FLAGS(data->inode) |= NFS_INO_INVALID_ATIME;
data->complete(data, status);
}
/*
* Read a page over NFS.
* We read the page synchronously in the following cases:
* - The NFS rsize is smaller than PAGE_CACHE_SIZE. We could kludge our way
* around this by creating several consecutive read requests, but
* that's hardly worth it.
* We read the page synchronously in the following case:
* - The error flag is set for this page. This happens only when a
* previous async read operation failed.
*/
......@@ -329,7 +505,7 @@ nfs_readpage(struct file *file, struct page *page)
if (error)
goto out_error;
if (!PageError(page) && NFS_SERVER(inode)->rsize >= PAGE_CACHE_SIZE) {
if (!IS_SYNC(inode)) {
error = nfs_readpage_async(file, inode, page);
goto out;
}
......@@ -350,27 +526,26 @@ struct nfs_readdesc {
struct file *filp;
};
static int
readpage_sync_filler(void *data, struct page *page)
{
struct nfs_readdesc *desc = (struct nfs_readdesc *)data;
return nfs_readpage_sync(desc->filp, page->mapping->host, page);
}
static int
readpage_async_filler(void *data, struct page *page)
{
struct nfs_readdesc *desc = (struct nfs_readdesc *)data;
struct inode *inode = page->mapping->host;
struct nfs_page *new;
unsigned int len;
nfs_wb_page(inode, page);
new = nfs_create_request(desc->filp, inode, page, 0, PAGE_CACHE_SIZE);
len = nfs_page_length(inode, page);
if (len == 0)
return nfs_return_empty_page(page);
new = nfs_create_request(desc->filp, inode, page, 0, len);
if (IS_ERR(new)) {
SetPageError(page);
unlock_page(page);
return PTR_ERR(new);
}
if (len < PAGE_CACHE_SIZE)
memclear_highpage_flush(page, len, PAGE_CACHE_SIZE - len);
nfs_lock_request(new);
nfs_list_add_request(new, desc->head);
return 0;
......@@ -385,14 +560,16 @@ nfs_readpages(struct file *filp, struct address_space *mapping,
.filp = filp,
.head = &head,
};
struct nfs_server *server = NFS_SERVER(mapping->host);
int is_sync = server->rsize < PAGE_CACHE_SIZE;
struct inode *inode = mapping->host;
struct nfs_server *server = NFS_SERVER(inode);
int ret;
ret = read_cache_pages(mapping, pages,
is_sync ? readpage_sync_filler :
readpage_async_filler,
&desc);
dprintk("NFS: nfs_readpages (%s/%Ld %d)\n",
inode->i_sb->s_id,
(long long)NFS_FILEID(inode),
nr_pages);
ret = read_cache_pages(mapping, pages, readpage_async_filler, &desc);
if (!list_empty(&head)) {
int err = nfs_pagein_list(&head, server->rpages);
if (!ret)
......
......@@ -397,7 +397,6 @@ extern int nfs_readpages(struct file *, struct address_space *,
struct list_head *, unsigned);
extern int nfs_pagein_list(struct list_head *, int);
extern void nfs_readpage_result(struct rpc_task *);
extern void nfs_readdata_release(struct rpc_task *);
/*
* linux/fs/mount_clnt.c
......
......@@ -229,7 +229,8 @@ struct nfs_lockres {
struct nfs_readargs {
struct nfs_fh * fh;
nfs4_stateid stateid;
fl_owner_t lockowner;
struct nfs4_state * state;
__u64 offset;
__u32 count;
unsigned int pgbase;
......@@ -663,7 +664,6 @@ struct nfs_read_data {
struct rpc_task task;
struct inode *inode;
struct rpc_cred *cred;
fl_owner_t lockowner;
struct nfs_fattr fattr; /* fattr storage */
struct list_head pages; /* Coalesced read requests */
struct nfs_page *req; /* multi ops per nfs_page */
......@@ -741,7 +741,7 @@ struct nfs_rpc_ops {
int (*pathconf) (struct nfs_server *, struct nfs_fh *,
struct nfs_pathconf *);
u32 * (*decode_dirent)(u32 *, struct nfs_entry *, int plus);
void (*read_setup) (struct nfs_read_data *, unsigned int count);
void (*read_setup) (struct nfs_read_data *);
void (*write_setup) (struct nfs_write_data *, unsigned int count, int how);
void (*commit_setup) (struct nfs_write_data *, u64 start, u32 len, int how);
int (*file_open) (struct inode *, struct file *);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment