Commit 3453d570 authored by Trond Myklebust's avatar Trond Myklebust

NFSv4.1: Avoid false retries when RPC calls are interrupted

A 'false retry' in NFSv4.1 occurs when the client attempts to transmit a
new RPC call using a slot+sequence number combination that references an
already cached one. Currently, the Linux NFS client will do this if a
user process interrupts an RPC call that is in progress.
The problem with doing so is that we defeat the main mechanism used by
the server to differentiate between a new call and a replayed one. Even
if the server is able to perfectly cache the arguments of the old call,
it cannot know if the client intended to replay or send a new call.

The obvious fix is to bump the sequence number pre-emptively if an
RPC call is interrupted, but in order to deal with the corner cases
where the interrupted call is not actually received and processed by
the server, we need to interpret the error NFS4ERR_SEQ_MISORDERED
as a sign that we need to either wait or locate a correct sequence
number that lies between the value we sent, and the last value that
was acked by a SEQUENCE call on that slot.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
Tested-by: default avatarJason Tibbitts <tibbs@math.uh.edu>
parent 6f903b11
...@@ -730,13 +730,25 @@ static void nfs41_sequence_free_slot(struct nfs4_sequence_res *res) ...@@ -730,13 +730,25 @@ static void nfs41_sequence_free_slot(struct nfs4_sequence_res *res)
res->sr_slot = NULL; res->sr_slot = NULL;
} }
static void nfs4_slot_sequence_record_sent(struct nfs4_slot *slot,
u32 seqnr)
{
if ((s32)(seqnr - slot->seq_nr_highest_sent) > 0)
slot->seq_nr_highest_sent = seqnr;
}
static void nfs4_slot_sequence_acked(struct nfs4_slot *slot,
u32 seqnr)
{
slot->seq_nr_highest_sent = seqnr;
slot->seq_nr_last_acked = seqnr;
}
static int nfs41_sequence_process(struct rpc_task *task, static int nfs41_sequence_process(struct rpc_task *task,
struct nfs4_sequence_res *res) struct nfs4_sequence_res *res)
{ {
struct nfs4_session *session; struct nfs4_session *session;
struct nfs4_slot *slot = res->sr_slot; struct nfs4_slot *slot = res->sr_slot;
struct nfs_client *clp; struct nfs_client *clp;
bool interrupted = false;
int ret = 1; int ret = 1;
if (slot == NULL) if (slot == NULL)
...@@ -747,16 +759,12 @@ static int nfs41_sequence_process(struct rpc_task *task, ...@@ -747,16 +759,12 @@ static int nfs41_sequence_process(struct rpc_task *task,
session = slot->table->session; session = slot->table->session;
if (slot->interrupted) {
if (res->sr_status != -NFS4ERR_DELAY)
slot->interrupted = 0;
interrupted = true;
}
trace_nfs4_sequence_done(session, res); trace_nfs4_sequence_done(session, res);
/* Check the SEQUENCE operation status */ /* Check the SEQUENCE operation status */
switch (res->sr_status) { switch (res->sr_status) {
case 0: case 0:
/* Mark this sequence number as having been acked */
nfs4_slot_sequence_acked(slot, slot->seq_nr);
/* Update the slot's sequence and clientid lease timer */ /* Update the slot's sequence and clientid lease timer */
slot->seq_done = 1; slot->seq_done = 1;
clp = session->clp; clp = session->clp;
...@@ -771,9 +779,9 @@ static int nfs41_sequence_process(struct rpc_task *task, ...@@ -771,9 +779,9 @@ static int nfs41_sequence_process(struct rpc_task *task,
* sr_status remains 1 if an RPC level error occurred. * sr_status remains 1 if an RPC level error occurred.
* The server may or may not have processed the sequence * The server may or may not have processed the sequence
* operation.. * operation..
* Mark the slot as having hosted an interrupted RPC call.
*/ */
slot->interrupted = 1; nfs4_slot_sequence_record_sent(slot, slot->seq_nr);
slot->seq_done = 1;
goto out; goto out;
case -NFS4ERR_DELAY: case -NFS4ERR_DELAY:
/* The server detected a resend of the RPC call and /* The server detected a resend of the RPC call and
...@@ -784,6 +792,7 @@ static int nfs41_sequence_process(struct rpc_task *task, ...@@ -784,6 +792,7 @@ static int nfs41_sequence_process(struct rpc_task *task,
__func__, __func__,
slot->slot_nr, slot->slot_nr,
slot->seq_nr); slot->seq_nr);
nfs4_slot_sequence_acked(slot, slot->seq_nr);
goto out_retry; goto out_retry;
case -NFS4ERR_RETRY_UNCACHED_REP: case -NFS4ERR_RETRY_UNCACHED_REP:
case -NFS4ERR_SEQ_FALSE_RETRY: case -NFS4ERR_SEQ_FALSE_RETRY:
...@@ -791,6 +800,7 @@ static int nfs41_sequence_process(struct rpc_task *task, ...@@ -791,6 +800,7 @@ static int nfs41_sequence_process(struct rpc_task *task,
* The server thinks we tried to replay a request. * The server thinks we tried to replay a request.
* Retry the call after bumping the sequence ID. * Retry the call after bumping the sequence ID.
*/ */
nfs4_slot_sequence_acked(slot, slot->seq_nr);
goto retry_new_seq; goto retry_new_seq;
case -NFS4ERR_BADSLOT: case -NFS4ERR_BADSLOT:
/* /*
...@@ -801,21 +811,28 @@ static int nfs41_sequence_process(struct rpc_task *task, ...@@ -801,21 +811,28 @@ static int nfs41_sequence_process(struct rpc_task *task,
goto session_recover; goto session_recover;
goto retry_nowait; goto retry_nowait;
case -NFS4ERR_SEQ_MISORDERED: case -NFS4ERR_SEQ_MISORDERED:
nfs4_slot_sequence_record_sent(slot, slot->seq_nr);
/* /*
* Was the last operation on this sequence interrupted? * Were one or more calls using this slot interrupted?
* If so, retry after bumping the sequence number. * If the server never received the request, then our
* transmitted slot sequence number may be too high.
*/ */
if (interrupted) if ((s32)(slot->seq_nr - slot->seq_nr_last_acked) > 1) {
goto retry_new_seq; slot->seq_nr--;
/*
* Could this slot have been previously retired?
* If so, then the server may be expecting seq_nr = 1!
*/
if (slot->seq_nr != 1) {
slot->seq_nr = 1;
goto retry_nowait; goto retry_nowait;
} }
goto session_recover; /*
* RFC5661:
* A retry might be sent while the original request is
* still in progress on the replier. The replier SHOULD
* deal with the issue by returning NFS4ERR_DELAY as the
* reply to SEQUENCE or CB_SEQUENCE operation, but
* implementations MAY return NFS4ERR_SEQ_MISORDERED.
*
* Restart the search after a delay.
*/
slot->seq_nr = slot->seq_nr_highest_sent;
goto out_retry;
default: default:
/* Just update the slot sequence no. */ /* Just update the slot sequence no. */
slot->seq_done = 1; slot->seq_done = 1;
...@@ -906,17 +923,6 @@ static const struct rpc_call_ops nfs41_call_sync_ops = { ...@@ -906,17 +923,6 @@ static const struct rpc_call_ops nfs41_call_sync_ops = {
.rpc_call_done = nfs41_call_sync_done, .rpc_call_done = nfs41_call_sync_done,
}; };
static void
nfs4_sequence_process_interrupted(struct nfs_client *client,
struct nfs4_slot *slot, const struct cred *cred)
{
struct rpc_task *task;
task = _nfs41_proc_sequence(client, cred, slot, true);
if (!IS_ERR(task))
rpc_put_task_async(task);
}
#else /* !CONFIG_NFS_V4_1 */ #else /* !CONFIG_NFS_V4_1 */
static int nfs4_sequence_process(struct rpc_task *task, struct nfs4_sequence_res *res) static int nfs4_sequence_process(struct rpc_task *task, struct nfs4_sequence_res *res)
...@@ -937,14 +943,6 @@ int nfs4_sequence_done(struct rpc_task *task, ...@@ -937,14 +943,6 @@ int nfs4_sequence_done(struct rpc_task *task,
} }
EXPORT_SYMBOL_GPL(nfs4_sequence_done); EXPORT_SYMBOL_GPL(nfs4_sequence_done);
static void
nfs4_sequence_process_interrupted(struct nfs_client *client,
struct nfs4_slot *slot, const struct cred *cred)
{
WARN_ON_ONCE(1);
slot->interrupted = 0;
}
#endif /* !CONFIG_NFS_V4_1 */ #endif /* !CONFIG_NFS_V4_1 */
static static
...@@ -982,26 +980,19 @@ int nfs4_setup_sequence(struct nfs_client *client, ...@@ -982,26 +980,19 @@ int nfs4_setup_sequence(struct nfs_client *client,
task->tk_timeout = 0; task->tk_timeout = 0;
} }
for (;;) { spin_lock(&tbl->slot_tbl_lock);
spin_lock(&tbl->slot_tbl_lock); /* The state manager will wait until the slot table is empty */
/* The state manager will wait until the slot table is empty */ if (nfs4_slot_tbl_draining(tbl) && !args->sa_privileged)
if (nfs4_slot_tbl_draining(tbl) && !args->sa_privileged) goto out_sleep;
goto out_sleep;
slot = nfs4_alloc_slot(tbl);
if (IS_ERR(slot)) {
/* Try again in 1/4 second */
if (slot == ERR_PTR(-ENOMEM))
task->tk_timeout = HZ >> 2;
goto out_sleep;
}
spin_unlock(&tbl->slot_tbl_lock);
if (likely(!slot->interrupted)) slot = nfs4_alloc_slot(tbl);
break; if (IS_ERR(slot)) {
nfs4_sequence_process_interrupted(client, /* Try again in 1/4 second */
slot, task->tk_msg.rpc_cred); if (slot == ERR_PTR(-ENOMEM))
task->tk_timeout = HZ >> 2;
goto out_sleep;
} }
spin_unlock(&tbl->slot_tbl_lock);
nfs4_sequence_attach_slot(args, res, slot); nfs4_sequence_attach_slot(args, res, slot);
......
...@@ -110,6 +110,8 @@ static struct nfs4_slot *nfs4_new_slot(struct nfs4_slot_table *tbl, ...@@ -110,6 +110,8 @@ static struct nfs4_slot *nfs4_new_slot(struct nfs4_slot_table *tbl,
slot->table = tbl; slot->table = tbl;
slot->slot_nr = slotid; slot->slot_nr = slotid;
slot->seq_nr = seq_init; slot->seq_nr = seq_init;
slot->seq_nr_highest_sent = seq_init;
slot->seq_nr_last_acked = seq_init - 1;
} }
return slot; return slot;
} }
...@@ -276,7 +278,8 @@ static void nfs4_reset_slot_table(struct nfs4_slot_table *tbl, ...@@ -276,7 +278,8 @@ static void nfs4_reset_slot_table(struct nfs4_slot_table *tbl,
p = &tbl->slots; p = &tbl->slots;
while (*p) { while (*p) {
(*p)->seq_nr = ivalue; (*p)->seq_nr = ivalue;
(*p)->interrupted = 0; (*p)->seq_nr_highest_sent = ivalue;
(*p)->seq_nr_last_acked = ivalue - 1;
p = &(*p)->next; p = &(*p)->next;
} }
tbl->highest_used_slotid = NFS4_NO_SLOT; tbl->highest_used_slotid = NFS4_NO_SLOT;
......
...@@ -23,8 +23,9 @@ struct nfs4_slot { ...@@ -23,8 +23,9 @@ struct nfs4_slot {
unsigned long generation; unsigned long generation;
u32 slot_nr; u32 slot_nr;
u32 seq_nr; u32 seq_nr;
unsigned int interrupted : 1, u32 seq_nr_last_acked;
privileged : 1, u32 seq_nr_highest_sent;
unsigned int privileged : 1,
seq_done : 1; seq_done : 1;
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment