Commit cb7a583e authored by Kent Overstreet

bcache: kill closure locking usage

Signed-off-by: Kent Overstreet <kmo@daterainc.com>
parent a5ae4300
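This commit replaces bcache's closure_with_waitlist locking with a plain struct closure guarded by a semaphore: each former closure_lock() becomes a down() on a new *_mutex semaphore followed by closure_init(), and each matching closure_return() becomes closure_return_with_destructor() with a destructor that up()s the semaphore once the closure's last reference is dropped. Below is a minimal sketch of the converted pattern; the struct and function names (foo, foo_write, and so on) are illustrative only and do not appear in the tree.

	/* Sketch of the locking pattern after this commit; names are made up. */
	struct foo {
		struct closure		io_cl;		/* was: struct closure_with_waitlist */
		struct semaphore	io_mutex;	/* new: serializes users of io_cl */
	};

	/* At init time: sema_init(&f->io_mutex, 1); */

	static void foo_write_unlock(struct closure *cl)
	{
		struct foo *f = container_of(cl, struct foo, io_cl);

		/* Runs once the closure's refcount hits zero. */
		up(&f->io_mutex);
	}

	static void foo_write(struct foo *f, struct closure *parent)
	{
		struct closure *cl = &f->io_cl;

		down(&f->io_mutex);		/* was: closure_lock(&f->io, parent) */
		closure_init(cl, parent);

		/* ... closure_get(cl) per bio, submit the I/O ... */

		closure_return_with_destructor(cl, foo_write_unlock);	/* was: closure_return(cl) */
	}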
@@ -309,7 +309,8 @@ struct cached_dev {
 	struct cache_sb		sb;
 	struct bio		sb_bio;
 	struct bio_vec		sb_bv[1];
-	struct closure_with_waitlist sb_write;
+	struct closure		sb_write;
+	struct semaphore	sb_write_mutex;

 	/* Refcount on the cache set. Always nonzero when we're caching. */
 	atomic_t		count;
@@ -514,7 +515,8 @@ struct cache_set {
 	uint64_t		cached_dev_sectors;
 	struct closure		caching;

-	struct closure_with_waitlist sb_write;
+	struct closure		sb_write;
+	struct semaphore	sb_write_mutex;

 	mempool_t		*search;
 	mempool_t		*bio_meta;
@@ -635,7 +637,8 @@ struct cache_set {
 	unsigned		nr_uuids;
 	struct uuid_entry	*uuids;
 	BKEY_PADDED(uuid_bucket);
-	struct closure_with_waitlist uuid_write;
+	struct closure		uuid_write;
+	struct semaphore	uuid_write_mutex;

 	/*
 	 * A btree node on disk could have too many bsets for an iterator to fit
...
@@ -340,9 +340,16 @@ static void btree_complete_write(struct btree *b, struct btree_write *w)
 	w->journal = NULL;
 }

+static void btree_node_write_unlock(struct closure *cl)
+{
+	struct btree *b = container_of(cl, struct btree, io);
+
+	up(&b->io_mutex);
+}
+
 static void __btree_node_write_done(struct closure *cl)
 {
-	struct btree *b = container_of(cl, struct btree, io.cl);
+	struct btree *b = container_of(cl, struct btree, io);
 	struct btree_write *w = btree_prev_write(b);

 	bch_bbio_free(b->bio, b->c);
@@ -353,12 +360,12 @@ static void __btree_node_write_done(struct closure *cl)
 		queue_delayed_work(btree_io_wq, &b->work,
 				   msecs_to_jiffies(30000));

-	closure_return(cl);
+	closure_return_with_destructor(cl, btree_node_write_unlock);
 }

 static void btree_node_write_done(struct closure *cl)
 {
-	struct btree *b = container_of(cl, struct btree, io.cl);
+	struct btree *b = container_of(cl, struct btree, io);
 	struct bio_vec *bv;
 	int n;
@@ -371,7 +378,7 @@ static void btree_node_write_done(struct closure *cl)
 static void btree_node_write_endio(struct bio *bio, int error)
 {
 	struct closure *cl = bio->bi_private;
-	struct btree *b = container_of(cl, struct btree, io.cl);
+	struct btree *b = container_of(cl, struct btree, io);

 	if (error)
 		set_btree_node_io_error(b);
@@ -382,7 +389,7 @@ static void btree_node_write_endio(struct bio *bio, int error)
 static void do_btree_node_write(struct btree *b)
 {
-	struct closure *cl = &b->io.cl;
+	struct closure *cl = &b->io;
 	struct bset *i = b->sets[b->nsets].data;
 	BKEY_PADDED(key) k;
@@ -435,7 +442,7 @@ static void do_btree_node_write(struct btree *b)
 		bch_submit_bbio(b->bio, b->c, &k.key, 0);
 		closure_sync(cl);
-		__btree_node_write_done(cl);
+		continue_at_nobarrier(cl, __btree_node_write_done, NULL);
 	}
 }
@@ -454,7 +461,8 @@ void bch_btree_node_write(struct btree *b, struct closure *parent)
 	cancel_delayed_work(&b->work);

 	/* If caller isn't waiting for write, parent refcount is cache set */
-	closure_lock(&b->io, parent ?: &b->c->cl);
+	down(&b->io_mutex);
+	closure_init(&b->io, parent ?: &b->c->cl);

 	clear_bit(BTREE_NODE_dirty, &b->flags);
 	change_bit(BTREE_NODE_write_idx, &b->flags);
@@ -554,7 +562,8 @@ static void mca_reinit(struct btree *b)
 static void mca_data_free(struct btree *b)
 {
 	struct bset_tree *t = b->sets;
-	BUG_ON(!closure_is_unlocked(&b->io.cl));
+
+	BUG_ON(b->io_mutex.count != 1);

 	if (bset_prev_bytes(b) < PAGE_SIZE)
 		kfree(t->prev);
@@ -635,7 +644,7 @@ static struct btree *mca_bucket_alloc(struct cache_set *c,
 	INIT_LIST_HEAD(&b->list);
 	INIT_DELAYED_WORK(&b->work, btree_node_write_work);
 	b->c = c;
-	closure_init_unlocked(&b->io);
+	sema_init(&b->io_mutex, 1);
 	mca_data_alloc(b, k, gfp);

 	return b;
@@ -653,22 +662,29 @@ static int mca_reap(struct btree *b, unsigned min_order, bool flush)
 	BUG_ON(btree_node_dirty(b) && !b->sets[0].data);

-	if (b->page_order < min_order ||
-	    (!flush &&
-	     (btree_node_dirty(b) ||
-	      atomic_read(&b->io.cl.remaining) != -1))) {
-		rw_unlock(true, b);
-		return -ENOMEM;
+	if (b->page_order < min_order)
+		goto out_unlock;
+
+	if (!flush) {
+		if (btree_node_dirty(b))
+			goto out_unlock;
+
+		if (down_trylock(&b->io_mutex))
+			goto out_unlock;
+		up(&b->io_mutex);
 	}

 	if (btree_node_dirty(b))
 		bch_btree_node_write_sync(b);

 	/* wait for any in flight btree write */
-	closure_wait_event(&b->io.wait, &cl,
-			   atomic_read(&b->io.cl.remaining) == -1);
+	down(&b->io_mutex);
+	up(&b->io_mutex);

 	return 0;
+
+out_unlock:
+	rw_unlock(true, b);
+	return -ENOMEM;
 }

 static unsigned long bch_mca_scan(struct shrinker *shrink,
@@ -918,7 +934,7 @@ static struct btree *mca_alloc(struct cache_set *c, struct bkey *k, int level)
 	if (!b->sets->data)
 		goto err;
 out:
-	BUG_ON(!closure_is_unlocked(&b->io.cl));
+	BUG_ON(b->io_mutex.count != 1);

 	bkey_copy(&b->key, k);
 	list_move(&b->list, &c->btree_cache);
...
@@ -143,7 +143,8 @@ struct btree {
 	struct bset_tree	sets[MAX_BSETS];

 	/* For outstanding btree writes, used as a lock - protects write_idx */
-	struct closure_with_waitlist io;
+	struct closure		io;
+	struct semaphore	io_mutex;

 	struct list_head	list;
 	struct delayed_work	work;
...
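The btree node's new io_mutex does double duty as an "is a write in flight?" indicator: bch_btree_node_write() holds it for the duration of the write (it is released from the write's closure destructor), so other paths can either test it without blocking via down_trylock(), as mca_reap() does when flush is false, or wait for an in-flight write to drain with a down()/up() pair, as mca_reap() and bch_btree_verify() do. A rough sketch of the two idioms, using hypothetical helper names:

	/* Hypothetical helpers; both assume the writer did down(&b->io_mutex)
	 * before starting the write and up()s it from the closure destructor. */

	/* Non-blocking check, as in mca_reap() with !flush: */
	static bool btree_node_write_in_flight(struct btree *b)
	{
		if (down_trylock(&b->io_mutex))
			return true;	/* couldn't take it: a write holds it */
		up(&b->io_mutex);	/* took it: nothing in flight, give it back */
		return false;
	}

	/* Blocking wait, as in bch_btree_verify(): */
	static void btree_node_wait_for_write(struct btree *b)
	{
		down(&b->io_mutex);	/* blocks until the in-flight write's closure drops it */
		up(&b->io_mutex);
	}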
@@ -127,9 +127,7 @@ void bch_btree_verify(struct btree *b, struct bset *new)
 	if (!b->c->verify)
 		return;

-	closure_wait_event(&b->io.wait, &cl,
-			   atomic_read(&b->io.cl.remaining) == -1);
-
+	down(&b->io_mutex);
 	mutex_lock(&b->c->verify_lock);

 	bkey_copy(&v->key, &b->key);
@@ -137,8 +135,6 @@ void bch_btree_verify(struct btree *b, struct bset *new)
 	v->level = b->level;

 	bch_btree_node_read(v);
-	closure_wait_event(&v->io.wait, &cl,
-			   atomic_read(&b->io.cl.remaining) == -1);

 	if (new->keys != v->sets[0].data->keys ||
 	    memcmp(new->start,
@@ -167,6 +163,7 @@ void bch_btree_verify(struct btree *b, struct bset *new)
 	}

 	mutex_unlock(&b->c->verify_lock);
+	up(&b->io_mutex);
 }

 void bch_data_verify(struct cached_dev *dc, struct bio *bio)
...
@@ -564,6 +564,14 @@ static void journal_write_done(struct closure *cl)
 	continue_at_nobarrier(cl, journal_write, system_wq);
 }

+static void journal_write_unlock(struct closure *cl)
+{
+	struct cache_set *c = container_of(cl, struct cache_set, journal.io);
+
+	c->journal.io_in_flight = 0;
+	spin_unlock(&c->journal.lock);
+}
+
 static void journal_write_unlocked(struct closure *cl)
 	__releases(c->journal.lock)
 {
@@ -578,15 +586,7 @@ static void journal_write_unlocked(struct closure *cl)
 	bio_list_init(&list);

 	if (!w->need_write) {
-		/*
-		 * XXX: have to unlock closure before we unlock journal lock,
-		 * else we race with bch_journal(). But this way we race
-		 * against cache set unregister. Doh.
-		 */
-		set_closure_fn(cl, NULL, NULL);
-		closure_sub(cl, CLOSURE_RUNNING + 1);
-		spin_unlock(&c->journal.lock);
-		return;
+		closure_return_with_destructor(cl, journal_write_unlock);
 	} else if (journal_full(&c->journal)) {
 		journal_reclaim(c);
 		spin_unlock(&c->journal.lock);
@@ -662,10 +662,12 @@ static void journal_try_write(struct cache_set *c)
 	w->need_write = true;

-	if (closure_trylock(cl, &c->cl))
-		journal_write_unlocked(cl);
-	else
+	if (!c->journal.io_in_flight) {
+		c->journal.io_in_flight = 1;
+		closure_call(cl, journal_write_unlocked, NULL, &c->cl);
+	} else {
 		spin_unlock(&c->journal.lock);
+	}
 }

 static struct journal_write *journal_wait_for_write(struct cache_set *c,
@@ -793,7 +795,6 @@ int bch_journal_alloc(struct cache_set *c)
 {
 	struct journal *j = &c->journal;

-	closure_init_unlocked(&j->io);
 	spin_lock_init(&j->lock);
 	INIT_DELAYED_WORK(&j->work, journal_write_work);
...
@@ -104,6 +104,7 @@ struct journal {
 	/* used when waiting because the journal was full */
 	struct closure_waitlist	wait;
 	struct closure		io;
+	int			io_in_flight;
 	struct delayed_work	work;

 	/* Number of blocks free in the bucket(s) we're currently writing to */
...
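On the journal side the closure is no longer "locked" at all: journal_try_write() now tests and sets an explicit io_in_flight flag under journal.lock so that only one journal write closure runs at a time, and journal_write_unlock(), installed as the closure destructor for the nothing-to-write case, clears the flag and drops the spinlock in one place. This removes the ordering problem described by the deleted XXX comment, where unlocking the closure before the journal lock raced with cache set unregister. A simplified sketch of the dispatch path, assuming only the fields shown in the hunks above:

	/* Simplified from journal_try_write(); error handling omitted. */
	static void journal_try_write_sketch(struct cache_set *c)
		__releases(c->journal.lock)
	{
		struct closure *cl = &c->journal.io;
		struct journal_write *w = c->journal.cur;

		w->need_write = true;

		if (!c->journal.io_in_flight) {
			c->journal.io_in_flight = 1;
			/* journal_write_unlocked() is responsible for dropping journal.lock */
			closure_call(cl, journal_write_unlocked, NULL, &c->cl);
		} else {
			/* a write is already in flight; it will pick up need_write */
			spin_unlock(&c->journal.lock);
		}
	}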
@@ -225,7 +225,7 @@ static void write_bdev_super_endio(struct bio *bio, int error)
 	struct cached_dev *dc = bio->bi_private;
 	/* XXX: error checking */

-	closure_put(&dc->sb_write.cl);
+	closure_put(&dc->sb_write);
 }

 static void __write_super(struct cache_sb *sb, struct bio *bio)
@@ -263,12 +263,20 @@ static void __write_super(struct cache_sb *sb, struct bio *bio)
 	submit_bio(REQ_WRITE, bio);
 }

+static void bch_write_bdev_super_unlock(struct closure *cl)
+{
+	struct cached_dev *dc = container_of(cl, struct cached_dev, sb_write);
+
+	up(&dc->sb_write_mutex);
+}
+
 void bch_write_bdev_super(struct cached_dev *dc, struct closure *parent)
 {
-	struct closure *cl = &dc->sb_write.cl;
+	struct closure *cl = &dc->sb_write;
 	struct bio *bio = &dc->sb_bio;

-	closure_lock(&dc->sb_write, parent);
+	down(&dc->sb_write_mutex);
+	closure_init(cl, parent);

 	bio_reset(bio);
 	bio->bi_bdev = dc->bdev;
@@ -278,7 +286,7 @@ void bch_write_bdev_super(struct cached_dev *dc, struct closure *parent)
 	closure_get(cl);
 	__write_super(&dc->sb, bio);

-	closure_return(cl);
+	closure_return_with_destructor(cl, bch_write_bdev_super_unlock);
 }

 static void write_super_endio(struct bio *bio, int error)
@@ -286,16 +294,24 @@ static void write_super_endio(struct bio *bio, int error)
 	struct cache *ca = bio->bi_private;

 	bch_count_io_errors(ca, error, "writing superblock");
-	closure_put(&ca->set->sb_write.cl);
+	closure_put(&ca->set->sb_write);
+}
+
+static void bcache_write_super_unlock(struct closure *cl)
+{
+	struct cache_set *c = container_of(cl, struct cache_set, sb_write);
+
+	up(&c->sb_write_mutex);
 }

 void bcache_write_super(struct cache_set *c)
 {
-	struct closure *cl = &c->sb_write.cl;
+	struct closure *cl = &c->sb_write;
 	struct cache *ca;
 	unsigned i;

-	closure_lock(&c->sb_write, &c->cl);
+	down(&c->sb_write_mutex);
+	closure_init(cl, &c->cl);

 	c->sb.seq++;
@@ -317,7 +333,7 @@ void bcache_write_super(struct cache_set *c)
 		__write_super(&ca->sb, bio);
 	}

-	closure_return(cl);
+	closure_return_with_destructor(cl, bcache_write_super_unlock);
 }

 /* UUID io */
@@ -325,23 +341,31 @@ void bcache_write_super(struct cache_set *c)
 static void uuid_endio(struct bio *bio, int error)
 {
 	struct closure *cl = bio->bi_private;
-	struct cache_set *c = container_of(cl, struct cache_set, uuid_write.cl);
+	struct cache_set *c = container_of(cl, struct cache_set, uuid_write);

 	cache_set_err_on(error, c, "accessing uuids");
 	bch_bbio_free(bio, c);
 	closure_put(cl);
 }

+static void uuid_io_unlock(struct closure *cl)
+{
+	struct cache_set *c = container_of(cl, struct cache_set, uuid_write);
+
+	up(&c->uuid_write_mutex);
+}
+
 static void uuid_io(struct cache_set *c, unsigned long rw,
 		    struct bkey *k, struct closure *parent)
 {
-	struct closure *cl = &c->uuid_write.cl;
+	struct closure *cl = &c->uuid_write;
 	struct uuid_entry *u;
 	unsigned i;
 	char buf[80];

 	BUG_ON(!parent);
-	closure_lock(&c->uuid_write, parent);
+	down(&c->uuid_write_mutex);
+	closure_init(cl, parent);

 	for (i = 0; i < KEY_PTRS(k); i++) {
 		struct bio *bio = bch_bbio_alloc(c);
@@ -368,7 +392,7 @@ static void uuid_io(struct cache_set *c, unsigned long rw,
 		 u - c->uuids, u->uuid, u->label,
 		 u->first_reg, u->last_reg, u->invalidated);

-	closure_return(cl);
+	closure_return_with_destructor(cl, uuid_io_unlock);
 }

 static char *uuid_read(struct cache_set *c, struct jset *j, struct closure *cl)
@@ -1098,7 +1122,7 @@ static int cached_dev_init(struct cached_dev *dc, unsigned block_size)
 	set_closure_fn(&dc->disk.cl, cached_dev_flush, system_wq);
 	kobject_init(&dc->disk.kobj, &bch_cached_dev_ktype);
 	INIT_WORK(&dc->detach, cached_dev_detach_finish);
-	closure_init_unlocked(&dc->sb_write);
+	sema_init(&dc->sb_write_mutex, 1);
 	INIT_LIST_HEAD(&dc->io_lru);
 	spin_lock_init(&dc->io_lock);
 	bch_cache_accounting_init(&dc->accounting, &dc->disk.cl);
@@ -1454,11 +1478,11 @@ struct cache_set *bch_cache_set_alloc(struct cache_sb *sb)
 	c->sort_crit_factor = int_sqrt(c->btree_pages);

-	closure_init_unlocked(&c->sb_write);
+	sema_init(&c->sb_write_mutex, 1);
 	mutex_init(&c->bucket_lock);
 	init_waitqueue_head(&c->try_wait);
 	init_waitqueue_head(&c->bucket_wait);
-	closure_init_unlocked(&c->uuid_write);
+	sema_init(&c->uuid_write_mutex, 1);
 	mutex_init(&c->sort_lock);

 	spin_lock_init(&c->sort_time.lock);
...