// The MIT License (MIT)
//
// Copyright (c) 2015 Sergey Makeev, Vadim Slyusarev
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include <MTScheduler.h>
#include <MTStaticVector.h>
#include <string.h> // for memset


// Enable the low-latency experimental wait code path.
// The low-latency hybrid wait seems to work better on PS4/X1, but a little worse on PC.
//#define MT_LOW_LATENCY_EXPERIMENTAL_WAIT (1)

#if defined(MT_PLATFORM_POSIX)
#define MT_LOW_LATENCY_EXPERIMENTAL_WAIT (1)
#endif
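// Note (illustrative): the block above only enables MT_LOW_LATENCY_EXPERIMENTAL_WAIT
// automatically on POSIX platforms; to try the hybrid wait elsewhere, uncomment the
// #define above. The code paths guarded by this macro are in SchedulerFiberMain and
// RunTasksImpl below.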
namespace MT
{
    mt_thread_local uint32 isWorkerThreadTLS = 0;


#ifdef MT_INSTRUMENTED_BUILD
    TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, IProfilerEventListener* listener, TaskStealingMode::Type stealMode)
#else
    TaskScheduler::TaskScheduler(uint32 workerThreadsCount, WorkerThreadParams* workerParameters, TaskStealingMode::Type stealMode)
#endif
        : roundRobinThreadIndex(0)
        , startedThreadsCount(0)
        , taskStealingDisabled(stealMode == TaskStealingMode::DISABLED)
    {

#ifdef MT_INSTRUMENTED_BUILD
        profilerEventListener = listener;
#endif

        if (workerThreadsCount != 0)
        {
            threadsCount.StoreRelaxed( MT::Clamp(workerThreadsCount, (uint32)1, (uint32)MT_MAX_THREAD_COUNT) );
        } else
        {
            // query the number of hardware threads
            threadsCount.StoreRelaxed( (uint32)MT::Clamp(Thread::GetNumberOfHardwareThreads() - 1, 1, (int)MT_MAX_THREAD_COUNT) );
        }

        // create fiber pool (fibers with standard stack size)
        for (uint32 i = 0; i < MT_MAX_STANDART_FIBERS_COUNT; i++)
        {
            FiberContext& context = standartFiberContexts[i];
            context.fiber.Create(MT_STANDART_FIBER_STACK_SIZE, FiberMain, &context);
            bool res = standartFibersAvailable.TryPush( &context );
            MT_USED_IN_ASSERT(res);
            MT_ASSERT(res == true, "Can't add fiber to storage");
        }

        // create fiber pool (fibers with extended stack size)
        for (uint32 i = 0; i < MT_MAX_EXTENDED_FIBERS_COUNT; i++)
        {
            FiberContext& context = extendedFiberContexts[i];
            context.fiber.Create(MT_EXTENDED_FIBER_STACK_SIZE, FiberMain, &context);
            bool res = extendedFibersAvailable.TryPush( &context );
            MT_USED_IN_ASSERT(res);
            MT_ASSERT(res == true, "Can't add fiber to storage");
        }


        for (int16 i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++)
        {
            if (i != TaskGroup::DEFAULT)
            {
                bool res = availableGroups.TryPush( TaskGroup(i) );
                MT_USED_IN_ASSERT(res);
                MT_ASSERT(res == true, "Can't add group to storage");
            }
        }

#if MT_GROUP_DEBUG
        groupStats[TaskGroup::DEFAULT].SetDebugIsFree(false);
#endif

        // create worker thread pool
        int32 totalThreadsCount = GetWorkersCount();
        for (int32 i = 0; i < totalThreadsCount; i++)
        {
            threadContext[i].SetThreadIndex(i);
            threadContext[i].taskScheduler = this;

            uint32 threadCore = i;
            ThreadPriority::Type priority = ThreadPriority::DEFAULT;
            if (workerParameters != nullptr)
            {
                const WorkerThreadParams& params = workerParameters[i];

                threadCore = params.core;
                priority = params.priority;
            }

            threadContext[i].thread.Start( MT_SCHEDULER_STACK_SIZE, WorkerThreadMain, &threadContext[i], threadCore, priority);
        }
    }


    TaskScheduler::~TaskScheduler()
    {
        int32 totalThreadsCount = GetWorkersCount();
        for (int32 i = 0; i < totalThreadsCount; i++)
        {
            threadContext[i].state.Store(internal::ThreadState::EXIT);
            threadContext[i].hasNewTasksEvent.Signal();
        }

        for (int32 i = 0; i < totalThreadsCount; i++)
        {
            threadContext[i].thread.Join();
        }
    }
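    // Construction sketch (illustrative only; the defaulted argument values and the
    // instrumented-build listener parameter are declared in MTScheduler.h):
    //
    //   // Let the scheduler pick (hardware threads - 1) workers with default affinity/priority:
    //   MT::TaskScheduler scheduler;   // assumes the defaulted constructor arguments
    //
    //   // Or pin two workers to explicit cores; 'core' and 'priority' are the
    //   // WorkerThreadParams fields consumed by the constructor above:
    //   MT::WorkerThreadParams params[2];
    //   params[0].core = 0; params[0].priority = MT::ThreadPriority::DEFAULT;
    //   params[1].core = 1; params[1].priority = MT::ThreadPriority::DEFAULT;
    //   MT::TaskScheduler pinnedScheduler(2, params, MT::TaskStealingMode::DISABLED);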
    FiberContext* TaskScheduler::RequestFiberContext(internal::GroupedTask& task)
    {
        FiberContext *fiberContext = task.awaitingFiber;
        if (fiberContext)
        {
            task.awaitingFiber = nullptr;
            return fiberContext;
        }

        MT::StackRequirements::Type stackRequirements = task.desc.stackRequirements;

        fiberContext = nullptr;
        bool res = false;
        MT_USED_IN_ASSERT(res);
        switch(stackRequirements)
        {
        case MT::StackRequirements::STANDARD:
            res = standartFibersAvailable.TryPop(fiberContext);
            MT_ASSERT(res, "Can't get more standard fibers!");
            break;
        case MT::StackRequirements::EXTENDED:
            res = extendedFibersAvailable.TryPop(fiberContext);
            MT_ASSERT(res, "Can't get more extended fibers!");
            break;
        default:
            MT_REPORT_ASSERT("Unknown stack requirements");
        }

        MT_ASSERT(fiberContext != nullptr, "Can't get more fibers. Too many tasks in flight simultaneously?");

        fiberContext->currentTask = task.desc;
        fiberContext->currentGroup = task.group;
        fiberContext->parentFiber = task.parentFiber;
        fiberContext->stackRequirements = stackRequirements;
        return fiberContext;
    }

    void TaskScheduler::ReleaseFiberContext(FiberContext*&& fiberContext)
    {
        MT_ASSERT(fiberContext, "Can't release nullptr Fiber. fiberContext is nullptr");

        MT::StackRequirements::Type stackRequirements = fiberContext->stackRequirements;
        fiberContext->Reset();

        MT_ASSERT(fiberContext != nullptr, "Fiber context can't be nullptr");

        bool res = false;
        MT_USED_IN_ASSERT(res);
        switch(stackRequirements)
        {
        case MT::StackRequirements::STANDARD:
            res = standartFibersAvailable.TryPush(std::move(fiberContext));
            break;
        case MT::StackRequirements::EXTENDED:
            res = extendedFibersAvailable.TryPush(std::move(fiberContext));
            break;
        default:
            MT_REPORT_ASSERT("Unknown stack requirements");
        }

        MT_USED_IN_ASSERT(res);
        MT_ASSERT(res != false, "Can't return fiber to storage");
    }
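    // Note (illustrative): the "Can't get more ... fibers" asserts above fire when more
    // tasks are simultaneously in flight (running or suspended awaiting children) than
    // there are pooled fibers; the pool sizes are the MT_MAX_STANDART_FIBERS_COUNT and
    // MT_MAX_EXTENDED_FIBERS_COUNT constants used in the constructor.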
    FiberContext* TaskScheduler::ExecuteTask(internal::ThreadContext& threadContext, FiberContext* fiberContext)
    {
        MT_ASSERT(threadContext.threadId.IsCurrentThread(), "Thread context sanity check failed");

        MT_ASSERT(fiberContext, "Invalid fiber context");
        MT_ASSERT(fiberContext->currentTask.IsValid(), "Invalid task");

        // Set the actual thread context on the fiber
        fiberContext->SetThreadContext(&threadContext);

        // Update task status
        fiberContext->SetStatus(FiberTaskStatus::RUNNED);

        MT_ASSERT(fiberContext->GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

        const void* poolUserData = fiberContext->currentTask.userData;
        TPoolTaskDestroy poolDestroyFunc = fiberContext->currentTask.poolDestroyFunc;

#ifdef MT_INSTRUMENTED_BUILD
        //threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::SUSPEND);
        threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
#endif

        // Run the current task code
        Fiber::SwitchTo(threadContext.schedulerFiber, fiberContext->fiber);

#ifdef MT_INSTRUMENTED_BUILD
        //threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::RESUME);
        threadContext.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

        // If the task is done
        FiberTaskStatus::Type taskStatus = fiberContext->GetStatus();
        if (taskStatus == FiberTaskStatus::FINISHED)
        {
            // destroy the task (call the dtor) for "fire and forget" tasks from a TaskPool
            if (poolDestroyFunc != nullptr)
            {
                poolDestroyFunc(poolUserData);
            }

            TaskGroup taskGroup = fiberContext->currentGroup;

            TaskScheduler::TaskGroupDescription & groupDesc = threadContext.taskScheduler->GetGroupDesc(taskGroup);

            // Update group status
            int groupTaskCount = groupDesc.Dec();
            MT_ASSERT(groupTaskCount >= 0, "Sanity check failed!");
            if (groupTaskCount == 0)
            {
                fiberContext->currentGroup = TaskGroup::INVALID;
            }

            // Update total task count
            int allGroupTaskCount = threadContext.taskScheduler->allGroups.Dec();
            MT_USED_IN_ASSERT(allGroupTaskCount);
            MT_ASSERT(allGroupTaskCount >= 0, "Sanity check failed!");

            FiberContext* parentFiberContext = fiberContext->parentFiber;
            if (parentFiberContext != nullptr)
            {
                int childrenFibersCount = parentFiberContext->childrenFibersCount.DecFetch();
                MT_ASSERT(childrenFibersCount >= 0, "Sanity check failed!");

                if (childrenFibersCount == 0)
                {
                    // This is the last subtask. Restore the parent task.
                    MT_ASSERT(threadContext.threadId.IsCurrentThread(), "Thread context sanity check failed");
                    MT_ASSERT(parentFiberContext->GetThreadContext() == nullptr, "Inactive parent should not have a valid thread context");

                    // WARNING!! The thread context can change here! Set the actual current thread context.
                    parentFiberContext->SetThreadContext(&threadContext);

                    MT_ASSERT(parentFiberContext->GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

                    // All subtasks are done.
                    // Exit and return the parent fiber to the scheduler.
                    return parentFiberContext;
                } else
                {
                    // Other subtasks still exist
                    // Exiting
                    return nullptr;
                }
            } else
            {
                // Task is finished and has no parent task
                // Exiting
                return nullptr;
            }
        }

        MT_ASSERT(taskStatus != FiberTaskStatus::RUNNED, "Incorrect task status");
        return nullptr;
    }


    void TaskScheduler::FiberMain(void* userData)
    {
        FiberContext& fiberContext = *(FiberContext*)(userData);
        for(;;)
        {
            MT_ASSERT(fiberContext.currentTask.IsValid(), "Invalid task in fiber context");
            MT_ASSERT(fiberContext.GetThreadContext(), "Invalid thread context");
            MT_ASSERT(fiberContext.GetThreadContext()->threadId.IsCurrentThread(), "Thread context sanity check failed");

#ifdef MT_INSTRUMENTED_BUILD
            fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME );
            fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::START );
#endif

            fiberContext.currentTask.taskFunc( fiberContext, fiberContext.currentTask.userData );
            fiberContext.SetStatus(FiberTaskStatus::FINISHED);

#ifdef MT_INSTRUMENTED_BUILD
            fiberContext.fiber.SetName( MT_SYSTEM_TASK_FIBER_NAME );
            fiberContext.GetThreadContext()->NotifyTaskExecuteStateChanged( fiberContext.currentTask.debugColor, fiberContext.currentTask.debugID, TaskExecuteState::STOP );
#endif

            Fiber::SwitchTo(fiberContext.fiber, fiberContext.GetThreadContext()->schedulerFiber);
        }

    }
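    // Task entry-point contract implied by FiberMain above (illustrative; the exact
    // function-pointer typedef lives in the public headers): the scheduler invokes
    // task.taskFunc(fiberContext, task.userData) on a pooled fiber and marks the task
    // FINISHED when the callback returns. Code running inside a task should schedule and
    // wait for subtasks through the FiberContext API (see the FiberContext.RunAsync() /
    // FiberContext.WaitGroupAndYield() hints in the asserts further down) instead of
    // blocking the worker thread.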
    bool TaskScheduler::TryStealTask(internal::ThreadContext& threadContext, internal::GroupedTask & task)
    {
        bool taskStealingDisabled = threadContext.taskScheduler->IsTaskStealingDisabled();
        uint32 workersCount = threadContext.taskScheduler->GetWorkersCount();

        if (workersCount <= 1 || taskStealingDisabled )
        {
            return false;
        }

        uint32 victimIndex = threadContext.random.Get();

        for (uint32 attempt = 0; attempt < workersCount; attempt++)
        {
            uint32 index = victimIndex % workersCount;
            if (index == threadContext.workerIndex)
            {
                victimIndex++;
                index = victimIndex % workersCount;
            }

            internal::ThreadContext& victimContext = threadContext.taskScheduler->threadContext[index];
            if (victimContext.queue.TryPopNewest(task))
            {
                return true;
            }

            victimIndex++;
        }
        return false;
    }

    void TaskScheduler::WorkerThreadMain( void* userData )
    {
        internal::ThreadContext& context = *(internal::ThreadContext*)(userData);
        MT_ASSERT(context.taskScheduler, "Task scheduler must not be null!");

        isWorkerThreadTLS = 1;
        context.threadId.SetAsCurrentThread();

#ifdef MT_INSTRUMENTED_BUILD
        const char* threadNames[] = {"worker0","worker1","worker2","worker3","worker4","worker5","worker6","worker7","worker8","worker9","worker10","worker11","worker12"};
        if (context.workerIndex < MT_ARRAY_SIZE(threadNames))
        {
            Thread::SetThreadName(threadNames[context.workerIndex]);
        } else
        {
            Thread::SetThreadName("worker_thread");
        }
#endif

        context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberMain, userData);
    }


    void TaskScheduler::SchedulerFiberWait( void* userData )
    {
        WaitContext& waitContext = *(WaitContext*)(userData);
        internal::ThreadContext& context = *waitContext.threadContext;
        MT_ASSERT(context.taskScheduler, "Task scheduler must not be null!");
        MT_ASSERT(waitContext.waitCounter, "Wait counter must not be null!");

#ifdef MT_INSTRUMENTED_BUILD
        context.NotifyWaitStarted();
        context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif

        bool isTaskStealingDisabled = context.taskScheduler->IsTaskStealingDisabled();

        int64 timeOut = GetTimeMicroSeconds() + (waitContext.waitTimeMs * 1000);

        SpinWait spinWait;

        for(;;)
        {
            if ( SchedulerFiberStep(context, isTaskStealingDisabled) == false )
            {
                spinWait.SpinOnce();
            } else
            {
                spinWait.Reset();
            }

            int32 groupTaskCount = waitContext.waitCounter->Load();
            if (groupTaskCount == 0)
            {
                waitContext.exitCode = 0;
                break;
            }

            int64 timeNow = GetTimeMicroSeconds();
            if (timeNow >= timeOut)
            {
                waitContext.exitCode = 1;
                break;
            }
        }

#ifdef MT_INSTRUMENTED_BUILD
        context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
        context.NotifyWaitFinished();
#endif

    }
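    // Note: SchedulerFiberWait reports its result through WaitContext::exitCode
    // (0 = the awaited counter reached zero, 1 = the timeout expired); WaitGroup and
    // WaitAll below translate this into their bool return value.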
    void TaskScheduler::SchedulerFiberMain( void* userData )
    {
        internal::ThreadContext& context = *(internal::ThreadContext*)(userData);
        MT_ASSERT(context.taskScheduler, "Task scheduler must not be null!");

#ifdef MT_INSTRUMENTED_BUILD
        context.NotifyThreadCreated(context.workerIndex);
#endif

        int32 totalThreadsCount = context.taskScheduler->threadsCount.LoadRelaxed();
        context.taskScheduler->startedThreadsCount.IncFetch();

        // Simple spin wait until all threads are started and initialized
        for(;;)
        {
            int32 initializedThreadsCount = context.taskScheduler->startedThreadsCount.Load();
            if (initializedThreadsCount == totalThreadsCount)
            {
                break;
            }

            // sleep for a while until all the other threads are initialized
            Thread::Sleep(1);
        }

        HardwareFullMemoryBarrier();

#ifdef MT_INSTRUMENTED_BUILD
        context.NotifyThreadStarted(context.workerIndex);
        context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::START);
#endif
        bool isTaskStealingDisabled = context.taskScheduler->IsTaskStealingDisabled();

        while(context.state.Load() != internal::ThreadState::EXIT)
        {
            if ( SchedulerFiberStep(context, isTaskStealingDisabled) == false)
            {
#ifdef MT_INSTRUMENTED_BUILD
                context.NotifyThreadIdleStarted(context.workerIndex);
#endif

#if MT_LOW_LATENCY_EXPERIMENTAL_WAIT

                SpinWait spinWait;

                for(;;)
                {
                    // The queue is empty and the stealing attempt has failed.
                    // Fast spin wait for new tasks.
                    if (spinWait.SpinOnce() >= SpinWait::YIELD_SLEEP0_THRESHOLD)
                    {
                        // The fast spin wait for new tasks has failed.
                        // Wait for new tasks using the event.
                        context.hasNewTasksEvent.Wait(20000);

                        spinWait.Reset();

#ifdef MT_INSTRUMENTED_BUILD
                        context.NotifyThreadIdleFinished(context.workerIndex);
#endif

                        break;
                    }

                    internal::GroupedTask task;
                    if ( context.queue.TryPopOldest(task) )
                    {
#ifdef MT_INSTRUMENTED_BUILD
                        context.NotifyThreadIdleFinished(context.workerIndex);
#endif

                        SchedulerFiberProcessTask(context, task);

                        break;
                    }

                }
#else
                // The queue is empty and the stealing attempt has failed.
                // Wait for new tasks using the event.
                context.hasNewTasksEvent.Wait(20000);

#ifdef MT_INSTRUMENTED_BUILD
                context.NotifyThreadIdleFinished(context.workerIndex);
#endif

#endif

            }

        } // main thread loop

#ifdef MT_INSTRUMENTED_BUILD
        context.NotifyTaskExecuteStateChanged( MT_SYSTEM_TASK_COLOR, MT_SYSTEM_TASK_NAME, TaskExecuteState::STOP);
        context.NotifyThreadStoped(context.workerIndex);
#endif

    }
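    // Note (illustrative): with MT_LOW_LATENCY_EXPERIMENTAL_WAIT an idle worker first
    // spins, re-checking its own queue, and only then parks on hasNewTasksEvent with a
    // bounded wait; without the define it parks on the event immediately. In both cases
    // control returns to the main loop above, which re-checks the EXIT state.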
    void TaskScheduler::SchedulerFiberProcessTask( internal::ThreadContext& context, internal::GroupedTask& task )
    {
#ifdef MT_INSTRUMENTED_BUILD
        bool isNewTask = (task.awaitingFiber == nullptr);
#endif

        // There is a new task
        FiberContext* fiberContext = context.taskScheduler->RequestFiberContext(task);
        MT_ASSERT(fiberContext, "Can't get execution context from pool");
        MT_ASSERT(fiberContext->currentTask.IsValid(), "Sanity check failed");
        MT_ASSERT(fiberContext->stackRequirements == task.desc.stackRequirements, "Sanity check failed");

        while(fiberContext)
        {
#ifdef MT_INSTRUMENTED_BUILD
            if (isNewTask)
            {
                //TODO:
                isNewTask = false;
            }
#endif
            // prevent an invalid fiber resume from child tasks before ExecuteTask is done
            fiberContext->childrenFibersCount.IncFetch();

            FiberContext* parentFiber = ExecuteTask(context, fiberContext);

            FiberTaskStatus::Type taskStatus = fiberContext->GetStatus();

            // release the guard
            int childrenFibersCount = fiberContext->childrenFibersCount.DecFetch();

            // Can drop the fiber context - the task is finished
            if (taskStatus == FiberTaskStatus::FINISHED)
            {
                MT_ASSERT( childrenFibersCount == 0, "Sanity check failed");
                context.taskScheduler->ReleaseFiberContext(std::move(fiberContext));

                // If a parent fiber exists, transfer flow control to it; if the parent fiber is null, exit
                fiberContext = parentFiber;
            } else
            {
                MT_ASSERT( childrenFibersCount >= 0, "Sanity check failed");

                // No subtasks here and the status is not finished; this means all subtasks
                // already finished before the parent returned from ExecuteTask
                if (childrenFibersCount == 0)
                {
                    MT_ASSERT(parentFiber == nullptr, "Sanity check failed");
                } else
                {
                    // If subtasks still exist, drop the current task execution. The task will be resumed when the last subtask finishes.
                    break;
                }

                // If the task has yielded execution, get another task from the queue.
                if (taskStatus == FiberTaskStatus::YIELDED)
                {
                    // Task has yielded, add it to the task queue
                    ArrayView<internal::GroupedTask> buffer(context.descBuffer, 1);
                    ArrayView<internal::TaskBucket> buckets( MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket)), 1 );

                    FiberContext* yieldedTask = fiberContext;
                    StaticVector<FiberContext*, 1> yieldedTasksQueue(1, yieldedTask);
                    internal::DistibuteDescriptions( TaskGroup(TaskGroup::ASSIGN_FROM_CONTEXT), yieldedTasksQueue.Begin(), buffer, buckets );

                    // add the yielded task to the scheduler
                    context.taskScheduler->RunTasksImpl(buckets, nullptr, true);

                    // ATTENTION! The yielded task may already have completed at this point.

                    break;
                }
            }
        } //while(fiberContext)
    }

    bool TaskScheduler::SchedulerFiberStep( internal::ThreadContext& context, bool disableTaskStealing)
    {
        internal::GroupedTask task;
        if ( context.queue.TryPopOldest(task) || (disableTaskStealing == false && TryStealTask(context, task) ) )
        {
            SchedulerFiberProcessTask(context, task);
            return true;
        }

        return false;
    }
    void TaskScheduler::RunTasksImpl(ArrayView<internal::TaskBucket>& buckets, FiberContext * parentFiber, bool restoredFromAwaitState)
    {

#if MT_LOW_LATENCY_EXPERIMENTAL_WAIT
        // Early wakeup of worker threads (a worker thread spin waits for some time before going to sleep)
        int32 roundRobinIndex = roundRobinThreadIndex.LoadRelaxed();
        for (size_t i = 0; i < buckets.Size(); ++i)
        {
            int bucketIndex = ((roundRobinIndex + i) % threadsCount.LoadRelaxed());
            internal::ThreadContext & context = threadContext[bucketIndex];
            context.hasNewTasksEvent.Signal();
        }
#endif


        // This storage is necessary to calculate how many tasks we add to the different groups
        int newTaskCountInGroup[TaskGroup::MT_MAX_GROUPS_COUNT];

        // Default value is 0
        memset(&newTaskCountInGroup[0], 0, sizeof(newTaskCountInGroup));

        // Set the parent fiber pointer
        // Calculate the number of tasks per group
        // Calculate the total number of tasks
        size_t count = 0;
        for (size_t i = 0; i < buckets.Size(); ++i)
        {
            internal::TaskBucket& bucket = buckets[i];
            for (size_t taskIndex = 0; taskIndex < bucket.count; taskIndex++)
            {
                internal::GroupedTask & task = bucket.tasks[taskIndex];

                task.parentFiber = parentFiber;

                int idx = task.group.GetValidIndex();
                MT_ASSERT(idx >= 0 && idx < TaskGroup::MT_MAX_GROUPS_COUNT, "Invalid index");
                newTaskCountInGroup[idx]++;
            }

            count += bucket.count;
        }

        // Increment the child fibers count on the parent fiber
        if (parentFiber)
        {
            parentFiber->childrenFibersCount.AddFetch((int)count);
        }

        if (restoredFromAwaitState == false)
        {
            // Increase the number of active tasks in each group using data from the temporary storage
            for (size_t i = 0; i < TaskGroup::MT_MAX_GROUPS_COUNT; i++)
            {
                int groupNewTaskCount = newTaskCountInGroup[i];
                if (groupNewTaskCount > 0)
                {
                    groupStats[i].Add((uint32)groupNewTaskCount);
                }
            }

            // Increment the all-tasks-in-progress counter
            allGroups.Add((uint32)count);
        } else
        {
            // If the tasks are restored from an await state, the counters are already in the correct state
        }

        // Add to the thread queues
        for (size_t i = 0; i < buckets.Size(); ++i)
        {
            int bucketIndex = roundRobinThreadIndex.IncFetch() % threadsCount.LoadRelaxed();
            internal::ThreadContext & context = threadContext[bucketIndex];

            internal::TaskBucket& bucket = buckets[i];

            for(;;)
            {
                MT_ASSERT(bucket.count < (internal::TASK_BUFFER_CAPACITY - 1), "Sanity check failed. Too many tasks per one bucket.");

                bool res = context.queue.Add(bucket.tasks, bucket.count);
                if (res == true)
                {
                    break;
                }

                // Can't add new tasks to the queue. Looks like the job system is overloaded. Wait some time and try again.
                // TODO: implement waiting until workers are done using events.
                Thread::Sleep(10);
            }

            context.hasNewTasksEvent.Signal();
        }
    }

    void TaskScheduler::RunAsync(TaskGroup group, const TaskHandle* taskHandleArray, uint32 taskHandleCount)
    {
        MT_ASSERT(!IsWorkerThread(), "Can't use RunAsync inside Task. Use FiberContext.RunAsync() instead.");

        ArrayView<internal::GroupedTask> buffer(MT_ALLOCATE_ON_STACK(sizeof(internal::GroupedTask) * taskHandleCount), taskHandleCount);

        uint32 bucketCount = MT::Min((uint32)GetWorkersCount(), taskHandleCount);
        ArrayView<internal::TaskBucket> buckets(MT_ALLOCATE_ON_STACK(sizeof(internal::TaskBucket) * bucketCount), bucketCount);

        internal::DistibuteDescriptions(group, taskHandleArray, buffer, buckets);
        RunTasksImpl(buckets, nullptr, false);
    }

    bool TaskScheduler::WaitGroup(TaskGroup group, uint32 milliseconds)
    {
        MT_VERIFY(IsWorkerThread() == false, "Can't use WaitGroup inside Task. Use FiberContext.WaitGroupAndYield() instead.", return false);

        TaskScheduler::TaskGroupDescription& groupDesc = GetGroupDesc(group);

        // Early exit if there are no tasks in the group
        int32 taskCount = groupDesc.GetTaskCount();
        if (taskCount == 0)
        {
            return true;
        }

        size_t bytesCountForDescBuffer = internal::ThreadContext::GetMemoryRequrementInBytesForDescBuffer();
        void* descBuffer = MT_ALLOCATE_ON_STACK(bytesCountForDescBuffer);

        internal::ThreadContext context(descBuffer);
        context.taskScheduler = this;
        context.SetThreadIndex(0xFFFFFFFF);
        context.threadId.SetAsCurrentThread();

        WaitContext waitContext;
        waitContext.threadContext = &context;
        waitContext.waitCounter = groupDesc.GetWaitCounter();
        waitContext.waitTimeMs = milliseconds;
        waitContext.exitCode = 0;

        isWorkerThreadTLS = 1;

        context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberWait, &waitContext);

        isWorkerThreadTLS = 0;
        return (waitContext.exitCode == 0);
    }

    bool TaskScheduler::WaitAll(uint32 milliseconds)
    {
        MT_VERIFY(IsWorkerThread() == false, "Can't use WaitAll inside Task.", return false);

        // Early exit if there are no tasks in any group
        int32 taskCount = allGroups.GetTaskCount();
        if (taskCount == 0)
        {
            return true;
        }

        size_t bytesCountForDescBuffer = internal::ThreadContext::GetMemoryRequrementInBytesForDescBuffer();
        void* descBuffer = MT_ALLOCATE_ON_STACK(bytesCountForDescBuffer);

        internal::ThreadContext context(descBuffer);
        context.taskScheduler = this;
        context.SetThreadIndex(0xFFFFFFFF);
        context.threadId.SetAsCurrentThread();

        WaitContext waitContext;
        waitContext.threadContext = &context;
        waitContext.waitCounter = allGroups.GetWaitCounter();
        waitContext.waitTimeMs = milliseconds;
        waitContext.exitCode = 0;

        isWorkerThreadTLS = 1;

        context.schedulerFiber.CreateFromCurrentThreadAndRun(SchedulerFiberWait, &waitContext);

        isWorkerThreadTLS = 0;
        return (waitContext.exitCode == 0);
    }
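    // Typical external usage of the entry points above (illustrative only; how TaskHandle
    // arrays are built from user task objects is defined in the public headers, not here):
    //
    //   MT::TaskScheduler scheduler;                               // assumes the defaulted ctor arguments
    //   MT::TaskGroup group = scheduler.CreateGroup();
    //   scheduler.RunAsync(group, taskHandles, taskHandleCount);   // must be called outside a task
    //   bool finished = scheduler.WaitGroup(group, 200);           // wait up to 200 ms
    //   scheduler.ReleaseGroup(group);
    //
    //   scheduler.WaitAll(1000);                                   // or wait for every group at once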
    bool TaskScheduler::IsTaskStealingDisabled() const
    {
        if (threadsCount.LoadRelaxed() <= 1)
        {
            return true;
        }

        return taskStealingDisabled;
    }

    int32 TaskScheduler::GetWorkersCount() const
    {
        return threadsCount.LoadRelaxed();
    }


    bool TaskScheduler::IsWorkerThread() const
    {
        return (isWorkerThreadTLS != 0);
    }

    TaskGroup TaskScheduler::CreateGroup()
    {
        MT_ASSERT(IsWorkerThread() == false, "Can't use CreateGroup inside Task.");

        TaskGroup group;
        if (!availableGroups.TryPop(group))
        {
            MT_REPORT_ASSERT("Group pool is empty");
        }

        int idx = group.GetValidIndex();
        MT_USED_IN_ASSERT(idx);
        MT_ASSERT(groupStats[idx].GetDebugIsFree() == true, "Bad logic!");
#if MT_GROUP_DEBUG
        groupStats[idx].SetDebugIsFree(false);
#endif

        return group;
    }

    void TaskScheduler::ReleaseGroup(TaskGroup group)
    {
        MT_ASSERT(IsWorkerThread() == false, "Can't use ReleaseGroup inside Task.");
        MT_ASSERT(group.IsValid(), "Invalid group ID");

        int idx = group.GetValidIndex();
        MT_USED_IN_ASSERT(idx);
        MT_ASSERT(groupStats[idx].GetDebugIsFree() == false, "Group already released");
#if MT_GROUP_DEBUG
        groupStats[idx].SetDebugIsFree(true);
#endif

        bool res = availableGroups.TryPush(std::move(group));
        MT_USED_IN_ASSERT(res);
        MT_ASSERT(res, "Can't return group to pool");
    }

    TaskScheduler::TaskGroupDescription & TaskScheduler::GetGroupDesc(TaskGroup group)
    {
        MT_ASSERT(group.IsValid(), "Invalid group ID");

        int idx = group.GetValidIndex();
        TaskScheduler::TaskGroupDescription & groupDesc = groupStats[idx];

        MT_ASSERT(groupDesc.GetDebugIsFree() == false, "Invalid group");
        return groupDesc;
    }
}