// The MIT License (MIT)
//
// 	Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
//
// 	Permission is hereby granted, free of charge, to any person obtaining a copy
// 	of this software and associated documentation files (the "Software"), to deal
// 	in the Software without restriction, including without limitation the rights
// 	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// 	copies of the Software, and to permit persons to whom the Software is
// 	furnished to do so, subject to the following conditions:
//
//  The above copyright notice and this permission notice shall be included in
// 	all copies or substantial portions of the Software.
//
// 	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// 	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// 	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// 	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// 	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// 	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// 	THE SOFTWARE.

#include <MTScheduler.h>
#include <MTStaticVector.h>
#include <string.h> // for memset
namespace MT
{
	mt_thread_local uint32 isWorkerThreadTLS = 0;

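	// Creates the scheduler: picks the worker thread count, pre-creates the
	// standard/extended fiber pools and the pool of task groups, then spawns the
	// worker threads (optionally pinned and prioritized via workerParameters).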
#ifdef MT_INSTRUMENTED_BUILD
	TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, IProfilerEventListener* listener, TaskStealingMode::Type stealMode)
#else
	TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, TaskStealingMode::Type stealMode)
#endif
		: roundRobinThreadIndex(0)
		, startedThreadsCount(0)
		, taskStealingDisabled(stealMode == TaskStealingMode::DISABLED)
	{

#ifdef MT_INSTRUMENTED_BUILD
		profilerEventListener = listener;
#endif

		if (workerThreadsCount != 0)
		{
			threadsCount.StoreRelaxed( MT::Clamp(workerThreadsCount, (uint32)1, (uint32)MT_MAX_THREAD_COUNT) );
		} else
		{
			// query the number of hardware threads and keep one free for the calling thread
			threadsCount.StoreRelaxed( (uint32)MT::Clamp(Thread::GetNumberOfHardwareThreads() - 1, 1, (int)MT_MAX_THREAD_COUNT) );
		}

		// create fiber pool (fibers with standard stack size)
		for (uint32 i = 0; i < MT_MAX_STANDART_FIBERS_COUNT; i++)
		{
			FiberContext& context = standartFiberContexts[i];
			context.fiber.Create(MT_STANDART_FIBER_STACK_SIZE, FiberMain, &context);
			bool res = standartFibersAvailable.TryPush( &context );
			MT_USED_IN_ASSERT(res);
			MT_ASSERT(res == true, "Can't add fiber to storage");
		}

		// create fiber pool (fibers with extended stack size)
		for (uint32 i = 0; i < MT_MAX_EXTENDED_FIBERS_COUNT; i++)
		{
			FiberContext& context = extendedFiberContexts[i];
			context.fiber.Create(MT_EXTENDED_FIBER_STACK_SIZE, FiberMain, &context);
			bool res = extendedFibersAvailable.TryPush( &context );
			MT_USED_IN_ASSERT(res);
			MT_ASSERT(res == true, "Can't add fiber to storage");
		}


		for (int16 i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++)
		{
			if (i != TaskGroup::DEFAULT)
			{
				bool res = availableGroups.TryPush( TaskGroup(i) );
				MT_USED_IN_ASSERT(res);
				MT_ASSERT(res == true, "Can't add group to storage");
			}
		}

#if MT_GROUP_DEBUG
		groupStats[TaskGroup::DEFAULT].SetDebugIsFree(false);
#endif

		// create worker thread pool
		int32 totalThreadsCount = GetWorkersCount();
		for (int32 i = 0; i < totalThreadsCount; i++)
		{
			threadContext[i].SetThreadIndex(i);
			threadContext[i].taskScheduler = this;

			uint32 threadCore = i;
			ThreadPriority::Type priority = ThreadPriority::DEFAULT;
			if (workerParameters != nullptr)
			{
				const WorkerThreadParams& params = workerParameters[i];

				threadCore = params.core;
				priority = params.priority;
			}

			threadContext[i].thread.Start( MT_SCHEDULER_STACK_SIZE, WorkerThreadMain, &threadContext[i], threadCore, priority);
		}
	}
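
	// Minimal usage sketch (illustration only, not part of the library): constructing a
	// scheduler with an automatic worker count and waiting for all submitted work.
	// The exact default arguments live in MTScheduler.h; task creation is elided here.
	//
	//   MT::TaskScheduler scheduler(0, nullptr);   // workerThreadsCount == 0 -> hardware threads - 1
	//   /* ... build MT::TaskHandle objects and submit them via RunAsync ... */
	//   bool finished = scheduler.WaitAll(10000);  // returns false if the timeout expires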


	TaskScheduler::~TaskScheduler()
	{
		// ask every worker to exit and wake it up
		int32 totalThreadsCount = GetWorkersCount();
		for (int32 i = 0; i < totalThreadsCount; i++)
		{
			threadContext[i].state.Store(internal::ThreadState::EXIT);
			threadContext[i].hasNewTasksEvent.Signal();
		}

		// wait for the worker threads to finish
		for (int32 i = 0; i < totalThreadsCount; i++)
		{
			threadContext[i].thread.Join();
		}
	}

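	// Binds a grouped task to a fiber: reuses the task's awaiting fiber if the task was
	// suspended earlier, otherwise takes a fresh fiber from the standard or extended
	// pool according to the task's stack requirements.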
	FiberContext* TaskScheduler::RequestFiberContext(internal::GroupedTask& task)
	{
		FiberContext *fiberContext = task.awaitingFiber;
		if (fiberContext)
		{
			task.awaitingFiber = nullptr;
			return fiberContext;
		}

		MT::StackRequirements::Type stackRequirements = task.desc.stackRequirements;

		fiberContext = nullptr;
		bool res = false;
		MT_USED_IN_ASSERT(res);
		switch(stackRequirements)
		{
		case MT::StackRequirements::STANDARD:
			res = standartFibersAvailable.TryPop(fiberContext);
			MT_ASSERT(res, "Can't get more standard fibers!");
			break;
		case MT::StackRequirements::EXTENDED:
			res = extendedFibersAvailable.TryPop(fiberContext);
			MT_ASSERT(res, "Can't get more extended fibers!");
			break;
		default:
			MT_REPORT_ASSERT("Unknown stack requirements");
		}

		MT_ASSERT(fiberContext != nullptr, "Can't get more fibers. Too many tasks in flight simultaneously?");

		fiberContext->currentTask = task.desc;
		fiberContext->currentGroup = task.group;
		fiberContext->parentFiber = task.parentFiber;
		fiberContext->stackRequirements = stackRequirements;
		return fiberContext;
	}

	void TaskScheduler::ReleaseFiberContext(FiberContext*&& fiberContext)
	{
		MT_ASSERT(fiberContext, "Can't release a nullptr fiber. fiberContext is nullptr");

		MT::StackRequirements::Type stackRequirements = fiberContext->stackRequirements;
		fiberContext->Reset();

		MT_ASSERT(fiberContext != nullptr, "Fiber context can't be nullptr");

		bool res = false;
		MT_USED_IN_ASSERT(res);
		switch(stackRequirements)
		{
		case MT::StackRequirements::STANDARD:
			res = standartFibersAvailable.TryPush(std::move(fiberContext));
			break;
		case MT::StackRequirements::EXTENDED:
			res = extendedFibersAvailable.TryPush(std::move(fiberContext));
			break;
		default:
			MT_REPORT_ASSERT("Unknown stack requirements");
		}

		MT_USED_IN_ASSERT(res);
		MT_ASSERT(res != false, "Can't return fiber to storage");
	}

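	// Switches the worker's scheduler fiber to the task's fiber and runs it until the
	// task finishes, yields, or suspends. On FINISHED it updates the group and global
	// task counters and, if this was the last child of a suspended parent task, returns
	// the parent fiber so the caller can resume it; otherwise returns nullptr.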
	FiberContext* TaskScheduler::ExecuteTask(internal::ThreadContext& threadContext, FiberContext* fiberContext)
	{
		MT_ASSERT(threadContext.threadId.IsCurrentThread(), "Thread context sanity check failed");

		MT_ASSERT(fiberContext, "Invalid fiber context");
		MT_ASSERT(fiberContext->currentTask.IsValid(), "Invalid task");

		// Set the actual thread context on the fiber
		fiberContext->SetThreadContext(&threadContext);

		// Update task status
		fiberContext->SetStatus(FiberTaskStatus::RUNNED);

		MT_ASSERT(fiberContext->GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

		const void* poolUserData = fiberContext->currentTask.userData;
		TPoolTaskDestroy poolDestroyFunc = fiberContext->currentTask.poolDestroyFunc;

#ifdef MT_INSTRUMENTED_BUILD
		//threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::SUSPEND);
		threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
#endif

		// Run the current task code
		Fiber::SwitchTo(threadContext.schedulerFiber, fiberContext->fiber);

#ifdef MT_INSTRUMENTED_BUILD
		//threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::RESUME);
		threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

		// If the task is done
		FiberTaskStatus::Type taskStatus = fiberContext->GetStatus();
		if (taskStatus == FiberTaskStatus::FINISHED)
		{
			// destroy the task (call its dtor) for "fire and forget" tasks that came from a TaskPool
			if (poolDestroyFunc != nullptr)
			{
				poolDestroyFunc(poolUserData);
			}

			TaskGroup taskGroup = fiberContext->currentGroup;

			TaskScheduler::TaskGroupDescription & groupDesc = threadContext.taskScheduler->GetGroupDesc(taskGroup);

			// Update group status
			int groupTaskCount = groupDesc.Dec();
			MT_ASSERT(groupTaskCount >= 0, "Sanity check failed!");
			if (groupTaskCount == 0)
			{
				fiberContext->currentGroup = TaskGroup::INVALID;
			}

			// Update total task count
			int allGroupTaskCount = threadContext.taskScheduler->allGroups.Dec();
			MT_USED_IN_ASSERT(allGroupTaskCount);
			MT_ASSERT(allGroupTaskCount >= 0, "Sanity check failed!");

			FiberContext* parentFiberContext = fiberContext->parentFiber;
			if (parentFiberContext != nullptr)
			{
				int childrenFibersCount = parentFiberContext->childrenFibersCount.DecFetch();
				MT_ASSERT(childrenFibersCount >= 0, "Sanity check failed!");

				if (childrenFibersCount == 0)
				{
					// This is the last subtask. Restore the parent task
					MT_ASSERT(threadContext.threadId.IsCurrentThread(), "Thread context sanity check failed");
					MT_ASSERT(parentFiberContext->GetThreadContext() == nullptr, "Inactive parent should not have a valid thread context");

					// WARNING!! The thread context can change here! Set the actual current thread context.
					parentFiberContext->SetThreadContext(&threadContext);

					MT_ASSERT(parentFiberContext->GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

					// All subtasks are done.
					// Exit and return the parent fiber to the scheduler
					return parentFiberContext;
				} else
				{
					// Other subtasks still exist
					// Exiting
					return nullptr;
				}
			} else
			{
				// Task is finished and there is no parent task
				// Exiting
				return nullptr;
			}
		}

		MT_ASSERT(taskStatus != FiberTaskStatus::RUNNED, "Incorrect task status");
		return nullptr;
	}

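	// Fiber entry point. Each pooled fiber runs this loop forever: execute the task
	// currently bound to the fiber context, mark it FINISHED, then switch back to the
	// owning worker's scheduler fiber and wait to be reused for another task.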
	void TaskScheduler::FiberMain(void* userData)
	{
		FiberContext& fiberContext = *(FiberContext*)(userData);
		for(;;)
		{
			MT_ASSERT(fiberContext.currentTask.IsValid(), "Invalid task in fiber context");
			MT_ASSERT(fiberContext.GetThreadContext(), "Invalid thread context");
			MT_ASSERT(fiberContext.GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

#ifdef MT_INSTRUMENTED_BUILD
			fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME );
			fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::START );
#endif

			fiberContext.currentTask.taskFunc( fiberContext, fiberContext.currentTask.userData );
			fiberContext.SetStatus(FiberTaskStatus::FINISHED);

#ifdef MT_INSTRUMENTED_BUILD
			fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME );
			fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::STOP );
#endif

			Fiber::SwitchTo(fiberContext.fiber, fiberContext.GetThreadContext()->schedulerFiber);
		}

	}

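	// Tries to steal a task from another worker. Starting from a random victim index,
	// probes the other workers' queues (skipping the calling worker) and pops from the
	// newest end of a victim's queue. Returns false if stealing is disabled, there is
	// only one worker, or no task was found.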
	bool TaskScheduler::TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task)
	{
		bool taskStealingDisabled = threadContext.taskScheduler->IsTaskStealingDisabled();
		uint32 workersCount = threadContext.taskScheduler->GetWorkersCount();

		if (workersCount <= 1 || taskStealingDisabled )
		{
			return false;
		}

		uint32 victimIndex = threadContext.random.Get();

		for (uint32 attempt = 0; attempt < workersCount; attempt++)
		{
			uint32 index = victimIndex % workersCount;
			if (index == threadContext.workerIndex)
			{
				victimIndex++;
				index = victimIndex % workersCount;
			}

			internal::ThreadContext& victimContext = threadContext.taskScheduler->threadContext[index];
			if (victimContext.queue.TryPopNewest(task))
			{
				return true;
			}

			victimIndex++;
		}
		return false;
	}

	void TaskScheduler::WorkerThreadMain( void* userData )
	{
		internal::ThreadContext& context = *(internal::ThreadContext*)(userData);
		MT_ASSERT(context.taskScheduler, "Task scheduler must not be null!");

		isWorkerThreadTLS = 1;
		context.threadId.SetAsCurrentThread();

#ifdef MT_INSTRUMENTED_BUILD
		const char* threadNames[] = {"worker0","worker1","worker2","worker3","worker4","worker5","worker6","worker7","worker8","worker9","worker10","worker11","worker12"};
		if (context.workerIndex < MT_ARRAY_SIZE(threadNames))
		{
			Thread::SetThreadName(threadNames[context.workerIndex]);
		} else
		{
			Thread::SetThreadName("worker_thread");
		}
#endif

		context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberMain, userData);
	}


	void TaskScheduler::SchedulerFiberWait( void* userData )
	{
		WaitContext& waitContext = *(WaitContext*)(userData);
		internal::ThreadContext& context = *waitContext.threadContext;
		MT_ASSERT(context.taskScheduler, "Task scheduler must not be null!");
		MT_ASSERT(waitContext.waitCounter, "Wait counter must not be null!");

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyWaitStarted();
		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

		int64 timeOut = GetTimeMicroSeconds() + (waitContext.waitTimeMs * 1000);

		int32 idleIteration = 0;

		for(;;)
		{
			if ( SchedulerFiberStep(context) == false )
			{
				//---- Wait for new tasks using a hybrid spin ------------------------
				// http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning
				//
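				// Back-off ladder: spin with a CPU pause first, then batched pauses,
				// then yield the thread, then Sleep(0)/Sleep(1), and finally Sleep(20)
				// once no work has shown up for a long time.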
				if (idleIteration < 10)
				{
					MT::YieldCpu();
				} else
				{
					if (idleIteration < 20)
					{
						for (int32 i = 0; i < 50; i++)
						{
							MT::YieldCpu();
						}
					} else
					{
						if (idleIteration < 40)
						{
							MT::YieldThread();
						} else
						{
							if (idleIteration < 100)
							{
								MT::Thread::Sleep(0);
							} else
							{
								if (idleIteration < 200)
								{
									MT::Thread::Sleep(1);
								} else
								{
									//MT_REPORT_ASSERT("Sanity check failed. Waited too long (at least 200 ms) and still no tasks to process");
									MT::Thread::Sleep(20);
								}
							}
						}
					}
				}
				//---- spin wait for new tasks ------------------------

				idleIteration++;
			} else
			{
				idleIteration = 0;
			}

			int32 groupTaskCount = waitContext.waitCounter->Load();
			if (groupTaskCount == 0)
			{
				waitContext.exitCode = 0;
				break;
			}

			int64 timeNow = GetTimeMicroSeconds();
			if (timeNow >= timeOut)
			{
				waitContext.exitCode = 1;
				break;
			}
		}

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
		context.NotifyWaitFinished();
#endif

	}

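	// Main loop of a worker's scheduler fiber: waits until every worker has started,
	// then repeatedly runs SchedulerFiberStep until the scheduler asks the thread to
	// exit; when there is no local or stealable work it blocks on hasNewTasksEvent
	// (with a timeout) instead of spinning.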
	void TaskScheduler::SchedulerFiberMain( void* userData )
	{
		internal::ThreadContext& context = *(internal::ThreadContext*)(userData);
		MT_ASSERT(context.taskScheduler, "Task scheduler must not be null!");

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyThreadCreated(context.workerIndex);
#endif

		int32 totalThreadsCount = context.taskScheduler->threadsCount.LoadRelaxed();
		context.taskScheduler->startedThreadsCount.IncFetch();

		// Simple spin wait until all threads are started and initialized
		for(;;)
		{
			int32 initializedThreadsCount = context.taskScheduler->startedThreadsCount.Load();
			if (initializedThreadsCount == totalThreadsCount)
			{
				break;
			}

			// sleep a little until all the other threads are initialized
			Thread::Sleep(1);
		}

		HardwareFullMemoryBarrier();

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyThreadStarted(context.workerIndex);
		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

		while(context.state.Load() != internal::ThreadState::EXIT)
		{
			if ( SchedulerFiberStep(context) == false )
			{
#ifdef MT_INSTRUMENTED_BUILD
				context.NotifyThreadIdleStarted(context.workerIndex);
#endif
				// The queue is empty and the stealing attempt has failed.
				// Wait for new tasks.
				context.hasNewTasksEvent.Wait(2000);

#ifdef MT_INSTRUMENTED_BUILD
				context.NotifyThreadIdleFinished(context.workerIndex);
#endif
			}

		} // main thread loop

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
		context.NotifyThreadStoped(context.workerIndex);
#endif

	}

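	// Performs one scheduling step: pops the oldest task from the local queue (or
	// steals one), binds it to a fiber, and executes it. Finished tasks return their
	// fiber to the pool and may hand back a resumable parent fiber, which is run in the
	// same step; yielded tasks are re-queued; tasks with running children stay
	// suspended until the last child completes. Returns false when no work was found.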
	bool TaskScheduler::SchedulerFiberStep( internal::ThreadContext& context )
	{
		internal::GroupedTask task;
		if (context.queue.TryPopOldest(task) || TryStealTask(context, task) )
		{
#ifdef MT_INSTRUMENTED_BUILD
			bool isNewTask = (task.awaitingFiber == nullptr);
#endif

			// There is a new task
			FiberContext* fiberContext = context.taskScheduler->RequestFiberContext(task);
			MT_ASSERT(fiberContext, "Can't get execution context from pool");
			MT_ASSERT(fiberContext->currentTask.IsValid(), "Sanity check failed");
			MT_ASSERT(fiberContext->stackRequirements == task.desc.stackRequirements, "Sanity check failed");

			while(fiberContext)
			{
#ifdef MT_INSTRUMENTED_BUILD
				if (isNewTask)
				{
					//TODO:
					isNewTask = false;
				}
#endif
				// prevent an invalid fiber resume from child tasks before ExecuteTask is done
				fiberContext->childrenFibersCount.IncFetch();

				FiberContext* parentFiber = ExecuteTask(context, fiberContext);

				FiberTaskStatus::Type taskStatus = fiberContext->GetStatus();

				// release the guard
				int childrenFibersCount = fiberContext->childrenFibersCount.DecFetch();

				// Can drop the fiber context - the task is finished
				if (taskStatus == FiberTaskStatus::FINISHED)
				{
					MT_ASSERT( childrenFibersCount == 0, "Sanity check failed");
					context.taskScheduler->ReleaseFiberContext(std::move(fiberContext));

					// If a parent fiber exists, transfer control to it; if the parent fiber is null, exit
					fiberContext = parentFiber;
				} else
				{
					MT_ASSERT( childrenFibersCount >= 0, "Sanity check failed");

					// No subtasks here and the status is not finished; this means all subtasks finished before the parent returned from ExecuteTask
					if (childrenFibersCount == 0)
					{
						MT_ASSERT(parentFiber == nullptr, "Sanity check failed");
					} else
					{
						// If subtasks still exist, suspend the current task. It will be resumed when the last subtask finishes
						break;
					}

					// If the task has yielded execution, get another task from the queue.
					if (taskStatus == FiberTaskStatus::YIELDED)
					{
						// Task has yielded, add it back to the task queue
						ArrayView<internal::GroupedTask> buffer(context.descBuffer, 1);
						ArrayView<internal::TaskBucket> buckets( MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket)), 1 );

						FiberContext* yieldedTask = fiberContext;
						StaticVector<FiberContext*, 1> yieldedTasksQueue(1, yieldedTask);
						internal::DistibuteDescriptions( TaskGroup(TaskGroup::ASSIGN_FROM_CONTEXT), yieldedTasksQueue.Begin(), buffer, buckets );

						// add the yielded task to the scheduler
						context.taskScheduler->RunTasksImpl(buckets, nullptr, true);

						// ATTENTION! The yielded task may already be completed at this point

						break;
					}
				}
			} //while(fiberContext)

			return true;
		}

		return false;
	}

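	// Distributes prepared task buckets to the workers: links tasks to the parent
	// fiber, updates per-group and global in-flight counters (unless the tasks are
	// being restored from an await state), pushes each bucket onto a worker queue in
	// round-robin order, and signals those workers that new tasks are available.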
	void TaskScheduler::RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState)
	{
		// This storage is necessary to calculate how many tasks we add to different groups
		int newTaskCountInGroup[TaskGroup::MT_MAX_GROUPS_COUNT];

		// Default value is 0
		memset(&newTaskCountInGroup[0], 0, sizeof(newTaskCountInGroup));

		// Set parent fiber pointer
		// Calculate the number of tasks per group
		// Calculate total number of tasks
		size_t count = 0;
		for (size_t i = 0; i < buckets.Size(); ++i)
		{
			internal::TaskBucket& bucket = buckets[i];
			for (size_t taskIndex = 0; taskIndex < bucket.count; taskIndex++)
			{
				internal::GroupedTask & task = bucket.tasks[taskIndex];

				task.parentFiber = parentFiber;

				int idx = task.group.GetValidIndex();
				MT_ASSERT(idx >= 0 && idx < TaskGroup::MT_MAX_GROUPS_COUNT, "Invalid index");
				newTaskCountInGroup[idx]++;
			}

			count += bucket.count;
		}

		// Increment the children fiber count on the parent fiber
		if (parentFiber)
		{
			parentFiber->childrenFibersCount.AddFetch((int)count);
		}

		if (restoredFromAwaitState == false)
		{
			// Increase the number of active tasks in the group using data from temporary storage
			for (size_t i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++)
			{
				int groupNewTaskCount = newTaskCountInGroup[i];
				if (groupNewTaskCount > 0)
				{
					groupStats[i].Add((uint32)groupNewTaskCount);
				}
			}

			// Increment the total tasks-in-progress counter
			allGroups.Add((uint32)count);
		} else
		{
			// If tasks were restored from the await state, the counters are already correct
		}

		// Add to thread queues
		for (size_t i = 0; i < buckets.Size(); ++i)
		{
			int bucketIndex = roundRobinThreadIndex.IncFetch() % threadsCount.LoadRelaxed();
			internal::ThreadContext & context = threadContext[bucketIndex];

			internal::TaskBucket& bucket = buckets[i];

			for(;;)
			{
				MT_ASSERT(bucket.count < (internal::TASK_BUFFER_CAPACITY - 1), "Sanity check failed. Too many tasks in one bucket.");

				bool res = context.queue.Add(bucket.tasks, bucket.count);
				if (res == true)
				{
					break;
				}

				// Can't add new tasks to the queue. Looks like the job system is overloaded. Wait a bit and try again.
				// TODO: implement waiting until workers are done using events.
				Thread::Sleep(10);
			}

			context.hasNewTasksEvent.Signal();
		}
	}

	void TaskScheduler::RunAsync(TaskGroup group, const TaskHandle* taskHandleArray, uint32 taskHandleCount)
	{
		MT_ASSERT(!IsWorkerThread(), "Can't use RunAsync inside Task. Use FiberContext.RunAsync() instead.");

		ArrayView<internal::GroupedTask> buffer(MT_ALLOCATE_ON_STACK(sizeof(internal::GroupedTask) * taskHandleCount), taskHandleCount);

		uint32 bucketCount = MT::Min((uint32)GetWorkersCount(), taskHandleCount);
		ArrayView<internal::TaskBucket> buckets(MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket) * bucketCount), bucketCount);

		internal::DistibuteDescriptions(group, taskHandleArray, buffer, buckets);
		RunTasksImpl(buckets, nullptr, false);
	}

	bool TaskScheduler::WaitGroup(TaskGroup group, uint32 milliseconds)
	{
		MT_VERIFY(IsWorkerThread() == false, "Can't use WaitGroup inside Task. Use FiberContext.WaitGroupAndYield() instead.", return false);

		TaskScheduler::TaskGroupDescription& groupDesc = GetGroupDesc(group);

		// Early exit if there are no tasks in the group
		int32 taskCount = groupDesc.GetTaskCount();
		if (taskCount == 0)
		{
			return true;
		}

		size_t bytesCountForDescBuffer = internal::ThreadContext::GetMemoryRequrementInBytesForDescBuffer();
		void* descBuffer = MT_ALLOCATE_ON_STACK(bytesCountForDescBuffer);

		internal::ThreadContext context(descBuffer);
		context.taskScheduler = this;
		context.SetThreadIndex(0xFFFFFFFF);
		context.threadId.SetAsCurrentThread();

		WaitContext waitContext;
		waitContext.threadContext = &context;
		waitContext.waitCounter = groupDesc.GetWaitCounter();
		waitContext.waitTimeMs = milliseconds;
		waitContext.exitCode = 0;

		isWorkerThreadTLS = 1;

		context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberWait, &waitContext);

		isWorkerThreadTLS = 0;
		return (waitContext.exitCode == 0);
	}
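
	// Illustrative group-based usage (not part of the library source; the task handle
	// array is a placeholder the caller would build beforehand):
	//
	//   MT::TaskGroup group = scheduler.CreateGroup();
	//   scheduler.RunAsync(group, taskHandles, taskHandleCount);   // taskHandles: user-built MT::TaskHandle array
	//   bool finished = scheduler.WaitGroup(group, 5000);          // false if the 5 second timeout expired
	//   scheduler.ReleaseGroup(group);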

	bool TaskScheduler::WaitAll(uint32 milliseconds)
	{
		MT_VERIFY(IsWorkerThread() == false, "Can't use WaitAll inside Task.", return false);

		// Early exit if there are no tasks in flight
		int32 taskCount = allGroups.GetTaskCount();
		if (taskCount == 0)
		{
			return true;
		}

		size_t bytesCountForDescBuffer = internal::ThreadContext::GetMemoryRequrementInBytesForDescBuffer();
		void* descBuffer = MT_ALLOCATE_ON_STACK(bytesCountForDescBuffer);

		internal::ThreadContext context(descBuffer);
		context.taskScheduler = this;
		context.SetThreadIndex(0xFFFFFFFF);
		context.threadId.SetAsCurrentThread();

		WaitContext waitContext;
		waitContext.threadContext = &context;
		waitContext.waitCounter = allGroups.GetWaitCounter();
		waitContext.waitTimeMs = milliseconds;
		waitContext.exitCode = 0;

		isWorkerThreadTLS = 1;

		context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberWait, &waitContext);

		isWorkerThreadTLS = 0;
		return (waitContext.exitCode == 0);
	}

	bool TaskScheduler::IsTaskStealingDisabled() const
	{
		return taskStealingDisabled;
	}

	int32 TaskScheduler::GetWorkersCount() const
	{
		return threadsCount.LoadRelaxed();
	}


	bool TaskScheduler::IsWorkerThread() const
	{
		return (isWorkerThreadTLS != 0);
	}

	TaskGroup TaskScheduler::CreateGroup()
	{
		MT_ASSERT(IsWorkerThread() == false, "Can't use CreateGroup inside Task.");

		TaskGroup group;
		if (!availableGroups.TryPop(group))
		{
			MT_REPORT_ASSERT("Group pool is empty");
		}

		int idx = group.GetValidIndex();
		MT_USED_IN_ASSERT(idx);
		MT_ASSERT(groupStats[idx].GetDebugIsFree() == true, "Bad logic!");
#if MT_GROUP_DEBUG
		groupStats[idx].SetDebugIsFree(false);
#endif

		return group;
	}

	void TaskScheduler::ReleaseGroup(TaskGroup group)
	{
		MT_ASSERT(IsWorkerThread() == false, "Can't use ReleaseGroup inside Task.");
		MT_ASSERT(group.IsValid(), "Invalid group ID");

		int idx = group.GetValidIndex();
		MT_USED_IN_ASSERT(idx);
		MT_ASSERT(groupStats[idx].GetDebugIsFree() == false, "Group already released");
#if MT_GROUP_DEBUG
		groupStats[idx].SetDebugIsFree(true);
#endif

		bool res = availableGroups.TryPush(std::move(group));
		MT_USED_IN_ASSERT(res);
		MT_ASSERT(res, "Can't return group to pool");
	}

	TaskScheduler::TaskGroupDescription & TaskScheduler::GetGroupDesc(TaskGroup group)
	{
		MT_ASSERT(group.IsValid(), "Invalid group ID");

		int idx = group.GetValidIndex();
		TaskScheduler::TaskGroupDescription & groupDesc = groupStats[idx];

		MT_ASSERT(groupDesc.GetDebugIsFree() == false, "Invalid group");
		return groupDesc;
	}
}