Commit 561fb04a authored by Jens Axboe's avatar Jens Axboe

io_uring: replace workqueue usage with io-wq

Drop various work-arounds we have for workqueues:

- We no longer need the async_list for tracking sequential IO.

- We don't have to maintain our own mm tracking/setting.

- We don't need a separate workqueue for buffered writes. This didn't
  even work that well to begin with, as it was suboptimal for multiple
  buffered writers on multiple files.

- We can properly cancel pending interruptible work. This fixes
  deadlocks with particularly socket IO, where we cannot cancel them
  when the io_uring is closed. Hence the ring will wait forever for
  these requests to complete, which may never happen. This is different
  from disk IO where we know requests will complete in a finite amount
  of time.

- Due to being able to cancel work interruptible work that is already
  running, we can implement file table support for work. We need that
  for supporting system calls that add to a process file table.

- It gets us one step closer to adding async support for any system
  call.
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent 771b53d0
......@@ -56,7 +56,6 @@
#include <linux/mmu_context.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/workqueue.h>
#include <linux/kthread.h>
#include <linux/blkdev.h>
#include <linux/bvec.h>
......@@ -77,6 +76,7 @@
#include <uapi/linux/io_uring.h>
#include "internal.h"
#include "io-wq.h"
#define IORING_MAX_ENTRIES 32768
#define IORING_MAX_CQ_ENTRIES (2 * IORING_MAX_ENTRIES)
......@@ -165,16 +165,6 @@ struct io_mapped_ubuf {
unsigned int nr_bvecs;
};
struct async_list {
spinlock_t lock;
atomic_t cnt;
struct list_head list;
struct file *file;
off_t io_start;
size_t io_len;
};
struct io_ring_ctx {
struct {
struct percpu_ref refs;
......@@ -209,7 +199,7 @@ struct io_ring_ctx {
} ____cacheline_aligned_in_smp;
/* IO offload */
struct workqueue_struct *sqo_wq[2];
struct io_wq *io_wq;
struct task_struct *sqo_thread; /* if using sq thread polling */
struct mm_struct *sqo_mm;
wait_queue_head_t sqo_wait;
......@@ -262,8 +252,6 @@ struct io_ring_ctx {
struct list_head cancel_list;
} ____cacheline_aligned_in_smp;
struct async_list pending_async[2];
#if defined(CONFIG_UNIX)
struct socket *ring_sock;
#endif
......@@ -333,7 +321,7 @@ struct io_kiocb {
u32 result;
u32 sequence;
struct work_struct work;
struct io_wq_work work;
};
#define IO_PLUG_THRESHOLD 2
......@@ -359,7 +347,7 @@ struct io_submit_state {
unsigned int ios_left;
};
static void io_sq_wq_submit_work(struct work_struct *work);
static void io_wq_submit_work(struct io_wq_work **workptr);
static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
long res);
static void __io_free_req(struct io_kiocb *req);
......@@ -391,7 +379,6 @@ static void io_ring_ctx_ref_free(struct percpu_ref *ref)
static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
{
struct io_ring_ctx *ctx;
int i;
ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
if (!ctx)
......@@ -409,11 +396,6 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
init_completion(&ctx->sqo_thread_started);
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->wait);
for (i = 0; i < ARRAY_SIZE(ctx->pending_async); i++) {
spin_lock_init(&ctx->pending_async[i].lock);
INIT_LIST_HEAD(&ctx->pending_async[i].list);
atomic_set(&ctx->pending_async[i].cnt, 0);
}
spin_lock_init(&ctx->completion_lock);
INIT_LIST_HEAD(&ctx->poll_list);
INIT_LIST_HEAD(&ctx->cancel_list);
......@@ -479,22 +461,45 @@ static void __io_commit_cqring(struct io_ring_ctx *ctx)
}
}
static inline void io_queue_async_work(struct io_ring_ctx *ctx,
struct io_kiocb *req)
static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
{
int rw = 0;
u8 opcode = READ_ONCE(sqe->opcode);
return !(opcode == IORING_OP_READ_FIXED ||
opcode == IORING_OP_WRITE_FIXED);
}
static inline bool io_prep_async_work(struct io_kiocb *req)
{
bool do_hashed = false;
if (req->submit.sqe) {
switch (req->submit.sqe->opcode) {
case IORING_OP_WRITEV:
case IORING_OP_WRITE_FIXED:
rw = !(req->rw.ki_flags & IOCB_DIRECT);
do_hashed = true;
break;
}
if (io_sqe_needs_user(req->submit.sqe))
req->work.flags |= IO_WQ_WORK_NEEDS_USER;
}
trace_io_uring_queue_async_work(ctx, rw, req, &req->work, req->flags);
queue_work(ctx->sqo_wq[rw], &req->work);
return do_hashed;
}
static inline void io_queue_async_work(struct io_ring_ctx *ctx,
struct io_kiocb *req)
{
bool do_hashed = io_prep_async_work(req);
trace_io_uring_queue_async_work(ctx, do_hashed, req, &req->work,
req->flags);
if (!do_hashed) {
io_wq_enqueue(ctx->io_wq, &req->work);
} else {
io_wq_enqueue_hashed(ctx->io_wq, &req->work,
file_inode(req->file));
}
}
static void io_kill_timeout(struct io_kiocb *req)
......@@ -647,6 +652,7 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
/* one is dropped after submission, the other at completion */
refcount_set(&req->refs, 2);
req->result = 0;
INIT_IO_WORK(&req->work, io_wq_submit_work);
return req;
out:
percpu_ref_put(&ctx->refs);
......@@ -693,12 +699,10 @@ static void io_req_link_next(struct io_kiocb *req, struct io_kiocb **nxtptr)
* If we're in async work, we can continue processing the chain
* in this context instead of having to queue up new async work.
*/
if (nxtptr && current_work()) {
if (nxtptr && current_work())
*nxtptr = nxt;
} else {
INIT_WORK(&nxt->work, io_sq_wq_submit_work);
else
io_queue_async_work(req->ctx, nxt);
}
}
}
......@@ -757,12 +761,10 @@ static void io_put_req(struct io_kiocb *req, struct io_kiocb **nxtptr)
nxt = io_put_req_find_next(req);
if (nxt) {
if (nxtptr) {
if (nxtptr)
*nxtptr = nxt;
} else {
INIT_WORK(&nxt->work, io_sq_wq_submit_work);
else
io_queue_async_work(nxt->ctx, nxt);
}
}
}
......@@ -1324,65 +1326,6 @@ static ssize_t io_import_iovec(struct io_ring_ctx *ctx, int rw,
return import_iovec(rw, buf, sqe_len, UIO_FASTIOV, iovec, iter);
}
static inline bool io_should_merge(struct async_list *al, struct kiocb *kiocb)
{
if (al->file == kiocb->ki_filp) {
off_t start, end;
/*
* Allow merging if we're anywhere in the range of the same
* page. Generally this happens for sub-page reads or writes,
* and it's beneficial to allow the first worker to bring the
* page in and the piggy backed work can then work on the
* cached page.
*/
start = al->io_start & PAGE_MASK;
end = (al->io_start + al->io_len + PAGE_SIZE - 1) & PAGE_MASK;
if (kiocb->ki_pos >= start && kiocb->ki_pos <= end)
return true;
}
al->file = NULL;
return false;
}
/*
* Make a note of the last file/offset/direction we punted to async
* context. We'll use this information to see if we can piggy back a
* sequential request onto the previous one, if it's still hasn't been
* completed by the async worker.
*/
static void io_async_list_note(int rw, struct io_kiocb *req, size_t len)
{
struct async_list *async_list = &req->ctx->pending_async[rw];
struct kiocb *kiocb = &req->rw;
struct file *filp = kiocb->ki_filp;
if (io_should_merge(async_list, kiocb)) {
unsigned long max_bytes;
/* Use 8x RA size as a decent limiter for both reads/writes */
max_bytes = filp->f_ra.ra_pages << (PAGE_SHIFT + 3);
if (!max_bytes)
max_bytes = VM_READAHEAD_PAGES << (PAGE_SHIFT + 3);
/* If max len are exceeded, reset the state */
if (async_list->io_len + len <= max_bytes) {
req->flags |= REQ_F_SEQ_PREV;
async_list->io_len += len;
} else {
async_list->file = NULL;
}
}
/* New file? Reset state. */
if (async_list->file != filp) {
async_list->io_start = kiocb->ki_pos;
async_list->io_len = len;
async_list->file = filp;
}
}
/*
* For files that don't have ->read_iter() and ->write_iter(), handle them
* by looping over ->read() or ->write() manually.
......@@ -1477,13 +1420,10 @@ static int io_read(struct io_kiocb *req, const struct sqe_submit *s,
ret2 > 0 && ret2 < read_size)
ret2 = -EAGAIN;
/* Catch -EAGAIN return for forced non-blocking submission */
if (!force_nonblock || ret2 != -EAGAIN) {
if (!force_nonblock || ret2 != -EAGAIN)
kiocb_done(kiocb, ret2, nxt, s->in_async);
} else {
if (!s->in_async)
io_async_list_note(READ, req, iov_count);
else
ret = -EAGAIN;
}
}
kfree(iovec);
return ret;
......@@ -1517,11 +1457,8 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
iov_count = iov_iter_count(&iter);
ret = -EAGAIN;
if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT)) {
if (!s->in_async)
io_async_list_note(WRITE, req, iov_count);
if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT))
goto out_free;
}
ret = rw_verify_area(WRITE, file, &kiocb->ki_pos, iov_count);
if (!ret) {
......@@ -1546,13 +1483,10 @@ static int io_write(struct io_kiocb *req, const struct sqe_submit *s,
ret2 = call_write_iter(file, kiocb, &iter);
else
ret2 = loop_rw_iter(WRITE, file, kiocb, &iter);
if (!force_nonblock || ret2 != -EAGAIN) {
if (!force_nonblock || ret2 != -EAGAIN)
kiocb_done(kiocb, ret2, nxt, s->in_async);
} else {
if (!s->in_async)
io_async_list_note(WRITE, req, iov_count);
else
ret = -EAGAIN;
}
}
out_free:
kfree(iovec);
......@@ -1794,14 +1728,18 @@ static void io_poll_complete(struct io_ring_ctx *ctx, struct io_kiocb *req,
io_commit_cqring(ctx);
}
static void io_poll_complete_work(struct work_struct *work)
static void io_poll_complete_work(struct io_wq_work **workptr)
{
struct io_wq_work *work = *workptr;
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
struct io_poll_iocb *poll = &req->poll;
struct poll_table_struct pt = { ._key = poll->events };
struct io_ring_ctx *ctx = req->ctx;
__poll_t mask = 0;
if (work->flags & IO_WQ_WORK_CANCEL)
WRITE_ONCE(poll->canceled, true);
if (!READ_ONCE(poll->canceled))
mask = vfs_poll(poll->file, &pt) & poll->events;
......@@ -1894,7 +1832,7 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
return -EBADF;
req->submit.sqe = NULL;
INIT_WORK(&req->work, io_poll_complete_work);
INIT_IO_WORK(&req->work, io_poll_complete_work);
events = READ_ONCE(sqe->poll_events);
poll->events = demangle_poll(events) | EPOLLERR | EPOLLHUP;
......@@ -2152,7 +2090,6 @@ static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
memcpy(sqe_copy, sqe, sizeof(*sqe_copy));
req->submit.sqe = sqe_copy;
INIT_WORK(&req->work, io_sq_wq_submit_work);
trace_io_uring_defer(ctx, req, false);
list_add_tail(&req->list, &ctx->defer_list);
spin_unlock_irq(&ctx->completion_lock);
......@@ -2235,186 +2172,54 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
return 0;
}
static struct async_list *io_async_list_from_sqe(struct io_ring_ctx *ctx,
const struct io_uring_sqe *sqe)
{
switch (sqe->opcode) {
case IORING_OP_READV:
case IORING_OP_READ_FIXED:
return &ctx->pending_async[READ];
case IORING_OP_WRITEV:
case IORING_OP_WRITE_FIXED:
return &ctx->pending_async[WRITE];
default:
return NULL;
}
}
static inline bool io_sqe_needs_user(const struct io_uring_sqe *sqe)
{
u8 opcode = READ_ONCE(sqe->opcode);
return !(opcode == IORING_OP_READ_FIXED ||
opcode == IORING_OP_WRITE_FIXED);
}
static void io_sq_wq_submit_work(struct work_struct *work)
static void io_wq_submit_work(struct io_wq_work **workptr)
{
struct io_wq_work *work = *workptr;
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
struct io_ring_ctx *ctx = req->ctx;
struct mm_struct *cur_mm = NULL;
struct async_list *async_list;
LIST_HEAD(req_list);
mm_segment_t old_fs;
int ret;
struct sqe_submit *s = &req->submit;
const struct io_uring_sqe *sqe = s->sqe;
struct io_kiocb *nxt = NULL;
int ret = 0;
async_list = io_async_list_from_sqe(ctx, req->submit.sqe);
restart:
do {
struct sqe_submit *s = &req->submit;
const struct io_uring_sqe *sqe = s->sqe;
unsigned int flags = req->flags;
struct io_kiocb *nxt = NULL;
/* Ensure we clear previously set non-block flag */
req->rw.ki_flags &= ~IOCB_NOWAIT;
/* Ensure we clear previously set non-block flag */
req->rw.ki_flags &= ~IOCB_NOWAIT;
if (work->flags & IO_WQ_WORK_CANCEL)
ret = -ECANCELED;
ret = 0;
if (io_sqe_needs_user(sqe) && !cur_mm) {
if (!mmget_not_zero(ctx->sqo_mm)) {
ret = -EFAULT;
} else {
cur_mm = ctx->sqo_mm;
use_mm(cur_mm);
old_fs = get_fs();
set_fs(USER_DS);
}
}
if (!ret) {
s->has_user = (work->flags & IO_WQ_WORK_HAS_MM) != 0;
s->in_async = true;
do {
ret = __io_submit_sqe(ctx, req, s, &nxt, false);
/*
* We can get EAGAIN for polled IO even though we're
* forcing a sync submission from here, since we can't
* wait for request slots on the block side.
*/
if (ret != -EAGAIN)
break;
cond_resched();
} while (1);
}
if (!ret) {
s->has_user = cur_mm != NULL;
s->in_async = true;
do {
ret = __io_submit_sqe(ctx, req, s, &nxt, false);
/*
* We can get EAGAIN for polled IO even though
* we're forcing a sync submission from here,
* since we can't wait for request slots on the
* block side.
*/
if (ret != -EAGAIN)
break;
cond_resched();
} while (1);
}
/* drop submission reference */
io_put_req(req, NULL);
/* drop submission reference */
if (ret) {
io_cqring_add_event(ctx, sqe->user_data, ret);
io_put_req(req, NULL);
if (ret) {
io_cqring_add_event(ctx, sqe->user_data, ret);
io_put_req(req, NULL);
}
/* async context always use a copy of the sqe */
kfree(sqe);
/* if a dependent link is ready, do that as the next one */
if (!ret && nxt) {
req = nxt;
continue;
}
/* req from defer and link list needn't decrease async cnt */
if (flags & (REQ_F_IO_DRAINED | REQ_F_LINK_DONE))
goto out;
if (!async_list)
break;
if (!list_empty(&req_list)) {
req = list_first_entry(&req_list, struct io_kiocb,
list);
list_del(&req->list);
continue;
}
if (list_empty(&async_list->list))
break;
req = NULL;
spin_lock(&async_list->lock);
if (list_empty(&async_list->list)) {
spin_unlock(&async_list->lock);
break;
}
list_splice_init(&async_list->list, &req_list);
spin_unlock(&async_list->lock);
req = list_first_entry(&req_list, struct io_kiocb, list);
list_del(&req->list);
} while (req);
/*
* Rare case of racing with a submitter. If we find the count has
* dropped to zero AND we have pending work items, then restart
* the processing. This is a tiny race window.
*/
if (async_list) {
ret = atomic_dec_return(&async_list->cnt);
while (!ret && !list_empty(&async_list->list)) {
spin_lock(&async_list->lock);
atomic_inc(&async_list->cnt);
list_splice_init(&async_list->list, &req_list);
spin_unlock(&async_list->lock);
if (!list_empty(&req_list)) {
req = list_first_entry(&req_list,
struct io_kiocb, list);
list_del(&req->list);
goto restart;
}
ret = atomic_dec_return(&async_list->cnt);
}
}
out:
if (cur_mm) {
set_fs(old_fs);
unuse_mm(cur_mm);
mmput(cur_mm);
}
}
/*
* See if we can piggy back onto previously submitted work, that is still
* running. We currently only allow this if the new request is sequential
* to the previous one we punted.
*/
static bool io_add_to_prev_work(struct async_list *list, struct io_kiocb *req)
{
bool ret;
if (!list)
return false;
if (!(req->flags & REQ_F_SEQ_PREV))
return false;
if (!atomic_read(&list->cnt))
return false;
/* async context always use a copy of the sqe */
kfree(sqe);
ret = true;
spin_lock(&list->lock);
list_add_tail(&req->list, &list->list);
/*
* Ensure we see a simultaneous modification from io_sq_wq_submit_work()
*/
smp_mb();
if (!atomic_read(&list->cnt)) {
list_del_init(&req->list);
ret = false;
/* if a dependent link is ready, pass it back */
if (!ret && nxt) {
io_prep_async_work(nxt);
*workptr = &nxt->work;
}
spin_unlock(&list->lock);
trace_io_uring_add_to_prev(req, ret);
return ret;
}
static bool io_op_needs_file(const struct io_uring_sqe *sqe)
......@@ -2488,17 +2293,9 @@ static int __io_queue_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
sqe_copy = kmemdup(s->sqe, sizeof(*sqe_copy), GFP_KERNEL);
if (sqe_copy) {
struct async_list *list;
s->sqe = sqe_copy;
memcpy(&req->submit, s, sizeof(*s));
list = io_async_list_from_sqe(ctx, s->sqe);
if (!io_add_to_prev_work(list, req)) {
if (list)
atomic_inc(&list->cnt);
INIT_WORK(&req->work, io_sq_wq_submit_work);
io_queue_async_work(ctx, req);
}
io_queue_async_work(ctx, req);
/*
* Queued up for async execution, worker will release
......@@ -3109,15 +2906,11 @@ static void io_sq_thread_stop(struct io_ring_ctx *ctx)
static void io_finish_async(struct io_ring_ctx *ctx)
{
int i;
io_sq_thread_stop(ctx);
for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++) {
if (ctx->sqo_wq[i]) {
destroy_workqueue(ctx->sqo_wq[i]);
ctx->sqo_wq[i] = NULL;
}
if (ctx->io_wq) {
io_wq_destroy(ctx->io_wq);
ctx->io_wq = NULL;
}
}
......@@ -3125,11 +2918,9 @@ static void io_finish_async(struct io_ring_ctx *ctx)
static void io_destruct_skb(struct sk_buff *skb)
{
struct io_ring_ctx *ctx = skb->sk->sk_user_data;
int i;
for (i = 0; i < ARRAY_SIZE(ctx->sqo_wq); i++)
if (ctx->sqo_wq[i])
flush_workqueue(ctx->sqo_wq[i]);
if (ctx->io_wq)
io_wq_flush(ctx->io_wq);
unix_destruct_scm(skb);
}
......@@ -3473,6 +3264,7 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
static int io_sq_offload_start(struct io_ring_ctx *ctx,
struct io_uring_params *p)
{
unsigned concurrency;
int ret;
init_waitqueue_head(&ctx->sqo_wait);
......@@ -3516,25 +3308,10 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx,
goto err;
}
/* Do QD, or 2 * CPUS, whatever is smallest */
ctx->sqo_wq[0] = alloc_workqueue("io_ring-wq",
WQ_UNBOUND | WQ_FREEZABLE,
min(ctx->sq_entries - 1, 2 * num_online_cpus()));
if (!ctx->sqo_wq[0]) {
ret = -ENOMEM;
goto err;
}
/*
* This is for buffered writes, where we want to limit the parallelism
* due to file locking in file systems. As "normal" buffered writes
* should parellelize on writeout quite nicely, limit us to having 2
* pending. This avoids massive contention on the inode when doing
* buffered async writes.
*/
ctx->sqo_wq[1] = alloc_workqueue("io_ring-write-wq",
WQ_UNBOUND | WQ_FREEZABLE, 2);
if (!ctx->sqo_wq[1]) {
/* Do QD, or 4 * CPUS, whatever is smallest */
concurrency = min(ctx->sq_entries, 4 * num_online_cpus());
ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm);
if (!ctx->io_wq) {
ret = -ENOMEM;
goto err;
}
......@@ -3919,6 +3696,10 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
io_kill_timeouts(ctx);
io_poll_remove_all(ctx);
if (ctx->io_wq)
io_wq_cancel_all(ctx->io_wq);
io_iopoll_reap_events(ctx);
wait_for_completion(&ctx->ctx_done);
io_ring_ctx_free(ctx);
......
......@@ -7,6 +7,8 @@
#include <linux/tracepoint.h>
struct io_wq_work;
/**
* io_uring_create - called after a new io_uring context was prepared
*
......@@ -126,15 +128,15 @@ TRACE_EVENT(io_uring_file_get,
* io_uring_queue_async_work - called before submitting a new async work
*
* @ctx: pointer to a ring context structure
* @rw: type of workqueue, normal or buffered writes
* @hashed: type of workqueue, hashed or normal
* @req: pointer to a submitted request
* @work: pointer to a submitted work_struct
* @work: pointer to a submitted io_wq_work
*
* Allows to trace asynchronous work submission.
*/
TRACE_EVENT(io_uring_queue_async_work,
TP_PROTO(void *ctx, int rw, void * req, struct work_struct *work,
TP_PROTO(void *ctx, int rw, void * req, struct io_wq_work *work,
unsigned int flags),
TP_ARGS(ctx, rw, req, work, flags),
......@@ -143,7 +145,7 @@ TRACE_EVENT(io_uring_queue_async_work,
__field( void *, ctx )
__field( int, rw )
__field( void *, req )
__field( struct work_struct *, work )
__field( struct io_wq_work *, work )
__field( unsigned int, flags )
),
......@@ -157,7 +159,7 @@ TRACE_EVENT(io_uring_queue_async_work,
TP_printk("ring %p, request %p, flags %d, %s queue, work %p",
__entry->ctx, __entry->req, __entry->flags,
__entry->rw ? "buffered" : "normal", __entry->work)
__entry->rw ? "hashed" : "normal", __entry->work)
);
/**
......
......@@ -1548,6 +1548,7 @@ config AIO
config IO_URING
bool "Enable IO uring support" if EXPERT
select ANON_INODES
select IO_WQ
default y
help
This option enables support for the io_uring interface, enabling
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment