Commit 86f3cd1b authored by Pavel Begunkov, committed by Jens Axboe

io-wq: handle hashed writes in chains

We always punt async buffered writes to an io-wq helper, as the core
kernel does not have IOCB_NOWAIT support for that. Most buffered async
writes complete very quickly, as it's just a copy operation. This means
that doing multiple locking roundtrips on the shared wqe lock for each
buffered write is wasteful. Additionally, buffered writes are hashed
work items, which means that any buffered write to a given file is
serialized.

Keep identically hashed work items contiguous in @wqe->work_list, and
track a tail for each hash bucket. On dequeue of a hashed item, splice
all of the same hash in one go using the tracked tail. Until the batch
is done, the caller doesn't have to synchronize with the wqe or worker
locks again.
Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
parent a5318d3c
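
For illustration, here is a minimal, self-contained C sketch of the idea described in the message above: a per-bucket tail pointer keeps identically hashed items contiguous in a singly linked submission list, so a whole run can later be handed out as one batch. The names (work_item, work_list, bucket_tail, HASH_BUCKETS) are invented for this sketch and are not the io-wq types.

#include <stdio.h>

#define HASH_BUCKETS	32

struct work_item {
	int hash;			/* -1 means "not hashed" */
	struct work_item *next;
};

struct work_list {
	struct work_item *first;
	struct work_item *last;
	struct work_item *bucket_tail[HASH_BUCKETS];
};

/* Append at the very end of the list. */
static void list_append(struct work_list *list, struct work_item *item)
{
	item->next = NULL;
	if (list->last)
		list->last->next = item;
	else
		list->first = item;
	list->last = item;
}

/*
 * Unhashed work goes to the tail; hashed work is inserted directly
 * after the previous item with the same hash, so every bucket forms
 * one contiguous run in the list.
 */
static void list_insert(struct work_list *list, struct work_item *item)
{
	struct work_item *tail;

	if (item->hash < 0) {
		list_append(list, item);
		return;
	}

	tail = list->bucket_tail[item->hash];
	list->bucket_tail[item->hash] = item;
	if (!tail) {
		list_append(list, item);
		return;
	}

	/* splice in right behind the bucket's current tail */
	item->next = tail->next;
	tail->next = item;
	if (list->last == tail)
		list->last = item;
}

int main(void)
{
	struct work_list list = {0};
	struct work_item a = { .hash = 3 }, b = { .hash = -1 };
	struct work_item c = { .hash = 3 }, d = { .hash = 7 };

	list_insert(&list, &a);		/* hash 3 */
	list_insert(&list, &b);		/* unhashed */
	list_insert(&list, &c);		/* hash 3: lands after a, not after b */
	list_insert(&list, &d);		/* hash 7 */

	for (struct work_item *w = list.first; w; w = w->next)
		printf("hash=%d\n", w->hash);	/* 3, 3, -1, 7 */
	return 0;
}

Inserting c directly behind a (the current tail of bucket 3) rather than at the list tail is what keeps each bucket's items in one contiguous run; the sketch prints the hashes in the order 3, 3, -1, 7.
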
fs/io-wq.c
@@ -69,6 +69,8 @@ struct io_worker {
 #define IO_WQ_HASH_ORDER	5
 #endif
 
+#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
+
 struct io_wqe_acct {
 	unsigned nr_workers;
 	unsigned max_workers;
@@ -98,6 +100,7 @@ struct io_wqe {
 	struct list_head all_list;
 
 	struct io_wq *wq;
+	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
 };
 
 /*
@@ -384,7 +387,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 	__must_hold(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
-	struct io_wq_work *work;
+	struct io_wq_work *work, *tail;
 	unsigned int hash;
 
 	wq_list_for_each(node, prev, &wqe->work_list) {
@@ -392,7 +395,7 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 
 		/* not hashed, can run anytime */
 		if (!io_wq_is_hashed(work)) {
-			wq_node_del(&wqe->work_list, node, prev);
+			wq_list_del(&wqe->work_list, node, prev);
 			return work;
 		}
 
@@ -400,7 +403,10 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 		hash = io_get_work_hash(work);
 		if (!(wqe->hash_map & BIT(hash))) {
 			wqe->hash_map |= BIT(hash);
-			wq_node_del(&wqe->work_list, node, prev);
+			/* all items with this hash lie in [work, tail] */
+			tail = wqe->hash_tail[hash];
+			wqe->hash_tail[hash] = NULL;
+			wq_list_cut(&wqe->work_list, &tail->list, prev);
 			return work;
 		}
 	}
@@ -485,7 +491,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 	struct io_wq *wq = wqe->wq;
 
 	do {
-		struct io_wq_work *work, *assign_work;
+		struct io_wq_work *work;
 		unsigned int hash;
 get_next:
 		/*
@@ -508,8 +514,9 @@ static void io_worker_handle_work(struct io_worker *worker)
 
 		/* handle a whole dependent link */
 		do {
-			struct io_wq_work *old_work;
+			struct io_wq_work *old_work, *next_hashed, *linked;
 
+			next_hashed = wq_next_work(work);
 			io_impersonate_work(worker, work);
 			/*
 			 * OK to set IO_WQ_WORK_CANCEL even for uncancellable
@@ -518,22 +525,23 @@ static void io_worker_handle_work(struct io_worker *worker)
 			if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
 				work->flags |= IO_WQ_WORK_CANCEL;
 
-			old_work = work;
 			hash = io_get_work_hash(work);
-			work->func(&work);
-			work = (old_work == work) ? NULL : work;
+			linked = old_work = work;
+			linked->func(&linked);
+			linked = (old_work == linked) ? NULL : linked;
 
-			assign_work = work;
-			if (work && io_wq_is_hashed(work))
-				assign_work = NULL;
-			io_assign_current_work(worker, assign_work);
+			work = next_hashed;
+			if (!work && linked && !io_wq_is_hashed(linked)) {
+				work = linked;
+				linked = NULL;
+			}
+			io_assign_current_work(worker, work);
 			wq->free_work(old_work);
 
-			if (work && !assign_work) {
-				io_wqe_enqueue(wqe, work);
-				work = NULL;
-			}
-			if (hash != -1U) {
+			if (linked)
+				io_wqe_enqueue(wqe, linked);
+
+			if (hash != -1U && !next_hashed) {
 				spin_lock_irq(&wqe->lock);
 				wqe->hash_map &= ~BIT_ULL(hash);
 				wqe->flags &= ~IO_WQE_FLAG_STALLED;
@@ -776,6 +784,26 @@ static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
 	} while (work);
 }
 
+static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
+{
+	unsigned int hash;
+	struct io_wq_work *tail;
+
+	if (!io_wq_is_hashed(work)) {
+append:
+		wq_list_add_tail(&work->list, &wqe->work_list);
+		return;
+	}
+
+	hash = io_get_work_hash(work);
+	tail = wqe->hash_tail[hash];
+	wqe->hash_tail[hash] = work;
+	if (!tail)
+		goto append;
+
+	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
+}
+
 static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 {
 	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
@@ -795,7 +823,7 @@ static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
 
 	work_flags = work->flags;
 	spin_lock_irqsave(&wqe->lock, flags);
-	wq_list_add_tail(&work->list, &wqe->work_list);
+	io_wqe_insert_work(wqe, work);
 	wqe->flags &= ~IO_WQE_FLAG_STALLED;
 	spin_unlock_irqrestore(&wqe->lock, flags);
 
@@ -914,7 +942,7 @@ static enum io_wq_cancel io_wqe_cancel_work(struct io_wqe *wqe,
 		work = container_of(node, struct io_wq_work, list);
 
 		if (match->fn(work, match->data)) {
-			wq_node_del(&wqe->work_list, node, prev);
+			wq_list_del(&wqe->work_list, node, prev);
 			found = true;
 			break;
 		}
fs/io-wq.h
@@ -28,6 +28,18 @@ struct io_wq_work_list {
 	struct io_wq_work_node *last;
 };
 
+static inline void wq_list_add_after(struct io_wq_work_node *node,
+				     struct io_wq_work_node *pos,
+				     struct io_wq_work_list *list)
+{
+	struct io_wq_work_node *next = pos->next;
+
+	pos->next = node;
+	node->next = next;
+	if (!next)
+		list->last = node;
+}
+
 static inline void wq_list_add_tail(struct io_wq_work_node *node,
 				    struct io_wq_work_list *list)
 {
@@ -40,17 +52,26 @@ static inline void wq_list_add_tail(struct io_wq_work_node *node,
 	}
 }
 
-static inline void wq_node_del(struct io_wq_work_list *list,
-			       struct io_wq_work_node *node,
+static inline void wq_list_cut(struct io_wq_work_list *list,
+			       struct io_wq_work_node *last,
 			       struct io_wq_work_node *prev)
 {
-	if (node == list->first)
-		WRITE_ONCE(list->first, node->next);
-	if (node == list->last)
+	/* first in the list, if prev==NULL */
+	if (!prev)
+		WRITE_ONCE(list->first, last->next);
+	else
+		prev->next = last->next;
+
+	if (last == list->last)
 		list->last = prev;
-	if (prev)
-		prev->next = node->next;
-	node->next = NULL;
+	last->next = NULL;
+}
+
+static inline void wq_list_del(struct io_wq_work_list *list,
+			       struct io_wq_work_node *node,
+			       struct io_wq_work_node *prev)
+{
+	wq_list_cut(list, node, prev);
 }
 
 #define wq_list_for_each(pos, prv, head)				\
@@ -78,6 +99,14 @@ struct io_wq_work {
 		*(work) = (struct io_wq_work){ .func = _func };	\
 	} while (0)						\
 
+static inline struct io_wq_work *wq_next_work(struct io_wq_work *work)
+{
+	if (!work->list.next)
+		return NULL;
+
+	return container_of(work->list.next, struct io_wq_work, list);
+}
+
 typedef void (free_work_fn)(struct io_wq_work *);
 
 struct io_wq_data {
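
As a companion sketch, the dequeue side can be modelled the same way: once a bucket's items are contiguous, the whole run up to the tracked tail is detached from the singly linked list in O(1) given the node preceding it, which is the contract wq_list_cut(list, last, prev) provides above. The slist/node types and list_cut() helper below are simplified stand-ins, not the kernel structures.

#include <stdio.h>
#include <stddef.h>

struct node {
	int id;
	struct node *next;
};

struct slist {
	struct node *first;
	struct node *last;
};

/*
 * Detach everything from prev->next (or from list->first when prev is
 * NULL) up to and including span_last, in O(1): the same contract as
 * wq_list_cut(list, last, prev).
 */
static void list_cut(struct slist *list, struct node *span_last,
		     struct node *prev)
{
	if (!prev)
		list->first = span_last->next;
	else
		prev->next = span_last->next;

	if (list->last == span_last)
		list->last = prev;
	span_last->next = NULL;
}

int main(void)
{
	struct node n[5];
	struct slist list = { .first = &n[0], .last = &n[4] };
	struct node *batch;

	for (int i = 0; i < 5; i++) {
		n[i].id = i;
		n[i].next = (i < 4) ? &n[i + 1] : NULL;
	}

	batch = n[0].next;		/* head of the run about to be cut */
	list_cut(&list, &n[3], &n[0]);	/* cut n[1]..n[3] behind n[0] */

	for (struct node *p = list.first; p; p = p->next)
		printf("left in list: %d\n", p->id);	/* 0, 4 */
	for (struct node *p = batch; p; p = p->next)
		printf("cut batch:    %d\n", p->id);	/* 1, 2, 3 */
	return 0;
}

Running it leaves nodes 0 and 4 on the list and hands back the batch 1, 2, 3 as its own NULL-terminated chain, which a worker can then process without touching the list lock again.
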