Commit 1d7bb1d5 authored by Jens Axboe's avatar Jens Axboe

io_uring: add support for backlogged CQ ring

Currently we drop completion events, if the CQ ring is full. That's fine
for requests with bounded completion times, but it may make it harder or
impossible to use io_uring with networked IO where request completion
times are generally unbounded. Or with POLL, for example, which is also
unbounded.

After this patch, we never overflow the ring, we simply store requests
in a backlog for later flushing. This flushing is done automatically by
the kernel. To prevent the backlog from growing indefinitely, if the
backlog is non-empty, we apply back pressure on IO submissions. Any
attempt to submit new IO with a non-empty backlog will get an -EBUSY
return from the kernel. This is a signal to the application that it has
backlogged CQ events, and that it must reap those before being allowed
to submit more IO.

Note that if we do return -EBUSY, we will have filled whatever
backlogged events into the CQ ring first, if there's room. This means
the application can safely reap events WITHOUT entering the kernel and
waiting for them, they are already available in the CQ ring.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 78e19bbe
...@@ -185,6 +185,7 @@ struct io_ring_ctx { ...@@ -185,6 +185,7 @@ struct io_ring_ctx {
unsigned int flags; unsigned int flags;
bool compat; bool compat;
bool account_mem; bool account_mem;
bool cq_overflow_flushed;
/* /*
* Ring buffer of indices into array of io_uring_sqe, which is * Ring buffer of indices into array of io_uring_sqe, which is
...@@ -207,6 +208,7 @@ struct io_ring_ctx { ...@@ -207,6 +208,7 @@ struct io_ring_ctx {
struct list_head defer_list; struct list_head defer_list;
struct list_head timeout_list; struct list_head timeout_list;
struct list_head cq_overflow_list;
wait_queue_head_t inflight_wait; wait_queue_head_t inflight_wait;
} ____cacheline_aligned_in_smp; } ____cacheline_aligned_in_smp;
...@@ -414,6 +416,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) ...@@ -414,6 +416,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
ctx->flags = p->flags; ctx->flags = p->flags;
init_waitqueue_head(&ctx->cq_wait); init_waitqueue_head(&ctx->cq_wait);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
init_completion(&ctx->ctx_done); init_completion(&ctx->ctx_done);
init_completion(&ctx->sqo_thread_started); init_completion(&ctx->sqo_thread_started);
mutex_init(&ctx->uring_lock); mutex_init(&ctx->uring_lock);
...@@ -588,6 +591,67 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx) ...@@ -588,6 +591,67 @@ static struct io_uring_cqe *io_get_cqring(struct io_ring_ctx *ctx)
return &rings->cqes[tail & ctx->cq_mask]; return &rings->cqes[tail & ctx->cq_mask];
} }
static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
{
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
if (waitqueue_active(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
if (ctx->cq_ev_fd)
eventfd_signal(ctx->cq_ev_fd, 1);
}
static void io_cqring_overflow_flush(struct io_ring_ctx *ctx, bool force)
{
struct io_rings *rings = ctx->rings;
struct io_uring_cqe *cqe;
struct io_kiocb *req;
unsigned long flags;
LIST_HEAD(list);
if (!force) {
if (list_empty_careful(&ctx->cq_overflow_list))
return;
if ((ctx->cached_cq_tail - READ_ONCE(rings->cq.head) ==
rings->cq_ring_entries))
return;
}
spin_lock_irqsave(&ctx->completion_lock, flags);
/* if force is set, the ring is going away. always drop after that */
if (force)
ctx->cq_overflow_flushed = true;
while (!list_empty(&ctx->cq_overflow_list)) {
cqe = io_get_cqring(ctx);
if (!cqe && !force)
break;
req = list_first_entry(&ctx->cq_overflow_list, struct io_kiocb,
list);
list_move(&req->list, &list);
if (cqe) {
WRITE_ONCE(cqe->user_data, req->user_data);
WRITE_ONCE(cqe->res, req->result);
WRITE_ONCE(cqe->flags, 0);
} else {
WRITE_ONCE(ctx->rings->cq_overflow,
atomic_inc_return(&ctx->cached_cq_overflow));
}
}
io_commit_cqring(ctx);
spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
while (!list_empty(&list)) {
req = list_first_entry(&list, struct io_kiocb, list);
list_del(&req->list);
io_put_req(req, NULL);
}
}
static void io_cqring_fill_event(struct io_kiocb *req, long res) static void io_cqring_fill_event(struct io_kiocb *req, long res)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
...@@ -601,26 +665,20 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res) ...@@ -601,26 +665,20 @@ static void io_cqring_fill_event(struct io_kiocb *req, long res)
* the ring. * the ring.
*/ */
cqe = io_get_cqring(ctx); cqe = io_get_cqring(ctx);
if (cqe) { if (likely(cqe)) {
WRITE_ONCE(cqe->user_data, req->user_data); WRITE_ONCE(cqe->user_data, req->user_data);
WRITE_ONCE(cqe->res, res); WRITE_ONCE(cqe->res, res);
WRITE_ONCE(cqe->flags, 0); WRITE_ONCE(cqe->flags, 0);
} else { } else if (ctx->cq_overflow_flushed) {
WRITE_ONCE(ctx->rings->cq_overflow, WRITE_ONCE(ctx->rings->cq_overflow,
atomic_inc_return(&ctx->cached_cq_overflow)); atomic_inc_return(&ctx->cached_cq_overflow));
} else {
refcount_inc(&req->refs);
req->result = res;
list_add_tail(&req->list, &ctx->cq_overflow_list);
} }
} }
static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
{
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
if (waitqueue_active(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
if (ctx->cq_ev_fd)
eventfd_signal(ctx->cq_ev_fd, 1);
}
static void io_cqring_add_event(struct io_kiocb *req, long res) static void io_cqring_add_event(struct io_kiocb *req, long res)
{ {
struct io_ring_ctx *ctx = req->ctx; struct io_ring_ctx *ctx = req->ctx;
...@@ -873,10 +931,20 @@ static void io_double_put_req(struct io_kiocb *req) ...@@ -873,10 +931,20 @@ static void io_double_put_req(struct io_kiocb *req)
__io_free_req(req); __io_free_req(req);
} }
static unsigned io_cqring_events(struct io_ring_ctx *ctx) static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush)
{ {
struct io_rings *rings = ctx->rings; struct io_rings *rings = ctx->rings;
/*
* noflush == true is from the waitqueue handler, just ensure we wake
* up the task, and the next invocation will flush the entries. We
* cannot safely to it from here.
*/
if (noflush && !list_empty(&ctx->cq_overflow_list))
return -1U;
io_cqring_overflow_flush(ctx, false);
/* See comment at the top of this file */ /* See comment at the top of this file */
smp_rmb(); smp_rmb();
return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head); return READ_ONCE(rings->cq.tail) - READ_ONCE(rings->cq.head);
...@@ -1032,7 +1100,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events, ...@@ -1032,7 +1100,7 @@ static int __io_iopoll_check(struct io_ring_ctx *ctx, unsigned *nr_events,
* If we do, we can potentially be spinning for commands that * If we do, we can potentially be spinning for commands that
* already triggered a CQE (eg in error). * already triggered a CQE (eg in error).
*/ */
if (io_cqring_events(ctx)) if (io_cqring_events(ctx, false))
break; break;
/* /*
...@@ -2876,6 +2944,11 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr, ...@@ -2876,6 +2944,11 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr,
int i, submitted = 0; int i, submitted = 0;
bool mm_fault = false; bool mm_fault = false;
if (!list_empty(&ctx->cq_overflow_list)) {
io_cqring_overflow_flush(ctx, false);
return -EBUSY;
}
if (nr > IO_PLUG_THRESHOLD) { if (nr > IO_PLUG_THRESHOLD) {
io_submit_state_start(&state, ctx, nr); io_submit_state_start(&state, ctx, nr);
statep = &state; statep = &state;
...@@ -2967,6 +3040,7 @@ static int io_sq_thread(void *data) ...@@ -2967,6 +3040,7 @@ static int io_sq_thread(void *data)
timeout = inflight = 0; timeout = inflight = 0;
while (!kthread_should_park()) { while (!kthread_should_park()) {
unsigned int to_submit; unsigned int to_submit;
int ret;
if (inflight) { if (inflight) {
unsigned nr_events = 0; unsigned nr_events = 0;
...@@ -3051,8 +3125,9 @@ static int io_sq_thread(void *data) ...@@ -3051,8 +3125,9 @@ static int io_sq_thread(void *data)
} }
to_submit = min(to_submit, ctx->sq_entries); to_submit = min(to_submit, ctx->sq_entries);
inflight += io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, ret = io_submit_sqes(ctx, to_submit, NULL, -1, &cur_mm, true);
true); if (ret > 0)
inflight += ret;
} }
set_fs(old_fs); set_fs(old_fs);
...@@ -3073,7 +3148,7 @@ struct io_wait_queue { ...@@ -3073,7 +3148,7 @@ struct io_wait_queue {
unsigned nr_timeouts; unsigned nr_timeouts;
}; };
static inline bool io_should_wake(struct io_wait_queue *iowq) static inline bool io_should_wake(struct io_wait_queue *iowq, bool noflush)
{ {
struct io_ring_ctx *ctx = iowq->ctx; struct io_ring_ctx *ctx = iowq->ctx;
...@@ -3082,7 +3157,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq) ...@@ -3082,7 +3157,7 @@ static inline bool io_should_wake(struct io_wait_queue *iowq)
* started waiting. For timeouts, we always want to return to userspace, * started waiting. For timeouts, we always want to return to userspace,
* regardless of event count. * regardless of event count.
*/ */
return io_cqring_events(ctx) >= iowq->to_wait || return io_cqring_events(ctx, noflush) >= iowq->to_wait ||
atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts; atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
} }
...@@ -3092,7 +3167,8 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode, ...@@ -3092,7 +3167,8 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
wq); wq);
if (!io_should_wake(iowq)) /* use noflush == true, as we can't safely rely on locking context */
if (!io_should_wake(iowq, true))
return -1; return -1;
return autoremove_wake_function(curr, mode, wake_flags, key); return autoremove_wake_function(curr, mode, wake_flags, key);
...@@ -3117,7 +3193,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, ...@@ -3117,7 +3193,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
struct io_rings *rings = ctx->rings; struct io_rings *rings = ctx->rings;
int ret = 0; int ret = 0;
if (io_cqring_events(ctx) >= min_events) if (io_cqring_events(ctx, false) >= min_events)
return 0; return 0;
if (sig) { if (sig) {
...@@ -3138,7 +3214,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, ...@@ -3138,7 +3214,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
do { do {
prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, prepare_to_wait_exclusive(&ctx->wait, &iowq.wq,
TASK_INTERRUPTIBLE); TASK_INTERRUPTIBLE);
if (io_should_wake(&iowq)) if (io_should_wake(&iowq, false))
break; break;
schedule(); schedule();
if (signal_pending(current)) { if (signal_pending(current)) {
...@@ -4061,6 +4137,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx) ...@@ -4061,6 +4137,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
io_wq_cancel_all(ctx->io_wq); io_wq_cancel_all(ctx->io_wq);
io_iopoll_reap_events(ctx); io_iopoll_reap_events(ctx);
io_cqring_overflow_flush(ctx, true);
wait_for_completion(&ctx->ctx_done); wait_for_completion(&ctx->ctx_done);
io_ring_ctx_free(ctx); io_ring_ctx_free(ctx);
} }
...@@ -4116,8 +4193,10 @@ static int io_uring_flush(struct file *file, void *data) ...@@ -4116,8 +4193,10 @@ static int io_uring_flush(struct file *file, void *data)
struct io_ring_ctx *ctx = file->private_data; struct io_ring_ctx *ctx = file->private_data;
io_uring_cancel_files(ctx, data); io_uring_cancel_files(ctx, data);
if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) if (fatal_signal_pending(current) || (current->flags & PF_EXITING)) {
io_cqring_overflow_flush(ctx, true);
io_wq_cancel_all(ctx->io_wq); io_wq_cancel_all(ctx->io_wq);
}
return 0; return 0;
} }
...@@ -4391,7 +4470,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p) ...@@ -4391,7 +4470,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
if (ret < 0) if (ret < 0)
goto err; goto err;
p->features = IORING_FEAT_SINGLE_MMAP; p->features = IORING_FEAT_SINGLE_MMAP | IORING_FEAT_NODROP;
trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags); trace_io_uring_create(ret, ctx, p->sq_entries, p->cq_entries, p->flags);
return ret; return ret;
err: err:
......
...@@ -155,6 +155,7 @@ struct io_uring_params { ...@@ -155,6 +155,7 @@ struct io_uring_params {
* io_uring_params->features flags * io_uring_params->features flags
*/ */
#define IORING_FEAT_SINGLE_MMAP (1U << 0) #define IORING_FEAT_SINGLE_MMAP (1U << 0)
#define IORING_FEAT_NODROP (1U << 1)
/* /*
* io_uring_register(2) opcodes and arguments * io_uring_register(2) opcodes and arguments
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment