diff options
author | Kent Overstreet <kmo@daterainc.com> | 2013-10-24 17:07:04 -0700 |
---|---|---|
committer | Kent Overstreet <kmo@daterainc.com> | 2013-11-10 21:56:02 -0800 |
commit | a34a8bfd4e6358c646928320d37b0425c0762f8a (patch) | |
tree | 650dd57be0460f439551baca3514009b4287bb12 /drivers/md/bcache/journal.c | |
parent | cdd972b164be8fc69f6ee8533c5a07b621da74c7 (diff) | |
download | talos-op-linux-a34a8bfd4e6358c646928320d37b0425c0762f8a.tar.gz talos-op-linux-a34a8bfd4e6358c646928320d37b0425c0762f8a.zip |
bcache: Refactor journalling flow control
Make things that don't need to be asynchronous less so - bch_journal()
only has to block when the journal or journal entry is full, which is
emphatically not a fast path. So make it a normal function that just
returns when it finishes, to make the code and control flow easier to
follow.
Signed-off-by: Kent Overstreet <kmo@daterainc.com>
Diffstat (limited to 'drivers/md/bcache/journal.c')
-rw-r--r-- | drivers/md/bcache/journal.c | 213 |
1 files changed, 100 insertions, 113 deletions
diff --git a/drivers/md/bcache/journal.c b/drivers/md/bcache/journal.c index 1bdefdb1fa71..940e89e0d706 100644 --- a/drivers/md/bcache/journal.c +++ b/drivers/md/bcache/journal.c @@ -318,7 +318,6 @@ int bch_journal_replay(struct cache_set *s, struct list_head *list, bch_keylist_push(&op->keys); op->journal = i->pin; - atomic_inc(op->journal); ret = bch_btree_insert(op, s, &op->keys); if (ret) @@ -357,48 +356,35 @@ static void btree_flush_write(struct cache_set *c) * Try to find the btree node with that references the oldest journal * entry, best is our current candidate and is locked if non NULL: */ - struct btree *b, *best = NULL; - unsigned iter; + struct btree *b, *best; + unsigned i; +retry: + best = NULL; + + for_each_cached_btree(b, c, i) + if (btree_current_write(b)->journal) { + if (!best) + best = b; + else if (journal_pin_cmp(c, + btree_current_write(best), + btree_current_write(b))) { + best = b; + } + } - for_each_cached_btree(b, c, iter) { - if (!down_write_trylock(&b->lock)) - continue; + b = best; + if (b) { + rw_lock(true, b, b->level); - if (!btree_node_dirty(b) || - !btree_current_write(b)->journal) { + if (!btree_current_write(b)->journal) { rw_unlock(true, b); - continue; + /* We raced */ + goto retry; } - if (!best) - best = b; - else if (journal_pin_cmp(c, - btree_current_write(best), - btree_current_write(b))) { - rw_unlock(true, best); - best = b; - } else - rw_unlock(true, b); + bch_btree_node_write(b, NULL); + rw_unlock(true, b); } - - if (best) - goto out; - - /* We can't find the best btree node, just pick the first */ - list_for_each_entry(b, &c->btree_cache, list) - if (!b->level && btree_node_dirty(b)) { - best = b; - rw_lock(true, best, best->level); - goto found; - } - -out: - if (!best) - return; -found: - if (btree_node_dirty(best)) - bch_btree_node_write(best, NULL); - rw_unlock(true, best); } #define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) @@ -494,7 +480,7 @@ static void journal_reclaim(struct cache_set *c) 
do_journal_discard(ca); if (c->journal.blocks_free) - return; + goto out; /* * Allocate: @@ -520,7 +506,7 @@ static void journal_reclaim(struct cache_set *c) if (n) c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; - +out: if (!journal_full(&c->journal)) __closure_wake_up(&c->journal.wait); } @@ -659,7 +645,7 @@ static void journal_write(struct closure *cl) journal_write_unlocked(cl); } -static void __journal_try_write(struct cache_set *c, bool noflush) +static void journal_try_write(struct cache_set *c) __releases(c->journal.lock) { struct closure *cl = &c->journal.io; @@ -667,29 +653,59 @@ static void __journal_try_write(struct cache_set *c, bool noflush) w->need_write = true; - if (!closure_trylock(cl, &c->cl)) - spin_unlock(&c->journal.lock); - else if (noflush && journal_full(&c->journal)) { - spin_unlock(&c->journal.lock); - continue_at(cl, journal_write, system_wq); - } else + if (closure_trylock(cl, &c->cl)) journal_write_unlocked(cl); + else + spin_unlock(&c->journal.lock); } -#define journal_try_write(c) __journal_try_write(c, false) - -void bch_journal_meta(struct cache_set *c, struct closure *cl) +static struct journal_write *journal_wait_for_write(struct cache_set *c, + unsigned nkeys) { - struct journal_write *w; + size_t sectors; + struct closure cl; - if (CACHE_SYNC(&c->sb)) { - spin_lock(&c->journal.lock); - w = c->journal.cur; + closure_init_stack(&cl); + + spin_lock(&c->journal.lock); + + while (1) { + struct journal_write *w = c->journal.cur; + + sectors = __set_blocks(w->data, w->data->keys + nkeys, + c) * c->sb.block_size; + + if (sectors <= min_t(size_t, + c->journal.blocks_free * c->sb.block_size, + PAGE_SECTORS << JSET_BITS)) + return w; + + /* XXX: tracepoint */ + if (!journal_full(&c->journal)) { + trace_bcache_journal_entry_full(c); + + /* + * XXX: If we were inserting so many keys that they + * won't fit in an _empty_ journal write, we'll + * deadlock. 
For now, handle this in + * bch_keylist_realloc() - but something to think about. + */ + BUG_ON(!w->data->keys); + + closure_wait(&w->wait, &cl); + journal_try_write(c); /* unlocks */ + } else { + trace_bcache_journal_full(c); + + closure_wait(&c->journal.wait, &cl); + journal_reclaim(c); + spin_unlock(&c->journal.lock); - if (cl) - BUG_ON(!closure_wait(&w->wait, cl)); + btree_flush_write(c); + } - __journal_try_write(c, true); + closure_sync(&cl); + spin_lock(&c->journal.lock); } } @@ -708,68 +724,26 @@ static void journal_write_work(struct work_struct *work) * bch_journal() hands those same keys off to btree_insert_async() */ -void bch_journal(struct closure *cl) +atomic_t *bch_journal(struct cache_set *c, + struct keylist *keys, + struct closure *parent) { - struct btree_op *op = container_of(cl, struct btree_op, cl); - struct cache_set *c = op->c; struct journal_write *w; - size_t sectors, nkeys; - - if (op->type != BTREE_INSERT || - !CACHE_SYNC(&c->sb)) - goto out; - - /* - * If we're looping because we errored, might already be waiting on - * another journal write: - */ - while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING) - closure_sync(cl->parent); - - spin_lock(&c->journal.lock); - - if (journal_full(&c->journal)) { - trace_bcache_journal_full(c); - - closure_wait(&c->journal.wait, cl); - - journal_reclaim(c); - spin_unlock(&c->journal.lock); - - btree_flush_write(c); - continue_at(cl, bch_journal, bcache_wq); - } + atomic_t *ret; - w = c->journal.cur; - nkeys = w->data->keys + bch_keylist_nkeys(&op->keys); - sectors = __set_blocks(w->data, nkeys, c) * c->sb.block_size; + if (!CACHE_SYNC(&c->sb)) + return NULL; - if (sectors > min_t(size_t, - c->journal.blocks_free * c->sb.block_size, - PAGE_SECTORS << JSET_BITS)) { - trace_bcache_journal_entry_full(c); + w = journal_wait_for_write(c, bch_keylist_nkeys(keys)); - /* - * XXX: If we were inserting so many keys that they won't fit in - * an _empty_ journal write, we'll deadlock. 
For now, handle - * this in bch_keylist_realloc() - but something to think about. - */ - BUG_ON(!w->data->keys); + memcpy(end(w->data), keys->keys, bch_keylist_bytes(keys)); + w->data->keys += bch_keylist_nkeys(keys); - BUG_ON(!closure_wait(&w->wait, cl)); + ret = &fifo_back(&c->journal.pin); + atomic_inc(ret); - journal_try_write(c); - continue_at(cl, bch_journal, bcache_wq); - } - - memcpy(end(w->data), op->keys.keys, bch_keylist_bytes(&op->keys)); - w->data->keys += bch_keylist_nkeys(&op->keys); - - op->journal = &fifo_back(&c->journal.pin); - atomic_inc(op->journal); - - if (op->flush_journal) { - closure_wait(&w->wait, cl->parent); + if (parent) { + closure_wait(&w->wait, parent); journal_try_write(c); } else if (!w->need_write) { schedule_delayed_work(&c->journal.work, @@ -778,8 +752,21 @@ void bch_journal(struct closure *cl) } else { spin_unlock(&c->journal.lock); } -out: - bch_btree_insert_async(cl); + + + return ret; +} + +void bch_journal_meta(struct cache_set *c, struct closure *cl) +{ + struct keylist keys; + atomic_t *ref; + + bch_keylist_init(&keys); + + ref = bch_journal(c, &keys, cl); + if (ref) + atomic_dec_bug(ref); } void bch_journal_free(struct cache_set *c) |