1 // The MIT License (MIT)
2 //
3 // 	Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
4 //
5 // 	Permission is hereby granted, free of charge, to any person obtaining a copy
6 // 	of this software and associated documentation files (the "Software"), to deal
7 // 	in the Software without restriction, including without limitation the rights
8 // 	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // 	copies of the Software, and to permit persons to whom the Software is
10 // 	furnished to do so, subject to the following conditions:
11 //
12 //  The above copyright notice and this permission notice shall be included in
13 // 	all copies or substantial portions of the Software.
14 //
15 // 	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // 	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // 	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // 	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // 	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // 	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // 	THE SOFTWARE.
22 
23 #pragma once
24 
25 #include <MTTools.h>
26 #include <MTPlatform.h>
27 #include <MTConcurrentQueueLIFO.h>
28 #include <MTStackArray.h>
29 #include <MTArrayView.h>
30 #include <MTThreadContext.h>
31 #include <MTFiberContext.h>
32 #include <MTAllocator.h>
33 #include <MTTaskPool.h>
34 
35 
36 namespace MT
37 {
38 
39 	template<typename CLASS_TYPE, typename MACRO_TYPE>
40 	struct CheckType
41 	{
42 		static_assert(std::is_same<CLASS_TYPE, MACRO_TYPE>::value, "Invalid type in MT_DECLARE_TASK macro. See CheckType template instantiation params to details.");
43 	};
44 
45 	struct TypeChecker
46 	{
47 		template <typename T>
48 		static T QueryThisType(T thisPtr)
49 		{
50 			return (T)nullptr;
51 		}
52 	};
53 
54 
55 	template <typename T>
56 	inline void CallDtor(T * p)
57 	{
58 #if _MSC_VER
59 		p;
60 #endif
61 		p->~T();
62 	}
63 
64 }
65 
// Color indices, presumably meant for the colorID argument of MT_DECLARE_TASK.
// In instrumented builds (MT_INSTRUMENTED_BUILD defined) that argument becomes the
// return value of the task's GetDebugColorIndex(); in other builds it is not used.
#define MT_COLOR_DEFAULT (0)
#define MT_COLOR_BLUE (1)
#define MT_COLOR_RED (2)
#define MT_COLOR_YELLOW (3)
70 
// COMPILE_TIME_TYPE_CHECK(TYPE) expands to a CompileTimeCheckMethod() member that
// declares an MT::CheckType<C, TYPE> local, where C is the class type obtained by
// stripping the pointer from the type of `this`. MT::CheckType static_asserts that
// both types are identical, so passing the wrong TYPE to the macro is diagnosed
// when the method body is compiled.
#if defined(_MSC_VER)

// Visual Studio compile time check
#define COMPILE_TIME_TYPE_CHECK(TYPE) \
	void CompileTimeCheckMethod() \
	{ \
		MT::CheckType< typename std::remove_pointer< decltype(MT::TypeChecker::QueryThisType(this)) >::type, typename TYPE > compileTypeTypesCheck; \
		/* mark the variable as used; a void cast avoids an effect-free expression statement */ \
		(void)compileTypeTypesCheck; \
	}

#else

// GCC, Clang and other compilers compile time check
#define COMPILE_TIME_TYPE_CHECK(TYPE) \
	void CompileTimeCheckMethod() \
	{ \
		/* query this pointer type */ \
		typedef decltype(MT::TypeChecker::QueryThisType(this)) THIS_PTR_TYPE; \
		/* query class type from this pointer type */ \
		typedef typename std::remove_pointer<THIS_PTR_TYPE>::type CPP_TYPE; \
		/* define macro type */ \
		typedef TYPE MACRO_TYPE; \
		/* compile time checking that is same types */ \
		MT::CheckType< CPP_TYPE, MACRO_TYPE > compileTypeTypesCheck; \
		/* mark the variable as used; a bare `name;` statement would itself warn (-Wunused-value) */ \
		(void)compileTypeTypesCheck; \
	}

#endif
100 
101 
102 
103 
// MT_DECLARE_TASK_IMPL(TYPE) - expands inside the body of task class TYPE to:
//  - the compile time check that TYPE is the enclosing class,
//  - TaskEntryPoint(): treats userData as a TYPE* and calls its Do(fiberContext),
//  - PoolTaskDestroy(): runs the TYPE destructor on userData, then stores
//    MT::TaskID::UNUSED into the MT::PoolElementHeader located
//    sizeof(MT::PoolElementHeader) bytes before userData.
#define MT_DECLARE_TASK_IMPL(TYPE) \
	\
	COMPILE_TIME_TYPE_CHECK(TYPE) \
	\
	static void TaskEntryPoint(MT::FiberContext& fiberContext, void* userData) \
	{ \
		static_cast< TYPE *>(userData)->Do(fiberContext); \
	} \
	\
	static void PoolTaskDestroy(void* userData) \
	{ \
		/* Destroy the task object in place */ \
		MT::CallDtor( static_cast< TYPE *>(userData) ); \
		/* Find task pool header: it precedes the task storage */ \
		char* headerBytes = static_cast<char*>(userData) - sizeof(MT::PoolElementHeader); \
		MT::PoolElementHeader * poolHeader = reinterpret_cast<MT::PoolElementHeader *>(headerBytes); \
		/* Fixup pool header, mark task as unused */ \
		poolHeader->id.Store(MT::TaskID::UNUSED); \
	}
123 
124 
125 
// MT_DECLARE_TASK(TYPE, colorID) - place inside the body of task class TYPE.
// Always expands MT_DECLARE_TASK_IMPL(TYPE). In instrumented builds it additionally
// adds GetDebugID() (the stringized TYPE) and GetDebugColorIndex() (returns colorID);
// in other builds colorID is ignored.
// The expansion ends with a function body and deliberately emits no trailing ';',
// so it does not inject an empty declaration into the class. Callers may still
// write a ';' after the macro invocation.
#ifdef MT_INSTRUMENTED_BUILD
#include <MTMicroWebSrv.h>
#include <MTProfilerEventListener.h>

#define MT_DECLARE_TASK(TYPE, colorID) \
	static const mt_char* GetDebugID() \
	{ \
		return MT_TEXT( #TYPE ); \
	} \
	\
	static int GetDebugColorIndex() \
	{ \
		return colorID; \
	} \
	\
	MT_DECLARE_TASK_IMPL(TYPE)


#else

#define MT_DECLARE_TASK(TYPE, colorID) \
	MT_DECLARE_TASK_IMPL(TYPE)

#endif
150 
151 
152 
153 
154 
155 
156 namespace MT
157 {
158 	const uint32 MT_MAX_THREAD_COUNT = 64;
159 	const uint32 MT_MAX_FIBERS_COUNT = 256;
160 	const uint32 MT_SCHEDULER_STACK_SIZE = 1048576;
161 	const uint32 MT_FIBER_STACK_SIZE = 65536;
162 
163 	namespace internal
164 	{
165 		struct ThreadContext;
166 	}
167 
168 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
169 	// Task scheduler
170 	////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
171 	class TaskScheduler
172 	{
173 		friend class FiberContext;
174 		friend struct internal::ThreadContext;
175 
176 
177 
178 		////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
179 		// Task group description
180 		////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
181 		// Application can assign task group to task and later wait until group was finished.
182 		class TaskGroupDescription
183 		{
184 			AtomicInt32 inProgressTaskCount;
185 			Event allDoneEvent;
186 
187 			//Tasks awaiting group through FiberContext::WaitGroupAndYield call
188 			ConcurrentQueueLIFO<FiberContext*> waitTasksQueue;
189 
190 		public:
191 
192 			bool debugIsFree;
193 
194 
195 		private:
196 
197 			TaskGroupDescription(TaskGroupDescription& ) {}
198 			void operator=(const TaskGroupDescription&) {}
199 
200 		public:
201 
202 			TaskGroupDescription()
203 			{
204 				inProgressTaskCount.Store(0);
205 				allDoneEvent.Create( EventReset::MANUAL, true );
206 				debugIsFree = true;
207 			}
208 
209 			int GetTaskCount() const
210 			{
211 				return inProgressTaskCount.Load();
212 			}
213 
214 			ConcurrentQueueLIFO<FiberContext*> & GetWaitQueue()
215 			{
216 				return waitTasksQueue;
217 			}
218 
219 			int Dec()
220 			{
221 				return inProgressTaskCount.DecFetch();
222 			}
223 
224 			int Inc()
225 			{
226 				return inProgressTaskCount.IncFetch();
227 			}
228 
229 			int Add(int sum)
230 			{
231 				return inProgressTaskCount.AddFetch(sum);
232 			}
233 
234 			void Signal()
235 			{
236 				allDoneEvent.Signal();
237 			}
238 
239 			void Reset()
240 			{
241 				allDoneEvent.Reset();
242 			}
243 
244 			bool Wait(uint32 milliseconds)
245 			{
246 				return allDoneEvent.Wait(milliseconds);
247 			}
248 		};
249 
250 
251 		// Thread index for new task
252 		AtomicInt32 roundRobinThreadIndex;
253 
254 		// Started threads count
255 		AtomicInt32 startedThreadsCount;
256 
257 		// Threads created by task manager
258 		volatile uint32 threadsCount;
259 		internal::ThreadContext threadContext[MT_MAX_THREAD_COUNT];
260 
261 		// All groups task statistic
262 		TaskGroupDescription allGroups;
263 
264 		// Groups pool
265 		ConcurrentQueueLIFO<TaskGroup> availableGroups;
266 
267 		//
268 		TaskGroupDescription groupStats[TaskGroup::MT_MAX_GROUPS_COUNT];
269 
270 		// Fibers pool
271 		ConcurrentQueueLIFO<FiberContext*> availableFibers;
272 
273 		// Fibers context
274 		FiberContext fiberContext[MT_MAX_FIBERS_COUNT];
275 
276 #ifdef MT_INSTRUMENTED_BUILD
277 		IProfilerEventListener * profilerEventListener;
278 		int64 startTime;
279 		profile::MicroWebServer profilerWebServer;
280 		int32 webServerPort;
281 #endif
282 
283 		FiberContext* RequestFiberContext(internal::GroupedTask& task);
284 		void ReleaseFiberContext(FiberContext* fiberExecutionContext);
285 		void RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState);
286 		TaskGroupDescription & GetGroupDesc(TaskGroup group);
287 
288 		static void ThreadMain( void* userData );
289 		static void FiberMain( void* userData );
290 		static bool TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task, uint32 workersCount);
291 
292 		static FiberContext* ExecuteTask (internal::ThreadContext& threadContext, FiberContext* fiberContext);
293 
294 	public:
295 
296 		/// \brief Initializes a new instance of the TaskScheduler class.
297 		/// \param workerThreadsCount Worker threads count. Automatically determines the required number of threads if workerThreadsCount set to 0
298 #ifdef MT_INSTRUMENTED_BUILD
299 		TaskScheduler(uint32 workerThreadsCount = 0, IProfilerEventListener* listener = nullptr);
300 #else
301 		TaskScheduler(uint32 workerThreadsCount = 0);
302 #endif
303 
304 
305 		~TaskScheduler();
306 
307 		template<class TTask>
308 		void RunAsync(TaskGroup group, TTask* taskArray, uint32 taskCount);
309 
310 		void RunAsync(TaskGroup group, TaskHandle* taskHandleArray, uint32 taskHandleCount);
311 
312 
313 		bool WaitGroup(TaskGroup group, uint32 milliseconds);
314 		bool WaitAll(uint32 milliseconds);
315 
316 		TaskGroup CreateGroup();
317 		void ReleaseGroup(TaskGroup group);
318 
319 		bool IsEmpty();
320 
321 		uint32 GetWorkerCount() const;
322 
323 		bool IsWorkerThread() const;
324 
325 #ifdef MT_INSTRUMENTED_BUILD
326 
327 		size_t GetProfilerEvents(uint32 workerIndex, MT::ProfileEventDesc * dstBuffer, size_t dstBufferSize);
328 		void UpdateProfiler();
329 		int32 GetWebServerPort() const;
330 
331 		inline int64 GetStartTime() const
332 		{
333 			return startTime;
334 		}
335 
336 		inline uint64 GetTimeStamp() const
337 		{
338 			return MT::GetTimeMicroSeconds() - startTime;
339 		}
340 
341 		inline IProfilerEventListener* GetProfilerEventListener()
342 		{
343 			return profilerEventListener;
344 		}
345 
346 #endif
347 	};
348 }
349 
350 #include "MTScheduler.inl"
351 #include "MTFiberContext.inl"
352