1 // The MIT License (MIT)
2 //
3 // 	Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
4 //
5 // 	Permission is hereby granted, free of charge, to any person obtaining a copy
6 // 	of this software and associated documentation files (the "Software"), to deal
7 // 	in the Software without restriction, including without limitation the rights
8 // 	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // 	copies of the Software, and to permit persons to whom the Software is
10 // 	furnished to do so, subject to the following conditions:
11 //
12 //  The above copyright notice and this permission notice shall be included in
13 // 	all copies or substantial portions of the Software.
14 //
15 // 	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // 	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // 	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // 	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // 	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // 	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // 	THE SOFTWARE.
22 
23 #pragma once
24 
25 #include <MTConfig.h>
26 #include <MTColorTable.h>
27 #include <MTTools.h>
28 #include <MTPlatform.h>
29 #include <MTConcurrentQueueLIFO.h>
30 #include <MTStackArray.h>
31 #include <MTArrayView.h>
32 #include <MTThreadContext.h>
33 #include <MTFiberContext.h>
34 #include <MTAppInterop.h>
35 #include <MTTaskPool.h>
36 #include <MTStackRequirements.h>
37 #include <Scopes/MTScopes.h>
38 
39 
40 namespace MT
41 {
42 
43 	template<typename CLASS_TYPE, typename MACRO_TYPE>
44 	struct CheckType
45 	{
46 		static_assert(std::is_same<CLASS_TYPE, MACRO_TYPE>::value, "Invalid type in MT_DECLARE_TASK macro. See CheckType template instantiation params to details.");
47 	};
48 
49 	struct TypeChecker
50 	{
51 		template <typename T>
52 		static T QueryThisType(T thisPtr)
53 		{
54 			MT_UNUSED(thisPtr);
55 			return (T)nullptr;
56 		}
57 	};
58 
59 
60 	template <typename T>
61 	inline void CallDtor(T* p)
62 	{
63 		MT_UNUSED(p);
64 		p->~T();
65 	}
66 
67 }
68 
69 #if MT_MSVC_COMPILER_FAMILY
70 
71 // Visual Studio compile time check
72 #define MT_COMPILE_TIME_TYPE_CHECK(TYPE) \
73 	void CompileTimeCheckMethod() \
74 	{ \
75 		MT::CheckType< typename std::remove_pointer< decltype(MT::TypeChecker::QueryThisType(this)) >::type, typename TYPE > compileTypeTypesCheck; \
76 		compileTypeTypesCheck; \
77 	}
78 
79 #elif MT_GCC_COMPILER_FAMILY
80 
81 // GCC, Clang and other compilers compile time check
82 #define MT_COMPILE_TIME_TYPE_CHECK(TYPE) \
83 	void CompileTimeCheckMethod() \
84 	{ \
85 		/* query this pointer type */ \
86 		typedef decltype(MT::TypeChecker::QueryThisType(this)) THIS_PTR_TYPE; \
87 		/* query class type from this pointer type */ \
88 		typedef typename std::remove_pointer<THIS_PTR_TYPE>::type CPP_TYPE; \
89 		/* define macro type */ \
90 		typedef TYPE MACRO_TYPE; \
91 		/* compile time checking that is same types */ \
92 		MT::CheckType< CPP_TYPE, MACRO_TYPE > compileTypeTypesCheck; \
93 		/* remove unused variable warning */ \
94 		MT_UNUSED(compileTypeTypesCheck); \
95 	}
96 
97 #else
98 
99 #error Platform is not supported.
100 
101 #endif
102 
103 
104 
105 
// MT_DECLARE_TASK_IMPL(TYPE, STACK_REQUIREMENTS)
//
// Injects the members shared by every task declaration into class TYPE:
//   CompileTimeCheckMethod() - compile-time check that TYPE is the enclosing class
//   TaskEntryPoint()         - casts userData back to TYPE* and calls Do(fiberContext) on it
//   PoolTaskDestroy()        - runs the destructor of the task stored at userData and marks
//                              the MT::PoolElementHeader located right before it as unused
//   GetStackRequirements()   - returns STACK_REQUIREMENTS
#define MT_DECLARE_TASK_IMPL(TYPE, STACK_REQUIREMENTS) \
	\
	MT_COMPILE_TIME_TYPE_CHECK(TYPE) \
	\
	static void TaskEntryPoint(MT::FiberContext& fiberContext, const void* userData) \
	{ \
		/* userData is the task object itself, constness is dropped explicitly */ \
		TYPE * task = static_cast<TYPE *>(const_cast<void*>(userData)); \
		task->Do(fiberContext); \
	} \
	\
	static void PoolTaskDestroy(const void* userData) \
	{ \
		/* Destroy the task in place */ \
		TYPE * task = static_cast<TYPE *>(const_cast<void*>(userData)); \
		MT::CallDtor( task ); \
		/* The pool header is located sizeof(MT::PoolElementHeader) bytes before the task */ \
		char * taskBytes = const_cast<char*>(static_cast<const char*>(userData)); \
		MT::PoolElementHeader * poolHeader = reinterpret_cast<MT::PoolElementHeader *>(taskBytes - sizeof(MT::PoolElementHeader)); \
		/* Mark the pool slot as unused */ \
		poolHeader->id.Store(MT::TaskID::UNUSED); \
	} \
	\
	static MT::StackRequirements::Type GetStackRequirements() \
	{ \
		return STACK_REQUIREMENTS; \
	}
132 
133 
134 
// MT_DECLARE_TASK(TYPE, STACK_REQUIREMENTS, DEBUG_COLOR)
//
// Task declaration macro meant to be placed inside the body of task class TYPE.
// It always expands MT_DECLARE_TASK_IMPL(TYPE, STACK_REQUIREMENTS) followed by a semicolon.
// When MT_INSTRUMENTED_BUILD is defined it additionally provides:
//   GetDebugColor() - returns DEBUG_COLOR
//   GetDebugID()    - returns the stringified TYPE wrapped in MT_TEXT()
// In other builds DEBUG_COLOR is accepted but not used.
#if defined(MT_INSTRUMENTED_BUILD)
#include <MTProfilerEventListener.h>

#define MT_DECLARE_TASK(TYPE, STACK_REQUIREMENTS, DEBUG_COLOR) \
	static MT::Color::Type GetDebugColor() \
	{ \
		return DEBUG_COLOR; \
	} \
	\
	static const mt_char* GetDebugID() \
	{ \
		return MT_TEXT( #TYPE ); \
	} \
	\
	MT_DECLARE_TASK_IMPL(TYPE, STACK_REQUIREMENTS);


#else

#define MT_DECLARE_TASK(TYPE, STACK_REQUIREMENTS, DEBUG_COLOR) \
	MT_DECLARE_TASK_IMPL(TYPE, STACK_REQUIREMENTS);

#endif
158 
159 
160 
161 
162 
163 
164 namespace MT
165 {
166 	const uint32 MT_MAX_THREAD_COUNT = 64;
167 	const uint32 MT_SCHEDULER_STACK_SIZE = 1048576; // 1Mb
168 
169 	const uint32 MT_MAX_STANDART_FIBERS_COUNT = 256;
170 	const uint32 MT_STANDART_FIBER_STACK_SIZE = 32768; //32Kb
171 
172 	const uint32 MT_MAX_EXTENDED_FIBERS_COUNT = 8;
173 	const uint32 MT_EXTENDED_FIBER_STACK_SIZE = 1048576; // 1Mb
174 
175 	namespace internal
176 	{
177 		struct ThreadContext;
178 	}
179 
180 	struct WorkerThreadParams
181 	{
182 		uint32 core;
183 		ThreadPriority::Type priority;
184 
185 		WorkerThreadParams()
186 			: core(MT_CPUCORE_ANY)
187 			, priority(ThreadPriority::DEFAULT)
188 		{
189 		}
190 	};
191 
192 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
193 	// Task scheduler
194 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
195 	class TaskScheduler
196 	{
197 		friend class FiberContext;
198 		friend struct internal::ThreadContext;
199 
200 
201 
202 		////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
203 		// Task group description
204 		////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
205 		// Application can assign task group to task and later wait until group was finished.
206 		class TaskGroupDescription
207 		{
208 			Atomic32<int32> inProgressTaskCount;
209 			Event allDoneEvent;
210 
211 			//Tasks awaiting group through FiberContext::WaitGroupAndYield call
212 			ConcurrentQueueLIFO<FiberContext*> waitTasksQueue;
213 
214 			bool debugIsFree;
215 
216 		public:
217 
218 			MT_NOCOPYABLE(TaskGroupDescription);
219 
220 			TaskGroupDescription()
221 			{
222 				inProgressTaskCount.Store(0);
223 				allDoneEvent.Create( EventReset::MANUAL, true );
224 				debugIsFree = true;
225 			}
226 
227 			int GetTaskCount() const
228 			{
229 				return inProgressTaskCount.Load();
230 			}
231 
232 			ConcurrentQueueLIFO<FiberContext*> & GetWaitQueue()
233 			{
234 				return waitTasksQueue;
235 			}
236 
237 			int Dec()
238 			{
239 				return inProgressTaskCount.DecFetch();
240 			}
241 
242 			int Inc()
243 			{
244 				return inProgressTaskCount.IncFetch();
245 			}
246 
247 			int Add(int sum)
248 			{
249 				return inProgressTaskCount.AddFetch(sum);
250 			}
251 
252 			void Signal()
253 			{
254 				allDoneEvent.Signal();
255 			}
256 
257 			void Reset()
258 			{
259 				allDoneEvent.Reset();
260 			}
261 
262 			bool Wait(uint32 milliseconds)
263 			{
264 				return allDoneEvent.Wait(milliseconds);
265 			}
266 
267 			void SetDebugIsFree(bool _debugIsFree)
268 			{
269 				debugIsFree = _debugIsFree;
270 			}
271 
272 			bool GetDebugIsFree() const
273 			{
274 				return debugIsFree;
275 			}
276 		};
277 
278 
279 		// Thread index for new task
280 		Atomic32<int32> roundRobinThreadIndex;
281 
282 		// Started threads count
283 		Atomic32<int32> startedThreadsCount;
284 
285 		// Threads created by task manager
286 		Atomic32<int32> threadsCount;
287 		internal::ThreadContext threadContext[MT_MAX_THREAD_COUNT];
288 
289 		// All groups task statistic
290 		TaskGroupDescription allGroups;
291 
292 		// Groups pool
293 		ConcurrentQueueLIFO<TaskGroup> availableGroups;
294 
295 		//
296 		TaskGroupDescription groupStats[TaskGroup::MT_MAX_GROUPS_COUNT];
297 
298 		// Fibers context
299 		FiberContext standartFiberContexts[MT_MAX_STANDART_FIBERS_COUNT];
300 		FiberContext extendedFiberContexts[MT_MAX_EXTENDED_FIBERS_COUNT];
301 
302 		// Fibers pool
303 		ConcurrentQueueLIFO<FiberContext*> standartFibersAvailable;
304 		ConcurrentQueueLIFO<FiberContext*> extendedFibersAvailable;
305 
306 		ConcurrentQueueLIFO<FiberContext*>* GetFibersStorage(MT::StackRequirements::Type stackRequirements);
307 
308 #ifdef MT_INSTRUMENTED_BUILD
309 		IProfilerEventListener * profilerEventListener;
310 #endif
311 
312 		FiberContext* RequestFiberContext(internal::GroupedTask& task);
313 		void ReleaseFiberContext(FiberContext* fiberExecutionContext);
314 		void RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState);
315 		TaskGroupDescription & GetGroupDesc(TaskGroup group);
316 
317 		static void WorkerThreadMain( void* userData );
318 		static void SchedulerFiberMain( void* userData );
319 		static void FiberMain( void* userData );
320 		static bool TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task, uint32 workersCount);
321 
322 		static FiberContext* ExecuteTask (internal::ThreadContext& threadContext, FiberContext* fiberContext);
323 
324 	public:
325 
326 		/// \brief Initializes a new instance of the TaskScheduler class.
327 		/// \param workerThreadsCount Worker threads count. Automatically determines the required number of threads if workerThreadsCount set to 0
328 #ifdef MT_INSTRUMENTED_BUILD
329 		TaskScheduler(uint32 workerThreadsCount = 0, WorkerThreadParams* workerParameters = nullptr, IProfilerEventListener* listener = nullptr);
330 #else
331 		TaskScheduler(uint32 workerThreadsCount = 0, WorkerThreadParams* workerParameters = nullptr);
332 #endif
333 
334 
335 		~TaskScheduler();
336 
337 		template<class TTask>
338 		void RunAsync(TaskGroup group, const TTask* taskArray, uint32 taskCount);
339 
340 		void RunAsync(TaskGroup group, const TaskHandle* taskHandleArray, uint32 taskHandleCount);
341 
342 		/// \brief Wait while no more tasks in specific group.
343 		/// \return true - if no more tasks in specific group. false - if timeout in milliseconds has reached and group still has some tasks.
344 		bool WaitGroup(TaskGroup group, uint32 milliseconds);
345 
346 		bool WaitAll(uint32 milliseconds);
347 
348 		TaskGroup CreateGroup();
349 		void ReleaseGroup(TaskGroup group);
350 
351 		bool IsEmpty();
352 
353 		int32 GetWorkersCount() const;
354 
355 		bool IsWorkerThread() const;
356 
357 #ifdef MT_INSTRUMENTED_BUILD
358 
359 		inline IProfilerEventListener* GetProfilerEventListener()
360 		{
361 			return profilerEventListener;
362 		}
363 
364 #endif
365 	};
366 }
367 
368 #include "MTScheduler.inl"
369 #include "MTFiberContext.inl"
370