1*33a7ea4bSMehdi Amini //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
2*33a7ea4bSMehdi Amini //
3*33a7ea4bSMehdi Amini //                     The LLVM Compiler Infrastructure
4*33a7ea4bSMehdi Amini //
5*33a7ea4bSMehdi Amini // This file is distributed under the University of Illinois Open Source
6*33a7ea4bSMehdi Amini // License. See LICENSE.TXT for details.
7*33a7ea4bSMehdi Amini //
8*33a7ea4bSMehdi Amini //===----------------------------------------------------------------------===//
9*33a7ea4bSMehdi Amini //
10*33a7ea4bSMehdi Amini // This file implements a crude C++11 based thread pool.
11*33a7ea4bSMehdi Amini //
12*33a7ea4bSMehdi Amini //===----------------------------------------------------------------------===//
13*33a7ea4bSMehdi Amini 
14*33a7ea4bSMehdi Amini #include "llvm/Support/ThreadPool.h"
15*33a7ea4bSMehdi Amini 
16*33a7ea4bSMehdi Amini #include "llvm/Config/llvm-config.h"
17*33a7ea4bSMehdi Amini #include "llvm/Support/raw_ostream.h"
18*33a7ea4bSMehdi Amini 
19*33a7ea4bSMehdi Amini using namespace llvm;
20*33a7ea4bSMehdi Amini 
21*33a7ea4bSMehdi Amini #if LLVM_ENABLE_THREADS
22*33a7ea4bSMehdi Amini 
23*33a7ea4bSMehdi Amini // Default to std::thread::hardware_concurrency
24*33a7ea4bSMehdi Amini ThreadPool::ThreadPool() : ThreadPool(std::thread::hardware_concurrency()) {}
25*33a7ea4bSMehdi Amini 
26*33a7ea4bSMehdi Amini ThreadPool::ThreadPool(unsigned ThreadCount)
27*33a7ea4bSMehdi Amini     : ActiveThreads(0), EnableFlag(true) {
28*33a7ea4bSMehdi Amini   // Create ThreadCount threads that will loop forever, wait on QueueCondition
29*33a7ea4bSMehdi Amini   // for tasks to be queued or the Pool to be destroyed.
30*33a7ea4bSMehdi Amini   Threads.reserve(ThreadCount);
31*33a7ea4bSMehdi Amini   for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) {
32*33a7ea4bSMehdi Amini     Threads.emplace_back([&] {
33*33a7ea4bSMehdi Amini       while (true) {
34*33a7ea4bSMehdi Amini         PackagedTaskTy Task;
35*33a7ea4bSMehdi Amini         {
36*33a7ea4bSMehdi Amini           std::unique_lock<std::mutex> LockGuard(QueueLock);
37*33a7ea4bSMehdi Amini           // Wait for tasks to be pushed in the queue
38*33a7ea4bSMehdi Amini           QueueCondition.wait(LockGuard,
39*33a7ea4bSMehdi Amini                               [&] { return !EnableFlag || !Tasks.empty(); });
40*33a7ea4bSMehdi Amini           // Exit condition
41*33a7ea4bSMehdi Amini           if (!EnableFlag && Tasks.empty())
42*33a7ea4bSMehdi Amini             return;
43*33a7ea4bSMehdi Amini           // Yeah, we have a task, grab it and release the lock on the queue
44*33a7ea4bSMehdi Amini 
45*33a7ea4bSMehdi Amini           // We first need to signal that we are active before popping the queue
46*33a7ea4bSMehdi Amini           // in order for wait() to properly detect that even if the queue is
47*33a7ea4bSMehdi Amini           // empty, there is still a task in flight.
48*33a7ea4bSMehdi Amini           {
49*33a7ea4bSMehdi Amini             ++ActiveThreads;
50*33a7ea4bSMehdi Amini             std::unique_lock<std::mutex> LockGuard(CompletionLock);
51*33a7ea4bSMehdi Amini           }
52*33a7ea4bSMehdi Amini           Task = std::move(Tasks.front());
53*33a7ea4bSMehdi Amini           Tasks.pop();
54*33a7ea4bSMehdi Amini         }
55*33a7ea4bSMehdi Amini         // Run the task we just grabbed
56*33a7ea4bSMehdi Amini #ifndef _MSC_VER
57*33a7ea4bSMehdi Amini         Task();
58*33a7ea4bSMehdi Amini #else
59*33a7ea4bSMehdi Amini         Task(/* unused */ false);
60*33a7ea4bSMehdi Amini #endif
61*33a7ea4bSMehdi Amini 
62*33a7ea4bSMehdi Amini         {
63*33a7ea4bSMehdi Amini           // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
64*33a7ea4bSMehdi Amini           std::unique_lock<std::mutex> LockGuard(CompletionLock);
65*33a7ea4bSMehdi Amini           --ActiveThreads;
66*33a7ea4bSMehdi Amini         }
67*33a7ea4bSMehdi Amini 
68*33a7ea4bSMehdi Amini         // Notify task completion, in case someone waits on ThreadPool::wait()
69*33a7ea4bSMehdi Amini         CompletionCondition.notify_all();
70*33a7ea4bSMehdi Amini       }
71*33a7ea4bSMehdi Amini     });
72*33a7ea4bSMehdi Amini   }
73*33a7ea4bSMehdi Amini }
74*33a7ea4bSMehdi Amini 
75*33a7ea4bSMehdi Amini void ThreadPool::wait() {
76*33a7ea4bSMehdi Amini   // Wait for all threads to complete and the queue to be empty
77*33a7ea4bSMehdi Amini   std::unique_lock<std::mutex> LockGuard(CompletionLock);
78*33a7ea4bSMehdi Amini   CompletionCondition.wait(LockGuard,
79*33a7ea4bSMehdi Amini                            [&] { return Tasks.empty() && !ActiveThreads; });
80*33a7ea4bSMehdi Amini }
81*33a7ea4bSMehdi Amini 
82*33a7ea4bSMehdi Amini std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
83*33a7ea4bSMehdi Amini   /// Wrap the Task in a packaged_task to return a future object.
84*33a7ea4bSMehdi Amini   PackagedTaskTy PackagedTask(std::move(Task));
85*33a7ea4bSMehdi Amini   auto Future = PackagedTask.get_future();
86*33a7ea4bSMehdi Amini   {
87*33a7ea4bSMehdi Amini     // Lock the queue and push the new task
88*33a7ea4bSMehdi Amini     std::unique_lock<std::mutex> LockGuard(QueueLock);
89*33a7ea4bSMehdi Amini 
90*33a7ea4bSMehdi Amini     // Don't allow enqueueing after disabling the pool
91*33a7ea4bSMehdi Amini     assert(EnableFlag && "Queuing a thread during ThreadPool destruction");
92*33a7ea4bSMehdi Amini 
93*33a7ea4bSMehdi Amini     Tasks.push(std::move(PackagedTask));
94*33a7ea4bSMehdi Amini   }
95*33a7ea4bSMehdi Amini   QueueCondition.notify_one();
96*33a7ea4bSMehdi Amini   return Future.share();
97*33a7ea4bSMehdi Amini }
98*33a7ea4bSMehdi Amini 
99*33a7ea4bSMehdi Amini // The destructor joins all threads, waiting for completion.
100*33a7ea4bSMehdi Amini ThreadPool::~ThreadPool() {
101*33a7ea4bSMehdi Amini   {
102*33a7ea4bSMehdi Amini     std::unique_lock<std::mutex> LockGuard(QueueLock);
103*33a7ea4bSMehdi Amini     EnableFlag = false;
104*33a7ea4bSMehdi Amini   }
105*33a7ea4bSMehdi Amini   QueueCondition.notify_all();
106*33a7ea4bSMehdi Amini   for (auto &Worker : Threads)
107*33a7ea4bSMehdi Amini     Worker.join();
108*33a7ea4bSMehdi Amini }
109*33a7ea4bSMehdi Amini 
110*33a7ea4bSMehdi Amini #else // LLVM_ENABLE_THREADS Disabled
111*33a7ea4bSMehdi Amini 
112*33a7ea4bSMehdi Amini ThreadPool::ThreadPool() : ThreadPool(0) {}
113*33a7ea4bSMehdi Amini 
114*33a7ea4bSMehdi Amini // No threads are launched, issue a warning if ThreadCount is not 0
115*33a7ea4bSMehdi Amini ThreadPool::ThreadPool(unsigned ThreadCount)
116*33a7ea4bSMehdi Amini     : ActiveThreads(0) {
117*33a7ea4bSMehdi Amini   if (ThreadCount) {
118*33a7ea4bSMehdi Amini     errs() << "Warning: request a ThreadPool with " << ThreadCount
119*33a7ea4bSMehdi Amini            << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
120*33a7ea4bSMehdi Amini   }
121*33a7ea4bSMehdi Amini }
122*33a7ea4bSMehdi Amini 
123*33a7ea4bSMehdi Amini void ThreadPool::wait() {
124*33a7ea4bSMehdi Amini   // Sequential implementation running the tasks
125*33a7ea4bSMehdi Amini   while (!Tasks.empty()) {
126*33a7ea4bSMehdi Amini     auto Task = std::move(Tasks.front());
127*33a7ea4bSMehdi Amini     Tasks.pop();
128*33a7ea4bSMehdi Amini     Task();
129*33a7ea4bSMehdi Amini   }
130*33a7ea4bSMehdi Amini }
131*33a7ea4bSMehdi Amini 
132*33a7ea4bSMehdi Amini std::shared_future<ThreadPool::VoidTy> ThreadPool::asyncImpl(TaskTy Task) {
133*33a7ea4bSMehdi Amini   // Get a Future with launch::deferred execution using std::async
134*33a7ea4bSMehdi Amini   auto Future = std::async(std::launch::deferred, std::move(Task)).share();
135*33a7ea4bSMehdi Amini   // Wrap the future so that both ThreadPool::wait() can operate and the
136*33a7ea4bSMehdi Amini   // returned future can be sync'ed on.
137*33a7ea4bSMehdi Amini   PackagedTaskTy PackagedTask([Future]() { Future.get(); });
138*33a7ea4bSMehdi Amini   Tasks.push(std::move(PackagedTask));
139*33a7ea4bSMehdi Amini   return Future;
140*33a7ea4bSMehdi Amini }
141*33a7ea4bSMehdi Amini 
142*33a7ea4bSMehdi Amini ThreadPool::~ThreadPool() {
143*33a7ea4bSMehdi Amini   wait();
144*33a7ea4bSMehdi Amini }
145*33a7ea4bSMehdi Amini 
146*33a7ea4bSMehdi Amini #endif
147