Commit 500d701f authored by Peng Tao's avatar Peng Tao Committed by Trond Myklebust

NFS41: make close wait for layoutreturn

If we send a layoutreturn asynchronously before close, the close
might reach server first and layoutreturn would fail with BADSTATEID
because there is nothing keeping the layout stateid alive.

Also do not pretend sending layoutreturn if we are not.
Signed-off-by: default avatarPeng Tao <tao.peng@primarydata.com>
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@primarydata.com>
parent 834e465b
...@@ -2658,6 +2658,15 @@ static int nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred, ...@@ -2658,6 +2658,15 @@ static int nfs4_do_setattr(struct inode *inode, struct rpc_cred *cred,
return err; return err;
} }
static bool
nfs4_wait_on_layoutreturn(struct inode *inode, struct rpc_task *task)
{
if (inode == NULL || !nfs_have_layout(inode))
return false;
return pnfs_wait_on_layoutreturn(inode, task);
}
struct nfs4_closedata { struct nfs4_closedata {
struct inode *inode; struct inode *inode;
struct nfs4_state *state; struct nfs4_state *state;
...@@ -2776,6 +2785,11 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data) ...@@ -2776,6 +2785,11 @@ static void nfs4_close_prepare(struct rpc_task *task, void *data)
goto out_no_action; goto out_no_action;
} }
if (nfs4_wait_on_layoutreturn(inode, task)) {
nfs_release_seqid(calldata->arg.seqid);
goto out_wait;
}
if (calldata->arg.fmode == 0) if (calldata->arg.fmode == 0)
task->tk_msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_CLOSE]; task->tk_msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_CLOSE];
if (calldata->roc) if (calldata->roc)
...@@ -5321,6 +5335,9 @@ static void nfs4_delegreturn_prepare(struct rpc_task *task, void *data) ...@@ -5321,6 +5335,9 @@ static void nfs4_delegreturn_prepare(struct rpc_task *task, void *data)
d_data = (struct nfs4_delegreturndata *)data; d_data = (struct nfs4_delegreturndata *)data;
if (nfs4_wait_on_layoutreturn(d_data->inode, task))
return;
if (d_data->roc) if (d_data->roc)
pnfs_roc_get_barrier(d_data->inode, &d_data->roc_barrier); pnfs_roc_get_barrier(d_data->inode, &d_data->roc_barrier);
......
...@@ -1104,20 +1104,15 @@ bool pnfs_roc(struct inode *ino) ...@@ -1104,20 +1104,15 @@ bool pnfs_roc(struct inode *ino)
mark_lseg_invalid(lseg, &tmp_list); mark_lseg_invalid(lseg, &tmp_list);
found = true; found = true;
} }
/* pnfs_prepare_layoutreturn() grabs lo ref and it will be put /* ROC in two conditions:
* in pnfs_roc_release(). We don't really send a layoutreturn but
* still want others to view us like we are sending one!
*
* If pnfs_prepare_layoutreturn() fails, it means someone else is doing
* LAYOUTRETURN, so we proceed like there are no layouts to return.
*
* ROC in three conditions:
* 1. there are ROC lsegs * 1. there are ROC lsegs
* 2. we don't send layoutreturn * 2. we don't send layoutreturn
* 3. no others are sending layoutreturn
*/ */
if (found && !layoutreturn && pnfs_prepare_layoutreturn(lo)) if (found && !layoutreturn) {
/* lo ref dropped in pnfs_roc_release() */
pnfs_get_layout_hdr(lo);
roc = true; roc = true;
}
out_noroc: out_noroc:
spin_unlock(&ino->i_lock); spin_unlock(&ino->i_lock);
...@@ -1172,6 +1167,26 @@ void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier) ...@@ -1172,6 +1167,26 @@ void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier)
spin_unlock(&ino->i_lock); spin_unlock(&ino->i_lock);
} }
bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task)
{
struct nfs_inode *nfsi = NFS_I(ino);
struct pnfs_layout_hdr *lo;
bool sleep = false;
/* we might not have grabbed lo reference. so need to check under
* i_lock */
spin_lock(&ino->i_lock);
lo = nfsi->layout;
if (lo && test_bit(NFS_LAYOUT_RETURN, &lo->plh_flags))
sleep = true;
spin_unlock(&ino->i_lock);
if (sleep)
rpc_sleep_on(&NFS_SERVER(ino)->roc_rpcwaitq, task, NULL);
return sleep;
}
/* /*
* Compare two layout segments for sorting into layout cache. * Compare two layout segments for sorting into layout cache.
* We want to preferentially return RW over RO layouts, so ensure those * We want to preferentially return RW over RO layouts, so ensure those
......
...@@ -270,6 +270,7 @@ bool pnfs_roc(struct inode *ino); ...@@ -270,6 +270,7 @@ bool pnfs_roc(struct inode *ino);
void pnfs_roc_release(struct inode *ino); void pnfs_roc_release(struct inode *ino);
void pnfs_roc_set_barrier(struct inode *ino, u32 barrier); void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier); void pnfs_roc_get_barrier(struct inode *ino, u32 *barrier);
bool pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task);
void pnfs_set_layoutcommit(struct inode *, struct pnfs_layout_segment *, loff_t); void pnfs_set_layoutcommit(struct inode *, struct pnfs_layout_segment *, loff_t);
void pnfs_cleanup_layoutcommit(struct nfs4_layoutcommit_data *data); void pnfs_cleanup_layoutcommit(struct nfs4_layoutcommit_data *data);
int pnfs_layoutcommit_inode(struct inode *inode, bool sync); int pnfs_layoutcommit_inode(struct inode *inode, bool sync);
...@@ -639,6 +640,12 @@ pnfs_roc_get_barrier(struct inode *ino, u32 *barrier) ...@@ -639,6 +640,12 @@ pnfs_roc_get_barrier(struct inode *ino, u32 *barrier)
{ {
} }
static inline bool
pnfs_wait_on_layoutreturn(struct inode *ino, struct rpc_task *task)
{
return false;
}
static inline void set_pnfs_layoutdriver(struct nfs_server *s, static inline void set_pnfs_layoutdriver(struct nfs_server *s,
const struct nfs_fh *mntfh, u32 id) const struct nfs_fh *mntfh, u32 id)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment