Commit e9fd9396 authored by Pavel Begunkov's avatar Pavel Begunkov Committed by Jens Axboe

io_uring/io-wq: forward submission ref to async

First it changes io-wq interfaces. It replaces {get,put}_work() with
free_work(), which guaranteed to be called exactly once. It also enforces
free_work() callback to be non-NULL.

io_uring follows the changes and instead of putting a submission reference
in io_put_req_async_completion(), it will be done in io_free_work(). As
removes io_get_work() with corresponding refcount_inc(), the ref balance
is maintained.
Signed-off-by: default avatarPavel Begunkov <asml.silence@gmail.com>
Signed-off-by: default avatarJens Axboe <axboe@kernel.dk>
parent f462fd36
...@@ -107,8 +107,7 @@ struct io_wq { ...@@ -107,8 +107,7 @@ struct io_wq {
struct io_wqe **wqes; struct io_wqe **wqes;
unsigned long state; unsigned long state;
get_work_fn *get_work; free_work_fn *free_work;
put_work_fn *put_work;
struct task_struct *manager; struct task_struct *manager;
struct user_struct *user; struct user_struct *user;
...@@ -509,16 +508,11 @@ static void io_worker_handle_work(struct io_worker *worker) ...@@ -509,16 +508,11 @@ static void io_worker_handle_work(struct io_worker *worker)
if (test_bit(IO_WQ_BIT_CANCEL, &wq->state)) if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
work->flags |= IO_WQ_WORK_CANCEL; work->flags |= IO_WQ_WORK_CANCEL;
if (wq->get_work)
wq->get_work(work);
old_work = work; old_work = work;
work->func(&work); work->func(&work);
work = (old_work == work) ? NULL : work; work = (old_work == work) ? NULL : work;
io_assign_current_work(worker, work); io_assign_current_work(worker, work);
wq->free_work(old_work);
if (wq->put_work)
wq->put_work(old_work);
if (hash != -1U) { if (hash != -1U) {
spin_lock_irq(&wqe->lock); spin_lock_irq(&wqe->lock);
...@@ -749,14 +743,17 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, ...@@ -749,14 +743,17 @@ static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct,
return true; return true;
} }
static void io_run_cancel(struct io_wq_work *work) static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{ {
struct io_wq *wq = wqe->wq;
do { do {
struct io_wq_work *old_work = work; struct io_wq_work *old_work = work;
work->flags |= IO_WQ_WORK_CANCEL; work->flags |= IO_WQ_WORK_CANCEL;
work->func(&work); work->func(&work);
work = (work == old_work) ? NULL : work; work = (work == old_work) ? NULL : work;
wq->free_work(old_work);
} while (work); } while (work);
} }
...@@ -773,7 +770,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) ...@@ -773,7 +770,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
* It's close enough to not be an issue, fork() has the same delay. * It's close enough to not be an issue, fork() has the same delay.
*/ */
if (unlikely(!io_wq_can_queue(wqe, acct, work))) { if (unlikely(!io_wq_can_queue(wqe, acct, work))) {
io_run_cancel(work); io_run_cancel(work, wqe);
return; return;
} }
...@@ -912,7 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe, ...@@ -912,7 +909,7 @@ static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
spin_unlock_irqrestore(&wqe->lock, flags); spin_unlock_irqrestore(&wqe->lock, flags);
if (found) { if (found) {
io_run_cancel(work); io_run_cancel(work, wqe);
return IO_WQ_CANCEL_OK; return IO_WQ_CANCEL_OK;
} }
...@@ -987,7 +984,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe, ...@@ -987,7 +984,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
spin_unlock_irqrestore(&wqe->lock, flags); spin_unlock_irqrestore(&wqe->lock, flags);
if (found) { if (found) {
io_run_cancel(work); io_run_cancel(work, wqe);
return IO_WQ_CANCEL_OK; return IO_WQ_CANCEL_OK;
} }
...@@ -1064,6 +1061,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1064,6 +1061,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
int ret = -ENOMEM, node; int ret = -ENOMEM, node;
struct io_wq *wq; struct io_wq *wq;
if (WARN_ON_ONCE(!data->free_work))
return ERR_PTR(-EINVAL);
wq = kzalloc(sizeof(*wq), GFP_KERNEL); wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq) if (!wq)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
...@@ -1074,8 +1074,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1074,8 +1074,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
return ERR_PTR(-ENOMEM); return ERR_PTR(-ENOMEM);
} }
wq->get_work = data->get_work; wq->free_work = data->free_work;
wq->put_work = data->put_work;
/* caller must already hold a reference to this */ /* caller must already hold a reference to this */
wq->user = data->user; wq->user = data->user;
...@@ -1132,7 +1131,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) ...@@ -1132,7 +1131,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
bool io_wq_get(struct io_wq *wq, struct io_wq_data *data) bool io_wq_get(struct io_wq *wq, struct io_wq_data *data)
{ {
if (data->get_work != wq->get_work || data->put_work != wq->put_work) if (data->free_work != wq->free_work)
return false; return false;
return refcount_inc_not_zero(&wq->use_refs); return refcount_inc_not_zero(&wq->use_refs);
......
...@@ -81,14 +81,12 @@ struct io_wq_work { ...@@ -81,14 +81,12 @@ struct io_wq_work {
*(work) = (struct io_wq_work){ .func = _func }; \ *(work) = (struct io_wq_work){ .func = _func }; \
} while (0) \ } while (0) \
typedef void (get_work_fn)(struct io_wq_work *); typedef void (free_work_fn)(struct io_wq_work *);
typedef void (put_work_fn)(struct io_wq_work *);
struct io_wq_data { struct io_wq_data {
struct user_struct *user; struct user_struct *user;
get_work_fn *get_work; free_work_fn *free_work;
put_work_fn *put_work;
}; };
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data); struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data);
......
...@@ -1558,8 +1558,8 @@ static void io_put_req(struct io_kiocb *req) ...@@ -1558,8 +1558,8 @@ static void io_put_req(struct io_kiocb *req)
io_free_req(req); io_free_req(req);
} }
static void io_put_req_async_completion(struct io_kiocb *req, static void io_steal_work(struct io_kiocb *req,
struct io_wq_work **workptr) struct io_wq_work **workptr)
{ {
/* /*
* It's in an io-wq worker, so there always should be at least * It's in an io-wq worker, so there always should be at least
...@@ -1569,7 +1569,6 @@ static void io_put_req_async_completion(struct io_kiocb *req, ...@@ -1569,7 +1569,6 @@ static void io_put_req_async_completion(struct io_kiocb *req,
* It also means, that if the counter dropped to 1, then there is * It also means, that if the counter dropped to 1, then there is
* no asynchronous users left, so it's safe to steal the next work. * no asynchronous users left, so it's safe to steal the next work.
*/ */
refcount_dec(&req->refs);
if (refcount_read(&req->refs) == 1) { if (refcount_read(&req->refs) == 1) {
struct io_kiocb *nxt = NULL; struct io_kiocb *nxt = NULL;
...@@ -2578,7 +2577,7 @@ static bool io_req_cancelled(struct io_kiocb *req) ...@@ -2578,7 +2577,7 @@ static bool io_req_cancelled(struct io_kiocb *req)
if (req->work.flags & IO_WQ_WORK_CANCEL) { if (req->work.flags & IO_WQ_WORK_CANCEL) {
req_set_fail_links(req); req_set_fail_links(req);
io_cqring_add_event(req, -ECANCELED); io_cqring_add_event(req, -ECANCELED);
io_double_put_req(req); io_put_req(req);
return true; return true;
} }
...@@ -2606,7 +2605,7 @@ static void io_fsync_finish(struct io_wq_work **workptr) ...@@ -2606,7 +2605,7 @@ static void io_fsync_finish(struct io_wq_work **workptr)
if (io_req_cancelled(req)) if (io_req_cancelled(req))
return; return;
__io_fsync(req); __io_fsync(req);
io_put_req_async_completion(req, workptr); io_steal_work(req, workptr);
} }
static int io_fsync(struct io_kiocb *req, bool force_nonblock) static int io_fsync(struct io_kiocb *req, bool force_nonblock)
...@@ -2639,7 +2638,7 @@ static void io_fallocate_finish(struct io_wq_work **workptr) ...@@ -2639,7 +2638,7 @@ static void io_fallocate_finish(struct io_wq_work **workptr)
if (io_req_cancelled(req)) if (io_req_cancelled(req))
return; return;
__io_fallocate(req); __io_fallocate(req);
io_put_req_async_completion(req, workptr); io_steal_work(req, workptr);
} }
static int io_fallocate_prep(struct io_kiocb *req, static int io_fallocate_prep(struct io_kiocb *req,
...@@ -3006,7 +3005,7 @@ static void io_close_finish(struct io_wq_work **workptr) ...@@ -3006,7 +3005,7 @@ static void io_close_finish(struct io_wq_work **workptr)
/* not cancellable, don't do io_req_cancelled() */ /* not cancellable, don't do io_req_cancelled() */
__io_close_finish(req); __io_close_finish(req);
io_put_req_async_completion(req, workptr); io_steal_work(req, workptr);
} }
static int io_close(struct io_kiocb *req, bool force_nonblock) static int io_close(struct io_kiocb *req, bool force_nonblock)
...@@ -3452,7 +3451,7 @@ static void io_accept_finish(struct io_wq_work **workptr) ...@@ -3452,7 +3451,7 @@ static void io_accept_finish(struct io_wq_work **workptr)
if (io_req_cancelled(req)) if (io_req_cancelled(req))
return; return;
__io_accept(req, false); __io_accept(req, false);
io_put_req_async_completion(req, workptr); io_steal_work(req, workptr);
} }
#endif #endif
...@@ -4719,7 +4718,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr) ...@@ -4719,7 +4718,7 @@ static void io_wq_submit_work(struct io_wq_work **workptr)
io_put_req(req); io_put_req(req);
} }
io_put_req_async_completion(req, workptr); io_steal_work(req, workptr);
} }
static int io_req_needs_file(struct io_kiocb *req, int fd) static int io_req_needs_file(struct io_kiocb *req, int fd)
...@@ -6105,21 +6104,14 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg, ...@@ -6105,21 +6104,14 @@ static int io_sqe_files_update(struct io_ring_ctx *ctx, void __user *arg,
return __io_sqe_files_update(ctx, &up, nr_args); return __io_sqe_files_update(ctx, &up, nr_args);
} }
static void io_put_work(struct io_wq_work *work) static void io_free_work(struct io_wq_work *work)
{ {
struct io_kiocb *req = container_of(work, struct io_kiocb, work); struct io_kiocb *req = container_of(work, struct io_kiocb, work);
/* Consider that io_put_req_async_completion() relies on this ref */ /* Consider that io_steal_work() relies on this ref */
io_put_req(req); io_put_req(req);
} }
static void io_get_work(struct io_wq_work *work)
{
struct io_kiocb *req = container_of(work, struct io_kiocb, work);
refcount_inc(&req->refs);
}
static int io_init_wq_offload(struct io_ring_ctx *ctx, static int io_init_wq_offload(struct io_ring_ctx *ctx,
struct io_uring_params *p) struct io_uring_params *p)
{ {
...@@ -6130,8 +6122,7 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx, ...@@ -6130,8 +6122,7 @@ static int io_init_wq_offload(struct io_ring_ctx *ctx,
int ret = 0; int ret = 0;
data.user = ctx->user; data.user = ctx->user;
data.get_work = io_get_work; data.free_work = io_free_work;
data.put_work = io_put_work;
if (!(p->flags & IORING_SETUP_ATTACH_WQ)) { if (!(p->flags & IORING_SETUP_ATTACH_WQ)) {
/* Do QD, or 4 * CPUS, whatever is smallest */ /* Do QD, or 4 * CPUS, whatever is smallest */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment