1 // The MIT License (MIT) 2 // 3 // Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev 4 // 5 // Permission is hereby granted, free of charge, to any person obtaining a copy 6 // of this software and associated documentation files (the "Software"), to deal 7 // in the Software without restriction, including without limitation the rights 8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 // copies of the Software, and to permit persons to whom the Software is 10 // furnished to do so, subject to the following conditions: 11 // 12 // The above copyright notice and this permission notice shall be included in 13 // all copies or substantial portions of the Software. 14 // 15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 // THE SOFTWARE. 22 23 #include <MTScheduler.h> 24 #include <MTStaticVector.h> 25 #include <string.h> // for memset 26 27 namespace MT 28 { 29 30 #ifdef MT_INSTRUMENTED_BUILD 31 TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, IProfilerEventListener* listener, TaskStealingMode::Type stealMode) 32 #else 33 TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, TaskStealingMode::Type stealMode) 34 #endif 35 : roundRobinThreadIndex(0) 36 , startedThreadsCount(0) 37 , taskStealingDisabled(stealMode == TaskStealingMode::DISABLED) 38 { 39 40 #ifdef MT_INSTRUMENTED_BUILD 41 profilerEventListener = listener; 42 #endif 43 44 if (workerThreadsCount != 0) 45 { 46 threadsCount.StoreRelaxed( MT::Clamp(workerThreadsCount, (uint32)1, (uint32)MT_MAX_THREAD_COUNT) ); 47 } else 48 { 49 //query number of processor 50 threadsCount.StoreRelaxed( (uint32)MT::Clamp(Thread::GetNumberOfHardwareThreads() - 1, 1, (int)MT_MAX_THREAD_COUNT) ); 51 } 52 53 // create fiber pool (fibers with standard stack size) 54 for (uint32 i = 0; i < MT_MAX_STANDART_FIBERS_COUNT; i++) 55 { 56 FiberContext& context = standartFiberContexts[i]; 57 context.fiber.Create(MT_STANDART_FIBER_STACK_SIZE, FiberMain, &context); 58 bool res = standartFibersAvailable.TryPush( &context ); 59 MT_USED_IN_ASSERT(res); 60 MT_ASSERT(res == true, "Can't add fiber to storage"); 61 } 62 63 // create fiber pool (fibers with extended stack size) 64 for (uint32 i = 0; i < MT_MAX_EXTENDED_FIBERS_COUNT; i++) 65 { 66 FiberContext& context = extendedFiberContexts[i]; 67 context.fiber.Create(MT_EXTENDED_FIBER_STACK_SIZE, FiberMain, &context); 68 bool res = extendedFibersAvailable.TryPush( &context ); 69 MT_USED_IN_ASSERT(res); 70 MT_ASSERT(res == true, "Can't add fiber to storage"); 71 } 72 73 74 for (int16 i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++) 75 { 76 if (i != TaskGroup::DEFAULT) 77 { 78 bool res = availableGroups.TryPush( TaskGroup(i) ); 79 MT_USED_IN_ASSERT(res); 80 MT_ASSERT(res == true, "Can't add group to storage"); 81 } 82 } 83 84 #if MT_GROUP_DEBUG 85 groupStats[TaskGroup::DEFAULT].SetDebugIsFree(false); 86 #endif 87 88 // create worker thread pool 89 int32 totalThreadsCount = GetWorkersCount(); 90 for (int32 i = 0; i < totalThreadsCount; i++) 91 { 92 threadContext[i].SetThreadIndex(i); 93 threadContext[i].taskScheduler = this; 94 95 uint32 threadCore = i; 96 ThreadPriority::Type priority = ThreadPriority::DEFAULT; 97 if (workerParameters != nullptr) 98 { 99 const WorkerThreadParams& params = workerParameters[i]; 100 101 threadCore = params.core; 102 priority = params.priority; 103 } 104 105 threadContext[i].thread.Start( MT_SCHEDULER_STACK_SIZE, WorkerThreadMain, &threadContext[i], threadCore, priority); 106 } 107 } 108 109 110 TaskScheduler::~TaskScheduler() 111 { 112 int32 totalThreadsCount = GetWorkersCount(); 113 for (int32 i = 0; i < totalThreadsCount; i++) 114 { 115 threadContext[i].state.Store(internal::ThreadState::EXIT); 116 threadContext[i].hasNewTasksEvent.Signal(); 117 } 118 119 for (int32 i = 0; i < totalThreadsCount; i++) 120 { 121 threadContext[i].thread.Join(); 122 } 123 } 124 125 FiberContext* TaskScheduler::RequestFiberContext(internal::GroupedTask& task) 126 { 127 FiberContext *fiberContext = task.awaitingFiber; 128 if (fiberContext) 129 { 130 task.awaitingFiber = nullptr; 131 return fiberContext; 132 } 133 134 MT::StackRequirements::Type stackRequirements = task.desc.stackRequirements; 135 136 fiberContext = nullptr; 137 bool res = false; 138 MT_USED_IN_ASSERT(res); 139 switch(stackRequirements) 140 { 141 case MT::StackRequirements::STANDARD: 142 res = standartFibersAvailable.TryPop(fiberContext); 143 MT_ASSERT(res, "Can't get more standard fibers!"); 144 break; 145 case MT::StackRequirements::EXTENDED: 146 res = extendedFibersAvailable.TryPop(fiberContext); 147 MT_ASSERT(res, "Can't get more extended fibers!"); 148 break; 149 default: 150 MT_REPORT_ASSERT("Unknown stack requrements"); 151 } 152 153 MT_ASSERT(fiberContext != nullptr, "Can't get more fibers. Too many tasks in flight simultaneously?"); 154 155 fiberContext->currentTask = task.desc; 156 fiberContext->currentGroup = task.group; 157 fiberContext->parentFiber = task.parentFiber; 158 fiberContext->stackRequirements = stackRequirements; 159 return fiberContext; 160 } 161 162 void TaskScheduler::ReleaseFiberContext(FiberContext*&& fiberContext) 163 { 164 MT_ASSERT(fiberContext, "Can't release nullptr Fiber. fiberContext is nullptr"); 165 166 MT::StackRequirements::Type stackRequirements = fiberContext->stackRequirements; 167 fiberContext->Reset(); 168 169 MT_ASSERT(fiberContext != nullptr, "Fiber context can't be nullptr"); 170 171 bool res = false; 172 MT_USED_IN_ASSERT(res); 173 switch(stackRequirements) 174 { 175 case MT::StackRequirements::STANDARD: 176 res = standartFibersAvailable.TryPush(std::move(fiberContext)); 177 break; 178 case MT::StackRequirements::EXTENDED: 179 res = extendedFibersAvailable.TryPush(std::move(fiberContext)); 180 break; 181 default: 182 MT_REPORT_ASSERT("Unknown stack requrements"); 183 } 184 185 MT_USED_IN_ASSERT(res); 186 MT_ASSERT(res != false, "Can't return fiber to storage"); 187 } 188 189 FiberContext* TaskScheduler::ExecuteTask(internal::ThreadContext& threadContext, FiberContext* fiberContext) 190 { 191 MT_ASSERT(threadContext.thread.IsCurrentThread(), "Thread context sanity check failed"); 192 193 MT_ASSERT(fiberContext, "Invalid fiber context"); 194 MT_ASSERT(fiberContext->currentTask.IsValid(), "Invalid task"); 195 196 // Set actual thread context to fiber 197 fiberContext->SetThreadContext(&threadContext); 198 199 // Update task status 200 fiberContext->SetStatus(FiberTaskStatus::RUNNED); 201 202 MT_ASSERT(fiberContext->GetThreadContext()->thread.IsCurrentThread(), "Thread context sanity check failed"); 203 204 const void* poolUserData = fiberContext->currentTask.userData; 205 TPoolTaskDestroy poolDestroyFunc = fiberContext->currentTask.poolDestroyFunc; 206 207 #ifdef MT_INSTRUMENTED_BUILD 208 //threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::SUSPEND); 209 threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP); 210 #endif 211 212 // Run current task code 213 Fiber::SwitchTo(threadContext.schedulerFiber, fiberContext->fiber); 214 215 #ifdef MT_INSTRUMENTED_BUILD 216 //threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::RESUME); 217 threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START); 218 #endif 219 220 // If task was done 221 FiberTaskStatus::Type taskStatus = fiberContext->GetStatus(); 222 if (taskStatus == FiberTaskStatus::FINISHED) 223 { 224 //destroy task (call dtor) for "fire and forget" type of task from TaskPool 225 if (poolDestroyFunc != nullptr) 226 { 227 poolDestroyFunc(poolUserData); 228 } 229 230 TaskGroup taskGroup = fiberContext->currentGroup; 231 232 TaskScheduler::TaskGroupDescription & groupDesc = threadContext.taskScheduler->GetGroupDesc(taskGroup); 233 234 // Update group status 235 int groupTaskCount = groupDesc.Dec(); 236 MT_ASSERT(groupTaskCount >= 0, "Sanity check failed!"); 237 if (groupTaskCount == 0) 238 { 239 // Signal pending threads that group work is finished. Group can be destroyed after this call. 240 groupDesc.Signal(); 241 242 fiberContext->currentGroup = TaskGroup::INVALID; 243 } 244 245 // Update total task count 246 int allGroupTaskCount = threadContext.taskScheduler->allGroups.Dec(); 247 MT_ASSERT(allGroupTaskCount >= 0, "Sanity check failed!"); 248 if (allGroupTaskCount == 0) 249 { 250 // Notify all tasks in all group finished 251 threadContext.taskScheduler->allGroups.Signal(); 252 } 253 254 FiberContext* parentFiberContext = fiberContext->parentFiber; 255 if (parentFiberContext != nullptr) 256 { 257 int childrenFibersCount = parentFiberContext->childrenFibersCount.DecFetch(); 258 MT_ASSERT(childrenFibersCount >= 0, "Sanity check failed!"); 259 260 if (childrenFibersCount == 0) 261 { 262 // This is a last subtask. Restore parent task 263 MT_ASSERT(threadContext.thread.IsCurrentThread(), "Thread context sanity check failed"); 264 MT_ASSERT(parentFiberContext->GetThreadContext() == nullptr, "Inactive parent should not have a valid thread context"); 265 266 // WARNING!! Thread context can changed here! Set actual current thread context. 267 parentFiberContext->SetThreadContext(&threadContext); 268 269 MT_ASSERT(parentFiberContext->GetThreadContext()->thread.IsCurrentThread(), "Thread context sanity check failed"); 270 271 // All subtasks is done. 272 // Exiting and return parent fiber to scheduler 273 return parentFiberContext; 274 } else 275 { 276 // Other subtasks still exist 277 // Exiting 278 return nullptr; 279 } 280 } else 281 { 282 // Task is finished and no parent task 283 // Exiting 284 return nullptr; 285 } 286 } 287 288 MT_ASSERT(taskStatus != FiberTaskStatus::RUNNED, "Incorrect task status") 289 return nullptr; 290 } 291 292 293 void TaskScheduler::FiberMain(void* userData) 294 { 295 FiberContext& fiberContext = *(FiberContext*)(userData); 296 for(;;) 297 { 298 MT_ASSERT(fiberContext.currentTask.IsValid(), "Invalid task in fiber context"); 299 MT_ASSERT(fiberContext.GetThreadContext(), "Invalid thread context"); 300 MT_ASSERT(fiberContext.GetThreadContext()->thread.IsCurrentThread(), "Thread context sanity check failed"); 301 302 #ifdef MT_INSTRUMENTED_BUILD 303 fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME ); 304 fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::START ); 305 #endif 306 307 fiberContext.currentTask.taskFunc( fiberContext, fiberContext.currentTask.userData ); 308 fiberContext.SetStatus(FiberTaskStatus::FINISHED); 309 310 #ifdef MT_INSTRUMENTED_BUILD 311 fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME ); 312 fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::STOP ); 313 #endif 314 315 Fiber::SwitchTo(fiberContext.fiber, fiberContext.GetThreadContext()->schedulerFiber); 316 } 317 318 } 319 320 321 bool TaskScheduler::TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task, uint32 workersCount, bool taskStealingDisabled) 322 { 323 if (workersCount <= 1 || taskStealingDisabled ) 324 { 325 return false; 326 } 327 328 uint32 victimIndex = threadContext.random.Get(); 329 330 for (uint32 attempt = 0; attempt < workersCount; attempt++) 331 { 332 uint32 index = victimIndex % workersCount; 333 if (index == threadContext.workerIndex) 334 { 335 victimIndex++; 336 index = victimIndex % workersCount; 337 } 338 339 internal::ThreadContext& victimContext = threadContext.taskScheduler->threadContext[index]; 340 if (victimContext.queue.TryPopNewest(task)) 341 { 342 return true; 343 } 344 345 victimIndex++; 346 } 347 return false; 348 } 349 350 351 void TaskScheduler::WorkerThreadMain( void* userData ) 352 { 353 internal::ThreadContext& context = *(internal::ThreadContext*)(userData); 354 MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!"); 355 356 #ifdef MT_INSTRUMENTED_BUILD 357 const char* threadNames[] = {"worker0","worker1","worker2","worker3","worker4","worker5","worker6","worker7","worker8","worker9","worker10","worker11","worker12"}; 358 if (context.workerIndex < MT_ARRAY_SIZE(threadNames)) 359 { 360 Thread::SetThreadName(threadNames[context.workerIndex]); 361 } else 362 { 363 Thread::SetThreadName("worker_thread"); 364 } 365 #endif 366 367 context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberMain, userData); 368 } 369 370 371 void TaskScheduler::SchedulerFiberMain( void* userData ) 372 { 373 internal::ThreadContext& context = *(internal::ThreadContext*)(userData); 374 MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!"); 375 376 #ifdef MT_INSTRUMENTED_BUILD 377 context.NotifyThreadCreate(context.workerIndex); 378 #endif 379 380 bool taskStealingDisabled = context.taskScheduler->IsTaskStealingDisabled(); 381 uint32 workersCount = context.taskScheduler->GetWorkersCount(); 382 int32 totalThreadsCount = context.taskScheduler->threadsCount.LoadRelaxed(); 383 384 context.taskScheduler->startedThreadsCount.IncFetch(); 385 386 //Simple spinlock until all threads is started and initialized 387 for(;;) 388 { 389 int32 initializedThreadsCount = context.taskScheduler->startedThreadsCount.Load(); 390 if (initializedThreadsCount == totalThreadsCount) 391 { 392 break; 393 } 394 395 // sleep some time until all other thread initialized 396 Thread::Sleep(1); 397 } 398 399 HardwareFullMemoryBarrier(); 400 401 #ifdef MT_INSTRUMENTED_BUILD 402 context.NotifyThreadStart(context.workerIndex); 403 context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START); 404 #endif 405 406 while(context.state.Load() != internal::ThreadState::EXIT) 407 { 408 internal::GroupedTask task; 409 if (context.queue.TryPopOldest(task) || TryStealTask(context, task, workersCount, taskStealingDisabled) ) 410 { 411 #ifdef MT_INSTRUMENTED_BUILD 412 bool isNewTask = (task.awaitingFiber == nullptr); 413 #endif 414 415 // There is a new task 416 FiberContext* fiberContext = context.taskScheduler->RequestFiberContext(task); 417 MT_ASSERT(fiberContext, "Can't get execution context from pool"); 418 MT_ASSERT(fiberContext->currentTask.IsValid(), "Sanity check failed"); 419 MT_ASSERT(fiberContext->stackRequirements == task.desc.stackRequirements, "Sanity check failed"); 420 421 while(fiberContext) 422 { 423 #ifdef MT_INSTRUMENTED_BUILD 424 if (isNewTask) 425 { 426 //TODO: 427 isNewTask = false; 428 } 429 #endif 430 // prevent invalid fiber resume from child tasks, before ExecuteTask is done 431 fiberContext->childrenFibersCount.IncFetch(); 432 433 FiberContext* parentFiber = ExecuteTask(context, fiberContext); 434 435 FiberTaskStatus::Type taskStatus = fiberContext->GetStatus(); 436 437 //release guard 438 int childrenFibersCount = fiberContext->childrenFibersCount.DecFetch(); 439 440 // Can drop fiber context - task is finished 441 if (taskStatus == FiberTaskStatus::FINISHED) 442 { 443 MT_ASSERT( childrenFibersCount == 0, "Sanity check failed"); 444 context.taskScheduler->ReleaseFiberContext(std::move(fiberContext)); 445 446 // If parent fiber is exist transfer flow control to parent fiber, if parent fiber is null, exit 447 fiberContext = parentFiber; 448 } else 449 { 450 MT_ASSERT( childrenFibersCount >= 0, "Sanity check failed"); 451 452 // No subtasks here and status is not finished, this mean all subtasks already finished before parent return from ExecuteTask 453 if (childrenFibersCount == 0) 454 { 455 MT_ASSERT(parentFiber == nullptr, "Sanity check failed"); 456 } else 457 { 458 // If subtasks still exist, drop current task execution. task will be resumed when last subtask finished 459 break; 460 } 461 462 // If task is yielded execution, get another task from queue. 463 if (taskStatus == FiberTaskStatus::YIELDED) 464 { 465 // Task is yielded, add to tasks queue 466 ArrayView<internal::GroupedTask> buffer(context.descBuffer, 1); 467 ArrayView<internal::TaskBucket> buckets( MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket)), 1 ); 468 469 FiberContext* yieldedTask = fiberContext; 470 StaticVector<FiberContext*, 1> yieldedTasksQueue(1, yieldedTask); 471 internal::DistibuteDescriptions( TaskGroup(TaskGroup::ASSIGN_FROM_CONTEXT), yieldedTasksQueue.Begin(), buffer, buckets ); 472 473 // add yielded task to scheduler 474 context.taskScheduler->RunTasksImpl(buckets, nullptr, true); 475 476 // ATENTION! yielded task can be already completed at this point 477 478 break; 479 } 480 } 481 } //while(fiberContext) 482 483 } else 484 { 485 #ifdef MT_INSTRUMENTED_BUILD 486 context.NotifyThreadIdleBegin(context.workerIndex); 487 #endif 488 489 // Queue if empty and stealing attempt failed 490 // Wait for new events 491 context.hasNewTasksEvent.Wait(2000); 492 493 #ifdef MT_INSTRUMENTED_BUILD 494 context.NotifyThreadIdleEnd(context.workerIndex); 495 #endif 496 497 } 498 499 } // main thread loop 500 501 #ifdef MT_INSTRUMENTED_BUILD 502 context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP); 503 context.NotifyThreadStop(context.workerIndex); 504 #endif 505 506 } 507 508 void TaskScheduler::RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState) 509 { 510 // This storage is necessary to calculate how many tasks we add to different groups 511 int newTaskCountInGroup[TaskGroup::MT_MAX_GROUPS_COUNT]; 512 513 // Default value is 0 514 memset(&newTaskCountInGroup[0], 0, sizeof(newTaskCountInGroup)); 515 516 // Set parent fiber pointer 517 // Calculate the number of tasks per group 518 // Calculate total number of tasks 519 size_t count = 0; 520 for (size_t i = 0; i < buckets.Size(); ++i) 521 { 522 internal::TaskBucket& bucket = buckets[i]; 523 for (size_t taskIndex = 0; taskIndex < bucket.count; taskIndex++) 524 { 525 internal::GroupedTask & task = bucket.tasks[taskIndex]; 526 527 task.parentFiber = parentFiber; 528 529 int idx = task.group.GetValidIndex(); 530 MT_ASSERT(idx >= 0 && idx < TaskGroup::MT_MAX_GROUPS_COUNT, "Invalid index"); 531 newTaskCountInGroup[idx]++; 532 } 533 534 count += bucket.count; 535 } 536 537 // Increments child fibers count on parent fiber 538 if (parentFiber) 539 { 540 parentFiber->childrenFibersCount.AddFetch((int)count); 541 } 542 543 if (restoredFromAwaitState == false) 544 { 545 // Increase the number of active tasks in the group using data from temporary storage 546 for (size_t i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++) 547 { 548 int groupNewTaskCount = newTaskCountInGroup[i]; 549 if (groupNewTaskCount > 0) 550 { 551 groupStats[i].Reset(); 552 groupStats[i].Add((uint32)groupNewTaskCount); 553 } 554 } 555 556 // Increments all task in progress counter 557 allGroups.Reset(); 558 allGroups.Add((uint32)count); 559 } else 560 { 561 // If task's restored from await state, counters already in correct state 562 } 563 564 // Add to thread queue 565 for (size_t i = 0; i < buckets.Size(); ++i) 566 { 567 int bucketIndex = roundRobinThreadIndex.IncFetch() % threadsCount.LoadRelaxed(); 568 internal::ThreadContext & context = threadContext[bucketIndex]; 569 570 internal::TaskBucket& bucket = buckets[i]; 571 572 for(;;) 573 { 574 bool res = context.queue.Add(bucket.tasks, bucket.count); 575 if (res == true) 576 { 577 break; 578 } 579 580 //Can't add new tasks onto the queue. Look like the job system is overloaded. Wait some time and try again. 581 //TODO: implement waiting until workers done using events. 582 Thread::Sleep(10); 583 } 584 585 context.hasNewTasksEvent.Signal(); 586 } 587 } 588 589 void TaskScheduler::RunAsync(TaskGroup group, const TaskHandle* taskHandleArray, uint32 taskHandleCount) 590 { 591 MT_ASSERT(!IsWorkerThread(), "Can't use RunAsync inside Task. Use FiberContext.RunAsync() instead."); 592 593 ArrayView<internal::GroupedTask> buffer(MT_ALLOCATE_ON_STACK(sizeof(internal::GroupedTask) * taskHandleCount), taskHandleCount); 594 595 uint32 bucketCount = MT::Min((uint32)GetWorkersCount(), taskHandleCount); 596 ArrayView<internal::TaskBucket> buckets(MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket) * bucketCount), bucketCount); 597 598 internal::DistibuteDescriptions(group, taskHandleArray, buffer, buckets); 599 RunTasksImpl(buckets, nullptr, false); 600 } 601 602 bool TaskScheduler::WaitGroup(TaskGroup group, uint32 milliseconds) 603 { 604 MT_VERIFY(IsWorkerThread() == false, "Can't use WaitGroup inside Task. Use FiberContext.WaitGroupAndYield() instead.", return false); 605 606 TaskScheduler::TaskGroupDescription & groupDesc = GetGroupDesc(group); 607 608 return groupDesc.Wait(milliseconds); 609 } 610 611 bool TaskScheduler::WaitAll(uint32 milliseconds) 612 { 613 MT_VERIFY(IsWorkerThread() == false, "Can't use WaitAll inside Task.", return false); 614 615 return allGroups.Wait(milliseconds); 616 } 617 618 bool TaskScheduler::IsTaskStealingDisabled() const 619 { 620 return taskStealingDisabled; 621 } 622 623 int32 TaskScheduler::GetWorkersCount() const 624 { 625 return threadsCount.LoadRelaxed(); 626 } 627 628 bool TaskScheduler::IsWorkerThread() const 629 { 630 for (uint32 i = 0; i < MT_MAX_THREAD_COUNT; i++) 631 { 632 if (threadContext[i].thread.IsCurrentThread()) 633 { 634 return true; 635 } 636 } 637 return false; 638 } 639 640 TaskGroup TaskScheduler::CreateGroup() 641 { 642 MT_ASSERT(IsWorkerThread() == false, "Can't use CreateGroup inside Task."); 643 644 TaskGroup group; 645 if (!availableGroups.TryPop(group)) 646 { 647 MT_REPORT_ASSERT("Group pool is empty"); 648 } 649 650 int idx = group.GetValidIndex(); 651 MT_USED_IN_ASSERT(idx); 652 MT_ASSERT(groupStats[idx].GetDebugIsFree() == true, "Bad logic!"); 653 #if MT_GROUP_DEBUG 654 groupStats[idx].SetDebugIsFree(false); 655 #endif 656 657 return group; 658 } 659 660 void TaskScheduler::ReleaseGroup(TaskGroup group) 661 { 662 MT_ASSERT(IsWorkerThread() == false, "Can't use ReleaseGroup inside Task."); 663 MT_ASSERT(group.IsValid(), "Invalid group ID"); 664 665 int idx = group.GetValidIndex(); 666 MT_USED_IN_ASSERT(idx); 667 MT_ASSERT(groupStats[idx].GetDebugIsFree() == false, "Group already released"); 668 #if MT_GROUP_DEBUG 669 groupStats[idx].SetDebugIsFree(true); 670 #endif 671 672 bool res = availableGroups.TryPush(std::move(group)); 673 MT_USED_IN_ASSERT(res); 674 MT_ASSERT(res, "Can't return group to pool"); 675 } 676 677 TaskScheduler::TaskGroupDescription & TaskScheduler::GetGroupDesc(TaskGroup group) 678 { 679 MT_ASSERT(group.IsValid(), "Invalid group ID"); 680 681 int idx = group.GetValidIndex(); 682 TaskScheduler::TaskGroupDescription & groupDesc = groupStats[idx]; 683 684 MT_ASSERT(groupDesc.GetDebugIsFree() == false, "Invalid group"); 685 return groupDesc; 686 } 687 } 688 689 690