Commit e61df66c authored by Jens Axboe's avatar Jens Axboe

io-wq: ensure free/busy list browsing see all items

We have two lists for workers in io-wq, a busy and a free list. For
certain operations we want to browse all workers, and we currently do
that by browsing the two separate lists. But since these lists are RCU
protected, we can potentially miss workers if they move between the two
lists while we're browsing them.

Add a third list, all_list, that simply holds all workers. A worker is
added to that list when it starts, and removed when it exits. This makes
the worker iteration cleaner, too.
Reported-by: default avatarPaul E. McKenney <paulmck@kernel.org>
Reviewed-by: default avatarPaul E. McKenney <paulmck@kernel.org>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 36c2f922
......@@ -46,6 +46,7 @@ struct io_worker {
refcount_t ref;
unsigned flags;
struct hlist_nulls_node nulls_node;
struct list_head all_list;
struct task_struct *task;
wait_queue_head_t wait;
struct io_wqe *wqe;
......@@ -96,6 +97,7 @@ struct io_wqe {
struct io_wq_nulls_list free_list;
struct io_wq_nulls_list busy_list;
struct list_head all_list;
struct io_wq *wq;
};
......@@ -212,6 +214,7 @@ static void io_worker_exit(struct io_worker *worker)
spin_lock_irq(&wqe->lock);
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
if (__io_worker_unuse(wqe, worker)) {
__release(&wqe->lock);
spin_lock_irq(&wqe->lock);
......@@ -590,6 +593,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
spin_lock_irq(&wqe->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
......@@ -733,16 +737,13 @@ static bool io_wqe_worker_send_sig(struct io_worker *worker, void *data)
* worker that isn't exiting
*/
static bool io_wq_for_each_worker(struct io_wqe *wqe,
struct io_wq_nulls_list *list,
bool (*func)(struct io_worker *, void *),
void *data)
{
struct hlist_nulls_node *n;
struct io_worker *worker;
bool ret = false;
restart:
hlist_nulls_for_each_entry_rcu(worker, n, &list->head, nulls_node) {
list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
if (io_worker_get(worker)) {
ret = func(worker, data);
io_worker_release(worker);
......@@ -750,8 +751,7 @@ static bool io_wq_for_each_worker(struct io_wqe *wqe,
break;
}
}
if (!ret && get_nulls_value(n) != list->nulls)
goto restart;
return ret;
}
......@@ -769,10 +769,7 @@ void io_wq_cancel_all(struct io_wq *wq)
for (i = 0; i < wq->nr_wqes; i++) {
struct io_wqe *wqe = wq->wqes[i];
io_wq_for_each_worker(wqe, &wqe->busy_list,
io_wqe_worker_send_sig, NULL);
io_wq_for_each_worker(wqe, &wqe->free_list,
io_wqe_worker_send_sig, NULL);
io_wq_for_each_worker(wqe, io_wqe_worker_send_sig, NULL);
}
rcu_read_unlock();
}
......@@ -834,14 +831,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
}
rcu_read_lock();
found = io_wq_for_each_worker(wqe, &wqe->free_list, io_work_cancel,
&data);
if (found)
goto done;
found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_work_cancel,
&data);
done:
found = io_wq_for_each_worker(wqe, io_work_cancel, &data);
rcu_read_unlock();
return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}
......@@ -919,14 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
* completion will run normally in this case.
*/
rcu_read_lock();
found = io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_cancel,
cwork);
if (found)
goto done;
found = io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_cancel,
cwork);
done:
found = io_wq_for_each_worker(wqe, io_wq_worker_cancel, cwork);
rcu_read_unlock();
return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}
......@@ -1030,6 +1013,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
wqe->free_list.nulls = 0;
INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1);
wqe->busy_list.nulls = 1;
INIT_LIST_HEAD(&wqe->all_list);
i++;
}
......@@ -1077,10 +1061,7 @@ void io_wq_destroy(struct io_wq *wq)
if (!wqe)
continue;
io_wq_for_each_worker(wqe, &wqe->free_list, io_wq_worker_wake,
NULL);
io_wq_for_each_worker(wqe, &wqe->busy_list, io_wq_worker_wake,
NULL);
io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
}
rcu_read_unlock();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment