summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--fs/btrfs/Makefile2
-rw-r--r--fs/btrfs/async-thread.c288
-rw-r--r--fs/btrfs/async-thread.h78
-rw-r--r--fs/btrfs/ctree.h14
-rw-r--r--fs/btrfs/disk-io.c200
-rw-r--r--fs/btrfs/inode.c4
-rw-r--r--fs/btrfs/volumes.c162
-rw-r--r--fs/btrfs/volumes.h10
8 files changed, 626 insertions, 132 deletions
diff --git a/fs/btrfs/Makefile b/fs/btrfs/Makefile
index e5fc3cfea0ab..9dcfc2fe3332 100644
--- a/fs/btrfs/Makefile
+++ b/fs/btrfs/Makefile
@@ -6,7 +6,7 @@ btrfs-y := super.o ctree.o extent-tree.o print-tree.o root-tree.o dir-item.o \
hash.o file-item.o inode-item.o inode-map.o disk-io.o \
transaction.o bit-radix.o inode.o file.o tree-defrag.o \
extent_map.o sysfs.o struct-funcs.o xattr.o ordered-data.o \
- extent_io.o volumes.o
+ extent_io.o volumes.o async-thread.o
btrfs-$(CONFIG_FS_POSIX_ACL) += acl.o
else
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
new file mode 100644
index 000000000000..2911b67bd6f7
--- /dev/null
+++ b/fs/btrfs/async-thread.c
@@ -0,0 +1,288 @@
+/*
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License v2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#include <linux/kthread.h>
+#include <linux/list.h>
+#include <linux/spinlock.h>
+#include <linux/freezer.h>
+#include "async-thread.h"
+
+/*
+ * container for the kthread task pointer and the list of pending work
+ * One of these is allocated per thread.
+ */
+struct btrfs_worker_thread {
+ /* list of struct btrfs_work that are waiting for service */
+ struct list_head pending;
+
+ /* list of worker threads from struct btrfs_workers */
+ struct list_head worker_list;
+
+ /* kthread */
+ struct task_struct *task;
+
+ /* number of things on the pending list */
+ atomic_t num_pending;
+
+ /* protects the pending list. */
+ spinlock_t lock;
+
+ /* set to non-zero when this thread is already awake and kicking */
+ int working;
+};
+
+/*
+ * main loop for servicing work items
+ */
+static int worker_loop(void *arg)
+{
+ struct btrfs_worker_thread *worker = arg;
+ struct list_head *cur;
+ struct btrfs_work *work;
+ do {
+ spin_lock_irq(&worker->lock);
+ while(!list_empty(&worker->pending)) {
+ cur = worker->pending.next;
+ work = list_entry(cur, struct btrfs_work, list);
+ list_del(&work->list);
+ clear_bit(0, &work->flags);
+
+ work->worker = worker;
+ spin_unlock_irq(&worker->lock);
+
+ work->func(work);
+
+ atomic_dec(&worker->num_pending);
+ spin_lock_irq(&worker->lock);
+ }
+ worker->working = 0;
+ if (freezing(current)) {
+ refrigerator();
+ } else {
+ set_current_state(TASK_INTERRUPTIBLE);
+ spin_unlock_irq(&worker->lock);
+ schedule();
+ __set_current_state(TASK_RUNNING);
+ }
+ } while (!kthread_should_stop());
+ return 0;
+}
+
+/*
+ * this will wait for all the worker threads to shutdown
+ */
+int btrfs_stop_workers(struct btrfs_workers *workers)
+{
+ struct list_head *cur;
+ struct btrfs_worker_thread *worker;
+
+ while(!list_empty(&workers->worker_list)) {
+ cur = workers->worker_list.next;
+ worker = list_entry(cur, struct btrfs_worker_thread,
+ worker_list);
+ kthread_stop(worker->task);
+ list_del(&worker->worker_list);
+ kfree(worker);
+ }
+ return 0;
+}
+
+/*
+ * simple init on struct btrfs_workers
+ */
+void btrfs_init_workers(struct btrfs_workers *workers, int max)
+{
+ workers->num_workers = 0;
+ INIT_LIST_HEAD(&workers->worker_list);
+ workers->last = NULL;
+ spin_lock_init(&workers->lock);
+ workers->max_workers = max;
+}
+
+/*
+ * starts new worker threads. This does not enforce the max worker
+ * count in case you need to temporarily go past it.
+ */
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+{
+ struct btrfs_worker_thread *worker;
+ int ret = 0;
+ int i;
+
+ for (i = 0; i < num_workers; i++) {
+ worker = kzalloc(sizeof(*worker), GFP_NOFS);
+ if (!worker) {
+ ret = -ENOMEM;
+ goto fail;
+ }
+
+ INIT_LIST_HEAD(&worker->pending);
+ INIT_LIST_HEAD(&worker->worker_list);
+ spin_lock_init(&worker->lock);
+ atomic_set(&worker->num_pending, 0);
+ worker->task = kthread_run(worker_loop, worker, "btrfs");
+ if (IS_ERR(worker->task)) {
+ ret = PTR_ERR(worker->task);
+ goto fail;
+ }
+
+ spin_lock_irq(&workers->lock);
+ list_add_tail(&worker->worker_list, &workers->worker_list);
+ workers->last = worker;
+ workers->num_workers++;
+ spin_unlock_irq(&workers->lock);
+ }
+ return 0;
+fail:
+ btrfs_stop_workers(workers);
+ return ret;
+}
+
+/*
+ * run through the list and find a worker thread that doesn't have a lot
+ * to do right now. This can return null if we aren't yet at the thread
+ * count limit and all of the threads are busy.
+ */
+static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
+{
+ struct btrfs_worker_thread *worker;
+ struct list_head *next;
+ struct list_head *start;
+ int enforce_min = workers->num_workers < workers->max_workers;
+
+ /* start with the last thread if it isn't busy */
+ worker = workers->last;
+ if (atomic_read(&worker->num_pending) < 64)
+ goto done;
+
+ next = worker->worker_list.next;
+ start = &worker->worker_list;
+
+ /*
+ * check all the workers for someone that is bored. FIXME, do
+ * something smart here
+ */
+ while(next != start) {
+ if (next == &workers->worker_list) {
+ next = workers->worker_list.next;
+ continue;
+ }
+ worker = list_entry(next, struct btrfs_worker_thread,
+ worker_list);
+ if (atomic_read(&worker->num_pending) < 64 || !enforce_min)
+ goto done;
+ next = next->next;
+ }
+ /*
+ * nobody was bored, if we're already at the max thread count,
+ * use the last thread
+ */
+ if (!enforce_min || atomic_read(&workers->last->num_pending) < 64) {
+ return workers->last;
+ }
+ return NULL;
+done:
+ workers->last = worker;
+ return worker;
+}
+
+static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
+{
+ struct btrfs_worker_thread *worker;
+ unsigned long flags;
+
+again:
+ spin_lock_irqsave(&workers->lock, flags);
+ worker = next_worker(workers);
+ spin_unlock_irqrestore(&workers->lock, flags);
+
+ if (!worker) {
+ spin_lock_irqsave(&workers->lock, flags);
+ if (workers->num_workers >= workers->max_workers) {
+ /*
+ * we have failed to find any workers, just
+ * return the force one
+ */
+ worker = list_entry(workers->worker_list.next,
+ struct btrfs_worker_thread, worker_list);
+ spin_unlock_irqrestore(&workers->lock, flags);
+ } else {
+ spin_unlock_irqrestore(&workers->lock, flags);
+ /* we're below the limit, start another worker */
+ btrfs_start_workers(workers, 1);
+ goto again;
+ }
+ }
+ return worker;
+}
+
+/*
+ * btrfs_requeue_work just puts the work item back on the tail of the list
+ * it was taken from. It is intended for use with long running work functions
+ * that make some progress and want to give the cpu up for others.
+ */
+int btrfs_requeue_work(struct btrfs_work *work)
+{
+ struct btrfs_worker_thread *worker = work->worker;
+ unsigned long flags;
+
+ if (test_and_set_bit(0, &work->flags))
+ goto out;
+
+ spin_lock_irqsave(&worker->lock, flags);
+ atomic_inc(&worker->num_pending);
+ list_add_tail(&work->list, &worker->pending);
+ spin_unlock_irqrestore(&worker->lock, flags);
+out:
+ return 0;
+}
+
+/*
+ * places a struct btrfs_work into the pending queue of one of the kthreads
+ */
+int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
+{
+ struct btrfs_worker_thread *worker;
+ unsigned long flags;
+ int wake = 0;
+
+ /* don't requeue something already on a list */
+ if (test_and_set_bit(0, &work->flags))
+ goto out;
+
+ worker = find_worker(workers);
+
+ spin_lock_irqsave(&worker->lock, flags);
+ atomic_inc(&worker->num_pending);
+ list_add_tail(&work->list, &worker->pending);
+
+ /*
+ * avoid calling into wake_up_process if this thread has already
+ * been kicked
+ */
+ if (!worker->working)
+ wake = 1;
+ worker->working = 1;
+
+ spin_unlock_irqrestore(&worker->lock, flags);
+
+ if (wake)
+ wake_up_process(worker->task);
+out:
+ return 0;
+}
diff --git a/fs/btrfs/async-thread.h b/fs/btrfs/async-thread.h
new file mode 100644
index 000000000000..52fc9da0f9e7
--- /dev/null
+++ b/fs/btrfs/async-thread.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright (C) 2007 Oracle. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public
+ * License v2 as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public
+ * License along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA.
+ */
+
+#ifndef __BTRFS_ASYNC_THREAD_
+#define __BTRFS_ASYNC_THREAD_
+
+struct btrfs_worker_thread;
+
+/*
+ * This is similar to a workqueue, but it is meant to spread the operations
+ * across all available cpus instead of just the CPU that was used to
+ * queue the work. There is also some batching introduced to try and
+ * cut down on context switches.
+ *
+ * By default threads are added on demand up to 2 * the number of cpus.
+ * Changing struct btrfs_workers->max_workers is one way to prevent
+ * demand creation of kthreads.
+ *
+ * the basic model of these worker threads is to embed a btrfs_work
+ * structure in your own data struct, and use container_of in a
+ * work function to get back to your data struct.
+ */
+struct btrfs_work {
+ /*
+ * only func should be set to the function you want called
+ * your work struct is passed as the only arg
+ */
+ void (*func)(struct btrfs_work *work);
+
+ /*
+ * flags should be set to zero. It is used to make sure the
+ * struct is only inserted once into the list.
+ */
+ unsigned long flags;
+
+ /* don't touch these */
+ struct btrfs_worker_thread *worker;
+ struct list_head list;
+};
+
+struct btrfs_workers {
+ /* current number of running workers */
+ int num_workers;
+
+ /* max number of workers allowed. changed by btrfs_start_workers */
+ int max_workers;
+
+ /* list with all the work threads */
+ struct list_head worker_list;
+
+ /* the last worker thread to have something queued */
+ struct btrfs_worker_thread *last;
+
+ /* lock for finding the next worker thread to queue on */
+ spinlock_t lock;
+};
+
+int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work);
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers);
+int btrfs_stop_workers(struct btrfs_workers *workers);
+void btrfs_init_workers(struct btrfs_workers *workers, int max);
+int btrfs_requeue_work(struct btrfs_work *work);
+#endif
diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h
index 49cbc62b42f9..6c91a510c965 100644
--- a/fs/btrfs/ctree.h
+++ b/fs/btrfs/ctree.h
@@ -30,6 +30,7 @@
#include "bit-radix.h"
#include "extent_io.h"
#include "extent_map.h"
+#include "async-thread.h"
struct btrfs_trans_handle;
struct btrfs_transaction;
@@ -518,13 +519,20 @@ struct btrfs_fs_info {
struct list_head hashers;
struct list_head dead_roots;
struct list_head end_io_work_list;
- struct list_head async_submit_work_list;
struct work_struct end_io_work;
- struct work_struct async_submit_work;
spinlock_t end_io_work_lock;
- spinlock_t async_submit_work_lock;
atomic_t nr_async_submits;
+ /*
+ * there is a pool of worker threads for checksumming during writes
+ * and a pool for checksumming after reads. This is because readers
+ * can run with FS locks held, and the writers may be waiting for
+ * those locks. We don't want ordering in the pending list to cause
+ * deadlocks, and so the two are serviced separately.
+ */
+ struct btrfs_workers workers;
+ struct btrfs_workers endio_workers;
+
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
struct work_struct trans_work;
#else
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index b9a53646ceb2..98ff4fbcb386 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -31,6 +31,7 @@
#include "btrfs_inode.h"
#include "volumes.h"
#include "print-tree.h"
+#include "async-thread.h"
#if 0
static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
@@ -46,8 +47,7 @@ static int check_tree_block(struct btrfs_root *root, struct extent_buffer *buf)
#endif
static struct extent_io_ops btree_extent_io_ops;
-static struct workqueue_struct *end_io_workqueue;
-static struct workqueue_struct *async_submit_workqueue;
+static void end_workqueue_fn(struct btrfs_work *work);
struct end_io_wq {
struct bio *bio;
@@ -57,6 +57,7 @@ struct end_io_wq {
int error;
int metadata;
struct list_head list;
+ struct btrfs_work work;
};
struct async_submit_bio {
@@ -66,6 +67,7 @@ struct async_submit_bio {
extent_submit_bio_hook_t *submit_bio_hook;
int rw;
int mirror_num;
+ struct btrfs_work work;
};
struct extent_map *btree_get_extent(struct inode *inode, struct page *page,
@@ -389,7 +391,6 @@ static int end_workqueue_bio(struct bio *bio,
{
struct end_io_wq *end_io_wq = bio->bi_private;
struct btrfs_fs_info *fs_info;
- unsigned long flags;
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
if (bio->bi_size)
@@ -397,11 +398,10 @@ static int end_workqueue_bio(struct bio *bio,
#endif
fs_info = end_io_wq->info;
- spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
end_io_wq->error = err;
- list_add_tail(&end_io_wq->list, &fs_info->end_io_work_list);
- spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
- queue_work(end_io_workqueue, &fs_info->end_io_work);
+ end_io_wq->work.func = end_workqueue_fn;
+ end_io_wq->work.flags = 0;
+ btrfs_queue_worker(&fs_info->endio_workers, &end_io_wq->work);
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
return 0;
@@ -428,6 +428,19 @@ int btrfs_bio_wq_end_io(struct btrfs_fs_info *info, struct bio *bio,
return 0;
}
+static void run_one_async_submit(struct btrfs_work *work)
+{
+ struct btrfs_fs_info *fs_info;
+ struct async_submit_bio *async;
+
+ async = container_of(work, struct async_submit_bio, work);
+ fs_info = BTRFS_I(async->inode)->root->fs_info;
+ atomic_dec(&fs_info->nr_async_submits);
+ async->submit_bio_hook(async->inode, async->rw, async->bio,
+ async->mirror_num);
+ kfree(async);
+}
+
int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
int rw, struct bio *bio, int mirror_num,
extent_submit_bio_hook_t *submit_bio_hook)
@@ -443,13 +456,10 @@ int btrfs_wq_submit_bio(struct btrfs_fs_info *fs_info, struct inode *inode,
async->bio = bio;
async->mirror_num = mirror_num;
async->submit_bio_hook = submit_bio_hook;
-
- spin_lock(&fs_info->async_submit_work_lock);
- list_add_tail(&async->list, &fs_info->async_submit_work_list);
+ async->work.func = run_one_async_submit;
+ async->work.flags = 0;
atomic_inc(&fs_info->nr_async_submits);
- spin_unlock(&fs_info->async_submit_work_lock);
-
- queue_work(async_submit_workqueue, &fs_info->async_submit_work);
+ btrfs_queue_worker(&fs_info->workers, &async->work);
return 0;
}
@@ -462,19 +472,32 @@ static int __btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
offset = bio->bi_sector << 9;
+ /*
+ * when we're called for a write, we're already in the async
+ * submission context. Just jump ingo btrfs_map_bio
+ */
if (rw & (1 << BIO_RW)) {
- return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+ return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio,
+ mirror_num, 0);
}
+ /*
+ * called for a read, do the setup so that checksum validation
+ * can happen in the async kernel threads
+ */
ret = btrfs_bio_wq_end_io(root->fs_info, bio, 1);
BUG_ON(ret);
- return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num);
+ return btrfs_map_bio(BTRFS_I(inode)->root, rw, bio, mirror_num, 1);
}
static int btree_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
int mirror_num)
{
+ /*
+ * kthread helpers are used to submit writes so that checksumming
+ * can happen in parallel across all CPUs
+ */
if (!(rw & (1 << BIO_RW))) {
return __btree_submit_bio_hook(inode, rw, bio, mirror_num);
}
@@ -1036,95 +1059,40 @@ static int bio_ready_for_csum(struct bio *bio)
return ret;
}
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_end_io_csum(void *p)
-#else
-static void btrfs_end_io_csum(struct work_struct *work)
-#endif
+/*
+ * called by the kthread helper functions to finally call the bio end_io
+ * functions. This is where read checksum verification actually happens
+ */
+static void end_workqueue_fn(struct btrfs_work *work)
{
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
- struct btrfs_fs_info *fs_info = p;
-#else
- struct btrfs_fs_info *fs_info = container_of(work,
- struct btrfs_fs_info,
- end_io_work);
-#endif
- unsigned long flags;
- struct end_io_wq *end_io_wq;
struct bio *bio;
- struct list_head *next;
+ struct end_io_wq *end_io_wq;
+ struct btrfs_fs_info *fs_info;
int error;
- int was_empty;
- while(1) {
- spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
- if (list_empty(&fs_info->end_io_work_list)) {
- spin_unlock_irqrestore(&fs_info->end_io_work_lock,
- flags);
- return;
- }
- next = fs_info->end_io_work_list.next;
- list_del(next);
- spin_unlock_irqrestore(&fs_info->end_io_work_lock, flags);
-
- end_io_wq = list_entry(next, struct end_io_wq, list);
-
- bio = end_io_wq->bio;
- if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
- spin_lock_irqsave(&fs_info->end_io_work_lock, flags);
- was_empty = list_empty(&fs_info->end_io_work_list);
- list_add_tail(&end_io_wq->list,
- &fs_info->end_io_work_list);
- spin_unlock_irqrestore(&fs_info->end_io_work_lock,
- flags);
- if (was_empty)
- return;
- continue;
- }
- error = end_io_wq->error;
- bio->bi_private = end_io_wq->private;
- bio->bi_end_io = end_io_wq->end_io;
- kfree(end_io_wq);
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
- bio_endio(bio, bio->bi_size, error);
-#else
- bio_endio(bio, error);
-#endif
- }
-}
+ end_io_wq = container_of(work, struct end_io_wq, work);
+ bio = end_io_wq->bio;
+ fs_info = end_io_wq->info;
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
-static void btrfs_async_submit_work(void *p)
-#else
-static void btrfs_async_submit_work(struct work_struct *work)
-#endif
-{
-#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
- struct btrfs_fs_info *fs_info = p;
+ /* metadata bios are special because the whole tree block must
+ * be checksummed at once. This makes sure the entire block is in
+ * ram and up to date before trying to verify things. For
+ * blocksize <= pagesize, it is basically a noop
+ */
+ if (end_io_wq->metadata && !bio_ready_for_csum(bio)) {
+ btrfs_queue_worker(&fs_info->endio_workers,
+ &end_io_wq->work);
+ return;
+ }
+ error = end_io_wq->error;
+ bio->bi_private = end_io_wq->private;
+ bio->bi_end_io = end_io_wq->end_io;
+ kfree(end_io_wq);
+#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,23)
+ bio_endio(bio, bio->bi_size, error);
#else
- struct btrfs_fs_info *fs_info = container_of(work,
- struct btrfs_fs_info,
- async_submit_work);
+ bio_endio(bio, error);
#endif
- struct async_submit_bio *async;
- struct list_head *next;
-
- while(1) {
- spin_lock(&fs_info->async_submit_work_lock);
- if (list_empty(&fs_info->async_submit_work_list)) {
- spin_unlock(&fs_info->async_submit_work_lock);
- return;
- }
- next = fs_info->async_submit_work_list.next;
- list_del(next);
- atomic_dec(&fs_info->nr_async_submits);
- spin_unlock(&fs_info->async_submit_work_lock);
-
- async = list_entry(next, struct async_submit_bio, list);
- async->submit_bio_hook(async->inode, async->rw, async->bio,
- async->mirror_num);
- kfree(async);
- }
}
struct btrfs_root *open_ctree(struct super_block *sb,
@@ -1155,19 +1123,11 @@ struct btrfs_root *open_ctree(struct super_block *sb,
err = -ENOMEM;
goto fail;
}
- end_io_workqueue = create_workqueue("btrfs-end-io");
- BUG_ON(!end_io_workqueue);
- async_submit_workqueue = create_workqueue("btrfs-async-submit");
-
INIT_RADIX_TREE(&fs_info->fs_roots_radix, GFP_NOFS);
INIT_LIST_HEAD(&fs_info->trans_list);
INIT_LIST_HEAD(&fs_info->dead_roots);
INIT_LIST_HEAD(&fs_info->hashers);
- INIT_LIST_HEAD(&fs_info->end_io_work_list);
- INIT_LIST_HEAD(&fs_info->async_submit_work_list);
spin_lock_init(&fs_info->hash_lock);
- spin_lock_init(&fs_info->end_io_work_lock);
- spin_lock_init(&fs_info->async_submit_work_lock);
spin_lock_init(&fs_info->delalloc_lock);
spin_lock_init(&fs_info->new_trans_lock);
@@ -1222,13 +1182,8 @@ struct btrfs_root *open_ctree(struct super_block *sb,
fs_info->do_barriers = 1;
#if LINUX_VERSION_CODE <= KERNEL_VERSION(2,6,18)
- INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum, fs_info);
- INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work,
- fs_info);
INIT_WORK(&fs_info->trans_work, btrfs_transaction_cleaner, fs_info);
#else
- INIT_WORK(&fs_info->end_io_work, btrfs_end_io_csum);
- INIT_WORK(&fs_info->async_submit_work, btrfs_async_submit_work);
INIT_DELAYED_WORK(&fs_info->trans_work, btrfs_transaction_cleaner);
#endif
BTRFS_I(fs_info->btree_inode)->root = tree_root;
@@ -1240,6 +1195,19 @@ struct btrfs_root *open_ctree(struct super_block *sb,
mutex_init(&fs_info->trans_mutex);
mutex_init(&fs_info->fs_mutex);
+ /* we need to start all the end_io workers up front because the
+ * queue work function gets called at interrupt time. The endio
+ * workers don't normally start IO, so some number of them <= the
+ * number of cpus is fine. They handle checksumming after a read.
+ *
+ * The other worker threads do start IO, so the max is larger than
+ * the number of CPUs. FIXME, tune this for huge machines
+ */
+ btrfs_init_workers(&fs_info->workers, num_online_cpus() * 2);
+ btrfs_init_workers(&fs_info->endio_workers, num_online_cpus());
+ btrfs_start_workers(&fs_info->workers, 1);
+ btrfs_start_workers(&fs_info->endio_workers, num_online_cpus());
+
#if 0
ret = add_hasher(fs_info, "crc32c");
if (ret) {
@@ -1375,6 +1343,8 @@ fail_sb_buffer:
extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
fail_iput:
iput(fs_info->btree_inode);
+ btrfs_stop_workers(&fs_info->workers);
+ btrfs_stop_workers(&fs_info->endio_workers);
fail:
btrfs_close_devices(fs_info->fs_devices);
btrfs_mapping_tree_free(&fs_info->mapping_tree);
@@ -1623,16 +1593,10 @@ int close_ctree(struct btrfs_root *root)
extent_io_tree_empty_lru(&fs_info->extent_ins);
extent_io_tree_empty_lru(&BTRFS_I(fs_info->btree_inode)->io_tree);
- flush_workqueue(async_submit_workqueue);
- flush_workqueue(end_io_workqueue);
-
truncate_inode_pages(fs_info->btree_inode->i_mapping, 0);
- flush_workqueue(async_submit_workqueue);
- destroy_workqueue(async_submit_workqueue);
-
- flush_workqueue(end_io_workqueue);
- destroy_workqueue(end_io_workqueue);
+ btrfs_stop_workers(&fs_info->workers);
+ btrfs_stop_workers(&fs_info->endio_workers);
iput(fs_info->btree_inode);
#if 0
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 0f14697becef..7daef8d37006 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -359,7 +359,7 @@ int __btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
kfree(sums);
- return btrfs_map_bio(root, rw, bio, mirror_num);
+ return btrfs_map_bio(root, rw, bio, mirror_num, 1);
}
int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
@@ -383,7 +383,7 @@ int btrfs_submit_bio_hook(struct inode *inode, int rw, struct bio *bio,
inode, rw, bio, mirror_num,
__btrfs_submit_bio_hook);
mapit:
- return btrfs_map_bio(root, rw, bio, mirror_num);
+ return btrfs_map_bio(root, rw, bio, mirror_num, 0);
}
int btrfs_readpage_io_hook(struct page *page, u64 start, u64 end)
diff --git a/fs/btrfs/volumes.c b/fs/btrfs/volumes.c
index 722eb4550154..c57458ce6339 100644
--- a/fs/btrfs/volumes.c
+++ b/fs/btrfs/volumes.c
@@ -27,6 +27,7 @@
#include "transaction.h"
#include "print-tree.h"
#include "volumes.h"
+#include "async-thread.h"
struct map_lookup {
u64 type;
@@ -110,6 +111,101 @@ static struct btrfs_fs_devices *find_fsid(u8 *fsid)
return NULL;
}
+/*
+ * we try to collect pending bios for a device so we don't get a large
+ * number of procs sending bios down to the same device. This greatly
+ * improves the schedulers ability to collect and merge the bios.
+ *
+ * But, it also turns into a long list of bios to process and that is sure
+ * to eventually make the worker thread block. The solution here is to
+ * make some progress and then put this work struct back at the end of
+ * the list if the block device is congested. This way, multiple devices
+ * can make progress from a single worker thread.
+ */
+int run_scheduled_bios(struct btrfs_device *device)
+{
+ struct bio *pending;
+ struct backing_dev_info *bdi;
+ struct bio *tail;
+ struct bio *cur;
+ int again = 0;
+ unsigned long num_run = 0;
+
+ bdi = device->bdev->bd_inode->i_mapping->backing_dev_info;
+loop:
+ spin_lock(&device->io_lock);
+
+ /* take all the bios off the list at once and process them
+ * later on (without the lock held). But, remember the
+ * tail and other pointers so the bios can be properly reinserted
+ * into the list if we hit congestion
+ */
+ pending = device->pending_bios;
+ tail = device->pending_bio_tail;
+ WARN_ON(pending && !tail);
+ device->pending_bios = NULL;
+ device->pending_bio_tail = NULL;
+
+ /*
+ * if pending was null this time around, no bios need processing
+ * at all and we can stop. Otherwise it'll loop back up again
+ * and do an additional check so no bios are missed.
+ *
+ * device->running_pending is used to synchronize with the
+ * schedule_bio code.
+ */
+ if (pending) {
+ again = 1;
+ device->running_pending = 1;
+ } else {
+ again = 0;
+ device->running_pending = 0;
+ }
+ spin_unlock(&device->io_lock);
+
+ while(pending) {
+ cur = pending;
+ pending = pending->bi_next;
+ cur->bi_next = NULL;
+ atomic_dec(&device->dev_root->fs_info->nr_async_submits);
+ submit_bio(cur->bi_rw, cur);
+ num_run++;
+
+ /*
+ * we made progress, there is more work to do and the bdi
+ * is now congested. Back off and let other work structs
+ * run instead
+ */
+ if (pending && num_run && bdi_write_congested(bdi)) {
+ struct bio *old_head;
+
+ spin_lock(&device->io_lock);
+ old_head = device->pending_bios;
+ device->pending_bios = pending;
+ if (device->pending_bio_tail)
+ tail->bi_next = old_head;
+ else
+ device->pending_bio_tail = tail;
+
+ spin_unlock(&device->io_lock);
+ btrfs_requeue_work(&device->work);
+ goto done;
+ }
+ }
+ if (again)
+ goto loop;
+done:
+ return 0;
+}
+
+void pending_bios_fn(struct btrfs_work *work)
+{
+ struct btrfs_device *device;
+
+ device = container_of(work, struct btrfs_device, work);
+ run_scheduled_bios(device);
+}
+
static int device_list_add(const char *path,
struct btrfs_super_block *disk_super,
u64 devid, struct btrfs_fs_devices **fs_devices_ret)
@@ -141,6 +237,7 @@ static int device_list_add(const char *path,
return -ENOMEM;
}
device->devid = devid;
+ device->work.func = pending_bios_fn;
memcpy(device->uuid, disk_super->dev_item.uuid,
BTRFS_UUID_SIZE);
device->barriers = 1;
@@ -925,6 +1022,7 @@ int btrfs_init_new_device(struct btrfs_root *root, char *device_path)
}
device->barriers = 1;
+ device->work.func = pending_bios_fn;
generate_random_uuid(device->uuid);
spin_lock_init(&device->io_lock);
device->name = kstrdup(device_path, GFP_NOFS);
@@ -1965,8 +2063,61 @@ static int end_bio_multi_stripe(struct bio *bio,
#endif
}
+struct async_sched {
+ struct bio *bio;
+ int rw;
+ struct btrfs_fs_info *info;
+ struct btrfs_work work;
+};
+
+/*
+ * see run_scheduled_bios for a description of why bios are collected for
+ * async submit.
+ *
+ * This will add one bio to the pending list for a device and make sure
+ * the work struct is scheduled.
+ */
+int schedule_bio(struct btrfs_root *root, struct btrfs_device *device,
+ int rw, struct bio *bio)
+{
+ int should_queue = 1;
+
+ /* don't bother with additional async steps for reads, right now */
+ if (!(rw & (1 << BIO_RW))) {
+ submit_bio(rw, bio);
+ return 0;
+ }
+
+ /*
+ * nr_async_sumbits allows us to reliably return congestion to the
+ * higher layers. Otherwise, the async bio makes it appear we have
+ * made progress against dirty pages when we've really just put it
+ * on a queue for later
+ */
+ atomic_inc(&root->fs_info->nr_async_submits);
+ bio->bi_next = NULL;
+ bio->bi_rw |= rw;
+
+ spin_lock(&device->io_lock);
+
+ if (device->pending_bio_tail)
+ device->pending_bio_tail->bi_next = bio;
+
+ device->pending_bio_tail = bio;
+ if (!device->pending_bios)
+ device->pending_bios = bio;
+ if (device->running_pending)
+ should_queue = 0;
+
+ spin_unlock(&device->io_lock);
+
+ if (should_queue)
+ btrfs_queue_worker(&root->fs_info->workers, &device->work);
+ return 0;
+}
+
int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
- int mirror_num)
+ int mirror_num, int async_submit)
{
struct btrfs_mapping_tree *map_tree;
struct btrfs_device *dev;
@@ -2012,10 +2163,10 @@ int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
dev = multi->stripes[dev_nr].dev;
if (dev && dev->bdev) {
bio->bi_bdev = dev->bdev;
- spin_lock(&dev->io_lock);
- dev->total_ios++;
- spin_unlock(&dev->io_lock);
- submit_bio(rw, bio);
+ if (async_submit)
+ schedule_bio(root, dev, rw, bio);
+ else
+ submit_bio(rw, bio);
} else {
bio->bi_bdev = root->fs_info->fs_devices->latest_bdev;
bio->bi_sector = logical >> 9;
@@ -2054,6 +2205,7 @@ static struct btrfs_device *add_missing_dev(struct btrfs_root *root,
device->barriers = 1;
device->dev_root = root->fs_info->dev_root;
device->devid = devid;
+ device->work.func = pending_bios_fn;
fs_devices->num_devices++;
spin_lock_init(&device->io_lock);
memcpy(device->uuid, dev_uuid, BTRFS_UUID_SIZE);
diff --git a/fs/btrfs/volumes.h b/fs/btrfs/volumes.h
index 4df6b1608f91..48a44f7a9385 100644
--- a/fs/btrfs/volumes.h
+++ b/fs/btrfs/volumes.h
@@ -20,6 +20,7 @@
#define __BTRFS_VOLUMES_
#include <linux/bio.h>
+#include "async-thread.h"
struct buffer_head;
struct btrfs_device {
@@ -27,6 +28,9 @@ struct btrfs_device {
struct list_head dev_alloc_list;
struct btrfs_root *dev_root;
struct buffer_head *pending_io;
+ struct bio *pending_bios;
+ struct bio *pending_bio_tail;
+ int running_pending;
u64 generation;
int barriers;
@@ -36,8 +40,6 @@ struct btrfs_device {
struct block_device *bdev;
- u64 total_ios;
-
char *name;
/* the internal btrfs device id */
@@ -63,6 +65,8 @@ struct btrfs_device {
/* physical drive uuid (or lvm uuid) */
u8 uuid[BTRFS_UUID_SIZE];
+
+ struct btrfs_work work;
};
struct btrfs_fs_devices {
@@ -117,7 +121,7 @@ int btrfs_alloc_chunk(struct btrfs_trans_handle *trans,
void btrfs_mapping_init(struct btrfs_mapping_tree *tree);
void btrfs_mapping_tree_free(struct btrfs_mapping_tree *tree);
int btrfs_map_bio(struct btrfs_root *root, int rw, struct bio *bio,
- int mirror_num);
+ int mirror_num, int async_submit);
int btrfs_read_super_device(struct btrfs_root *root, struct extent_buffer *buf);
int btrfs_open_devices(struct btrfs_fs_devices *fs_devices,
int flags, void *holder);
OpenPOWER on IntegriCloud