133a7ea4bSMehdi Amini //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// 233a7ea4bSMehdi Amini // 32946cd70SChandler Carruth // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 42946cd70SChandler Carruth // See https://llvm.org/LICENSE.txt for license information. 52946cd70SChandler Carruth // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 633a7ea4bSMehdi Amini // 733a7ea4bSMehdi Amini //===----------------------------------------------------------------------===// 833a7ea4bSMehdi Amini // 933a7ea4bSMehdi Amini // This file implements a crude C++11 based thread pool. 1033a7ea4bSMehdi Amini // 1133a7ea4bSMehdi Amini //===----------------------------------------------------------------------===// 1233a7ea4bSMehdi Amini 1333a7ea4bSMehdi Amini #include "llvm/Support/ThreadPool.h" 1433a7ea4bSMehdi Amini 1533a7ea4bSMehdi Amini #include "llvm/Config/llvm-config.h" 160984aa70Sserge-sans-paille 170984aa70Sserge-sans-paille #if LLVM_ENABLE_THREADS 188c0ff950SRafael Espindola #include "llvm/Support/Threading.h" 190984aa70Sserge-sans-paille #else 200984aa70Sserge-sans-paille #include "llvm/Support/raw_ostream.h" 210984aa70Sserge-sans-paille #endif 2233a7ea4bSMehdi Amini 2333a7ea4bSMehdi Amini using namespace llvm; 2433a7ea4bSMehdi Amini 2533a7ea4bSMehdi Amini #if LLVM_ENABLE_THREADS 2633a7ea4bSMehdi Amini 27*8ef5710eSLuboš Luňák // A note on thread groups: Tasks are by default in no group (represented 28*8ef5710eSLuboš Luňák // by nullptr ThreadPoolTaskGroup pointer in the Tasks queue) and functionality 29*8ef5710eSLuboš Luňák // here normally works on all tasks regardless of their group (functions 30*8ef5710eSLuboš Luňák // in that case receive nullptr ThreadPoolTaskGroup pointer as argument). 31*8ef5710eSLuboš Luňák // A task in a group has a pointer to that ThreadPoolTaskGroup in the Tasks 32*8ef5710eSLuboš Luňák // queue, and functions called to work only on tasks from one group take that 33*8ef5710eSLuboš Luňák // pointer. 34*8ef5710eSLuboš Luňák 35b28f317cSMehdi Amini ThreadPool::ThreadPool(ThreadPoolStrategy S) 36b28f317cSMehdi Amini : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} 37b28f317cSMehdi Amini 38e8469718SMehdi Amini void ThreadPool::grow(int requested) { 39*8ef5710eSLuboš Luňák llvm::sys::ScopedWriter LockGuard(ThreadsLock); 40728b982bSBenoit Jacob if (Threads.size() >= MaxThreadCount) 41728b982bSBenoit Jacob return; // Already hit the max thread pool size. 42e8469718SMehdi Amini int newThreadCount = std::min<int>(requested, MaxThreadCount); 43e8469718SMehdi Amini while (static_cast<int>(Threads.size()) < newThreadCount) { 44728b982bSBenoit Jacob int ThreadID = Threads.size(); 45728b982bSBenoit Jacob Threads.emplace_back([this, ThreadID] { 46728b982bSBenoit Jacob Strategy.apply_thread_strategy(ThreadID); 47*8ef5710eSLuboš Luňák processTasks(nullptr); 48*8ef5710eSLuboš Luňák }); 49*8ef5710eSLuboš Luňák } 50*8ef5710eSLuboš Luňák } 51*8ef5710eSLuboš Luňák 52*8ef5710eSLuboš Luňák #ifndef NDEBUG 53*8ef5710eSLuboš Luňák // The group of the tasks run by the current thread. 54*8ef5710eSLuboš Luňák static LLVM_THREAD_LOCAL std::vector<ThreadPoolTaskGroup *> 55*8ef5710eSLuboš Luňák *CurrentThreadTaskGroups = nullptr; 56*8ef5710eSLuboš Luňák #endif 57*8ef5710eSLuboš Luňák 58*8ef5710eSLuboš Luňák // WaitingForGroup == nullptr means all tasks regardless of their group. 59*8ef5710eSLuboš Luňák void ThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { 6033a7ea4bSMehdi Amini while (true) { 618cb1af73SFlorian Hahn std::function<void()> Task; 62*8ef5710eSLuboš Luňák ThreadPoolTaskGroup *GroupOfTask; 6333a7ea4bSMehdi Amini { 6433a7ea4bSMehdi Amini std::unique_lock<std::mutex> LockGuard(QueueLock); 65*8ef5710eSLuboš Luňák bool workCompletedForGroup = false; // Result of workCompletedUnlocked() 6633a7ea4bSMehdi Amini // Wait for tasks to be pushed in the queue 67*8ef5710eSLuboš Luňák QueueCondition.wait(LockGuard, [&] { 68*8ef5710eSLuboš Luňák return !EnableFlag || !Tasks.empty() || 69*8ef5710eSLuboš Luňák (WaitingForGroup != nullptr && 70*8ef5710eSLuboš Luňák (workCompletedForGroup = 71*8ef5710eSLuboš Luňák workCompletedUnlocked(WaitingForGroup))); 72*8ef5710eSLuboš Luňák }); 7333a7ea4bSMehdi Amini // Exit condition 7433a7ea4bSMehdi Amini if (!EnableFlag && Tasks.empty()) 7533a7ea4bSMehdi Amini return; 76*8ef5710eSLuboš Luňák if (WaitingForGroup != nullptr && workCompletedForGroup) 77*8ef5710eSLuboš Luňák return; 7833a7ea4bSMehdi Amini // Yeah, we have a task, grab it and release the lock on the queue 7933a7ea4bSMehdi Amini 8033a7ea4bSMehdi Amini // We first need to signal that we are active before popping the queue 8133a7ea4bSMehdi Amini // in order for wait() to properly detect that even if the queue is 8233a7ea4bSMehdi Amini // empty, there is still a task in flight. 83c723f657SJan Korous ++ActiveThreads; 84*8ef5710eSLuboš Luňák Task = std::move(Tasks.front().first); 85*8ef5710eSLuboš Luňák GroupOfTask = Tasks.front().second; 86*8ef5710eSLuboš Luňák // Need to count active threads in each group separately, ActiveThreads 87*8ef5710eSLuboš Luňák // would never be 0 if waiting for another group inside a wait. 88*8ef5710eSLuboš Luňák if (GroupOfTask != nullptr) 89*8ef5710eSLuboš Luňák ++ActiveGroups[GroupOfTask]; // Increment or set to 1 if new item 90*8ef5710eSLuboš Luňák Tasks.pop_front(); 9133a7ea4bSMehdi Amini } 92*8ef5710eSLuboš Luňák #ifndef NDEBUG 93*8ef5710eSLuboš Luňák if (CurrentThreadTaskGroups == nullptr) 94*8ef5710eSLuboš Luňák CurrentThreadTaskGroups = new std::vector<ThreadPoolTaskGroup *>; 95*8ef5710eSLuboš Luňák CurrentThreadTaskGroups->push_back(GroupOfTask); 96*8ef5710eSLuboš Luňák #endif 97*8ef5710eSLuboš Luňák 9833a7ea4bSMehdi Amini // Run the task we just grabbed 999b8b0794SZachary Turner Task(); 10033a7ea4bSMehdi Amini 101*8ef5710eSLuboš Luňák #ifndef NDEBUG 102*8ef5710eSLuboš Luňák CurrentThreadTaskGroups->pop_back(); 103*8ef5710eSLuboš Luňák #endif 104*8ef5710eSLuboš Luňák 1056f230491SFangrui Song bool Notify; 106*8ef5710eSLuboš Luňák bool NotifyGroup; 10733a7ea4bSMehdi Amini { 10833a7ea4bSMehdi Amini // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() 1096f230491SFangrui Song std::lock_guard<std::mutex> LockGuard(QueueLock); 11033a7ea4bSMehdi Amini --ActiveThreads; 111*8ef5710eSLuboš Luňák if (GroupOfTask != nullptr) { 112*8ef5710eSLuboš Luňák auto A = ActiveGroups.find(GroupOfTask); 113*8ef5710eSLuboš Luňák if (--(A->second) == 0) 114*8ef5710eSLuboš Luňák ActiveGroups.erase(A); 115*8ef5710eSLuboš Luňák } 116*8ef5710eSLuboš Luňák Notify = workCompletedUnlocked(GroupOfTask); 117*8ef5710eSLuboš Luňák NotifyGroup = GroupOfTask != nullptr && Notify; 11833a7ea4bSMehdi Amini } 1196f230491SFangrui Song // Notify task completion if this is the last active thread, in case 1206f230491SFangrui Song // someone waits on ThreadPool::wait(). 1216f230491SFangrui Song if (Notify) 12233a7ea4bSMehdi Amini CompletionCondition.notify_all(); 123*8ef5710eSLuboš Luňák // If this was a task in a group, notify also threads waiting for tasks 124*8ef5710eSLuboš Luňák // in this function on QueueCondition, to make a recursive wait() return 125*8ef5710eSLuboš Luňák // after the group it's been waiting for has finished. 126*8ef5710eSLuboš Luňák if (NotifyGroup) 127*8ef5710eSLuboš Luňák QueueCondition.notify_all(); 12833a7ea4bSMehdi Amini } 12933a7ea4bSMehdi Amini } 130*8ef5710eSLuboš Luňák 131*8ef5710eSLuboš Luňák bool ThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { 132*8ef5710eSLuboš Luňák if (Group == nullptr) 133*8ef5710eSLuboš Luňák return !ActiveThreads && Tasks.empty(); 134*8ef5710eSLuboš Luňák return ActiveGroups.count(Group) == 0 && 135*8ef5710eSLuboš Luňák !llvm::any_of(Tasks, 136*8ef5710eSLuboš Luňák [Group](const auto &T) { return T.second == Group; }); 137e8469718SMehdi Amini } 13833a7ea4bSMehdi Amini 13933a7ea4bSMehdi Amini void ThreadPool::wait() { 140*8ef5710eSLuboš Luňák assert(!isWorkerThread()); // Would deadlock waiting for itself. 14133a7ea4bSMehdi Amini // Wait for all threads to complete and the queue to be empty 1426f230491SFangrui Song std::unique_lock<std::mutex> LockGuard(QueueLock); 143*8ef5710eSLuboš Luňák CompletionCondition.wait(LockGuard, 144*8ef5710eSLuboš Luňák [&] { return workCompletedUnlocked(nullptr); }); 145*8ef5710eSLuboš Luňák } 146*8ef5710eSLuboš Luňák 147*8ef5710eSLuboš Luňák void ThreadPool::wait(ThreadPoolTaskGroup &Group) { 148*8ef5710eSLuboš Luňák // Wait for all threads in the group to complete. 149*8ef5710eSLuboš Luňák if (!isWorkerThread()) { 150*8ef5710eSLuboš Luňák std::unique_lock<std::mutex> LockGuard(QueueLock); 151*8ef5710eSLuboš Luňák CompletionCondition.wait(LockGuard, 152*8ef5710eSLuboš Luňák [&] { return workCompletedUnlocked(&Group); }); 153*8ef5710eSLuboš Luňák return; 154*8ef5710eSLuboš Luňák } 155*8ef5710eSLuboš Luňák // Make sure to not deadlock waiting for oneself. 156*8ef5710eSLuboš Luňák assert(CurrentThreadTaskGroups == nullptr || 157*8ef5710eSLuboš Luňák !llvm::is_contained(*CurrentThreadTaskGroups, &Group)); 158*8ef5710eSLuboš Luňák // Handle the case of recursive call from another task in a different group, 159*8ef5710eSLuboš Luňák // in which case process tasks while waiting to keep the thread busy and avoid 160*8ef5710eSLuboš Luňák // possible deadlock. 161*8ef5710eSLuboš Luňák processTasks(&Group); 16233a7ea4bSMehdi Amini } 16333a7ea4bSMehdi Amini 1646569cf2aSRiver Riddle bool ThreadPool::isWorkerThread() const { 165*8ef5710eSLuboš Luňák llvm::sys::ScopedReader LockGuard(ThreadsLock); 16648c68a63STim Northover llvm::thread::id CurrentThreadId = llvm::this_thread::get_id(); 16748c68a63STim Northover for (const llvm::thread &Thread : Threads) 1686569cf2aSRiver Riddle if (CurrentThreadId == Thread.get_id()) 1696569cf2aSRiver Riddle return true; 1706569cf2aSRiver Riddle return false; 1716569cf2aSRiver Riddle } 1726569cf2aSRiver Riddle 17333a7ea4bSMehdi Amini // The destructor joins all threads, waiting for completion. 17433a7ea4bSMehdi Amini ThreadPool::~ThreadPool() { 17533a7ea4bSMehdi Amini { 17633a7ea4bSMehdi Amini std::unique_lock<std::mutex> LockGuard(QueueLock); 17733a7ea4bSMehdi Amini EnableFlag = false; 17833a7ea4bSMehdi Amini } 17933a7ea4bSMehdi Amini QueueCondition.notify_all(); 180*8ef5710eSLuboš Luňák llvm::sys::ScopedReader LockGuard(ThreadsLock); 18133a7ea4bSMehdi Amini for (auto &Worker : Threads) 18233a7ea4bSMehdi Amini Worker.join(); 18333a7ea4bSMehdi Amini } 18433a7ea4bSMehdi Amini 18533a7ea4bSMehdi Amini #else // LLVM_ENABLE_THREADS Disabled 18633a7ea4bSMehdi Amini 18733a7ea4bSMehdi Amini // No threads are launched, issue a warning if ThreadCount is not 0 188b28f317cSMehdi Amini ThreadPool::ThreadPool(ThreadPoolStrategy S) : MaxThreadCount(1) { 189b28f317cSMehdi Amini int ThreadCount = S.compute_thread_count(); 1908404aeb5SAlexandre Ganea if (ThreadCount != 1) { 19133a7ea4bSMehdi Amini errs() << "Warning: request a ThreadPool with " << ThreadCount 19233a7ea4bSMehdi Amini << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; 19333a7ea4bSMehdi Amini } 19433a7ea4bSMehdi Amini } 19533a7ea4bSMehdi Amini 19633a7ea4bSMehdi Amini void ThreadPool::wait() { 19733a7ea4bSMehdi Amini // Sequential implementation running the tasks 19833a7ea4bSMehdi Amini while (!Tasks.empty()) { 199*8ef5710eSLuboš Luňák auto Task = std::move(Tasks.front().first); 200*8ef5710eSLuboš Luňák Tasks.pop_front(); 20133a7ea4bSMehdi Amini Task(); 20233a7ea4bSMehdi Amini } 20333a7ea4bSMehdi Amini } 20433a7ea4bSMehdi Amini 205*8ef5710eSLuboš Luňák void ThreadPool::wait(ThreadPoolTaskGroup &) { 206*8ef5710eSLuboš Luňák // Simply wait for all, this works even if recursive (the running task 207*8ef5710eSLuboš Luňák // is already removed from the queue). 208*8ef5710eSLuboš Luňák wait(); 209*8ef5710eSLuboš Luňák } 210*8ef5710eSLuboš Luňák 211d9547f41SJohn Demme bool ThreadPool::isWorkerThread() const { 212e5c944b4SFangrui Song report_fatal_error("LLVM compiled without multithreading"); 213d9547f41SJohn Demme } 214d9547f41SJohn Demme 2158404aeb5SAlexandre Ganea ThreadPool::~ThreadPool() { wait(); } 21633a7ea4bSMehdi Amini 21733a7ea4bSMehdi Amini #endif 218