1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "oneapi/tbb/detail/_config.h" 20 #include "tbb/global_control.h" 21 22 #include "tbb/task_group.h" 23 24 #include "common/concurrency_tracker.h" 25 26 #include <atomic> 27 #include <stdexcept> 28 29 //! \file test_task_group.cpp 30 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 31 32 unsigned g_MaxConcurrency = 4; 33 using atomic_t = std::atomic<std::uintptr_t>; 34 unsigned MinThread = 1; 35 unsigned MaxThread = 4; 36 37 //------------------------------------------------------------------------ 38 // Tests for the thread safety of the task_group manipulations 39 //------------------------------------------------------------------------ 40 41 #include "common/spin_barrier.h" 42 43 enum SharingMode { 44 VagabondGroup = 1, 45 ParallelWait = 2 46 }; 47 48 template<typename task_group_type> 49 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 50 static const std::uintptr_t c_numTasks0 = 4096, 51 c_numTasks1 = 1024; 52 53 const std::uintptr_t m_numThreads; 54 const std::uintptr_t m_sharingMode; 55 56 task_group_type *m_taskGroup; 57 atomic_t m_tasksSpawned, 58 m_threadsReady; 59 utils::SpinBarrier m_barrier; 60 61 static atomic_t s_tasksExecuted; 62 63 struct TaskFunctor { 64 SharedGroupBodyImpl *m_pOwner; 65 void operator () () const { 66 if ( m_pOwner->m_sharingMode & ParallelWait ) { 67 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 68 utils::yield(); 69 } 70 ++s_tasksExecuted; 71 } 72 }; 73 74 TaskFunctor m_taskFunctor; 75 76 void Spawn ( std::uintptr_t numTasks ) { 77 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 78 ++m_tasksSpawned; 79 utils::ConcurrencyTracker ct; 80 m_taskGroup->run( m_taskFunctor ); 81 } 82 ++m_threadsReady; 83 } 84 85 void DeleteTaskGroup () { 86 delete m_taskGroup; 87 m_taskGroup = nullptr; 88 } 89 90 void Wait () { 91 while ( m_threadsReady != m_numThreads ) 92 utils::yield(); 93 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 94 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 95 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 96 if ( m_sharingMode & ParallelWait ) { 97 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 98 { 99 utils::ConcurrencyTracker ct; 100 m_taskGroup->wait(); 101 } 102 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) { 103 const char* msg = "Warning: No parallel waiting detected in TestParallelWait"; 104 WARN( msg ); 105 } 106 m_barrier.wait(); 107 } 108 else 109 m_taskGroup->wait(); 110 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 111 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 112 } 113 114 public: 115 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 116 : m_numThreads(numThreads) 117 , m_sharingMode(sharingMode) 118 , m_taskGroup(nullptr) 119 , m_barrier(numThreads) 120 { 121 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 122 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 123 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 124 } 125 utils::ConcurrencyTracker::Reset(); 126 s_tasksExecuted = 0; 127 m_tasksSpawned = 0; 128 m_threadsReady = 0; 129 m_taskFunctor.m_pOwner = this; 130 } 131 132 void Run ( std::uintptr_t idx ) { 133 AssertLive(); 134 if ( idx == 0 ) { 135 if (m_taskGroup || m_tasksSpawned) { 136 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 137 } 138 m_taskGroup = new task_group_type; 139 Spawn( c_numTasks0 ); 140 Wait(); 141 if ( m_sharingMode & VagabondGroup ) 142 m_barrier.wait(); 143 else 144 DeleteTaskGroup(); 145 } 146 else { 147 while ( m_tasksSpawned == 0 ) 148 utils::yield(); 149 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 150 Spawn (c_numTasks1); 151 if ( m_sharingMode & ParallelWait ) 152 Wait(); 153 if ( m_sharingMode & VagabondGroup ) { 154 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 155 m_barrier.wait(); 156 DeleteTaskGroup(); 157 } 158 } 159 AssertLive(); 160 } 161 }; 162 163 template<typename task_group_type> 164 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 165 166 template<typename task_group_type> 167 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 168 bool m_bOwner; 169 SharedGroupBodyImpl<task_group_type> *m_pImpl; 170 public: 171 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 172 : utils::NoAssign() 173 , utils::NoAfterlife() 174 , m_bOwner(true) 175 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 176 {} 177 SharedGroupBody ( const SharedGroupBody& src ) 178 : utils::NoAssign() 179 , utils::NoAfterlife() 180 , m_bOwner(false) 181 , m_pImpl(src.m_pImpl) 182 {} 183 ~SharedGroupBody () { 184 if ( m_bOwner ) 185 delete m_pImpl; 186 } 187 void operator() ( std::uintptr_t idx ) const { 188 // Wrap the functior into additional task group to enforce bounding. 189 task_group_type tg; 190 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 191 } 192 }; 193 194 template<typename task_group_type> 195 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 196 utils::SpinBarrier& m_barrier; 197 std::atomic<bool>& m_completed; 198 task_group_type& m_tg; 199 public: 200 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 201 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 202 203 void operator()() const { 204 m_barrier.wait(); 205 utils::doDummyWork(100000); 206 m_completed = true; 207 } 208 209 void operator()(int id) const { 210 if (id == 0) { 211 m_tg.run_and_wait(*this); 212 } else { 213 m_barrier.wait(); 214 m_tg.wait(); 215 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 216 } 217 } 218 }; 219 220 template<typename task_group_type> 221 void TestParallelSpawn () { 222 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 223 } 224 225 template<typename task_group_type> 226 void TestParallelWait () { 227 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 228 229 utils::SpinBarrier barrier(g_MaxConcurrency); 230 std::atomic<bool> completed; 231 completed = false; 232 task_group_type tg; 233 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 234 NativeParallelFor( g_MaxConcurrency, b ); 235 } 236 237 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 238 template<typename task_group_type> 239 void TestVagabondGroup () { 240 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 241 } 242 243 #include "common/memory_usage.h" 244 245 template<typename task_group_type> 246 void TestThreadSafety() { 247 auto tests = [] { 248 for (int trail = 0; trail < 10; ++trail) { 249 TestParallelSpawn<task_group_type>(); 250 TestParallelWait<task_group_type>(); 251 TestVagabondGroup<task_group_type>(); 252 } 253 }; 254 255 // Test and warm up allocator. 256 tests(); 257 258 // Ensure that cosumption is stabilized. 259 std::size_t initial = utils::GetMemoryUsage(); 260 for (;;) { 261 tests(); 262 std::size_t current = utils::GetMemoryUsage(); 263 if (current <= initial) { 264 return; 265 } 266 initial = current; 267 } 268 } 269 //------------------------------------------------------------------------ 270 // Common requisites of the Fibonacci tests 271 //------------------------------------------------------------------------ 272 273 const std::uintptr_t N = 20; 274 const std::uintptr_t F = 6765; 275 276 atomic_t g_Sum; 277 278 #define FIB_TEST_PROLOGUE() \ 279 const unsigned numRepeats = g_MaxConcurrency * 4; \ 280 utils::ConcurrencyTracker::Reset() 281 282 #define FIB_TEST_EPILOGUE(sum) \ 283 CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \ 284 CHECK( sum == numRepeats * F ); 285 286 287 // Fibonacci tasks specified as functors 288 template<class task_group_type> 289 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 290 protected: 291 std::uintptr_t* m_pRes; 292 mutable std::uintptr_t m_Num; 293 virtual void impl() const = 0; 294 public: 295 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 296 void operator()() const { 297 utils::ConcurrencyTracker ct; 298 AssertLive(); 299 if( m_Num < 2 ) { 300 *m_pRes = m_Num; 301 } else { 302 impl(); 303 } 304 } 305 virtual ~FibTaskBase() {} 306 }; 307 308 template<class task_group_type> 309 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 310 public: 311 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 312 virtual void impl() const override { 313 std::uintptr_t x = ~0u; 314 task_group_type tg; 315 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 316 this->m_Num -= 2; tg.run_and_wait( *this ); 317 *(this->m_pRes) += x; 318 } 319 }; 320 321 template<class task_group_type> 322 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 323 public: 324 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 325 virtual void impl() const override { 326 std::uintptr_t x = ~0u, 327 y = ~0u; 328 task_group_type tg; 329 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 330 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 331 tg.wait(); 332 *(this->m_pRes) = x + y; 333 } 334 }; 335 336 // Helper functions 337 template<class fib_task> 338 std::uintptr_t RunFibTask(std::uintptr_t n) { 339 std::uintptr_t res = ~0u; 340 fib_task(&res, n)(); 341 return res; 342 } 343 344 template<typename fib_task> 345 void RunFibTest() { 346 FIB_TEST_PROLOGUE(); 347 std::uintptr_t sum = 0; 348 for( unsigned i = 0; i < numRepeats; ++i ) 349 sum += RunFibTask<fib_task>(N); 350 FIB_TEST_EPILOGUE(sum); 351 } 352 353 template<typename fib_task> 354 void FibFunctionNoArgs() { 355 g_Sum += RunFibTask<fib_task>(N); 356 } 357 358 template<typename task_group_type> 359 void TestFibWithLambdas() { 360 FIB_TEST_PROLOGUE(); 361 atomic_t sum; 362 sum = 0; 363 task_group_type tg; 364 for( unsigned i = 0; i < numRepeats; ++i ) 365 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 366 tg.wait(); 367 FIB_TEST_EPILOGUE(sum); 368 } 369 370 template<typename task_group_type> 371 void TestFibWithFunctor() { 372 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 373 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 374 } 375 376 template<typename task_group_type> 377 void TestFibWithFunctionPtr() { 378 FIB_TEST_PROLOGUE(); 379 g_Sum = 0; 380 task_group_type tg; 381 for( unsigned i = 0; i < numRepeats; ++i ) 382 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 383 tg.wait(); 384 FIB_TEST_EPILOGUE(g_Sum); 385 } 386 387 template<typename task_group_type> 388 void RunFibonacciTests() { 389 TestFibWithLambdas<task_group_type>(); 390 TestFibWithFunctor<task_group_type>(); 391 TestFibWithFunctionPtr<task_group_type>(); 392 } 393 394 class test_exception : public std::exception 395 { 396 const char* m_strDescription; 397 public: 398 test_exception ( const char* descr ) : m_strDescription(descr) {} 399 400 const char* what() const throw() override { return m_strDescription; } 401 }; 402 403 using TestException = test_exception; 404 405 #include <string.h> 406 407 #define NUM_CHORES 512 408 #define NUM_GROUPS 64 409 #define SKIP_CHORES (NUM_CHORES/4) 410 #define SKIP_GROUPS (NUM_GROUPS/4) 411 #define EXCEPTION_DESCR1 "Test exception 1" 412 #define EXCEPTION_DESCR2 "Test exception 2" 413 414 atomic_t g_ExceptionCount; 415 atomic_t g_TaskCount; 416 unsigned g_ExecutedAtCancellation; 417 bool g_Rethrow; 418 bool g_Throw; 419 420 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 421 atomic_t &m_TaskCount; 422 public: 423 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 424 void operator() () const { 425 utils::ConcurrencyTracker ct; 426 AssertLive(); 427 if ( g_Throw ) { 428 if ( ++m_TaskCount == SKIP_CHORES ) 429 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 430 utils::yield(); 431 } 432 else { 433 ++g_TaskCount; 434 while( !tbb::is_current_task_group_canceling() ) 435 utils::yield(); 436 } 437 } 438 }; 439 440 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 441 g_Throw = bThrow; 442 g_Rethrow = bRethrow; 443 g_ExceptionCount = 0; 444 g_TaskCount = 0; 445 utils::ConcurrencyTracker::Reset(); 446 } 447 448 template<typename task_group_type> 449 void LaunchChildrenWithFunctor () { 450 atomic_t count; 451 count = 0; 452 task_group_type g; 453 for (unsigned i = 0; i < NUM_CHORES; ++i) { 454 if (i % 2 == 1) { 455 g.run(g.defer(ThrowingTask(count))); 456 } else 457 { 458 g.run(ThrowingTask(count)); 459 } 460 } 461 #if TBB_USE_EXCEPTIONS 462 tbb::task_group_status status = tbb::not_complete; 463 bool exceptionCaught = false; 464 try { 465 status = g.wait(); 466 } catch ( TestException& e ) { 467 CHECK_MESSAGE( e.what(), "Empty what() string" ); 468 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 469 exceptionCaught = true; 470 ++g_ExceptionCount; 471 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 472 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 473 CHECK_MESSAGE(false, "No exception in the child task group"); 474 } 475 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 476 throw test_exception(EXCEPTION_DESCR2); 477 } 478 #else 479 g.wait(); 480 #endif 481 } 482 483 // Tests for cancellation and exception handling behavior 484 template<typename task_group_type> 485 void TestManualCancellationWithFunctor () { 486 ResetGlobals( false, false ); 487 task_group_type tg; 488 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 489 // TBB version does not require taking function address 490 if (i % 2 == 0) { 491 auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>); 492 tg.run(std::move(h)); 493 } else 494 { 495 tg.run(&LaunchChildrenWithFunctor<task_group_type>); 496 } 497 } 498 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 499 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 500 utils::yield(); 501 tg.cancel(); 502 g_ExecutedAtCancellation = int(g_TaskCount); 503 tbb::task_group_status status = tg.wait(); 504 CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." ); 505 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 506 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 507 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 508 } 509 510 #if TBB_USE_EXCEPTIONS 511 template<typename task_group_type> 512 void TestExceptionHandling1 () { 513 ResetGlobals( true, false ); 514 task_group_type tg; 515 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 516 // TBB version does not require taking function address 517 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 518 try { 519 tg.wait(); 520 } catch ( ... ) { 521 CHECK_MESSAGE( false, "Unexpected exception" ); 522 } 523 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 524 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 525 } 526 527 template<typename task_group_type> 528 void TestExceptionHandling2 () { 529 ResetGlobals( true, true ); 530 task_group_type tg; 531 bool exceptionCaught = false; 532 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 533 // TBB version does not require taking function address 534 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 535 } 536 try { 537 tg.wait(); 538 } catch ( TestException& e ) { 539 CHECK_MESSAGE( e.what(), "Empty what() string" ); 540 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 541 exceptionCaught = true; 542 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 543 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 544 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 545 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 546 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 547 } 548 549 template <typename task_group_type> 550 void TestExceptionHandling3() { 551 task_group_type tg; 552 try { 553 tg.run_and_wait([]() { 554 volatile bool suppress_unreachable_code_warning = true; 555 if (suppress_unreachable_code_warning) { 556 throw 1; 557 } 558 }); 559 } catch (int error) { 560 CHECK(error == 1); 561 } catch ( ... ) { 562 CHECK_MESSAGE( false, "Unexpected exception" ); 563 } 564 } 565 566 template<typename task_group_type> 567 class LaunchChildrenDriver { 568 public: 569 void Launch(task_group_type& tg) { 570 ResetGlobals(false, false); 571 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 572 tg.run(LaunchChildrenWithFunctor<task_group_type>); 573 } 574 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 575 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 576 utils::yield(); 577 } 578 579 void Finish() { 580 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 581 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 582 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 583 } 584 }; // LaunchChildrenWithTaskHandleDriver 585 586 template<typename task_group_type, bool Throw> 587 void TestMissingWait () { 588 bool exception_occurred = false, 589 unexpected_exception = false; 590 LaunchChildrenDriver<task_group_type> driver; 591 try { 592 task_group_type tg; 593 driver.Launch( tg ); 594 volatile bool suppress_unreachable_code_warning = Throw; 595 if (suppress_unreachable_code_warning) { 596 throw int(); // Initiate stack unwinding 597 } 598 } 599 catch ( const tbb::missing_wait& e ) { 600 CHECK_MESSAGE( e.what(), "Error message is absent" ); 601 exception_occurred = true; 602 unexpected_exception = Throw; 603 } 604 catch ( int ) { 605 exception_occurred = true; 606 unexpected_exception = !Throw; 607 } 608 catch ( ... ) { 609 exception_occurred = unexpected_exception = true; 610 } 611 CHECK( exception_occurred ); 612 CHECK( !unexpected_exception ); 613 driver.Finish(); 614 } 615 #endif 616 617 template<typename task_group_type> 618 void RunCancellationAndExceptionHandlingTests() { 619 TestManualCancellationWithFunctor<task_group_type>(); 620 #if TBB_USE_EXCEPTIONS 621 TestExceptionHandling1<task_group_type>(); 622 TestExceptionHandling2<task_group_type>(); 623 TestExceptionHandling3<task_group_type>(); 624 TestMissingWait<task_group_type, true>(); 625 TestMissingWait<task_group_type, false>(); 626 #endif 627 } 628 629 void EmptyFunction () {} 630 631 struct TestFunctor { 632 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 633 void operator()() const { /* library requires this overload only */ } 634 }; 635 636 template<typename task_group_type> 637 void TestConstantFunctorRequirement() { 638 task_group_type g; 639 TestFunctor tf; 640 g.run( tf ); g.wait(); 641 g.run_and_wait( tf ); 642 } 643 644 //------------------------------------------------------------------------ 645 namespace TestMoveSemanticsNS { 646 struct TestFunctor { 647 void operator()() const {}; 648 }; 649 650 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 651 MoveOnlyFunctor() : utils::MoveOnly() {}; 652 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 653 }; 654 655 struct MovePreferableFunctor : utils::Movable, TestFunctor { 656 MovePreferableFunctor() : utils::Movable() {}; 657 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 658 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 659 }; 660 661 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 662 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 663 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 664 private: 665 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 666 }; 667 668 template<typename task_group_type> 669 void TestBareFunctors() { 670 task_group_type tg; 671 MovePreferableFunctor mpf; 672 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 673 tg.run_and_wait( NoMoveNoCopyFunctor() ); 674 675 tg.run( MoveOnlyFunctor() ); 676 tg.wait(); 677 678 tg.run( mpf ); 679 tg.wait(); 680 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 681 mpf.Reset(); 682 683 tg.run( std::move(mpf) ); 684 tg.wait(); 685 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 686 mpf.Reset(); 687 } 688 } 689 690 template<typename task_group_type> 691 void TestMoveSemantics() { 692 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 693 } 694 //------------------------------------------------------------------------ 695 696 // TODO: TBB_REVAMP_TODO - enable when ETS is available 697 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 698 namespace TestIsolationNS { 699 class DummyFunctor { 700 public: 701 DummyFunctor() {} 702 void operator()() const { 703 for ( volatile int j = 0; j < 10; ++j ) {} 704 } 705 }; 706 707 template<typename task_group_type> 708 class ParForBody { 709 task_group_type& m_tg; 710 std::atomic<bool>& m_preserved; 711 tbb::enumerable_thread_specific<int>& m_ets; 712 public: 713 ParForBody( 714 task_group_type& tg, 715 std::atomic<bool>& preserved, 716 tbb::enumerable_thread_specific<int>& ets 717 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 718 719 void operator()(int) const { 720 if (++m_ets.local() > 1) m_preserved = false; 721 722 for (int i = 0; i < 1000; ++i) 723 m_tg.run(DummyFunctor()); 724 m_tg.wait(); 725 m_tg.run_and_wait(DummyFunctor()); 726 727 --m_ets.local(); 728 } 729 }; 730 731 template<typename task_group_type> 732 void CheckIsolation(bool isolation_is_expected) { 733 task_group_type tg; 734 std::atomic<bool> isolation_is_preserved; 735 isolation_is_preserved = true; 736 tbb::enumerable_thread_specific<int> ets(0); 737 738 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 739 740 ASSERT( 741 isolation_is_expected == isolation_is_preserved, 742 "Actual and expected isolation-related behaviours are different" 743 ); 744 } 745 746 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 747 void TestIsolation() { 748 CheckIsolation<tbb::task_group>(false); 749 CheckIsolation<tbb::isolated_task_group>(true); 750 } 751 } 752 #endif 753 754 #if __TBB_USE_ADDRESS_SANITIZER 755 //! Test for thread safety for the task_group 756 //! \brief \ref error_guessing \ref resource_usage 757 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 758 #elif !EMSCRIPTEN 759 //! Emscripten requires preloading of the file used to determine memory usage, hence disabled. 760 //! Test for thread safety for the task_group 761 //! \brief \ref error_guessing \ref resource_usage 762 TEST_CASE("Thread safety test for the task group") { 763 if (tbb::this_task_arena::max_concurrency() < 2) { 764 // The test requires more than one thread to check thread safety 765 return; 766 } 767 for (unsigned p=MinThread; p <= MaxThread; ++p) { 768 if (p < 2) { 769 continue; 770 } 771 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 772 g_MaxConcurrency = p; 773 TestThreadSafety<tbb::task_group>(); 774 } 775 } 776 #endif 777 778 //! Fibonacci test for task group 779 //! \brief \ref interface \ref requirement 780 TEST_CASE("Fibonacci test for the task group") { 781 for (unsigned p=MinThread; p <= MaxThread; ++p) { 782 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 783 g_MaxConcurrency = p; 784 RunFibonacciTests<tbb::task_group>(); 785 } 786 } 787 788 //! Cancellation and exception test for the task group 789 //! \brief \ref interface \ref requirement 790 TEST_CASE("Cancellation and exception test for the task group") { 791 for (unsigned p = MinThread; p <= MaxThread; ++p) { 792 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 793 tbb::task_arena a(p); 794 g_MaxConcurrency = p; 795 a.execute([] { 796 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 797 }); 798 } 799 } 800 801 //! Constant functor test for the task group 802 //! \brief \ref interface \ref negative 803 TEST_CASE("Constant functor test for the task group") { 804 for (unsigned p=MinThread; p <= MaxThread; ++p) { 805 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 806 g_MaxConcurrency = p; 807 TestConstantFunctorRequirement<tbb::task_group>(); 808 } 809 } 810 811 //! Move semantics test for the task group 812 //! \brief \ref interface \ref requirement 813 TEST_CASE("Move semantics test for the task group") { 814 for (unsigned p=MinThread; p <= MaxThread; ++p) { 815 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 816 g_MaxConcurrency = p; 817 TestMoveSemantics<tbb::task_group>(); 818 } 819 } 820 821 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 822 823 #if __TBB_USE_ADDRESS_SANITIZER 824 //! Test for thread safety for the isolated_task_group 825 //! \brief \ref error_guessing 826 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 827 #elif !EMSCRIPTEN 828 //! Test for thread safety for the isolated_task_group 829 //! \brief \ref error_guessing 830 TEST_CASE("Thread safety test for the isolated task group") { 831 if (tbb::this_task_arena::max_concurrency() < 2) { 832 // The test requires more than one thread to check thread safety 833 return; 834 } 835 for (unsigned p=MinThread; p <= MaxThread; ++p) { 836 if (p < 2) { 837 continue; 838 } 839 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 840 g_MaxConcurrency = p; 841 TestThreadSafety<tbb::isolated_task_group>(); 842 } 843 } 844 #endif 845 846 //! Cancellation and exception test for the isolated task group 847 //! \brief \ref interface \ref requirement 848 TEST_CASE("Fibonacci test for the isolated task group") { 849 for (unsigned p=MinThread; p <= MaxThread; ++p) { 850 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 851 g_MaxConcurrency = p; 852 RunFibonacciTests<tbb::isolated_task_group>(); 853 } 854 } 855 856 //! Cancellation and exception test for the isolated task group 857 //! \brief \ref interface \ref requirement 858 TEST_CASE("Cancellation and exception test for the isolated task group") { 859 for (unsigned p=MinThread; p <= MaxThread; ++p) { 860 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 861 g_MaxConcurrency = p; 862 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 863 } 864 } 865 866 //! Constant functor test for the isolated task group. 867 //! \brief \ref interface \ref negative 868 TEST_CASE("Constant functor test for the isolated task group") { 869 for (unsigned p=MinThread; p <= MaxThread; ++p) { 870 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 871 g_MaxConcurrency = p; 872 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 873 } 874 } 875 876 //! Move semantics test for the isolated task group. 877 //! \brief \ref interface \ref requirement 878 TEST_CASE("Move semantics test for the isolated task group") { 879 for (unsigned p=MinThread; p <= MaxThread; ++p) { 880 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 881 g_MaxConcurrency = p; 882 TestMoveSemantics<tbb::isolated_task_group>(); 883 } 884 } 885 886 //TODO: add test void isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::::run_and_wait(d2::task_handle&& h) 887 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 888 889 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 890 for (int i = 0; i < num_tasks; ++i) { 891 tg2.run([&tg1, &tasks_executed] { 892 volatile char consume_stack[1000]{}; 893 ++tasks_executed; 894 tg1.wait(); 895 utils::suppress_unused_warning(consume_stack); 896 }); 897 } 898 } 899 900 // TODO: move to the conformance test 901 //! Test for stack overflow avoidance mechanism. 902 //! \brief \ref requirement 903 TEST_CASE("Test for stack overflow avoidance mechanism") { 904 if (tbb::this_task_arena::max_concurrency() < 2) { 905 return; 906 } 907 908 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 909 tbb::task_group tg1; 910 tbb::task_group tg2; 911 std::atomic<int> tasks_executed{}; 912 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 913 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 914 while (tasks_executed < 100) { 915 // Some stealing is expected to happen. 916 utils::yield(); 917 } 918 CHECK(tasks_executed < 10000); 919 }); 920 tg2.wait(); 921 CHECK(tasks_executed == 10000); 922 } 923 924 //! Test for stack overflow avoidance mechanism. 925 //! \brief \ref error_guessing 926 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 927 if (tbb::this_task_arena::max_concurrency() < 2) { 928 return; 929 } 930 931 tbb::task_arena a1(2, 1); 932 a1.execute([] { 933 tbb::task_group tg1; 934 tbb::task_group tg2; 935 std::atomic<int> tasks_executed{}; 936 937 // Determine nested task execution limit. 938 int second_thread_executed{}; 939 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 940 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 941 do { 942 second_thread_executed = tasks_executed; 943 utils::Sleep(10); 944 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 945 CHECK(tasks_executed < 10000); 946 }); 947 tg2.wait(); 948 CHECK(tasks_executed == 10000); 949 950 tasks_executed = 0; 951 tbb::task_arena a2(2, 2); 952 tg1.run_and_wait([&a2, &tg1, &tg2, &tasks_executed, second_thread_executed] { 953 run_deep_stealing(tg1, tg2, second_thread_executed - 1, tasks_executed); 954 while (tasks_executed < second_thread_executed - 1) { 955 // Wait until the second thread near the limit. 956 utils::yield(); 957 } 958 tg2.run([&a2, &tg1, &tasks_executed] { 959 a2.execute([&tg1, &tasks_executed] { 960 volatile char consume_stack[1000]{}; 961 ++tasks_executed; 962 tg1.wait(); 963 utils::suppress_unused_warning(consume_stack); 964 }); 965 }); 966 while (tasks_executed < second_thread_executed) { 967 // Wait until the second joins the arena. 968 utils::yield(); 969 } 970 a2.execute([&tg1, &tg2, &tasks_executed] { 971 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 972 }); 973 int currently_executed{}; 974 do { 975 currently_executed = tasks_executed; 976 utils::Sleep(10); 977 } while (currently_executed != tasks_executed); 978 CHECK(tasks_executed < 10000 + second_thread_executed); 979 }); 980 a2.execute([&tg2] { 981 tg2.wait(); 982 }); 983 CHECK(tasks_executed == 10000 + second_thread_executed); 984 }); 985 } 986 987 //! Test checks that we can submit work to task_group asynchronously with waiting. 988 //! \brief \ref regression 989 TEST_CASE("Async task group") { 990 int num_threads = tbb::this_task_arena::max_concurrency(); 991 if (num_threads < 3) { 992 // The test requires at least 2 worker threads 993 return; 994 } 995 tbb::task_arena a(2*num_threads, num_threads); 996 utils::SpinBarrier barrier(num_threads + 2); 997 tbb::task_group tg[2]; 998 std::atomic<bool> finished[2]{}; 999 finished[0] = false; finished[1] = false; 1000 for (int i = 0; i < 2; ++i) { 1001 a.enqueue([i, &tg, &finished, &barrier] { 1002 barrier.wait(); 1003 for (int j = 0; j < 10000; ++j) { 1004 tg[i].run([] {}); 1005 utils::yield(); 1006 } 1007 finished[i] = true; 1008 }); 1009 } 1010 utils::NativeParallelFor(num_threads, [&](int idx) { 1011 barrier.wait(); 1012 a.execute([idx, &tg, &finished] { 1013 std::size_t counter{}; 1014 while (!finished[idx%2]) { 1015 tg[idx%2].wait(); 1016 if (counter++ % 16 == 0) utils::yield(); 1017 } 1018 tg[idx%2].wait(); 1019 }); 1020 }); 1021 } 1022 1023 struct SelfRunner { 1024 tbb::task_group& m_tg; 1025 std::atomic<unsigned>& count; 1026 void operator()() const { 1027 unsigned previous_count = count.fetch_sub(1); 1028 if (previous_count > 1) 1029 m_tg.run( *this ); 1030 } 1031 }; 1032 1033 //! Submit work to single task_group instance from inside the work 1034 //! \brief \ref error_guessing 1035 TEST_CASE("Run self using same task_group instance") { 1036 const unsigned num = 10; 1037 std::atomic<unsigned> count{num}; 1038 tbb::task_group tg; 1039 SelfRunner uf{tg, count}; 1040 tg.run( uf ); 1041 tg.wait(); 1042 CHECK_MESSAGE( 1043 count == 0, 1044 "Not all tasks were spawned from inside the functor running within task_group." 1045 ); 1046 } 1047 1048 //TODO: move to some common place to share with conformance tests 1049 namespace accept_task_group_context { 1050 1051 template <typename TaskGroup, typename CancelF, typename WaitF> 1052 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) { 1053 std::atomic<bool> outer_cancelled{false}; 1054 std::atomic<unsigned> count{13}; 1055 1056 tbb::task_group_context inner_ctx(tbb::task_group_context::isolated); 1057 TaskGroup inner_tg(inner_ctx); 1058 1059 tbb::task_group outer_tg; 1060 auto outer_tg_task = [&] { 1061 inner_tg.run([&] { 1062 utils::SpinWaitUntilEq(outer_cancelled, true); 1063 inner_tg.run( SelfRunner{inner_tg, count} ); 1064 }); 1065 1066 utils::try_call([&] { 1067 std::forward<CancelF>(cancel)(outer_tg); 1068 }).on_completion([&] { 1069 outer_cancelled = true; 1070 }); 1071 }; 1072 1073 auto check = [&] { 1074 tbb::task_group_status outer_status = tbb::task_group_status::not_complete; 1075 outer_status = std::forward<WaitF>(wait)(outer_tg); 1076 CHECK_MESSAGE( 1077 outer_status == tbb::task_group_status::canceled, 1078 "Outer task group should have been cancelled." 1079 ); 1080 1081 tbb::task_group_status inner_status = inner_tg.wait(); 1082 CHECK_MESSAGE( 1083 inner_status == tbb::task_group_status::complete, 1084 "Inner task group should have completed despite the cancellation of the outer one." 1085 ); 1086 1087 CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed."); 1088 }; 1089 1090 outer_tg.run(outer_tg_task); 1091 check(); 1092 } 1093 1094 template <typename TaskGroup> 1095 void test() { 1096 run_cancellation_use_case<TaskGroup>( 1097 [](tbb::task_group& outer) { outer.cancel(); }, 1098 [](tbb::task_group& outer) { return outer.wait(); } 1099 ); 1100 1101 #if TBB_USE_EXCEPTIONS 1102 run_cancellation_use_case<TaskGroup>( 1103 [](tbb::task_group& /*outer*/) { 1104 volatile bool suppress_unreachable_code_warning = true; 1105 if (suppress_unreachable_code_warning) { 1106 throw int(); 1107 } 1108 }, 1109 [](tbb::task_group& outer) { 1110 try { 1111 outer.wait(); 1112 return tbb::task_group_status::complete; 1113 } catch(const int&) { 1114 return tbb::task_group_status::canceled; 1115 } 1116 } 1117 ); 1118 #endif 1119 } 1120 1121 } // namespace accept_task_group_context 1122 1123 //! Respect task_group_context passed from outside 1124 //! \brief \ref interface \ref requirement 1125 TEST_CASE("Respect task_group_context passed from outside") { 1126 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 1127 accept_task_group_context::test<tbb::isolated_task_group>(); 1128 #endif 1129 } 1130 1131 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1132 //! The test for task_handle inside other task waiting with run 1133 //! \brief \ref requirement 1134 TEST_CASE("Task handle for scheduler bypass"){ 1135 tbb::task_group tg; 1136 std::atomic<bool> run {false}; 1137 1138 tg.run([&]{ 1139 return tg.defer([&]{ 1140 run = true; 1141 }); 1142 }); 1143 1144 tg.wait(); 1145 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1146 } 1147 1148 //! The test for task_handle inside other task waiting with run_and_wait 1149 //! \brief \ref requirement 1150 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){ 1151 tbb::task_group tg; 1152 std::atomic<bool> run {false}; 1153 1154 tg.run_and_wait([&]{ 1155 return tg.defer([&]{ 1156 run = true; 1157 }); 1158 }); 1159 1160 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1161 } 1162 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1163 1164 #if TBB_USE_EXCEPTIONS 1165 //As these tests are against behavior marked by spec as undefined, they can not be put into conformance tests 1166 1167 //! The test for error in scheduling empty task_handle 1168 //! \brief \ref requirement 1169 TEST_CASE("Empty task_handle cannot be scheduled" 1170 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1171 * doctest::skip() //skip the test for now, to not pollute the test log 1172 ){ 1173 tbb::task_group tg; 1174 1175 CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error); 1176 } 1177 1178 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1179 //! \brief \ref requirement 1180 TEST_CASE("task_handle cannot be scheduled into different task_group" 1181 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1182 * doctest::skip() //skip the test for now, to not pollute the test log 1183 ){ 1184 tbb::task_group tg; 1185 tbb::task_group tg1; 1186 1187 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1188 } 1189 1190 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1191 //! \brief \ref requirement 1192 TEST_CASE("task_handle cannot be scheduled into other task_group of the same context" 1193 * doctest::should_fail() //Implementation is no there yet, as it is not clear that is the expected behavior 1194 * doctest::skip() //skip the test for now, to not pollute the test log 1195 ) 1196 { 1197 tbb::task_group_context ctx; 1198 1199 tbb::task_group tg(ctx); 1200 tbb::task_group tg1(ctx); 1201 1202 CHECK_NOTHROW(tg.run(tg.defer([]{}))); 1203 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1204 } 1205 1206 #endif // TBB_USE_EXCEPTIONS 1207 1208