1 // The MIT License (MIT) 2 // 3 // Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev 4 // 5 // Permission is hereby granted, free of charge, to any person obtaining a copy 6 // of this software and associated documentation files (the "Software"), to deal 7 // in the Software without restriction, including without limitation the rights 8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 // copies of the Software, and to permit persons to whom the Software is 10 // furnished to do so, subject to the following conditions: 11 // 12 // The above copyright notice and this permission notice shall be included in 13 // all copies or substantial portions of the Software. 14 // 15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 21 // THE SOFTWARE. 22 23 #include <MTScheduler.h> 24 #include <string.h> // for memset 25 26 namespace MT 27 { 28 29 #ifdef MT_INSTRUMENTED_BUILD 30 TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, IProfilerEventListener* listener, TaskStealingMode::Type stealMode) 31 #else 32 TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, TaskStealingMode::Type stealMode) 33 #endif 34 : roundRobinThreadIndex(0) 35 , startedThreadsCount(0) 36 , taskStealingDisabled(stealMode == TaskStealingMode::DISABLED) 37 { 38 39 #ifdef MT_INSTRUMENTED_BUILD 40 profilerEventListener = listener; 41 #endif 42 43 if (workerThreadsCount != 0) 44 { 45 threadsCount.StoreRelaxed( MT::Clamp(workerThreadsCount, (uint32)1, (uint32)MT_MAX_THREAD_COUNT) ); 46 } else 47 { 48 //query number of processor 49 threadsCount.StoreRelaxed( (uint32)MT::Clamp(Thread::GetNumberOfHardwareThreads() - 1, 1, (int)MT_MAX_THREAD_COUNT) ); 50 } 51 52 // create fiber pool (fibers with standard stack size) 53 for (uint32 i = 0; i < MT_MAX_STANDART_FIBERS_COUNT; i++) 54 { 55 FiberContext& context = standartFiberContexts[i]; 56 context.fiber.Create(MT_STANDART_FIBER_STACK_SIZE, FiberMain, &context); 57 bool res = standartFibersAvailable.TryPush( &context ); 58 MT_USED_IN_ASSERT(res); 59 MT_ASSERT(res == true, "Can't add fiber to storage"); 60 } 61 62 // create fiber pool (fibers with extended stack size) 63 for (uint32 i = 0; i < MT_MAX_EXTENDED_FIBERS_COUNT; i++) 64 { 65 FiberContext& context = extendedFiberContexts[i]; 66 context.fiber.Create(MT_EXTENDED_FIBER_STACK_SIZE, FiberMain, &context); 67 bool res = extendedFibersAvailable.TryPush( &context ); 68 MT_USED_IN_ASSERT(res); 69 MT_ASSERT(res == true, "Can't add fiber to storage"); 70 } 71 72 73 for (int16 i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++) 74 { 75 if (i != TaskGroup::DEFAULT) 76 { 77 bool res = availableGroups.TryPush( TaskGroup(i) ); 78 MT_USED_IN_ASSERT(res); 79 MT_ASSERT(res == true, "Can't add group to storage"); 80 } 81 } 82 83 #if MT_GROUP_DEBUG 84 groupStats[TaskGroup::DEFAULT].SetDebugIsFree(false); 85 #endif 86 87 // create worker thread pool 88 int32 totalThreadsCount = GetWorkersCount(); 89 for (int32 i = 0; i < totalThreadsCount; i++) 90 { 91 threadContext[i].SetThreadIndex(i); 92 threadContext[i].taskScheduler = this; 93 94 uint32 threadCore = i; 95 ThreadPriority::Type priority = ThreadPriority::DEFAULT; 96 if (workerParameters != nullptr) 97 { 98 const WorkerThreadParams& params = workerParameters[i]; 99 100 threadCore = params.core; 101 priority = params.priority; 102 } 103 104 threadContext[i].thread.Start( MT_SCHEDULER_STACK_SIZE, WorkerThreadMain, &threadContext[i], threadCore, priority); 105 } 106 } 107 108 109 TaskScheduler::~TaskScheduler() 110 { 111 int32 totalThreadsCount = GetWorkersCount(); 112 for (int32 i = 0; i < totalThreadsCount; i++) 113 { 114 threadContext[i].state.Store(internal::ThreadState::EXIT); 115 threadContext[i].hasNewTasksEvent.Signal(); 116 } 117 118 for (int32 i = 0; i < totalThreadsCount; i++) 119 { 120 threadContext[i].thread.Join(); 121 } 122 } 123 124 FiberContext* TaskScheduler::RequestFiberContext(internal::GroupedTask& task) 125 { 126 FiberContext *fiberContext = task.awaitingFiber; 127 if (fiberContext) 128 { 129 task.awaitingFiber = nullptr; 130 return fiberContext; 131 } 132 133 MT::StackRequirements::Type stackRequirements = task.desc.stackRequirements; 134 135 fiberContext = nullptr; 136 bool res = false; 137 MT_USED_IN_ASSERT(res); 138 switch(stackRequirements) 139 { 140 case MT::StackRequirements::STANDARD: 141 res = standartFibersAvailable.TryPop(fiberContext); 142 MT_ASSERT(res, "Can't get more standard fibers!"); 143 break; 144 case MT::StackRequirements::EXTENDED: 145 res = extendedFibersAvailable.TryPop(fiberContext); 146 MT_ASSERT(res, "Can't get more extended fibers!"); 147 break; 148 default: 149 MT_REPORT_ASSERT("Unknown stack requrements"); 150 } 151 152 MT_ASSERT(fiberContext != nullptr, "Can't get more fibers. Too many tasks in flight simultaneously?"); 153 154 fiberContext->currentTask = task.desc; 155 fiberContext->currentGroup = task.group; 156 fiberContext->parentFiber = task.parentFiber; 157 fiberContext->stackRequirements = stackRequirements; 158 return fiberContext; 159 } 160 161 void TaskScheduler::ReleaseFiberContext(FiberContext*&& fiberContext) 162 { 163 MT_ASSERT(fiberContext, "Can't release nullptr Fiber. fiberContext is nullptr"); 164 165 MT::StackRequirements::Type stackRequirements = fiberContext->stackRequirements; 166 fiberContext->Reset(); 167 168 MT_ASSERT(fiberContext != nullptr, "Fiber context can't be nullptr"); 169 170 bool res = false; 171 MT_USED_IN_ASSERT(res); 172 switch(stackRequirements) 173 { 174 case MT::StackRequirements::STANDARD: 175 res = standartFibersAvailable.TryPush(std::move(fiberContext)); 176 break; 177 case MT::StackRequirements::EXTENDED: 178 res = extendedFibersAvailable.TryPush(std::move(fiberContext)); 179 break; 180 default: 181 MT_REPORT_ASSERT("Unknown stack requrements"); 182 } 183 184 MT_USED_IN_ASSERT(res); 185 MT_ASSERT(res != false, "Can't return fiber to storage"); 186 } 187 188 FiberContext* TaskScheduler::ExecuteTask(internal::ThreadContext& threadContext, FiberContext* fiberContext) 189 { 190 MT_ASSERT(threadContext.thread.IsCurrentThread(), "Thread context sanity check failed"); 191 192 MT_ASSERT(fiberContext, "Invalid fiber context"); 193 MT_ASSERT(fiberContext->currentTask.IsValid(), "Invalid task"); 194 195 // Set actual thread context to fiber 196 fiberContext->SetThreadContext(&threadContext); 197 198 // Update task status 199 fiberContext->SetStatus(FiberTaskStatus::RUNNED); 200 201 MT_ASSERT(fiberContext->GetThreadContext()->thread.IsCurrentThread(), "Thread context sanity check failed"); 202 203 const void* poolUserData = fiberContext->currentTask.userData; 204 TPoolTaskDestroy poolDestroyFunc = fiberContext->currentTask.poolDestroyFunc; 205 206 #ifdef MT_INSTRUMENTED_BUILD 207 //threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::SUSPEND); 208 threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP); 209 #endif 210 211 // Run current task code 212 Fiber::SwitchTo(threadContext.schedulerFiber, fiberContext->fiber); 213 214 #ifdef MT_INSTRUMENTED_BUILD 215 //threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::RESUME); 216 threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START); 217 #endif 218 219 // If task was done 220 FiberTaskStatus::Type taskStatus = fiberContext->GetStatus(); 221 if (taskStatus == FiberTaskStatus::FINISHED) 222 { 223 //destroy task (call dtor) for "fire and forget" type of task from TaskPool 224 if (poolDestroyFunc != nullptr) 225 { 226 poolDestroyFunc(poolUserData); 227 } 228 229 TaskGroup taskGroup = fiberContext->currentGroup; 230 231 TaskScheduler::TaskGroupDescription & groupDesc = threadContext.taskScheduler->GetGroupDesc(taskGroup); 232 233 // Update group status 234 int groupTaskCount = groupDesc.Dec(); 235 MT_ASSERT(groupTaskCount >= 0, "Sanity check failed!"); 236 if (groupTaskCount == 0) 237 { 238 // Signal pending threads that group work is finished. Group can be destroyed after this call. 239 groupDesc.Signal(); 240 241 fiberContext->currentGroup = TaskGroup::INVALID; 242 } 243 244 // Update total task count 245 int allGroupTaskCount = threadContext.taskScheduler->allGroups.Dec(); 246 MT_ASSERT(allGroupTaskCount >= 0, "Sanity check failed!"); 247 if (allGroupTaskCount == 0) 248 { 249 // Notify all tasks in all group finished 250 threadContext.taskScheduler->allGroups.Signal(); 251 } 252 253 FiberContext* parentFiberContext = fiberContext->parentFiber; 254 if (parentFiberContext != nullptr) 255 { 256 int childrenFibersCount = parentFiberContext->childrenFibersCount.DecFetch(); 257 MT_ASSERT(childrenFibersCount >= 0, "Sanity check failed!"); 258 259 if (childrenFibersCount == 0) 260 { 261 // This is a last subtask. Restore parent task 262 MT_ASSERT(threadContext.thread.IsCurrentThread(), "Thread context sanity check failed"); 263 MT_ASSERT(parentFiberContext->GetThreadContext() == nullptr, "Inactive parent should not have a valid thread context"); 264 265 // WARNING!! Thread context can changed here! Set actual current thread context. 266 parentFiberContext->SetThreadContext(&threadContext); 267 268 MT_ASSERT(parentFiberContext->GetThreadContext()->thread.IsCurrentThread(), "Thread context sanity check failed"); 269 270 // All subtasks is done. 271 // Exiting and return parent fiber to scheduler 272 return parentFiberContext; 273 } else 274 { 275 // Other subtasks still exist 276 // Exiting 277 return nullptr; 278 } 279 } else 280 { 281 // Task is finished and no parent task 282 // Exiting 283 return nullptr; 284 } 285 } 286 287 MT_ASSERT(taskStatus != FiberTaskStatus::RUNNED, "Incorrect task status") 288 return nullptr; 289 } 290 291 292 void TaskScheduler::FiberMain(void* userData) 293 { 294 FiberContext& fiberContext = *(FiberContext*)(userData); 295 for(;;) 296 { 297 MT_ASSERT(fiberContext.currentTask.IsValid(), "Invalid task in fiber context"); 298 MT_ASSERT(fiberContext.GetThreadContext(), "Invalid thread context"); 299 MT_ASSERT(fiberContext.GetThreadContext()->thread.IsCurrentThread(), "Thread context sanity check failed"); 300 301 #ifdef MT_INSTRUMENTED_BUILD 302 fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME ); 303 fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::START ); 304 #endif 305 306 fiberContext.currentTask.taskFunc( fiberContext, fiberContext.currentTask.userData ); 307 fiberContext.SetStatus(FiberTaskStatus::FINISHED); 308 309 #ifdef MT_INSTRUMENTED_BUILD 310 fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME ); 311 fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::STOP ); 312 #endif 313 314 Fiber::SwitchTo(fiberContext.fiber, fiberContext.GetThreadContext()->schedulerFiber); 315 } 316 317 } 318 319 320 bool TaskScheduler::TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task, uint32 workersCount, bool taskStealingDisabled) 321 { 322 if (workersCount <= 1 || taskStealingDisabled ) 323 { 324 return false; 325 } 326 327 uint32 victimIndex = threadContext.random.Get(); 328 329 for (uint32 attempt = 0; attempt < workersCount; attempt++) 330 { 331 uint32 index = victimIndex % workersCount; 332 if (index == threadContext.workerIndex) 333 { 334 victimIndex++; 335 index = victimIndex % workersCount; 336 } 337 338 internal::ThreadContext& victimContext = threadContext.taskScheduler->threadContext[index]; 339 if (victimContext.queue.TryPopNewest(task)) 340 { 341 return true; 342 } 343 344 victimIndex++; 345 } 346 return false; 347 } 348 349 350 void TaskScheduler::WorkerThreadMain( void* userData ) 351 { 352 internal::ThreadContext& context = *(internal::ThreadContext*)(userData); 353 MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!"); 354 355 #ifdef MT_INSTRUMENTED_BUILD 356 const char* threadNames[] = {"worker0","worker1","worker2","worker3","worker4","worker5","worker6","worker7","worker8","worker9","worker10","worker11","worker12"}; 357 if (context.workerIndex < MT_ARRAY_SIZE(threadNames)) 358 { 359 Thread::SetThreadName(threadNames[context.workerIndex]); 360 } else 361 { 362 Thread::SetThreadName("worker_thread"); 363 } 364 #endif 365 366 context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberMain, userData); 367 } 368 369 370 void TaskScheduler::SchedulerFiberMain( void* userData ) 371 { 372 internal::ThreadContext& context = *(internal::ThreadContext*)(userData); 373 MT_ASSERT(context.taskScheduler, "Task scheduler must be not null!"); 374 375 #ifdef MT_INSTRUMENTED_BUILD 376 context.NotifyThreadCreate(context.workerIndex); 377 #endif 378 379 bool taskStealingDisabled = context.taskScheduler->IsTaskStealingDisabled(); 380 uint32 workersCount = context.taskScheduler->GetWorkersCount(); 381 int32 totalThreadsCount = context.taskScheduler->threadsCount.LoadRelaxed(); 382 383 context.taskScheduler->startedThreadsCount.IncFetch(); 384 385 //Simple spinlock until all threads is started and initialized 386 for(;;) 387 { 388 int32 initializedThreadsCount = context.taskScheduler->startedThreadsCount.Load(); 389 if (initializedThreadsCount == totalThreadsCount) 390 { 391 break; 392 } 393 394 // sleep some time until all other thread initialized 395 Thread::Sleep(1); 396 } 397 398 HardwareFullMemoryBarrier(); 399 400 #ifdef MT_INSTRUMENTED_BUILD 401 context.NotifyThreadStart(context.workerIndex); 402 context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START); 403 #endif 404 405 while(context.state.Load() != internal::ThreadState::EXIT) 406 { 407 internal::GroupedTask task; 408 if (context.queue.TryPopOldest(task) || TryStealTask(context, task, workersCount, taskStealingDisabled) ) 409 { 410 #ifdef MT_INSTRUMENTED_BUILD 411 bool isNewTask = (task.awaitingFiber == nullptr); 412 #endif 413 414 // There is a new task 415 FiberContext* fiberContext = context.taskScheduler->RequestFiberContext(task); 416 MT_ASSERT(fiberContext, "Can't get execution context from pool"); 417 MT_ASSERT(fiberContext->currentTask.IsValid(), "Sanity check failed"); 418 MT_ASSERT(fiberContext->stackRequirements == task.desc.stackRequirements, "Sanity check failed"); 419 420 while(fiberContext) 421 { 422 #ifdef MT_INSTRUMENTED_BUILD 423 if (isNewTask) 424 { 425 //TODO: 426 isNewTask = false; 427 } 428 #endif 429 // prevent invalid fiber resume from child tasks, before ExecuteTask is done 430 fiberContext->childrenFibersCount.IncFetch(); 431 432 FiberContext* parentFiber = ExecuteTask(context, fiberContext); 433 434 FiberTaskStatus::Type taskStatus = fiberContext->GetStatus(); 435 436 //release guard 437 int childrenFibersCount = fiberContext->childrenFibersCount.DecFetch(); 438 439 // Can drop fiber context - task is finished 440 if (taskStatus == FiberTaskStatus::FINISHED) 441 { 442 MT_ASSERT( childrenFibersCount == 0, "Sanity check failed"); 443 context.taskScheduler->ReleaseFiberContext(std::move(fiberContext)); 444 445 // If parent fiber is exist transfer flow control to parent fiber, if parent fiber is null, exit 446 fiberContext = parentFiber; 447 } else 448 { 449 MT_ASSERT( childrenFibersCount >= 0, "Sanity check failed"); 450 451 // No subtasks here and status is not finished, this mean all subtasks already finished before parent return from ExecuteTask 452 if (childrenFibersCount == 0) 453 { 454 MT_ASSERT(parentFiber == nullptr, "Sanity check failed"); 455 } else 456 { 457 // If subtasks still exist, drop current task execution. task will be resumed when last subtask finished 458 break; 459 } 460 461 // If task is in await state drop execution. task will be resumed when RestoreAwaitingTasks called 462 if (taskStatus == FiberTaskStatus::AWAITING_GROUP) 463 { 464 break; 465 } 466 } 467 } //while(fiberContext) 468 469 } else 470 { 471 #ifdef MT_INSTRUMENTED_BUILD 472 context.NotifyThreadIdleBegin(context.workerIndex); 473 #endif 474 475 // Queue if empty and stealing attempt failed 476 // Wait for new events 477 context.hasNewTasksEvent.Wait(2000); 478 479 #ifdef MT_INSTRUMENTED_BUILD 480 context.NotifyThreadIdleEnd(context.workerIndex); 481 #endif 482 483 } 484 485 } // main thread loop 486 487 #ifdef MT_INSTRUMENTED_BUILD 488 context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP); 489 context.NotifyThreadStop(context.workerIndex); 490 #endif 491 492 } 493 494 void TaskScheduler::RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState) 495 { 496 // This storage is necessary to calculate how many tasks we add to different groups 497 int newTaskCountInGroup[TaskGroup::MT_MAX_GROUPS_COUNT]; 498 499 // Default value is 0 500 memset(&newTaskCountInGroup[0], 0, sizeof(newTaskCountInGroup)); 501 502 // Set parent fiber pointer 503 // Calculate the number of tasks per group 504 // Calculate total number of tasks 505 size_t count = 0; 506 for (size_t i = 0; i < buckets.Size(); ++i) 507 { 508 internal::TaskBucket& bucket = buckets[i]; 509 for (size_t taskIndex = 0; taskIndex < bucket.count; taskIndex++) 510 { 511 internal::GroupedTask & task = bucket.tasks[taskIndex]; 512 513 task.parentFiber = parentFiber; 514 515 int idx = task.group.GetValidIndex(); 516 MT_ASSERT(idx >= 0 && idx < TaskGroup::MT_MAX_GROUPS_COUNT, "Invalid index"); 517 newTaskCountInGroup[idx]++; 518 } 519 520 count += bucket.count; 521 } 522 523 // Increments child fibers count on parent fiber 524 if (parentFiber) 525 { 526 parentFiber->childrenFibersCount.AddFetch((int)count); 527 } 528 529 if (restoredFromAwaitState == false) 530 { 531 // Increase the number of active tasks in the group using data from temporary storage 532 for (size_t i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++) 533 { 534 int groupNewTaskCount = newTaskCountInGroup[i]; 535 if (groupNewTaskCount > 0) 536 { 537 groupStats[i].Reset(); 538 groupStats[i].Add((uint32)groupNewTaskCount); 539 } 540 } 541 542 // Increments all task in progress counter 543 allGroups.Reset(); 544 allGroups.Add((uint32)count); 545 } else 546 { 547 // If task's restored from await state, counters already in correct state 548 } 549 550 // Add to thread queue 551 for (size_t i = 0; i < buckets.Size(); ++i) 552 { 553 int bucketIndex = roundRobinThreadIndex.IncFetch() % threadsCount.LoadRelaxed(); 554 internal::ThreadContext & context = threadContext[bucketIndex]; 555 556 internal::TaskBucket& bucket = buckets[i]; 557 558 for(;;) 559 { 560 bool res = context.queue.Add(bucket.tasks, bucket.count); 561 if (res == true) 562 { 563 break; 564 } 565 566 //Can't add new tasks onto the queue. Look like the job system is overloaded. Wait some time and try again. 567 //TODO: implement waiting until workers done using events. 568 Thread::Sleep(10); 569 } 570 571 context.hasNewTasksEvent.Signal(); 572 } 573 } 574 575 void TaskScheduler::RunAsync(TaskGroup group, const TaskHandle* taskHandleArray, uint32 taskHandleCount) 576 { 577 MT_ASSERT(!IsWorkerThread(), "Can't use RunAsync inside Task. Use FiberContext.RunAsync() instead."); 578 579 ArrayView<internal::GroupedTask> buffer(MT_ALLOCATE_ON_STACK(sizeof(internal::GroupedTask) * taskHandleCount), taskHandleCount); 580 581 uint32 bucketCount = MT::Min((uint32)GetWorkersCount(), taskHandleCount); 582 ArrayView<internal::TaskBucket> buckets(MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket) * bucketCount), bucketCount); 583 584 internal::DistibuteDescriptions(group, taskHandleArray, buffer, buckets); 585 RunTasksImpl(buckets, nullptr, false); 586 } 587 588 bool TaskScheduler::WaitGroup(TaskGroup group, uint32 milliseconds) 589 { 590 MT_VERIFY(IsWorkerThread() == false, "Can't use WaitGroup inside Task. Use FiberContext.WaitGroupAndYield() instead.", return false); 591 592 TaskScheduler::TaskGroupDescription & groupDesc = GetGroupDesc(group); 593 594 return groupDesc.Wait(milliseconds); 595 } 596 597 bool TaskScheduler::WaitAll(uint32 milliseconds) 598 { 599 MT_VERIFY(IsWorkerThread() == false, "Can't use WaitAll inside Task.", return false); 600 601 return allGroups.Wait(milliseconds); 602 } 603 604 bool TaskScheduler::IsTaskStealingDisabled() const 605 { 606 return taskStealingDisabled; 607 } 608 609 int32 TaskScheduler::GetWorkersCount() const 610 { 611 return threadsCount.LoadRelaxed(); 612 } 613 614 bool TaskScheduler::IsWorkerThread() const 615 { 616 for (uint32 i = 0; i < MT_MAX_THREAD_COUNT; i++) 617 { 618 if (threadContext[i].thread.IsCurrentThread()) 619 { 620 return true; 621 } 622 } 623 return false; 624 } 625 626 TaskGroup TaskScheduler::CreateGroup() 627 { 628 MT_ASSERT(IsWorkerThread() == false, "Can't use CreateGroup inside Task."); 629 630 TaskGroup group; 631 if (!availableGroups.TryPop(group)) 632 { 633 MT_REPORT_ASSERT("Group pool is empty"); 634 } 635 636 int idx = group.GetValidIndex(); 637 MT_USED_IN_ASSERT(idx); 638 MT_ASSERT(groupStats[idx].GetDebugIsFree() == true, "Bad logic!"); 639 #if MT_GROUP_DEBUG 640 groupStats[idx].SetDebugIsFree(false); 641 #endif 642 643 return group; 644 } 645 646 void TaskScheduler::ReleaseGroup(TaskGroup group) 647 { 648 MT_ASSERT(IsWorkerThread() == false, "Can't use ReleaseGroup inside Task."); 649 MT_ASSERT(group.IsValid(), "Invalid group ID"); 650 651 int idx = group.GetValidIndex(); 652 MT_USED_IN_ASSERT(idx); 653 MT_ASSERT(groupStats[idx].GetDebugIsFree() == false, "Group already released"); 654 #if MT_GROUP_DEBUG 655 groupStats[idx].SetDebugIsFree(true); 656 #endif 657 658 bool res = availableGroups.TryPush(std::move(group)); 659 MT_USED_IN_ASSERT(res); 660 MT_ASSERT(res, "Can't return group to pool"); 661 } 662 663 TaskScheduler::TaskGroupDescription & TaskScheduler::GetGroupDesc(TaskGroup group) 664 { 665 MT_ASSERT(group.IsValid(), "Invalid group ID"); 666 667 int idx = group.GetValidIndex(); 668 TaskScheduler::TaskGroupDescription & groupDesc = groupStats[idx]; 669 670 MT_ASSERT(groupDesc.GetDebugIsFree() == false, "Invalid group"); 671 return groupDesc; 672 } 673 } 674 675 676