Commit 5b9a7bb7 authored by Linus Torvalds's avatar Linus Torvalds

Merge tag 'for-6.4/io_uring-2023-04-21' of git://git.kernel.dk/linux

Pull io_uring updates from Jens Axboe:

 - Cleanup of the io-wq per-node mapping, notably getting rid of it so
   we just have a single io_wq entry per ring (Breno)

 - Followup to the above, move accounting to io_wq as well and
   completely drop struct io_wqe (Gabriel)

 - Enable KASAN for the internal io_uring caches (Breno)

 - Add support for multishot timeouts. Some applications use timeouts to
   wake someone waiting on completion entries, and this makes it a bit
   easier to just have a recurring timer rather than needing to rearm it
   every time (David)

 - Support archs that have shared cache coloring between userspace and
   the kernel, and hence have strict address requirements for mmap'ing
   the ring into userspace. This should only be parisc/hppa. (Helge, me)

 - XFS has supported O_DIRECT writes without needing to lock the inode
   exclusively for a long time, and ext4 now supports it as well. This
   is true for the common cases of not extending the file size. Flag the
   fs as having that feature, and utilize that to avoid serializing
   those writes in io_uring (me)

 - Enable completion batching for uring commands (me)

 - Revert patch adding io_uring restriction to what can be GUP mapped or
   not. This does not belong in io_uring, as io_uring isn't really
   special in this regard. Since this is also getting in the way of
   cleanups and improvements to the GUP code, get rid of if (me)

 - A few series greatly reducing the complexity of registered resources,
   like buffers or files. Not only does this clean up the code a lot,
   the simplified code is also a LOT more efficient (Pavel)

 - Series optimizing how we wait for events and run task_work related to
   it (Pavel)

 - Fixes for file/buffer unregistration with DEFER_TASKRUN (Pavel)

 - Misc cleanups and improvements (Pavel, me)

* tag 'for-6.4/io_uring-2023-04-21' of git://git.kernel.dk/linux: (71 commits)
  Revert "io_uring/rsrc: disallow multi-source reg buffers"
  io_uring: add support for multishot timeouts
  io_uring/rsrc: disassociate nodes and rsrc_data
  io_uring/rsrc: devirtualise rsrc put callbacks
  io_uring/rsrc: pass node to io_rsrc_put_work()
  io_uring/rsrc: inline io_rsrc_put_work()
  io_uring/rsrc: add empty flag in rsrc_node
  io_uring/rsrc: merge nodes and io_rsrc_put
  io_uring/rsrc: infer node from ctx on io_queue_rsrc_removal
  io_uring/rsrc: remove unused io_rsrc_node::llist
  io_uring/rsrc: refactor io_queue_rsrc_removal
  io_uring/rsrc: simplify single file node switching
  io_uring/rsrc: clean up __io_sqe_buffers_update()
  io_uring/rsrc: inline switch_start fast path
  io_uring/rsrc: remove rsrc_data refs
  io_uring/rsrc: fix DEFER_TASKRUN rsrc quiesce
  io_uring/rsrc: use wq for quiescing
  io_uring/rsrc: refactor io_rsrc_ref_quiesce
  io_uring/rsrc: remove io_rsrc_node::done
  io_uring/rsrc: use nospec'ed indexes
  ...
parents 5c7ecada 3c85cc43
......@@ -899,7 +899,8 @@ static int ext4_file_open(struct inode *inode, struct file *filp)
return ret;
}
filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC;
filp->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC |
FMODE_DIO_PARALLEL_WRITE;
return dquot_file_open(inode, filp);
}
......
......@@ -1171,7 +1171,8 @@ xfs_file_open(
{
if (xfs_is_shutdown(XFS_M(inode->i_sb)))
return -EIO;
file->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC | FMODE_BUF_WASYNC;
file->f_mode |= FMODE_NOWAIT | FMODE_BUF_RASYNC | FMODE_BUF_WASYNC |
FMODE_DIO_PARALLEL_WRITE;
return generic_file_open(inode, file);
}
......
......@@ -168,6 +168,9 @@ typedef int (dio_iodone_t)(struct kiocb *iocb, loff_t offset,
#define FMODE_NOREUSE ((__force fmode_t)0x800000)
/* File supports non-exclusive O_DIRECT writes from multiple threads */
#define FMODE_DIO_PARALLEL_WRITE ((__force fmode_t)0x1000000)
/* File was opened by fanotify and shouldn't generate fanotify events */
#define FMODE_NONOTIFY ((__force fmode_t)0x4000000)
......
......@@ -188,8 +188,10 @@ struct io_ev_fd {
};
struct io_alloc_cache {
struct hlist_head list;
struct io_wq_work_node list;
unsigned int nr_cached;
unsigned int max_cached;
size_t elem_size;
};
struct io_ring_ctx {
......@@ -239,7 +241,6 @@ struct io_ring_ctx {
* uring_lock, and updated through io_uring_register(2)
*/
struct io_rsrc_node *rsrc_node;
int rsrc_cached_refs;
atomic_t cancel_seq;
struct io_file_table file_table;
unsigned nr_user_files;
......@@ -295,7 +296,7 @@ struct io_ring_ctx {
spinlock_t completion_lock;
bool poll_multi_queue;
bool cq_waiting;
atomic_t cq_wait_nr;
/*
* ->iopoll_list is protected by the ctx->uring_lock for
......@@ -325,16 +326,15 @@ struct io_ring_ctx {
struct io_restriction restrictions;
/* slow path rsrc auxilary data, used by update/register */
struct io_rsrc_node *rsrc_backup_node;
struct io_mapped_ubuf *dummy_ubuf;
struct io_rsrc_data *file_data;
struct io_rsrc_data *buf_data;
struct delayed_work rsrc_put_work;
struct callback_head rsrc_put_tw;
struct llist_head rsrc_put_llist;
/* protected by ->uring_lock */
struct list_head rsrc_ref_list;
spinlock_t rsrc_ref_lock;
struct io_alloc_cache rsrc_node_cache;
struct wait_queue_head rsrc_quiesce_wq;
unsigned rsrc_quiesce;
struct list_head io_buffers_pages;
......@@ -366,6 +366,11 @@ struct io_ring_ctx {
unsigned evfd_last_cq_tail;
};
struct io_tw_state {
/* ->uring_lock is taken, callbacks can use io_tw_lock to lock it */
bool locked;
};
enum {
REQ_F_FIXED_FILE_BIT = IOSQE_FIXED_FILE_BIT,
REQ_F_IO_DRAIN_BIT = IOSQE_IO_DRAIN_BIT,
......@@ -472,7 +477,7 @@ enum {
REQ_F_HASH_LOCKED = BIT(REQ_F_HASH_LOCKED_BIT),
};
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, bool *locked);
typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts);
struct io_task_work {
struct llist_node node;
......@@ -562,6 +567,7 @@ struct io_kiocb {
atomic_t refs;
atomic_t poll_refs;
struct io_task_work io_task_work;
unsigned nr_tw;
/* for polled requests, i.e. IORING_OP_POLL_ADD and async armed poll */
union {
struct hlist_node hash_node;
......
......@@ -360,19 +360,18 @@ TRACE_EVENT(io_uring_complete,
);
/**
* io_uring_submit_sqe - called before submitting one SQE
* io_uring_submit_req - called before submitting a request
*
* @req: pointer to a submitted request
* @force_nonblock: whether a context blocking or not
*
* Allows to track SQE submitting, to understand what was the source of it, SQ
* thread or io_uring_enter call.
*/
TRACE_EVENT(io_uring_submit_sqe,
TRACE_EVENT(io_uring_submit_req,
TP_PROTO(struct io_kiocb *req, bool force_nonblock),
TP_PROTO(struct io_kiocb *req),
TP_ARGS(req, force_nonblock),
TP_ARGS(req),
TP_STRUCT__entry (
__field( void *, ctx )
......@@ -380,7 +379,6 @@ TRACE_EVENT(io_uring_submit_sqe,
__field( unsigned long long, user_data )
__field( u8, opcode )
__field( u32, flags )
__field( bool, force_nonblock )
__field( bool, sq_thread )
__string( op_str, io_uring_get_opcode(req->opcode) )
......@@ -392,16 +390,15 @@ TRACE_EVENT(io_uring_submit_sqe,
__entry->user_data = req->cqe.user_data;
__entry->opcode = req->opcode;
__entry->flags = req->flags;
__entry->force_nonblock = force_nonblock;
__entry->sq_thread = req->ctx->flags & IORING_SETUP_SQPOLL;
__assign_str(op_str, io_uring_get_opcode(req->opcode));
),
TP_printk("ring %p, req %p, user_data 0x%llx, opcode %s, flags 0x%x, "
"non block %d, sq_thread %d", __entry->ctx, __entry->req,
"sq_thread %d", __entry->ctx, __entry->req,
__entry->user_data, __get_str(op_str),
__entry->flags, __entry->force_nonblock, __entry->sq_thread)
__entry->flags, __entry->sq_thread)
);
/*
......
......@@ -250,6 +250,7 @@ enum io_uring_op {
#define IORING_TIMEOUT_REALTIME (1U << 3)
#define IORING_LINK_TIMEOUT_UPDATE (1U << 4)
#define IORING_TIMEOUT_ETIME_SUCCESS (1U << 5)
#define IORING_TIMEOUT_MULTISHOT (1U << 6)
#define IORING_TIMEOUT_CLOCK_MASK (IORING_TIMEOUT_BOOTTIME | IORING_TIMEOUT_REALTIME)
#define IORING_TIMEOUT_UPDATE_MASK (IORING_TIMEOUT_UPDATE | IORING_LINK_TIMEOUT_UPDATE)
/*
......@@ -389,6 +390,9 @@ enum {
#define IORING_OFF_SQ_RING 0ULL
#define IORING_OFF_CQ_RING 0x8000000ULL
#define IORING_OFF_SQES 0x10000000ULL
#define IORING_OFF_PBUF_RING 0x80000000ULL
#define IORING_OFF_PBUF_SHIFT 16
#define IORING_OFF_MMAP_MASK 0xf8000000ULL
/*
* Filled with the offset for mmap(2)
......@@ -568,19 +572,6 @@ struct io_uring_rsrc_update2 {
__u32 resv2;
};
struct io_uring_notification_slot {
__u64 tag;
__u64 resv[3];
};
struct io_uring_notification_register {
__u32 nr_slots;
__u32 resv;
__u64 resv2;
__u64 data;
__u64 resv3;
};
/* Skip updating fd indexes set to this value in the fd table */
#define IORING_REGISTER_FILES_SKIP (-2)
......@@ -635,12 +626,26 @@ struct io_uring_buf_ring {
};
};
/*
* Flags for IORING_REGISTER_PBUF_RING.
*
* IOU_PBUF_RING_MMAP: If set, kernel will allocate the memory for the ring.
* The application must not set a ring_addr in struct
* io_uring_buf_reg, instead it must subsequently call
* mmap(2) with the offset set as:
* IORING_OFF_PBUF_RING | (bgid << IORING_OFF_PBUF_SHIFT)
* to get a virtual mapping for the ring.
*/
enum {
IOU_PBUF_RING_MMAP = 1,
};
/* argument for IORING_(UN)REGISTER_PBUF_RING */
struct io_uring_buf_reg {
__u64 ring_addr;
__u32 ring_entries;
__u16 bgid;
__u16 pad;
__u16 flags;
__u64 resv[3];
};
......
......@@ -7,47 +7,60 @@
#define IO_ALLOC_CACHE_MAX 512
struct io_cache_entry {
struct hlist_node node;
struct io_wq_work_node node;
};
static inline bool io_alloc_cache_put(struct io_alloc_cache *cache,
struct io_cache_entry *entry)
{
if (cache->nr_cached < IO_ALLOC_CACHE_MAX) {
if (cache->nr_cached < cache->max_cached) {
cache->nr_cached++;
hlist_add_head(&entry->node, &cache->list);
wq_stack_add_head(&entry->node, &cache->list);
/* KASAN poisons object */
kasan_slab_free_mempool(entry);
return true;
}
return false;
}
static inline bool io_alloc_cache_empty(struct io_alloc_cache *cache)
{
return !cache->list.next;
}
static inline struct io_cache_entry *io_alloc_cache_get(struct io_alloc_cache *cache)
{
if (!hlist_empty(&cache->list)) {
struct hlist_node *node = cache->list.first;
if (cache->list.next) {
struct io_cache_entry *entry;
hlist_del(node);
entry = container_of(cache->list.next, struct io_cache_entry, node);
kasan_unpoison_range(entry, cache->elem_size);
cache->list.next = cache->list.next->next;
cache->nr_cached--;
return container_of(node, struct io_cache_entry, node);
return entry;
}
return NULL;
}
static inline void io_alloc_cache_init(struct io_alloc_cache *cache)
static inline void io_alloc_cache_init(struct io_alloc_cache *cache,
unsigned max_nr, size_t size)
{
INIT_HLIST_HEAD(&cache->list);
cache->list.next = NULL;
cache->nr_cached = 0;
cache->max_cached = max_nr;
cache->elem_size = size;
}
static inline void io_alloc_cache_free(struct io_alloc_cache *cache,
void (*free)(struct io_cache_entry *))
{
while (!hlist_empty(&cache->list)) {
struct hlist_node *node = cache->list.first;
while (1) {
struct io_cache_entry *entry = io_alloc_cache_get(cache);
hlist_del(node);
free(container_of(node, struct io_cache_entry, node));
if (!entry)
break;
free(entry);
}
cache->nr_cached = 0;
}
......
......@@ -64,7 +64,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
u32 slot_index)
__must_hold(&req->ctx->uring_lock)
{
bool needs_switch = false;
struct io_fixed_file *file_slot;
int ret;
......@@ -81,18 +80,13 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
if (file_slot->file_ptr) {
struct file *old_file;
ret = io_rsrc_node_switch_start(ctx);
if (ret)
goto err;
old_file = (struct file *)(file_slot->file_ptr & FFS_MASK);
ret = io_queue_rsrc_removal(ctx->file_data, slot_index,
ctx->rsrc_node, old_file);
ret = io_queue_rsrc_removal(ctx->file_data, slot_index, old_file);
if (ret)
goto err;
return ret;
file_slot->file_ptr = 0;
io_file_bitmap_clear(&ctx->file_table, slot_index);
needs_switch = true;
}
ret = io_scm_file_account(ctx, file);
......@@ -101,9 +95,6 @@ static int io_install_fixed_file(struct io_ring_ctx *ctx, struct file *file,
io_fixed_file_set(file_slot, file);
io_file_bitmap_set(&ctx->file_table, slot_index);
}
err:
if (needs_switch)
io_rsrc_node_switch(ctx, ctx->file_data);
return ret;
}
......@@ -156,9 +147,6 @@ int io_fixed_fd_remove(struct io_ring_ctx *ctx, unsigned int offset)
return -ENXIO;
if (offset >= ctx->nr_user_files)
return -EINVAL;
ret = io_rsrc_node_switch_start(ctx);
if (ret)
return ret;
offset = array_index_nospec(offset, ctx->nr_user_files);
file_slot = io_fixed_file_slot(&ctx->file_table, offset);
......@@ -166,13 +154,12 @@ int io_fixed_fd_remove(struct io_ring_ctx *ctx, unsigned int offset)
return -EBADF;
file = (struct file *)(file_slot->file_ptr & FFS_MASK);
ret = io_queue_rsrc_removal(ctx->file_data, offset, ctx->rsrc_node, file);
ret = io_queue_rsrc_removal(ctx->file_data, offset, file);
if (ret)
return ret;
file_slot->file_ptr = 0;
io_file_bitmap_clear(&ctx->file_table, offset);
io_rsrc_node_switch(ctx, ctx->file_data);
return 0;
}
......
......@@ -15,6 +15,7 @@
#include <linux/cpu.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>
#include "io-wq.h"
......@@ -39,7 +40,7 @@ enum {
};
/*
* One for each thread in a wqe pool
* One for each thread in a wq pool
*/
struct io_worker {
refcount_t ref;
......@@ -47,7 +48,7 @@ struct io_worker {
struct hlist_nulls_node nulls_node;
struct list_head all_list;
struct task_struct *task;
struct io_wqe *wqe;
struct io_wq *wq;
struct io_wq_work *cur_work;
struct io_wq_work *next_work;
......@@ -73,7 +74,7 @@ struct io_worker {
#define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER)
struct io_wqe_acct {
struct io_wq_acct {
unsigned nr_workers;
unsigned max_workers;
int index;
......@@ -89,26 +90,6 @@ enum {
IO_WQ_ACCT_NR,
};
/*
* Per-node worker thread pool
*/
struct io_wqe {
raw_spinlock_t lock;
struct io_wqe_acct acct[IO_WQ_ACCT_NR];
int node;
struct hlist_nulls_head free_list;
struct list_head all_list;
struct wait_queue_entry wait;
struct io_wq *wq;
struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
cpumask_var_t cpu_mask;
};
/*
* Per io_wq state
*/
......@@ -127,7 +108,19 @@ struct io_wq {
struct task_struct *task;
struct io_wqe *wqes[];
struct io_wq_acct acct[IO_WQ_ACCT_NR];
/* lock protects access to elements below */
raw_spinlock_t lock;
struct hlist_nulls_head free_list;
struct list_head all_list;
struct wait_queue_entry wait;
struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
cpumask_var_t cpu_mask;
};
static enum cpuhp_state io_wq_online;
......@@ -140,10 +133,10 @@ struct io_cb_cancel_data {
bool cancel_all;
};
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
static void io_wqe_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wqe_acct *acct,
static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
struct io_wq_acct *acct,
struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);
......@@ -159,20 +152,20 @@ static void io_worker_release(struct io_worker *worker)
complete(&worker->ref_done);
}
static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}
static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
struct io_wq_work *work)
static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
struct io_wq_work *work)
{
return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
}
static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
}
static void io_worker_ref_put(struct io_wq *wq)
......@@ -183,14 +176,13 @@ static void io_worker_ref_put(struct io_wq *wq)
static void io_worker_cancel_cb(struct io_worker *worker)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
atomic_dec(&acct->nr_running);
raw_spin_lock(&worker->wqe->lock);
raw_spin_lock(&wq->lock);
acct->nr_workers--;
raw_spin_unlock(&worker->wqe->lock);
raw_spin_unlock(&wq->lock);
io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
io_worker_release(worker);
......@@ -208,8 +200,7 @@ static bool io_task_worker_match(struct callback_head *cb, void *data)
static void io_worker_exit(struct io_worker *worker)
{
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
struct io_wq *wq = worker->wq;
while (1) {
struct callback_head *cb = task_work_cancel_match(wq->task,
......@@ -223,23 +214,23 @@ static void io_worker_exit(struct io_worker *worker)
io_worker_release(worker);
wait_for_completion(&worker->ref_done);
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
if (worker->flags & IO_WORKER_F_FREE)
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
raw_spin_unlock(&wqe->lock);
io_wqe_dec_running(worker);
raw_spin_unlock(&wq->lock);
io_wq_dec_running(worker);
worker->flags = 0;
preempt_disable();
current->flags &= ~PF_IO_WORKER;
preempt_enable();
kfree_rcu(worker, rcu);
io_worker_ref_put(wqe->wq);
io_worker_ref_put(wq);
do_exit(0);
}
static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
{
bool ret = false;
......@@ -256,8 +247,8 @@ static inline bool io_acct_run_queue(struct io_wqe_acct *acct)
* Check head of free list for an available worker. If one isn't available,
* caller must create one.
*/
static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
struct io_wqe_acct *acct)
static bool io_wq_activate_free_worker(struct io_wq *wq,
struct io_wq_acct *acct)
__must_hold(RCU)
{
struct hlist_nulls_node *n;
......@@ -268,10 +259,10 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
* activate. If a given worker is on the free_list but in the process
* of exiting, keep trying.
*/
hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
if (!io_worker_get(worker))
continue;
if (io_wqe_get_acct(worker) != acct) {
if (io_wq_get_acct(worker) != acct) {
io_worker_release(worker);
continue;
}
......@@ -289,7 +280,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe,
* We need a worker. If we find a free one, we're good. If not, and we're
* below the max number of workers, create one.
*/
static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
/*
* Most likely an attempt to queue unbounded work on an io_wq that
......@@ -298,21 +289,21 @@ static bool io_wqe_create_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers");
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
if (acct->nr_workers >= acct->max_workers) {
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
return true;
}
acct->nr_workers++;
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
return create_io_worker(wqe->wq, wqe, acct->index);
atomic_inc(&wq->worker_refs);
return create_io_worker(wq, acct->index);
}
static void io_wqe_inc_running(struct io_worker *worker)
static void io_wq_inc_running(struct io_worker *worker)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wq_acct *acct = io_wq_get_acct(worker);
atomic_inc(&acct->nr_running);
}
......@@ -321,22 +312,22 @@ static void create_worker_cb(struct callback_head *cb)
{
struct io_worker *worker;
struct io_wq *wq;
struct io_wqe *wqe;
struct io_wqe_acct *acct;
struct io_wq_acct *acct;
bool do_create = false;
worker = container_of(cb, struct io_worker, create_work);
wqe = worker->wqe;
wq = wqe->wq;
acct = &wqe->acct[worker->create_index];
raw_spin_lock(&wqe->lock);
wq = worker->wq;
acct = &wq->acct[worker->create_index];
raw_spin_lock(&wq->lock);
if (acct->nr_workers < acct->max_workers) {
acct->nr_workers++;
do_create = true;
}
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
if (do_create) {
create_io_worker(wq, wqe, worker->create_index);
create_io_worker(wq, worker->create_index);
} else {
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq);
......@@ -346,11 +337,10 @@ static void create_worker_cb(struct callback_head *cb)
}
static bool io_queue_worker_create(struct io_worker *worker,
struct io_wqe_acct *acct,
struct io_wq_acct *acct,
task_work_func_t func)
{
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
struct io_wq *wq = worker->wq;
/* raced with exit, just ignore create call */
if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
......@@ -392,10 +382,10 @@ static bool io_queue_worker_create(struct io_worker *worker,
return false;
}
static void io_wqe_dec_running(struct io_worker *worker)
static void io_wq_dec_running(struct io_worker *worker)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
if (!(worker->flags & IO_WORKER_F_UP))
return;
......@@ -406,7 +396,7 @@ static void io_wqe_dec_running(struct io_worker *worker)
return;
atomic_inc(&acct->nr_running);
atomic_inc(&wqe->wq->worker_refs);
atomic_inc(&wq->worker_refs);
io_queue_worker_create(worker, acct, create_worker_cb);
}
......@@ -414,29 +404,25 @@ static void io_wqe_dec_running(struct io_worker *worker)
* Worker will start processing some work. Move it to the busy list, if
* it's currently on the freelist
*/
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker)
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
if (worker->flags & IO_WORKER_F_FREE) {
worker->flags &= ~IO_WORKER_F_FREE;
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
hlist_nulls_del_init_rcu(&worker->nulls_node);
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
}
}
/*
* No work, worker going to sleep. Move to freelist, and unuse mm if we
* have one attached. Dropping the mm may potentially sleep, so we drop
* the lock in that case and return success. Since the caller has to
* retry the loop in that case (we changed task state), we don't regrab
* the lock if we return success.
* No work, worker going to sleep. Move to freelist.
*/
static void __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
__must_hold(wqe->lock)
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
__must_hold(wq->lock)
{
if (!(worker->flags & IO_WORKER_F_FREE)) {
worker->flags |= IO_WORKER_F_FREE;
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
}
}
......@@ -445,17 +431,16 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
return work->flags >> IO_WQ_HASH_SHIFT;
}
static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
struct io_wq *wq = wqe->wq;
bool ret = false;
spin_lock_irq(&wq->hash->wait.lock);
if (list_empty(&wqe->wait.entry)) {
__add_wait_queue(&wq->hash->wait, &wqe->wait);
if (list_empty(&wq->wait.entry)) {
__add_wait_queue(&wq->hash->wait, &wq->wait);
if (!test_bit(hash, &wq->hash->map)) {
__set_current_state(TASK_RUNNING);
list_del_init(&wqe->wait.entry);
list_del_init(&wq->wait.entry);
ret = true;
}
}
......@@ -463,14 +448,14 @@ static bool io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
return ret;
}
static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
struct io_worker *worker)
__must_hold(acct->lock)
{
struct io_wq_work_node *node, *prev;
struct io_wq_work *work, *tail;
unsigned int stall_hash = -1U;
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = worker->wq;
wq_list_for_each(node, prev, &acct->work_list) {
unsigned int hash;
......@@ -485,11 +470,11 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
hash = io_get_work_hash(work);
/* all items with this hash lie in [work, tail] */
tail = wqe->hash_tail[hash];
tail = wq->hash_tail[hash];
/* hashed, can run if not already running */
if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
wqe->hash_tail[hash] = NULL;
if (!test_and_set_bit(hash, &wq->hash->map)) {
wq->hash_tail[hash] = NULL;
wq_list_cut(&acct->work_list, &tail->list, prev);
return work;
}
......@@ -508,12 +493,12 @@ static struct io_wq_work *io_get_next_work(struct io_wqe_acct *acct,
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
unstalled = io_wait_on_hash(wqe, stall_hash);
unstalled = io_wait_on_hash(wq, stall_hash);
raw_spin_lock(&acct->lock);
if (unstalled) {
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
if (wq_has_sleeper(&wqe->wq->hash->wait))
wake_up(&wqe->wq->hash->wait);
if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
}
}
......@@ -534,13 +519,10 @@ static void io_assign_current_work(struct io_worker *worker,
raw_spin_unlock(&worker->lock);
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work);
static void io_worker_handle_work(struct io_worker *worker)
{
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
do {
......@@ -557,7 +539,7 @@ static void io_worker_handle_work(struct io_worker *worker)
work = io_get_next_work(acct, worker);
raw_spin_unlock(&acct->lock);
if (work) {
__io_worker_busy(wqe, worker);
__io_worker_busy(wq, worker);
/*
* Make sure cancelation can find this, even before
......@@ -595,7 +577,7 @@ static void io_worker_handle_work(struct io_worker *worker)
}
io_assign_current_work(worker, work);
if (linked)
io_wqe_enqueue(wqe, linked);
io_wq_enqueue(wq, linked);
if (hash != -1U && !next_hashed) {
/* serialize hash clear with wake_up() */
......@@ -610,12 +592,11 @@ static void io_worker_handle_work(struct io_worker *worker)
} while (1);
}
static int io_wqe_worker(void *data)
static int io_wq_worker(void *data)
{
struct io_worker *worker = data;
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wqe *wqe = worker->wqe;
struct io_wq *wq = wqe->wq;
struct io_wq_acct *acct = io_wq_get_acct(worker);
struct io_wq *wq = worker->wq;
bool exit_mask = false, last_timeout = false;
char buf[TASK_COMM_LEN];
......@@ -631,20 +612,20 @@ static int io_wqe_worker(void *data)
while (io_acct_run_queue(acct))
io_worker_handle_work(worker);
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
/*
* Last sleep timed out. Exit if we're not the last worker,
* or if someone modified our affinity.
*/
if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
__set_current_state(TASK_RUNNING);
break;
}
last_timeout = false;
__io_worker_idle(wqe, worker);
raw_spin_unlock(&wqe->lock);
__io_worker_idle(wq, worker);
raw_spin_unlock(&wq->lock);
if (io_run_task_work())
continue;
ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
......@@ -658,7 +639,7 @@ static int io_wqe_worker(void *data)
if (!ret) {
last_timeout = true;
exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
wqe->cpu_mask);
wq->cpu_mask);
}
}
......@@ -683,7 +664,7 @@ void io_wq_worker_running(struct task_struct *tsk)
if (worker->flags & IO_WORKER_F_RUNNING)
return;
worker->flags |= IO_WORKER_F_RUNNING;
io_wqe_inc_running(worker);
io_wq_inc_running(worker);
}
/*
......@@ -702,21 +683,21 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
return;
worker->flags &= ~IO_WORKER_F_RUNNING;
io_wqe_dec_running(worker);
io_wq_dec_running(worker);
}
static void io_init_new_worker(struct io_wqe *wqe, struct io_worker *worker,
static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
struct task_struct *tsk)
{
tsk->worker_private = worker;
worker->task = tsk;
set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
set_cpus_allowed_ptr(tsk, wq->cpu_mask);
raw_spin_lock(&wqe->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
list_add_tail_rcu(&worker->all_list, &wqe->all_list);
raw_spin_lock(&wq->lock);
hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
list_add_tail_rcu(&worker->all_list, &wq->all_list);
worker->flags |= IO_WORKER_F_FREE;
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
wake_up_new_task(tsk);
}
......@@ -749,21 +730,21 @@ static void create_worker_cont(struct callback_head *cb)
{
struct io_worker *worker;
struct task_struct *tsk;
struct io_wqe *wqe;
struct io_wq *wq;
worker = container_of(cb, struct io_worker, create_work);
clear_bit_unlock(0, &worker->create_state);
wqe = worker->wqe;
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
wq = worker->wq;
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
io_init_new_worker(wqe, worker, tsk);
io_init_new_worker(wq, worker, tsk);
io_worker_release(worker);
return;
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wq_acct *acct = io_wq_get_acct(worker);
atomic_dec(&acct->nr_running);
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
acct->nr_workers--;
if (!acct->nr_workers) {
struct io_cb_cancel_data match = {
......@@ -771,13 +752,13 @@ static void create_worker_cont(struct callback_head *cb)
.cancel_all = true,
};
raw_spin_unlock(&wqe->lock);
while (io_acct_cancel_pending_work(wqe, acct, &match))
raw_spin_unlock(&wq->lock);
while (io_acct_cancel_pending_work(wq, acct, &match))
;
} else {
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
}
io_worker_ref_put(wqe->wq);
io_worker_ref_put(wq);
kfree(worker);
return;
}
......@@ -790,42 +771,42 @@ static void create_worker_cont(struct callback_head *cb)
static void io_workqueue_create(struct work_struct *work)
{
struct io_worker *worker = container_of(work, struct io_worker, work);
struct io_wqe_acct *acct = io_wqe_get_acct(worker);
struct io_wq_acct *acct = io_wq_get_acct(worker);
if (!io_queue_worker_create(worker, acct, create_worker_cont))
kfree(worker);
}
static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
static bool create_io_worker(struct io_wq *wq, int index)
{
struct io_wqe_acct *acct = &wqe->acct[index];
struct io_wq_acct *acct = &wq->acct[index];
struct io_worker *worker;
struct task_struct *tsk;
__set_current_state(TASK_RUNNING);
worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
worker = kzalloc(sizeof(*worker), GFP_KERNEL);
if (!worker) {
fail:
atomic_dec(&acct->nr_running);
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
acct->nr_workers--;
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
io_worker_ref_put(wq);
return false;
}
refcount_set(&worker->ref, 1);
worker->wqe = wqe;
worker->wq = wq;
raw_spin_lock_init(&worker->lock);
init_completion(&worker->ref_done);
if (index == IO_WQ_ACCT_BOUND)
worker->flags |= IO_WORKER_F_BOUND;
tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
if (!IS_ERR(tsk)) {
io_init_new_worker(wqe, worker, tsk);
io_init_new_worker(wq, worker, tsk);
} else if (!io_should_retry_thread(PTR_ERR(tsk))) {
kfree(worker);
goto fail;
......@@ -841,14 +822,14 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
* Iterate the passed in list and call the specific function for each
* worker that isn't exiting
*/
static bool io_wq_for_each_worker(struct io_wqe *wqe,
static bool io_wq_for_each_worker(struct io_wq *wq,
bool (*func)(struct io_worker *, void *),
void *data)
{
struct io_worker *worker;
bool ret = false;
list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
if (io_worker_get(worker)) {
/* no task if node is/was offline */
if (worker->task)
......@@ -869,10 +850,8 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
return false;
}
static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
struct io_wq *wq = wqe->wq;
do {
work->flags |= IO_WQ_WORK_CANCEL;
wq->do_work(work);
......@@ -880,9 +859,9 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
} while (work);
}
static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash;
struct io_wq_work *tail;
......@@ -893,8 +872,8 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
}
hash = io_get_work_hash(work);
tail = wqe->hash_tail[hash];
wqe->hash_tail[hash] = work;
tail = wq->hash_tail[hash];
wq->hash_tail[hash] = work;
if (!tail)
goto append;
......@@ -906,9 +885,9 @@ static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
return work == data;
}
static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
struct io_wq_acct *acct = io_work_get_acct(wq, work);
struct io_cb_cancel_data match;
unsigned work_flags = work->flags;
bool do_create;
......@@ -917,55 +896,48 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
* If io-wq is exiting for this task, or if the request has explicitly
* been marked as one that should not get executed, cancel it here.
*/
if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state) ||
if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
(work->flags & IO_WQ_WORK_CANCEL)) {
io_run_cancel(work, wqe);
io_run_cancel(work, wq);
return;
}
raw_spin_lock(&acct->lock);
io_wqe_insert_work(wqe, work);
io_wq_insert_work(wq, work);
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
rcu_read_lock();
do_create = !io_wqe_activate_free_worker(wqe, acct);
do_create = !io_wq_activate_free_worker(wq, acct);
rcu_read_unlock();
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
!atomic_read(&acct->nr_running))) {
bool did_create;
did_create = io_wqe_create_worker(wqe, acct);
did_create = io_wq_create_worker(wq, acct);
if (likely(did_create))
return;
raw_spin_lock(&wqe->lock);
raw_spin_lock(&wq->lock);
if (acct->nr_workers) {
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
return;
}
raw_spin_unlock(&wqe->lock);
raw_spin_unlock(&wq->lock);
/* fatal condition, failed to create the first worker */
match.fn = io_wq_work_match_item,
match.data = work,
match.cancel_all = false,
io_acct_cancel_pending_work(wqe, acct, &match);
io_acct_cancel_pending_work(wq, acct, &match);
}
}
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
struct io_wqe *wqe = wq->wqes[numa_node_id()];
io_wqe_enqueue(wqe, work);
}
/*
* Work items that hash to the same value will not be done in parallel.
* Used to limit concurrent writes, generally hashed by inode.
......@@ -1008,27 +980,27 @@ static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
return match->nr_running && !match->cancel_all;
}
static inline void io_wqe_remove_pending(struct io_wqe *wqe,
static inline void io_wq_remove_pending(struct io_wq *wq,
struct io_wq_work *work,
struct io_wq_work_node *prev)
{
struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
struct io_wq_acct *acct = io_work_get_acct(wq, work);
unsigned int hash = io_get_work_hash(work);
struct io_wq_work *prev_work = NULL;
if (io_wq_is_hashed(work) && work == wqe->hash_tail[hash]) {
if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
if (prev)
prev_work = container_of(prev, struct io_wq_work, list);
if (prev_work && io_get_work_hash(prev_work) == hash)
wqe->hash_tail[hash] = prev_work;
wq->hash_tail[hash] = prev_work;
else
wqe->hash_tail[hash] = NULL;
wq->hash_tail[hash] = NULL;
}
wq_list_del(&acct->work_list, &work->list, prev);
}
static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
struct io_wqe_acct *acct,
static bool io_acct_cancel_pending_work(struct io_wq *wq,
struct io_wq_acct *acct,
struct io_cb_cancel_data *match)
{
struct io_wq_work_node *node, *prev;
......@@ -1039,9 +1011,9 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
work = container_of(node, struct io_wq_work, list);
if (!match->fn(work, match->data))
continue;
io_wqe_remove_pending(wqe, work, prev);
io_wq_remove_pending(wq, work, prev);
raw_spin_unlock(&acct->lock);
io_run_cancel(work, wqe);
io_run_cancel(work, wq);
match->nr_pending++;
/* not safe to continue after unlock */
return true;
......@@ -1051,15 +1023,15 @@ static bool io_acct_cancel_pending_work(struct io_wqe *wqe,
return false;
}
static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
struct io_cb_cancel_data *match)
static void io_wq_cancel_pending_work(struct io_wq *wq,
struct io_cb_cancel_data *match)
{
int i;
retry:
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = io_get_acct(wqe, i == 0);
struct io_wq_acct *acct = io_get_acct(wq, i == 0);
if (io_acct_cancel_pending_work(wqe, acct, match)) {
if (io_acct_cancel_pending_work(wq, acct, match)) {
if (match->cancel_all)
goto retry;
break;
......@@ -1067,11 +1039,11 @@ static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
}
}
static void io_wqe_cancel_running_work(struct io_wqe *wqe,
static void io_wq_cancel_running_work(struct io_wq *wq,
struct io_cb_cancel_data *match)
{
rcu_read_lock();
io_wq_for_each_worker(wqe, io_wq_worker_cancel, match);
io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
rcu_read_unlock();
}
......@@ -1083,7 +1055,6 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
.data = data,
.cancel_all = cancel_all,
};
int node;
/*
* First check pending list, if we're lucky we can just remove it
......@@ -1095,22 +1066,18 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
* as an indication that we attempt to signal cancellation. The
* completion will run normally in this case.
*
* Do both of these while holding the wqe->lock, to ensure that
* Do both of these while holding the wq->lock, to ensure that
* we'll find a work item regardless of state.
*/
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
io_wqe_cancel_pending_work(wqe, &match);
if (match.nr_pending && !match.cancel_all)
return IO_WQ_CANCEL_OK;
raw_spin_lock(&wqe->lock);
io_wqe_cancel_running_work(wqe, &match);
raw_spin_unlock(&wqe->lock);
if (match.nr_running && !match.cancel_all)
return IO_WQ_CANCEL_RUNNING;
}
io_wq_cancel_pending_work(wq, &match);
if (match.nr_pending && !match.cancel_all)
return IO_WQ_CANCEL_OK;
raw_spin_lock(&wq->lock);
io_wq_cancel_running_work(wq, &match);
raw_spin_unlock(&wq->lock);
if (match.nr_running && !match.cancel_all)
return IO_WQ_CANCEL_RUNNING;
if (match.nr_running)
return IO_WQ_CANCEL_RUNNING;
......@@ -1119,20 +1086,20 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
return IO_WQ_CANCEL_NOTFOUND;
}
static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
int sync, void *key)
{
struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
struct io_wq *wq = container_of(wait, struct io_wq, wait);
int i;
list_del_init(&wait->entry);
rcu_read_lock();
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = &wqe->acct[i];
struct io_wq_acct *acct = &wq->acct[i];
if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
io_wqe_activate_free_worker(wqe, acct);
io_wq_activate_free_worker(wq, acct);
}
rcu_read_unlock();
return 1;
......@@ -1140,7 +1107,7 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
int ret, node, i;
int ret, i;
struct io_wq *wq;
if (WARN_ON_ONCE(!data->free_work || !data->do_work))
......@@ -1148,7 +1115,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
if (WARN_ON_ONCE(!bounded))
return ERR_PTR(-EINVAL);
wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
if (!wq)
return ERR_PTR(-ENOMEM);
ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
......@@ -1161,39 +1128,28 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
wq->do_work = data->do_work;
ret = -ENOMEM;
for_each_node(node) {
struct io_wqe *wqe;
int alloc_node = node;
if (!node_online(alloc_node))
alloc_node = NUMA_NO_NODE;
wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
if (!wqe)
goto err;
wq->wqes[node] = wqe;
if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
goto err;
cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
wqe->node = alloc_node;
wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
task_rlimit(current, RLIMIT_NPROC);
INIT_LIST_HEAD(&wqe->wait.entry);
wqe->wait.func = io_wqe_hash_wake;
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wqe_acct *acct = &wqe->acct[i];
acct->index = i;
atomic_set(&acct->nr_running, 0);
INIT_WQ_LIST(&acct->work_list);
raw_spin_lock_init(&acct->lock);
}
wqe->wq = wq;
raw_spin_lock_init(&wqe->lock);
INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
INIT_LIST_HEAD(&wqe->all_list);
if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
goto err;
cpumask_copy(wq->cpu_mask, cpu_possible_mask);
wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
task_rlimit(current, RLIMIT_NPROC);
INIT_LIST_HEAD(&wq->wait.entry);
wq->wait.func = io_wq_hash_wake;
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
struct io_wq_acct *acct = &wq->acct[i];
acct->index = i;
atomic_set(&acct->nr_running, 0);
INIT_WQ_LIST(&acct->work_list);
raw_spin_lock_init(&acct->lock);
}
raw_spin_lock_init(&wq->lock);
INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
INIT_LIST_HEAD(&wq->all_list);
wq->task = get_task_struct(data->task);
atomic_set(&wq->worker_refs, 1);
init_completion(&wq->worker_done);
......@@ -1201,12 +1157,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
err:
io_wq_put_hash(data->hash);
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
for_each_node(node) {
if (!wq->wqes[node])
continue;
free_cpumask_var(wq->wqes[node]->cpu_mask);
kfree(wq->wqes[node]);
}
free_cpumask_var(wq->cpu_mask);
err_wq:
kfree(wq);
return ERR_PTR(ret);
......@@ -1219,7 +1171,7 @@ static bool io_task_work_match(struct callback_head *cb, void *data)
if (cb->func != create_worker_cb && cb->func != create_worker_cont)
return false;
worker = container_of(cb, struct io_worker, create_work);
return worker->wqe->wq == data;
return worker->wq == data;
}
void io_wq_exit_start(struct io_wq *wq)
......@@ -1247,48 +1199,35 @@ static void io_wq_cancel_tw_create(struct io_wq *wq)
static void io_wq_exit_workers(struct io_wq *wq)
{
int node;
if (!wq->task)
return;
io_wq_cancel_tw_create(wq);
rcu_read_lock();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
}
io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
rcu_read_unlock();
io_worker_ref_put(wq);
wait_for_completion(&wq->worker_done);
for_each_node(node) {
spin_lock_irq(&wq->hash->wait.lock);
list_del_init(&wq->wqes[node]->wait.entry);
spin_unlock_irq(&wq->hash->wait.lock);
}
spin_lock_irq(&wq->hash->wait.lock);
list_del_init(&wq->wait.entry);
spin_unlock_irq(&wq->hash->wait.lock);
put_task_struct(wq->task);
wq->task = NULL;
}
static void io_wq_destroy(struct io_wq *wq)
{
int node;
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_all,
.cancel_all = true,
};
cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
struct io_cb_cancel_data match = {
.fn = io_wq_work_match_all,
.cancel_all = true,
};
io_wqe_cancel_pending_work(wqe, &match);
free_cpumask_var(wqe->cpu_mask);
kfree(wqe);
}
io_wq_cancel_pending_work(wq, &match);
free_cpumask_var(wq->cpu_mask);
io_wq_put_hash(wq->hash);
kfree(wq);
}
......@@ -1311,9 +1250,9 @@ static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
struct online_data *od = data;
if (od->online)
cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
else
cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
return false;
}
......@@ -1323,11 +1262,9 @@ static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
.cpu = cpu,
.online = online
};
int i;
rcu_read_lock();
for_each_node(i)
io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
rcu_read_unlock();
return 0;
}
......@@ -1348,18 +1285,13 @@ static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
{
int i;
rcu_read_lock();
for_each_node(i) {
struct io_wqe *wqe = wq->wqes[i];
if (mask)
cpumask_copy(wqe->cpu_mask, mask);
else
cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
}
if (mask)
cpumask_copy(wq->cpu_mask, mask);
else
cpumask_copy(wq->cpu_mask, cpu_possible_mask);
rcu_read_unlock();
return 0;
}
......@@ -1369,9 +1301,9 @@ int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
*/
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
struct io_wq_acct *acct;
int prev[IO_WQ_ACCT_NR];
bool first_node = true;
int i, node;
int i;
BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
......@@ -1386,21 +1318,15 @@ int io_wq_max_workers(struct io_wq *wq, int *new_count)
prev[i] = 0;
rcu_read_lock();
for_each_node(node) {
struct io_wqe *wqe = wq->wqes[node];
struct io_wqe_acct *acct;
raw_spin_lock(&wqe->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
acct = &wqe->acct[i];
if (first_node)
prev[i] = max_t(int, acct->max_workers, prev[i]);
if (new_count[i])
acct->max_workers = new_count[i];
}
raw_spin_unlock(&wqe->lock);
first_node = false;
raw_spin_lock(&wq->lock);
for (i = 0; i < IO_WQ_ACCT_NR; i++) {
acct = &wq->acct[i];
prev[i] = max_t(int, acct->max_workers, prev[i]);
if (new_count[i])
acct->max_workers = new_count[i];
}
raw_spin_unlock(&wq->lock);
rcu_read_unlock();
for (i = 0; i < IO_WQ_ACCT_NR; i++)
......
......@@ -72,6 +72,7 @@
#include <linux/io_uring.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <asm/shmparam.h>
#define CREATE_TRACE_POINTS
#include <trace/events/io_uring.h>
......@@ -246,12 +247,12 @@ static __cold void io_fallback_req_func(struct work_struct *work)
fallback_work.work);
struct llist_node *node = llist_del_all(&ctx->fallback_llist);
struct io_kiocb *req, *tmp;
bool locked = true;
struct io_tw_state ts = { .locked = true, };
mutex_lock(&ctx->uring_lock);
llist_for_each_entry_safe(req, tmp, node, io_task_work.node)
req->io_task_work.func(req, &locked);
if (WARN_ON_ONCE(!locked))
req->io_task_work.func(req, &ts);
if (WARN_ON_ONCE(!ts.locked))
return;
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
......@@ -309,13 +310,18 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->sqd_list);
INIT_LIST_HEAD(&ctx->cq_overflow_list);
INIT_LIST_HEAD(&ctx->io_buffers_cache);
io_alloc_cache_init(&ctx->apoll_cache);
io_alloc_cache_init(&ctx->netmsg_cache);
io_alloc_cache_init(&ctx->rsrc_node_cache, IO_NODE_ALLOC_CACHE_MAX,
sizeof(struct io_rsrc_node));
io_alloc_cache_init(&ctx->apoll_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct async_poll));
io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
sizeof(struct io_async_msghdr));
init_completion(&ctx->ref_comp);
xa_init_flags(&ctx->personalities, XA_FLAGS_ALLOC1);
mutex_init(&ctx->uring_lock);
init_waitqueue_head(&ctx->cq_wait);
init_waitqueue_head(&ctx->poll_wq);
init_waitqueue_head(&ctx->rsrc_quiesce_wq);
spin_lock_init(&ctx->completion_lock);
spin_lock_init(&ctx->timeout_lock);
INIT_WQ_LIST(&ctx->iopoll_list);
......@@ -324,11 +330,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
INIT_LIST_HEAD(&ctx->defer_list);
INIT_LIST_HEAD(&ctx->timeout_list);
INIT_LIST_HEAD(&ctx->ltimeout_list);
spin_lock_init(&ctx->rsrc_ref_lock);
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
init_task_work(&ctx->rsrc_put_tw, io_rsrc_put_tw);
init_llist_head(&ctx->rsrc_put_llist);
init_llist_head(&ctx->work_llist);
INIT_LIST_HEAD(&ctx->tctx_list);
ctx->submit_state.free_list.next = NULL;
......@@ -424,8 +426,14 @@ static void io_prep_async_work(struct io_kiocb *req)
if (req->file && !io_req_ffs_set(req))
req->flags |= io_file_get_flags(req->file) << REQ_F_SUPPORT_NOWAIT_BIT;
if (req->flags & REQ_F_ISREG) {
if (def->hash_reg_file || (ctx->flags & IORING_SETUP_IOPOLL))
if (req->file && (req->flags & REQ_F_ISREG)) {
bool should_hash = def->hash_reg_file;
/* don't serialize this request if the fs doesn't need it */
if (should_hash && (req->file->f_flags & O_DIRECT) &&
(req->file->f_mode & FMODE_DIO_PARALLEL_WRITE))
should_hash = false;
if (should_hash || (ctx->flags & IORING_SETUP_IOPOLL))
io_wq_hash_work(&req->work, file_inode(req->file));
} else if (!req->file || !S_ISBLK(file_inode(req->file)->i_mode)) {
if (def->unbound_nonreg_file)
......@@ -450,7 +458,7 @@ static void io_prep_async_link(struct io_kiocb *req)
}
}
void io_queue_iowq(struct io_kiocb *req, bool *dont_use)
void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use)
{
struct io_kiocb *link = io_prep_linked_timeout(req);
struct io_uring_task *tctx = req->task->io_uring;
......@@ -620,22 +628,22 @@ static inline void __io_cq_unlock_post(struct io_ring_ctx *ctx)
io_cqring_wake(ctx);
}
static inline void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
static void __io_cq_unlock_post_flush(struct io_ring_ctx *ctx)
__releases(ctx->completion_lock)
{
io_commit_cqring(ctx);
__io_cq_unlock(ctx);
io_commit_cqring_flush(ctx);
/*
* As ->task_complete implies that the ring is single tasked, cq_wait
* may only be waited on by the current in io_cqring_wait(), but since
* it will re-check the wakeup conditions once we return we can safely
* skip waking it up.
*/
if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
smp_mb();
__io_cqring_wake(ctx);
if (ctx->task_complete) {
/*
* ->task_complete implies that only current might be waiting
* for CQEs, and obviously, we currently don't. No one is
* waiting, wakeups are futile, skip them.
*/
io_commit_cqring_flush(ctx);
} else {
__io_cq_unlock(ctx);
io_commit_cqring_flush(ctx);
io_cqring_wake(ctx);
}
}
......@@ -960,9 +968,10 @@ bool io_aux_cqe(struct io_ring_ctx *ctx, bool defer, u64 user_data, s32 res, u32
return true;
}
static void __io_req_complete_post(struct io_kiocb *req)
static void __io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
{
struct io_ring_ctx *ctx = req->ctx;
struct io_rsrc_node *rsrc_node = NULL;
io_cq_lock(ctx);
if (!(req->flags & REQ_F_CQE_SKIP))
......@@ -983,7 +992,7 @@ static void __io_req_complete_post(struct io_kiocb *req)
}
io_put_kbuf_comp(req);
io_dismantle_req(req);
io_req_put_rsrc(req);
rsrc_node = req->rsrc_node;
/*
* Selected buffer deallocation in io_clean_op() assumes that
* we don't hold ->completion_lock. Clean them here to avoid
......@@ -994,6 +1003,12 @@ static void __io_req_complete_post(struct io_kiocb *req)
ctx->locked_free_nr++;
}
io_cq_unlock_post(ctx);
if (rsrc_node) {
io_ring_submit_lock(ctx, issue_flags);
io_put_rsrc_node(ctx, rsrc_node);
io_ring_submit_unlock(ctx, issue_flags);
}
}
void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
......@@ -1003,12 +1018,12 @@ void io_req_complete_post(struct io_kiocb *req, unsigned issue_flags)
io_req_task_work_add(req);
} else if (!(issue_flags & IO_URING_F_UNLOCKED) ||
!(req->ctx->flags & IORING_SETUP_IOPOLL)) {
__io_req_complete_post(req);
__io_req_complete_post(req, issue_flags);
} else {
struct io_ring_ctx *ctx = req->ctx;
mutex_lock(&ctx->uring_lock);
__io_req_complete_post(req);
__io_req_complete_post(req, issue_flags & ~IO_URING_F_UNLOCKED);
mutex_unlock(&ctx->uring_lock);
}
}
......@@ -1106,11 +1121,14 @@ static inline void io_dismantle_req(struct io_kiocb *req)
io_put_file(req->file);
}
__cold void io_free_req(struct io_kiocb *req)
static __cold void io_free_req_tw(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_ring_ctx *ctx = req->ctx;
io_req_put_rsrc(req);
if (req->rsrc_node) {
io_tw_lock(ctx, ts);
io_put_rsrc_node(ctx, req->rsrc_node);
}
io_dismantle_req(req);
io_put_task_remote(req->task, 1);
......@@ -1120,6 +1138,12 @@ __cold void io_free_req(struct io_kiocb *req)
spin_unlock(&ctx->completion_lock);
}
__cold void io_free_req(struct io_kiocb *req)
{
req->io_task_work.func = io_free_req_tw;
io_req_task_work_add(req);
}
static void __io_req_find_next_prep(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
......@@ -1146,22 +1170,23 @@ static inline struct io_kiocb *io_req_find_next(struct io_kiocb *req)
return nxt;
}
static void ctx_flush_and_put(struct io_ring_ctx *ctx, bool *locked)
static void ctx_flush_and_put(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
if (!ctx)
return;
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (*locked) {
if (ts->locked) {
io_submit_flush_completions(ctx);
mutex_unlock(&ctx->uring_lock);
*locked = false;
ts->locked = false;
}
percpu_ref_put(&ctx->refs);
}
static unsigned int handle_tw_list(struct llist_node *node,
struct io_ring_ctx **ctx, bool *locked,
struct io_ring_ctx **ctx,
struct io_tw_state *ts,
struct llist_node *last)
{
unsigned int count = 0;
......@@ -1174,18 +1199,17 @@ static unsigned int handle_tw_list(struct llist_node *node,
prefetch(container_of(next, struct io_kiocb, io_task_work.node));
if (req->ctx != *ctx) {
ctx_flush_and_put(*ctx, locked);
ctx_flush_and_put(*ctx, ts);
*ctx = req->ctx;
/* if not contended, grab and improve batching */
*locked = mutex_trylock(&(*ctx)->uring_lock);
ts->locked = mutex_trylock(&(*ctx)->uring_lock);
percpu_ref_get(&(*ctx)->refs);
} else if (!*locked)
*locked = mutex_trylock(&(*ctx)->uring_lock);
req->io_task_work.func(req, locked);
}
req->io_task_work.func(req, ts);
node = next;
count++;
if (unlikely(need_resched())) {
ctx_flush_and_put(*ctx, locked);
ctx_flush_and_put(*ctx, ts);
*ctx = NULL;
cond_resched();
}
......@@ -1226,7 +1250,7 @@ static inline struct llist_node *io_llist_cmpxchg(struct llist_head *head,
void tctx_task_work(struct callback_head *cb)
{
bool uring_locked = false;
struct io_tw_state ts = {};
struct io_ring_ctx *ctx = NULL;
struct io_uring_task *tctx = container_of(cb, struct io_uring_task,
task_work);
......@@ -1243,12 +1267,12 @@ void tctx_task_work(struct callback_head *cb)
do {
loops++;
node = io_llist_xchg(&tctx->task_list, &fake);
count += handle_tw_list(node, &ctx, &uring_locked, &fake);
count += handle_tw_list(node, &ctx, &ts, &fake);
/* skip expensive cmpxchg if there are items in the list */
if (READ_ONCE(tctx->task_list.first) != &fake)
continue;
if (uring_locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
if (ts.locked && !wq_list_empty(&ctx->submit_state.compl_reqs)) {
io_submit_flush_completions(ctx);
if (READ_ONCE(tctx->task_list.first) != &fake)
continue;
......@@ -1256,7 +1280,7 @@ void tctx_task_work(struct callback_head *cb)
node = io_llist_cmpxchg(&tctx->task_list, &fake, NULL);
} while (node != &fake);
ctx_flush_and_put(ctx, &uring_locked);
ctx_flush_and_put(ctx, &ts);
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
......@@ -1279,42 +1303,67 @@ static __cold void io_fallback_tw(struct io_uring_task *tctx)
}
}
static void io_req_local_work_add(struct io_kiocb *req)
static void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_ring_ctx *ctx = req->ctx;
unsigned nr_wait, nr_tw, nr_tw_prev;
struct llist_node *first;
percpu_ref_get(&ctx->refs);
if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
goto put_ref;
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
flags &= ~IOU_F_TWQ_LAZY_WAKE;
/* needed for the following wake up */
smp_mb__after_atomic();
if (unlikely(atomic_read(&req->task->io_uring->in_cancel))) {
io_move_task_work_from_local(ctx);
goto put_ref;
first = READ_ONCE(ctx->work_llist.first);
do {
nr_tw_prev = 0;
if (first) {
struct io_kiocb *first_req = container_of(first,
struct io_kiocb,
io_task_work.node);
/*
* Might be executed at any moment, rely on
* SLAB_TYPESAFE_BY_RCU to keep it alive.
*/
nr_tw_prev = READ_ONCE(first_req->nr_tw);
}
nr_tw = nr_tw_prev + 1;
/* Large enough to fail the nr_wait comparison below */
if (!(flags & IOU_F_TWQ_LAZY_WAKE))
nr_tw = -1U;
req->nr_tw = nr_tw;
req->io_task_work.node.next = first;
} while (!try_cmpxchg(&ctx->work_llist.first, &first,
&req->io_task_work.node));
if (!first) {
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
io_eventfd_signal(ctx);
}
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
if (ctx->has_evfd)
io_eventfd_signal(ctx);
if (READ_ONCE(ctx->cq_waiting))
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
put_ref:
percpu_ref_put(&ctx->refs);
nr_wait = atomic_read(&ctx->cq_wait_nr);
/* no one is waiting */
if (!nr_wait)
return;
/* either not enough or the previous add has already woken it up */
if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
return;
/* pairs with set_current_state() in io_cqring_wait() */
smp_mb__after_atomic();
wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
}
void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
{
struct io_uring_task *tctx = req->task->io_uring;
struct io_ring_ctx *ctx = req->ctx;
if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
io_req_local_work_add(req);
if (!(flags & IOU_F_TWQ_FORCE_NORMAL) &&
(ctx->flags & IORING_SETUP_DEFER_TASKRUN)) {
rcu_read_lock();
io_req_local_work_add(req, flags);
rcu_read_unlock();
return;
}
......@@ -1341,11 +1390,11 @@ static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
io_task_work.node);
node = node->next;
__io_req_task_work_add(req, false);
__io_req_task_work_add(req, IOU_F_TWQ_FORCE_NORMAL);
}
}
static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
struct llist_node *node;
unsigned int loops = 0;
......@@ -1362,7 +1411,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
struct io_kiocb *req = container_of(node, struct io_kiocb,
io_task_work.node);
prefetch(container_of(next, struct io_kiocb, io_task_work.node));
req->io_task_work.func(req, locked);
req->io_task_work.func(req, ts);
ret++;
node = next;
}
......@@ -1370,7 +1419,7 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
if (!llist_empty(&ctx->work_llist))
goto again;
if (*locked) {
if (ts->locked) {
io_submit_flush_completions(ctx);
if (!llist_empty(&ctx->work_llist))
goto again;
......@@ -1381,46 +1430,46 @@ static int __io_run_local_work(struct io_ring_ctx *ctx, bool *locked)
static inline int io_run_local_work_locked(struct io_ring_ctx *ctx)
{
bool locked;
struct io_tw_state ts = { .locked = true, };
int ret;
if (llist_empty(&ctx->work_llist))
return 0;
locked = true;
ret = __io_run_local_work(ctx, &locked);
ret = __io_run_local_work(ctx, &ts);
/* shouldn't happen! */
if (WARN_ON_ONCE(!locked))
if (WARN_ON_ONCE(!ts.locked))
mutex_lock(&ctx->uring_lock);
return ret;
}
static int io_run_local_work(struct io_ring_ctx *ctx)
{
bool locked = mutex_trylock(&ctx->uring_lock);
struct io_tw_state ts = {};
int ret;
ret = __io_run_local_work(ctx, &locked);
if (locked)
ts.locked = mutex_trylock(&ctx->uring_lock);
ret = __io_run_local_work(ctx, &ts);
if (ts.locked)
mutex_unlock(&ctx->uring_lock);
return ret;
}
static void io_req_task_cancel(struct io_kiocb *req, bool *locked)
static void io_req_task_cancel(struct io_kiocb *req, struct io_tw_state *ts)
{
io_tw_lock(req->ctx, locked);
io_tw_lock(req->ctx, ts);
io_req_defer_failed(req, req->cqe.res);
}
void io_req_task_submit(struct io_kiocb *req, bool *locked)
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts)
{
io_tw_lock(req->ctx, locked);
io_tw_lock(req->ctx, ts);
/* req->task == current here, checking PF_EXITING is safe */
if (unlikely(req->task->flags & PF_EXITING))
io_req_defer_failed(req, -EFAULT);
else if (req->flags & REQ_F_FORCE_ASYNC)
io_queue_iowq(req, locked);
io_queue_iowq(req, ts);
else
io_queue_sqe(req);
}
......@@ -1646,9 +1695,9 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
return ret;
}
void io_req_task_complete(struct io_kiocb *req, bool *locked)
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
if (*locked)
if (ts->locked)
io_req_complete_defer(req);
else
io_req_complete_post(req, IO_URING_F_UNLOCKED);
......@@ -1927,9 +1976,9 @@ static int io_issue_sqe(struct io_kiocb *req, unsigned int issue_flags)
return 0;
}
int io_poll_issue(struct io_kiocb *req, bool *locked)
int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts)
{
io_tw_lock(req->ctx, locked);
io_tw_lock(req->ctx, ts);
return io_issue_sqe(req, IO_URING_F_NONBLOCK|IO_URING_F_MULTISHOT|
IO_URING_F_COMPLETE_DEFER);
}
......@@ -2298,8 +2347,7 @@ static inline int io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
if (unlikely(ret))
return io_submit_fail_init(sqe, req, ret);
/* don't need @sqe from now on */
trace_io_uring_submit_sqe(req, true);
trace_io_uring_submit_req(req);
/*
* If we already have a head request, queue this one for async
......@@ -2428,7 +2476,7 @@ int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr)
if (unlikely(!entries))
return 0;
/* make sure SQ entry isn't read before tail */
ret = left = min3(nr, ctx->sq_entries, entries);
ret = left = min(nr, entries);
io_get_task_refs(left);
io_submit_state_start(&ctx->submit_state, left);
......@@ -2600,7 +2648,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
unsigned long check_cq;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
WRITE_ONCE(ctx->cq_waiting, 1);
int nr_wait = (int) iowq.cq_tail - READ_ONCE(ctx->rings->cq.tail);
atomic_set(&ctx->cq_wait_nr, nr_wait);
set_current_state(TASK_INTERRUPTIBLE);
} else {
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
......@@ -2609,7 +2659,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
ret = io_cqring_wait_schedule(ctx, &iowq);
__set_current_state(TASK_RUNNING);
WRITE_ONCE(ctx->cq_waiting, 0);
atomic_set(&ctx->cq_wait_nr, 0);
if (ret < 0)
break;
......@@ -2772,13 +2822,17 @@ static void io_req_caches_free(struct io_ring_ctx *ctx)
mutex_unlock(&ctx->uring_lock);
}
static void io_rsrc_node_cache_free(struct io_cache_entry *entry)
{
kfree(container_of(entry, struct io_rsrc_node, cache));
}
static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
{
io_sq_thread_finish(ctx);
io_rsrc_refs_drop(ctx);
/* __io_rsrc_put_work() may need uring_lock to progress, wait w/o it */
io_wait_rsrc_data(ctx->buf_data);
io_wait_rsrc_data(ctx->file_data);
if (WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list)))
return;
mutex_lock(&ctx->uring_lock);
if (ctx->buf_data)
......@@ -2798,14 +2852,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
/* there are no registered resources left, nobody uses it */
if (ctx->rsrc_node)
io_rsrc_node_destroy(ctx->rsrc_node);
if (ctx->rsrc_backup_node)
io_rsrc_node_destroy(ctx->rsrc_backup_node);
flush_delayed_work(&ctx->rsrc_put_work);
flush_delayed_work(&ctx->fallback_work);
io_rsrc_node_destroy(ctx, ctx->rsrc_node);
WARN_ON_ONCE(!list_empty(&ctx->rsrc_ref_list));
WARN_ON_ONCE(!llist_empty(&ctx->rsrc_put_llist));
#if defined(CONFIG_UNIX)
if (ctx->ring_sock) {
......@@ -2815,6 +2864,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
#endif
WARN_ON_ONCE(!list_empty(&ctx->ltimeout_list));
io_alloc_cache_free(&ctx->rsrc_node_cache, io_rsrc_node_cache_free);
if (ctx->mm_account) {
mmdrop(ctx->mm_account);
ctx->mm_account = NULL;
......@@ -3031,6 +3081,10 @@ static __cold void io_ring_exit_work(struct work_struct *work)
spin_lock(&ctx->completion_lock);
spin_unlock(&ctx->completion_lock);
/* pairs with RCU read section in io_req_local_work_add() */
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
synchronize_rcu();
io_ring_ctx_free(ctx);
}
......@@ -3146,6 +3200,12 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
enum io_wq_cancel cret;
bool ret = false;
/* set it so io_req_local_work_add() would wake us up */
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 1);
smp_mb();
}
/* failed during ring init, it couldn't have issued any requests */
if (!ctx->rings)
return false;
......@@ -3200,6 +3260,8 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
{
struct io_uring_task *tctx = current->io_uring;
struct io_ring_ctx *ctx;
struct io_tctx_node *node;
unsigned long index;
s64 inflight;
DEFINE_WAIT(wait);
......@@ -3221,9 +3283,6 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
break;
if (!sqd) {
struct io_tctx_node *node;
unsigned long index;
xa_for_each(&tctx->xa, index, node) {
/* sqpoll task will cancel all its requests */
if (node->ctx->sq_data)
......@@ -3246,7 +3305,13 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
prepare_to_wait(&tctx->wait, &wait, TASK_INTERRUPTIBLE);
io_run_task_work();
io_uring_drop_tctx_refs(current);
xa_for_each(&tctx->xa, index, node) {
if (!llist_empty(&node->ctx->work_llist)) {
WARN_ON_ONCE(node->ctx->submitter_task &&
node->ctx->submitter_task != current);
goto end_wait;
}
}
/*
* If we've seen completions, retry without waiting. This
* avoids a race where a completion comes in before we did
......@@ -3254,6 +3319,7 @@ __cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd)
*/
if (inflight == tctx_inflight(tctx, !cancel_all))
schedule();
end_wait:
finish_wait(&tctx->wait, &wait);
} while (1);
......@@ -3282,7 +3348,7 @@ static void *io_uring_validate_mmap_request(struct file *file,
struct page *page;
void *ptr;
switch (offset) {
switch (offset & IORING_OFF_MMAP_MASK) {
case IORING_OFF_SQ_RING:
case IORING_OFF_CQ_RING:
ptr = ctx->rings;
......@@ -3290,6 +3356,17 @@ static void *io_uring_validate_mmap_request(struct file *file,
case IORING_OFF_SQES:
ptr = ctx->sq_sqes;
break;
case IORING_OFF_PBUF_RING: {
unsigned int bgid;
bgid = (offset & ~IORING_OFF_MMAP_MASK) >> IORING_OFF_PBUF_SHIFT;
mutex_lock(&ctx->uring_lock);
ptr = io_pbuf_get_address(ctx, bgid);
mutex_unlock(&ctx->uring_lock);
if (!ptr)
return ERR_PTR(-EINVAL);
break;
}
default:
return ERR_PTR(-EINVAL);
}
......@@ -3317,6 +3394,54 @@ static __cold int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
return remap_pfn_range(vma, vma->vm_start, pfn, sz, vma->vm_page_prot);
}
static unsigned long io_uring_mmu_get_unmapped_area(struct file *filp,
unsigned long addr, unsigned long len,
unsigned long pgoff, unsigned long flags)
{
const unsigned long mmap_end = arch_get_mmap_end(addr, len, flags);
struct vm_unmapped_area_info info;
void *ptr;
/*
* Do not allow to map to user-provided address to avoid breaking the
* aliasing rules. Userspace is not able to guess the offset address of
* kernel kmalloc()ed memory area.
*/
if (addr)
return -EINVAL;
ptr = io_uring_validate_mmap_request(filp, pgoff, len);
if (IS_ERR(ptr))
return -ENOMEM;
info.flags = VM_UNMAPPED_AREA_TOPDOWN;
info.length = len;
info.low_limit = max(PAGE_SIZE, mmap_min_addr);
info.high_limit = arch_get_mmap_base(addr, current->mm->mmap_base);
#ifdef SHM_COLOUR
info.align_mask = PAGE_MASK & (SHM_COLOUR - 1UL);
#else
info.align_mask = PAGE_MASK & (SHMLBA - 1UL);
#endif
info.align_offset = (unsigned long) ptr;
/*
* A failed mmap() very likely causes application failure,
* so fall back to the bottom-up function here. This scenario
* can happen with large stack limits and large mmap()
* allocations.
*/
addr = vm_unmapped_area(&info);
if (offset_in_page(addr)) {
info.flags = 0;
info.low_limit = TASK_UNMAPPED_BASE;
info.high_limit = mmap_end;
addr = vm_unmapped_area(&info);
}
return addr;
}
#else /* !CONFIG_MMU */
static int io_uring_mmap(struct file *file, struct vm_area_struct *vma)
......@@ -3529,6 +3654,8 @@ static const struct file_operations io_uring_fops = {
#ifndef CONFIG_MMU
.get_unmapped_area = io_uring_nommu_get_unmapped_area,
.mmap_capabilities = io_uring_nommu_mmap_capabilities,
#else
.get_unmapped_area = io_uring_mmu_get_unmapped_area,
#endif
.poll = io_uring_poll,
#ifdef CONFIG_PROC_FS
......@@ -3755,11 +3882,10 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
ret = io_sq_offload_create(ctx, p);
if (ret)
goto err;
/* always set a rsrc node */
ret = io_rsrc_node_switch_start(ctx);
ret = io_rsrc_init(ctx);
if (ret)
goto err;
io_rsrc_node_switch(ctx, NULL);
memset(&p->sq_off, 0, sizeof(p->sq_off));
p->sq_off.head = offsetof(struct io_rings, sq.head);
......@@ -4425,7 +4551,7 @@ static int __init io_uring_init(void)
io_uring_optable_init();
req_cachep = KMEM_CACHE(io_kiocb, SLAB_HWCACHE_ALIGN | SLAB_PANIC |
SLAB_ACCOUNT);
SLAB_ACCOUNT | SLAB_TYPESAFE_BY_RCU);
return 0;
};
__initcall(io_uring_init);
......@@ -15,6 +15,20 @@
#include <trace/events/io_uring.h>
#endif
enum {
/* don't use deferred task_work */
IOU_F_TWQ_FORCE_NORMAL = 1,
/*
* A hint to not wake right away but delay until there are enough of
* tw's queued to match the number of CQEs the task is waiting for.
*
* Must not be used wirh requests generating more than one CQE.
* It's also ignored unless IORING_SETUP_DEFER_TASKRUN is set.
*/
IOU_F_TWQ_LAZY_WAKE = 2,
};
enum {
IOU_OK = 0,
IOU_ISSUE_SKIP_COMPLETE = -EIOCBQUEUED,
......@@ -48,20 +62,20 @@ static inline bool io_req_ffs_set(struct io_kiocb *req)
return req->flags & REQ_F_FIXED_FILE;
}
void __io_req_task_work_add(struct io_kiocb *req, bool allow_local);
void __io_req_task_work_add(struct io_kiocb *req, unsigned flags);
bool io_is_uring_fops(struct file *file);
bool io_alloc_async_data(struct io_kiocb *req);
void io_req_task_queue(struct io_kiocb *req);
void io_queue_iowq(struct io_kiocb *req, bool *dont_use);
void io_req_task_complete(struct io_kiocb *req, bool *locked);
void io_queue_iowq(struct io_kiocb *req, struct io_tw_state *ts_dont_use);
void io_req_task_complete(struct io_kiocb *req, struct io_tw_state *ts);
void io_req_task_queue_fail(struct io_kiocb *req, int ret);
void io_req_task_submit(struct io_kiocb *req, bool *locked);
void io_req_task_submit(struct io_kiocb *req, struct io_tw_state *ts);
void tctx_task_work(struct callback_head *cb);
__cold void io_uring_cancel_generic(bool cancel_all, struct io_sq_data *sqd);
int io_uring_alloc_task_context(struct task_struct *task,
struct io_ring_ctx *ctx);
int io_poll_issue(struct io_kiocb *req, bool *locked);
int io_poll_issue(struct io_kiocb *req, struct io_tw_state *ts);
int io_submit_sqes(struct io_ring_ctx *ctx, unsigned int nr);
int io_do_iopoll(struct io_ring_ctx *ctx, bool force_nonspin);
void io_free_batch_list(struct io_ring_ctx *ctx, struct io_wq_work_node *node);
......@@ -80,6 +94,8 @@ bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
#define io_lockdep_assert_cq_locked(ctx) \
do { \
lockdep_assert(in_task()); \
\
if (ctx->flags & IORING_SETUP_IOPOLL) { \
lockdep_assert_held(&ctx->uring_lock); \
} else if (!ctx->task_complete) { \
......@@ -93,7 +109,7 @@ bool io_match_task_safe(struct io_kiocb *head, struct task_struct *task,
static inline void io_req_task_work_add(struct io_kiocb *req)
{
__io_req_task_work_add(req, true);
__io_req_task_work_add(req, 0);
}
#define io_for_each_link(pos, head) \
......@@ -228,8 +244,7 @@ static inline void io_poll_wq_wake(struct io_ring_ctx *ctx)
poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}
/* requires smb_mb() prior, see wq_has_sleeper() */
static inline void __io_cqring_wake(struct io_ring_ctx *ctx)
static inline void io_cqring_wake(struct io_ring_ctx *ctx)
{
/*
* Trigger waitqueue handler on all waiters on our waitqueue. This
......@@ -241,17 +256,11 @@ static inline void __io_cqring_wake(struct io_ring_ctx *ctx)
* waitqueue handlers, we know we have a dependency between eventfd or
* epoll and should terminate multishot poll at that point.
*/
if (waitqueue_active(&ctx->cq_wait))
if (wq_has_sleeper(&ctx->cq_wait))
__wake_up(&ctx->cq_wait, TASK_NORMAL, 0,
poll_to_key(EPOLL_URING_WAKE | EPOLLIN));
}
static inline void io_cqring_wake(struct io_ring_ctx *ctx)
{
smp_mb();
__io_cqring_wake(ctx);
}
static inline bool io_sqring_full(struct io_ring_ctx *ctx)
{
struct io_rings *r = ctx->rings;
......@@ -262,9 +271,11 @@ static inline bool io_sqring_full(struct io_ring_ctx *ctx)
static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
{
struct io_rings *rings = ctx->rings;
unsigned int entries;
/* make sure SQ entry isn't read before tail */
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
entries = smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
return min(entries, ctx->sq_entries);
}
static inline int io_run_task_work(void)
......@@ -299,11 +310,11 @@ static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
return task_work_pending(current) || !wq_list_empty(&ctx->work_llist);
}
static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
static inline void io_tw_lock(struct io_ring_ctx *ctx, struct io_tw_state *ts)
{
if (!*locked) {
if (!ts->locked) {
mutex_lock(&ctx->uring_lock);
*locked = true;
ts->locked = true;
}
}
......
......@@ -137,7 +137,8 @@ static void __user *io_ring_buffer_select(struct io_kiocb *req, size_t *len,
return NULL;
head &= bl->mask;
if (head < IO_BUFFER_LIST_BUF_PER_PAGE) {
/* mmaped buffers are always contig */
if (bl->is_mmap || head < IO_BUFFER_LIST_BUF_PER_PAGE) {
buf = &br->bufs[head];
} else {
int off = head & (IO_BUFFER_LIST_BUF_PER_PAGE - 1);
......@@ -179,7 +180,7 @@ void __user *io_buffer_select(struct io_kiocb *req, size_t *len,
bl = io_buffer_get_list(ctx, req->buf_index);
if (likely(bl)) {
if (bl->buf_nr_pages)
if (bl->is_mapped)
ret = io_ring_buffer_select(req, len, bl, issue_flags);
else
ret = io_provided_buffer_select(req, len, bl);
......@@ -214,17 +215,28 @@ static int __io_remove_buffers(struct io_ring_ctx *ctx,
if (!nbufs)
return 0;
if (bl->buf_nr_pages) {
int j;
if (bl->is_mapped) {
i = bl->buf_ring->tail - bl->head;
for (j = 0; j < bl->buf_nr_pages; j++)
unpin_user_page(bl->buf_pages[j]);
kvfree(bl->buf_pages);
bl->buf_pages = NULL;
bl->buf_nr_pages = 0;
if (bl->is_mmap) {
struct page *page;
page = virt_to_head_page(bl->buf_ring);
if (put_page_testzero(page))
free_compound_page(page);
bl->buf_ring = NULL;
bl->is_mmap = 0;
} else if (bl->buf_nr_pages) {
int j;
for (j = 0; j < bl->buf_nr_pages; j++)
unpin_user_page(bl->buf_pages[j]);
kvfree(bl->buf_pages);
bl->buf_pages = NULL;
bl->buf_nr_pages = 0;
}
/* make sure it's seen as empty */
INIT_LIST_HEAD(&bl->buf_list);
bl->is_mapped = 0;
return i;
}
......@@ -304,7 +316,7 @@ int io_remove_buffers(struct io_kiocb *req, unsigned int issue_flags)
if (bl) {
ret = -EINVAL;
/* can't use provide/remove buffers command on mapped buffers */
if (!bl->buf_nr_pages)
if (!bl->is_mapped)
ret = __io_remove_buffers(ctx, bl, p->nbufs);
}
io_ring_submit_unlock(ctx, issue_flags);
......@@ -449,7 +461,7 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
}
}
/* can't add buffers via this command for a mapped buffer ring */
if (bl->buf_nr_pages) {
if (bl->is_mapped) {
ret = -EINVAL;
goto err;
}
......@@ -464,23 +476,87 @@ int io_provide_buffers(struct io_kiocb *req, unsigned int issue_flags)
return IOU_OK;
}
int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
static int io_pin_pbuf_ring(struct io_uring_buf_reg *reg,
struct io_buffer_list *bl)
{
struct io_uring_buf_ring *br;
struct io_uring_buf_reg reg;
struct io_buffer_list *bl, *free_bl = NULL;
struct page **pages;
int nr_pages;
pages = io_pin_pages(reg->ring_addr,
flex_array_size(br, bufs, reg->ring_entries),
&nr_pages);
if (IS_ERR(pages))
return PTR_ERR(pages);
br = page_address(pages[0]);
#ifdef SHM_COLOUR
/*
* On platforms that have specific aliasing requirements, SHM_COLOUR
* is set and we must guarantee that the kernel and user side align
* nicely. We cannot do that if IOU_PBUF_RING_MMAP isn't set and
* the application mmap's the provided ring buffer. Fail the request
* if we, by chance, don't end up with aligned addresses. The app
* should use IOU_PBUF_RING_MMAP instead, and liburing will handle
* this transparently.
*/
if ((reg->ring_addr | (unsigned long) br) & (SHM_COLOUR - 1)) {
int i;
for (i = 0; i < nr_pages; i++)
unpin_user_page(pages[i]);
return -EINVAL;
}
#endif
bl->buf_pages = pages;
bl->buf_nr_pages = nr_pages;
bl->buf_ring = br;
bl->is_mapped = 1;
bl->is_mmap = 0;
return 0;
}
static int io_alloc_pbuf_ring(struct io_uring_buf_reg *reg,
struct io_buffer_list *bl)
{
gfp_t gfp = GFP_KERNEL_ACCOUNT | __GFP_ZERO | __GFP_NOWARN | __GFP_COMP;
size_t ring_size;
void *ptr;
ring_size = reg->ring_entries * sizeof(struct io_uring_buf_ring);
ptr = (void *) __get_free_pages(gfp, get_order(ring_size));
if (!ptr)
return -ENOMEM;
bl->buf_ring = ptr;
bl->is_mapped = 1;
bl->is_mmap = 1;
return 0;
}
int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
{
struct io_uring_buf_reg reg;
struct io_buffer_list *bl, *free_bl = NULL;
int ret;
if (copy_from_user(&reg, arg, sizeof(reg)))
return -EFAULT;
if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
if (reg.resv[0] || reg.resv[1] || reg.resv[2])
return -EINVAL;
if (!reg.ring_addr)
return -EFAULT;
if (reg.ring_addr & ~PAGE_MASK)
if (reg.flags & ~IOU_PBUF_RING_MMAP)
return -EINVAL;
if (!(reg.flags & IOU_PBUF_RING_MMAP)) {
if (!reg.ring_addr)
return -EFAULT;
if (reg.ring_addr & ~PAGE_MASK)
return -EINVAL;
} else {
if (reg.ring_addr)
return -EINVAL;
}
if (!is_power_of_2(reg.ring_entries))
return -EINVAL;
......@@ -497,7 +573,7 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
bl = io_buffer_get_list(ctx, reg.bgid);
if (bl) {
/* if mapped buffer ring OR classic exists, don't allow */
if (bl->buf_nr_pages || !list_empty(&bl->buf_list))
if (bl->is_mapped || !list_empty(&bl->buf_list))
return -EEXIST;
} else {
free_bl = bl = kzalloc(sizeof(*bl), GFP_KERNEL);
......@@ -505,22 +581,21 @@ int io_register_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
return -ENOMEM;
}
pages = io_pin_pages(reg.ring_addr,
flex_array_size(br, bufs, reg.ring_entries),
&nr_pages);
if (IS_ERR(pages)) {
kfree(free_bl);
return PTR_ERR(pages);
if (!(reg.flags & IOU_PBUF_RING_MMAP))
ret = io_pin_pbuf_ring(&reg, bl);
else
ret = io_alloc_pbuf_ring(&reg, bl);
if (!ret) {
bl->nr_entries = reg.ring_entries;
bl->mask = reg.ring_entries - 1;
io_buffer_add_list(ctx, bl, reg.bgid);
return 0;
}
br = page_address(pages[0]);
bl->buf_pages = pages;
bl->buf_nr_pages = nr_pages;
bl->nr_entries = reg.ring_entries;
bl->buf_ring = br;
bl->mask = reg.ring_entries - 1;
io_buffer_add_list(ctx, bl, reg.bgid);
return 0;
kfree(free_bl);
return ret;
}
int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
......@@ -530,13 +605,15 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
if (copy_from_user(&reg, arg, sizeof(reg)))
return -EFAULT;
if (reg.pad || reg.resv[0] || reg.resv[1] || reg.resv[2])
if (reg.resv[0] || reg.resv[1] || reg.resv[2])
return -EINVAL;
if (reg.flags)
return -EINVAL;
bl = io_buffer_get_list(ctx, reg.bgid);
if (!bl)
return -ENOENT;
if (!bl->buf_nr_pages)
if (!bl->is_mapped)
return -EINVAL;
__io_remove_buffers(ctx, bl, -1U);
......@@ -546,3 +623,14 @@ int io_unregister_pbuf_ring(struct io_ring_ctx *ctx, void __user *arg)
}
return 0;
}
void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid)
{
struct io_buffer_list *bl;
bl = io_buffer_get_list(ctx, bgid);
if (!bl || !bl->is_mmap)
return NULL;
return bl->buf_ring;
}
......@@ -23,6 +23,11 @@ struct io_buffer_list {
__u16 nr_entries;
__u16 head;
__u16 mask;
/* ring mapped provided buffers */
__u8 is_mapped;
/* ring mapped provided buffers, but mmap'ed by application */
__u8 is_mmap;
};
struct io_buffer {
......@@ -50,6 +55,8 @@ unsigned int __io_put_kbuf(struct io_kiocb *req, unsigned issue_flags);
void io_kbuf_recycle_legacy(struct io_kiocb *req, unsigned issue_flags);
void *io_pbuf_get_address(struct io_ring_ctx *ctx, unsigned long bgid);
static inline void io_kbuf_recycle_ring(struct io_kiocb *req)
{
/*
......
......@@ -5,8 +5,8 @@
#include "alloc_cache.h"
#if defined(CONFIG_NET)
struct io_async_msghdr {
#if defined(CONFIG_NET)
union {
struct iovec fast_iov[UIO_FASTIOV];
struct {
......@@ -22,8 +22,11 @@ struct io_async_msghdr {
struct sockaddr __user *uaddr;
struct msghdr msg;
struct sockaddr_storage addr;
#endif
};
#if defined(CONFIG_NET)
struct io_async_connect {
struct sockaddr_storage address;
};
......
......@@ -9,7 +9,7 @@
#include "notif.h"
#include "rsrc.h"
static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked)
static void io_notif_complete_tw_ext(struct io_kiocb *notif, struct io_tw_state *ts)
{
struct io_notif_data *nd = io_notif_to_data(notif);
struct io_ring_ctx *ctx = notif->ctx;
......@@ -21,7 +21,7 @@ static void io_notif_complete_tw_ext(struct io_kiocb *notif, bool *locked)
__io_unaccount_mem(ctx->user, nd->account_pages);
nd->account_pages = 0;
}
io_req_task_complete(notif, locked);
io_req_task_complete(notif, ts);
}
static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
......@@ -31,7 +31,7 @@ static void io_tx_ubuf_callback(struct sk_buff *skb, struct ubuf_info *uarg,
struct io_kiocb *notif = cmd_to_io_kiocb(nd);
if (refcount_dec_and_test(&uarg->refcnt))
io_req_task_work_add(notif);
__io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}
static void io_tx_ubuf_callback_ext(struct sk_buff *skb, struct ubuf_info *uarg,
......@@ -79,7 +79,7 @@ struct io_kiocb *io_alloc_notif(struct io_ring_ctx *ctx)
notif->io_task_work.func = io_req_task_complete;
nd = io_notif_to_data(notif);
nd->uarg.flags = SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN;
nd->uarg.flags = IO_NOTIF_UBUF_FLAGS;
nd->uarg.callback = io_tx_ubuf_callback;
refcount_set(&nd->uarg.refcnt, 1);
return notif;
......
......@@ -7,6 +7,7 @@
#include "rsrc.h"
#define IO_NOTIF_UBUF_FLAGS (SKBFL_ZEROCOPY_FRAG | SKBFL_DONT_ORPHAN)
#define IO_NOTIF_SPLICE_BATCH 32
struct io_notif_data {
......@@ -33,7 +34,7 @@ static inline void io_notif_flush(struct io_kiocb *notif)
/* drop slot's master ref */
if (refcount_dec_and_test(&nd->uarg.refcnt))
io_req_task_work_add(notif);
__io_req_task_work_add(notif, IOU_F_TWQ_LAZY_WAKE);
}
static inline int io_notif_account_mem(struct io_kiocb *notif, unsigned len)
......
......@@ -148,7 +148,7 @@ static void io_poll_req_insert_locked(struct io_kiocb *req)
hlist_add_head(&req->hash_node, &table->hbs[index].list);
}
static void io_poll_tw_hash_eject(struct io_kiocb *req, bool *locked)
static void io_poll_tw_hash_eject(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_ring_ctx *ctx = req->ctx;
......@@ -159,7 +159,7 @@ static void io_poll_tw_hash_eject(struct io_kiocb *req, bool *locked)
* already grabbed the mutex for us, but there is a chance it
* failed.
*/
io_tw_lock(ctx, locked);
io_tw_lock(ctx, ts);
hash_del(&req->hash_node);
req->flags &= ~REQ_F_HASH_LOCKED;
} else {
......@@ -238,7 +238,7 @@ enum {
* req->cqe.res. IOU_POLL_REMOVE_POLL_USE_RES indicates to remove multishot
* poll and that the result is stored in req->cqe.
*/
static int io_poll_check_events(struct io_kiocb *req, bool *locked)
static int io_poll_check_events(struct io_kiocb *req, struct io_tw_state *ts)
{
int v;
......@@ -300,13 +300,13 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
__poll_t mask = mangle_poll(req->cqe.res &
req->apoll_events);
if (!io_aux_cqe(req->ctx, *locked, req->cqe.user_data,
if (!io_aux_cqe(req->ctx, ts->locked, req->cqe.user_data,
mask, IORING_CQE_F_MORE, false)) {
io_req_set_res(req, mask, 0);
return IOU_POLL_REMOVE_POLL_USE_RES;
}
} else {
int ret = io_poll_issue(req, locked);
int ret = io_poll_issue(req, ts);
if (ret == IOU_STOP_MULTISHOT)
return IOU_POLL_REMOVE_POLL_USE_RES;
if (ret < 0)
......@@ -326,15 +326,15 @@ static int io_poll_check_events(struct io_kiocb *req, bool *locked)
return IOU_POLL_NO_ACTION;
}
static void io_poll_task_func(struct io_kiocb *req, bool *locked)
static void io_poll_task_func(struct io_kiocb *req, struct io_tw_state *ts)
{
int ret;
ret = io_poll_check_events(req, locked);
ret = io_poll_check_events(req, ts);
if (ret == IOU_POLL_NO_ACTION)
return;
io_poll_remove_entries(req);
io_poll_tw_hash_eject(req, locked);
io_poll_tw_hash_eject(req, ts);
if (req->opcode == IORING_OP_POLL_ADD) {
if (ret == IOU_POLL_DONE) {
......@@ -343,7 +343,7 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
poll = io_kiocb_to_cmd(req, struct io_poll);
req->cqe.res = mangle_poll(req->cqe.res & poll->events);
} else if (ret == IOU_POLL_REISSUE) {
io_req_task_submit(req, locked);
io_req_task_submit(req, ts);
return;
} else if (ret != IOU_POLL_REMOVE_POLL_USE_RES) {
req->cqe.res = ret;
......@@ -351,14 +351,14 @@ static void io_poll_task_func(struct io_kiocb *req, bool *locked)
}
io_req_set_res(req, req->cqe.res, 0);
io_req_task_complete(req, locked);
io_req_task_complete(req, ts);
} else {
io_tw_lock(req->ctx, locked);
io_tw_lock(req->ctx, ts);
if (ret == IOU_POLL_REMOVE_POLL_USE_RES)
io_req_task_complete(req, locked);
io_req_task_complete(req, ts);
else if (ret == IOU_POLL_DONE || ret == IOU_POLL_REISSUE)
io_req_task_submit(req, locked);
io_req_task_submit(req, ts);
else
io_req_defer_failed(req, ret);
}
......@@ -977,7 +977,7 @@ int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
struct io_hash_bucket *bucket;
struct io_kiocb *preq;
int ret2, ret = 0;
bool locked;
struct io_tw_state ts = {};
preq = io_poll_find(ctx, true, &cd, &ctx->cancel_table, &bucket);
ret2 = io_poll_disarm(preq);
......@@ -1027,8 +1027,8 @@ int io_poll_remove(struct io_kiocb *req, unsigned int issue_flags)
req_set_fail(preq);
io_req_set_res(preq, -ECANCELED, 0);
locked = !(issue_flags & IO_URING_F_UNLOCKED);
io_req_task_complete(preq, &locked);
ts.locked = !(issue_flags & IO_URING_F_UNLOCKED);
io_req_task_complete(preq, &ts);
out:
if (ret < 0) {
req_set_fail(req);
......
......@@ -23,25 +23,16 @@ struct io_rsrc_update {
u32 offset;
};
static void io_rsrc_buf_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc);
static int io_sqe_buffer_register(struct io_ring_ctx *ctx, struct iovec *iov,
struct io_mapped_ubuf **pimu,
struct page **last_hpage);
#define IO_RSRC_REF_BATCH 100
/* only define max */
#define IORING_MAX_FIXED_FILES (1U << 20)
#define IORING_MAX_REG_BUFFERS (1U << 14)
void io_rsrc_refs_drop(struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
if (ctx->rsrc_cached_refs) {
io_rsrc_put_node(ctx->rsrc_node, ctx->rsrc_cached_refs);
ctx->rsrc_cached_refs = 0;
}
}
int __io_account_mem(struct user_struct *user, unsigned long nr_pages)
{
unsigned long page_limit, cur_pages, new_pages;
......@@ -151,216 +142,129 @@ static void io_buffer_unmap(struct io_ring_ctx *ctx, struct io_mapped_ubuf **slo
*slot = NULL;
}
void io_rsrc_refs_refill(struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
ctx->rsrc_cached_refs += IO_RSRC_REF_BATCH;
percpu_ref_get_many(&ctx->rsrc_node->refs, IO_RSRC_REF_BATCH);
}
static void __io_rsrc_put_work(struct io_rsrc_node *ref_node)
{
struct io_rsrc_data *rsrc_data = ref_node->rsrc_data;
struct io_ring_ctx *ctx = rsrc_data->ctx;
struct io_rsrc_put *prsrc, *tmp;
list_for_each_entry_safe(prsrc, tmp, &ref_node->rsrc_list, list) {
list_del(&prsrc->list);
if (prsrc->tag) {
if (ctx->flags & IORING_SETUP_IOPOLL) {
mutex_lock(&ctx->uring_lock);
io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
mutex_unlock(&ctx->uring_lock);
} else {
io_post_aux_cqe(ctx, prsrc->tag, 0, 0);
}
}
rsrc_data->do_put(ctx, prsrc);
kfree(prsrc);
}
io_rsrc_node_destroy(ref_node);
if (atomic_dec_and_test(&rsrc_data->refs))
complete(&rsrc_data->done);
}
void io_rsrc_put_work(struct work_struct *work)
static void io_rsrc_put_work(struct io_rsrc_node *node)
{
struct io_ring_ctx *ctx;
struct llist_node *node;
ctx = container_of(work, struct io_ring_ctx, rsrc_put_work.work);
node = llist_del_all(&ctx->rsrc_put_llist);
struct io_rsrc_put *prsrc = &node->item;
while (node) {
struct io_rsrc_node *ref_node;
struct llist_node *next = node->next;
if (prsrc->tag)
io_post_aux_cqe(node->ctx, prsrc->tag, 0, 0);
ref_node = llist_entry(node, struct io_rsrc_node, llist);
__io_rsrc_put_work(ref_node);
node = next;
switch (node->type) {
case IORING_RSRC_FILE:
io_rsrc_file_put(node->ctx, prsrc);
break;
case IORING_RSRC_BUFFER:
io_rsrc_buf_put(node->ctx, prsrc);
break;
default:
WARN_ON_ONCE(1);
break;
}
}
void io_rsrc_put_tw(struct callback_head *cb)
void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
{
struct io_ring_ctx *ctx = container_of(cb, struct io_ring_ctx,
rsrc_put_tw);
io_rsrc_put_work(&ctx->rsrc_put_work.work);
if (!io_alloc_cache_put(&ctx->rsrc_node_cache, &node->cache))
kfree(node);
}
void io_wait_rsrc_data(struct io_rsrc_data *data)
void io_rsrc_node_ref_zero(struct io_rsrc_node *node)
__must_hold(&node->ctx->uring_lock)
{
if (data && !atomic_dec_and_test(&data->refs))
wait_for_completion(&data->done);
}
void io_rsrc_node_destroy(struct io_rsrc_node *ref_node)
{
percpu_ref_exit(&ref_node->refs);
kfree(ref_node);
}
static __cold void io_rsrc_node_ref_zero(struct percpu_ref *ref)
{
struct io_rsrc_node *node = container_of(ref, struct io_rsrc_node, refs);
struct io_ring_ctx *ctx = node->rsrc_data->ctx;
unsigned long flags;
bool first_add = false;
unsigned long delay = HZ;
spin_lock_irqsave(&ctx->rsrc_ref_lock, flags);
node->done = true;
/* if we are mid-quiesce then do not delay */
if (node->rsrc_data->quiesce)
delay = 0;
struct io_ring_ctx *ctx = node->ctx;
while (!list_empty(&ctx->rsrc_ref_list)) {
node = list_first_entry(&ctx->rsrc_ref_list,
struct io_rsrc_node, node);
/* recycle ref nodes in order */
if (!node->done)
if (node->refs)
break;
list_del(&node->node);
first_add |= llist_add(&node->llist, &ctx->rsrc_put_llist);
}
spin_unlock_irqrestore(&ctx->rsrc_ref_lock, flags);
if (!first_add)
return;
if (ctx->submitter_task) {
if (!task_work_add(ctx->submitter_task, &ctx->rsrc_put_tw,
ctx->notify_method))
return;
if (likely(!node->empty))
io_rsrc_put_work(node);
io_rsrc_node_destroy(ctx, node);
}
mod_delayed_work(system_wq, &ctx->rsrc_put_work, delay);
if (list_empty(&ctx->rsrc_ref_list) && unlikely(ctx->rsrc_quiesce))
wake_up_all(&ctx->rsrc_quiesce_wq);
}
static struct io_rsrc_node *io_rsrc_node_alloc(void)
struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx)
{
struct io_rsrc_node *ref_node;
struct io_cache_entry *entry;
ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
if (!ref_node)
return NULL;
if (percpu_ref_init(&ref_node->refs, io_rsrc_node_ref_zero,
0, GFP_KERNEL)) {
kfree(ref_node);
return NULL;
}
INIT_LIST_HEAD(&ref_node->node);
INIT_LIST_HEAD(&ref_node->rsrc_list);
ref_node->done = false;
return ref_node;
}
void io_rsrc_node_switch(struct io_ring_ctx *ctx,
struct io_rsrc_data *data_to_kill)
__must_hold(&ctx->uring_lock)
{
WARN_ON_ONCE(!ctx->rsrc_backup_node);
WARN_ON_ONCE(data_to_kill && !ctx->rsrc_node);
io_rsrc_refs_drop(ctx);
if (data_to_kill) {
struct io_rsrc_node *rsrc_node = ctx->rsrc_node;
rsrc_node->rsrc_data = data_to_kill;
spin_lock_irq(&ctx->rsrc_ref_lock);
list_add_tail(&rsrc_node->node, &ctx->rsrc_ref_list);
spin_unlock_irq(&ctx->rsrc_ref_lock);
atomic_inc(&data_to_kill->refs);
percpu_ref_kill(&rsrc_node->refs);
ctx->rsrc_node = NULL;
}
if (!ctx->rsrc_node) {
ctx->rsrc_node = ctx->rsrc_backup_node;
ctx->rsrc_backup_node = NULL;
entry = io_alloc_cache_get(&ctx->rsrc_node_cache);
if (entry) {
ref_node = container_of(entry, struct io_rsrc_node, cache);
} else {
ref_node = kzalloc(sizeof(*ref_node), GFP_KERNEL);
if (!ref_node)
return NULL;
}
}
int io_rsrc_node_switch_start(struct io_ring_ctx *ctx)
{
if (ctx->rsrc_backup_node)
return 0;
ctx->rsrc_backup_node = io_rsrc_node_alloc();
return ctx->rsrc_backup_node ? 0 : -ENOMEM;
ref_node->ctx = ctx;
ref_node->empty = 0;
ref_node->refs = 1;
return ref_node;
}
__cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
struct io_ring_ctx *ctx)
{
struct io_rsrc_node *backup;
DEFINE_WAIT(we);
int ret;
/* As we may drop ->uring_lock, other task may have started quiesce */
/* As We may drop ->uring_lock, other task may have started quiesce */
if (data->quiesce)
return -ENXIO;
ret = io_rsrc_node_switch_start(ctx);
if (ret)
return ret;
io_rsrc_node_switch(ctx, data);
/* kill initial ref, already quiesced if zero */
if (atomic_dec_and_test(&data->refs))
backup = io_rsrc_node_alloc(ctx);
if (!backup)
return -ENOMEM;
ctx->rsrc_node->empty = true;
ctx->rsrc_node->type = -1;
list_add_tail(&ctx->rsrc_node->node, &ctx->rsrc_ref_list);
io_put_rsrc_node(ctx, ctx->rsrc_node);
ctx->rsrc_node = backup;
if (list_empty(&ctx->rsrc_ref_list))
return 0;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 1);
smp_mb();
}
ctx->rsrc_quiesce++;
data->quiesce = true;
mutex_unlock(&ctx->uring_lock);
do {
prepare_to_wait(&ctx->rsrc_quiesce_wq, &we, TASK_INTERRUPTIBLE);
mutex_unlock(&ctx->uring_lock);
ret = io_run_task_work_sig(ctx);
if (ret < 0) {
atomic_inc(&data->refs);
/* wait for all works potentially completing data->done */
flush_delayed_work(&ctx->rsrc_put_work);
reinit_completion(&data->done);
mutex_lock(&ctx->uring_lock);
if (list_empty(&ctx->rsrc_ref_list))
ret = 0;
break;
}
flush_delayed_work(&ctx->rsrc_put_work);
ret = wait_for_completion_interruptible(&data->done);
if (!ret) {
mutex_lock(&ctx->uring_lock);
if (atomic_read(&data->refs) <= 0)
break;
/*
* it has been revived by another thread while
* we were unlocked
*/
mutex_unlock(&ctx->uring_lock);
}
} while (1);
schedule();
__set_current_state(TASK_RUNNING);
mutex_lock(&ctx->uring_lock);
ret = 0;
} while (!list_empty(&ctx->rsrc_ref_list));
finish_wait(&ctx->rsrc_quiesce_wq, &we);
data->quiesce = false;
ctx->rsrc_quiesce--;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 0);
smp_mb();
}
return ret;
}
......@@ -405,8 +309,8 @@ static __cold void **io_alloc_page_table(size_t size)
return table;
}
__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
rsrc_put_fn *do_put, u64 __user *utags,
__cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx, int type,
u64 __user *utags,
unsigned nr, struct io_rsrc_data **pdata)
{
struct io_rsrc_data *data;
......@@ -424,7 +328,7 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
data->nr = nr;
data->ctx = ctx;
data->do_put = do_put;
data->rsrc_type = type;
if (utags) {
ret = -EFAULT;
for (i = 0; i < nr; i++) {
......@@ -435,9 +339,6 @@ __cold static int io_rsrc_data_alloc(struct io_ring_ctx *ctx,
goto fail;
}
}
atomic_set(&data->refs, 1);
init_completion(&data->done);
*pdata = data;
return 0;
fail:
......@@ -456,7 +357,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
struct file *file;
int fd, i, err = 0;
unsigned int done;
bool needs_switch = false;
if (!ctx->file_data)
return -ENXIO;
......@@ -483,12 +383,11 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
if (file_slot->file_ptr) {
file = (struct file *)(file_slot->file_ptr & FFS_MASK);
err = io_queue_rsrc_removal(data, i, ctx->rsrc_node, file);
err = io_queue_rsrc_removal(data, i, file);
if (err)
break;
file_slot->file_ptr = 0;
io_file_bitmap_clear(&ctx->file_table, i);
needs_switch = true;
}
if (fd != -1) {
file = fget(fd);
......@@ -519,9 +418,6 @@ static int __io_sqe_files_update(struct io_ring_ctx *ctx,
io_file_bitmap_set(&ctx->file_table, i);
}
}
if (needs_switch)
io_rsrc_node_switch(ctx, data);
return done ? done : err;
}
......@@ -532,7 +428,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
u64 __user *tags = u64_to_user_ptr(up->tags);
struct iovec iov, __user *iovs = u64_to_user_ptr(up->data);
struct page *last_hpage = NULL;
bool needs_switch = false;
__u32 done;
int i, err;
......@@ -543,7 +438,6 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
for (done = 0; done < nr_args; done++) {
struct io_mapped_ubuf *imu;
int offset = up->offset + done;
u64 tag = 0;
err = io_copy_iov(ctx, &iov, iovs, done);
......@@ -564,24 +458,20 @@ static int __io_sqe_buffers_update(struct io_ring_ctx *ctx,
if (err)
break;
i = array_index_nospec(offset, ctx->nr_user_bufs);
i = array_index_nospec(up->offset + done, ctx->nr_user_bufs);
if (ctx->user_bufs[i] != ctx->dummy_ubuf) {
err = io_queue_rsrc_removal(ctx->buf_data, i,
ctx->rsrc_node, ctx->user_bufs[i]);
ctx->user_bufs[i]);
if (unlikely(err)) {
io_buffer_unmap(ctx, &imu);
break;
}
ctx->user_bufs[i] = ctx->dummy_ubuf;
needs_switch = true;
}
ctx->user_bufs[i] = imu;
*io_get_tag_slot(ctx->buf_data, offset) = tag;
*io_get_tag_slot(ctx->buf_data, i) = tag;
}
if (needs_switch)
io_rsrc_node_switch(ctx, ctx->buf_data);
return done ? done : err;
}
......@@ -590,13 +480,11 @@ static int __io_register_rsrc_update(struct io_ring_ctx *ctx, unsigned type,
unsigned nr_args)
{
__u32 tmp;
int err;
lockdep_assert_held(&ctx->uring_lock);
if (check_add_overflow(up->offset, nr_args, &tmp))
return -EOVERFLOW;
err = io_rsrc_node_switch_start(ctx);
if (err)
return err;
switch (type) {
case IORING_RSRC_FILE:
......@@ -753,20 +641,24 @@ int io_files_update(struct io_kiocb *req, unsigned int issue_flags)
return IOU_OK;
}
int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx,
struct io_rsrc_node *node, void *rsrc)
int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, void *rsrc)
{
struct io_ring_ctx *ctx = data->ctx;
struct io_rsrc_node *node = ctx->rsrc_node;
u64 *tag_slot = io_get_tag_slot(data, idx);
struct io_rsrc_put *prsrc;
prsrc = kzalloc(sizeof(*prsrc), GFP_KERNEL);
if (!prsrc)
ctx->rsrc_node = io_rsrc_node_alloc(ctx);
if (unlikely(!ctx->rsrc_node)) {
ctx->rsrc_node = node;
return -ENOMEM;
}
prsrc->tag = *tag_slot;
node->item.rsrc = rsrc;
node->type = data->rsrc_type;
node->item.tag = *tag_slot;
*tag_slot = 0;
prsrc->rsrc = rsrc;
list_add(&prsrc->list, &node->rsrc_list);
list_add_tail(&node->node, &ctx->rsrc_ref_list);
io_put_rsrc_node(ctx, node);
return 0;
}
......@@ -882,20 +774,14 @@ int __io_scm_file_account(struct io_ring_ctx *ctx, struct file *file)
return 0;
}
static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
static __cold void io_rsrc_file_scm_put(struct io_ring_ctx *ctx, struct file *file)
{
struct file *file = prsrc->file;
#if defined(CONFIG_UNIX)
struct sock *sock = ctx->ring_sock->sk;
struct sk_buff_head list, *head = &sock->sk_receive_queue;
struct sk_buff *skb;
int i;
if (!io_file_need_scm(file)) {
fput(file);
return;
}
__skb_queue_head_init(&list);
/*
......@@ -945,11 +831,19 @@ static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
__skb_queue_tail(head, skb);
spin_unlock_irq(&head->lock);
}
#else
fput(file);
#endif
}
static void io_rsrc_file_put(struct io_ring_ctx *ctx, struct io_rsrc_put *prsrc)
{
struct file *file = prsrc->file;
if (likely(!io_file_need_scm(file)))
fput(file);
else
io_rsrc_file_scm_put(ctx, file);
}
int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
unsigned nr_args, u64 __user *tags)
{
......@@ -966,10 +860,7 @@ int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
return -EMFILE;
if (nr_args > rlimit(RLIMIT_NOFILE))
return -EMFILE;
ret = io_rsrc_node_switch_start(ctx);
if (ret)
return ret;
ret = io_rsrc_data_alloc(ctx, io_rsrc_file_put, tags, nr_args,
ret = io_rsrc_data_alloc(ctx, IORING_RSRC_FILE, tags, nr_args,
&ctx->file_data);
if (ret)
return ret;
......@@ -1023,7 +914,6 @@ int io_sqe_files_register(struct io_ring_ctx *ctx, void __user *arg,
/* default it to the whole table */
io_file_table_set_alloc_range(ctx, 0, ctx->nr_user_files);
io_rsrc_node_switch(ctx, NULL);
return 0;
fail:
__io_sqe_files_unregister(ctx);
......@@ -1163,17 +1053,14 @@ struct page **io_pin_pages(unsigned long ubuf, unsigned long len, int *npages)
pret = pin_user_pages(ubuf, nr_pages, FOLL_WRITE | FOLL_LONGTERM,
pages, vmas);
if (pret == nr_pages) {
struct file *file = vmas[0]->vm_file;
/* don't support file backed memory */
for (i = 0; i < nr_pages; i++) {
if (vmas[i]->vm_file != file) {
ret = -EINVAL;
break;
}
if (!file)
struct vm_area_struct *vma = vmas[i];
if (vma_is_shmem(vma))
continue;
if (!vma_is_shmem(vmas[i]) && !is_file_hugepages(file)) {
if (vma->vm_file &&
!is_file_hugepages(vma->vm_file)) {
ret = -EOPNOTSUPP;
break;
}
......@@ -1305,10 +1192,7 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
return -EBUSY;
if (!nr_args || nr_args > IORING_MAX_REG_BUFFERS)
return -EINVAL;
ret = io_rsrc_node_switch_start(ctx);
if (ret)
return ret;
ret = io_rsrc_data_alloc(ctx, io_rsrc_buf_put, tags, nr_args, &data);
ret = io_rsrc_data_alloc(ctx, IORING_RSRC_BUFFER, tags, nr_args, &data);
if (ret)
return ret;
ret = io_buffers_map_alloc(ctx, nr_args);
......@@ -1345,8 +1229,6 @@ int io_sqe_buffers_register(struct io_ring_ctx *ctx, void __user *arg,
ctx->buf_data = data;
if (ret)
__io_sqe_buffers_unregister(ctx);
else
io_rsrc_node_switch(ctx, NULL);
return ret;
}
......
......@@ -4,6 +4,10 @@
#include <net/af_unix.h>
#include "alloc_cache.h"
#define IO_NODE_ALLOC_CACHE_MAX 32
#define IO_RSRC_TAG_TABLE_SHIFT (PAGE_SHIFT - 3)
#define IO_RSRC_TAG_TABLE_MAX (1U << IO_RSRC_TAG_TABLE_SHIFT)
#define IO_RSRC_TAG_TABLE_MASK (IO_RSRC_TAG_TABLE_MAX - 1)
......@@ -14,7 +18,6 @@ enum {
};
struct io_rsrc_put {
struct list_head list;
u64 tag;
union {
void *rsrc;
......@@ -30,19 +33,20 @@ struct io_rsrc_data {
u64 **tags;
unsigned int nr;
rsrc_put_fn *do_put;
atomic_t refs;
struct completion done;
u16 rsrc_type;
bool quiesce;
};
struct io_rsrc_node {
struct percpu_ref refs;
union {
struct io_cache_entry cache;
struct io_ring_ctx *ctx;
};
int refs;
bool empty;
u16 type;
struct list_head node;
struct list_head rsrc_list;
struct io_rsrc_data *rsrc_data;
struct llist_node llist;
bool done;
struct io_rsrc_put item;
};
struct io_mapped_ubuf {
......@@ -54,16 +58,10 @@ struct io_mapped_ubuf {
};
void io_rsrc_put_tw(struct callback_head *cb);
void io_rsrc_put_work(struct work_struct *work);
void io_rsrc_refs_refill(struct io_ring_ctx *ctx);
void io_wait_rsrc_data(struct io_rsrc_data *data);
void io_rsrc_node_destroy(struct io_rsrc_node *ref_node);
void io_rsrc_refs_drop(struct io_ring_ctx *ctx);
int io_rsrc_node_switch_start(struct io_ring_ctx *ctx);
int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx,
struct io_rsrc_node *node, void *rsrc);
void io_rsrc_node_switch(struct io_ring_ctx *ctx,
struct io_rsrc_data *data_to_kill);
void io_rsrc_node_ref_zero(struct io_rsrc_node *node);
void io_rsrc_node_destroy(struct io_ring_ctx *ctx, struct io_rsrc_node *ref_node);
struct io_rsrc_node *io_rsrc_node_alloc(struct io_ring_ctx *ctx);
int io_queue_rsrc_removal(struct io_rsrc_data *data, unsigned idx, void *rsrc);
int io_import_fixed(int ddir, struct iov_iter *iter,
struct io_mapped_ubuf *imu,
......@@ -107,36 +105,24 @@ int io_register_rsrc_update(struct io_ring_ctx *ctx, void __user *arg,
int io_register_rsrc(struct io_ring_ctx *ctx, void __user *arg,
unsigned int size, unsigned int type);
static inline void io_rsrc_put_node(struct io_rsrc_node *node, int nr)
static inline void io_put_rsrc_node(struct io_ring_ctx *ctx, struct io_rsrc_node *node)
{
percpu_ref_put_many(&node->refs, nr);
}
lockdep_assert_held(&ctx->uring_lock);
static inline void io_req_put_rsrc(struct io_kiocb *req)
{
if (req->rsrc_node)
io_rsrc_put_node(req->rsrc_node, 1);
if (node && !--node->refs)
io_rsrc_node_ref_zero(node);
}
static inline void io_req_put_rsrc_locked(struct io_kiocb *req,
struct io_ring_ctx *ctx)
__must_hold(&ctx->uring_lock)
{
struct io_rsrc_node *node = req->rsrc_node;
if (node) {
if (node == ctx->rsrc_node)
ctx->rsrc_cached_refs++;
else
io_rsrc_put_node(node, 1);
}
io_put_rsrc_node(ctx, req->rsrc_node);
}
static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx)
static inline void io_charge_rsrc_node(struct io_ring_ctx *ctx,
struct io_rsrc_node *node)
{
ctx->rsrc_cached_refs--;
if (unlikely(ctx->rsrc_cached_refs < 0))
io_rsrc_refs_refill(ctx);
node->refs++;
}
static inline void io_req_set_rsrc_node(struct io_kiocb *req,
......@@ -149,7 +135,7 @@ static inline void io_req_set_rsrc_node(struct io_kiocb *req,
lockdep_assert_held(&ctx->uring_lock);
req->rsrc_node = ctx->rsrc_node;
io_charge_rsrc_node(ctx);
io_charge_rsrc_node(ctx, ctx->rsrc_node);
io_ring_submit_unlock(ctx, issue_flags);
}
}
......@@ -162,6 +148,12 @@ static inline u64 *io_get_tag_slot(struct io_rsrc_data *data, unsigned int idx)
return &data->tags[table_idx][off];
}
static inline int io_rsrc_init(struct io_ring_ctx *ctx)
{
ctx->rsrc_node = io_rsrc_node_alloc(ctx);
return ctx->rsrc_node ? 0 : -ENOMEM;
}
int io_files_update(struct io_kiocb *req, unsigned int issue_flags);
int io_files_update_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe);
......
......@@ -283,16 +283,16 @@ static inline int io_fixup_rw_res(struct io_kiocb *req, long res)
return res;
}
static void io_req_rw_complete(struct io_kiocb *req, bool *locked)
static void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
io_req_io_end(req);
if (req->flags & (REQ_F_BUFFER_SELECTED|REQ_F_BUFFER_RING)) {
unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
req->cqe.flags |= io_put_kbuf(req, issue_flags);
}
io_req_task_complete(req, locked);
io_req_task_complete(req, ts);
}
static void io_complete_rw(struct kiocb *kiocb, long res)
......@@ -304,7 +304,7 @@ static void io_complete_rw(struct kiocb *kiocb, long res)
return;
io_req_set_res(req, io_fixup_rw_res(req, res), 0);
req->io_task_work.func = io_req_rw_complete;
io_req_task_work_add(req);
__io_req_task_work_add(req, IOU_F_TWQ_LAZY_WAKE);
}
static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
......
......@@ -17,6 +17,7 @@ struct io_timeout {
struct file *file;
u32 off;
u32 target_seq;
u32 repeats;
struct list_head list;
/* head of the link, used by linked timeouts only */
struct io_kiocb *head;
......@@ -37,8 +38,9 @@ struct io_timeout_rem {
static inline bool io_is_timeout_noseq(struct io_kiocb *req)
{
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_timeout_data *data = req->async_data;
return !timeout->off;
return !timeout->off || data->flags & IORING_TIMEOUT_MULTISHOT;
}
static inline void io_put_req(struct io_kiocb *req)
......@@ -49,6 +51,44 @@ static inline void io_put_req(struct io_kiocb *req)
}
}
static inline bool io_timeout_finish(struct io_timeout *timeout,
struct io_timeout_data *data)
{
if (!(data->flags & IORING_TIMEOUT_MULTISHOT))
return true;
if (!timeout->off || (timeout->repeats && --timeout->repeats))
return false;
return true;
}
static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer);
static void io_timeout_complete(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_timeout_data *data = req->async_data;
struct io_ring_ctx *ctx = req->ctx;
if (!io_timeout_finish(timeout, data)) {
bool filled;
filled = io_aux_cqe(ctx, ts->locked, req->cqe.user_data, -ETIME,
IORING_CQE_F_MORE, false);
if (filled) {
/* re-arm timer */
spin_lock_irq(&ctx->timeout_lock);
list_add(&timeout->list, ctx->timeout_list.prev);
data->timer.function = io_timeout_fn;
hrtimer_start(&data->timer, timespec64_to_ktime(data->ts), data->mode);
spin_unlock_irq(&ctx->timeout_lock);
return;
}
}
io_req_task_complete(req, ts);
}
static bool io_kill_timeout(struct io_kiocb *req, int status)
__must_hold(&req->ctx->timeout_lock)
{
......@@ -101,9 +141,9 @@ __cold void io_flush_timeouts(struct io_ring_ctx *ctx)
spin_unlock_irq(&ctx->timeout_lock);
}
static void io_req_tw_fail_links(struct io_kiocb *link, bool *locked)
static void io_req_tw_fail_links(struct io_kiocb *link, struct io_tw_state *ts)
{
io_tw_lock(link->ctx, locked);
io_tw_lock(link->ctx, ts);
while (link) {
struct io_kiocb *nxt = link->link;
long res = -ECANCELED;
......@@ -112,7 +152,7 @@ static void io_req_tw_fail_links(struct io_kiocb *link, bool *locked)
res = link->cqe.res;
link->link = NULL;
io_req_set_res(link, res, 0);
io_req_task_complete(link, locked);
io_req_task_complete(link, ts);
link = nxt;
}
}
......@@ -212,7 +252,7 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
req_set_fail(req);
io_req_set_res(req, -ETIME, 0);
req->io_task_work.func = io_req_task_complete;
req->io_task_work.func = io_timeout_complete;
io_req_task_work_add(req);
return HRTIMER_NORESTART;
}
......@@ -265,9 +305,9 @@ int io_timeout_cancel(struct io_ring_ctx *ctx, struct io_cancel_data *cd)
return 0;
}
static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
static void io_req_task_link_timeout(struct io_kiocb *req, struct io_tw_state *ts)
{
unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
struct io_timeout *timeout = io_kiocb_to_cmd(req, struct io_timeout);
struct io_kiocb *prev = timeout->prev;
int ret = -ENOENT;
......@@ -282,11 +322,11 @@ static void io_req_task_link_timeout(struct io_kiocb *req, bool *locked)
ret = io_try_cancel(req->task->io_uring, &cd, issue_flags);
}
io_req_set_res(req, ret ?: -ETIME, 0);
io_req_task_complete(req, locked);
io_req_task_complete(req, ts);
io_put_req(prev);
} else {
io_req_set_res(req, -ETIME, 0);
io_req_task_complete(req, locked);
io_req_task_complete(req, ts);
}
}
......@@ -470,16 +510,27 @@ static int __io_timeout_prep(struct io_kiocb *req,
return -EINVAL;
flags = READ_ONCE(sqe->timeout_flags);
if (flags & ~(IORING_TIMEOUT_ABS | IORING_TIMEOUT_CLOCK_MASK |
IORING_TIMEOUT_ETIME_SUCCESS))
IORING_TIMEOUT_ETIME_SUCCESS |
IORING_TIMEOUT_MULTISHOT))
return -EINVAL;
/* more than one clock specified is invalid, obviously */
if (hweight32(flags & IORING_TIMEOUT_CLOCK_MASK) > 1)
return -EINVAL;
/* multishot requests only make sense with rel values */
if (!(~flags & (IORING_TIMEOUT_MULTISHOT | IORING_TIMEOUT_ABS)))
return -EINVAL;
INIT_LIST_HEAD(&timeout->list);
timeout->off = off;
if (unlikely(off && !req->ctx->off_timeout_used))
req->ctx->off_timeout_used = true;
/*
* for multishot reqs w/ fixed nr of repeats, repeats tracks the
* remaining nr
*/
timeout->repeats = 0;
if ((flags & IORING_TIMEOUT_MULTISHOT) && off > 0)
timeout->repeats = off;
if (WARN_ON_ONCE(req_has_async_data(req)))
return -EFAULT;
......
......@@ -12,10 +12,10 @@
#include "rsrc.h"
#include "uring_cmd.h"
static void io_uring_cmd_work(struct io_kiocb *req, bool *locked)
static void io_uring_cmd_work(struct io_kiocb *req, struct io_tw_state *ts)
{
struct io_uring_cmd *ioucmd = io_kiocb_to_cmd(req, struct io_uring_cmd);
unsigned issue_flags = *locked ? 0 : IO_URING_F_UNLOCKED;
unsigned issue_flags = ts->locked ? 0 : IO_URING_F_UNLOCKED;
ioucmd->task_work_cb(ioucmd, issue_flags);
}
......@@ -54,11 +54,15 @@ void io_uring_cmd_done(struct io_uring_cmd *ioucmd, ssize_t ret, ssize_t res2,
io_req_set_res(req, ret, 0);
if (req->ctx->flags & IORING_SETUP_CQE32)
io_req_set_cqe32_extra(req, res2, 0);
if (req->ctx->flags & IORING_SETUP_IOPOLL)
if (req->ctx->flags & IORING_SETUP_IOPOLL) {
/* order with io_iopoll_req_issued() checking ->iopoll_complete */
smp_store_release(&req->iopoll_completed, 1);
else
io_req_complete_post(req, issue_flags);
} else {
struct io_tw_state ts = {
.locked = !(issue_flags & IO_URING_F_UNLOCKED),
};
io_req_task_complete(req, &ts);
}
}
EXPORT_SYMBOL_GPL(io_uring_cmd_done);
......@@ -73,6 +77,7 @@ int io_uring_cmd_prep_async(struct io_kiocb *req)
cmd_size = uring_cmd_pdu_size(req->ctx->flags & IORING_SETUP_SQE128);
memcpy(req->async_data, ioucmd->cmd, cmd_size);
ioucmd->cmd = req->async_data;
return 0;
}
......@@ -129,9 +134,6 @@ int io_uring_cmd(struct io_kiocb *req, unsigned int issue_flags)
WRITE_ONCE(ioucmd->cookie, NULL);
}
if (req_has_async_data(req))
ioucmd->cmd = req->async_data;
ret = file->f_op->uring_cmd(ioucmd, issue_flags);
if (ret == -EAGAIN) {
if (!req_has_async_data(req)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment