Commit adc8682e authored by Olivier Langlois's avatar Olivier Langlois Committed by Jens Axboe

io_uring: Add support for napi_busy_poll

The sqpoll thread can be used for performing the napi busy poll in a
similar way that it does io polling for file systems supporting direct
access bypassing the page cache.

The other way that io_uring can be used for napi busy poll is by
calling io_uring_enter() to get events.

If the user specify a timeout value, it is distributed between polling
and sleeping by using the systemwide setting
/proc/sys/net/core/busy_poll.

The changes have been tested with this program:
https://github.com/lano1106/io_uring_udp_ping

and the result is:
Without sqpoll:
NAPI busy loop disabled:
rtt min/avg/max/mdev = 40.631/42.050/58.667/1.547 us
NAPI busy loop enabled:
rtt min/avg/max/mdev = 30.619/31.753/61.433/1.456 us

With sqpoll:
NAPI busy loop disabled:
rtt min/avg/max/mdev = 42.087/44.438/59.508/1.533 us
NAPI busy loop enabled:
rtt min/avg/max/mdev = 35.779/37.347/52.201/0.924 us
Co-developed-by: default avatarHao Xu <haoxu@linux.alibaba.com>
Signed-off-by: default avatarHao Xu <haoxu@linux.alibaba.com>
Signed-off-by: default avatarOlivier Langlois <olivier@trillion01.com>
Link: https://lore.kernel.org/r/810bd9408ffc510ff08269e78dca9df4af0b9e4e.1646777484.git.olivier@trillion01.comSigned-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 950e79dd
......@@ -63,6 +63,7 @@
#include <net/sock.h>
#include <net/af_unix.h>
#include <net/scm.h>
#include <net/busy_poll.h>
#include <linux/anon_inodes.h>
#include <linux/sched/mm.h>
#include <linux/uaccess.h>
......@@ -401,6 +402,11 @@ struct io_ring_ctx {
struct list_head sqd_list;
unsigned long check_cq_overflow;
#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
struct list_head napi_list;
spinlock_t napi_lock; /* napi_list lock */
#endif
struct {
unsigned cached_cq_tail;
......@@ -1513,6 +1519,10 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_WQ_LIST(&ctx->locked_free_list);
INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
#ifdef CONFIG_NET_RX_BUSY_POLL
INIT_LIST_HEAD(&ctx->napi_list);
spin_lock_init(&ctx->napi_lock);
#endif
return ctx;
err:
kfree(ctx->dummy_ubuf);
......@@ -5581,6 +5591,108 @@ IO_NETOP_FN(send);
IO_NETOP_FN(recv);
#endif /* CONFIG_NET */
#ifdef CONFIG_NET_RX_BUSY_POLL
#define NAPI_TIMEOUT (60 * SEC_CONVERSION)
struct napi_entry {
struct list_head list;
unsigned int napi_id;
unsigned long timeout;
};
/*
* Add busy poll NAPI ID from sk.
*/
static void io_add_napi(struct file *file, struct io_ring_ctx *ctx)
{
unsigned int napi_id;
struct socket *sock;
struct sock *sk;
struct napi_entry *ne;
if (!net_busy_loop_on())
return;
sock = sock_from_file(file);
if (!sock)
return;
sk = sock->sk;
if (!sk)
return;
napi_id = READ_ONCE(sk->sk_napi_id);
/* Non-NAPI IDs can be rejected */
if (napi_id < MIN_NAPI_ID)
return;
spin_lock(&ctx->napi_lock);
list_for_each_entry(ne, &ctx->napi_list, list) {
if (ne->napi_id == napi_id) {
ne->timeout = jiffies + NAPI_TIMEOUT;
goto out;
}
}
ne = kmalloc(sizeof(*ne), GFP_NOWAIT);
if (!ne)
goto out;
ne->napi_id = napi_id;
ne->timeout = jiffies + NAPI_TIMEOUT;
list_add_tail(&ne->list, &ctx->napi_list);
out:
spin_unlock(&ctx->napi_lock);
}
static inline void io_check_napi_entry_timeout(struct napi_entry *ne)
{
if (time_after(jiffies, ne->timeout)) {
list_del(&ne->list);
kfree(ne);
}
}
/*
* Busy poll if globally on and supporting sockets found
*/
static bool io_napi_busy_loop(struct list_head *napi_list)
{
struct napi_entry *ne, *n;
list_for_each_entry_safe(ne, n, napi_list, list) {
napi_busy_loop(ne->napi_id, NULL, NULL, true,
BUSY_POLL_BUDGET);
io_check_napi_entry_timeout(ne);
}
return !list_empty(napi_list);
}
static void io_free_napi_list(struct io_ring_ctx *ctx)
{
spin_lock(&ctx->napi_lock);
while (!list_empty(&ctx->napi_list)) {
struct napi_entry *ne =
list_first_entry(&ctx->napi_list, struct napi_entry,
list);
list_del(&ne->list);
kfree(ne);
}
spin_unlock(&ctx->napi_lock);
}
#else
static inline void io_add_napi(struct file *file, struct io_ring_ctx *ctx)
{
}
static inline void io_free_napi_list(struct io_ring_ctx *ctx)
{
}
#endif /* CONFIG_NET_RX_BUSY_POLL */
struct io_poll_table {
struct poll_table_struct pt;
struct io_kiocb *req;
......@@ -5727,6 +5839,7 @@ static int io_poll_check_events(struct io_kiocb *req)
if (unlikely(!filled))
return -ECANCELED;
io_cqring_ev_posted(ctx);
io_add_napi(req->file, ctx);
} else if (req->result) {
return 0;
}
......@@ -5959,6 +6072,7 @@ static int __io_arm_poll_handler(struct io_kiocb *req,
__io_poll_execute(req, mask);
return 0;
}
io_add_napi(req->file, req->ctx);
/*
* Release ownership. If someone tried to queue a tw while it was
......@@ -7706,7 +7820,13 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
!(ctx->flags & IORING_SETUP_R_DISABLED))
ret = io_submit_sqes(ctx, to_submit);
mutex_unlock(&ctx->uring_lock);
#ifdef CONFIG_NET_RX_BUSY_POLL
spin_lock(&ctx->napi_lock);
if (!list_empty(&ctx->napi_list) &&
io_napi_busy_loop(&ctx->napi_list))
++ret;
spin_unlock(&ctx->napi_lock);
#endif
if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
wake_up(&ctx->sqo_sq_wait);
if (creds)
......@@ -7837,6 +7957,9 @@ struct io_wait_queue {
struct io_ring_ctx *ctx;
unsigned cq_tail;
unsigned nr_timeouts;
#ifdef CONFIG_NET_RX_BUSY_POLL
unsigned busy_poll_to;
#endif
};
static inline bool io_should_wake(struct io_wait_queue *iowq)
......@@ -7898,6 +8021,87 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
return 1;
}
#ifdef CONFIG_NET_RX_BUSY_POLL
static void io_adjust_busy_loop_timeout(struct timespec64 *ts,
struct io_wait_queue *iowq)
{
unsigned busy_poll_to = READ_ONCE(sysctl_net_busy_poll);
struct timespec64 pollto = ns_to_timespec64(1000 * (s64)busy_poll_to);
if (timespec64_compare(ts, &pollto) > 0) {
*ts = timespec64_sub(*ts, pollto);
iowq->busy_poll_to = busy_poll_to;
} else {
u64 to = timespec64_to_ns(ts);
do_div(to, 1000);
iowq->busy_poll_to = to;
ts->tv_sec = 0;
ts->tv_nsec = 0;
}
}
static inline bool io_busy_loop_timeout(unsigned long start_time,
unsigned long bp_usec)
{
if (bp_usec) {
unsigned long end_time = start_time + bp_usec;
unsigned long now = busy_loop_current_time();
return time_after(now, end_time);
}
return true;
}
static bool io_busy_loop_end(void *p, unsigned long start_time)
{
struct io_wait_queue *iowq = p;
return signal_pending(current) ||
io_should_wake(iowq) ||
io_busy_loop_timeout(start_time, iowq->busy_poll_to);
}
static void io_blocking_napi_busy_loop(struct list_head *napi_list,
struct io_wait_queue *iowq)
{
unsigned long start_time =
list_is_singular(napi_list) ? 0 :
busy_loop_current_time();
do {
if (list_is_singular(napi_list)) {
struct napi_entry *ne =
list_first_entry(napi_list,
struct napi_entry, list);
napi_busy_loop(ne->napi_id, io_busy_loop_end, iowq,
true, BUSY_POLL_BUDGET);
io_check_napi_entry_timeout(ne);
break;
}
} while (io_napi_busy_loop(napi_list) &&
!io_busy_loop_end(iowq, start_time));
}
static void io_putback_napi_list(struct io_ring_ctx *ctx,
struct list_head *napi_list)
{
struct napi_entry *cne, *lne;
spin_lock(&ctx->napi_lock);
list_for_each_entry(cne, &ctx->napi_list, list)
list_for_each_entry(lne, napi_list, list)
if (cne->napi_id == lne->napi_id) {
list_del(&lne->list);
kfree(lne);
break;
}
list_splice(napi_list, &ctx->napi_list);
spin_unlock(&ctx->napi_lock);
}
#endif /* CONFIG_NET_RX_BUSY_POLL */
/*
* Wait until events become available, if we don't already have some. The
* application must reap them itself, as they reside on the shared cq ring.
......@@ -7910,6 +8114,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
struct io_rings *rings = ctx->rings;
ktime_t timeout = KTIME_MAX;
int ret;
#ifdef CONFIG_NET_RX_BUSY_POLL
LIST_HEAD(local_napi_list);
#endif
do {
io_cqring_overflow_flush(ctx);
......@@ -7932,13 +8139,29 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
return ret;
}
#ifdef CONFIG_NET_RX_BUSY_POLL
iowq.busy_poll_to = 0;
if (!(ctx->flags & IORING_SETUP_SQPOLL)) {
spin_lock(&ctx->napi_lock);
list_splice_init(&ctx->napi_list, &local_napi_list);
spin_unlock(&ctx->napi_lock);
}
#endif
if (uts) {
struct timespec64 ts;
if (get_timespec64(&ts, uts))
return -EFAULT;
#ifdef CONFIG_NET_RX_BUSY_POLL
if (!list_empty(&local_napi_list))
io_adjust_busy_loop_timeout(&ts, &iowq);
#endif
timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
}
#ifdef CONFIG_NET_RX_BUSY_POLL
else if (!list_empty(&local_napi_list))
iowq.busy_poll_to = READ_ONCE(sysctl_net_busy_poll);
#endif
init_waitqueue_func_entry(&iowq.wq, io_wake_function);
iowq.wq.private = current;
......@@ -7948,6 +8171,12 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
trace_io_uring_cqring_wait(ctx, min_events);
#ifdef CONFIG_NET_RX_BUSY_POLL
if (iowq.busy_poll_to)
io_blocking_napi_busy_loop(&local_napi_list, &iowq);
if (!list_empty(&local_napi_list))
io_putback_napi_list(ctx, &local_napi_list);
#endif
do {
/* if we can't even flush overflow, don't wait for more */
if (!io_cqring_overflow_flush(ctx)) {
......@@ -9709,6 +9938,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
io_req_caches_free(ctx);
if (ctx->hash_map)
io_wq_put_hash(ctx->hash_map);
io_free_napi_list(ctx);
kfree(ctx->cancel_hash);
kfree(ctx->dummy_ubuf);
kfree(ctx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment