Commit 49f1c68e authored by Pavel Begunkov, committed by Jens Axboe

io_uring: optimise submission side poll_refs

The final poll_refs put in __io_arm_poll_handler() takes quite some
cycles. When we're arming poll from the original task context, task_work
cannot run concurrently, so we won't race with task_work and can skip
taking the initial ownership ref.

One caveat is that once a poll is armed we may race with it, so we have
to take ownership via io_poll_get_ownership(), hidden inside
io_poll_can_finish_inline(), whenever we want to complete arming inline.
For the same reason we can't simply set REQ_F_DOUBLE_POLL in
__io_queue_proc() and instead need to synchronise with the first poll
entry by taking its wq head lock.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Link: https://lore.kernel.org/r/8825315d7f5e182ac1578a031e546f79b1c97d01.1655990418.git.asml.silence@gmail.com
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent de08356f
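
Not part of the commit, for context only: a standalone user-space sketch of the poll_refs ownership scheme the message above relies on. The names poll_get_ownership(), POLL_REF_MASK and struct req are hypothetical stand-ins for the kernel's io_poll_get_ownership(), IO_POLL_REF_MASK and struct io_kiocb; the real definitions live in io_uring/poll.c and differ in detail.

/*
 * Model of the ownership rule: whoever bumps the counted part of poll_refs
 * from 0 owns the request and may complete it inline; anyone bumping it
 * while it is already non-zero must defer to the current owner.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define POLL_REF_MASK 0x7fffffff        /* low bits hold the ref count */

struct req {
        atomic_int poll_refs;
};

/* Returns true if the caller just took ownership (count was 0 before). */
static bool poll_get_ownership(struct req *req)
{
        return !(atomic_fetch_add(&req->poll_refs, 1) & POLL_REF_MASK);
}

int main(void)
{
        struct req r;

        /*
         * io-wq style arming: start with one ref held so a concurrent wakeup
         * cannot take ownership while we arm; the armer must drop that ref
         * at the end (the "final poll_refs put" the message talks about).
         */
        atomic_init(&r.poll_refs, 1);
        printf("io-wq arming, wakeup gets ownership? %d\n", poll_get_ownership(&r));

        /*
         * Task-context arming (this commit): start at 0, so no final put is
         * needed; a wakeup firing during arming takes ownership itself, and
         * the armer may only complete inline if it re-acquires ownership.
         */
        atomic_store(&r.poll_refs, 0);
        printf("task-ctx arming, wakeup gets ownership? %d\n", poll_get_ownership(&r));
        return 0;
}

Whether arming may still finish inline in the second case is exactly what io_poll_can_finish_inline() checks in the diff below: either we already own the request (ipt->owning) or we can grab ownership now.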
@@ -34,6 +34,7 @@ struct io_poll_table {
         struct io_kiocb *req;
         int nr_entries;
         int error;
+        bool owning;
         /* output value, set only if arm poll returns >0 */
         __poll_t result_mask;
 };
@@ -374,6 +375,27 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
         return 1;
 }
 
+static void io_poll_double_prepare(struct io_kiocb *req)
+{
+        struct wait_queue_head *head;
+        struct io_poll *poll = io_poll_get_single(req);
+
+        /* head is RCU protected, see io_poll_remove_entries() comments */
+        rcu_read_lock();
+        head = smp_load_acquire(&poll->head);
+        if (head) {
+                /*
+                 * poll arm may not hold ownership and so race with
+                 * io_poll_wake() by modifying req->flags. There is only one
+                 * poll entry queued, serialise with it by taking its head lock.
+                 */
+                spin_lock_irq(&head->lock);
+                req->flags |= REQ_F_DOUBLE_POLL;
+                spin_unlock_irq(&head->lock);
+        }
+        rcu_read_unlock();
+}
+
 static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
                             struct wait_queue_head *head,
                             struct io_poll **poll_ptr)
@@ -405,16 +427,19 @@ static void __io_queue_proc(struct io_poll *poll, struct io_poll_table *pt,
                         pt->error = -ENOMEM;
                         return;
                 }
+
+                io_poll_double_prepare(req);
                 /* mark as double wq entry */
                 wqe_private |= IO_WQE_F_DOUBLE;
-                req->flags |= REQ_F_DOUBLE_POLL;
                 io_init_poll_iocb(poll, first->events, first->wait.func);
                 *poll_ptr = poll;
                 if (req->opcode == IORING_OP_POLL_ADD)
                         req->flags |= REQ_F_ASYNC_DATA;
+        } else {
+                /* fine to modify, there is no poll queued to race with us */
+                req->flags |= REQ_F_SINGLE_POLL;
         }
 
-        req->flags |= REQ_F_SINGLE_POLL;
         pt->nr_entries++;
         poll->head = head;
         poll->wait.private = (void *) wqe_private;
@@ -435,6 +460,12 @@ static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
                         (struct io_poll **) &pt->req->async_data);
 }
 
+static bool io_poll_can_finish_inline(struct io_kiocb *req,
+                                      struct io_poll_table *pt)
+{
+        return pt->owning || io_poll_get_ownership(req);
+}
+
 /*
  * Returns 0 when it's handed over for polling. The caller owns the requests if
  * it returns non-zero, but otherwise should not touch it. Negative values
@@ -443,7 +474,8 @@ static void io_poll_queue_proc(struct file *file, struct wait_queue_head *head,
  */
 static int __io_arm_poll_handler(struct io_kiocb *req,
                                  struct io_poll *poll,
-                                 struct io_poll_table *ipt, __poll_t mask)
+                                 struct io_poll_table *ipt, __poll_t mask,
+                                 unsigned issue_flags)
 {
         struct io_ring_ctx *ctx = req->ctx;
         int v;
@@ -452,34 +484,45 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
         req->work.cancel_seq = atomic_read(&ctx->cancel_seq);
         io_init_poll_iocb(poll, mask, io_poll_wake);
         poll->file = req->file;
-
         req->apoll_events = poll->events;
 
         ipt->pt._key = mask;
         ipt->req = req;
         ipt->error = 0;
         ipt->nr_entries = 0;
 
         /*
-         * Take the ownership to delay any tw execution up until we're done
-         * with poll arming. see io_poll_get_ownership().
+         * Polling is either completed here or via task_work, so if we're in the
+         * task context we're naturally serialised with tw by merit of running
+         * the same task. When it's io-wq, take the ownership to prevent tw
+         * from running. However, when we're in the task context, skip taking
+         * it as an optimisation.
+         *
+         * Note: even though the request won't be completed/freed, without
+         * ownership we still can race with io_poll_wake().
+         * io_poll_can_finish_inline() tries to deal with that.
          */
-        atomic_set(&req->poll_refs, 1);
+        ipt->owning = issue_flags & IO_URING_F_UNLOCKED;
+        atomic_set(&req->poll_refs, (int)ipt->owning);
         mask = vfs_poll(req->file, &ipt->pt) & poll->events;
 
         if (unlikely(ipt->error || !ipt->nr_entries)) {
                 io_poll_remove_entries(req);
 
-                if (mask && (poll->events & EPOLLET)) {
+                if (!io_poll_can_finish_inline(req, ipt)) {
+                        io_poll_mark_cancelled(req);
+                        return 0;
+                } else if (mask && (poll->events & EPOLLET)) {
                         ipt->result_mask = mask;
                         return 1;
-                } else {
-                        return ipt->error ?: -EINVAL;
                 }
+                return ipt->error ?: -EINVAL;
         }
 
         if (mask &&
            ((poll->events & (EPOLLET|EPOLLONESHOT)) == (EPOLLET|EPOLLONESHOT))) {
+                if (!io_poll_can_finish_inline(req, ipt))
+                        return 0;
                 io_poll_remove_entries(req);
                 ipt->result_mask = mask;
                 /* no one else has access to the req, forget about the ref */
@@ -491,11 +534,13 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
         else
                 io_poll_req_insert(req);
 
-        if (mask && (poll->events & EPOLLET)) {
+        if (mask && (poll->events & EPOLLET) &&
+            io_poll_can_finish_inline(req, ipt)) {
                 __io_poll_execute(req, mask);
                 return 0;
         }
 
+        if (ipt->owning) {
         /*
          * Release ownership. If someone tried to queue a tw while it was
          * locked, kick it off for them.
@@ -503,6 +548,7 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
         v = atomic_dec_return(&req->poll_refs);
         if (unlikely(v & IO_POLL_REF_MASK))
                 __io_poll_execute(req, 0);
+        }
 
         return 0;
 }
@@ -585,7 +631,7 @@ int io_arm_poll_handler(struct io_kiocb *req, unsigned issue_flags)
 
         io_kbuf_recycle(req, issue_flags);
 
-        ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask);
+        ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask, issue_flags);
         if (ret)
                 return ret > 0 ? IO_APOLL_READY : IO_APOLL_ABORTED;
         trace_io_uring_poll_arm(req, mask, apoll->poll.events);
@@ -817,7 +863,7 @@ int io_poll_add(struct io_kiocb *req, unsigned int issue_flags)
         else
                 req->flags &= ~REQ_F_HASH_LOCKED;
 
-        ret = __io_arm_poll_handler(req, poll, &ipt, poll->events);
+        ret = __io_arm_poll_handler(req, poll, &ipt, poll->events, issue_flags);
         if (ret > 0) {
                 io_req_set_res(req, ipt.result_mask, 0);
                 return IOU_OK;
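
Also not part of the commit: a small user-space model of the req->flags update race that io_poll_double_prepare() closes. The flag names and wqhead_lock below are made-up stand-ins; the point is that the arming side and io_poll_wake() both do plain read-modify-writes on the same flags word, and io_poll_wake() already runs with the first entry's waitqueue head lock held, so taking that same lock before setting REQ_F_DOUBLE_POLL serialises the two updates.

/*
 * Two contexts touching one flags word: without a common lock, the
 * non-atomic "|=" and "&= ~" read-modify-writes can lose each other's updates.
 */
#include <pthread.h>
#include <stdio.h>

#define F_SINGLE_POLL   (1u << 0)       /* stand-in for REQ_F_SINGLE_POLL */
#define F_DOUBLE_POLL   (1u << 1)       /* stand-in for REQ_F_DOUBLE_POLL */

static unsigned int flags = F_SINGLE_POLL;
static pthread_mutex_t wqhead_lock = PTHREAD_MUTEX_INITIALIZER;

/* Arming side: mark the request double-poll, like io_poll_double_prepare(). */
static void *arm_side(void *arg)
{
        (void)arg;
        pthread_mutex_lock(&wqhead_lock);   /* analogue of spin_lock_irq(&head->lock) */
        flags |= F_DOUBLE_POLL;
        pthread_mutex_unlock(&wqhead_lock);
        return NULL;
}

/* Wakeup side: io_poll_wake() clears poll flags under the same head lock. */
static void *wake_side(void *arg)
{
        (void)arg;
        pthread_mutex_lock(&wqhead_lock);
        flags &= ~F_SINGLE_POLL;
        pthread_mutex_unlock(&wqhead_lock);
        return NULL;
}

int main(void)
{
        pthread_t a, b;

        pthread_create(&a, NULL, arm_side, NULL);
        pthread_create(&b, NULL, wake_side, NULL);
        pthread_join(a, NULL);
        pthread_join(b, NULL);
        /* With the shared lock both updates survive: expect 0x2. */
        printf("flags = 0x%x\n", flags);
        return 0;
}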