133a7ea4bSMehdi Amini //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==//
233a7ea4bSMehdi Amini //
32946cd70SChandler Carruth // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
42946cd70SChandler Carruth // See https://llvm.org/LICENSE.txt for license information.
52946cd70SChandler Carruth // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
633a7ea4bSMehdi Amini //
733a7ea4bSMehdi Amini //===----------------------------------------------------------------------===//
833a7ea4bSMehdi Amini //
933a7ea4bSMehdi Amini // This file implements a crude C++11 based thread pool.
1033a7ea4bSMehdi Amini //
1133a7ea4bSMehdi Amini //===----------------------------------------------------------------------===//
1233a7ea4bSMehdi Amini
1333a7ea4bSMehdi Amini #include "llvm/Support/ThreadPool.h"
1433a7ea4bSMehdi Amini
1533a7ea4bSMehdi Amini #include "llvm/Config/llvm-config.h"
160984aa70Sserge-sans-paille
170984aa70Sserge-sans-paille #if LLVM_ENABLE_THREADS
188c0ff950SRafael Espindola #include "llvm/Support/Threading.h"
190984aa70Sserge-sans-paille #else
200984aa70Sserge-sans-paille #include "llvm/Support/raw_ostream.h"
210984aa70Sserge-sans-paille #endif
2233a7ea4bSMehdi Amini
2333a7ea4bSMehdi Amini using namespace llvm;
2433a7ea4bSMehdi Amini
2533a7ea4bSMehdi Amini #if LLVM_ENABLE_THREADS
2633a7ea4bSMehdi Amini
// A note on task groups: Tasks are by default in no group (represented
// by a nullptr ThreadPoolTaskGroup pointer in the Tasks queue), and the
// functionality here normally works on all tasks regardless of their group
// (functions in that case receive a nullptr ThreadPoolTaskGroup pointer as
// argument). A task in a group has a pointer to that ThreadPoolTaskGroup in
// the Tasks queue, and functions called to work only on tasks from one group
// take that pointer.
348ef5710eSLuboš Luňák
ThreadPool(ThreadPoolStrategy S)35b28f317cSMehdi Amini ThreadPool::ThreadPool(ThreadPoolStrategy S)
36b28f317cSMehdi Amini : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
37b28f317cSMehdi Amini
grow(int requested)38e8469718SMehdi Amini void ThreadPool::grow(int requested) {
398ef5710eSLuboš Luňák llvm::sys::ScopedWriter LockGuard(ThreadsLock);
40728b982bSBenoit Jacob if (Threads.size() >= MaxThreadCount)
41728b982bSBenoit Jacob return; // Already hit the max thread pool size.
42e8469718SMehdi Amini int newThreadCount = std::min<int>(requested, MaxThreadCount);
43e8469718SMehdi Amini while (static_cast<int>(Threads.size()) < newThreadCount) {
44728b982bSBenoit Jacob int ThreadID = Threads.size();
45728b982bSBenoit Jacob Threads.emplace_back([this, ThreadID] {
46728b982bSBenoit Jacob Strategy.apply_thread_strategy(ThreadID);
478ef5710eSLuboš Luňák processTasks(nullptr);
488ef5710eSLuboš Luňák });
498ef5710eSLuboš Luňák }
508ef5710eSLuboš Luňák }
518ef5710eSLuboš Luňák
528ef5710eSLuboš Luňák #ifndef NDEBUG
538ef5710eSLuboš Luňák // The group of the tasks run by the current thread.
548ef5710eSLuboš Luňák static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *>
558ef5710eSLuboš Luňák *CurrentThreadTaskGroups = nullptr;
568ef5710eSLuboš Luňák #endif
578ef5710eSLuboš Luňák
588ef5710eSLuboš Luňák // WaitingForGroup == nullptr means all tasks regardless of their group.
processTasks(ThreadPoolTaskGroup * WaitingForGroup)598ef5710eSLuboš Luňák void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) {
6033a7ea4bSMehdi Amini while (true) {
618cb1af73SFlorian Hahn std::function<void()> Task;
628ef5710eSLuboš Luňák ThreadPoolTaskGroup *GroupOfTask;
6333a7ea4bSMehdi Amini {
6433a7ea4bSMehdi Amini std::unique_lock<std::mutex> LockGuard(QueueLock);
658ef5710eSLuboš Luňák bool workCompletedForGroup = false; // Result of workCompletedUnlocked()
6633a7ea4bSMehdi Amini // Wait for tasks to be pushed in the queue
678ef5710eSLuboš Luňák QueueCondition.wait(LockGuard, [&] {
688ef5710eSLuboš Luňák return !EnableFlag || !Tasks.empty() ||
698ef5710eSLuboš Luňák (WaitingForGroup != nullptr &&
708ef5710eSLuboš Luňák (workCompletedForGroup =
718ef5710eSLuboš Luňák workCompletedUnlocked(WaitingForGroup)));
728ef5710eSLuboš Luňák });
7333a7ea4bSMehdi Amini // Exit condition
7433a7ea4bSMehdi Amini if (!EnableFlag && Tasks.empty())
7533a7ea4bSMehdi Amini return;
768ef5710eSLuboš Luňák if (WaitingForGroup != nullptr && workCompletedForGroup)
778ef5710eSLuboš Luňák return;
7833a7ea4bSMehdi Amini // Yeah, we have a task, grab it and release the lock on the queue
7933a7ea4bSMehdi Amini
8033a7ea4bSMehdi Amini // We first need to signal that we are active before popping the queue
8133a7ea4bSMehdi Amini // in order for wait() to properly detect that even if the queue is
8233a7ea4bSMehdi Amini // empty, there is still a task in flight.
83c723f657SJan Korous ++ActiveThreads;
848ef5710eSLuboš Luňák Task = std::move(Tasks.front().first);
858ef5710eSLuboš Luňák GroupOfTask = Tasks.front().second;
868ef5710eSLuboš Luňák // Need to count active threads in each group separately, ActiveThreads
878ef5710eSLuboš Luňák // would never be 0 if waiting for another group inside a wait.
888ef5710eSLuboš Luňák if (GroupOfTask != nullptr)
898ef5710eSLuboš Luňák ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item
908ef5710eSLuboš Luňák Tasks.pop_front();
9133a7ea4bSMehdi Amini }
928ef5710eSLuboš Luňák #ifndef NDEBUG
938ef5710eSLuboš Luňák if (CurrentThreadTaskGroups == nullptr)
948ef5710eSLuboš Luňák CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>;
958ef5710eSLuboš Luňák CurrentThreadTaskGroups->push_back(GroupOfTask);
968ef5710eSLuboš Luňák #endif
978ef5710eSLuboš Luňák
9833a7ea4bSMehdi Amini // Run the task we just grabbed
999b8b0794SZachary Turner Task();
10033a7ea4bSMehdi Amini
1018ef5710eSLuboš Luňák #ifndef NDEBUG
1028ef5710eSLuboš Luňák CurrentThreadTaskGroups->pop_back();
103*9c34a16cSLuboš Luňák if (CurrentThreadTaskGroups->empty()) {
104*9c34a16cSLuboš Luňák delete CurrentThreadTaskGroups;
105*9c34a16cSLuboš Luňák CurrentThreadTaskGroups = nullptr;
106*9c34a16cSLuboš Luňák }
1078ef5710eSLuboš Luňák #endif
1088ef5710eSLuboš Luňák
1096f230491SFangrui Song bool Notify;
1108ef5710eSLuboš Luňák bool NotifyGroup;
11133a7ea4bSMehdi Amini {
11233a7ea4bSMehdi Amini // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait()
1136f230491SFangrui Song std::lock_guard<std::mutex> LockGuard(QueueLock);
11433a7ea4bSMehdi Amini --ActiveThreads;
1158ef5710eSLuboš Luňák if (GroupOfTask != nullptr) {
1168ef5710eSLuboš Luňák auto A = ActiveGroups.find(GroupOfTask);
1178ef5710eSLuboš Luňák if (--(A->second) == 0)
1188ef5710eSLuboš Luňák ActiveGroups.erase(A);
1198ef5710eSLuboš Luňák }
1208ef5710eSLuboš Luňák Notify = workCompletedUnlocked(GroupOfTask);
1218ef5710eSLuboš Luňák NotifyGroup = GroupOfTask != nullptr && Notify;
12233a7ea4bSMehdi Amini }
1236f230491SFangrui Song // Notify task completion if this is the last active thread, in case
1246f230491SFangrui Song // someone waits on ThreadPool::wait().
1256f230491SFangrui Song if (Notify)
12633a7ea4bSMehdi Amini CompletionCondition.notify_all();
1278ef5710eSLuboš Luňák // If this was a task in a group, notify also threads waiting for tasks
1288ef5710eSLuboš Luňák // in this function on QueueCondition, to make a recursive wait() return
1298ef5710eSLuboš Luňák // after the group it's been waiting for has finished.
1308ef5710eSLuboš Luňák if (NotifyGroup)
1318ef5710eSLuboš Luňák QueueCondition.notify_all();
13233a7ea4bSMehdi Amini }
13333a7ea4bSMehdi Amini }
1348ef5710eSLuboš Luňák
workCompletedUnlocked(ThreadPoolTaskGroup * Group) const1358ef5710eSLuboš Luňák bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const {
1368ef5710eSLuboš Luňák if (Group == nullptr)
1378ef5710eSLuboš Luňák return !ActiveThreads && Tasks.empty();
1388ef5710eSLuboš Luňák return ActiveGroups.count(Group) == 0 &&
1398ef5710eSLuboš Luňák !llvm::any_of(Tasks,
1408ef5710eSLuboš Luňák [Group](const auto &T) { return T.second == Group; });
141e8469718SMehdi Amini }
14233a7ea4bSMehdi Amini
wait()14333a7ea4bSMehdi Amini void ThreadPool::wait() {
1448ef5710eSLuboš Luňák assert(!isWorkerThread()); // Would deadlock waiting for itself.
14533a7ea4bSMehdi Amini // Wait for all threads to complete and the queue to be empty
1466f230491SFangrui Song std::unique_lock<std::mutex> LockGuard(QueueLock);
1478ef5710eSLuboš Luňák CompletionCondition.wait(LockGuard,
1488ef5710eSLuboš Luňák [&] { return workCompletedUnlocked(nullptr); });
1498ef5710eSLuboš Luňák }
1508ef5710eSLuboš Luňák
wait(ThreadPoolTaskGroup & Group)1518ef5710eSLuboš Luňák void ThreadPool::wait(ThreadPoolTaskGroup &Group) {
1528ef5710eSLuboš Luňák // Wait for all threads in the group to complete.
1538ef5710eSLuboš Luňák if (!isWorkerThread()) {
1548ef5710eSLuboš Luňák std::unique_lock<std::mutex> LockGuard(QueueLock);
1558ef5710eSLuboš Luňák CompletionCondition.wait(LockGuard,
1568ef5710eSLuboš Luňák [&] { return workCompletedUnlocked(&Group); });
1578ef5710eSLuboš Luňák return;
1588ef5710eSLuboš Luňák }
1598ef5710eSLuboš Luňák // Make sure to not deadlock waiting for oneself.
1608ef5710eSLuboš Luňák assert(CurrentThreadTaskGroups == nullptr ||
1618ef5710eSLuboš Luňák !llvm::is_contained(*CurrentThreadTaskGroups, &Group));
1628ef5710eSLuboš Luňák // Handle the case of recursive call from another task in a different group,
1638ef5710eSLuboš Luňák // in which case process tasks while waiting to keep the thread busy and avoid
1648ef5710eSLuboš Luňák // possible deadlock.
1658ef5710eSLuboš Luňák processTasks(&Group);
16633a7ea4bSMehdi Amini }
16733a7ea4bSMehdi Amini
isWorkerThread() const1686569cf2aSRiver Riddle bool ThreadPool::isWorkerThread() const {
1698ef5710eSLuboš Luňák llvm::sys::ScopedReader LockGuard(ThreadsLock);
17048c68a63STim Northover llvm::thread::id CurrentThreadId = llvm::this_thread::get_id();
17148c68a63STim Northover for (const llvm::thread &Thread : Threads)
1726569cf2aSRiver Riddle if (CurrentThreadId == Thread.get_id())
1736569cf2aSRiver Riddle return true;
1746569cf2aSRiver Riddle return false;
1756569cf2aSRiver Riddle }
1766569cf2aSRiver Riddle
17733a7ea4bSMehdi Amini // The destructor joins all threads, waiting for completion.
~ThreadPool()17833a7ea4bSMehdi Amini ThreadPool::~ThreadPool() {
17933a7ea4bSMehdi Amini {
18033a7ea4bSMehdi Amini std::unique_lock<std::mutex> LockGuard(QueueLock);
18133a7ea4bSMehdi Amini EnableFlag = false;
18233a7ea4bSMehdi Amini }
18333a7ea4bSMehdi Amini QueueCondition.notify_all();
1848ef5710eSLuboš Luňák llvm::sys::ScopedReader LockGuard(ThreadsLock);
18533a7ea4bSMehdi Amini for (auto &Worker : Threads)
18633a7ea4bSMehdi Amini Worker.join();
18733a7ea4bSMehdi Amini }
18833a7ea4bSMehdi Amini
18933a7ea4bSMehdi Amini #else // LLVM_ENABLE_THREADS Disabled
19033a7ea4bSMehdi Amini
19133a7ea4bSMehdi Amini // No threads are launched, issue a warning if ThreadCount is not 0
ThreadPool(ThreadPoolStrategy S)192b28f317cSMehdi Amini ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) {
193b28f317cSMehdi Amini int ThreadCount = S.compute_thread_count();
1948404aeb5SAlexandre Ganea if (ThreadCount != 1) {
19533a7ea4bSMehdi Amini errs() << "Warning: request a ThreadPool with " << ThreadCount
19633a7ea4bSMehdi Amini << " threads, but LLVM_ENABLE_THREADS has been turned off\n";
19733a7ea4bSMehdi Amini }
19833a7ea4bSMehdi Amini }
19933a7ea4bSMehdi Amini
wait()20033a7ea4bSMehdi Amini void ThreadPool::wait() {
20133a7ea4bSMehdi Amini // Sequential implementation running the tasks
20233a7ea4bSMehdi Amini while (!Tasks.empty()) {
2038ef5710eSLuboš Luňák auto Task = std::move(Tasks.front().first);
2048ef5710eSLuboš Luňák Tasks.pop_front();
20533a7ea4bSMehdi Amini Task();
20633a7ea4bSMehdi Amini }
20733a7ea4bSMehdi Amini }
20833a7ea4bSMehdi Amini
wait(ThreadPoolTaskGroup &)2098ef5710eSLuboš Luňák void ThreadPool::wait(ThreadPoolTaskGroup &) {
2108ef5710eSLuboš Luňák // Simply wait for all, this works even if recursive (the running task
2118ef5710eSLuboš Luňák // is already removed from the queue).
2128ef5710eSLuboš Luňák wait();
2138ef5710eSLuboš Luňák }
2148ef5710eSLuboš Luňák
isWorkerThread() const215d9547f41SJohn Demme bool ThreadPool::isWorkerThread() const {
216e5c944b4SFangrui Song report_fatal_error("LLVM compiled without multithreading");
217d9547f41SJohn Demme }
218d9547f41SJohn Demme
~ThreadPool()2198404aeb5SAlexandre Ganea ThreadPool::~ThreadPool() { wait(); }
22033a7ea4bSMehdi Amini
22133a7ea4bSMehdi Amini #endif
222