Commit c5766668 authored by Jens Axboe's avatar Jens Axboe

io_uring: optimize submit_and_wait API

For some applications that end up using a submit-and-wait type of
approach for certain batches of IO, we can make that a bit more
efficient by allowing the application to block for the last IO
submission. This prevents an async when we don't need it, as the
application will be blocking for the completion event(s) anyway.

Typical use cases are using the liburing
io_uring_submit_and_wait() API, or just using io_uring_enter()
doing both submissions and completions. As a specific example,
RocksDB doing MultiGet() is sped up quite a bit with this
change.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 4fe2c963
...@@ -2052,11 +2052,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s, ...@@ -2052,11 +2052,11 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
} }
static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
struct sqe_submit *s) struct sqe_submit *s, bool force_nonblock)
{ {
int ret; int ret;
ret = __io_submit_sqe(ctx, req, s, true); ret = __io_submit_sqe(ctx, req, s, force_nonblock);
if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) { if (ret == -EAGAIN && !(req->flags & REQ_F_NOWAIT)) {
struct io_uring_sqe *sqe_copy; struct io_uring_sqe *sqe_copy;
...@@ -2099,7 +2099,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, ...@@ -2099,7 +2099,7 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
} }
static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
struct sqe_submit *s) struct sqe_submit *s, bool force_nonblock)
{ {
int ret; int ret;
...@@ -2112,17 +2112,18 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req, ...@@ -2112,17 +2112,18 @@ static int io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
return 0; return 0;
} }
return __io_queue_sqe(ctx, req, s); return __io_queue_sqe(ctx, req, s, force_nonblock);
} }
static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
struct sqe_submit *s, struct io_kiocb *shadow) struct sqe_submit *s, struct io_kiocb *shadow,
bool force_nonblock)
{ {
int ret; int ret;
int need_submit = false; int need_submit = false;
if (!shadow) if (!shadow)
return io_queue_sqe(ctx, req, s); return io_queue_sqe(ctx, req, s, force_nonblock);
/* /*
* Mark the first IO in link list as DRAIN, let all the following * Mark the first IO in link list as DRAIN, let all the following
...@@ -2151,7 +2152,7 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, ...@@ -2151,7 +2152,7 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
spin_unlock_irq(&ctx->completion_lock); spin_unlock_irq(&ctx->completion_lock);
if (need_submit) if (need_submit)
return __io_queue_sqe(ctx, req, s); return __io_queue_sqe(ctx, req, s, force_nonblock);
return 0; return 0;
} }
...@@ -2159,7 +2160,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req, ...@@ -2159,7 +2160,8 @@ static int io_queue_link_head(struct io_ring_ctx *ctx, struct io_kiocb *req,
#define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK) #define SQE_VALID_FLAGS (IOSQE_FIXED_FILE|IOSQE_IO_DRAIN|IOSQE_IO_LINK)
static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
struct io_submit_state *state, struct io_kiocb **link) struct io_submit_state *state, struct io_kiocb **link,
bool force_nonblock)
{ {
struct io_uring_sqe *sqe_copy; struct io_uring_sqe *sqe_copy;
struct io_kiocb *req; struct io_kiocb *req;
...@@ -2212,7 +2214,7 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s, ...@@ -2212,7 +2214,7 @@ static void io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
INIT_LIST_HEAD(&req->link_list); INIT_LIST_HEAD(&req->link_list);
*link = req; *link = req;
} else { } else {
io_queue_sqe(ctx, req, s); io_queue_sqe(ctx, req, s, force_nonblock);
} }
} }
...@@ -2316,7 +2318,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, ...@@ -2316,7 +2318,8 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
* that's the end of the chain. Submit the previous link. * that's the end of the chain. Submit the previous link.
*/ */
if (!prev_was_link && link) { if (!prev_was_link && link) {
io_queue_link_head(ctx, link, &link->submit, shadow_req); io_queue_link_head(ctx, link, &link->submit, shadow_req,
true);
link = NULL; link = NULL;
} }
prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0; prev_was_link = (sqes[i].sqe->flags & IOSQE_IO_LINK) != 0;
...@@ -2337,13 +2340,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes, ...@@ -2337,13 +2340,13 @@ static int io_submit_sqes(struct io_ring_ctx *ctx, struct sqe_submit *sqes,
sqes[i].has_user = has_user; sqes[i].has_user = has_user;
sqes[i].needs_lock = true; sqes[i].needs_lock = true;
sqes[i].needs_fixed_file = true; sqes[i].needs_fixed_file = true;
io_submit_sqe(ctx, &sqes[i], statep, &link); io_submit_sqe(ctx, &sqes[i], statep, &link, true);
submitted++; submitted++;
} }
} }
if (link) if (link)
io_queue_link_head(ctx, link, &link->submit, shadow_req); io_queue_link_head(ctx, link, &link->submit, shadow_req, true);
if (statep) if (statep)
io_submit_state_end(&state); io_submit_state_end(&state);
...@@ -2475,7 +2478,8 @@ static int io_sq_thread(void *data) ...@@ -2475,7 +2478,8 @@ static int io_sq_thread(void *data)
return 0; return 0;
} }
static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
bool block_for_last)
{ {
struct io_submit_state state, *statep = NULL; struct io_submit_state state, *statep = NULL;
struct io_kiocb *link = NULL; struct io_kiocb *link = NULL;
...@@ -2489,6 +2493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) ...@@ -2489,6 +2493,7 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
} }
for (i = 0; i < to_submit; i++) { for (i = 0; i < to_submit; i++) {
bool force_nonblock = true;
struct sqe_submit s; struct sqe_submit s;
if (!io_get_sqring(ctx, &s)) if (!io_get_sqring(ctx, &s))
...@@ -2499,7 +2504,8 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) ...@@ -2499,7 +2504,8 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
* that's the end of the chain. Submit the previous link. * that's the end of the chain. Submit the previous link.
*/ */
if (!prev_was_link && link) { if (!prev_was_link && link) {
io_queue_link_head(ctx, link, &link->submit, shadow_req); io_queue_link_head(ctx, link, &link->submit, shadow_req,
force_nonblock);
link = NULL; link = NULL;
} }
prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0; prev_was_link = (s.sqe->flags & IOSQE_IO_LINK) != 0;
...@@ -2517,12 +2523,24 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit) ...@@ -2517,12 +2523,24 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit)
s.needs_lock = false; s.needs_lock = false;
s.needs_fixed_file = false; s.needs_fixed_file = false;
submit++; submit++;
io_submit_sqe(ctx, &s, statep, &link);
/*
* The caller will block for events after submit, submit the
* last IO non-blocking. This is either the only IO it's
* submitting, or it already submitted the previous ones. This
* improves performance by avoiding an async punt that we don't
* need to do.
*/
if (block_for_last && submit == to_submit)
force_nonblock = false;
io_submit_sqe(ctx, &s, statep, &link, force_nonblock);
} }
io_commit_sqring(ctx); io_commit_sqring(ctx);
if (link) if (link)
io_queue_link_head(ctx, link, &link->submit, shadow_req); io_queue_link_head(ctx, link, &link->submit, shadow_req,
block_for_last);
if (statep) if (statep)
io_submit_state_end(statep); io_submit_state_end(statep);
...@@ -3290,10 +3308,21 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit, ...@@ -3290,10 +3308,21 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
ret = 0; ret = 0;
if (to_submit) { if (to_submit) {
bool block_for_last = false;
to_submit = min(to_submit, ctx->sq_entries); to_submit = min(to_submit, ctx->sq_entries);
/*
* Allow last submission to block in a series, IFF the caller
* asked to wait for events and we don't currently have
* enough. This potentially avoids an async punt.
*/
if (to_submit == min_complete &&
io_cqring_events(ctx->rings) < min_complete)
block_for_last = true;
mutex_lock(&ctx->uring_lock); mutex_lock(&ctx->uring_lock);
submitted = io_ring_submit(ctx, to_submit); submitted = io_ring_submit(ctx, to_submit, block_for_last);
mutex_unlock(&ctx->uring_lock); mutex_unlock(&ctx->uring_lock);
} }
if (flags & IORING_ENTER_GETEVENTS) { if (flags & IORING_ENTER_GETEVENTS) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment