From 33a7ea4b9a1a81fd5b2b3036940bcfc38d438830 Mon Sep 17 00:00:00 2001 From: Mehdi Amini Date: Tue, 15 Dec 2015 00:59:19 +0000 Subject: Add a C++11 ThreadPool implementation in LLVM This is a very simple implementation of a thread pool using C++11 thread. It accepts any std::function for asynchronous execution. Individual task can be synchronize using the returned future, or the client can block on the full queue completion. In case LLVM is configured with Threading disabled, it falls back to sequential execution using std::async with launch:deferred. This is intended to support parallelism for ThinLTO processing in linker plugin, but is generic enough for any other uses. This is a recommit of r255444 ; trying to workaround a bug in the MSVC 2013 standard library. I think I was hit by: http://connect.microsoft.com/VisualStudio/feedbackdetail/view/791185/std-packaged-task-t-where-t-is-void-or-a-reference-class-are-not-movable Recommit of r255589, trying to please g++ as well. Differential Revision: http://reviews.llvm.org/D15464 From: mehdi_amini llvm-svn: 255593 --- llvm/lib/Support/ThreadPool.cpp | 146 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 llvm/lib/Support/ThreadPool.cpp (limited to 'llvm/lib/Support/ThreadPool.cpp') diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp new file mode 100644 index 00000000000..bc004dfb8a8 --- /dev/null +++ b/llvm/lib/Support/ThreadPool.cpp @@ -0,0 +1,146 @@ +//==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// +// +// The LLVM Compiler Infrastructure +// +// This file is distributed under the University of Illinois Open Source +// License. See LICENSE.TXT for details. +// +//===----------------------------------------------------------------------===// +// +// This file implements a crude C++11 based thread pool. +// +//===----------------------------------------------------------------------===// + +#include "llvm/Support/ThreadPool.h" + +#include "llvm/Config/llvm-config.h" +#include "llvm/Support/raw_ostream.h" + +using namespace llvm; + +#if LLVM_ENABLE_THREADS + +// Default to std::thread::hardware_concurrency +ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency()) {} + +ThreadPool::ThreadPool(unsigned ThreadCount) + : ActiveThreads(0), EnableFlag(true) { + // Create ThreadCount threads that will loop forever, wait on QueueCondition + // for tasks to be queued or the Pool to be destroyed. + Threads.reserve(ThreadCount); + for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) { + Threads.emplace_back([&] { + while (true) { + PackagedTaskTy Task; + { + std::unique_lock LockGuard(QueueLock); + // Wait for tasks to be pushed in the queue + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + // Exit condition + if (!EnableFlag && Tasks.empty()) + return; + // Yeah, we have a task, grab it and release the lock on the queue + + // We first need to signal that we are active before popping the queue + // in order for wait() to properly detect that even if the queue is + // empty, there is still a task in flight. + { + ++ActiveThreads; + std::unique_lock LockGuard(CompletionLock); + } + Task = std::move(Tasks.front()); + Tasks.pop(); + } + // Run the task we just grabbed +#ifndef _MSC_VER + Task(); +#else + Task(/* unused */ false); +#endif + + { + // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() + std::unique_lock LockGuard(CompletionLock); + --ActiveThreads; + } + + // Notify task completion, in case someone waits on ThreadPool::wait() + CompletionCondition.notify_all(); + } + }); + } +} + +void ThreadPool::wait() { + // Wait for all threads to complete and the queue to be empty + std::unique_lock LockGuard(CompletionLock); + CompletionCondition.wait(LockGuard, + [&] { return Tasks.empty() && !ActiveThreads; }); +} + +std::shared_future ThreadPool::asyncImpl(TaskTy Task) { + /// Wrap the Task in a packaged_task to return a future object. + PackagedTaskTy PackagedTask(std::move(Task)); + auto Future = PackagedTask.get_future(); + { + // Lock the queue and push the new task + std::unique_lock LockGuard(QueueLock); + + // Don't allow enqueueing after disabling the pool + assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); + + Tasks.push(std::move(PackagedTask)); + } + QueueCondition.notify_one(); + return Future.share(); +} + +// The destructor joins all threads, waiting for completion. +ThreadPool::~ThreadPool() { + { + std::unique_lock LockGuard(QueueLock); + EnableFlag = false; + } + QueueCondition.notify_all(); + for (auto &Worker : Threads) + Worker.join(); +} + +#else // LLVM_ENABLE_THREADS Disabled + +ThreadPool::ThreadPool() : ThreadPool(0) {} + +// No threads are launched, issue a warning if ThreadCount is not 0 +ThreadPool::ThreadPool(unsigned ThreadCount) + : ActiveThreads(0) { + if (ThreadCount) { + errs() << "Warning: request a ThreadPool with " << ThreadCount + << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; + } +} + +void ThreadPool::wait() { + // Sequential implementation running the tasks + while (!Tasks.empty()) { + auto Task = std::move(Tasks.front()); + Tasks.pop(); + Task(); + } +} + +std::shared_future ThreadPool::asyncImpl(TaskTy Task) { + // Get a Future with launch::deferred execution using std::async + auto Future = std::async(std::launch::deferred, std::move(Task)).share(); + // Wrap the future so that both ThreadPool::wait() can operate and the + // returned future can be sync'ed on. + PackagedTaskTy PackagedTask([Future]() { Future.get(); }); + Tasks.push(std::move(PackagedTask)); + return Future; +} + +ThreadPool::~ThreadPool() { + wait(); +} + +#endif -- cgit v1.2.3