diff options
-rw-r--r-- | llvm/include/llvm/Support/ThreadPool.h | 113 | ||||
-rw-r--r-- | llvm/include/llvm/Support/thread.h | 2 | ||||
-rw-r--r-- | llvm/lib/Support/CMakeLists.txt | 1 | ||||
-rw-r--r-- | llvm/lib/Support/ThreadPool.cpp | 146 | ||||
-rw-r--r-- | llvm/unittests/Support/CMakeLists.txt | 1 | ||||
-rw-r--r-- | llvm/unittests/Support/ThreadPool.cpp | 91 |
6 files changed, 354 insertions, 0 deletions
diff --git a/llvm/include/llvm/Support/ThreadPool.h b/llvm/include/llvm/Support/ThreadPool.h new file mode 100644 index 00000000000..8a90c85865a --- /dev/null +++ b/llvm/include/llvm/Support/ThreadPool.h @@ -0,0 +1,113 @@ +//===-- llvm/Support/ThreadPool.h - A ThreadPool implementation -*- C++ -*-===// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// +// This file defines a crude C++11 based thread pool. +// +//===----------------------------------------------------------------------===// + +#ifndef LLVM_SUPPORT_THREAD_POOL_H +#define LLVM_SUPPORT_THREAD_POOL_H + +#include "llvm/Support/thread.h" + +#include <condition_variable> +#include <functional> +#include <future> +#include <memory> +#include <mutex> +#include <queue> +#include <utility> + +namespace llvm { + +/// A ThreadPool for asynchronous parallel execution on a defined number of +/// threads. +/// +/// The pool keeps a vector of threads alive, waiting on a condition variable +/// for some work to become available. +class ThreadPool { +public: +#ifndef _MSC_VER + using VoidTy = void; +#else + // MSVC 2013 has a bug and can't use std::packaged_task<void()>; + // We force it to use bool(bool) instead. + using VoidTy = bool; +#endif + using TaskTy = std::function<VoidTy(VoidTy)>; + using PackagedTaskTy = std::packaged_task<VoidTy(VoidTy)>; + + /// Construct a pool with the number of core available on the system (or + /// whatever the value returned by std::thread::hardware_concurrency() is). + ThreadPool(); + + /// Construct a pool of \p ThreadCount threads + ThreadPool(unsigned ThreadCount); + + /// Blocking destructor: the pool will wait for all the threads to complete. + ~ThreadPool(); + + /// Asynchronous submission of a task to the pool. The returned future can be + /// used to wait for the task to finish and is *non-blocking* on destruction. + template <typename Function, typename... Args> + inline std::shared_future<VoidTy> async(Function &&F, Args &&... ArgList) { + auto Task = + std::bind(std::forward<Function>(F), std::forward<Args...>(ArgList...)); +#ifndef _MSC_VER + return asyncImpl(std::move(Task)); +#else + return asyncImpl([Task] (VoidTy) -> VoidTy { Task(); return VoidTy(); }); +#endif + } + + /// Asynchronous submission of a task to the pool. The returned future can be + /// used to wait for the task to finish and is *non-blocking* on destruction. + template <typename Function> + inline std::shared_future<VoidTy> async(Function &&F) { +#ifndef _MSC_VER + return asyncImpl(std::forward<Function>(F)); +#else + return asyncImpl([F] (VoidTy) -> VoidTy { F(); return VoidTy(); }); +#endif + } + + /// Blocking wait for all the threads to complete and the queue to be empty. + /// It is an error to try to add new tasks while blocking on this call. + void wait(); + +private: + /// Asynchronous submission of a task to the pool. The returned future can be + /// used to wait for the task to finish and is *non-blocking* on destruction. + std::shared_future<VoidTy> asyncImpl(TaskTy F); + + /// Threads in flight + std::vector<llvm::thread> Threads; + + /// Tasks waiting for execution in the pool. + std::queue<PackagedTaskTy> Tasks; + + /// Locking and signaling for accessing the Tasks queue. + std::mutex QueueLock; + std::condition_variable QueueCondition; + + /// Locking and signaling for job completion + std::mutex CompletionLock; + std::condition_variable CompletionCondition; + + /// Keep track of the number of thread actually busy + std::atomic<unsigned> ActiveThreads; + +#if LLVM_ENABLE_THREADS // avoids warning for unused variable + /// Signal for the destruction of the pool, asking thread to exit. + bool EnableFlag; +#endif +}; +} + +#endif // LLVM_SUPPORT_THREAD_POOL_H diff --git a/llvm/include/llvm/Support/thread.h b/llvm/include/llvm/Support/thread.h index 2d1f1b3a3ec..2d130418a57 100644 --- a/llvm/include/llvm/Support/thread.h +++ b/llvm/include/llvm/Support/thread.h @@ -43,6 +43,8 @@ typedef std::thread thread; #else // !LLVM_ENABLE_THREADS +#include <utility> + namespace llvm { struct thread { diff --git a/llvm/lib/Support/CMakeLists.txt b/llvm/lib/Support/CMakeLists.txt index 784e00e15b9..75b3e89f916 100644 --- a/llvm/lib/Support/CMakeLists.txt +++ b/llvm/lib/Support/CMakeLists.txt @@ -89,6 +89,7 @@ add_llvm_library(LLVMSupport StringRef.cpp SystemUtils.cpp TargetParser.cpp + ThreadPool.cpp Timer.cpp ToolOutputFile.cpp Triple.cpp diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp new file mode 100644 index 00000000000..bc004dfb8a8 --- /dev/null +++ b/llvm/lib/Support/ThreadPool.cpp @@ -0,0 +1,146 @@ +//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// +// This file implements a crude C++11 based thread pool. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/ThreadPool.h" + +#include "llvm/Config/llvm-config.h" +#include "llvm/Support/raw_ostream.h" + +using namespace llvm; + +#if LLVM_ENABLE_THREADS + +// Default to std::thread::hardware_concurrency +ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency()) {} + +ThreadPool::ThreadPool(unsigned ThreadCount) + : ActiveThreads(0), EnableFlag(true) { + // Create ThreadCount threads that will loop forever, wait on QueueCondition + // for tasks to be queued or the Pool to be destroyed. + Threads.reserve(ThreadCount); + for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) { + Threads.emplace_back([&] { + while (true) { + PackagedTaskTy Task; + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + // Wait for tasks to be pushed in the queue + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + // Exit condition + if (!EnableFlag && Tasks.empty()) + return; + // Yeah, we have a task, grab it and release the lock on the queue + + // We first need to signal that we are active before popping the queue + // in order for wait() to properly detect that even if the queue is + // empty, there is still a task in flight. + { + ++ActiveThreads; + std::unique_lock<std::mutex> LockGuard(CompletionLock); + } + Task = std::move(Tasks.front()); + Tasks.pop(); + } + // Run the task we just grabbed +#ifndef _MSC_VER + Task(); +#else + Task(/* unused */ false); +#endif + + { + // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() + std::unique_lock<std::mutex> LockGuard(CompletionLock); + --ActiveThreads; + } + + // Notify task completion, in case someone waits on ThreadPool::wait() + CompletionCondition.notify_all(); + } + }); + } +} + +void ThreadPool::wait() { + // Wait for all threads to complete and the queue to be empty + std::unique_lock<std::mutex> LockGuard(CompletionLock); + CompletionCondition.wait(LockGuard, + [&] { return Tasks.empty() && !ActiveThreads; }); +} + +std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) { + /// Wrap the Task in a packaged_task to return a future object. + PackagedTaskTy PackagedTask(std::move(Task)); + auto Future = PackagedTask.get_future(); + { + // Lock the queue and push the new task + std::unique_lock<std::mutex> LockGuard(QueueLock); + + // Don't allow enqueueing after disabling the pool + assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); + + Tasks.push(std::move(PackagedTask)); + } + QueueCondition.notify_one(); + return Future.share(); +} + +// The destructor joins all threads, waiting for completion. +ThreadPool::~ThreadPool() { + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + EnableFlag = false; + } + QueueCondition.notify_all(); + for (auto &Worker : Threads) + Worker.join(); +} + +#else // LLVM_ENABLE_THREADS Disabled + +ThreadPool::ThreadPool() : ThreadPool(0) {} + +// No threads are launched, issue a warning if ThreadCount is not 0 +ThreadPool::ThreadPool(unsigned ThreadCount) + : ActiveThreads(0) { + if (ThreadCount) { + errs() << "Warning: request a ThreadPool with " << ThreadCount + << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; + } +} + +void ThreadPool::wait() { + // Sequential implementation running the tasks + while (!Tasks.empty()) { + auto Task = std::move(Tasks.front()); + Tasks.pop(); + Task(); + } +} + +std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) { + // Get a Future with launch::deferred execution using std::async + auto Future = std::async(std::launch::deferred, std::move(Task)).share(); + // Wrap the future so that both ThreadPool::wait() can operate and the + // returned future can be sync'ed on. + PackagedTaskTy PackagedTask([Future]() { Future.get(); }); + Tasks.push(std::move(PackagedTask)); + return Future; +} + +ThreadPool::~ThreadPool() { + wait(); +} + +#endif diff --git a/llvm/unittests/Support/CMakeLists.txt b/llvm/unittests/Support/CMakeLists.txt index fd8324c836d..9bd685759ed 100644 --- a/llvm/unittests/Support/CMakeLists.txt +++ b/llvm/unittests/Support/CMakeLists.txt @@ -41,6 +41,7 @@ add_llvm_unittest(SupportTests SwapByteOrderTest.cpp TargetRegistry.cpp ThreadLocalTest.cpp + ThreadPool.cpp TimeValueTest.cpp TrailingObjectsTest.cpp UnicodeTest.cpp diff --git a/llvm/unittests/Support/ThreadPool.cpp b/llvm/unittests/Support/ThreadPool.cpp new file mode 100644 index 00000000000..d36341e425d --- /dev/null +++ b/llvm/unittests/Support/ThreadPool.cpp @@ -0,0 +1,91 @@ +//========- unittests/Support/ThreadPools.cpp - ThreadPools.h tests --========// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/ThreadPool.h" + +#include "llvm/ADT/STLExtras.h" + +#include "gtest/gtest.h" + +using namespace llvm; +using namespace std::chrono; + +/// Try best to make this thread not progress faster than the main thread +static void yield() { +#ifdef LLVM_ENABLE_THREADS + std::this_thread::yield(); +#endif + std::this_thread::sleep_for(milliseconds(200)); +#ifdef LLVM_ENABLE_THREADS + std::this_thread::yield(); +#endif +} + +TEST(ThreadPoolTest, AsyncBarrier) { + // test that async & barrier work together properly. + + std::atomic_int checked_in{0}; + + ThreadPool Pool; + for (size_t i = 0; i < 5; ++i) { + Pool.async([&checked_in, i] { + yield(); + ++checked_in; + }); + } + ASSERT_EQ(0, checked_in); + Pool.wait(); + ASSERT_EQ(5, checked_in); +} + +TEST(ThreadPoolTest, Async) { + ThreadPool Pool; + std::atomic_int i{0}; + // sleep here just to ensure that the not-equal is correct. + Pool.async([&i] { + yield(); + ++i; + }); + Pool.async([&i] { ++i; }); + ASSERT_NE(2, i.load()); + Pool.wait(); + ASSERT_EQ(2, i.load()); +} + +TEST(ThreadPoolTest, GetFuture) { + ThreadPool Pool; + std::atomic_int i{0}; + // sleep here just to ensure that the not-equal is correct. + Pool.async([&i] { + yield(); + ++i; + }); + // Force the future using get() + Pool.async([&i] { ++i; }).get(); + ASSERT_NE(2, i.load()); + Pool.wait(); + ASSERT_EQ(2, i.load()); +} + +TEST(ThreadPoolTest, PoolDestruction) { + // Test that we are waiting on destruction + std::atomic_int checked_in{0}; + + { + ThreadPool Pool; + for (size_t i = 0; i < 5; ++i) { + Pool.async([&checked_in, i] { + yield(); + ++checked_in; + }); + } + ASSERT_EQ(0, checked_in); + } + ASSERT_EQ(5, checked_in); +} |