13a57fbd6SZachary Turner //===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===//
23a57fbd6SZachary Turner //
32946cd70SChandler Carruth // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
42946cd70SChandler Carruth // See https://llvm.org/LICENSE.txt for license information.
52946cd70SChandler Carruth // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
63a57fbd6SZachary Turner //
73a57fbd6SZachary Turner //===----------------------------------------------------------------------===//
83a57fbd6SZachary Turner 
93a57fbd6SZachary Turner #include "llvm/Support/Parallel.h"
103a57fbd6SZachary Turner #include "llvm/Config/llvm-config.h"
11564481aeSAndrew Ng #include "llvm/Support/ManagedStatic.h"
128c0ff950SRafael Espindola #include "llvm/Support/Threading.h"
133a57fbd6SZachary Turner 
143a57fbd6SZachary Turner #include <atomic>
15564481aeSAndrew Ng #include <future>
163a57fbd6SZachary Turner #include <stack>
173a57fbd6SZachary Turner #include <thread>
18564481aeSAndrew Ng #include <vector>
193a57fbd6SZachary Turner 
20eb4663d8SFangrui Song llvm::ThreadPoolStrategy llvm::parallel::strategy;
21eb4663d8SFangrui Song 
224137ab62SFangrui Song #if LLVM_ENABLE_THREADS
234137ab62SFangrui Song 
24f6a62909SFangrui Song namespace llvm {
25f6a62909SFangrui Song namespace parallel {
26f6a62909SFangrui Song namespace detail {
273a57fbd6SZachary Turner 
283a57fbd6SZachary Turner namespace {
293a57fbd6SZachary Turner 
/// Interface for objects that accept closures and run them asynchronously.
class Executor {
public:
  /// Returns the shared executor that TaskGroup::spawn() submits work to.
  static Executor *getDefaultExecutor();

  virtual ~Executor() = default;

  /// Schedules \p Task to be run asynchronously.
  virtual void add(std::function<void()> Task) = 0;
};
383a57fbd6SZachary Turner 
395f8f34e4SAdrian Prantl /// An implementation of an Executor that runs closures on a thread pool
403a57fbd6SZachary Turner ///   in filo order.
413a57fbd6SZachary Turner class ThreadPoolExecutor : public Executor {
423a57fbd6SZachary Turner public:
ThreadPoolExecutor(ThreadPoolStrategy S=hardware_concurrency ())438404aeb5SAlexandre Ganea   explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) {
448404aeb5SAlexandre Ganea     unsigned ThreadCount = S.compute_thread_count();
453a57fbd6SZachary Turner     // Spawn all but one of the threads in another thread as spawning threads
463a57fbd6SZachary Turner     // can take a while.
47564481aeSAndrew Ng     Threads.reserve(ThreadCount);
48564481aeSAndrew Ng     Threads.resize(1);
49564481aeSAndrew Ng     std::lock_guard<std::mutex> Lock(Mutex);
508404aeb5SAlexandre Ganea     Threads[0] = std::thread([this, ThreadCount, S] {
518404aeb5SAlexandre Ganea       for (unsigned I = 1; I < ThreadCount; ++I) {
528404aeb5SAlexandre Ganea         Threads.emplace_back([=] { work(S, I); });
53564481aeSAndrew Ng         if (Stop)
54564481aeSAndrew Ng           break;
553a57fbd6SZachary Turner       }
56564481aeSAndrew Ng       ThreadsCreated.set_value();
578404aeb5SAlexandre Ganea       work(S, 0);
58564481aeSAndrew Ng     });
59564481aeSAndrew Ng   }
60564481aeSAndrew Ng 
stop()61564481aeSAndrew Ng   void stop() {
62564481aeSAndrew Ng     {
63564481aeSAndrew Ng       std::lock_guard<std::mutex> Lock(Mutex);
64564481aeSAndrew Ng       if (Stop)
65564481aeSAndrew Ng         return;
66564481aeSAndrew Ng       Stop = true;
67564481aeSAndrew Ng     }
68564481aeSAndrew Ng     Cond.notify_all();
69564481aeSAndrew Ng     ThreadsCreated.get_future().wait();
703a57fbd6SZachary Turner   }
713a57fbd6SZachary Turner 
~ThreadPoolExecutor()723a57fbd6SZachary Turner   ~ThreadPoolExecutor() override {
73564481aeSAndrew Ng     stop();
74564481aeSAndrew Ng     std::thread::id CurrentThreadId = std::this_thread::get_id();
75564481aeSAndrew Ng     for (std::thread &T : Threads)
76564481aeSAndrew Ng       if (T.get_id() == CurrentThreadId)
77564481aeSAndrew Ng         T.detach();
78564481aeSAndrew Ng       else
79564481aeSAndrew Ng         T.join();
803a57fbd6SZachary Turner   }
813a57fbd6SZachary Turner 
82eb4663d8SFangrui Song   struct Creator {
callllvm::parallel::detail::__anond7d369990111::ThreadPoolExecutor::Creator83eb4663d8SFangrui Song     static void *call() { return new ThreadPoolExecutor(strategy); }
84eb4663d8SFangrui Song   };
85564481aeSAndrew Ng   struct Deleter {
callllvm::parallel::detail::__anond7d369990111::ThreadPoolExecutor::Deleter86564481aeSAndrew Ng     static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); }
87564481aeSAndrew Ng   };
88564481aeSAndrew Ng 
add(std::function<void ()> F)893a57fbd6SZachary Turner   void add(std::function<void()> F) override {
90564481aeSAndrew Ng     {
91564481aeSAndrew Ng       std::lock_guard<std::mutex> Lock(Mutex);
924290ef54SAndrew Ng       WorkStack.push(std::move(F));
93564481aeSAndrew Ng     }
943a57fbd6SZachary Turner     Cond.notify_one();
953a57fbd6SZachary Turner   }
963a57fbd6SZachary Turner 
973a57fbd6SZachary Turner private:
work(ThreadPoolStrategy S,unsigned ThreadID)988404aeb5SAlexandre Ganea   void work(ThreadPoolStrategy S, unsigned ThreadID) {
998404aeb5SAlexandre Ganea     S.apply_thread_strategy(ThreadID);
1003a57fbd6SZachary Turner     while (true) {
1013a57fbd6SZachary Turner       std::unique_lock<std::mutex> Lock(Mutex);
1023a57fbd6SZachary Turner       Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); });
1033a57fbd6SZachary Turner       if (Stop)
1043a57fbd6SZachary Turner         break;
1054290ef54SAndrew Ng       auto Task = std::move(WorkStack.top());
1063a57fbd6SZachary Turner       WorkStack.pop();
1073a57fbd6SZachary Turner       Lock.unlock();
1083a57fbd6SZachary Turner       Task();
1093a57fbd6SZachary Turner     }
1103a57fbd6SZachary Turner   }
1113a57fbd6SZachary Turner 
1123a57fbd6SZachary Turner   std::atomic<bool> Stop{false};
1133a57fbd6SZachary Turner   std::stack<std::function<void()>> WorkStack;
1143a57fbd6SZachary Turner   std::mutex Mutex;
1153a57fbd6SZachary Turner   std::condition_variable Cond;
116564481aeSAndrew Ng   std::promise<void> ThreadsCreated;
117564481aeSAndrew Ng   std::vector<std::thread> Threads;
1183a57fbd6SZachary Turner };
1193a57fbd6SZachary Turner 
getDefaultExecutor()1203a57fbd6SZachary Turner Executor *Executor::getDefaultExecutor() {
121564481aeSAndrew Ng   // The ManagedStatic enables the ThreadPoolExecutor to be stopped via
122564481aeSAndrew Ng   // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
123564481aeSAndrew Ng   // stops the thread pool and waits for any worker thread creation to complete
124564481aeSAndrew Ng   // but does not wait for the threads to finish. The wait for worker thread
125564481aeSAndrew Ng   // creation to complete is important as it prevents intermittent crashes on
126564481aeSAndrew Ng   // Windows due to a race condition between thread creation and process exit.
127564481aeSAndrew Ng   //
128564481aeSAndrew Ng   // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to
129564481aeSAndrew Ng   // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor
130564481aeSAndrew Ng   // destructor ensures it has been stopped and waits for worker threads to
131564481aeSAndrew Ng   // finish. The wait is important as it prevents intermittent crashes on
132564481aeSAndrew Ng   // Windows when the process is doing a full exit.
133564481aeSAndrew Ng   //
134564481aeSAndrew Ng   // The Windows crashes appear to only occur with the MSVC static runtimes and
135564481aeSAndrew Ng   // are more frequent with the debug static runtime.
136564481aeSAndrew Ng   //
137564481aeSAndrew Ng   // This also prevents intermittent deadlocks on exit with the MinGW runtime.
138eb4663d8SFangrui Song 
139eb4663d8SFangrui Song   static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
140564481aeSAndrew Ng                        ThreadPoolExecutor::Deleter>
141564481aeSAndrew Ng       ManagedExec;
142564481aeSAndrew Ng   static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
143564481aeSAndrew Ng   return Exec.get();
1443a57fbd6SZachary Turner }
145d4960032SNico Weber } // namespace
1463a57fbd6SZachary Turner 
147f6a62909SFangrui Song static std::atomic<int> TaskGroupInstances;
148f6a62909SFangrui Song 
149f6a62909SFangrui Song // Latch::sync() called by the dtor may cause one thread to block. If is a dead
150f6a62909SFangrui Song // lock if all threads in the default executor are blocked. To prevent the dead
151f6a62909SFangrui Song // lock, only allow the first TaskGroup to run tasks parallelly. In the scenario
152f6a62909SFangrui Song // of nested parallel_for_each(), only the outermost one runs parallelly.
TaskGroup()153f6a62909SFangrui Song TaskGroup::TaskGroup() : Parallel(TaskGroupInstances++ == 0) {}
~TaskGroup()1547b25fa8cSAlexandre Ganea TaskGroup::~TaskGroup() {
1557b25fa8cSAlexandre Ganea   // We must ensure that all the workloads have finished before decrementing the
1567b25fa8cSAlexandre Ganea   // instances count.
1577b25fa8cSAlexandre Ganea   L.sync();
1587b25fa8cSAlexandre Ganea   --TaskGroupInstances;
1597b25fa8cSAlexandre Ganea }
160f6a62909SFangrui Song 
spawn(std::function<void ()> F)161f6a62909SFangrui Song void TaskGroup::spawn(std::function<void()> F) {
162f6a62909SFangrui Song   if (Parallel) {
1633a57fbd6SZachary Turner     L.inc();
1644290ef54SAndrew Ng     Executor::getDefaultExecutor()->add([&, F = std::move(F)] {
1653a57fbd6SZachary Turner       F();
1663a57fbd6SZachary Turner       L.dec();
1673a57fbd6SZachary Turner     });
168f6a62909SFangrui Song   } else {
169f6a62909SFangrui Song     F();
1703a57fbd6SZachary Turner   }
171f6a62909SFangrui Song }
172f6a62909SFangrui Song 
173f6a62909SFangrui Song } // namespace detail
174f6a62909SFangrui Song } // namespace parallel
175f6a62909SFangrui Song } // namespace llvm
1760f2a48c1SNico Weber #endif // LLVM_ENABLE_THREADS
1778e382ae9SFangrui Song 
parallelFor(size_t Begin,size_t End,llvm::function_ref<void (size_t)> Fn)178*7effcbdaSNico Weber void llvm::parallelFor(size_t Begin, size_t End,
1798e382ae9SFangrui Song                        llvm::function_ref<void(size_t)> Fn) {
1808e382ae9SFangrui Song   // If we have zero or one items, then do not incur the overhead of spinning up
1818e382ae9SFangrui Song   // a task group.  They are surprisingly expensive, and because they do not
1828e382ae9SFangrui Song   // support nested parallelism, a single entry task group can block parallel
1838e382ae9SFangrui Song   // execution underneath them.
1848e382ae9SFangrui Song #if LLVM_ENABLE_THREADS
1858e382ae9SFangrui Song   auto NumItems = End - Begin;
1868e382ae9SFangrui Song   if (NumItems > 1 && parallel::strategy.ThreadsRequested != 1) {
1878e382ae9SFangrui Song     // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling
1888e382ae9SFangrui Song     // overhead on large inputs.
1898e382ae9SFangrui Song     auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup;
1908e382ae9SFangrui Song     if (TaskSize == 0)
1918e382ae9SFangrui Song       TaskSize = 1;
1928e382ae9SFangrui Song 
1938e382ae9SFangrui Song     parallel::detail::TaskGroup TG;
1948e382ae9SFangrui Song     for (; Begin + TaskSize < End; Begin += TaskSize) {
1958e382ae9SFangrui Song       TG.spawn([=, &Fn] {
1968e382ae9SFangrui Song         for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I)
1978e382ae9SFangrui Song           Fn(I);
1988e382ae9SFangrui Song       });
1998e382ae9SFangrui Song     }
2008e382ae9SFangrui Song     for (; Begin != End; ++Begin)
2018e382ae9SFangrui Song       Fn(Begin);
2028e382ae9SFangrui Song     return;
2038e382ae9SFangrui Song   }
2048e382ae9SFangrui Song #endif
2058e382ae9SFangrui Song 
2068e382ae9SFangrui Song   for (; Begin != End; ++Begin)
2078e382ae9SFangrui Song     Fn(Begin);
2088e382ae9SFangrui Song }
209