13a57fbd6SZachary Turner //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
23a57fbd6SZachary Turner //
32946cd70SChandler Carruth // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
42946cd70SChandler Carruth // See https://llvm.org/LICENSE.txt for license information.
52946cd70SChandler Carruth // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
63a57fbd6SZachary Turner //
73a57fbd6SZachary Turner //===----------------------------------------------------------------------===//
83a57fbd6SZachary Turner
93a57fbd6SZachary Turner #include "llvm/Support/Parallel.h"
103a57fbd6SZachary Turner #include "llvm/Config/llvm-config.h"
11564481aeSAndrew Ng #include "llvm/Support/ManagedStatic.h"
128c0ff950SRafael Espindola #include "llvm/Support/Threading.h"
133a57fbd6SZachary Turner
143a57fbd6SZachary Turner #include <atomic>
15564481aeSAndrew Ng #include <future>
163a57fbd6SZachary Turner #include <stack>
173a57fbd6SZachary Turner #include <thread>
18564481aeSAndrew Ng #include <vector>
193a57fbd6SZachary Turner
20eb4663d8SFangrui Song llvm::ThreadPoolStrategy llvm::parallel::strategy;
21eb4663d8SFangrui Song
224137ab62SFangrui Song #if LLVM_ENABLE_THREADS
234137ab62SFangrui Song
24f6a62909SFangrui Song namespace llvm {
25f6a62909SFangrui Song namespace parallel {
26f6a62909SFangrui Song namespace detail {
273a57fbd6SZachary Turner
283a57fbd6SZachary Turner namespace {
293a57fbd6SZachary Turner
305f8f34e4SAdrian Prantl /// An abstract class that takes closures and runs them asynchronously.
313a57fbd6SZachary Turner class Executor {
323a57fbd6SZachary Turner public:
333a57fbd6SZachary Turner virtual ~Executor() = default;
343a57fbd6SZachary Turner virtual void add(std::function<void()> func) = 0;
353a57fbd6SZachary Turner
363a57fbd6SZachary Turner static Executor *getDefaultExecutor();
373a57fbd6SZachary Turner };
383a57fbd6SZachary Turner
395f8f34e4SAdrian Prantl /// An implementation of an Executor that runs closures on a thread pool
403a57fbd6SZachary Turner /// in filo order.
413a57fbd6SZachary Turner class ThreadPoolExecutor : public Executor {
423a57fbd6SZachary Turner public:
ThreadPoolExecutor(ThreadPoolStrategy S=hardware_concurrency ())438404aeb5SAlexandre Ganea explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
448404aeb5SAlexandre Ganea unsigned ThreadCount = S.compute_thread_count();
453a57fbd6SZachary Turner // Spawn all but one of the threads in another thread as spawning threads
463a57fbd6SZachary Turner // can take a while.
47564481aeSAndrew Ng Threads.reserve(ThreadCount);
48564481aeSAndrew Ng Threads.resize(1);
49564481aeSAndrew Ng std::lock_guard<std::mutex> Lock(Mutex);
508404aeb5SAlexandre Ganea Threads[0] = std::thread([this, ThreadCount, S] {
518404aeb5SAlexandre Ganea for (unsigned I = 1; I < ThreadCount; ++I) {
528404aeb5SAlexandre Ganea Threads.emplace_back([=] { work(S, I); });
53564481aeSAndrew Ng if (Stop)
54564481aeSAndrew Ng break;
553a57fbd6SZachary Turner }
56564481aeSAndrew Ng ThreadsCreated.set_value();
578404aeb5SAlexandre Ganea work(S, 0);
58564481aeSAndrew Ng });
59564481aeSAndrew Ng }
60564481aeSAndrew Ng
stop()61564481aeSAndrew Ng void stop() {
62564481aeSAndrew Ng {
63564481aeSAndrew Ng std::lock_guard<std::mutex> Lock(Mutex);
64564481aeSAndrew Ng if (Stop)
65564481aeSAndrew Ng return;
66564481aeSAndrew Ng Stop = true;
67564481aeSAndrew Ng }
68564481aeSAndrew Ng Cond.notify_all();
69564481aeSAndrew Ng ThreadsCreated.get_future().wait();
703a57fbd6SZachary Turner }
713a57fbd6SZachary Turner
~ThreadPoolExecutor()723a57fbd6SZachary Turner ~ThreadPoolExecutor() override {
73564481aeSAndrew Ng stop();
74564481aeSAndrew Ng std::thread::id CurrentThreadId = std::this_thread::get_id();
75564481aeSAndrew Ng for (std::thread &T : Threads)
76564481aeSAndrew Ng if (T.get_id() == CurrentThreadId)
77564481aeSAndrew Ng T.detach();
78564481aeSAndrew Ng else
79564481aeSAndrew Ng T.join();
803a57fbd6SZachary Turner }
813a57fbd6SZachary Turner
82eb4663d8SFangrui Song struct Creator {
callllvm::parallel::detail::__anond7d369990111::ThreadPoolExecutor::Creator83eb4663d8SFangrui Song static void *call() { return new ThreadPoolExecutor(strategy); }
84eb4663d8SFangrui Song };
85564481aeSAndrew Ng struct Deleter {
callllvm::parallel::detail::__anond7d369990111::ThreadPoolExecutor::Deleter86564481aeSAndrew Ng static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
87564481aeSAndrew Ng };
88564481aeSAndrew Ng
add(std::function<void ()> F)893a57fbd6SZachary Turner void add(std::function<void()> F) override {
90564481aeSAndrew Ng {
91564481aeSAndrew Ng std::lock_guard<std::mutex> Lock(Mutex);
924290ef54SAndrew Ng WorkStack.push(std::move(F));
93564481aeSAndrew Ng }
943a57fbd6SZachary Turner Cond.notify_one();
953a57fbd6SZachary Turner }
963a57fbd6SZachary Turner
973a57fbd6SZachary Turner private:
work(ThreadPoolStrategy S,unsigned ThreadID)988404aeb5SAlexandre Ganea void work(ThreadPoolStrategy S, unsigned ThreadID) {
998404aeb5SAlexandre Ganea S.apply_thread_strategy(ThreadID);
1003a57fbd6SZachary Turner while (true) {
1013a57fbd6SZachary Turner std::unique_lock<std::mutex> Lock(Mutex);
1023a57fbd6SZachary Turner Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
1033a57fbd6SZachary Turner if (Stop)
1043a57fbd6SZachary Turner break;
1054290ef54SAndrew Ng auto Task = std::move(WorkStack.top());
1063a57fbd6SZachary Turner WorkStack.pop();
1073a57fbd6SZachary Turner Lock.unlock();
1083a57fbd6SZachary Turner Task();
1093a57fbd6SZachary Turner }
1103a57fbd6SZachary Turner }
1113a57fbd6SZachary Turner
1123a57fbd6SZachary Turner std::atomic<bool> Stop{false};
1133a57fbd6SZachary Turner std::stack<std::function<void()>> WorkStack;
1143a57fbd6SZachary Turner std::mutex Mutex;
1153a57fbd6SZachary Turner std::condition_variable Cond;
116564481aeSAndrew Ng std::promise<void> ThreadsCreated;
117564481aeSAndrew Ng std::vector<std::thread> Threads;
1183a57fbd6SZachary Turner };
1193a57fbd6SZachary Turner
getDefaultExecutor()1203a57fbd6SZachary Turner Executor *Executor::getDefaultExecutor() {
121564481aeSAndrew Ng // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
122564481aeSAndrew Ng // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
123564481aeSAndrew Ng // stops the thread pool and waits for any worker thread creation to complete
124564481aeSAndrew Ng // but does not wait for the threads to finish. The wait for worker thread
125564481aeSAndrew Ng // creation to complete is important as it prevents intermittent crashes on
126564481aeSAndrew Ng // Windows due to a race condition between thread creation and process exit.
127564481aeSAndrew Ng //
128564481aeSAndrew Ng // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
129564481aeSAndrew Ng // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
130564481aeSAndrew Ng // destructor ensures it has been stopped and waits for worker threads to
131564481aeSAndrew Ng // finish. The wait is important as it prevents intermittent crashes on
132564481aeSAndrew Ng // Windows when the process is doing a full exit.
133564481aeSAndrew Ng //
134564481aeSAndrew Ng // The Windows crashes appear to only occur with the MSVC static runtimes and
135564481aeSAndrew Ng // are more frequent with the debug static runtime.
136564481aeSAndrew Ng //
137564481aeSAndrew Ng // This also prevents intermittent deadlocks on exit with the MinGW runtime.
138eb4663d8SFangrui Song
139eb4663d8SFangrui Song static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
140564481aeSAndrew Ng ThreadPoolExecutor::Deleter>
141564481aeSAndrew Ng ManagedExec;
142564481aeSAndrew Ng static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
143564481aeSAndrew Ng return Exec.get();
1443a57fbd6SZachary Turner }
145d4960032SNico Weber } // namespace
1463a57fbd6SZachary Turner
147f6a62909SFangrui Song static std::atomic<int> TaskGroupInstances;
148f6a62909SFangrui Song
149f6a62909SFangrui Song // Latch::sync() called by the dtor may cause one thread to block. If is a dead
150f6a62909SFangrui Song // lock if all threads in the default executor are blocked. To prevent the dead
151f6a62909SFangrui Song // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario
152f6a62909SFangrui Song // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()153f6a62909SFangrui Song TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
~TaskGroup()1547b25fa8cSAlexandre Ganea TaskGroup::~TaskGroup() {
1557b25fa8cSAlexandre Ganea // We must ensure that all the workloads have finished before decrementing the
1567b25fa8cSAlexandre Ganea // instances count.
1577b25fa8cSAlexandre Ganea L.sync();
1587b25fa8cSAlexandre Ganea --TaskGroupInstances;
1597b25fa8cSAlexandre Ganea }
160f6a62909SFangrui Song
spawn(std::function<void ()> F)161f6a62909SFangrui Song void TaskGroup::spawn(std::function<void()> F) {
162f6a62909SFangrui Song if (Parallel) {
1633a57fbd6SZachary Turner L.inc();
1644290ef54SAndrew Ng Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
1653a57fbd6SZachary Turner F();
1663a57fbd6SZachary Turner L.dec();
1673a57fbd6SZachary Turner });
168f6a62909SFangrui Song } else {
169f6a62909SFangrui Song F();
1703a57fbd6SZachary Turner }
171f6a62909SFangrui Song }
172f6a62909SFangrui Song
173f6a62909SFangrui Song } // namespace detail
174f6a62909SFangrui Song } // namespace parallel
175f6a62909SFangrui Song } // namespace llvm
1760f2a48c1SNico Weber #endif // LLVM_ENABLE_THREADS
1778e382ae9SFangrui Song
parallelFor(size_t Begin,size_t End,llvm::function_ref<void (size_t)> Fn)178*7effcbdaSNico Weber void llvm::parallelFor(size_t Begin, size_t End,
1798e382ae9SFangrui Song llvm::function_ref<void(size_t)> Fn) {
1808e382ae9SFangrui Song // If we have zero or one items, then do not incur the overhead of spinning up
1818e382ae9SFangrui Song // a task group. They are surprisingly expensive, and because they do not
1828e382ae9SFangrui Song // support nested parallelism, a single entry task group can block parallel
1838e382ae9SFangrui Song // execution underneath them.
1848e382ae9SFangrui Song #if LLVM_ENABLE_THREADS
1858e382ae9SFangrui Song auto NumItems = End - Begin;
1868e382ae9SFangrui Song if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
1878e382ae9SFangrui Song // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
1888e382ae9SFangrui Song // overhead on large inputs.
1898e382ae9SFangrui Song auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
1908e382ae9SFangrui Song if (TaskSize == 0)
1918e382ae9SFangrui Song TaskSize = 1;
1928e382ae9SFangrui Song
1938e382ae9SFangrui Song parallel::detail::TaskGroup TG;
1948e382ae9SFangrui Song for (; Begin + TaskSize < End; Begin += TaskSize) {
1958e382ae9SFangrui Song TG.spawn([=, &Fn] {
1968e382ae9SFangrui Song for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
1978e382ae9SFangrui Song Fn(I);
1988e382ae9SFangrui Song });
1998e382ae9SFangrui Song }
2008e382ae9SFangrui Song for (; Begin != End; ++Begin)
2018e382ae9SFangrui Song Fn(Begin);
2028e382ae9SFangrui Song return;
2038e382ae9SFangrui Song }
2048e382ae9SFangrui Song #endif
2058e382ae9SFangrui Song
2068e382ae9SFangrui Song for (; Begin != End; ++Begin)
2078e382ae9SFangrui Song Fn(Begin);
2088e382ae9SFangrui Song }
209