133a7ea4bSMehdi Amini //==-- llvm/Support/ThreadPool.cpp - A ThreadPool implementation -*- C++ -*-==// 233a7ea4bSMehdi Amini // 32946cd70SChandler Carruth // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. 42946cd70SChandler Carruth // See https://llvm.org/LICENSE.txt for license information. 52946cd70SChandler Carruth // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception 633a7ea4bSMehdi Amini // 733a7ea4bSMehdi Amini //===----------------------------------------------------------------------===// 833a7ea4bSMehdi Amini // 933a7ea4bSMehdi Amini // This file implements a crude C++11 based thread pool. 1033a7ea4bSMehdi Amini // 1133a7ea4bSMehdi Amini //===----------------------------------------------------------------------===// 1233a7ea4bSMehdi Amini 1333a7ea4bSMehdi Amini #include "llvm/Support/ThreadPool.h" 1433a7ea4bSMehdi Amini 1533a7ea4bSMehdi Amini #include "llvm/Config/llvm-config.h" 168c0ff950SRafael Espindola #include "llvm/Support/Threading.h" 1786f0b70fSHans Wennborg #include "llvm/Support/raw_ostream.h" 1833a7ea4bSMehdi Amini 1933a7ea4bSMehdi Amini using namespace llvm; 2033a7ea4bSMehdi Amini 2133a7ea4bSMehdi Amini #if LLVM_ENABLE_THREADS 2233a7ea4bSMehdi Amini 238404aeb5SAlexandre Ganea ThreadPool::ThreadPool(ThreadPoolStrategy S) 246f230491SFangrui Song : ThreadCount(S.compute_thread_count()) { 2533a7ea4bSMehdi Amini // Create ThreadCount threads that will loop forever, wait on QueueCondition 2633a7ea4bSMehdi Amini // for tasks to be queued or the Pool to be destroyed. 2733a7ea4bSMehdi Amini Threads.reserve(ThreadCount); 2833a7ea4bSMehdi Amini for (unsigned ThreadID = 0; ThreadID < ThreadCount; ++ThreadID) { 298404aeb5SAlexandre Ganea Threads.emplace_back([S, ThreadID, this] { 308404aeb5SAlexandre Ganea S.apply_thread_strategy(ThreadID); 3133a7ea4bSMehdi Amini while (true) { 329b8b0794SZachary Turner PackagedTaskTy Task; 3333a7ea4bSMehdi Amini { 3433a7ea4bSMehdi Amini std::unique_lock<std::mutex> LockGuard(QueueLock); 3533a7ea4bSMehdi Amini // Wait for tasks to be pushed in the queue 3633a7ea4bSMehdi Amini QueueCondition.wait(LockGuard, 3733a7ea4bSMehdi Amini [&] { return !EnableFlag || !Tasks.empty(); }); 3833a7ea4bSMehdi Amini // Exit condition 3933a7ea4bSMehdi Amini if (!EnableFlag && Tasks.empty()) 4033a7ea4bSMehdi Amini return; 4133a7ea4bSMehdi Amini // Yeah, we have a task, grab it and release the lock on the queue 4233a7ea4bSMehdi Amini 4333a7ea4bSMehdi Amini // We first need to signal that we are active before popping the queue 4433a7ea4bSMehdi Amini // in order for wait() to properly detect that even if the queue is 4533a7ea4bSMehdi Amini // empty, there is still a task in flight. 46c723f657SJan Korous ++ActiveThreads; 4733a7ea4bSMehdi Amini Task = std::move(Tasks.front()); 4833a7ea4bSMehdi Amini Tasks.pop(); 4933a7ea4bSMehdi Amini } 5033a7ea4bSMehdi Amini // Run the task we just grabbed 519b8b0794SZachary Turner Task(); 5233a7ea4bSMehdi Amini 536f230491SFangrui Song bool Notify; 5433a7ea4bSMehdi Amini { 5533a7ea4bSMehdi Amini // Adjust `ActiveThreads`, in case someone waits on ThreadPool::wait() 566f230491SFangrui Song std::lock_guard<std::mutex> LockGuard(QueueLock); 5733a7ea4bSMehdi Amini --ActiveThreads; 586f230491SFangrui Song Notify = workCompletedUnlocked(); 5933a7ea4bSMehdi Amini } 606f230491SFangrui Song // Notify task completion if this is the last active thread, in case 616f230491SFangrui Song // someone waits on ThreadPool::wait(). 626f230491SFangrui Song if (Notify) 6333a7ea4bSMehdi Amini CompletionCondition.notify_all(); 6433a7ea4bSMehdi Amini } 6533a7ea4bSMehdi Amini }); 6633a7ea4bSMehdi Amini } 6733a7ea4bSMehdi Amini } 6833a7ea4bSMehdi Amini 6933a7ea4bSMehdi Amini void ThreadPool::wait() { 7033a7ea4bSMehdi Amini // Wait for all threads to complete and the queue to be empty 716f230491SFangrui Song std::unique_lock<std::mutex> LockGuard(QueueLock); 726f230491SFangrui Song CompletionCondition.wait(LockGuard, [&] { return workCompletedUnlocked(); }); 7333a7ea4bSMehdi Amini } 7433a7ea4bSMehdi Amini 75*6569cf2aSRiver Riddle bool ThreadPool::isWorkerThread() const { 76*6569cf2aSRiver Riddle std::thread::id CurrentThreadId = std::this_thread::get_id(); 77*6569cf2aSRiver Riddle for (const std::thread &Thread : Threads) 78*6569cf2aSRiver Riddle if (CurrentThreadId == Thread.get_id()) 79*6569cf2aSRiver Riddle return true; 80*6569cf2aSRiver Riddle return false; 81*6569cf2aSRiver Riddle } 82*6569cf2aSRiver Riddle 839b8b0794SZachary Turner std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) { 849b8b0794SZachary Turner /// Wrap the Task in a packaged_task to return a future object. 859b8b0794SZachary Turner PackagedTaskTy PackagedTask(std::move(Task)); 869b8b0794SZachary Turner auto Future = PackagedTask.get_future(); 879b8b0794SZachary Turner { 889b8b0794SZachary Turner // Lock the queue and push the new task 899b8b0794SZachary Turner std::unique_lock<std::mutex> LockGuard(QueueLock); 909b8b0794SZachary Turner 919b8b0794SZachary Turner // Don't allow enqueueing after disabling the pool 929b8b0794SZachary Turner assert(EnableFlag && "Queuing a thread during ThreadPool destruction"); 939b8b0794SZachary Turner 949b8b0794SZachary Turner Tasks.push(std::move(PackagedTask)); 959b8b0794SZachary Turner } 969b8b0794SZachary Turner QueueCondition.notify_one(); 979b8b0794SZachary Turner return Future.share(); 989b8b0794SZachary Turner } 999b8b0794SZachary Turner 10033a7ea4bSMehdi Amini // The destructor joins all threads, waiting for completion. 10133a7ea4bSMehdi Amini ThreadPool::~ThreadPool() { 10233a7ea4bSMehdi Amini { 10333a7ea4bSMehdi Amini std::unique_lock<std::mutex> LockGuard(QueueLock); 10433a7ea4bSMehdi Amini EnableFlag = false; 10533a7ea4bSMehdi Amini } 10633a7ea4bSMehdi Amini QueueCondition.notify_all(); 10733a7ea4bSMehdi Amini for (auto &Worker : Threads) 10833a7ea4bSMehdi Amini Worker.join(); 10933a7ea4bSMehdi Amini } 11033a7ea4bSMehdi Amini 11133a7ea4bSMehdi Amini #else // LLVM_ENABLE_THREADS Disabled 11233a7ea4bSMehdi Amini 11333a7ea4bSMehdi Amini // No threads are launched, issue a warning if ThreadCount is not 0 1148404aeb5SAlexandre Ganea ThreadPool::ThreadPool(ThreadPoolStrategy S) 1156f230491SFangrui Song : ThreadCount(S.compute_thread_count()) { 1168404aeb5SAlexandre Ganea if (ThreadCount != 1) { 11733a7ea4bSMehdi Amini errs() << "Warning: request a ThreadPool with " << ThreadCount 11833a7ea4bSMehdi Amini << " threads, but LLVM_ENABLE_THREADS has been turned off\n"; 11933a7ea4bSMehdi Amini } 12033a7ea4bSMehdi Amini } 12133a7ea4bSMehdi Amini 12233a7ea4bSMehdi Amini void ThreadPool::wait() { 12333a7ea4bSMehdi Amini // Sequential implementation running the tasks 12433a7ea4bSMehdi Amini while (!Tasks.empty()) { 12533a7ea4bSMehdi Amini auto Task = std::move(Tasks.front()); 12633a7ea4bSMehdi Amini Tasks.pop(); 12733a7ea4bSMehdi Amini Task(); 12833a7ea4bSMehdi Amini } 12933a7ea4bSMehdi Amini } 13033a7ea4bSMehdi Amini 131b78a68dbSPeter Collingbourne std::shared_future<void> ThreadPool::asyncImpl(TaskTy Task) { 13233a7ea4bSMehdi Amini // Get a Future with launch::deferred execution using std::async 13333a7ea4bSMehdi Amini auto Future = std::async(std::launch::deferred, std::move(Task)).share(); 13433a7ea4bSMehdi Amini // Wrap the future so that both ThreadPool::wait() can operate and the 13533a7ea4bSMehdi Amini // returned future can be sync'ed on. 13633a7ea4bSMehdi Amini PackagedTaskTy PackagedTask([Future]() { Future.get(); }); 13733a7ea4bSMehdi Amini Tasks.push(std::move(PackagedTask)); 1380f0d5d8fSDavide Italiano return Future; 13933a7ea4bSMehdi Amini } 14033a7ea4bSMehdi Amini 1418404aeb5SAlexandre Ganea ThreadPool::~ThreadPool() { wait(); } 14233a7ea4bSMehdi Amini 14333a7ea4bSMehdi Amini #endif 144