Commit 656f05d8 authored by Kent Overstreet

bcachefs: Split out journal workqueue

We don't want journal write completions to be blocked behind btree
transactions; io_complete_wq is used for btree updates after data and
metadata writes.
Signed-off-by: Kent Overstreet <kent.overstreet@linux.dev>
parent 4f70176c
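Why a dedicated queue matters: work items queued on the same workqueue can delay one another, so journal write completions sharing io_complete_wq with post-write btree updates could stall behind long-running btree transactions. The fix is the standard private-workqueue idiom. Below is a minimal, self-contained sketch of that idiom (the demo_* names are illustrative, not part of this commit); it mirrors the alloc_workqueue() flags the patch uses:

#include <linux/module.h>
#include <linux/workqueue.h>

/* Illustrative only: a subsystem-private workqueue, mirroring j->wq below. */
static struct workqueue_struct *demo_wq;

static void demo_completion_fn(struct work_struct *work)
{
	/*
	 * Runs on demo_wq's own worker pool (with a rescuer thread, thanks
	 * to WQ_MEM_RECLAIM), so it cannot stall behind long-running items
	 * queued on a shared workqueue.
	 */
}

static DECLARE_WORK(demo_work, demo_completion_fn);

static int __init demo_init(void)
{
	/*
	 * WQ_HIGHPRI and WQ_UNBOUND minimize scheduling latency;
	 * WQ_MEM_RECLAIM guarantees forward progress under memory
	 * pressure, which writeback paths like the journal require.
	 */
	demo_wq = alloc_workqueue("demo_wq",
				  WQ_HIGHPRI|WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512);
	if (!demo_wq)
		return -ENOMEM;

	queue_work(demo_wq, &demo_work);
	return 0;
}

static void __exit demo_exit(void)
{
	/* destroy_workqueue() drains remaining work before freeing */
	destroy_workqueue(demo_wq);
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");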
fs/bcachefs/journal.c
@@ -181,14 +181,12 @@ journal_error_check_stuck(struct journal *j, int error, unsigned flags)
  */
 void bch2_journal_buf_put_final(struct journal *j, u64 seq, bool write)
 {
-	struct bch_fs *c = container_of(j, struct bch_fs, journal);
-
 	lockdep_assert_held(&j->lock);
 
 	if (__bch2_journal_pin_put(j, seq))
 		bch2_journal_reclaim_fast(j);
 	if (write)
-		closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL);
+		closure_call(&j->io, bch2_journal_write, j->wq, NULL);
 }
 
 /*
@@ -418,7 +416,7 @@ static int journal_entry_open(struct journal *j)
 	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
 				       old.v, new.v)) != old.v);
 
-	mod_delayed_work(c->io_complete_wq,
+	mod_delayed_work(j->wq,
 			 &j->write_work,
 			 msecs_to_jiffies(c->opts.journal_flush_delay));
 	journal_wake(j);
@@ -445,7 +443,6 @@ static void journal_quiesce(struct journal *j)
 static void journal_write_work(struct work_struct *work)
 {
 	struct journal *j = container_of(work, struct journal, write_work.work);
-	struct bch_fs *c = container_of(j, struct bch_fs, journal);
 	long delta;
 
 	spin_lock(&j->lock);
@@ -455,7 +452,7 @@ static void journal_write_work(struct work_struct *work)
 	delta = journal_cur_buf(j)->expires - jiffies;
 
 	if (delta > 0)
-		mod_delayed_work(c->io_complete_wq, &j->write_work, delta);
+		mod_delayed_work(j->wq, &j->write_work, delta);
 	else
 		__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
 unlock:
@@ -1303,11 +1300,12 @@ int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
 
 void bch2_fs_journal_exit(struct journal *j)
 {
-	unsigned i;
+	if (j->wq)
+		destroy_workqueue(j->wq);
 
 	darray_exit(&j->early_journal_entries);
 
-	for (i = 0; i < ARRAY_SIZE(j->buf); i++)
+	for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++)
 		kvpfree(j->buf[i].data, j->buf[i].buf_size);
 	free_fifo(&j->pin);
 }
@@ -1315,7 +1313,6 @@ void bch2_fs_journal_exit(struct journal *j)
 int bch2_fs_journal_init(struct journal *j)
 {
 	static struct lock_class_key res_key;
-	unsigned i;
 
 	mutex_init(&j->buf_lock);
 	spin_lock_init(&j->lock);
@@ -1336,7 +1333,7 @@ int bch2_fs_journal_init(struct journal *j)
 	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)))
 		return -BCH_ERR_ENOMEM_journal_pin_fifo;
 
-	for (i = 0; i < ARRAY_SIZE(j->buf); i++) {
+	for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) {
 		j->buf[i].buf_size = JOURNAL_ENTRY_SIZE_MIN;
 		j->buf[i].data = kvpmalloc(j->buf[i].buf_size, GFP_KERNEL);
 		if (!j->buf[i].data)
@@ -1344,6 +1341,11 @@ int bch2_fs_journal_init(struct journal *j)
 	}
 
 	j->pin.front = j->pin.back = 1;
 
+	j->wq = alloc_workqueue("bcachefs_journal",
+				WQ_HIGHPRI|WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512);
+	if (!j->wq)
+		return -BCH_ERR_ENOMEM_fs_other_alloc;
+
 	return 0;
 }
fs/bcachefs/journal_io.c
@@ -1648,7 +1648,7 @@ static CLOSURE_CALLBACK(journal_write_done)
 	if (!journal_state_count(new, new.unwritten_idx) &&
 	    journal_last_unwritten_seq(j) <= journal_cur_seq(j)) {
 		spin_unlock(&j->lock);
-		closure_call(&j->io, bch2_journal_write, c->io_complete_wq, NULL);
+		closure_call(&j->io, bch2_journal_write, j->wq, NULL);
 	} else if (journal_last_unwritten_seq(j) == journal_cur_seq(j) &&
 		   new.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL) {
 		struct journal_buf *buf = journal_cur_buf(j);
@@ -1661,7 +1661,7 @@ static CLOSURE_CALLBACK(journal_write_done)
 		 */
 		spin_unlock(&j->lock);
-		mod_delayed_work(c->io_complete_wq, &j->write_work, max(0L, delta));
+		mod_delayed_work(j->wq, &j->write_work, max(0L, delta));
 	} else {
 		spin_unlock(&j->lock);
 	}
@@ -1731,7 +1731,7 @@ static CLOSURE_CALLBACK(do_journal_write)
 			le64_to_cpu(w->data->seq);
 	}
 
-	continue_at(cl, journal_write_done, c->io_complete_wq);
+	continue_at(cl, journal_write_done, j->wq);
 }
 
 static int bch2_journal_write_prep(struct journal *j, struct journal_buf *w)
@@ -1998,12 +1998,12 @@ CLOSURE_CALLBACK(bch2_journal_write)
 		}
 	}
 
-	continue_at(cl, do_journal_write, c->io_complete_wq);
+	continue_at(cl, do_journal_write, j->wq);
 	return;
 no_io:
-	continue_at(cl, journal_write_done, c->io_complete_wq);
+	continue_at(cl, journal_write_done, j->wq);
 	return;
 err:
 	bch2_fatal_error(c);
-	continue_at(cl, journal_write_done, c->io_complete_wq);
+	continue_at(cl, journal_write_done, j->wq);
 }
fs/bcachefs/journal_types.h
@@ -205,6 +205,7 @@ struct journal {
 	struct closure		io;
 	struct delayed_work	write_work;
+	struct workqueue_struct	*wq;
 
 	/* Sequence number of most recent journal entry (last entry in @pin) */
 	atomic64_t		seq;
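As context for the closure_call()/continue_at() changes above: both take a workqueue argument and run the closure's next function on it, which is how every stage of the journal write path ends up on j->wq. A minimal sketch of that two-stage pattern, assuming the include/linux/closure.h API that bcachefs uses (the demo_* names are hypothetical, not from this commit):

#include <linux/closure.h>
#include <linux/printk.h>
#include <linux/workqueue.h>

static struct workqueue_struct *demo_wq;	/* a private queue, like j->wq */

/* Hypothetical object carrying a closure through a two-stage write path. */
struct demo_write {
	struct closure cl;
};

static CLOSURE_CALLBACK(demo_write_done)
{
	closure_type(w, struct demo_write, cl);

	/* Completion stage: runs on demo_wq, never on a shared workqueue. */
	pr_debug("write %p done\n", w);
	closure_return(cl);
}

static CLOSURE_CALLBACK(demo_write_start)
{
	closure_type(w, struct demo_write, cl);

	/* ... submit I/O for w here ... then chain the completion stage: */
	pr_debug("submitting write %p\n", w);
	continue_at(cl, demo_write_done, demo_wq);
}

static void demo_submit(struct demo_write *w)
{
	/* closure_call() initializes the closure and runs fn on wq */
	closure_call(&w->cl, demo_write_start, demo_wq, NULL);
}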