Commit 1c5bd76d authored by Trond Myklebust's avatar Trond Myklebust

pNFS: Enable layoutreturn operation for return-on-close

Amend the pnfs return on close helper functions to enable sending the
layoutreturn op in CLOSE/DELEGRETURN. This closes a potential race between
CLOSE/DELEGRETURN and parallel OPEN calls to the same file, and allows the
client and the server to agree on whether or not there is an outstanding
layout.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent 828ed9ec
...@@ -3052,7 +3052,8 @@ static void nfs4_free_closedata(void *data) ...@@ -3052,7 +3052,8 @@ static void nfs4_free_closedata(void *data)
struct super_block *sb = calldata->state->inode->i_sb; struct super_block *sb = calldata->state->inode->i_sb;
if (calldata->lr.roc) if (calldata->lr.roc)
pnfs_roc_release(calldata->state->inode); pnfs_roc_release(&calldata->lr.arg, &calldata->lr.res,
calldata->res.lr_ret);
nfs4_put_open_state(calldata->state); nfs4_put_open_state(calldata->state);
nfs_free_seqid(calldata->arg.seqid); nfs_free_seqid(calldata->arg.seqid);
nfs4_put_state_owner(sp); nfs4_put_state_owner(sp);
...@@ -3103,9 +3104,6 @@ static void nfs4_close_done(struct rpc_task *task, void *data) ...@@ -3103,9 +3104,6 @@ static void nfs4_close_done(struct rpc_task *task, void *data)
switch (task->tk_status) { switch (task->tk_status) {
case 0: case 0:
res_stateid = &calldata->res.stateid; res_stateid = &calldata->res.stateid;
if (calldata->lr.roc)
pnfs_roc_set_barrier(state->inode,
calldata->lr.roc_barrier);
renew_lease(server, calldata->timestamp); renew_lease(server, calldata->timestamp);
break; break;
case -NFS4ERR_ADMIN_REVOKED: case -NFS4ERR_ADMIN_REVOKED:
...@@ -3181,7 +3179,7 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data) ...@@ -3181,7 +3179,7 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data)
goto out_no_action; goto out_no_action;
} }
if (!calldata->arg.lr_args && nfs4_wait_on_layoutreturn(inode, task)) { if (!calldata->lr.roc && nfs4_wait_on_layoutreturn(inode, task)) {
nfs_release_seqid(calldata->arg.seqid); nfs_release_seqid(calldata->arg.seqid);
goto out_wait; goto out_wait;
} }
...@@ -3195,8 +3193,6 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data) ...@@ -3195,8 +3193,6 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data)
else else
calldata->arg.bitmask = NULL; calldata->arg.bitmask = NULL;
} }
if (calldata->lr.roc)
pnfs_roc_get_barrier(inode, &calldata->lr.roc_barrier);
calldata->arg.share_access = calldata->arg.share_access =
nfs4_map_atomic_open_share(NFS_SERVER(inode), nfs4_map_atomic_open_share(NFS_SERVER(inode),
...@@ -3223,13 +3219,6 @@ static const struct rpc_call_ops nfs4_close_ops = { ...@@ -3223,13 +3219,6 @@ static const struct rpc_call_ops nfs4_close_ops = {
.rpc_release = nfs4_free_closedata, .rpc_release = nfs4_free_closedata,
}; };
static bool nfs4_roc(struct inode *inode)
{
if (!nfs_have_layout(inode))
return false;
return pnfs_roc(inode);
}
/* /*
* It is possible for data to be read/written from a mem-mapped file * It is possible for data to be read/written from a mem-mapped file
* after the sys_close call (which hits the vfs layer as a flush). * after the sys_close call (which hits the vfs layer as a flush).
...@@ -3281,7 +3270,12 @@ int nfs4_do_close(struct nfs4_state *state, gfp_t gfp_mask, int wait) ...@@ -3281,7 +3270,12 @@ int nfs4_do_close(struct nfs4_state *state, gfp_t gfp_mask, int wait)
calldata->res.seqid = calldata->arg.seqid; calldata->res.seqid = calldata->arg.seqid;
calldata->res.server = server; calldata->res.server = server;
calldata->res.lr_ret = -NFS4ERR_NOMATCHING_LAYOUT; calldata->res.lr_ret = -NFS4ERR_NOMATCHING_LAYOUT;
calldata->lr.roc = nfs4_roc(state->inode); calldata->lr.roc = pnfs_roc(state->inode,
&calldata->lr.arg, &calldata->lr.res, msg.rpc_cred);
if (calldata->lr.roc) {
calldata->arg.lr_args = &calldata->lr.arg;
calldata->res.lr_res = &calldata->lr.res;
}
nfs_sb_active(calldata->inode->i_sb); nfs_sb_active(calldata->inode->i_sb);
msg.rpc_argp = &calldata->arg; msg.rpc_argp = &calldata->arg;
...@@ -5676,8 +5670,6 @@ static void nfs4_delegreturn_done(struct rpc_task *task, void *calldata) ...@@ -5676,8 +5670,6 @@ static void nfs4_delegreturn_done(struct rpc_task *task, void *calldata)
} }
} }
data->rpc_status = task->tk_status; data->rpc_status = task->tk_status;
if (data->lr.roc && data->rpc_status == 0)
pnfs_roc_set_barrier(data->inode, data->lr.roc_barrier);
} }
static void nfs4_delegreturn_release(void *calldata) static void nfs4_delegreturn_release(void *calldata)
...@@ -5687,7 +5679,8 @@ static void nfs4_delegreturn_release(void *calldata) ...@@ -5687,7 +5679,8 @@ static void nfs4_delegreturn_release(void *calldata)
if (inode) { if (inode) {
if (data->lr.roc) if (data->lr.roc)
pnfs_roc_release(inode); pnfs_roc_release(&data->lr.arg, &data->lr.res,
data->res.lr_ret);
nfs_iput_and_deactive(inode); nfs_iput_and_deactive(inode);
} }
kfree(calldata); kfree(calldata);
...@@ -5699,13 +5692,9 @@ static void nfs4_delegreturn_prepare(struct rpc_task *task, void *data) ...@@ -5699,13 +5692,9 @@ static void nfs4_delegreturn_prepare(struct rpc_task *task, void *data)
d_data = (struct nfs4_delegreturndata *)data; d_data = (struct nfs4_delegreturndata *)data;
if (!d_data->args.lr_args && if (!d_data->lr.roc && nfs4_wait_on_layoutreturn(d_data->inode, task))
nfs4_wait_on_layoutreturn(d_data->inode, task))
return; return;
if (d_data->lr.roc)
pnfs_roc_get_barrier(d_data->inode, &d_data->lr.roc_barrier);
nfs4_setup_sequence(d_data->res.server, nfs4_setup_sequence(d_data->res.server,
&d_data->args.seq_args, &d_data->args.seq_args,
&d_data->res.seq_res, &d_data->res.seq_res,
...@@ -5756,8 +5745,14 @@ static int _nfs4_proc_delegreturn(struct inode *inode, struct rpc_cred *cred, co ...@@ -5756,8 +5745,14 @@ static int _nfs4_proc_delegreturn(struct inode *inode, struct rpc_cred *cred, co
data->timestamp = jiffies; data->timestamp = jiffies;
data->rpc_status = 0; data->rpc_status = 0;
data->inode = nfs_igrab_and_active(inode); data->inode = nfs_igrab_and_active(inode);
if (data->inode) if (data->inode) {
data->lr.roc = nfs4_roc(inode); data->lr.roc = pnfs_roc(inode, &data->lr.arg, &data->lr.res,
cred);
if (data->lr.roc) {
data->args.lr_args = &data->lr.arg;
data->res.lr_res = &data->lr.res;
}
}
task_setup_data.callback_data = data; task_setup_data.callback_data = data;
msg.rpc_argp = &data->args; msg.rpc_argp = &data->args;
......
...@@ -984,6 +984,20 @@ void pnfs_layoutreturn_free_lsegs(struct pnfs_layout_hdr *lo, ...@@ -984,6 +984,20 @@ void pnfs_layoutreturn_free_lsegs(struct pnfs_layout_hdr *lo,
} }
static void
pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
u32 seq)
{
if (lo->plh_return_iomode != 0 && lo->plh_return_iomode != iomode)
iomode = IOMODE_ANY;
lo->plh_return_iomode = iomode;
set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
if (seq != 0) {
WARN_ON_ONCE(lo->plh_return_seq != 0 && lo->plh_return_seq != seq);
lo->plh_return_seq = seq;
}
}
static bool static bool
pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo, pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo,
nfs4_stateid *stateid, nfs4_stateid *stateid,
...@@ -1188,17 +1202,22 @@ pnfs_commit_and_return_layout(struct inode *inode) ...@@ -1188,17 +1202,22 @@ pnfs_commit_and_return_layout(struct inode *inode)
return ret; return ret;
} }
bool pnfs_roc(struct inode *ino) bool pnfs_roc(struct inode *ino,
struct nfs4_layoutreturn_args *args,
struct nfs4_layoutreturn_res *res,
const struct rpc_cred *cred)
{ {
struct nfs_inode *nfsi = NFS_I(ino); struct nfs_inode *nfsi = NFS_I(ino);
struct nfs_open_context *ctx; struct nfs_open_context *ctx;
struct nfs4_state *state; struct nfs4_state *state;
struct pnfs_layout_hdr *lo; struct pnfs_layout_hdr *lo;
struct pnfs_layout_segment *lseg, *tmp; struct pnfs_layout_segment *lseg, *next;
nfs4_stateid stateid; nfs4_stateid stateid;
LIST_HEAD(tmp_list); enum pnfs_iomode iomode = 0;
bool found = false, layoutreturn = false, roc = false; bool layoutreturn = false, roc = false;
if (!nfs_have_layout(ino))
return false;
spin_lock(&ino->i_lock); spin_lock(&ino->i_lock);
lo = nfsi->layout; lo = nfsi->layout;
if (!lo || !pnfs_layout_is_valid(lo) || if (!lo || !pnfs_layout_is_valid(lo) ||
...@@ -1217,83 +1236,63 @@ bool pnfs_roc(struct inode *ino) ...@@ -1217,83 +1236,63 @@ bool pnfs_roc(struct inode *ino)
} }
list_for_each_entry_safe(lseg, tmp, &lo->plh_segs, pls_list) { list_for_each_entry_safe(lseg, next, &lo->plh_segs, pls_list) {
/* If we are sending layoutreturn, invalidate all valid lsegs */ /* If we are sending layoutreturn, invalidate all valid lsegs */
if (test_bit(NFS_LSEG_ROC, &lseg->pls_flags)) { if (!test_and_clear_bit(NFS_LSEG_ROC, &lseg->pls_flags))
mark_lseg_invalid(lseg, &tmp_list); continue;
found = true; /*
} * Note: mark lseg for return so pnfs_layout_remove_lseg
* doesn't invalidate the layout for us.
*/
set_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags);
if (!mark_lseg_invalid(lseg, &lo->plh_return_segs))
continue;
pnfs_set_plh_return_info(lo, lseg->pls_range.iomode, 0);
} }
/* always send layoutreturn if being marked so */ if (!test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags))
if (test_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags)) { goto out_noroc;
layoutreturn = pnfs_prepare_layoutreturn(lo,
&stateid, NULL);
if (layoutreturn)
goto out_noroc;
}
/* ROC in two conditions: /* ROC in two conditions:
* 1. there are ROC lsegs * 1. there are ROC lsegs
* 2. we don't send layoutreturn * 2. we don't send layoutreturn
*/ */
if (found) { /* lo ref dropped in pnfs_roc_release() */
/* lo ref dropped in pnfs_roc_release() */ layoutreturn = pnfs_prepare_layoutreturn(lo, &stateid, &iomode);
pnfs_get_layout_hdr(lo); /* If the creds don't match, we can't compound the layoutreturn */
roc = true; if (!layoutreturn || cred != lo->plh_lc_cred)
} goto out_noroc;
roc = layoutreturn;
pnfs_init_layoutreturn_args(args, lo, &stateid, iomode);
res->lrs_present = 0;
layoutreturn = false;
out_noroc: out_noroc:
spin_unlock(&ino->i_lock); spin_unlock(&ino->i_lock);
pnfs_free_lseg_list(&tmp_list);
pnfs_layoutcommit_inode(ino, true); pnfs_layoutcommit_inode(ino, true);
if (layoutreturn) if (layoutreturn)
pnfs_send_layoutreturn(lo, &stateid, IOMODE_ANY, true); pnfs_send_layoutreturn(lo, &stateid, iomode, true);
return roc; return roc;
} }
void pnfs_roc_release(struct inode *ino) void pnfs_roc_release(struct nfs4_layoutreturn_args *args,
struct nfs4_layoutreturn_res *res,
int ret)
{ {
struct pnfs_layout_hdr *lo; struct pnfs_layout_hdr *lo = args->layout;
const nfs4_stateid *arg_stateid = NULL;
const nfs4_stateid *res_stateid = NULL;
spin_lock(&ino->i_lock); if (ret == 0) {
lo = NFS_I(ino)->layout; arg_stateid = &args->stateid;
pnfs_clear_layoutreturn_waitbit(lo); if (res->lrs_present)
if (atomic_dec_and_test(&lo->plh_refcount)) { res_stateid = &res->stateid;
pnfs_detach_layout_hdr(lo); }
spin_unlock(&ino->i_lock); pnfs_layoutreturn_free_lsegs(lo, arg_stateid, &args->range,
pnfs_free_layout_hdr(lo); res_stateid);
} else pnfs_put_layout_hdr(lo);
spin_unlock(&ino->i_lock); trace_nfs4_layoutreturn_on_close(args->inode, 0);
}
void pnfs_roc_set_barrier(struct inode *ino, u32 barrier)
{
struct pnfs_layout_hdr *lo;
spin_lock(&ino->i_lock);
lo = NFS_I(ino)->layout;
if (pnfs_seqid_is_newer(barrier, lo->plh_barrier))
lo->plh_barrier = barrier;
spin_unlock(&ino->i_lock);
trace_nfs4_layoutreturn_on_close(ino, 0);
}
void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier)
{
struct nfs_inode *nfsi = NFS_I(ino);
struct pnfs_layout_hdr *lo;
u32 current_seqid;
spin_lock(&ino->i_lock);
lo = nfsi->layout;
current_seqid = be32_to_cpu(lo->plh_stateid.seqid);
/* Since close does not return a layout stateid for use as
* a barrier, we choose the worst-case barrier.
*/
*barrier = current_seqid + atomic_read(&lo->plh_outstanding);
spin_unlock(&ino->i_lock);
} }
bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task) bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task)
...@@ -1931,20 +1930,6 @@ pnfs_layout_process(struct nfs4_layoutget *lgp) ...@@ -1931,20 +1930,6 @@ pnfs_layout_process(struct nfs4_layoutget *lgp)
return ERR_PTR(-EAGAIN); return ERR_PTR(-EAGAIN);
} }
static void
pnfs_set_plh_return_info(struct pnfs_layout_hdr *lo, enum pnfs_iomode iomode,
u32 seq)
{
if (lo->plh_return_iomode != 0 && lo->plh_return_iomode != iomode)
iomode = IOMODE_ANY;
lo->plh_return_iomode = iomode;
set_bit(NFS_LAYOUT_RETURN_REQUESTED, &lo->plh_flags);
if (seq != 0) {
WARN_ON_ONCE(lo->plh_return_seq != 0 && lo->plh_return_seq != seq);
lo->plh_return_seq = seq;
}
}
/** /**
* pnfs_mark_matching_lsegs_return - Free or return matching layout segments * pnfs_mark_matching_lsegs_return - Free or return matching layout segments
* @lo: pointer to layout header * @lo: pointer to layout header
......
...@@ -271,10 +271,13 @@ int pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo, ...@@ -271,10 +271,13 @@ int pnfs_mark_matching_lsegs_return(struct pnfs_layout_hdr *lo,
u32 seq); u32 seq);
int pnfs_mark_layout_stateid_invalid(struct pnfs_layout_hdr *lo, int pnfs_mark_layout_stateid_invalid(struct pnfs_layout_hdr *lo,
struct list_head *lseg_list); struct list_head *lseg_list);
bool pnfs_roc(struct inode *ino); bool pnfs_roc(struct inode *ino,
void pnfs_roc_release(struct inode *ino); struct nfs4_layoutreturn_args *args,
void pnfs_roc_set_barrier(struct inode *ino, u32 barrier); struct nfs4_layoutreturn_res *res,
void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier); const struct rpc_cred *cred);
void pnfs_roc_release(struct nfs4_layoutreturn_args *args,
struct nfs4_layoutreturn_res *res,
int ret);
bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task); bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task);
void pnfs_set_layoutcommit(struct inode *, struct pnfs_layout_segment *, loff_t); void pnfs_set_layoutcommit(struct inode *, struct pnfs_layout_segment *, loff_t);
void pnfs_cleanup_layoutcommit(struct nfs4_layoutcommit_data *data); void pnfs_cleanup_layoutcommit(struct nfs4_layoutcommit_data *data);
...@@ -666,23 +669,18 @@ pnfs_layoutcommit_outstanding(struct inode *inode) ...@@ -666,23 +669,18 @@ pnfs_layoutcommit_outstanding(struct inode *inode)
static inline bool static inline bool
pnfs_roc(struct inode *ino) pnfs_roc(struct inode *ino,
struct nfs4_layoutreturn_args *args,
struct nfs4_layoutreturn_res *res,
const struct rpc_cred *cred)
{ {
return false; return false;
} }
static inline void static inline void
pnfs_roc_release(struct inode *ino) pnfs_roc_release(struct nfs4_layoutreturn_args *args,
{ struct nfs4_layoutreturn_res *res,
} int ret)
static inline void
pnfs_roc_set_barrier(struct inode *ino, u32 barrier)
{
}
static inline void
pnfs_roc_get_barrier(struct inode *ino, u32 *barrier)
{ {
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment