Commit 54e60e50 authored by Linus Torvalds

Merge tag 'for-6.2/io_uring-2022-12-08' of git://git.kernel.dk/linux

Pull io_uring updates from Jens Axboe:

 - Always ensure proper ordering in case of CQ ring overflow, which then
   means we can remove some work-arounds for that (Dylan)

 - Support completion batching for multishot, greatly increasing the
   efficiency for those (Dylan)

 - Flag epoll/eventfd wakeups done from io_uring, so that we can easily
   tell if we're recursing into io_uring again.

   Previously, this would have resulted in repeated multishot
   notifications if we had a dependency there. That could happen if an
   eventfd was registered as the ring eventfd, and we multishot polled
   for events on it. Or if an io_uring fd was added to epoll, and
   io_uring had a multishot request for the epoll fd.

   Test cases here:
	https://git.kernel.dk/cgit/liburing/commit/?id=919755a7d0096fda08fb6d65ac54ad8d0fe027cd

   Previously these got terminated when the CQ ring eventually
   overflowed; now it's handled gracefully (me). (An illustrative
   userspace sketch of this dependency follows at the end of this list.)

 - Tightening of the IOPOLL based completions (Pavel)

 - Optimizations of the networking zero-copy paths (Pavel)

 - Various tweaks and fixes (Dylan, Pavel)
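
A minimal userspace sketch of the eventfd dependency described in the third
item above. This is an illustration only, not the liburing test case linked
there; it assumes liburing 2.2+ for io_uring_register_eventfd() and
io_uring_prep_poll_multishot(). The ring's own eventfd is registered for CQE
notifications and then multishot-polled from the same ring, which is exactly
the feedback loop the new EPOLL_URING_WAKE flag breaks.

#include <liburing.h>
#include <poll.h>
#include <sys/eventfd.h>

int main(void)
{
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    int efd;

    if (io_uring_queue_init(8, &ring, 0))
        return 1;

    efd = eventfd(0, EFD_CLOEXEC);
    io_uring_register_eventfd(&ring, efd);          /* every CQE signals efd */

    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_poll_multishot(sqe, efd, POLLIN); /* ...and we poll that efd */

    sqe = io_uring_get_sqe(&ring);
    io_uring_prep_nop(sqe);                         /* first CQE seeds the cycle */
    io_uring_submit(&ring);

    /*
     * Before 6.2: each poll CQE signals efd, which re-triggers the multishot
     * poll, which posts another CQE, until the CQ ring overflows. With
     * EPOLL_URING_WAKE carried in the wakeup key, a wakeup that comes from
     * io_uring itself downgrades the poll to oneshot, so this terminates.
     */
    if (!io_uring_wait_cqe(&ring, &cqe))
        io_uring_cqe_seen(&ring, cqe);

    io_uring_queue_exit(&ring);
    return 0;
}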

* tag 'for-6.2/io_uring-2022-12-08' of git://git.kernel.dk/linux: (41 commits)
  io_uring: keep unlock_post inlined in hot path
  io_uring: don't use complete_post in kbuf
  io_uring: spelling fix
  io_uring: remove io_req_complete_post_tw
  io_uring: allow multishot polled reqs to defer completion
  io_uring: remove overflow param from io_post_aux_cqe
  io_uring: add lockdep assertion in io_fill_cqe_aux
  io_uring: make io_fill_cqe_aux static
  io_uring: add io_aux_cqe which allows deferred completion
  io_uring: allow defer completion for aux posted cqes
  io_uring: defer all io_req_complete_failed
  io_uring: always lock in io_apoll_task_func
  io_uring: remove iopoll spinlock
  io_uring: iopoll protect complete_post
  io_uring: inline __io_req_complete_put()
  io_uring: remove io_req_tw_post_queue
  io_uring: use io_req_task_complete() in timeout
  io_uring: hold locks for io_req_complete_failed
  io_uring: add completion locking for iopoll
  io_uring: kill io_cqring_ev_posted() and __io_cq_unlock_post()
  ...
parents d523ec4c 5d772916
@@ -43,21 +43,7 @@ struct eventfd_ctx {
 	int id;
 };
 
-/**
- * eventfd_signal - Adds @n to the eventfd counter.
- * @ctx: [in] Pointer to the eventfd context.
- * @n: [in] Value of the counter to be added to the eventfd internal counter.
- *          The value cannot be negative.
- *
- * This function is supposed to be called by the kernel in paths that do not
- * allow sleeping. In this function we allow the counter to reach the ULLONG_MAX
- * value, and we signal this as overflow condition by returning a EPOLLERR
- * to poll(2).
- *
- * Returns the amount by which the counter was incremented. This will be less
- * than @n if the counter has overflowed.
- */
-__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
+__u64 eventfd_signal_mask(struct eventfd_ctx *ctx, __u64 n, unsigned mask)
 {
 	unsigned long flags;
@@ -78,12 +64,31 @@ __u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
 		n = ULLONG_MAX - ctx->count;
 	ctx->count += n;
 	if (waitqueue_active(&ctx->wqh))
-		wake_up_locked_poll(&ctx->wqh, EPOLLIN);
+		wake_up_locked_poll(&ctx->wqh, EPOLLIN | mask);
 	current->in_eventfd = 0;
 	spin_unlock_irqrestore(&ctx->wqh.lock, flags);
 	return n;
 }
 
+/**
+ * eventfd_signal - Adds @n to the eventfd counter.
+ * @ctx: [in] Pointer to the eventfd context.
+ * @n: [in] Value of the counter to be added to the eventfd internal counter.
+ *          The value cannot be negative.
+ *
+ * This function is supposed to be called by the kernel in paths that do not
+ * allow sleeping. In this function we allow the counter to reach the ULLONG_MAX
+ * value, and we signal this as overflow condition by returning a EPOLLERR
+ * to poll(2).
+ *
+ * Returns the amount by which the counter was incremented. This will be less
+ * than @n if the counter has overflowed.
+ */
+__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
+{
+	return eventfd_signal_mask(ctx, n, 0);
+}
 EXPORT_SYMBOL_GPL(eventfd_signal);
 
 static void eventfd_free_ctx(struct eventfd_ctx *ctx)
...
@@ -491,7 +491,8 @@ static inline void ep_set_busy_poll_napi_id(struct epitem *epi)
  */
 #ifdef CONFIG_DEBUG_LOCK_ALLOC
 
-static void ep_poll_safewake(struct eventpoll *ep, struct epitem *epi)
+static void ep_poll_safewake(struct eventpoll *ep, struct epitem *epi,
+			     unsigned pollflags)
 {
 	struct eventpoll *ep_src;
 	unsigned long flags;
@@ -522,16 +523,17 @@ static void ep_poll_safewake(struct eventpoll *ep, struct epitem *epi)
 	}
 	spin_lock_irqsave_nested(&ep->poll_wait.lock, flags, nests);
 	ep->nests = nests + 1;
-	wake_up_locked_poll(&ep->poll_wait, EPOLLIN);
+	wake_up_locked_poll(&ep->poll_wait, EPOLLIN | pollflags);
 	ep->nests = 0;
 	spin_unlock_irqrestore(&ep->poll_wait.lock, flags);
 }
 
 #else
 
-static void ep_poll_safewake(struct eventpoll *ep, struct epitem *epi)
+static void ep_poll_safewake(struct eventpoll *ep, struct epitem *epi,
+			     unsigned pollflags)
 {
-	wake_up_poll(&ep->poll_wait, EPOLLIN);
+	wake_up_poll(&ep->poll_wait, EPOLLIN | pollflags);
 }
 
 #endif
@@ -742,7 +744,7 @@ static void ep_free(struct eventpoll *ep)
 	/* We need to release all tasks waiting for these file */
 	if (waitqueue_active(&ep->poll_wait))
-		ep_poll_safewake(ep, NULL);
+		ep_poll_safewake(ep, NULL, 0);
 
 	/*
 	 * We need to lock this because we could be hit by
@@ -1208,7 +1210,7 @@ static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, v
 	/* We have to call this outside the lock */
 	if (pwake)
-		ep_poll_safewake(ep, epi);
+		ep_poll_safewake(ep, epi, pollflags & EPOLL_URING_WAKE);
 
 	if (!(epi->event.events & EPOLLEXCLUSIVE))
 		ewake = 1;
@@ -1553,7 +1555,7 @@ static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
 	/* We have to call this outside the lock */
 	if (pwake)
-		ep_poll_safewake(ep, NULL);
+		ep_poll_safewake(ep, NULL, 0);
 
 	return 0;
 }
@@ -1629,7 +1631,7 @@ static int ep_modify(struct eventpoll *ep, struct epitem *epi,
 	/* We have to call this outside the lock */
 	if (pwake)
-		ep_poll_safewake(ep, NULL);
+		ep_poll_safewake(ep, NULL, 0);
 
 	return 0;
 }
...
@@ -40,6 +40,7 @@ struct file *eventfd_fget(int fd);
 struct eventfd_ctx *eventfd_ctx_fdget(int fd);
 struct eventfd_ctx *eventfd_ctx_fileget(struct file *file);
 __u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n);
+__u64 eventfd_signal_mask(struct eventfd_ctx *ctx, __u64 n, unsigned mask);
 int eventfd_ctx_remove_wait_queue(struct eventfd_ctx *ctx, wait_queue_entry_t *wait,
 				  __u64 *cnt);
 void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt);
@@ -66,6 +67,12 @@ static inline int eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
 	return -ENOSYS;
 }
 
+static inline int eventfd_signal_mask(struct eventfd_ctx *ctx, __u64 n,
+				      unsigned mask)
+{
+	return -ENOSYS;
+}
+
 static inline void eventfd_ctx_put(struct eventfd_ctx *ctx)
 {
...
@@ -174,7 +174,9 @@ struct io_submit_state {
 	bool			plug_started;
 	bool			need_plug;
 	unsigned short		submit_nr;
+	unsigned int		cqes_count;
 	struct blk_plug		plug;
+	struct io_uring_cqe	cqes[16];
 };
 
 struct io_ev_fd {
...
@@ -41,6 +41,12 @@
 #define EPOLLMSG	(__force __poll_t)0x00000400
 #define EPOLLRDHUP	(__force __poll_t)0x00002000
 
+/*
+ * Internal flag - wakeup generated by io_uring, used to detect recursion back
+ * into the io_uring poll handler.
+ */
+#define EPOLL_URING_WAKE	((__force __poll_t)(1U << 27))
+
 /* Set exclusive wakeup mode for the target file descriptor */
 #define EPOLLEXCLUSIVE	((__force __poll_t)(1U << 28))
...
@@ -296,10 +296,28 @@ enum io_uring_op {
  *
  * IORING_RECVSEND_FIXED_BUF	Use registered buffers, the index is stored in
  *				the buf_index field.
+ *
+ * IORING_SEND_ZC_REPORT_USAGE
+ *				If set, SEND[MSG]_ZC should report
+ *				the zerocopy usage in cqe.res
+ *				for the IORING_CQE_F_NOTIF cqe.
+ *				0 is reported if zerocopy was actually possible.
+ *				IORING_NOTIF_USAGE_ZC_COPIED if data was copied
+ *				(at least partially).
  */
 #define IORING_RECVSEND_POLL_FIRST	(1U << 0)
 #define IORING_RECV_MULTISHOT		(1U << 1)
 #define IORING_RECVSEND_FIXED_BUF	(1U << 2)
+#define IORING_SEND_ZC_REPORT_USAGE	(1U << 3)
+
+/*
+ * cqe.res for IORING_CQE_F_NOTIF if
+ * IORING_SEND_ZC_REPORT_USAGE was requested
+ *
+ * It should be treated as a flag, all other
+ * bits of cqe.res should be treated as reserved!
+ */
+#define IORING_NOTIF_USAGE_ZC_COPIED    (1U << 31)
 
 /*
  * accept flags stored in sqe->ioprio
...
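
A hedged userspace sketch of how the IORING_SEND_ZC_REPORT_USAGE flag
documented in the hunk above would be consumed. This is not code from this
merge; it assumes liburing 2.3+ for io_uring_prep_send_zc(), a 6.2 uapi header
that defines the two new constants, and an already-connected socket sockfd.

#include <liburing.h>
#include <stdio.h>

static void send_zc_with_report(struct io_uring *ring, int sockfd,
                                const void *buf, size_t len)
{
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    struct io_uring_cqe *cqe;
    int i;

    /* zc_flags end up in sqe->ioprio, which io_send_zc_prep() validates */
    io_uring_prep_send_zc(sqe, sockfd, buf, len, 0,
                          IORING_SEND_ZC_REPORT_USAGE);
    io_uring_submit(ring);

    /*
     * SEND_ZC posts two CQEs: the send result (flagged IORING_CQE_F_MORE),
     * then the notification carrying IORING_CQE_F_NOTIF. With REPORT_USAGE
     * set, the notification's res tells us whether zerocopy actually worked.
     */
    for (i = 0; i < 2; i++) {
        if (io_uring_wait_cqe(ring, &cqe))
            break;
        if (cqe->flags & IORING_CQE_F_NOTIF) {
            if (cqe->res & IORING_NOTIF_USAGE_ZC_COPIED)
                fprintf(stderr, "data was copied, not zero-copied\n");
        }
        io_uring_cqe_seen(ring, cqe);
    }
}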
[One file's diff is collapsed in this view and not shown.]
@@ -4,6 +4,7 @@
 #include <linux/errno.h>
 #include <linux/lockdep.h>
 #include <linux/io_uring_types.h>
+#include <uapi/linux/eventpoll.h>
 #include "io-wq.h"
 #include "slist.h"
 #include "filetable.h"
@@ -29,14 +30,11 @@ bool io_req_cqe_overflow(struct io_kiocb *req);
 int io_run_task_work_sig(struct io_ring_ctx *ctx);
 int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked);
 int io_run_local_work(struct io_ring_ctx *ctx);
-void io_req_complete_failed(struct io_kiocb *req, s32 res);
-void __io_req_complete(struct io_kiocb *req, unsigned issue_flags);
-void io_req_complete_post(struct io_kiocb *req);
-void __io_req_complete_post(struct io_kiocb *req);
-bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
-		     bool allow_overflow);
-bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags,
-		     bool allow_overflow);
+void io_req_defer_failed(struct io_kiocb *req, s32 res);
+void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags);
+bool io_post_aux_cqe(struct io_ring_ctx *ctx, u64 user_data, s32 res, u32 cflags);
+bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32 cflags,
+		bool allow_overflow);
 void __io_commit_cqring_flush(struct io_ring_ctx *ctx);
 
 struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages);
@@ -50,10 +48,9 @@ static inline bool io_req_ffs_set(struct io_kiocb *req)
 	return req->flags & REQ_F_FIXED_FILE;
 }
 
+void __io_req_task_work_add(struct io_kiocb *req, bool allow_local);
 bool io_is_uring_fops(struct file *file);
 bool io_alloc_async_data(struct io_kiocb *req);
-void io_req_task_work_add(struct io_kiocb *req);
-void io_req_tw_post_queue(struct io_kiocb *req, s32 res, u32 cflags);
 void io_req_task_queue(struct io_kiocb *req);
 void io_queue_iowq(struct io_kiocb *req, bool *dont_use);
 void io_req_task_complete(struct io_kiocb *req, bool *locked);
@@ -82,6 +79,11 @@ bool __io_alloc_req_refill(struct io_ring_ctx *ctx);
 bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
 			bool cancel_all);
 
+static inline void io_req_task_work_add(struct io_kiocb *req)
+{
+	__io_req_task_work_add(req, true);
+}
+
 #define io_for_each_link(pos, head) \
 	for (pos = (head); pos; pos = pos->link)
@@ -207,12 +209,18 @@ static inline void io_commit_cqring(struct io_ring_ctx *ctx)
 static inline void __io_cqring_wake(struct io_ring_ctx *ctx)
 {
 	/*
-	 * wake_up_all() may seem excessive, but io_wake_function() and
-	 * io_should_wake() handle the termination of the loop and only
-	 * wake as many waiters as we need to.
+	 * Trigger waitqueue handler on all waiters on our waitqueue. This
+	 * won't necessarily wake up all the tasks, io_should_wake() will make
+	 * that decision.
+	 *
+	 * Pass in EPOLLIN|EPOLL_URING_WAKE as the poll wakeup key. The latter
+	 * set in the mask so that if we recurse back into our own poll
+	 * waitqueue handlers, we know we have a dependency between eventfd or
+	 * epoll and should terminate multishot poll at that point.
 	 */
 	if (waitqueue_active(&ctx->cq_wait))
-		wake_up_all(&ctx->cq_wait);
+		__wake_up(&ctx->cq_wait, TASK_NORMAL, 0,
+			  poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
 }
 
 static inline void io_cqring_wake(struct io_ring_ctx *ctx)
@@ -369,4 +377,11 @@ static inline bool io_allowed_run_tw(struct io_ring_ctx *ctx)
 			      ctx->submitter_task == current);
 }
 
+static inline void io_req_queue_tw_complete(struct io_kiocb *req, s32 res)
+{
+	io_req_set_res(req, res, 0);
+	req->io_task_work.func = io_req_task_complete;
+	io_req_task_work_add(req);
+}
+
 #endif
@@ -306,14 +306,11 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
 		if (!bl->buf_nr_pages)
 			ret = __io_remove_buffers(ctx, bl, p->nbufs);
 	}
+	io_ring_submit_unlock(ctx, issue_flags);
 	if (ret < 0)
 		req_set_fail(req);
-	/* complete before unlock, IOPOLL may need the lock */
 	io_req_set_res(req, ret, 0);
-	__io_req_complete(req, issue_flags);
-	io_ring_submit_unlock(ctx, issue_flags);
-	return IOU_ISSUE_SKIP_COMPLETE;
+	return IOU_OK;
 }
 
 int io_provide_buffers_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
@@ -458,13 +455,12 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
 	ret = io_add_buffers(ctx, p, bl);
 err:
+	io_ring_submit_unlock(ctx, issue_flags);
 	if (ret < 0)
 		req_set_fail(req);
-	/* complete before unlock, IOPOLL may need the lock */
 	io_req_set_res(req, ret, 0);
-	__io_req_complete(req, issue_flags);
-	io_ring_submit_unlock(ctx, issue_flags);
-	return IOU_ISSUE_SKIP_COMPLETE;
+	return IOU_OK;
 }
 
 int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
...
@@ -31,7 +31,7 @@ static int io_msg_ring_data(struct io_kiocb *req)
 	if (msg->src_fd || msg->dst_fd || msg->flags)
 		return -EINVAL;
 
-	if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0, true))
+	if (io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
 		return 0;
 
 	return -EOVERFLOW;
@@ -116,7 +116,7 @@ static int io_msg_send_fd(struct io_kiocb *req, unsigned int issue_flags)
 	 * completes with -EOVERFLOW, then the sender must ensure that a
 	 * later IORING_OP_MSG_RING delivers the message.
 	 */
-	if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0, true))
+	if (!io_post_aux_cqe(target_ctx, msg->user_data, msg->len, 0))
 		ret = -EOVERFLOW;
 out_unlock:
 	io_double_unlock_ctx(ctx, target_ctx, issue_flags);
...
@@ -125,13 +125,15 @@ static struct io_async_msghdr *io_msg_alloc_async(struct io_kiocb *req,
 	struct io_cache_entry *entry;
 	struct io_async_msghdr *hdr;
 
-	if (!(issue_flags & IO_URING_F_UNLOCKED) &&
-	    (entry = io_alloc_cache_get(&ctx->netmsg_cache)) != NULL) {
-		hdr = container_of(entry, struct io_async_msghdr, cache);
-		hdr->free_iov = NULL;
-		req->flags |= REQ_F_ASYNC_DATA;
-		req->async_data = hdr;
-		return hdr;
+	if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+		entry = io_alloc_cache_get(&ctx->netmsg_cache);
+		if (entry) {
+			hdr = container_of(entry, struct io_async_msghdr, cache);
+			hdr->free_iov = NULL;
+			req->flags |= REQ_F_ASYNC_DATA;
+			req->async_data = hdr;
+			return hdr;
+		}
 	}
 
 	if (!io_alloc_async_data(req)) {
@@ -599,16 +601,12 @@ static inline bool io_recv_finish(struct io_kiocb *req, int *ret,
 	}
 
 	if (!mshot_finished) {
-		if (io_post_aux_cqe(req->ctx, req->cqe.user_data, *ret,
-				    cflags | IORING_CQE_F_MORE, false)) {
+		if (io_aux_cqe(req->ctx, issue_flags & IO_URING_F_COMPLETE_DEFER,
+			       req->cqe.user_data, *ret, cflags | IORING_CQE_F_MORE, true)) {
 			io_recv_prep_retry(req);
 			return false;
 		}
-		/*
-		 * Otherwise stop multishot but use the current result.
-		 * Probably will end up going into overflow, but this means
-		 * we cannot trust the ordering anymore
-		 */
+		/* Otherwise stop multishot but use the current result. */
 	}
 
 	io_req_set_res(req, *ret, cflags);
@@ -923,6 +921,9 @@ void io_send_zc_cleanup(struct io_kiocb *req)
 	}
 }
 
+#define IO_ZC_FLAGS_COMMON (IORING_RECVSEND_POLL_FIRST | IORING_RECVSEND_FIXED_BUF)
+#define IO_ZC_FLAGS_VALID  (IO_ZC_FLAGS_COMMON | IORING_SEND_ZC_REPORT_USAGE)
+
 int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 {
 	struct io_sr_msg *zc = io_kiocb_to_cmd(req, struct io_sr_msg);
@@ -935,10 +936,6 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	if (req->flags & REQ_F_CQE_SKIP)
 		return -EINVAL;
 
-	zc->flags = READ_ONCE(sqe->ioprio);
-	if (zc->flags & ~(IORING_RECVSEND_POLL_FIRST |
-			  IORING_RECVSEND_FIXED_BUF))
-		return -EINVAL;
 	notif = zc->notif = io_alloc_notif(ctx);
 	if (!notif)
 		return -ENOMEM;
@@ -946,6 +943,17 @@ int io_send_zc_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	notif->cqe.res = 0;
 	notif->cqe.flags = IORING_CQE_F_NOTIF;
 	req->flags |= REQ_F_NEED_CLEANUP;
+
+	zc->flags = READ_ONCE(sqe->ioprio);
+	if (unlikely(zc->flags & ~IO_ZC_FLAGS_COMMON)) {
+		if (zc->flags & ~IO_ZC_FLAGS_VALID)
+			return -EINVAL;
+		if (zc->flags & IORING_SEND_ZC_REPORT_USAGE) {
+			io_notif_set_extended(notif);
+			io_notif_to_data(notif)->zc_report = true;
+		}
+	}
+
 	if (zc->flags & IORING_RECVSEND_FIXED_BUF) {
 		unsigned idx = READ_ONCE(sqe->buf_index);
@@ -1087,6 +1095,7 @@ int io_send_zc(struct io_kiocb *req, unsigned int issue_flags)
 			return ret;
 		msg.sg_from_iter = io_sg_from_iter;
 	} else {
+		io_notif_set_extended(zc->notif);
 		ret = import_single_range(ITER_SOURCE, zc->buf, zc->len, &iov,
 					  &msg.msg_iter);
 		if (unlikely(ret))
@@ -1148,6 +1157,8 @@ int io_sendmsg_zc(struct io_kiocb *req, unsigned int issue_flags)
 	unsigned flags;
 	int ret, min_ret = 0;
 
+	io_notif_set_extended(sr->notif);
+
 	sock = sock_from_file(req->file);
 	if (unlikely(!sock))
 		return -ENOTSOCK;
@@ -1307,12 +1318,13 @@ int io_accept(struct io_kiocb *req, unsigned int issue_flags)
 		return IOU_OK;
 	}
 
-	if (ret >= 0 &&
-	    io_post_aux_cqe(ctx, req->cqe.user_data, ret, IORING_CQE_F_MORE, false))
+	if (ret < 0)
+		return ret;
+	if (io_aux_cqe(ctx, issue_flags & IO_URING_F_COMPLETE_DEFER,
+		       req->cqe.user_data, ret, IORING_CQE_F_MORE, true))
 		goto retry;
 
-	io_req_set_res(req, ret, 0);
-	return (issue_flags & IO_URING_F_MULTISHOT) ? IOU_STOP_MULTISHOT : IOU_OK;
+	return -ECANCELED;
 }
 
 int io_socket_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
...
@@ -9,11 +9,14 @@
 #include "notif.h"
 #include "rsrc.h"
 
-static void __io_notif_complete_tw(struct io_kiocb *notif, bool *locked)
+static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked)
 {
 	struct io_notif_data *nd = io_notif_to_data(notif);
 	struct io_ring_ctx *ctx = notif->ctx;
 
+	if (nd->zc_report && (nd->zc_copied || !nd->zc_used))
+		notif->cqe.res |= IORING_NOTIF_USAGE_ZC_COPIED;
+
 	if (nd->account_pages && ctx->user) {
 		__io_unaccount_mem(ctx->user, nd->account_pages);
 		nd->account_pages = 0;
@@ -21,16 +24,41 @@ static void __io_notif_complete_tw(struct io_kiocb *notif, bool *locked)
 	io_req_task_complete(notif, locked);
 }
 
-static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
-					  struct ubuf_info *uarg,
-					  bool success)
+static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
+				bool success)
 {
 	struct io_notif_data *nd = container_of(uarg, struct io_notif_data, uarg);
 	struct io_kiocb *notif = cmd_to_io_kiocb(nd);
 
-	if (refcount_dec_and_test(&uarg->refcnt)) {
-		notif->io_task_work.func = __io_notif_complete_tw;
+	if (refcount_dec_and_test(&uarg->refcnt))
 		io_req_task_work_add(notif);
+}
+
+static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
+				    bool success)
+{
+	struct io_notif_data *nd = container_of(uarg, struct io_notif_data, uarg);
+
+	if (nd->zc_report) {
+		if (success && !nd->zc_used && skb)
+			WRITE_ONCE(nd->zc_used, true);
+		else if (!success && !nd->zc_copied)
+			WRITE_ONCE(nd->zc_copied, true);
+	}
+
+	io_tx_ubuf_callback(skb, uarg, success);
+}
+
+void io_notif_set_extended(struct io_kiocb *notif)
+{
+	struct io_notif_data *nd = io_notif_to_data(notif);
+
+	if (nd->uarg.callback != io_tx_ubuf_callback_ext) {
+		nd->account_pages = 0;
+		nd->zc_report = false;
+		nd->zc_used = false;
+		nd->zc_copied = false;
+		nd->uarg.callback = io_tx_ubuf_callback_ext;
+		notif->io_task_work.func = io_notif_complete_tw_ext;
 	}
 }
@@ -49,24 +77,11 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx)
 	notif->task = current;
 	io_get_task_refs(1);
 	notif->rsrc_node = NULL;
-	io_req_set_rsrc_node(notif, ctx, 0);
+	notif->io_task_work.func = io_req_task_complete;
 
 	nd = io_notif_to_data(notif);
-	nd->account_pages = 0;
 	nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
-	nd->uarg.callback = io_uring_tx_zerocopy_callback;
+	nd->uarg.callback = io_tx_ubuf_callback;
 	refcount_set(&nd->uarg.refcnt, 1);
 	return notif;
 }
-
-void io_notif_flush(struct io_kiocb *notif)
-	__must_hold(&slot->notif->ctx->uring_lock)
-{
-	struct io_notif_data *nd = io_notif_to_data(notif);
-
-	/* drop slot's master ref */
-	if (refcount_dec_and_test(&nd->uarg.refcnt)) {
-		notif->io_task_work.func = __io_notif_complete_tw;
-		io_req_task_work_add(notif);
-	}
-}
@@ -13,16 +13,29 @@ struct io_notif_data {
 	struct file		*file;
 	struct ubuf_info	uarg;
 	unsigned long		account_pages;
+	bool			zc_report;
+	bool			zc_used;
+	bool			zc_copied;
 };
 
-void io_notif_flush(struct io_kiocb *notif);
 struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx);
+void io_notif_set_extended(struct io_kiocb *notif);
 
 static inline struct io_notif_data *io_notif_to_data(struct io_kiocb *notif)
 {
 	return io_kiocb_to_cmd(notif, struct io_notif_data);
 }
 
+static inline void io_notif_flush(struct io_kiocb *notif)
+	__must_hold(&notif->ctx->uring_lock)
+{
+	struct io_notif_data *nd = io_notif_to_data(notif);
+
+	/* drop slot's master ref */
+	if (refcount_dec_and_test(&nd->uarg.refcnt))
+		io_req_task_work_add(notif);
+}
+
 static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
 {
 	struct io_ring_ctx *ctx = notif->ctx;
...
@@ -280,16 +280,14 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
 				continue;
 			if (req->apoll_events & EPOLLONESHOT)
 				return IOU_POLL_DONE;
-			if (io_is_uring_fops(req->file))
-				return IOU_POLL_DONE;
 
 			/* multishot, just fill a CQE and proceed */
 			if (!(req->flags & REQ_F_APOLL_MULTISHOT)) {
 				__poll_t mask = mangle_poll(req->cqe.res &
 							    req->apoll_events);
 
-				if (!io_post_aux_cqe(ctx, req->cqe.user_data,
-						     mask, IORING_CQE_F_MORE, false)) {
+				if (!io_aux_cqe(ctx, *locked, req->cqe.user_data,
+						mask, IORING_CQE_F_MORE, false)) {
 					io_req_set_res(req, mask, 0);
 					return IOU_POLL_REMOVE_POLL_USE_RES;
 				}
@@ -345,26 +343,22 @@ static void io_apoll_task_func(struct io_kiocb *req, bool *locked)
 	if (ret == IOU_POLL_NO_ACTION)
 		return;
 
-	io_tw_lock(req->ctx, locked);
 	io_poll_remove_entries(req);
 	io_poll_tw_hash_eject(req, locked);
 
 	if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
-		io_req_complete_post(req);
+		io_req_task_complete(req, locked);
 	else if (ret == IOU_POLL_DONE)
 		io_req_task_submit(req, locked);
 	else
-		io_req_complete_failed(req, ret);
+		io_req_defer_failed(req, ret);
 }
 
 static void __io_poll_execute(struct io_kiocb *req, int mask)
 {
 	io_req_set_res(req, mask, 0);
-	/*
-	 * This is useful for poll that is armed on behalf of another
-	 * request, and where the wakeup path could be on a different
-	 * CPU. We want to avoid pulling in req->apoll->events for that
-	 * case.
-	 */
+
 	if (req->opcode == IORING_OP_POLL_ADD)
 		req->io_task_work.func = io_poll_task_func;
 	else
@@ -429,6 +423,14 @@ static int io_poll_wake(struct wait_queue_entry *wait, unsigned mode, int sync,
 		return 0;
 
 	if (io_poll_get_ownership(req)) {
+		/*
+		 * If we trigger a multishot poll off our own wakeup path,
+		 * disable multishot as there is a circular dependency between
+		 * CQ posting and triggering the event.
+		 */
+		if (mask & EPOLL_URING_WAKE)
+			poll->events |= EPOLLONESHOT;
+
 		/* optional, saves extra locking for removal in tw handler */
 		if (mask && poll->events & EPOLLONESHOT) {
 			list_del_init(&poll->wait.entry);
@@ -648,10 +650,13 @@ static struct async_poll *io_req_alloc_apoll(struct io_kiocb *req,
 	if (req->flags & REQ_F_POLLED) {
 		apoll = req->apoll;
 		kfree(apoll->double_poll);
-	} else if (!(issue_flags & IO_URING_F_UNLOCKED) &&
-		   (entry = io_alloc_cache_get(&ctx->apoll_cache)) != NULL) {
+	} else if (!(issue_flags & IO_URING_F_UNLOCKED)) {
+		entry = io_alloc_cache_get(&ctx->apoll_cache);
+		if (entry == NULL)
+			goto alloc_apoll;
 		apoll = container_of(entry, struct async_poll, cache);
 	} else {
+alloc_apoll:
 		apoll = kmalloc(sizeof(*apoll), GFP_ATOMIC);
 		if (unlikely(!apoll))
 			return NULL;
...
@@ -170,10 +170,10 @@ static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
 		if (prsrc->tag) {
 			if (ctx->flags & IORING_SETUP_IOPOLL) {
 				mutex_lock(&ctx->uring_lock);
-				io_post_aux_cqe(ctx, prsrc->tag, 0, 0, true);
+				io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
 				mutex_unlock(&ctx->uring_lock);
 			} else {
-				io_post_aux_cqe(ctx, prsrc->tag, 0, 0, true);
+				io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
 			}
 		}
@@ -321,6 +321,11 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 		if (atomic_dec_and_test(&data->refs))
 			break;
 		mutex_unlock(&ctx->uring_lock);
+
+		ret = io_run_task_work_sig(ctx);
+		if (ret < 0)
+			goto reinit;
+
 		flush_delayed_work(&ctx->rsrc_put_work);
 		ret = wait_for_completion_interruptible(&data->done);
 		if (!ret) {
@@ -336,12 +341,12 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
 			}
 		}
 
+reinit:
 		atomic_inc(&data->refs);
 		/* wait for all works potentially completing data->done */
 		flush_delayed_work(&ctx->rsrc_put_work);
 		reinit_completion(&data->done);
 
-		ret = io_run_task_work_sig(ctx);
 		mutex_lock(&ctx->uring_lock);
 	} while (ret >= 0);
 	data->quiesce = false;
...
@@ -286,6 +286,12 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
 static void io_req_rw_complete(struct io_kiocb *req, bool *locked)
 {
 	io_req_io_end(req);
+
+	if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
+		unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
+
+		req->cqe.flags |= io_put_kbuf(req, issue_flags);
+	}
 	io_req_task_complete(req, locked);
 }
...
@@ -63,7 +63,7 @@ static bool io_kill_timeout(struct io_kiocb *req, int status)
 		atomic_set(&req->ctx->cq_timeouts,
 			atomic_read(&req->ctx->cq_timeouts) + 1);
 		list_del_init(&timeout->list);
-		io_req_tw_post_queue(req, status, 0);
+		io_req_queue_tw_complete(req, status);
 		return true;
 	}
 	return false;
@@ -159,7 +159,7 @@ void io_disarm_next(struct io_kiocb *req)
 		req->flags &= ~REQ_F_ARM_LTIMEOUT;
 		if (link && link->opcode == IORING_OP_LINK_TIMEOUT) {
 			io_remove_next_linked(req);
-			io_req_tw_post_queue(link, -ECANCELED, 0);
+			io_req_queue_tw_complete(link, -ECANCELED);
 		}
 	} else if (req->flags & REQ_F_LINK_TIMEOUT) {
 		struct io_ring_ctx *ctx = req->ctx;
@@ -168,7 +168,7 @@ void io_disarm_next(struct io_kiocb *req)
 		link = io_disarm_linked_timeout(req);
 		spin_unlock_irq(&ctx->timeout_lock);
 		if (link)
-			io_req_tw_post_queue(link, -ECANCELED, 0);
+			io_req_queue_tw_complete(link, -ECANCELED);
 	}
 	if (unlikely((req->flags & REQ_F_FAIL) &&
 		     !(req->flags & REQ_F_HARDLINK)))
@@ -282,11 +282,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
 			ret = io_try_cancel(req->task->io_uring, &cd, issue_flags);
 		}
 		io_req_set_res(req, ret ?: -ETIME, 0);
-		io_req_complete_post(req);
+		io_req_task_complete(req, locked);
 		io_put_req(prev);
 	} else {
 		io_req_set_res(req, -ETIME, 0);
-		io_req_complete_post(req);
+		io_req_task_complete(req, locked);
 	}
 }
...
@@ -56,7 +56,7 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2)
 		/* order with io_iopoll_req_issued() checking ->iopoll_complete */
 		smp_store_release(&req->iopoll_completed, 1);
 	else
-		__io_req_complete(req, 0);
+		io_req_complete_post(req, 0);
 }
 EXPORT_SYMBOL_GPL(io_uring_cmd_done);
...