Commit 8b3e78b5 authored by Jens Axboe

io-wq: fix races around manager/worker creation and task exit

These races have always been there, they are just more apparent now that
we do early cancel of io-wq when the task exits.

Ensure that the io-wq manager sets task state correctly to not miss
wakeups for task creation. This is important if we get a wakeup after
having marked ourselves as TASK_INTERRUPTIBLE. If we do end up creating
workers, then we flip the state back to running, making the subsequent
schedule() a no-op. Also increment the wq ref count before forking the
thread, to avoid a use-after-free.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 8a378fb0
...@@ -605,6 +605,8 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) ...@@ -605,6 +605,8 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
struct io_worker *worker; struct io_worker *worker;
pid_t pid; pid_t pid;
__set_current_state(TASK_RUNNING);
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node); worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
if (!worker) if (!worker)
return false; return false;
...@@ -614,15 +616,18 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) ...@@ -614,15 +616,18 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
worker->wqe = wqe; worker->wqe = wqe;
spin_lock_init(&worker->lock); spin_lock_init(&worker->lock);
refcount_inc(&wq->refs);
if (index == IO_WQ_ACCT_BOUND) if (index == IO_WQ_ACCT_BOUND)
pid = io_wq_fork_thread(task_thread_bound, worker); pid = io_wq_fork_thread(task_thread_bound, worker);
else else
pid = io_wq_fork_thread(task_thread_unbound, worker); pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) { if (pid < 0) {
if (refcount_dec_and_test(&wq->refs))
complete(&wq->done);
kfree(worker); kfree(worker);
return false; return false;
} }
refcount_inc(&wq->refs);
return true; return true;
} }
...@@ -668,6 +673,30 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data) ...@@ -668,6 +673,30 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
return false; return false;
} }
/*
 * Walk every online NUMA node and fork a new bound and/or unbound worker
 * for any node whose accounting says one is needed. The need check runs
 * under wqe->lock; the actual worker creation happens only after the lock
 * is dropped, since create_io_worker() forks a thread (per the hunk above,
 * it also resets task state with __set_current_state(TASK_RUNNING)) and so
 * must not be called with a raw spinlock held.
 */
static void io_wq_check_workers(struct io_wq *wq)
{
int node;
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
/* need-worker verdicts, indexed by IO_WQ_ACCT_BOUND / IO_WQ_ACCT_UNBOUND */
bool fork_worker[2] = { false, false };
/* skip nodes that are not online; their wqe carries no runnable work */
if (!node_online(node))
continue;
/* sample both accounting classes atomically under the wqe lock */
raw_spin_lock_irq(&wqe->lock);
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
fork_worker[IO_WQ_ACCT_BOUND] = true;
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
fork_worker[IO_WQ_ACCT_UNBOUND] = true;
raw_spin_unlock_irq(&wqe->lock);
/* fork outside the lock, based on the sampled verdicts */
if (fork_worker[IO_WQ_ACCT_BOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
if (fork_worker[IO_WQ_ACCT_UNBOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
}
}
/* /*
* Manager thread. Tasked with creating new workers, if we need them. * Manager thread. Tasked with creating new workers, if we need them.
*/ */
...@@ -684,30 +713,15 @@ static int io_wq_manager(void *data) ...@@ -684,30 +713,15 @@ static int io_wq_manager(void *data)
complete(&wq->done); complete(&wq->done);
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { do {
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
bool fork_worker[2] = { false, false };
if (!node_online(node))
continue;
raw_spin_lock_irq(&wqe->lock);
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
fork_worker[IO_WQ_ACCT_BOUND] = true;
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
fork_worker[IO_WQ_ACCT_UNBOUND] = true;
raw_spin_unlock_irq(&wqe->lock);
if (fork_worker[IO_WQ_ACCT_BOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
if (fork_worker[IO_WQ_ACCT_UNBOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
}
set_current_state(TASK_INTERRUPTIBLE); set_current_state(TASK_INTERRUPTIBLE);
io_wq_check_workers(wq);
schedule_timeout(HZ); schedule_timeout(HZ);
if (fatal_signal_pending(current)) if (fatal_signal_pending(current))
set_bit(IO_WQ_BIT_EXIT, &wq->state); set_bit(IO_WQ_BIT_EXIT, &wq->state);
} } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_check_workers(wq);
if (refcount_dec_and_test(&wq->refs)) { if (refcount_dec_and_test(&wq->refs)) {
complete(&wq->done); complete(&wq->done);
...@@ -970,7 +984,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -970,7 +984,6 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
current->flags &= ~PF_IO_WORKER; current->flags &= ~PF_IO_WORKER;
if (ret >= 0) { if (ret >= 0) {
wait_for_completion(&wq->done); wait_for_completion(&wq->done);
reinit_completion(&wq->done);
return wq; return wq;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment