Commit 8fcdf814 authored by Kent Overstreet

bcachefs: Improved copygc pipelining

This improves copygc pipelining across multiple buckets: we now track
each in flight bucket we're evacuating, with separate moving_contexts.

This means that whereas previously we had to wait for outstanding moves
to complete to ensure we didn't try to evacuate the same bucket twice,
we can now just check buckets we want to evacuate against the pending
list.

This also means we can run the verify_bucket_evacuated() check without
killing pipelining - meaning it can now always be enabled, not just on
debug builds.

This is going to be important for the upcoming erasure coding work,
where moving IOs that are being erasure coded will now skip the initial
replication step; instead the IOs will wait on the stripe to complete.
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
parent 0b943b97
......@@ -40,18 +40,19 @@ static void progress_list_del(struct bch_fs *c, struct bch_move_stats *stats)
}
struct moving_io {
struct list_head list;
struct closure cl;
bool read_completed;
struct list_head list;
struct move_bucket_in_flight *b;
struct closure cl;
bool read_completed;
unsigned read_sectors;
unsigned write_sectors;
unsigned read_sectors;
unsigned write_sectors;
struct bch_read_bio rbio;
struct bch_read_bio rbio;
struct data_update write;
struct data_update write;
/* Must be last since it is variable size */
struct bio_vec bi_inline_vecs[0];
struct bio_vec bi_inline_vecs[0];
};
static void move_free(struct moving_io *io)
......@@ -59,6 +60,9 @@ static void move_free(struct moving_io *io)
struct moving_context *ctxt = io->write.ctxt;
struct bch_fs *c = ctxt->c;
if (io->b)
atomic_dec(&io->b->count);
bch2_data_update_exit(&io->write);
wake_up(&ctxt->wait);
bch2_write_ref_put(c, BCH_WRITE_REF_move);
......@@ -234,6 +238,7 @@ static int bch2_extent_drop_ptrs(struct btree_trans *trans,
static int bch2_move_extent(struct btree_trans *trans,
struct btree_iter *iter,
struct moving_context *ctxt,
struct move_bucket_in_flight *bucket_in_flight,
struct bch_io_opts io_opts,
enum btree_id btree_id,
struct bkey_s_c k,
......@@ -319,6 +324,11 @@ static int bch2_move_extent(struct btree_trans *trans,
atomic64_add(k.k->size, &ctxt->stats->sectors_moved);
}
if (bucket_in_flight) {
io->b = bucket_in_flight;
atomic_inc(&io->b->count);
}
this_cpu_add(c->counters[BCH_COUNTER_io_move], k.k->size);
this_cpu_add(c->counters[BCH_COUNTER_move_extent_read], k.k->size);
trace_move_extent_read(k.k);
......@@ -521,8 +531,8 @@ static int __bch2_move_data(struct moving_context *ctxt,
k = bkey_i_to_s_c(sk.k);
bch2_trans_unlock(&trans);
ret2 = bch2_move_extent(&trans, &iter, ctxt, io_opts,
btree_id, k, data_opts);
ret2 = bch2_move_extent(&trans, &iter, ctxt, NULL,
io_opts, btree_id, k, data_opts);
if (ret2) {
if (bch2_err_matches(ret2, BCH_ERR_transaction_restart))
continue;
......@@ -590,7 +600,7 @@ int bch2_move_data(struct bch_fs *c,
return ret;
}
static noinline void verify_bucket_evacuated(struct btree_trans *trans, struct bpos bucket, int gen)
void bch2_verify_bucket_evacuated(struct btree_trans *trans, struct bpos bucket, int gen)
{
struct bch_fs *c = trans->c;
struct btree_iter iter;
......@@ -625,6 +635,9 @@ static noinline void verify_bucket_evacuated(struct btree_trans *trans, struct b
failed_to_evacuate:
bch2_trans_iter_exit(trans, &iter);
if (test_bit(BCH_FS_EMERGENCY_RO, &c->flags))
return;
prt_printf(&buf, bch2_log_msg(c, "failed to evacuate bucket "));
bch2_bkey_val_to_text(&buf, c, k);
......@@ -661,6 +674,7 @@ static noinline void verify_bucket_evacuated(struct btree_trans *trans, struct b
int __bch2_evacuate_bucket(struct btree_trans *trans,
struct moving_context *ctxt,
struct move_bucket_in_flight *bucket_in_flight,
struct bpos bucket, int gen,
struct data_update_opts _data_opts)
{
......@@ -749,8 +763,9 @@ int __bch2_evacuate_bucket(struct btree_trans *trans,
i++;
}
ret = bch2_move_extent(trans, &iter, ctxt, io_opts,
bp.btree_id, k, data_opts);
ret = bch2_move_extent(trans, &iter, ctxt,
bucket_in_flight,
io_opts, bp.btree_id, k, data_opts);
bch2_trans_iter_exit(trans, &iter);
if (bch2_err_matches(ret, BCH_ERR_transaction_restart))
......@@ -809,7 +824,7 @@ int __bch2_evacuate_bucket(struct btree_trans *trans,
move_ctxt_wait_event(ctxt, NULL, list_empty(&ctxt->reads));
closure_sync(&ctxt->cl);
if (!ctxt->write_error)
verify_bucket_evacuated(trans, bucket, gen);
bch2_verify_bucket_evacuated(trans, bucket, gen);
}
err:
bch2_bkey_buf_exit(&sk, c);
......@@ -830,7 +845,7 @@ int bch2_evacuate_bucket(struct bch_fs *c,
bch2_trans_init(&trans, c, 0, 0);
bch2_moving_ctxt_init(&ctxt, c, rate, stats, wp, wait_on_copygc);
ret = __bch2_evacuate_bucket(&trans, &ctxt, bucket, gen, data_opts);
ret = __bch2_evacuate_bucket(&trans, &ctxt, NULL, bucket, gen, data_opts);
bch2_moving_ctxt_exit(&ctxt);
bch2_trans_exit(&trans);
......
......@@ -30,6 +30,8 @@ struct moving_context {
wait_queue_head_t wait;
};
void bch2_verify_bucket_evacuated(struct btree_trans *, struct bpos, int);
#define move_ctxt_wait_event(_ctxt, _trans, _cond) \
do { \
bool cond_finished = false; \
......@@ -68,6 +70,7 @@ int bch2_move_data(struct bch_fs *,
int __bch2_evacuate_bucket(struct btree_trans *,
struct moving_context *,
struct move_bucket_in_flight *,
struct bpos, int,
struct data_update_opts);
int bch2_evacuate_bucket(struct bch_fs *, struct bpos, int,
......
......@@ -16,4 +16,10 @@ struct bch_move_stats {
atomic64_t sectors_raced;
};
/*
 * One bucket that copygc is currently evacuating.
 *
 * bch2_copygc() fills in an entry for each bucket it starts evacuating and
 * hands a pointer to it down to bch2_move_extent(); move_buckets_wait() later
 * retires the entry once @count has dropped back to zero.
 */
struct move_bucket_in_flight {
/* Position of the bucket being evacuated */
struct bpos bucket;
/* Generation recorded when the bucket was selected; passed to bch2_verify_bucket_evacuated() */
u8 gen;
/* Outstanding moving_ios for this bucket: bch2_move_extent() increments, move_free() decrements */
atomic_t count;
};
#endif /* _BCACHEFS_MOVE_TYPES_H */
......@@ -26,6 +26,7 @@
#include "super-io.h"
#include "trace.h"
#include <linux/bsearch.h>
#include <linux/freezer.h>
#include <linux/kthread.h>
#include <linux/math64.h>
......@@ -70,62 +71,146 @@ static int bch2_bucket_is_movable(struct btree_trans *trans,
return ret;
}
static int bch2_copygc_next_bucket(struct btree_trans *trans,
struct bpos *bucket, u8 *gen, struct bpos *pos)
/*
 * FIFO of buckets whose evacuation is in progress: bch2_copygc() pushes new
 * entries, move_buckets_wait() retires them from the front.
 */
typedef FIFO(struct move_bucket_in_flight) move_buckets_in_flight;
/*
 * A candidate bucket for evacuation: its position plus the generation filled
 * in by bch2_bucket_is_movable().
 */
struct move_bucket {
struct bpos bucket;
u8 gen;
};
/*
 * Dynamic array of candidate buckets; also used for the sorted snapshot of
 * in-flight buckets that bucket_in_flight() searches.
 */
typedef DARRAY(struct move_bucket) move_buckets;
/*
 * Comparison callback for sort() and bsearch() over arrays of struct
 * move_bucket. Entries are ordered by bucket position via bkey_cmp(); the gen
 * field takes no part in the comparison.
 */
static int move_bucket_cmp(const void *_l, const void *_r)
{
	const struct move_bucket *lhs = _l, *rhs = _r;

	return bkey_cmp(lhs->bucket, rhs->bucket);
}
/*
 * Returns true if @b's bucket position is present in @buckets_sorted.
 *
 * The lookup is a binary search using move_bucket_cmp(), so @buckets_sorted
 * must already be sorted with that same comparison.
 */
static bool bucket_in_flight(move_buckets *buckets_sorted, struct move_bucket b)
{
	const void *match = bsearch(&b,
				    buckets_sorted->data,
				    buckets_sorted->nr,
				    sizeof(buckets_sorted->data[0]),
				    move_bucket_cmp);

	return match != NULL;
}
/*
 * Retire finished entries from the front of @buckets_in_flight.
 *
 * While more than @nr entries are queued, wait (via move_ctxt_wait_event())
 * for the oldest entry's count to reach zero; once at or below @nr, stop at
 * the first entry that still has a nonzero count instead of waiting.
 *
 * Each retired entry is passed to bch2_verify_bucket_evacuated() when
 * @verify_evacuated is set. @trans is unlocked before returning.
 */
static void move_buckets_wait(struct btree_trans *trans,
			      struct moving_context *ctxt,
			      move_buckets_in_flight *buckets_in_flight,
			      size_t nr, bool verify_evacuated)
{
	/* The front index only advances once the oldest entry is fully done */
	for (; !fifo_empty(buckets_in_flight); buckets_in_flight->front++) {
		struct move_bucket_in_flight *oldest = &fifo_peek_front(buckets_in_flight);

		if (fifo_used(buckets_in_flight) > nr)
			move_ctxt_wait_event(ctxt, trans, !atomic_read(&oldest->count));

		/* Still has moves outstanding: leave it (and everything behind it) queued */
		if (atomic_read(&oldest->count) != 0)
			break;

		/*
		 * moving_ctxt_exit calls bch2_write as it flushes pending
		 * reads, which inits another btree_trans; this one must be
		 * unlocked:
		 */
		if (verify_evacuated)
			bch2_verify_bucket_evacuated(trans, oldest->bucket, oldest->gen);
	}

	bch2_trans_unlock(trans);
}
static int bch2_copygc_get_buckets(struct btree_trans *trans,
struct moving_context *ctxt,
move_buckets_in_flight *buckets_in_flight,
move_buckets *buckets)
{
struct btree_iter iter;
move_buckets buckets_sorted = { 0 };
struct move_bucket_in_flight *i;
struct bkey_s_c k;
size_t fifo_iter, nr_to_get;
int ret;
move_buckets_wait(trans, ctxt, buckets_in_flight, buckets_in_flight->size / 2, true);
nr_to_get = max(16UL, fifo_used(buckets_in_flight) / 4);
fifo_for_each_entry_ptr(i, buckets_in_flight, fifo_iter) {
ret = darray_push(&buckets_sorted, ((struct move_bucket) {i->bucket, i->gen}));
if (ret) {
bch_err(trans->c, "error allocating move_buckets_sorted");
goto err;
}
}
sort(buckets_sorted.data,
buckets_sorted.nr,
sizeof(buckets_sorted.data[0]),
move_bucket_cmp,
NULL);
ret = for_each_btree_key2_upto(trans, iter, BTREE_ID_lru,
bpos_max(*pos, lru_pos(BCH_LRU_FRAGMENTATION_START, 0, 0)),
lru_pos(BCH_LRU_FRAGMENTATION_START, 0, 0),
lru_pos(BCH_LRU_FRAGMENTATION_START, U64_MAX, LRU_TIME_MAX),
0, k, ({
*bucket = u64_to_bucket(k.k->p.offset);
struct move_bucket b = { .bucket = u64_to_bucket(k.k->p.offset) };
int ret = 0;
if (!bucket_in_flight(&buckets_sorted, b) &&
bch2_bucket_is_movable(trans, b.bucket, lru_pos_time(k.k->p), &b.gen))
ret = darray_push(buckets, b) ?: buckets->nr >= nr_to_get;
bch2_bucket_is_movable(trans, *bucket, lru_pos_time(k.k->p), gen);
ret;
}));
err:
darray_exit(&buckets_sorted);
*pos = iter.pos;
if (ret < 0)
return ret;
return ret ? 0 : -ENOENT;
return ret < 0 ? ret : 0;
}
static int bch2_copygc(struct bch_fs *c)
static int bch2_copygc(struct btree_trans *trans,
struct moving_context *ctxt,
move_buckets_in_flight *buckets_in_flight)
{
struct bch_move_stats move_stats;
struct btree_trans trans;
struct moving_context ctxt;
struct bch_fs *c = trans->c;
struct data_update_opts data_opts = {
.btree_insert_flags = BTREE_INSERT_USE_RESERVE|JOURNAL_WATERMARK_copygc,
};
struct bpos bucket;
struct bpos pos;
u8 gen = 0;
unsigned nr_evacuated;
move_buckets buckets = { 0 };
struct move_bucket_in_flight *f;
struct move_bucket *i;
u64 moved = atomic64_read(&ctxt->stats->sectors_moved);
int ret = 0;
bch2_move_stats_init(&move_stats, "copygc");
bch2_moving_ctxt_init(&ctxt, c, NULL, &move_stats,
writepoint_ptr(&c->copygc_write_point),
false);
bch2_trans_init(&trans, c, 0, 0);
ret = bch2_btree_write_buffer_flush(trans);
if (bch2_fs_fatal_err_on(ret, c, "%s: error %s from bch2_btree_write_buffer_flush()",
__func__, bch2_err_str(ret)))
return ret;
ret = bch2_btree_write_buffer_flush(&trans);
BUG_ON(ret);
ret = bch2_copygc_get_buckets(trans, ctxt, buckets_in_flight, &buckets);
if (ret)
goto err;
for (nr_evacuated = 0, pos = POS_MIN;
nr_evacuated < 32 && !ret;
nr_evacuated++, pos = bpos_nosnap_successor(pos)) {
ret = bch2_copygc_next_bucket(&trans, &bucket, &gen, &pos) ?:
__bch2_evacuate_bucket(&trans, &ctxt, bucket, gen, data_opts);
if (bkey_eq(pos, POS_MAX))
darray_for_each(buckets, i) {
if (unlikely(freezing(current)))
break;
}
bch2_trans_exit(&trans);
bch2_moving_ctxt_exit(&ctxt);
f = fifo_push_ref(buckets_in_flight);
f->bucket = i->bucket;
f->gen = i->gen;
atomic_set(&f->count, 0);
ret = __bch2_evacuate_bucket(trans, ctxt, f, f->bucket, f->gen, data_opts);
if (ret)
goto err;
}
err:
darray_exit(&buckets);
/* no entries in LRU btree found, or got to end: */
if (ret == -ENOENT)
......@@ -134,7 +219,8 @@ static int bch2_copygc(struct bch_fs *c)
if (ret < 0 && !bch2_err_matches(ret, EROFS))
bch_err(c, "error from bch2_move_data() in copygc: %s", bch2_err_str(ret));
trace_and_count(c, copygc, c, atomic64_read(&move_stats.sectors_moved), 0, 0, 0);
moved = atomic64_read(&ctxt->stats->sectors_moved) - moved;
trace_and_count(c, copygc, c, moved, 0, 0, 0);
return ret;
}
......@@ -162,7 +248,7 @@ unsigned long bch2_copygc_wait_amount(struct bch_fs *c)
for_each_rw_member(ca, c, dev_idx) {
struct bch_dev_usage usage = bch2_dev_usage_read(ca);
fragmented_allowed = ((__dev_buckets_available(ca, usage, RESERVE_none) *
fragmented_allowed = ((__dev_buckets_available(ca, usage, RESERVE_stripe) *
ca->mi.bucket_size) >> 1);
fragmented = 0;
......@@ -191,22 +277,47 @@ void bch2_copygc_wait_to_text(struct printbuf *out, struct bch_fs *c)
static int bch2_copygc_thread(void *arg)
{
struct bch_fs *c = arg;
struct btree_trans trans;
struct moving_context ctxt;
struct bch_move_stats move_stats;
struct io_clock *clock = &c->io_clock[WRITE];
move_buckets_in_flight move_buckets;
u64 last, wait;
int ret = 0;
if (!init_fifo(&move_buckets, 1 << 14, GFP_KERNEL)) {
bch_err(c, "error allocating copygc buckets in flight");
return -ENOMEM;
}
set_freezable();
bch2_trans_init(&trans, c, 0, 0);
bch2_move_stats_init(&move_stats, "copygc");
bch2_moving_ctxt_init(&ctxt, c, NULL, &move_stats,
writepoint_ptr(&c->copygc_write_point),
false);
while (!ret && !kthread_should_stop()) {
bch2_trans_unlock(&trans);
cond_resched();
if (kthread_wait_freezable(c->copy_gc_enabled))
break;
if (!c->copy_gc_enabled) {
move_buckets_wait(&trans, &ctxt, &move_buckets, 0, true);
kthread_wait_freezable(c->copy_gc_enabled);
}
if (unlikely(freezing(current))) {
move_buckets_wait(&trans, &ctxt, &move_buckets, 0, true);
__refrigerator(false);
continue;
}
last = atomic64_read(&clock->now);
wait = bch2_copygc_wait_amount(c);
if (wait > clock->max_slop) {
move_buckets_wait(&trans, &ctxt, &move_buckets, 0, true);
trace_and_count(c, copygc_wait, c, wait, last + wait);
c->copygc_wait = last + wait;
bch2_kthread_io_clock_wait(clock, last + wait,
......@@ -217,12 +328,16 @@ static int bch2_copygc_thread(void *arg)
c->copygc_wait = 0;
c->copygc_running = true;
ret = bch2_copygc(c);
ret = bch2_copygc(&trans, &ctxt, &move_buckets);
c->copygc_running = false;
wake_up(&c->copygc_running_wq);
}
bch2_moving_ctxt_exit(&ctxt);
bch2_trans_exit(&trans);
free_fifo(&move_buckets);
return 0;
}
......
......@@ -544,6 +544,26 @@ do { \
submit_bio(bio); \
} while (0)
/*
 * kthread_wait - sleep in TASK_INTERRUPTIBLE until @cond is true or the
 * kthread is asked to stop.
 *
 * Evaluates to 0 once @cond holds, or to -1 if kthread_should_stop() returned
 * true first (it is checked before @cond on every pass). The task state is
 * set back to TASK_RUNNING before the result is produced. @cond is
 * re-evaluated on each pass, so it may be evaluated more than once.
 */
#define kthread_wait(cond)						\
({									\
	int _ret = 0;							\
									\
	do {								\
		set_current_state(TASK_INTERRUPTIBLE);			\
		if (kthread_should_stop())				\
			_ret = -1;					\
		else if (!(cond))					\
			schedule();					\
		else							\
			break;						\
	} while (!_ret);						\
									\
	set_current_state(TASK_RUNNING);				\
	_ret;								\
})
#define kthread_wait_freezable(cond) \
({ \
int _ret = 0; \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment