1 // The MIT License (MIT) 2 // 3 // Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev 4 // 5 // Permission is hereby granted, free of charge, to any person obtaining a copy 6 // of this software and associated documentation files (the "Software"), to deal 7 // in the Software without restriction, including without limitation the rights 8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 // copies of the Software, and to permit persons to whom the Software is 10 // furnished to do so, subject to the following conditions: 11 // 12 // The above copyright notice and this permission notice shall be included in 13 // all copies or substantial portions of the Software. 14 // 15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 // THE SOFTWARE. 22 23 #pragma once 24 25 #include <MTTools.h> 26 #include <MTPlatform.h> 27 #include <MTConcurrentQueueLIFO.h> 28 #include <MTStackArray.h> 29 #include <MTArrayView.h> 30 #include <MTThreadContext.h> 31 #include <MTFiberContext.h> 32 #include <MTAllocator.h> 33 #include <MTTaskPool.h> 34 35 36 namespace MT 37 { 38 39 template<typename CLASS_TYPE, typename MACRO_TYPE> 40 struct CheckType 41 { 42 static_assert(std::is_same<CLASS_TYPE, MACRO_TYPE>::value, "Invalid type in MT_DECLARE_TASK macro. See CheckType template instantiation params to details."); 43 }; 44 45 struct TypeChecker 46 { 47 template <typename T> 48 static T QueryThisType(T thisPtr) 49 { 50 return (T)nullptr; 51 } 52 }; 53 54 55 template <typename T> 56 inline void CallDtor(T * p) 57 { 58 #if _MSC_VER 59 p; 60 #endif 61 p->~T(); 62 } 63 64 } 65 66 #define MT_COLOR_DEFAULT (0) 67 #define MT_COLOR_BLUE (1) 68 #define MT_COLOR_RED (2) 69 #define MT_COLOR_YELLOW (3) 70 71 #if _MSC_VER 72 73 // Visual Studio compile time check 74 #define COMPILE_TIME_TYPE_CHECK(TYPE) \ 75 void CompileTimeCheckMethod() \ 76 { \ 77 MT::CheckType< typename std::remove_pointer< decltype(MT::TypeChecker::QueryThisType(this)) >::type, typename TYPE > compileTypeTypesCheck; \ 78 compileTypeTypesCheck; \ 79 } 80 81 #else 82 83 // GCC, Clang and other compilers compile time check 84 #define COMPILE_TIME_TYPE_CHECK(TYPE) \ 85 void CompileTimeCheckMethod() \ 86 { \ 87 /* query this pointer type */ \ 88 typedef decltype(MT::TypeChecker::QueryThisType(this)) THIS_PTR_TYPE; \ 89 /* query class type from this pointer type */ \ 90 typedef typename std::remove_pointer<THIS_PTR_TYPE>::type CPP_TYPE; \ 91 /* define macro type */ \ 92 typedef TYPE MACRO_TYPE; \ 93 /* compile time checking that is same types */ \ 94 MT::CheckType< CPP_TYPE, MACRO_TYPE > compileTypeTypesCheck; \ 95 /* remove unused variable warning */ \ 96 compileTypeTypesCheck; \ 97 } 98 99 #endif 100 101 102 103 104 #define MT_DECLARE_TASK_IMPL(TYPE) \ 105 \ 106 COMPILE_TIME_TYPE_CHECK(TYPE) \ 107 \ 108 static void TaskEntryPoint(MT::FiberContext& fiberContext, void* userData) \ 109 { \ 110 TYPE * task = static_cast< TYPE *>(userData); \ 111 task->Do(fiberContext); \ 112 } \ 113 \ 114 static void PoolTaskDestroy(void* userData) \ 115 { \ 116 TYPE * task = static_cast< TYPE *>(userData); \ 117 MT::CallDtor( task ); \ 118 /* Find task pool header */ \ 119 MT::PoolElementHeader * poolHeader = (MT::PoolElementHeader *)((char*)userData - sizeof(MT::PoolElementHeader)); \ 120 /* Fixup pool header, mark task as unused */ \ 121 poolHeader->id.Store(MT::TaskID::UNUSED); \ 122 } \ 123 124 125 126 #ifdef MT_INSTRUMENTED_BUILD 127 #include <MTMicroWebSrv.h> 128 #include <MTProfilerEventListener.h> 129 130 #define MT_DECLARE_TASK(TYPE, colorID) \ 131 static const mt_char* GetDebugID() \ 132 { \ 133 return MT_TEXT( #TYPE ); \ 134 } \ 135 \ 136 static int GetDebugColorIndex() \ 137 { \ 138 return colorID; \ 139 } \ 140 \ 141 MT_DECLARE_TASK_IMPL(TYPE); 142 143 144 #else 145 146 #define MT_DECLARE_TASK(TYPE, colorID) \ 147 MT_DECLARE_TASK_IMPL(TYPE); 148 149 #endif 150 151 152 153 154 155 156 namespace MT 157 { 158 const uint32 MT_MAX_THREAD_COUNT = 64; 159 const uint32 MT_MAX_FIBERS_COUNT = 256; 160 const uint32 MT_SCHEDULER_STACK_SIZE = 1048576; 161 const uint32 MT_FIBER_STACK_SIZE = 65536; 162 163 namespace internal 164 { 165 struct ThreadContext; 166 } 167 168 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 169 // Task scheduler 170 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 171 class TaskScheduler 172 { 173 friend class FiberContext; 174 friend struct internal::ThreadContext; 175 176 177 178 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 179 // Task group description 180 //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 181 // Application can assign task group to task and later wait until group was finished. 182 class TaskGroupDescription 183 { 184 AtomicInt32 inProgressTaskCount; 185 Event allDoneEvent; 186 187 //Tasks awaiting group through FiberContext::WaitGroupAndYield call 188 ConcurrentQueueLIFO<FiberContext*> waitTasksQueue; 189 190 public: 191 192 bool debugIsFree; 193 194 195 private: 196 197 TaskGroupDescription(TaskGroupDescription& ) {} 198 void operator=(const TaskGroupDescription&) {} 199 200 public: 201 202 TaskGroupDescription() 203 { 204 inProgressTaskCount.Store(0); 205 allDoneEvent.Create( EventReset::MANUAL, true ); 206 debugIsFree = true; 207 } 208 209 int GetTaskCount() const 210 { 211 return inProgressTaskCount.Load(); 212 } 213 214 ConcurrentQueueLIFO<FiberContext*> & GetWaitQueue() 215 { 216 return waitTasksQueue; 217 } 218 219 int Dec() 220 { 221 return inProgressTaskCount.DecFetch(); 222 } 223 224 int Inc() 225 { 226 return inProgressTaskCount.IncFetch(); 227 } 228 229 int Add(int sum) 230 { 231 return inProgressTaskCount.AddFetch(sum); 232 } 233 234 void Signal() 235 { 236 allDoneEvent.Signal(); 237 } 238 239 void Reset() 240 { 241 allDoneEvent.Reset(); 242 } 243 244 bool Wait(uint32 milliseconds) 245 { 246 return allDoneEvent.Wait(milliseconds); 247 } 248 }; 249 250 251 // Thread index for new task 252 AtomicInt32 roundRobinThreadIndex; 253 254 // Started threads count 255 AtomicInt32 startedThreadsCount; 256 257 // Threads created by task manager 258 volatile uint32 threadsCount; 259 internal::ThreadContext threadContext[MT_MAX_THREAD_COUNT]; 260 261 // All groups task statistic 262 TaskGroupDescription allGroups; 263 264 // Groups pool 265 ConcurrentQueueLIFO<TaskGroup> availableGroups; 266 267 // 268 TaskGroupDescription groupStats[TaskGroup::MT_MAX_GROUPS_COUNT]; 269 270 // Fibers pool 271 ConcurrentQueueLIFO<FiberContext*> availableFibers; 272 273 // Fibers context 274 FiberContext fiberContext[MT_MAX_FIBERS_COUNT]; 275 276 #ifdef MT_INSTRUMENTED_BUILD 277 IProfilerEventListener * profilerEventListener; 278 int64 startTime; 279 profile::MicroWebServer profilerWebServer; 280 int32 webServerPort; 281 #endif 282 283 FiberContext* RequestFiberContext(internal::GroupedTask& task); 284 void ReleaseFiberContext(FiberContext* fiberExecutionContext); 285 void RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState); 286 TaskGroupDescription & GetGroupDesc(TaskGroup group); 287 288 static void ThreadMain( void* userData ); 289 static void FiberMain( void* userData ); 290 static bool TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task, uint32 workersCount); 291 292 static FiberContext* ExecuteTask (internal::ThreadContext& threadContext, FiberContext* fiberContext); 293 294 public: 295 296 /// \brief Initializes a new instance of the TaskScheduler class. 297 /// \param workerThreadsCount Worker threads count. Automatically determines the required number of threads if workerThreadsCount set to 0 298 #ifdef MT_INSTRUMENTED_BUILD 299 TaskScheduler(uint32 workerThreadsCount = 0, IProfilerEventListener* listener = nullptr); 300 #else 301 TaskScheduler(uint32 workerThreadsCount = 0); 302 #endif 303 304 305 ~TaskScheduler(); 306 307 template<class TTask> 308 void RunAsync(TaskGroup group, TTask* taskArray, uint32 taskCount); 309 310 void RunAsync(TaskGroup group, TaskHandle* taskHandleArray, uint32 taskHandleCount); 311 312 313 bool WaitGroup(TaskGroup group, uint32 milliseconds); 314 bool WaitAll(uint32 milliseconds); 315 316 TaskGroup CreateGroup(); 317 void ReleaseGroup(TaskGroup group); 318 319 bool IsEmpty(); 320 321 uint32 GetWorkerCount() const; 322 323 bool IsWorkerThread() const; 324 325 #ifdef MT_INSTRUMENTED_BUILD 326 327 size_t GetProfilerEvents(uint32 workerIndex, MT::ProfileEventDesc * dstBuffer, size_t dstBufferSize); 328 void UpdateProfiler(); 329 int32 GetWebServerPort() const; 330 331 inline int64 GetStartTime() const 332 { 333 return startTime; 334 } 335 336 inline uint64 GetTimeStamp() const 337 { 338 return MT::GetTimeMicroSeconds() - startTime; 339 } 340 341 inline IProfilerEventListener* GetProfilerEventListener() 342 { 343 return profilerEventListener; 344 } 345 346 #endif 347 }; 348 } 349 350 #include "MTScheduler.inl" 351 #include "MTFiberContext.inl" 352