1 // The MIT License (MIT)
2 //
3 // 	Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
4 //
5 // 	Permission is hereby granted, free of charge, to any person obtaining a copy
6 // 	of this software and associated documentation files (the "Software"), to deal
7 // 	in the Software without restriction, including without limitation the rights
8 // 	to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // 	copies of the Software, and to permit persons to whom the Software is
10 // 	furnished to do so, subject to the following conditions:
11 //
12 //  The above copyright notice and this permission notice shall be included in
13 // 	all copies or substantial portions of the Software.
14 //
15 // 	THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // 	IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // 	FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // 	AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // 	LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // 	OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21 // 	THE SOFTWARE.
22 
23 #include <MTScheduler.h>
24 #include <MTStaticVector.h>
25 #include <string.h> // for memset
26 
27 namespace MT
28 {
	// Thread-local marker: non-zero while the current thread runs inside the
	// scheduler (worker threads, and external threads temporarily helping from
	// WaitGroup/WaitAll). Read by IsWorkerThread() to reject scheduler API
	// calls that are not allowed from inside a task.
	mt_thread_local uint32 isWorkerThreadTLS = 0;
30 
31 
#ifdef MT_INSTRUMENTED_BUILD
	TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, IProfilerEventListener* listener, TaskStealingMode::Type stealMode)
#else
	TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, TaskStealingMode::Type stealMode)
#endif
		: roundRobinThreadIndex(0)
		, startedThreadsCount(0)
		, taskStealingDisabled(stealMode == TaskStealingMode::DISABLED)
	{
		// Creates the fiber pools, the task-group pool and the worker thread pool.
		// workerThreadsCount == 0 means "auto": hardware threads - 1, at least one.
		// workerParameters, when non-null, supplies a per-worker core index and
		// thread priority (assumed to hold one entry per worker - TODO confirm).

#ifdef MT_INSTRUMENTED_BUILD
		profilerEventListener = listener;
#endif

		if (workerThreadsCount != 0)
		{
			// Explicit worker count, clamped to the compile-time maximum
			threadsCount.StoreRelaxed( MT::Clamp(workerThreadsCount, (uint32)1, (uint32)MT_MAX_THREAD_COUNT) );
		} else
		{
			//query number of processors; keep one hardware thread for the caller
			threadsCount.StoreRelaxed( (uint32)MT::Clamp(Thread::GetNumberOfHardwareThreads() - 1, 1, (int)MT_MAX_THREAD_COUNT) );
		}

		// create fiber pool (fibers with standard stack size)
		for (uint32 i = 0; i < MT_MAX_STANDART_FIBERS_COUNT; i++)
		{
			FiberContext& context = standartFiberContexts[i];
			context.fiber.Create(MT_STANDART_FIBER_STACK_SIZE, FiberMain, &context);
			bool res = standartFibersAvailable.TryPush( &context );
			MT_USED_IN_ASSERT(res);
			MT_ASSERT(res == true, "Can't add fiber to storage");
		}

		// create fiber pool (fibers with extended stack size)
		for (uint32 i = 0; i < MT_MAX_EXTENDED_FIBERS_COUNT; i++)
		{
			FiberContext& context = extendedFiberContexts[i];
			context.fiber.Create(MT_EXTENDED_FIBER_STACK_SIZE, FiberMain, &context);
			bool res = extendedFibersAvailable.TryPush( &context );
			MT_USED_IN_ASSERT(res);
			MT_ASSERT(res == true, "Can't add fiber to storage");
		}

		// Fill the pool of available group ids; DEFAULT is permanently in use
		for (int16 i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++)
		{
			if (i != TaskGroup::DEFAULT)
			{
				bool res = availableGroups.TryPush( TaskGroup(i) );
				MT_USED_IN_ASSERT(res);
				MT_ASSERT(res == true, "Can't add group to storage");
			}
		}

#if MT_GROUP_DEBUG
		groupStats[TaskGroup::DEFAULT].SetDebugIsFree(false);
#endif

		// create worker thread pool
		int32 totalThreadsCount = GetWorkersCount();
		for (int32 i = 0; i < totalThreadsCount; i++)
		{
			threadContext[i].SetThreadIndex(i);
			threadContext[i].taskScheduler = this;

			// Default: worker i targets core i with default priority;
			// workerParameters overrides both per worker.
			uint32 threadCore = i;
			ThreadPriority::Type priority = ThreadPriority::DEFAULT;
			if (workerParameters != nullptr)
			{
				const WorkerThreadParams& params = workerParameters[i];

				threadCore = params.core;
				priority = params.priority;
			}

			threadContext[i].thread.Start( MT_SCHEDULER_STACK_SIZE, WorkerThreadMain, &threadContext[i], threadCore, priority);
		}
	}
110 
111 
112 	TaskScheduler::~TaskScheduler()
113 	{
114 		int32 totalThreadsCount = GetWorkersCount();
115 		for (int32 i = 0; i < totalThreadsCount; i++)
116 		{
117 			threadContext[i].state.Store(internal::ThreadState::EXIT);
118 			threadContext[i].hasNewTasksEvent.Signal();
119 		}
120 
121 		for (int32 i = 0; i < totalThreadsCount; i++)
122 		{
123 			threadContext[i].thread.Join();
124 		}
125 	}
126 
	// Picks the fiber that will execute the given task: either the fiber the
	// task was already suspended on (awaitingFiber - resume path), or a fresh
	// one from the standard/extended pool according to the task's declared
	// stack requirements. The fiber goes back to its pool in ReleaseFiberContext().
	FiberContext* TaskScheduler::RequestFiberContext(internal::GroupedTask& task)
	{
		FiberContext *fiberContext = task.awaitingFiber;
		if (fiberContext)
		{
			// Resuming a previously suspended task - reuse its fiber as-is
			task.awaitingFiber = nullptr;
			return fiberContext;
		}

		MT::StackRequirements::Type stackRequirements = task.desc.stackRequirements;

		fiberContext = nullptr;
		bool res = false;
		MT_USED_IN_ASSERT(res);
		switch(stackRequirements)
		{
		case MT::StackRequirements::STANDARD:
			res = standartFibersAvailable.TryPop(fiberContext);
			MT_ASSERT(res, "Can't get more standard fibers!");
			break;
		case MT::StackRequirements::EXTENDED:
			res = extendedFibersAvailable.TryPop(fiberContext);
			MT_ASSERT(res, "Can't get more extended fibers!");
			break;
		default:
			MT_REPORT_ASSERT("Unknown stack requrements");
		}

		MT_ASSERT(fiberContext != nullptr, "Can't get more fibers. Too many tasks in flight simultaneously?");

		// Bind the task (and its group / parent linkage) to the fresh fiber
		fiberContext->currentTask = task.desc;
		fiberContext->currentGroup = task.group;
		fiberContext->parentFiber = task.parentFiber;
		fiberContext->stackRequirements = stackRequirements;
		return fiberContext;
	}
163 
164 	void TaskScheduler::ReleaseFiberContext(FiberContext*&& fiberContext)
165 	{
166 		MT_ASSERT(fiberContext, "Can't release nullptr Fiber. fiberContext is nullptr");
167 
168 		MT::StackRequirements::Type stackRequirements = fiberContext->stackRequirements;
169 		fiberContext->Reset();
170 
171 		MT_ASSERT(fiberContext != nullptr, "Fiber context can't be nullptr");
172 
173 		bool res = false;
174 		MT_USED_IN_ASSERT(res);
175 		switch(stackRequirements)
176 		{
177 		case MT::StackRequirements::STANDARD:
178 			res = standartFibersAvailable.TryPush(std::move(fiberContext));
179 			break;
180 		case MT::StackRequirements::EXTENDED:
181 			res = extendedFibersAvailable.TryPush(std::move(fiberContext));
182 			break;
183 		default:
184 			MT_REPORT_ASSERT("Unknown stack requrements");
185 		}
186 
187 		MT_USED_IN_ASSERT(res);
188 		MT_ASSERT(res != false, "Can't return fiber to storage");
189 	}
190 
	// Transfers control from the worker's scheduler fiber to the task's fiber
	// and runs it until the task finishes, yields or starts awaiting subtasks.
	// Returns the parent fiber that must be resumed next (when this task was
	// the last outstanding child of a suspended parent), or nullptr.
	FiberContext* TaskScheduler::ExecuteTask(internal::ThreadContext& threadContext, FiberContext* fiberContext)
	{
		MT_ASSERT(threadContext.threadId.IsCurrentThread(), "Thread context sanity check failed");

		MT_ASSERT(fiberContext, "Invalid fiber context");
		MT_ASSERT(fiberContext->currentTask.IsValid(), "Invalid task");

		// Set actual thread context to fiber
		fiberContext->SetThreadContext(&threadContext);

		// Update task status
		fiberContext->SetStatus(FiberTaskStatus::RUNNED);

		MT_ASSERT(fiberContext->GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

		// Snapshot the pool destroy callback now: the fiber context may be
		// released (and reused) once the task has finished.
		const void* poolUserData = fiberContext->currentTask.userData;
		TPoolTaskDestroy poolDestroyFunc = fiberContext->currentTask.poolDestroyFunc;

#ifdef MT_INSTRUMENTED_BUILD
		//threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::SUSPEND);
		threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
#endif

		// Run current task code (returns here when the task finishes, yields or awaits)
		Fiber::SwitchTo(threadContext.schedulerFiber, fiberContext->fiber);

#ifdef MT_INSTRUMENTED_BUILD
		//threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::RESUME);
		threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

		// If task was done
		FiberTaskStatus::Type taskStatus = fiberContext->GetStatus();
		if (taskStatus == FiberTaskStatus::FINISHED)
		{
			//destroy task (call dtor) for "fire and forget" type of task from TaskPool
			if (poolDestroyFunc != nullptr)
			{
				poolDestroyFunc(poolUserData);
			}

			TaskGroup taskGroup = fiberContext->currentGroup;

			TaskScheduler::TaskGroupDescription  & groupDesc = threadContext.taskScheduler->GetGroupDesc(taskGroup);

			// Update group status
			int groupTaskCount = groupDesc.Dec();
			MT_ASSERT(groupTaskCount >= 0, "Sanity check failed!");
			if (groupTaskCount == 0)
			{
				// Last task in this group - clear the group binding
				fiberContext->currentGroup = TaskGroup::INVALID;
			}

			// Update total task count
			int allGroupTaskCount = threadContext.taskScheduler->allGroups.Dec();
			MT_USED_IN_ASSERT(allGroupTaskCount);
			MT_ASSERT(allGroupTaskCount >= 0, "Sanity check failed!");

			FiberContext* parentFiberContext = fiberContext->parentFiber;
			if (parentFiberContext != nullptr)
			{
				int childrenFibersCount = parentFiberContext->childrenFibersCount.DecFetch();
				MT_ASSERT(childrenFibersCount >= 0, "Sanity check failed!");

				if (childrenFibersCount == 0)
				{
					// This is a last subtask. Restore parent task
					MT_ASSERT(threadContext.threadId.IsCurrentThread(), "Thread context sanity check failed");
					MT_ASSERT(parentFiberContext->GetThreadContext() == nullptr, "Inactive parent should not have a valid thread context");

					// WARNING!! The parent may have been suspended on a different
					// worker - rebind it to the current thread context.
					parentFiberContext->SetThreadContext(&threadContext);

					MT_ASSERT(parentFiberContext->GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

					// All subtasks are done.
					// Exiting and return parent fiber to scheduler
					return parentFiberContext;
				} else
				{
					// Other subtasks still exist
					// Exiting
					return nullptr;
				}
			} else
			{
				// Task is finished and no parent task
				// Exiting
				return nullptr;
			}
		}

		// Task yielded or went to await - it must no longer be marked as running
		MT_ASSERT(taskStatus != FiberTaskStatus::RUNNED, "Incorrect task status")
		return nullptr;
	}
286 
287 
	// Body of every pooled fiber. Executes the task currently bound to the
	// fiber context, marks it as finished and switches back to the worker's
	// scheduler fiber. The infinite loop lets the same fiber be reused for the
	// next task bound to it.
	void TaskScheduler::FiberMain(void* userData)
	{
		FiberContext& fiberContext = *(FiberContext*)(userData);
		for(;;)
		{
			MT_ASSERT(fiberContext.currentTask.IsValid(), "Invalid task in fiber context");
			MT_ASSERT(fiberContext.GetThreadContext(), "Invalid thread context");
			MT_ASSERT(fiberContext.GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

#ifdef MT_INSTRUMENTED_BUILD
			fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME );
			fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::START );
#endif

			// Run the user task entry point
			fiberContext.currentTask.taskFunc( fiberContext, fiberContext.currentTask.userData );
			fiberContext.SetStatus(FiberTaskStatus::FINISHED);

#ifdef MT_INSTRUMENTED_BUILD
			fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME );
			fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::STOP );
#endif

			// Return control to the scheduler; the fiber is parked here until
			// it is bound to another task.
			Fiber::SwitchTo(fiberContext.fiber, fiberContext.GetThreadContext()->schedulerFiber);
		}

	}
314 
315 
316 	bool TaskScheduler::TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task)
317 	{
318 		bool taskStealingDisabled = threadContext.taskScheduler->IsTaskStealingDisabled();
319 		uint32 workersCount = threadContext.taskScheduler->GetWorkersCount();
320 
321 		if (workersCount <= 1 || taskStealingDisabled )
322 		{
323 			return false;
324 		}
325 
326 		uint32 victimIndex = threadContext.random.Get();
327 
328 		for (uint32 attempt = 0; attempt < workersCount; attempt++)
329 		{
330 			uint32 index = victimIndex % workersCount;
331 			if (index == threadContext.workerIndex)
332 			{
333 				victimIndex++;
334 				index = victimIndex % workersCount;
335 			}
336 
337 			internal::ThreadContext& victimContext = threadContext.taskScheduler->threadContext[index];
338 			if (victimContext.queue.TryPopNewest(task))
339 			{
340 				return true;
341 			}
342 
343 			victimIndex++;
344 		}
345 		return false;
346 	}
347 
348 	void TaskScheduler::WorkerThreadMain( void* userData )
349 	{
350 		internal::ThreadContext& context = *(internal::ThreadContext*)(userData);
351 		MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!");
352 
353 		isWorkerThreadTLS = 1;
354 		context.threadId.SetAsCurrentThread();
355 
356 #ifdef MT_INSTRUMENTED_BUILD
357 		const char* threadNames[] = {"worker0","worker1","worker2","worker3","worker4","worker5","worker6","worker7","worker8","worker9","worker10","worker11","worker12"};
358 		if (context.workerIndex < MT_ARRAY_SIZE(threadNames))
359 		{
360 			Thread::SetThreadName(threadNames[context.workerIndex]);
361 		} else
362 		{
363 			Thread::SetThreadName("worker_thread");
364 		}
365 #endif
366 
367 		context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberMain, userData);
368 	}
369 
370 
371 	void TaskScheduler::SchedulerFiberWait( void* userData )
372 	{
373 		WaitContext& waitContext = *(WaitContext*)(userData);
374 		internal::ThreadContext& context = *waitContext.threadContext;
375 		MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!");
376 		MT_ASSERT(waitContext.waitCounter, "Wait counter must be not null!");
377 
378 #ifdef MT_INSTRUMENTED_BUILD
379 		context.NotifyWaitStarted();
380 		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
381 #endif
382 
383 		int64 timeOut = GetTimeMicroSeconds() + (waitContext.waitTimeMs * 1000);
384 
385 		int32 idleIteration = 0;
386 
387 		for(;;)
388 		{
389 			if ( SchedulerFiberStep(context) == false )
390 			{
391 				//---- Wait for new tasks using hybrid spin ------------------------
392 				// http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning
393 				//
394 				if (idleIteration < 10)
395 				{
396 					MT::YieldCpu();
397 				} else
398 				{
399 					if (idleIteration < 20)
400 					{
401 						for (int32 i = 0; i < 50; i++)
402 						{
403 							MT::YieldCpu();
404 						}
405 					} else
406 					{
407 						if (idleIteration < 24)
408 						{
409 							MT::Thread::Sleep(0);
410 						} else
411 						{
412 							if (idleIteration < 200)
413 							{
414 								MT::Thread::Sleep(1);
415 							} else
416 							{
417 								//MT_REPORT_ASSERT("Sanity check failed. Wait too long (at least 200 ms) and still no tasks to processing");
418 								MT::Thread::Sleep(20);
419 							}
420 						}
421 					}
422 				}
423 				//---- spin wait for new tasks ------------------------
424 
425 				idleIteration++;
426 			} else
427 			{
428 				idleIteration = 0;
429 			}
430 
431 			int32 groupTaskCount = waitContext.waitCounter->Load();
432 			if (groupTaskCount == 0)
433 			{
434 				waitContext.exitCode = 0;
435 				break;
436 			}
437 
438 			int64 timeNow = GetTimeMicroSeconds();
439 			if (timeNow >= timeOut)
440 			{
441 				waitContext.exitCode = 1;
442 				break;
443 			}
444 		}
445 
446 #ifdef MT_INSTRUMENTED_BUILD
447 		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
448 		context.NotifyWaitFinished();
449 #endif
450 
451 	}
452 
	// Main loop of a worker's scheduler fiber: waits until every worker thread
	// has started, then keeps executing tasks (sleeping on hasNewTasksEvent
	// when idle) until the destructor sets this thread's state to EXIT.
	void TaskScheduler::SchedulerFiberMain( void* userData )
	{
		internal::ThreadContext& context = *(internal::ThreadContext*)(userData);
		MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!");

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyThreadCreated(context.workerIndex);
#endif

		int32 totalThreadsCount = context.taskScheduler->threadsCount.LoadRelaxed();
		context.taskScheduler->startedThreadsCount.IncFetch();

		// Simple spin until all worker threads are started and initialized
		for(;;)
		{
			int32 initializedThreadsCount = context.taskScheduler->startedThreadsCount.Load();
			if (initializedThreadsCount == totalThreadsCount)
			{
				break;
			}

			// sleep a little until the other threads are initialized
			Thread::Sleep(1);
		}

		HardwareFullMemoryBarrier();

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyThreadStarted(context.workerIndex);
		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

		while(context.state.Load() != internal::ThreadState::EXIT)
		{
			if ( SchedulerFiberStep(context) == false )
			{
#ifdef MT_INSTRUMENTED_BUILD
				context.NotifyThreadIdleStarted(context.workerIndex);
#endif
				// Queue is empty and stealing attempt has failed
				// Wait for new events (with a timeout so EXIT is re-checked)
				context.hasNewTasksEvent.Wait(2000);

#ifdef MT_INSTRUMENTED_BUILD
				context.NotifyThreadIdleFinished(context.workerIndex);
#endif
			}

		} // main thread loop

#ifdef MT_INSTRUMENTED_BUILD
		context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
		context.NotifyThreadStoped(context.workerIndex);
#endif

	}
509 
	// Executes one scheduling step for the given thread context: pops a task
	// from the local queue (or steals one from another worker), runs it, and
	// walks up the parent-fiber chain while tasks keep completing.
	// Returns false when no task was available.
	bool TaskScheduler::SchedulerFiberStep( internal::ThreadContext& context )
	{
		internal::GroupedTask task;
		if (context.queue.TryPopOldest(task) || TryStealTask(context, task) )
		{
#ifdef MT_INSTRUMENTED_BUILD
			bool isNewTask = (task.awaitingFiber == nullptr);
#endif

			// There is a new task
			FiberContext* fiberContext = context.taskScheduler->RequestFiberContext(task);
			MT_ASSERT(fiberContext, "Can't get execution context from pool");
			MT_ASSERT(fiberContext->currentTask.IsValid(), "Sanity check failed");
			MT_ASSERT(fiberContext->stackRequirements == task.desc.stackRequirements, "Sanity check failed");

			while(fiberContext)
			{
#ifdef MT_INSTRUMENTED_BUILD
				if (isNewTask)
				{
					//TODO:
					isNewTask = false;
				}
#endif
				// prevent invalid fiber resume from child tasks, before ExecuteTask is done
				fiberContext->childrenFibersCount.IncFetch();

				FiberContext* parentFiber = ExecuteTask(context, fiberContext);

				FiberTaskStatus::Type taskStatus = fiberContext->GetStatus();

				//release guard
				int childrenFibersCount = fiberContext->childrenFibersCount.DecFetch();

				// Can drop fiber context - task is finished
				if (taskStatus == FiberTaskStatus::FINISHED)
				{
					MT_ASSERT( childrenFibersCount == 0, "Sanity check failed");
					context.taskScheduler->ReleaseFiberContext(std::move(fiberContext));

					// If parent fiber is exist transfer flow control to parent fiber, if parent fiber is null, exit
					fiberContext = parentFiber;
				} else
				{
					MT_ASSERT( childrenFibersCount >= 0, "Sanity check failed");

					// No subtasks here and status is not finished, this mean all subtasks already finished before parent return from ExecuteTask
					if (childrenFibersCount == 0)
					{
						MT_ASSERT(parentFiber == nullptr, "Sanity check failed");
					} else
					{
						// If subtasks still exist, drop current task execution. task will be resumed when last subtask finished
						break;
					}

					// If task is yielded execution, get another task from queue.
					if (taskStatus == FiberTaskStatus::YIELDED)
					{
						// Task is yielded, add to tasks queue
						ArrayView<internal::GroupedTask> buffer(context.descBuffer, 1);
						ArrayView<internal::TaskBucket> buckets( MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket)), 1 );

						FiberContext* yieldedTask = fiberContext;
						StaticVector<FiberContext*, 1> yieldedTasksQueue(1, yieldedTask);
						internal::DistibuteDescriptions( TaskGroup(TaskGroup::ASSIGN_FROM_CONTEXT), yieldedTasksQueue.Begin(), buffer, buckets );

						// add yielded task to scheduler
						context.taskScheduler->RunTasksImpl(buckets, nullptr, true);

						// ATTENTION! The yielded task may already be completed at this point

						break;
					}
				}
			} //while(fiberContext)

			return true;
		}

		return false;
	}
592 
	// Distributes task buckets over the worker queues (round-robin) and wakes
	// the workers up. parentFiber, when non-null, is registered as awaiting all
	// of the added tasks. restoredFromAwaitState == true means the tasks were
	// already accounted in the group counters (resume path) and must not be
	// counted again.
	void TaskScheduler::RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState)
	{
		// This storage is necessary to calculate how many tasks we add to different groups
		int newTaskCountInGroup[TaskGroup::MT_MAX_GROUPS_COUNT];

		// Default value is 0
		memset(&newTaskCountInGroup[0], 0, sizeof(newTaskCountInGroup));

		// Set parent fiber pointer
		// Calculate the number of tasks per group
		// Calculate total number of tasks
		size_t count = 0;
		for (size_t i = 0; i < buckets.Size(); ++i)
		{
			internal::TaskBucket& bucket = buckets[i];
			for (size_t taskIndex = 0; taskIndex < bucket.count; taskIndex++)
			{
				internal::GroupedTask & task = bucket.tasks[taskIndex];

				task.parentFiber = parentFiber;

				int idx = task.group.GetValidIndex();
				MT_ASSERT(idx >= 0 && idx < TaskGroup::MT_MAX_GROUPS_COUNT, "Invalid index");
				newTaskCountInGroup[idx]++;
			}

			count += bucket.count;
		}

		// Increments child fibers count on parent fiber
		if (parentFiber)
		{
			parentFiber->childrenFibersCount.AddFetch((int)count);
		}

		if (restoredFromAwaitState == false)
		{
			// Increase the number of active tasks in the group using data from temporary storage
			for (size_t i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++)
			{
				int groupNewTaskCount = newTaskCountInGroup[i];
				if (groupNewTaskCount > 0)
				{
					groupStats[i].Add((uint32)groupNewTaskCount);
				}
			}

			// Increments all task in progress counter
			allGroups.Add((uint32)count);
		} else
		{
			// If task's restored from await state, counters already in correct state
		}

		// Add to thread queue (one bucket per worker, chosen round-robin)
		for (size_t i = 0; i < buckets.Size(); ++i)
		{
			int bucketIndex = roundRobinThreadIndex.IncFetch() % threadsCount.LoadRelaxed();
			internal::ThreadContext & context = threadContext[bucketIndex];

			internal::TaskBucket& bucket = buckets[i];

			for(;;)
			{
				MT_ASSERT(bucket.count < (internal::TASK_BUFFER_CAPACITY - 1), "Sanity check failed. Too many tasks per one bucket.");

				bool res = context.queue.Add(bucket.tasks, bucket.count);
				if (res == true)
				{
					break;
				}

				//Can't add new tasks onto the queue. Look like the job system is overloaded. Wait some time and try again.
				//TODO: implement waiting until workers done using events.
				Thread::Sleep(10);
			}

			context.hasNewTasksEvent.Signal();
		}
	}
673 
674 	void TaskScheduler::RunAsync(TaskGroup group, const TaskHandle* taskHandleArray, uint32 taskHandleCount)
675 	{
676 		MT_ASSERT(!IsWorkerThread(), "Can't use RunAsync inside Task. Use FiberContext.RunAsync() instead.");
677 
678 		ArrayView<internal::GroupedTask> buffer(MT_ALLOCATE_ON_STACK(sizeof(internal::GroupedTask) * taskHandleCount), taskHandleCount);
679 
680 		uint32 bucketCount = MT::Min((uint32)GetWorkersCount(), taskHandleCount);
681 		ArrayView<internal::TaskBucket> buckets(MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket) * bucketCount), bucketCount);
682 
683 		internal::DistibuteDescriptions(group, taskHandleArray, buffer, buckets);
684 		RunTasksImpl(buckets, nullptr, false);
685 	}
686 
687 	bool TaskScheduler::WaitGroup(TaskGroup group, uint32 milliseconds)
688 	{
689 		MT_VERIFY(IsWorkerThread() == false, "Can't use WaitGroup inside Task. Use FiberContext.WaitGroupAndYield() instead.", return false);
690 
691 		TaskScheduler::TaskGroupDescription& groupDesc = GetGroupDesc(group);
692 
693 		size_t bytesCountForDescBuffer = internal::ThreadContext::GetMemoryRequrementInBytesForDescBuffer();
694 		void* descBuffer = MT_ALLOCATE_ON_STACK(bytesCountForDescBuffer);
695 
696 		internal::ThreadContext context(descBuffer);
697 		context.taskScheduler = this;
698 		context.SetThreadIndex(0xFFFFFFFF);
699 		context.threadId.SetAsCurrentThread();
700 
701 		WaitContext waitContext;
702 		waitContext.threadContext = &context;
703 		waitContext.waitCounter = groupDesc.GetWaitCounter();
704 		waitContext.waitTimeMs = milliseconds;
705 		waitContext.exitCode = 0;
706 
707 		isWorkerThreadTLS = 1;
708 
709 		context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberWait, &waitContext);
710 
711 		isWorkerThreadTLS = 0;
712 		return (waitContext.exitCode == 0);
713 	}
714 
715 	bool TaskScheduler::WaitAll(uint32 milliseconds)
716 	{
717 		MT_VERIFY(IsWorkerThread() == false, "Can't use WaitAll inside Task.", return false);
718 
719 		size_t bytesCountForDescBuffer = internal::ThreadContext::GetMemoryRequrementInBytesForDescBuffer();
720 		void* descBuffer = MT_ALLOCATE_ON_STACK(bytesCountForDescBuffer);
721 
722 		internal::ThreadContext context(descBuffer);
723 		context.taskScheduler = this;
724 		context.SetThreadIndex(0xFFFFFFFF);
725 		context.threadId.SetAsCurrentThread();
726 
727 		WaitContext waitContext;
728 		waitContext.threadContext = &context;
729 		waitContext.waitCounter = allGroups.GetWaitCounter();
730 		waitContext.waitTimeMs = milliseconds;
731 		waitContext.exitCode = 0;
732 
733 		isWorkerThreadTLS = 1;
734 
735 		context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberWait, &waitContext);
736 
737 		isWorkerThreadTLS = 0;
738 		return (waitContext.exitCode == 0);
739 	}
740 
	// True when task stealing between workers was disabled at construction time
	bool TaskScheduler::IsTaskStealingDisabled() const
	{
		return taskStealingDisabled;
	}
745 
	// Number of worker threads owned by the scheduler
	int32 TaskScheduler::GetWorkersCount() const
	{
		return threadsCount.LoadRelaxed();
	}
750 
751 
	// True when called from inside the scheduler: a worker thread, or an
	// external thread currently helping from WaitGroup/WaitAll
	bool TaskScheduler::IsWorkerThread() const
	{
		return (isWorkerThreadTLS != 0);
	}
756 
757 	TaskGroup TaskScheduler::CreateGroup()
758 	{
759 		MT_ASSERT(IsWorkerThread() == false, "Can't use CreateGroup inside Task.");
760 
761 		TaskGroup group;
762 		if (!availableGroups.TryPop(group))
763 		{
764 			MT_REPORT_ASSERT("Group pool is empty");
765 		}
766 
767 		int idx = group.GetValidIndex();
768 		MT_USED_IN_ASSERT(idx);
769 		MT_ASSERT(groupStats[idx].GetDebugIsFree() == true, "Bad logic!");
770 #if MT_GROUP_DEBUG
771 		groupStats[idx].SetDebugIsFree(false);
772 #endif
773 
774 		return group;
775 	}
776 
777 	void TaskScheduler::ReleaseGroup(TaskGroup group)
778 	{
779 		MT_ASSERT(IsWorkerThread() == false, "Can't use ReleaseGroup inside Task.");
780 		MT_ASSERT(group.IsValid(), "Invalid group ID");
781 
782 		int idx = group.GetValidIndex();
783 		MT_USED_IN_ASSERT(idx);
784 		MT_ASSERT(groupStats[idx].GetDebugIsFree() == false, "Group already released");
785 #if MT_GROUP_DEBUG
786 		groupStats[idx].SetDebugIsFree(true);
787 #endif
788 
789 		bool res = availableGroups.TryPush(std::move(group));
790 		MT_USED_IN_ASSERT(res);
791 		MT_ASSERT(res, "Can't return group to pool");
792 	}
793 
794 	TaskScheduler::TaskGroupDescription & TaskScheduler::GetGroupDesc(TaskGroup group)
795 	{
796 		MT_ASSERT(group.IsValid(), "Invalid group ID");
797 
798 		int idx = group.GetValidIndex();
799 		TaskScheduler::TaskGroupDescription & groupDesc = groupStats[idx];
800 
801 		MT_ASSERT(groupDesc.GetDebugIsFree() == false, "Invalid group");
802 		return groupDesc;
803 	}
804 }
805 
806 
807