Commit 42995cee authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'io_uring-5.14-2021-08-13' of git://git.kernel.dk/linux-block

Pull io_uring fixes from Jens Axboe:
 "A bit bigger than the previous weeks, but mostly just a few stable
  bound fixes. In detail:

   - Followup fixes to patches from last week for io-wq, turns out they
     weren't complete (Hao)

   - Two lockdep reported fixes out of the RT camp (me)

   - Sync the io_uring-cp example with liburing, as a few bug fixes
     never made it to the kernel carried version (me)

   - SQPOLL related TIF_NOTIFY_SIGNAL fix (Nadav)

   - Use WRITE_ONCE() when writing sq flags (Nadav)

   - io_rsrc_put_work() deadlock fix (Pavel)"

* tag 'io_uring-5.14-2021-08-13' of git://git.kernel.dk/linux-block:
  tools/io_uring/io_uring-cp: sync with liburing example
  io_uring: fix ctx-exit io_rsrc_put_work() deadlock
  io_uring: drop ctx->uring_lock before flushing work item
  io-wq: fix IO_WORKER_F_FIXED issue in create_io_worker()
  io-wq: fix bug of creating io-wokers unconditionally
  io_uring: rsrc ref lock needs to be IRQ safe
  io_uring: Use WRITE_ONCE() when writing to sq_flags
  io_uring: clear TIF_NOTIFY_SIGNAL when running task work
parents 462938cd 8f40d037
......@@ -129,7 +129,7 @@ struct io_cb_cancel_data {
bool cancel_all;
};
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_worker_get(struct io_worker *worker)
......@@ -248,18 +248,20 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
rcu_read_unlock();
if (!ret) {
bool do_create = false;
bool do_create = false, first = false;
raw_spin_lock_irq(&wqe->lock);
if (acct->nr_workers < acct->max_workers) {
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
if (!acct->nr_workers)
first = true;
acct->nr_workers++;
do_create = true;
}
raw_spin_unlock_irq(&wqe->lock);
if (do_create)
create_io_worker(wqe->wq, wqe, acct->index);
create_io_worker(wqe->wq, wqe, acct->index, first);
}
}
......@@ -282,16 +284,26 @@ static void create_worker_cb(struct callback_head *cb)
struct io_wq *wq;
struct io_wqe *wqe;
struct io_wqe_acct *acct;
bool do_create = false, first = false;
cwd = container_of(cb, struct create_worker_data, work);
wqe = cwd->wqe;
wq = wqe->wq;
acct = &wqe->acct[cwd->index];
raw_spin_lock_irq(&wqe->lock);
if (acct->nr_workers < acct->max_workers)
if (acct->nr_workers < acct->max_workers) {
if (!acct->nr_workers)
first = true;
acct->nr_workers++;
do_create = true;
}
raw_spin_unlock_irq(&wqe->lock);
create_io_worker(wq, cwd->wqe, cwd->index);
if (do_create) {
create_io_worker(wq, wqe, cwd->index, first);
} else {
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
}
kfree(cwd);
}
......@@ -629,7 +641,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
raw_spin_unlock_irq(&worker->wqe->lock);
}
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index, bool first)
{
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_worker *worker;
......@@ -670,7 +682,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
worker->flags |= IO_WORKER_F_FREE;
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
if ((acct->nr_workers == 1) && (worker->flags & IO_WORKER_F_BOUND))
if (first && (worker->flags & IO_WORKER_F_BOUND))
worker->flags |= IO_WORKER_F_FIXED;
raw_spin_unlock_irq(&wqe->lock);
wake_up_new_task(tsk);
......
......@@ -78,6 +78,7 @@
#include <linux/task_work.h>
#include <linux/pagemap.h>
#include <linux/io_uring.h>
#include <linux/tracehook.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
......@@ -1499,7 +1500,8 @@ static bool __io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
all_flushed = list_empty(&ctx->cq_overflow_list);
if (all_flushed) {
clear_bit(0, &ctx->check_cq_overflow);
ctx->rings->sq_flags &= ~IORING_SQ_CQ_OVERFLOW;
WRITE_ONCE(ctx->rings->sq_flags,
ctx->rings->sq_flags & ~IORING_SQ_CQ_OVERFLOW);
}
if (posted)
......@@ -1578,7 +1580,9 @@ static bool io_cqring_event_overflow(struct io_ring_ctx *ctx, u64 user_data,
}
if (list_empty(&ctx->cq_overflow_list)) {
set_bit(0, &ctx->check_cq_overflow);
ctx->rings->sq_flags |= IORING_SQ_CQ_OVERFLOW;
WRITE_ONCE(ctx->rings->sq_flags,
ctx->rings->sq_flags | IORING_SQ_CQ_OVERFLOW);
}
ocqe->cqe.user_data = user_data;
ocqe->cqe.res = res;
......@@ -2222,9 +2226,9 @@ static inline unsigned int io_put_rw_kbuf(struct io_kiocb *req)
static inline bool io_run_task_work(void)
{
if (current->task_works) {
if (test_thread_flag(TIF_NOTIFY_SIGNAL) || current->task_works) {
__set_current_state(TASK_RUNNING);
task_work_run();
tracehook_notify_signal();
return true;
}
......@@ -6803,14 +6807,16 @@ static inline void io_ring_set_wakeup_flag(struct io_ring_ctx *ctx)
{
/* Tell userspace we may need a wakeup call */
spin_lock_irq(&ctx->completion_lock);
ctx->rings->sq_flags |= IORING_SQ_NEED_WAKEUP;
WRITE_ONCE(ctx->rings->sq_flags,
ctx->rings->sq_flags | IORING_SQ_NEED_WAKEUP);
spin_unlock_irq(&ctx->completion_lock);
}
static inline void io_ring_clear_wakeup_flag(struct io_ring_ctx *ctx)
{
spin_lock_irq(&ctx->completion_lock);
ctx->rings->sq_flags &= ~IORING_SQ_NEED_WAKEUP;
WRITE_ONCE(ctx->rings->sq_flags,
ctx->rings->sq_flags & ~IORING_SQ_NEED_WAKEUP);
spin_unlock_irq(&ctx->completion_lock);
}
......@@ -7132,16 +7138,6 @@ static void **io_alloc_page_table(size_t size)
return table;
}
static inline void io_rsrc_ref_lock(struct io_ring_ctx *ctx)
{
spin_lock_bh(&ctx->rsrc_ref_lock);
}
static inline void io_rsrc_ref_unlock(struct io_ring_ctx *ctx)
{
spin_unlock_bh(&ctx->rsrc_ref_lock);
}
static void io_rsrc_node_destroy(struct io_rsrc_node *ref_node)
{
percpu_ref_exit(&ref_node->refs);
......@@ -7158,9 +7154,9 @@ static void io_rsrc_node_switch(struct io_ring_ctx *ctx,
struct io_rsrc_node *rsrc_node = ctx->rsrc_node;
rsrc_node->rsrc_data = data_to_kill;
io_rsrc_ref_lock(ctx);
spin_lock_irq(&ctx->rsrc_ref_lock);
list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list);
io_rsrc_ref_unlock(ctx);
spin_unlock_irq(&ctx->rsrc_ref_lock);
atomic_inc(&data_to_kill->refs);
percpu_ref_kill(&rsrc_node->refs);
......@@ -7199,17 +7195,19 @@ static int io_rsrc_ref_quiesce(struct io_rsrc_data *data, struct io_ring_ctx *ct
/* kill initial ref, already quiesced if zero */
if (atomic_dec_and_test(&data->refs))
break;
mutex_unlock(&ctx->uring_lock);
flush_delayed_work(&ctx->rsrc_put_work);
ret = wait_for_completion_interruptible(&data->done);
if (!ret)
if (!ret) {
mutex_lock(&ctx->uring_lock);
break;
}
atomic_inc(&data->refs);
/* wait for all works potentially completing data->done */
flush_delayed_work(&ctx->rsrc_put_work);
reinit_completion(&data->done);
mutex_unlock(&ctx->uring_lock);
ret = io_run_task_work_sig();
mutex_lock(&ctx->uring_lock);
} while (ret >= 0);
......@@ -7668,9 +7666,10 @@ static void io_rsrc_node_ref_zero(struct percpu_ref *ref)
{
struct io_rsrc_node *node = container_of(ref, struct io_rsrc_node, refs);
struct io_ring_ctx *ctx = node->rsrc_data->ctx;
unsigned long flags;
bool first_add = false;
io_rsrc_ref_lock(ctx);
spin_lock_irqsave(&ctx->rsrc_ref_lock, flags);
node->done = true;
while (!list_empty(&ctx->rsrc_ref_list)) {
......@@ -7682,7 +7681,7 @@ static void io_rsrc_node_ref_zero(struct percpu_ref *ref)
list_del(&node->node);
first_add |= llist_add(&node->llist, &ctx->rsrc_put_llist);
}
io_rsrc_ref_unlock(ctx);
spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
if (first_add)
mod_delayed_work(system_wq, &ctx->rsrc_put_work, HZ);
......@@ -8653,13 +8652,10 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
mutex_unlock(&ctx->uring_lock);
}
static bool io_wait_rsrc_data(struct io_rsrc_data *data)
static void io_wait_rsrc_data(struct io_rsrc_data *data)
{
if (!data)
return false;
if (!atomic_dec_and_test(&data->refs))
if (data && !atomic_dec_and_test(&data->refs))
wait_for_completion(&data->done);
return true;
}
static void io_ring_ctx_free(struct io_ring_ctx *ctx)
......@@ -8671,10 +8667,14 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
ctx->mm_account = NULL;
}
/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
io_wait_rsrc_data(ctx->buf_data);
io_wait_rsrc_data(ctx->file_data);
mutex_lock(&ctx->uring_lock);
if (io_wait_rsrc_data(ctx->buf_data))
if (ctx->buf_data)
__io_sqe_buffers_unregister(ctx);
if (io_wait_rsrc_data(ctx->file_data))
if (ctx->file_data)
__io_sqe_files_unregister(ctx);
if (ctx->rings)
__io_cqring_overflow_flush(ctx, true);
......
......@@ -131,8 +131,7 @@ static int copy_file(struct io_uring *ring, off_t insize)
writes = reads = offset = 0;
while (insize || write_left) {
unsigned long had_reads;
int got_comp;
int had_reads, got_comp;
/*
* Queue up as many reads as we can
......@@ -174,8 +173,13 @@ static int copy_file(struct io_uring *ring, off_t insize)
if (!got_comp) {
ret = io_uring_wait_cqe(ring, &cqe);
got_comp = 1;
} else
} else {
ret = io_uring_peek_cqe(ring, &cqe);
if (ret == -EAGAIN) {
cqe = NULL;
ret = 0;
}
}
if (ret < 0) {
fprintf(stderr, "io_uring_peek_cqe: %s\n",
strerror(-ret));
......@@ -194,7 +198,7 @@ static int copy_file(struct io_uring *ring, off_t insize)
fprintf(stderr, "cqe failed: %s\n",
strerror(-cqe->res));
return 1;
} else if ((size_t) cqe->res != data->iov.iov_len) {
} else if (cqe->res != data->iov.iov_len) {
/* Short read/write, adjust and requeue */
data->iov.iov_base += cqe->res;
data->iov.iov_len -= cqe->res;
......@@ -221,6 +225,25 @@ static int copy_file(struct io_uring *ring, off_t insize)
}
}
/* wait out pending writes */
while (writes) {
struct io_data *data;
ret = io_uring_wait_cqe(ring, &cqe);
if (ret) {
fprintf(stderr, "wait_cqe=%d\n", ret);
return 1;
}
if (cqe->res < 0) {
fprintf(stderr, "write res=%d\n", cqe->res);
return 1;
}
data = io_uring_cqe_get_data(cqe);
free(data);
writes--;
io_uring_cqe_seen(ring, cqe);
}
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment