Commit 462d5b32 authored by Chuck Lever's avatar Chuck Lever Committed by Trond Myklebust

NFS: make direct write path generate write requests concurrently

Duplicate infrastructure from direct read path that will allow write
path to generate multiple write requests concurrently.  This will
enable us to add support for aio in this path.

Temporarily we will lose the ability to do UNSTABLE writes followed by
a COMMIT in the direct write path.  However, all applications I am
aware of that use NFS O_DIRECT currently write in relatively small
chunks, so this should not be inconvenient in any way.

Test plan:
Millions of fsx-odirect ops. OraSim.
Signed-off-by: default avatarChuck Lever <cel@netapp.com>
Signed-off-by: default avatarTrond Myklebust <Trond.Myklebust@netapp.com>
parent 63ab46ab
......@@ -384,106 +384,185 @@ static ssize_t nfs_direct_read(struct kiocb *iocb, unsigned long user_addr, size
return result;
}
static ssize_t nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset, struct page **pages, int nr_pages)
static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize)
{
const unsigned int wsize = NFS_SERVER(inode)->wsize;
size_t request;
int curpage, need_commit;
ssize_t result, tot_bytes;
struct nfs_writeverf first_verf;
struct nfs_write_data *wdata;
wdata = nfs_writedata_alloc(NFS_SERVER(inode)->wpages);
if (!wdata)
return -ENOMEM;
struct list_head *list;
struct nfs_direct_req *dreq;
unsigned int writes = 0;
unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
wdata->inode = inode;
wdata->cred = ctx->cred;
wdata->args.fh = NFS_FH(inode);
wdata->args.context = ctx;
wdata->args.stable = NFS_UNSTABLE;
if (IS_SYNC(inode) || NFS_PROTO(inode)->version == 2 || count <= wsize)
wdata->args.stable = NFS_FILE_SYNC;
wdata->res.fattr = &wdata->fattr;
wdata->res.verf = &wdata->verf;
dreq = nfs_direct_req_alloc();
if (!dreq)
return NULL;
list = &dreq->list;
for(;;) {
struct nfs_write_data *data = nfs_writedata_alloc(wpages);
if (unlikely(!data)) {
while (!list_empty(list)) {
data = list_entry(list->next,
struct nfs_write_data, pages);
list_del(&data->pages);
nfs_writedata_free(data);
}
kref_put(&dreq->kref, nfs_direct_req_release);
return NULL;
}
INIT_LIST_HEAD(&data->pages);
list_add(&data->pages, list);
data->req = (struct nfs_page *) dreq;
writes++;
if (nbytes <= wsize)
break;
nbytes -= wsize;
}
kref_get(&dreq->kref);
atomic_set(&dreq->complete, writes);
return dreq;
}
/*
* Collects and returns the final error value/byte-count.
*/
static ssize_t nfs_direct_write_wait(struct nfs_direct_req *dreq, int intr)
{
int result = 0;
if (intr) {
result = wait_event_interruptible(dreq->wait,
(atomic_read(&dreq->complete) == 0));
} else {
wait_event(dreq->wait, (atomic_read(&dreq->complete) == 0));
}
if (!result)
result = atomic_read(&dreq->error);
if (!result)
result = atomic_read(&dreq->count);
kref_put(&dreq->kref, nfs_direct_req_release);
return (ssize_t) result;
}
static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
struct nfs_write_data *data = calldata;
struct nfs_direct_req *dreq = (struct nfs_direct_req *) data->req;
int status = task->tk_status;
if (nfs_writeback_done(task, data) != 0)
return;
/* If the server fell back to an UNSTABLE write, it's an error. */
if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
status = -EIO;
if (likely(status >= 0))
atomic_add(data->res.count, &dreq->count);
else
atomic_set(&dreq->error, status);
if (unlikely(atomic_dec_and_test(&dreq->complete)))
nfs_direct_complete(dreq);
}
static const struct rpc_call_ops nfs_write_direct_ops = {
.rpc_call_done = nfs_direct_write_result,
.rpc_release = nfs_writedata_release,
};
/*
* For each nfs_write_data struct that was allocated on the list, dispatch
* an NFS WRITE operation
*
* XXX: For now, support only FILE_SYNC writes. Later we may add
* support for UNSTABLE + COMMIT.
*/
static void nfs_direct_write_schedule(struct nfs_direct_req *dreq, struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset)
{
struct list_head *list = &dreq->list;
struct page **pages = dreq->pages;
size_t wsize = NFS_SERVER(inode)->wsize;
unsigned int curpage, pgbase;
nfs_begin_data_update(inode);
retry:
need_commit = 0;
tot_bytes = 0;
curpage = 0;
request = count;
wdata->args.pgbase = user_addr & ~PAGE_MASK;
wdata->args.offset = file_offset;
pgbase = user_addr & ~PAGE_MASK;
do {
wdata->args.count = request;
if (wdata->args.count > wsize)
wdata->args.count = wsize;
wdata->args.pages = &pages[curpage];
struct nfs_write_data *data;
size_t bytes;
bytes = wsize;
if (count < wsize)
bytes = count;
data = list_entry(list->next, struct nfs_write_data, pages);
list_del_init(&data->pages);
data->inode = inode;
data->cred = ctx->cred;
data->args.fh = NFS_FH(inode);
data->args.context = ctx;
data->args.offset = file_offset;
data->args.pgbase = pgbase;
data->args.pages = &pages[curpage];
data->args.count = bytes;
data->res.fattr = &data->fattr;
data->res.count = bytes;
rpc_init_task(&data->task, NFS_CLIENT(inode), RPC_TASK_ASYNC,
&nfs_write_direct_ops, data);
NFS_PROTO(inode)->write_setup(data, FLUSH_STABLE);
dprintk("NFS: direct write: c=%u o=%Ld ua=%lu, pb=%u, cp=%u\n",
wdata->args.count, (long long) wdata->args.offset,
user_addr + tot_bytes, wdata->args.pgbase, curpage);
data->task.tk_priority = RPC_PRIORITY_NORMAL;
data->task.tk_cookie = (unsigned long) inode;
lock_kernel();
result = NFS_PROTO(inode)->write(wdata);
rpc_execute(&data->task);
unlock_kernel();
if (result <= 0) {
if (tot_bytes > 0)
break;
goto out;
}
dfprintk(VFS, "NFS: %4d initiated direct write call (req %s/%Ld, %u bytes @ offset %Lu)\n",
data->task.tk_pid,
inode->i_sb->s_id,
(long long)NFS_FILEID(inode),
bytes,
(unsigned long long)data->args.offset);
if (tot_bytes == 0)
memcpy(&first_verf.verifier, &wdata->verf.verifier,
sizeof(first_verf.verifier));
if (wdata->verf.committed != NFS_FILE_SYNC) {
need_commit = 1;
if (memcmp(&first_verf.verifier, &wdata->verf.verifier,
sizeof(first_verf.verifier)))
goto sync_retry;
}
file_offset += bytes;
pgbase += bytes;
curpage += pgbase >> PAGE_SHIFT;
pgbase &= ~PAGE_MASK;
tot_bytes += result;
count -= bytes;
} while (count != 0);
}
/* in case of a short write: stop now, let the app recover */
if (result < wdata->args.count)
break;
static ssize_t nfs_direct_write_seg(struct inode *inode, struct nfs_open_context *ctx, unsigned long user_addr, size_t count, loff_t file_offset, struct page **pages, int nr_pages)
{
ssize_t result;
sigset_t oldset;
struct rpc_clnt *clnt = NFS_CLIENT(inode);
struct nfs_direct_req *dreq;
wdata->args.offset += result;
wdata->args.pgbase += result;
curpage += wdata->args.pgbase >> PAGE_SHIFT;
wdata->args.pgbase &= ~PAGE_MASK;
request -= result;
} while (request != 0);
dreq = nfs_direct_write_alloc(count, NFS_SERVER(inode)->wsize);
if (!dreq)
return -ENOMEM;
/*
* Commit data written so far, even in the event of an error
*/
if (need_commit) {
wdata->args.count = tot_bytes;
wdata->args.offset = file_offset;
dreq->pages = pages;
dreq->npages = nr_pages;
lock_kernel();
result = NFS_PROTO(inode)->commit(wdata);
unlock_kernel();
nfs_begin_data_update(inode);
if (result < 0 || memcmp(&first_verf.verifier,
&wdata->verf.verifier,
sizeof(first_verf.verifier)) != 0)
goto sync_retry;
}
result = tot_bytes;
rpc_clnt_sigmask(clnt, &oldset);
nfs_direct_write_schedule(dreq, inode, ctx, user_addr, count,
file_offset);
result = nfs_direct_write_wait(dreq, clnt->cl_intr);
rpc_clnt_sigunmask(clnt, &oldset);
out:
nfs_end_data_update(inode);
nfs_writedata_free(wdata);
return result;
sync_retry:
wdata->args.stable = NFS_FILE_SYNC;
goto retry;
return result;
}
/*
......@@ -515,7 +594,6 @@ static ssize_t nfs_direct_write(struct inode *inode, struct nfs_open_context *ct
nfs_add_stats(inode, NFSIOS_DIRECTWRITTENBYTES, size);
result = nfs_direct_write_seg(inode, ctx, user_addr, size,
file_offset, pages, page_count);
nfs_free_user_pages(pages, page_count, 0);
if (result <= 0) {
if (tot_bytes > 0)
......
......@@ -77,7 +77,6 @@ static struct nfs_page * nfs_update_request(struct nfs_open_context*,
struct inode *,
struct page *,
unsigned int, unsigned int);
static int nfs_writeback_done(struct rpc_task *, struct nfs_write_data *);
static int nfs_wait_on_write_congestion(struct address_space *, int);
static int nfs_wait_on_requests(struct inode *, unsigned long, unsigned int);
static int nfs_flush_inode(struct inode *inode, unsigned long idx_start,
......@@ -1183,7 +1182,7 @@ static const struct rpc_call_ops nfs_write_full_ops = {
/*
* This function is called when the WRITE call is complete.
*/
static int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
int nfs_writeback_done(struct rpc_task *task, struct nfs_write_data *data)
{
struct nfs_writeargs *argp = &data->args;
struct nfs_writeres *resp = &data->res;
......
......@@ -407,6 +407,8 @@ extern int nfs_writepage(struct page *page, struct writeback_control *wbc);
extern int nfs_writepages(struct address_space *, struct writeback_control *);
extern int nfs_flush_incompatible(struct file *file, struct page *page);
extern int nfs_updatepage(struct file *, struct page *, unsigned int, unsigned int);
extern int nfs_writeback_done(struct rpc_task *, struct nfs_write_data *);
extern void nfs_writedata_release(void *);
/*
* Try to write back everything synchronously (but check the
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment