Commit 5fd46178 authored by Jens Axboe

io_uring: be smarter about waking multiple CQ ring waiters

Currently we only wake the first waiter, even if we have enough entries
posted to satisfy multiple waiters. Improve that situation so that
every waiter knows how much the CQ tail has to advance before they can
be safely woken up.

With this change, if we have N waiters each asking for 1 event and we get
4 completions, then we wake up 4 waiters. If we have N waiters asking
for 2 completions and we get 4 completions, then we wake up the first
two. Previously, only the first waiter would've been woken up.
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent d3e9f732
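
To illustrate the scheme the patch introduces, here is a minimal user-space sketch: each waiter records the CQ tail value that would satisfy it (current head plus the number of events it wants), the posting side wakes everyone, and each waiter decides for itself using a signed tail distance so the check stays correct even if the unsigned tail counter wraps. The names below (struct waiter, should_wake) are illustrative stand-ins, not the kernel's.

#include <stdbool.h>
#include <stdio.h>

/* Illustrative stand-in for the per-waiter state (not kernel code). */
struct waiter {
	unsigned cq_tail;	/* CQ tail value that satisfies this waiter */
};

/* Signed distance: still correct if the unsigned tail counter wraps. */
static bool should_wake(unsigned cached_cq_tail, const struct waiter *w)
{
	int dist = (int)(cached_cq_tail - w->cq_tail);

	return dist >= 0;
}

int main(void)
{
	unsigned cq_head = 100, cq_tail = 100;
	struct waiter a = { .cq_tail = cq_head + 1 };	/* wants 1 event  */
	struct waiter b = { .cq_tail = cq_head + 2 };	/* wants 2 events */

	cq_tail += 1;	/* one completion posted; all waiters get "woken" */
	printf("a wakes: %d, b wakes: %d\n",
	       should_wake(cq_tail, &a), should_wake(cq_tail, &b));
	return 0;
}

With one completion posted, a's check passes while b goes back to waiting, which matches the "only wake as many waiters as we need to" behaviour described in the new io_cqring_ev_posted() comment below.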
@@ -1436,11 +1436,13 @@ static inline bool io_should_trigger_evfd(struct io_ring_ctx *ctx)
 
 static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 {
-	/* see waitqueue_active() comment */
-	smp_mb();
-
-	if (waitqueue_active(&ctx->cq_wait))
-		wake_up(&ctx->cq_wait);
+	/*
+	 * wake_up_all() may seem excessive, but io_wake_function() and
+	 * io_should_wake() handle the termination of the loop and only
+	 * wake as many waiters as we need to.
+	 */
+	if (wq_has_sleeper(&ctx->cq_wait))
+		wake_up_all(&ctx->cq_wait);
 	if (ctx->sq_data && waitqueue_active(&ctx->sq_data->wait))
 		wake_up(&ctx->sq_data->wait);
 	if (io_should_trigger_evfd(ctx))
@@ -1453,12 +1455,9 @@ static void io_cqring_ev_posted(struct io_ring_ctx *ctx)
 
 static void io_cqring_ev_posted_iopoll(struct io_ring_ctx *ctx)
 {
-	/* see waitqueue_active() comment */
-	smp_mb();
-
 	if (ctx->flags & IORING_SETUP_SQPOLL) {
-		if (waitqueue_active(&ctx->cq_wait))
-			wake_up(&ctx->cq_wait);
+		if (wq_has_sleeper(&ctx->cq_wait))
+			wake_up_all(&ctx->cq_wait);
 	}
 	if (io_should_trigger_evfd(ctx))
 		eventfd_signal(ctx->cq_ev_fd, 1);
@@ -6976,21 +6975,21 @@ static int io_sq_thread(void *data)
 struct io_wait_queue {
 	struct wait_queue_entry wq;
 	struct io_ring_ctx *ctx;
-	unsigned to_wait;
+	unsigned cq_tail;
 	unsigned nr_timeouts;
 };
 
 static inline bool io_should_wake(struct io_wait_queue *iowq)
 {
 	struct io_ring_ctx *ctx = iowq->ctx;
+	int dist = ctx->cached_cq_tail - (int) iowq->cq_tail;
 
 	/*
 	 * Wake up if we have enough events, or if a timeout occurred since we
 	 * started waiting. For timeouts, we always want to return to userspace,
 	 * regardless of event count.
 	 */
-	return io_cqring_events(ctx) >= iowq->to_wait ||
-			atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+	return dist >= 0 || atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
 }
 
 static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
@@ -7053,7 +7052,6 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			.entry	= LIST_HEAD_INIT(iowq.wq.entry),
 		},
 		.ctx		= ctx,
-		.to_wait	= min_events,
 	};
 	struct io_rings *rings = ctx->rings;
 	signed long timeout = MAX_SCHEDULE_TIMEOUT;
@@ -7089,6 +7087,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 	}
 
 	iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+	iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
 	trace_io_uring_cqring_wait(ctx, min_events);
 	do {
 		/* if we can't even flush overflow, don't wait for more */