Commit 39494194 authored by Trond Myklebust's avatar Trond Myklebust Committed by Anna Schumaker

SUNRPC: Fix races with rpc_killall_tasks()

Ensure that we immediately call rpc_exit_task() after waking up, and
that the tk_rpc_status cannot get clobbered by some other function.
Signed-off-by: default avatarTrond Myklebust <trond.myklebust@hammerspace.com>
Signed-off-by: default avatarAnna Schumaker <Anna.Schumaker@Netapp.com>
parent e4266f23
...@@ -209,6 +209,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *); ...@@ -209,6 +209,7 @@ struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req); struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req);
void rpc_put_task(struct rpc_task *); void rpc_put_task(struct rpc_task *);
void rpc_put_task_async(struct rpc_task *); void rpc_put_task_async(struct rpc_task *);
bool rpc_task_set_rpc_status(struct rpc_task *task, int rpc_status);
void rpc_signal_task(struct rpc_task *); void rpc_signal_task(struct rpc_task *);
void rpc_exit_task(struct rpc_task *); void rpc_exit_task(struct rpc_task *);
void rpc_exit(struct rpc_task *, int); void rpc_exit(struct rpc_task *, int);
......
...@@ -1642,7 +1642,7 @@ static void ...@@ -1642,7 +1642,7 @@ static void
__rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status) __rpc_call_rpcerror(struct rpc_task *task, int tk_status, int rpc_status)
{ {
trace_rpc_call_rpcerror(task, tk_status, rpc_status); trace_rpc_call_rpcerror(task, tk_status, rpc_status);
task->tk_rpc_status = rpc_status; rpc_task_set_rpc_status(task, rpc_status);
rpc_exit(task, tk_status); rpc_exit(task, tk_status);
} }
...@@ -2435,10 +2435,8 @@ rpc_check_timeout(struct rpc_task *task) ...@@ -2435,10 +2435,8 @@ rpc_check_timeout(struct rpc_task *task)
{ {
struct rpc_clnt *clnt = task->tk_client; struct rpc_clnt *clnt = task->tk_client;
if (RPC_SIGNALLED(task)) { if (RPC_SIGNALLED(task))
rpc_call_rpcerror(task, -ERESTARTSYS);
return; return;
}
if (xprt_adjust_timeout(task->tk_rqstp) == 0) if (xprt_adjust_timeout(task->tk_rqstp) == 0)
return; return;
......
...@@ -65,6 +65,13 @@ gfp_t rpc_task_gfp_mask(void) ...@@ -65,6 +65,13 @@ gfp_t rpc_task_gfp_mask(void)
} }
EXPORT_SYMBOL_GPL(rpc_task_gfp_mask); EXPORT_SYMBOL_GPL(rpc_task_gfp_mask);
bool rpc_task_set_rpc_status(struct rpc_task *task, int rpc_status)
{
if (cmpxchg(&task->tk_rpc_status, 0, rpc_status) == 0)
return true;
return false;
}
unsigned long unsigned long
rpc_task_timeout(const struct rpc_task *task) rpc_task_timeout(const struct rpc_task *task)
{ {
...@@ -855,12 +862,14 @@ void rpc_signal_task(struct rpc_task *task) ...@@ -855,12 +862,14 @@ void rpc_signal_task(struct rpc_task *task)
if (!RPC_IS_ACTIVATED(task)) if (!RPC_IS_ACTIVATED(task))
return; return;
if (!rpc_task_set_rpc_status(task, -ERESTARTSYS))
return;
trace_rpc_task_signalled(task, task->tk_action); trace_rpc_task_signalled(task, task->tk_action);
set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate); set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
smp_mb__after_atomic(); smp_mb__after_atomic();
queue = READ_ONCE(task->tk_waitqueue); queue = READ_ONCE(task->tk_waitqueue);
if (queue) if (queue)
rpc_wake_up_queued_task_set_status(queue, task, -ERESTARTSYS); rpc_wake_up_queued_task(queue, task);
} }
void rpc_exit(struct rpc_task *task, int status) void rpc_exit(struct rpc_task *task, int status)
...@@ -907,10 +916,16 @@ static void __rpc_execute(struct rpc_task *task) ...@@ -907,10 +916,16 @@ static void __rpc_execute(struct rpc_task *task)
* Perform the next FSM step or a pending callback. * Perform the next FSM step or a pending callback.
* *
* tk_action may be NULL if the task has been killed. * tk_action may be NULL if the task has been killed.
* In particular, note that rpc_killall_tasks may
* do this at any time, so beware when dereferencing.
*/ */
do_action = task->tk_action; do_action = task->tk_action;
/* Tasks with an RPC error status should exit */
if (do_action != rpc_exit_task &&
(status = READ_ONCE(task->tk_rpc_status)) != 0) {
task->tk_status = status;
if (do_action != NULL)
do_action = rpc_exit_task;
}
/* Callbacks override all actions */
if (task->tk_callback) { if (task->tk_callback) {
do_action = task->tk_callback; do_action = task->tk_callback;
task->tk_callback = NULL; task->tk_callback = NULL;
...@@ -932,14 +947,6 @@ static void __rpc_execute(struct rpc_task *task) ...@@ -932,14 +947,6 @@ static void __rpc_execute(struct rpc_task *task)
continue; continue;
} }
/*
* Signalled tasks should exit rather than sleep.
*/
if (RPC_SIGNALLED(task)) {
task->tk_rpc_status = -ERESTARTSYS;
rpc_exit(task, -ERESTARTSYS);
}
/* /*
* The queue->lock protects against races with * The queue->lock protects against races with
* rpc_make_runnable(). * rpc_make_runnable().
...@@ -955,6 +962,12 @@ static void __rpc_execute(struct rpc_task *task) ...@@ -955,6 +962,12 @@ static void __rpc_execute(struct rpc_task *task)
spin_unlock(&queue->lock); spin_unlock(&queue->lock);
continue; continue;
} }
/* Wake up any task that has an exit status */
if (READ_ONCE(task->tk_rpc_status) != 0) {
rpc_wake_up_task_queue_locked(queue, task);
spin_unlock(&queue->lock);
continue;
}
rpc_clear_running(task); rpc_clear_running(task);
spin_unlock(&queue->lock); spin_unlock(&queue->lock);
if (task_is_async) if (task_is_async)
...@@ -972,10 +985,7 @@ static void __rpc_execute(struct rpc_task *task) ...@@ -972,10 +985,7 @@ static void __rpc_execute(struct rpc_task *task)
* clean up after sleeping on some queue, we don't * clean up after sleeping on some queue, we don't
* break the loop here, but go around once more. * break the loop here, but go around once more.
*/ */
trace_rpc_task_signalled(task, task->tk_action); rpc_signal_task(task);
set_bit(RPC_TASK_SIGNALLED, &task->tk_runstate);
task->tk_rpc_status = -ERESTARTSYS;
rpc_exit(task, -ERESTARTSYS);
} }
trace_rpc_task_sync_wake(task, task->tk_action); trace_rpc_task_sync_wake(task, task->tk_action);
} }
......
...@@ -1978,8 +1978,7 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -1978,8 +1978,7 @@ static void xs_local_connect(struct rpc_xprt *xprt, struct rpc_task *task)
* we'll need to figure out how to pass a namespace to * we'll need to figure out how to pass a namespace to
* connect. * connect.
*/ */
task->tk_rpc_status = -ENOTCONN; rpc_task_set_rpc_status(task, -ENOTCONN);
rpc_exit(task, -ENOTCONN);
goto out_wake; goto out_wake;
} }
ret = xs_local_setup_socket(transport); ret = xs_local_setup_socket(transport);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment