Commit fb3a1f6c authored by Jens Axboe

io-wq: have manager wait for all workers to exit

Instead of having to wait separately on workers and manager, just have
the manager wait on the workers. We use an atomic_t for the reference
here, as we need to start at 0 and allow increment from that. Since the
number of workers is naturally capped by the allowed nr of processes,
and that uses an int, there is no risk of overflow.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent 65d43023
...@@ -120,6 +120,9 @@ struct io_wq { ...@@ -120,6 +120,9 @@ struct io_wq {
refcount_t refs; refcount_t refs;
struct completion done; struct completion done;
atomic_t worker_refs;
struct completion worker_done;
struct hlist_node cpuhp_node; struct hlist_node cpuhp_node;
pid_t task_pid; pid_t task_pid;
...@@ -189,7 +192,8 @@ static void io_worker_exit(struct io_worker *worker) ...@@ -189,7 +192,8 @@ static void io_worker_exit(struct io_worker *worker)
raw_spin_unlock_irq(&wqe->lock); raw_spin_unlock_irq(&wqe->lock);
kfree_rcu(worker, rcu); kfree_rcu(worker, rcu);
io_wq_put(wqe->wq); if (atomic_dec_and_test(&wqe->wq->worker_refs))
complete(&wqe->wq->worker_done);
} }
static inline bool io_wqe_run_queue(struct io_wqe *wqe) static inline bool io_wqe_run_queue(struct io_wqe *wqe)
...@@ -648,14 +652,15 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) ...@@ -648,14 +652,15 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
init_completion(&worker->ref_done); init_completion(&worker->ref_done);
init_completion(&worker->started); init_completion(&worker->started);
refcount_inc(&wq->refs); atomic_inc(&wq->worker_refs);
if (index == IO_WQ_ACCT_BOUND) if (index == IO_WQ_ACCT_BOUND)
pid = io_wq_fork_thread(task_thread_bound, worker); pid = io_wq_fork_thread(task_thread_bound, worker);
else else
pid = io_wq_fork_thread(task_thread_unbound, worker); pid = io_wq_fork_thread(task_thread_unbound, worker);
if (pid < 0) { if (pid < 0) {
io_wq_put(wq); if (atomic_dec_and_test(&wq->worker_refs))
complete(&wq->worker_done);
kfree(worker); kfree(worker);
return false; return false;
} }
...@@ -736,6 +741,7 @@ static int io_wq_manager(void *data) ...@@ -736,6 +741,7 @@ static int io_wq_manager(void *data)
{ {
struct io_wq *wq = data; struct io_wq *wq = data;
char buf[TASK_COMM_LEN]; char buf[TASK_COMM_LEN];
int node;
sprintf(buf, "iou-mgr-%d", wq->task_pid); sprintf(buf, "iou-mgr-%d", wq->task_pid);
set_task_comm(current, buf); set_task_comm(current, buf);
...@@ -753,6 +759,15 @@ static int io_wq_manager(void *data) ...@@ -753,6 +759,15 @@ static int io_wq_manager(void *data)
} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)); } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));
io_wq_check_workers(wq); io_wq_check_workers(wq);
rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
/* we might not ever have created any workers */
if (atomic_read(&wq->worker_refs))
wait_for_completion(&wq->worker_done);
wq->manager = NULL; wq->manager = NULL;
io_wq_put(wq); io_wq_put(wq);
do_exit(0); do_exit(0);
...@@ -796,6 +811,7 @@ static int io_wq_fork_manager(struct io_wq *wq) ...@@ -796,6 +811,7 @@ static int io_wq_fork_manager(struct io_wq *wq)
if (wq->manager) if (wq->manager)
return 0; return 0;
reinit_completion(&wq->worker_done);
clear_bit(IO_WQ_BIT_EXIT, &wq->state); clear_bit(IO_WQ_BIT_EXIT, &wq->state);
refcount_inc(&wq->refs); refcount_inc(&wq->refs);
current->flags |= PF_IO_WORKER; current->flags |= PF_IO_WORKER;
...@@ -1050,6 +1066,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1050,6 +1066,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
init_completion(&wq->done); init_completion(&wq->done);
refcount_set(&wq->refs, 1); refcount_set(&wq->refs, 1);
init_completion(&wq->worker_done);
atomic_set(&wq->worker_refs, 0);
ret = io_wq_fork_manager(wq); ret = io_wq_fork_manager(wq);
if (!ret) if (!ret)
return wq; return wq;
...@@ -1077,11 +1096,6 @@ static void io_wq_destroy(struct io_wq *wq) ...@@ -1077,11 +1096,6 @@ static void io_wq_destroy(struct io_wq *wq)
if (wq->manager) if (wq->manager)
wake_up_process(wq->manager); wake_up_process(wq->manager);
rcu_read_lock();
for_each_node(node)
io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
rcu_read_unlock();
spin_lock_irq(&wq->hash->wait.lock); spin_lock_irq(&wq->hash->wait.lock);
for_each_node(node) { for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node]; struct io_wqe *wqe = wq->wqes[node];
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment