Commit 685fe7fe authored by Jens Axboe's avatar Jens Axboe

io-wq: eliminate the need for a manager thread

io-wq relies on a manager thread to create/fork new workers, as needed.
But there's really no strong need for it anymore. We have the following
cases that fork a new worker:

1) Work queue. This is done from the task itself always, and it's trivial
   to create a worker off that path, if needed.

2) All workers have gone to sleep, and we have more work. This is called
   off the sched out path. For this case, use a task_work items to queue
   a fork-worker operation.

3) Hashed work completion. Don't think we need to do anything off this
   case. If need be, it could just use approach 2 as well.

Part of this change is incrementing the running worker count before the
fork, to avoid cases where we observe we need a worker and then queue
creation of one. Then new work comes in, we fork a new one. That last
queue operation should have waited for the previous worker to come up,
it's quite possible we don't even need it. Hence move the worker running
from before we fork it off to more efficiently handle that case.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 66ae0d1e
......@@ -68,6 +68,7 @@ struct io_worker {
struct io_wqe_acct {
unsigned nr_workers;
unsigned max_workers;
int index;
atomic_t nr_running;
};
......@@ -108,19 +109,16 @@ struct io_wq {
free_work_fn *free_work;
io_wq_work_fn *do_work;
struct task_struct *manager;
struct io_wq_hash *hash;
refcount_t refs;
struct completion exited;
atomic_t worker_refs;
struct completion worker_done;
struct hlist_node cpuhp_node;
pid_t task_pid;
struct task_struct *task;
};
static enum cpuhp_state io_wq_online;
......@@ -133,8 +131,7 @@ struct io_cb_cancel_data {
bool cancel_all;
};
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match);
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static bool io_worker_get(struct io_worker *worker)
{
......@@ -163,6 +160,12 @@ static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}
static void io_worker_ref_put(struct io_wq *wq)
{
if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
}
static void io_worker_exit(struct io_worker *worker)
{
struct io_wqe *wqe = worker->wqe;
......@@ -190,8 +193,7 @@ static void io_worker_exit(struct io_worker *worker)
raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu);
if (atomic_dec_and_test(&wqe->wq->worker_refs))
complete(&wqe->wq->worker_done);
io_worker_ref_put(wqe->wq);
do_exit(0);
}
......@@ -206,7 +208,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)
/*
* Check head of free list for an available worker. If one isn't available,
* caller must wake up the wq manager to create one.
* caller must create one.
*/
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
__must_hold(RCU)
......@@ -230,7 +232,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
/*
* We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, wake up the manager to create one.
* below the max number of workers, create one.
*/
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
......@@ -246,8 +248,11 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
ret = io_wqe_activate_free_worker(wqe);
rcu_read_unlock();
if (!ret && acct->nr_workers < acct->max_workers)
wake_up_process(wqe->wq->manager);
if (!ret && acct->nr_workers < acct->max_workers) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
create_io_worker(wqe->wq, wqe, acct->index);
}
}
static void io_wqe_inc_running(struct io_worker *worker)
......@@ -257,14 +262,61 @@ static void io_wqe_inc_running(struct io_worker *worker)
atomic_inc(&acct->nr_running);
}
struct create_worker_data {
struct callback_head work;
struct io_wqe *wqe;
int index;
};
static void create_worker_cb(struct callback_head *cb)
{
struct create_worker_data *cwd;
struct io_wq *wq;
cwd = container_of(cb, struct create_worker_data, work);
wq = cwd->wqe->wq;
create_io_worker(wq, cwd->wqe, cwd->index);
kfree(cwd);
}
static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
struct create_worker_data *cwd;
struct io_wq *wq = wqe->wq;
/* raced with exit, just ignore create call */
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
goto fail;
cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
if (cwd) {
init_task_work(&cwd->work, create_worker_cb);
cwd->wqe = wqe;
cwd->index = acct->index;
if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
return;
kfree(cwd);
}
fail:
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
}
static void io_wqe_dec_running(struct io_worker *worker)
__must_hold(wqe->lock)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
io_wqe_wake_worker(wqe, acct);
if (!(worker->flags & IO_WORKER_F_UP))
return;
if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
io_queue_worker_create(wqe, acct);
}
}
/*
......@@ -483,9 +535,8 @@ static int io_wqe_worker(void *data)
char buf[TASK_COMM_LEN];
worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
io_wqe_inc_running(worker);
snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task_pid);
snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
set_task_comm(current, buf);
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
......@@ -549,8 +600,7 @@ void io_wq_worker_running(struct task_struct *tsk)
/*
* Called when worker is going to sleep. If there are no workers currently
* running and we have work pending, wake up a free one or have the manager
* set one up.
* running and we have work pending, wake up a free one or create a new one.
*/
void io_wq_worker_sleeping(struct task_struct *tsk)
{
......@@ -570,7 +620,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
raw_spin_unlock_irq(&worker->wqe->lock);
}
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker;
......@@ -580,7 +630,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
if (!worker)
return false;
goto fail;
refcount_set(&worker->ref, 1);
worker->nulls_node.pprev = NULL;
......@@ -588,14 +638,13 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
spin_lock_init(&worker->lock);
init_completion(&worker->ref_done);
atomic_inc(&wq->worker_refs);
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
if (IS_ERR(tsk)) {
if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
kfree(worker);
return false;
fail:
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
return;
}
tsk->pf_io_worker = worker;
......@@ -614,7 +663,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
acct->nr_workers++;
raw_spin_unlock_irq(&wqe->lock);
wake_up_new_task(tsk);
return true;
}
static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
......@@ -662,93 +710,11 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
return false;
}
static void io_wq_check_workers(struct io_wq *wq)
{
int node;
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
bool fork_worker[2] = { false, false };
if (!node_online(node))
continue;
raw_spin_lock_irq(&wqe->lock);
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
fork_worker[IO_WQ_ACCT_BOUND] = true;
if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
fork_worker[IO_WQ_ACCT_UNBOUND] = true;
raw_spin_unlock_irq(&wqe->lock);
if (fork_worker[IO_WQ_ACCT_BOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
if (fork_worker[IO_WQ_ACCT_UNBOUND])
create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
}
}
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
return true;
}
static void io_wq_cancel_pending(struct io_wq *wq)
{
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_all,
.cancel_all = true,
};
int node;
for_each_node(node)
io_wqe_cancel_pending_work(wq->wqes[node], &match);
}
/*
* Manager thread. Tasked with creating new workers, if we need them.
*/
static int io_wq_manager(void *data)
{
struct io_wq *wq = data;
char buf[TASK_COMM_LEN];
int node;
snprintf(buf, sizeof(buf), "iou-mgr-%d", wq->task_pid);
set_task_comm(current, buf);
do {
set_current_state(TASK_INTERRUPTIBLE);
io_wq_check_workers(wq);
schedule_timeout(HZ);
if (signal_pending(current)) {
struct ksignal ksig;
if (!get_signal(&ksig))
continue;
set_bit(IO_WQ_BIT_EXIT, &wq->state);
}
} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_check_workers(wq);
rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
wait_for_completion(&wq->worker_done);
spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node)
list_del_init(&wq->wqes[node]->wait.entry);
spin_unlock_irq(&wq->hash->wait.lock);
io_wq_cancel_pending(wq);
complete(&wq->exited);
do_exit(0);
}
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
struct io_wq *wq = wqe->wq;
......@@ -780,39 +746,13 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}
static int io_wq_fork_manager(struct io_wq *wq)
{
struct task_struct *tsk;
if (wq->manager)
return 0;
WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));
init_completion(&wq->worker_done);
atomic_set(&wq->worker_refs, 1);
tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
wq->manager = get_task_struct(tsk);
wake_up_new_task(tsk);
return 0;
}
if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
return PTR_ERR(tsk);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
int work_flags;
unsigned long flags;
/* Can only happen if manager creation fails after exec */
if (io_wq_fork_manager(wqe->wq) ||
test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
io_run_cancel(work, wqe);
return;
}
......@@ -967,17 +907,12 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
int ret;
list_del_init(&wait->entry);
rcu_read_lock();
ret = io_wqe_activate_free_worker(wqe);
io_wqe_activate_free_worker(wqe);
rcu_read_unlock();
if (!ret)
wake_up_process(wqe->wq->manager);
return 1;
}
......@@ -1018,6 +953,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
goto err;
wq->wqes[node] = wqe;
wqe->node = alloc_node;
wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
......@@ -1032,13 +969,11 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
INIT_LIST_HEAD(&wqe->all_list);
}
wq->task_pid = current->pid;
init_completion(&wq->exited);
wq->task = get_task_struct(data->task);
refcount_set(&wq->refs, 1);
ret = io_wq_fork_manager(wq);
if (!ret)
return wq;
atomic_set(&wq->worker_refs, 1);
init_completion(&wq->worker_done);
return wq;
err:
io_wq_put_hash(data->hash);
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
......@@ -1051,14 +986,39 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
return ERR_PTR(ret);
}
static void io_wq_destroy_manager(struct io_wq *wq)
static void io_wq_exit_workers(struct io_wq *wq)
{
if (wq->manager) {
wake_up_process(wq->manager);
wait_for_completion(&wq->exited);
put_task_struct(wq->manager);
wq->manager = NULL;
struct callback_head *cb;
int node;
set_bit(IO_WQ_BIT_EXIT, &wq->state);
if (!wq->task)
return;
while ((cb = task_work_cancel(wq->task, create_worker_cb)) != NULL) {
struct create_worker_data *cwd;
cwd = container_of(cb, struct create_worker_data, work);
atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
io_worker_ref_put(wq);
kfree(cwd);
}
rcu_read_lock();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
spin_lock_irq(&wq->hash->wait.lock);
list_del_init(&wq->wqes[node]->wait.entry);
spin_unlock_irq(&wq->hash->wait.lock);
}
rcu_read_unlock();
io_worker_ref_put(wq);
wait_for_completion(&wq->worker_done);
put_task_struct(wq->task);
wq->task = NULL;
}
static void io_wq_destroy(struct io_wq *wq)
......@@ -1067,8 +1027,7 @@ static void io_wq_destroy(struct io_wq *wq)
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
set_bit(IO_WQ_BIT_EXIT, &wq->state);
io_wq_destroy_manager(wq);
io_wq_exit_workers(wq);
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
......@@ -1092,8 +1051,7 @@ void io_wq_put(struct io_wq *wq)
void io_wq_put_and_exit(struct io_wq *wq)
{
set_bit(IO_WQ_BIT_EXIT, &wq->state);
io_wq_destroy_manager(wq);
io_wq_exit_workers(wq);
io_wq_put(wq);
}
......
......@@ -116,6 +116,7 @@ static inline void io_wq_put_hash(struct io_wq_hash *hash)
struct io_wq_data {
struct io_wq_hash *hash;
struct task_struct *task;
io_wq_work_fn *do_work;
free_work_fn *free_work;
};
......
......@@ -7911,7 +7911,8 @@ static struct io_wq_work *io_free_work(struct io_wq_work *work)
return req ? &req->work : NULL;
}
static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx)
static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx,
struct task_struct *task)
{
struct io_wq_hash *hash;
struct io_wq_data data;
......@@ -7928,6 +7929,7 @@ static struct io_wq *io_init_wq_offload(struct io_ring_ctx *ctx)
}
data.hash = hash;
data.task = task;
data.free_work = io_free_work;
data.do_work = io_wq_submit_work;
......@@ -7953,7 +7955,7 @@ static int io_uring_alloc_task_context(struct task_struct *task,
return ret;
}
tctx->io_wq = io_init_wq_offload(ctx);
tctx->io_wq = io_init_wq_offload(ctx, task);
if (IS_ERR(tctx->io_wq)) {
ret = PTR_ERR(tctx->io_wq);
percpu_counter_destroy(&tctx->inflight);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment