1 //===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// 2 // 3 // The LLVM Compiler Infrastructure 4 // 5 // This file is distributed under the University of Illinois Open Source 6 // License. See LICENSE.TXT for details. 7 // 8 //===----------------------------------------------------------------------===// 9 // 10 // This file defines a crude C++11 based task queue. 11 // 12 //===----------------------------------------------------------------------===// 13 14 #ifndef LLVM_SUPPORT_TASK_QUEUE_H 15 #define LLVM_SUPPORT_TASK_QUEUE_H 16 17 #include "llvm/Config/llvm-config.h" 18 #include "llvm/Support/ThreadPool.h" 19 #include "llvm/Support/thread.h" 20 21 #include <atomic> 22 #include <cassert> 23 #include <condition_variable> 24 #include <deque> 25 #include <functional> 26 #include <future> 27 #include <memory> 28 #include <mutex> 29 #include <utility> 30 31 namespace llvm { 32 /// TaskQueue executes serialized work on a user-defined Thread Pool. It 33 /// guarantees that if task B is enqueued after task A, task B begins after 34 /// task A completes and there is no overlap between the two. 35 class TaskQueue { 36 // Because we don't have init capture to use move-only local variables that 37 // are captured into a lambda, we create the promise inside an explicit 38 // callable struct. We want to do as much of the wrapping in the 39 // type-specialized domain (before type erasure) and then erase this into a 40 // std::function. 41 template <typename Callable> struct Task { 42 using ResultTy = typename std::result_of<Callable()>::type; TaskTask43 explicit Task(Callable C, TaskQueue &Parent) 44 : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()), 45 Parent(&Parent) {} 46 47 template<typename T> invokeCallbackAndSetPromiseTask48 void invokeCallbackAndSetPromise(T*) { 49 P->set_value(C()); 50 } 51 invokeCallbackAndSetPromiseTask52 void invokeCallbackAndSetPromise(void*) { 53 C(); 54 P->set_value(); 55 } 56 operatorTask57 void operator()() noexcept { 58 ResultTy *Dummy = nullptr; 59 invokeCallbackAndSetPromise(Dummy); 60 Parent->completeTask(); 61 } 62 63 Callable C; 64 std::shared_ptr<std::promise<ResultTy>> P; 65 TaskQueue *Parent; 66 }; 67 68 public: 69 /// Construct a task queue with no work. TaskQueue(ThreadPool & Scheduler)70 TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; } 71 72 /// Blocking destructor: the queue will wait for all work to complete. ~TaskQueue()73 ~TaskQueue() { 74 Scheduler.wait(); 75 assert(Tasks.empty()); 76 } 77 78 /// Asynchronous submission of a task to the queue. The returned future can be 79 /// used to wait for the task (and all previous tasks that have not yet 80 /// completed) to finish. 81 template <typename Callable> async(Callable && C)82 std::future<typename std::result_of<Callable()>::type> async(Callable &&C) { 83 #if !LLVM_ENABLE_THREADS 84 static_assert(false, 85 "TaskQueue requires building with LLVM_ENABLE_THREADS!"); 86 #endif 87 Task<Callable> T{std::move(C), *this}; 88 using ResultTy = typename std::result_of<Callable()>::type; 89 std::future<ResultTy> F = T.P->get_future(); 90 { 91 std::lock_guard<std::mutex> Lock(QueueLock); 92 // If there's already a task in flight, just queue this one up. If 93 // there is not a task in flight, bypass the queue and schedule this 94 // task immediately. 95 if (IsTaskInFlight) 96 Tasks.push_back(std::move(T)); 97 else { 98 Scheduler.async(std::move(T)); 99 IsTaskInFlight = true; 100 } 101 } 102 return std::move(F); 103 } 104 105 private: completeTask()106 void completeTask() { 107 // We just completed a task. If there are no more tasks in the queue, 108 // update IsTaskInFlight to false and stop doing work. Otherwise 109 // schedule the next task (while not holding the lock). 110 std::function<void()> Continuation; 111 { 112 std::lock_guard<std::mutex> Lock(QueueLock); 113 if (Tasks.empty()) { 114 IsTaskInFlight = false; 115 return; 116 } 117 118 Continuation = std::move(Tasks.front()); 119 Tasks.pop_front(); 120 } 121 Scheduler.async(std::move(Continuation)); 122 } 123 124 /// The thread pool on which to run the work. 125 ThreadPool &Scheduler; 126 127 /// State which indicates whether the queue currently is currently processing 128 /// any work. 129 bool IsTaskInFlight = false; 130 131 /// Mutex for synchronizing access to the Tasks array. 132 std::mutex QueueLock; 133 134 /// Tasks waiting for execution in the queue. 135 std::deque<std::function<void()>> Tasks; 136 }; 137 } // namespace llvm 138 139 #endif // LLVM_SUPPORT_TASK_QUEUE_H 140