//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//

#include "llvm/Support/Parallel.h"
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ManagedStatic.h"

#if LLVM_ENABLE_THREADS

#include "llvm/Support/Threading.h"

#include <atomic>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <stack>
#include <thread>
#include <vector>

llvm::ThreadPoolStrategy llvm::parallel::strategy;

namespace llvm {
namespace parallel {
namespace detail {

namespace {

/// An abstract class that takes closures and runs them asynchronously.
class Executor {
public:
  virtual ~Executor() = default;
  virtual void add(std::function<void()> func) = 0;

  static Executor *getDefaultExecutor();
};

/// An implementation of an Executor that runs closures on a thread pool
/// in FILO order.
class ThreadPoolExecutor : public Executor {
public:
  explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
    unsigned ThreadCount = S.compute_thread_count();
    // Spawn all but one of the threads in another thread, as spawning threads
    // can take a while.
    Threads.reserve(ThreadCount);
    Threads.resize(1);
    std::lock_guard<std::mutex> Lock(Mutex);
    Threads[0] = std::thread([this, ThreadCount, S] {
      for (unsigned I = 1; I < ThreadCount; ++I) {
        Threads.emplace_back([=] { work(S, I); });
        if (Stop)
          break;
      }
      ThreadsCreated.set_value();
      work(S, 0);
    });
  }

  void stop() {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      if (Stop)
        return;
      Stop = true;
    }
    Cond.notify_all();
    ThreadsCreated.get_future().wait();
  }

  ~ThreadPoolExecutor() override {
    stop();
    std::thread::id CurrentThreadId = std::this_thread::get_id();
    for (std::thread &T : Threads)
      if (T.get_id() == CurrentThreadId)
        T.detach();
      else
        T.join();
  }

  struct Creator {
    static void *call() { return new ThreadPoolExecutor(strategy); }
  };
  struct Deleter {
    static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
  };

  void add(std::function<void()> F) override {
    {
      std::lock_guard<std::mutex> Lock(Mutex);
      WorkStack.push(F);
    }
    Cond.notify_one();
  }

private:
  void work(ThreadPoolStrategy S, unsigned ThreadID) {
    S.apply_thread_strategy(ThreadID);
    while (true) {
      std::unique_lock<std::mutex> Lock(Mutex);
      Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
      if (Stop)
        break;
      auto Task = WorkStack.top();
      WorkStack.pop();
      Lock.unlock();
      Task();
    }
  }

  std::atomic<bool> Stop{false};
  std::stack<std::function<void()>> WorkStack;
  std::mutex Mutex;
  std::condition_variable Cond;
  std::promise<void> ThreadsCreated;
  std::vector<std::thread> Threads;
};

Executor *Executor::getDefaultExecutor() {
  // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
  // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
  // stops the thread pool and waits for any worker thread creation to complete
  // but does not wait for the threads to finish. The wait for worker thread
  // creation to complete is important as it prevents intermittent crashes on
  // Windows due to a race condition between thread creation and process exit.
  //
  // The ThreadPoolExecutor will only be destroyed when the static unique_ptr
  // to it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
  // destructor ensures it has been stopped and waits for worker threads to
  // finish. The wait is important as it prevents intermittent crashes on
  // Windows when the process is doing a full exit.
  //
  // The Windows crashes appear to only occur with the MSVC static runtimes and
  // are more frequent with the debug static runtime.
  //
  // This also prevents intermittent deadlocks on exit with the MinGW runtime.

  static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
                       ThreadPoolExecutor::Deleter>
      ManagedExec;
  static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
  return Exec.get();
}
} // namespace

static std::atomic<int> TaskGroupInstances;

// Latch::sync(), called by the destructor, may cause one thread to block. If
// all threads in the default executor are blocked, that is a deadlock. To
// prevent the deadlock, only allow the first TaskGroup to run tasks in
// parallel. In the scenario of nested parallel_for_each() calls, only the
// outermost one runs its tasks in parallel.
TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
TaskGroup::~TaskGroup() { --TaskGroupInstances; }

void TaskGroup::spawn(std::function<void()> F) {
  if (Parallel) {
    L.inc();
    Executor::getDefaultExecutor()->add([&, F] {
      F();
      L.dec();
    });
  } else {
    F();
  }
}

} // namespace detail
} // namespace parallel
} // namespace llvm
#endif // LLVM_ENABLE_THREADS
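
// Usage sketch (illustrative comment only, not part of this file's interface):
// a minimal example, kept commented out, of how the TaskGroup defined above is
// typically driven. The function and variable names (processAll, Items) are
// hypothetical; TaskGroup, spawn(), and the Latch its destructor syncs on are
// assumed to come from llvm/Support/Parallel.h. Each spawned closure is pushed
// onto the default executor's work stack (FILO order), and destroying the
// TaskGroup blocks until every spawned closure has run.
//
//   #include "llvm/Support/Parallel.h"
//   #include <vector>
//
//   void processAll(std::vector<int> &Items) {
//     llvm::parallel::detail::TaskGroup TG;
//     for (int &Item : Items)
//       TG.spawn([&Item] { Item *= 2; }); // runs on the default thread pool
//     // TG's destructor waits for all spawned tasks before returning.
//   }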