Commit 0617bb50 authored by Jens Axboe

io_uring/msg_ring: improve handling of target CQE posting

Use the exported helper for queueing task_work for message passing,
rather than rolling our own.

Note that this is only done for strict data messages for now; file
descriptor passing messages still rely on the kernel task_work. It could
get converted at some point if it's performance critical.

This improves peak performance of message passing by about 5x in some
basic testing, with 2 threads just sending messages to each other.
Before this change, throughput was capped at around 700K messages/sec;
with the change, it's at over 4M messages/sec.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent f33096a3
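
For context, the 2-thread message-passing scenario described in the commit
message can be driven from userspace with liburing's io_uring_prep_msg_ring()
helper. Below is a minimal sketch of such a benchmark, not the actual test
harness behind the numbers above: the receiver thread creates the target ring
with IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN, which should
make the target's task_complete true, so io_msg_need_remote() takes the
remote CQE posting path this commit rewrites. Ring sizes, the message count,
and the user_data value are arbitrary; timing and error handling are omitted.
Build with -luring -lpthread.

/*
 * Minimal 2-thread MSG_RING ping sketch (illustrative, not the commit's
 * actual benchmark). The sender posts CQEs into the receiver's ring via
 * IORING_OP_MSG_RING; the receiver just reaps them.
 */
#include <liburing.h>
#include <pthread.h>

#define NR_MSGS		1000000
#define MSG_DATA	0x1234	/* arbitrary user_data for posted CQEs */

static struct io_uring src_ring, dst_ring;
static pthread_barrier_t barrier;

static void *receiver_fn(void *arg)
{
	struct io_uring_cqe *cqe;
	int i;

	/*
	 * DEFER_TASKRUN requires that the creating task is also the one
	 * waiting on the ring, so the receiver sets up its own ring.
	 */
	io_uring_queue_init(8, &dst_ring, IORING_SETUP_SINGLE_ISSUER |
					  IORING_SETUP_DEFER_TASKRUN);
	pthread_barrier_wait(&barrier);

	for (i = 0; i < NR_MSGS; i++) {
		/* each message arrives as a CQE with user_data MSG_DATA */
		io_uring_wait_cqe(&dst_ring, &cqe);
		io_uring_cqe_seen(&dst_ring, cqe);
	}
	return NULL;
}

static void *sender_fn(void *arg)
{
	struct io_uring_cqe *cqe;
	struct io_uring_sqe *sqe;
	int i;

	pthread_barrier_wait(&barrier);

	for (i = 0; i < NR_MSGS; i++) {
		sqe = io_uring_get_sqe(&src_ring);
		/* post a CQE with res 0 and user_data MSG_DATA to dst */
		io_uring_prep_msg_ring(sqe, dst_ring.ring_fd, 0, MSG_DATA, 0);
		io_uring_submit(&src_ring);
		/* reap the local completion for the MSG_RING request */
		io_uring_wait_cqe(&src_ring, &cqe);
		io_uring_cqe_seen(&src_ring, cqe);
	}
	return NULL;
}

int main(void)
{
	pthread_t snd, rcv;

	io_uring_queue_init(8, &src_ring, 0);
	pthread_barrier_init(&barrier, NULL, 2);

	pthread_create(&rcv, NULL, receiver_fn, NULL);
	pthread_create(&snd, NULL, sender_fn, NULL);
	pthread_join(snd, NULL);
	pthread_join(rcv, NULL);

	pthread_barrier_destroy(&barrier);
	io_uring_queue_exit(&src_ring);
	io_uring_queue_exit(&dst_ring);
	return 0;
}

Each io_uring_prep_msg_ring() produces two completions: one on the source
ring for the MSG_RING request itself, and one posted to the target ring
carrying MSG_DATA. With this commit, the latter is queued to the target via
io_req_task_work_add_remote() with IOU_F_TWQ_LAZY_WAKE rather than a
per-message TWA_SIGNAL task_work, which is where the roughly 5x throughput
gain comes from.
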
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
#include "filetable.h" #include "filetable.h"
#include "msg_ring.h" #include "msg_ring.h"
/* All valid masks for MSG_RING */ /* All valid masks for MSG_RING */
#define IORING_MSG_RING_MASK (IORING_MSG_RING_CQE_SKIP | \ #define IORING_MSG_RING_MASK (IORING_MSG_RING_CQE_SKIP | \
IORING_MSG_RING_FLAGS_PASS) IORING_MSG_RING_FLAGS_PASS)
...@@ -71,54 +70,43 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx) ...@@ -71,54 +70,43 @@ static inline bool io_msg_need_remote(struct io_ring_ctx *target_ctx)
return target_ctx->task_complete; return target_ctx->task_complete;
} }
static int io_msg_exec_remote(struct io_kiocb *req, task_work_func_t func) static void io_msg_tw_complete(struct io_kiocb *req, struct io_tw_state *ts)
{ {
struct io_ring_ctx *ctx = req->file->private_data; struct io_ring_ctx *ctx = req->ctx;
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
struct task_struct *task = READ_ONCE(ctx->submitter_task);
if (unlikely(!task))
return -EOWNERDEAD;
init_task_work(&msg->tw, func); io_add_aux_cqe(ctx, req->cqe.user_data, req->cqe.res, req->cqe.flags);
if (task_work_add(task, &msg->tw, TWA_SIGNAL)) kmem_cache_free(req_cachep, req);
return -EOWNERDEAD; percpu_ref_put(&ctx->refs);
}
return IOU_ISSUE_SKIP_COMPLETE; static void io_msg_remote_post(struct io_ring_ctx *ctx, struct io_kiocb *req,
int res, u32 cflags, u64 user_data)
{
req->cqe.user_data = user_data;
io_req_set_res(req, res, cflags);
percpu_ref_get(&ctx->refs);
req->ctx = ctx;
req->task = READ_ONCE(ctx->submitter_task);
req->io_task_work.func = io_msg_tw_complete;
io_req_task_work_add_remote(req, ctx, IOU_F_TWQ_LAZY_WAKE);
} }
static void io_msg_tw_complete(struct callback_head *head) static int io_msg_data_remote(struct io_kiocb *req)
{ {
struct io_msg *msg = container_of(head, struct io_msg, tw);
struct io_kiocb *req = cmd_to_io_kiocb(msg);
struct io_ring_ctx *target_ctx = req->file->private_data; struct io_ring_ctx *target_ctx = req->file->private_data;
int ret = 0; struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
struct io_kiocb *target;
if (current->flags & PF_EXITING) { u32 flags = 0;
ret = -EOWNERDEAD;
} else {
u32 flags = 0;
if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
flags = msg->cqe_flags;
/*
* If the target ring is using IOPOLL mode, then we need to be
* holding the uring_lock for posting completions. Other ring
* types rely on the regular completion locking, which is
* handled while posting.
*/
if (target_ctx->flags & IORING_SETUP_IOPOLL)
mutex_lock(&target_ctx->uring_lock);
if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, flags))
ret = -EOVERFLOW;
if (target_ctx->flags & IORING_SETUP_IOPOLL)
mutex_unlock(&target_ctx->uring_lock);
}
if (ret < 0) target = kmem_cache_alloc(req_cachep, GFP_KERNEL);
req_set_fail(req); if (unlikely(!target))
io_req_queue_tw_complete(req, ret); return -ENOMEM;
if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
flags = msg->cqe_flags;
io_msg_remote_post(target_ctx, target, msg->len, flags, msg->user_data);
return 0;
} }
static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags) static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
...@@ -136,7 +124,7 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags) ...@@ -136,7 +124,7 @@ static int io_msg_ring_data(struct io_kiocb *req, unsigned int issue_flags)
return -EBADFD; return -EBADFD;
if (io_msg_need_remote(target_ctx)) if (io_msg_need_remote(target_ctx))
return io_msg_exec_remote(req, io_msg_tw_complete); return io_msg_data_remote(req);
if (msg->flags & IORING_MSG_RING_FLAGS_PASS) if (msg->flags & IORING_MSG_RING_FLAGS_PASS)
flags = msg->cqe_flags; flags = msg->cqe_flags;
...@@ -216,6 +204,22 @@ static void io_msg_tw_fd_complete(struct callback_head *head) ...@@ -216,6 +204,22 @@ static void io_msg_tw_fd_complete(struct callback_head *head)
io_req_queue_tw_complete(req, ret); io_req_queue_tw_complete(req, ret);
} }
static int io_msg_fd_remote(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->file->private_data;
struct io_msg *msg = io_kiocb_to_cmd(req, struct io_msg);
struct task_struct *task = READ_ONCE(ctx->submitter_task);
if (unlikely(!task))
return -EOWNERDEAD;
init_task_work(&msg->tw, io_msg_tw_fd_complete);
if (task_work_add(task, &msg->tw, TWA_SIGNAL))
return -EOWNERDEAD;
return IOU_ISSUE_SKIP_COMPLETE;
}
static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
{ {
struct io_ring_ctx *target_ctx = req->file->private_data; struct io_ring_ctx *target_ctx = req->file->private_data;
...@@ -238,7 +242,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags) ...@@ -238,7 +242,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
} }
if (io_msg_need_remote(target_ctx)) if (io_msg_need_remote(target_ctx))
return io_msg_exec_remote(req, io_msg_tw_fd_complete); return io_msg_fd_remote(req);
return io_msg_install_complete(req, issue_flags); return io_msg_install_complete(req, issue_flags);
} }
......