1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "oneapi/tbb/detail/_config.h" 20 #include "tbb/global_control.h" 21 22 #include "tbb/task_group.h" 23 24 #include "common/concurrency_tracker.h" 25 26 #include <atomic> 27 #include <stdexcept> 28 29 //! \file test_task_group.cpp 30 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 31 32 unsigned g_MaxConcurrency = 4; 33 using atomic_t = std::atomic<std::uintptr_t>; 34 unsigned MinThread = 1; 35 unsigned MaxThread = 4; 36 37 //------------------------------------------------------------------------ 38 // Tests for the thread safety of the task_group manipulations 39 //------------------------------------------------------------------------ 40 41 #include "common/spin_barrier.h" 42 43 enum SharingMode { 44 VagabondGroup = 1, 45 ParallelWait = 2 46 }; 47 48 template<typename task_group_type> 49 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 50 static const std::uintptr_t c_numTasks0 = 4096, 51 c_numTasks1 = 1024; 52 53 const std::uintptr_t m_numThreads; 54 const std::uintptr_t m_sharingMode; 55 56 task_group_type *m_taskGroup; 57 atomic_t m_tasksSpawned, 58 m_threadsReady; 59 utils::SpinBarrier m_barrier; 60 61 static atomic_t s_tasksExecuted; 62 63 struct TaskFunctor { 64 SharedGroupBodyImpl *m_pOwner; 65 void operator () () const { 66 if ( m_pOwner->m_sharingMode & ParallelWait ) { 67 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 68 utils::yield(); 69 } 70 ++s_tasksExecuted; 71 } 72 }; 73 74 TaskFunctor m_taskFunctor; 75 76 void Spawn ( std::uintptr_t numTasks ) { 77 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 78 ++m_tasksSpawned; 79 utils::ConcurrencyTracker ct; 80 m_taskGroup->run( m_taskFunctor ); 81 } 82 ++m_threadsReady; 83 } 84 85 void DeleteTaskGroup () { 86 delete m_taskGroup; 87 m_taskGroup = NULL; 88 } 89 90 void Wait () { 91 while ( m_threadsReady != m_numThreads ) 92 utils::yield(); 93 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 94 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 95 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 96 if ( m_sharingMode & ParallelWait ) { 97 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 98 { 99 utils::ConcurrencyTracker ct; 100 m_taskGroup->wait(); 101 } 102 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) 103 WARN( "Warning: No parallel waiting detected in TestParallelWait" ); 104 m_barrier.wait(); 105 } 106 else 107 m_taskGroup->wait(); 108 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 109 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 110 } 111 112 public: 113 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 114 : m_numThreads(numThreads) 115 , m_sharingMode(sharingMode) 116 , m_taskGroup(NULL) 117 , m_barrier(numThreads) 118 { 119 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 120 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 121 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 122 } 123 utils::ConcurrencyTracker::Reset(); 124 s_tasksExecuted = 0; 125 m_tasksSpawned = 0; 126 m_threadsReady = 0; 127 m_taskFunctor.m_pOwner = this; 128 } 129 130 void Run ( std::uintptr_t idx ) { 131 AssertLive(); 132 if ( idx == 0 ) { 133 if (m_taskGroup || m_tasksSpawned) { 134 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 135 } 136 m_taskGroup = new task_group_type; 137 Spawn( c_numTasks0 ); 138 Wait(); 139 if ( m_sharingMode & VagabondGroup ) 140 m_barrier.wait(); 141 else 142 DeleteTaskGroup(); 143 } 144 else { 145 while ( m_tasksSpawned == 0 ) 146 utils::yield(); 147 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 148 Spawn (c_numTasks1); 149 if ( m_sharingMode & ParallelWait ) 150 Wait(); 151 if ( m_sharingMode & VagabondGroup ) { 152 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 153 m_barrier.wait(); 154 DeleteTaskGroup(); 155 } 156 } 157 AssertLive(); 158 } 159 }; 160 161 template<typename task_group_type> 162 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 163 164 template<typename task_group_type> 165 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 166 bool m_bOwner; 167 SharedGroupBodyImpl<task_group_type> *m_pImpl; 168 public: 169 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 170 : utils::NoAssign() 171 , utils::NoAfterlife() 172 , m_bOwner(true) 173 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 174 {} 175 SharedGroupBody ( const SharedGroupBody& src ) 176 : utils::NoAssign() 177 , utils::NoAfterlife() 178 , m_bOwner(false) 179 , m_pImpl(src.m_pImpl) 180 {} 181 ~SharedGroupBody () { 182 if ( m_bOwner ) 183 delete m_pImpl; 184 } 185 void operator() ( std::uintptr_t idx ) const { 186 // Wrap the functior into additional task group to enforce bounding. 187 task_group_type tg; 188 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 189 } 190 }; 191 192 template<typename task_group_type> 193 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 194 utils::SpinBarrier& m_barrier; 195 std::atomic<bool>& m_completed; 196 task_group_type& m_tg; 197 public: 198 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 199 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 200 201 void operator()() const { 202 m_barrier.wait(); 203 utils::doDummyWork(100000); 204 m_completed = true; 205 } 206 207 void operator()(int id) const { 208 if (id == 0) { 209 m_tg.run_and_wait(*this); 210 } else { 211 m_barrier.wait(); 212 m_tg.wait(); 213 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 214 } 215 } 216 }; 217 218 template<typename task_group_type> 219 void TestParallelSpawn () { 220 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 221 } 222 223 template<typename task_group_type> 224 void TestParallelWait () { 225 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 226 227 utils::SpinBarrier barrier(g_MaxConcurrency); 228 std::atomic<bool> completed; 229 completed = false; 230 task_group_type tg; 231 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 232 NativeParallelFor( g_MaxConcurrency, b ); 233 } 234 235 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 236 template<typename task_group_type> 237 void TestVagabondGroup () { 238 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 239 } 240 241 #include "common/memory_usage.h" 242 243 template<typename task_group_type> 244 void TestThreadSafety() { 245 auto tests = [] { 246 for (int trail = 0; trail < 10; ++trail) { 247 TestParallelSpawn<task_group_type>(); 248 TestParallelWait<task_group_type>(); 249 TestVagabondGroup<task_group_type>(); 250 } 251 }; 252 253 // Test and warm up allocator. 254 tests(); 255 256 // Ensure that cosumption is stabilized. 257 std::size_t initial = utils::GetMemoryUsage(); 258 for (;;) { 259 tests(); 260 std::size_t current = utils::GetMemoryUsage(); 261 if (current <= initial) { 262 return; 263 } 264 initial = current; 265 } 266 } 267 //------------------------------------------------------------------------ 268 // Common requisites of the Fibonacci tests 269 //------------------------------------------------------------------------ 270 271 const std::uintptr_t N = 20; 272 const std::uintptr_t F = 6765; 273 274 atomic_t g_Sum; 275 276 #define FIB_TEST_PROLOGUE() \ 277 const unsigned numRepeats = g_MaxConcurrency * 4; \ 278 utils::ConcurrencyTracker::Reset() 279 280 #define FIB_TEST_EPILOGUE(sum) \ 281 CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \ 282 CHECK( sum == numRepeats * F ); 283 284 285 // Fibonacci tasks specified as functors 286 template<class task_group_type> 287 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 288 protected: 289 std::uintptr_t* m_pRes; 290 mutable std::uintptr_t m_Num; 291 virtual void impl() const = 0; 292 public: 293 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 294 void operator()() const { 295 utils::ConcurrencyTracker ct; 296 AssertLive(); 297 if( m_Num < 2 ) { 298 *m_pRes = m_Num; 299 } else { 300 impl(); 301 } 302 } 303 virtual ~FibTaskBase() {} 304 }; 305 306 template<class task_group_type> 307 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 308 public: 309 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 310 virtual void impl() const override { 311 std::uintptr_t x = ~0u; 312 task_group_type tg; 313 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 314 this->m_Num -= 2; tg.run_and_wait( *this ); 315 *(this->m_pRes) += x; 316 } 317 }; 318 319 template<class task_group_type> 320 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 321 public: 322 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 323 virtual void impl() const override { 324 std::uintptr_t x = ~0u, 325 y = ~0u; 326 task_group_type tg; 327 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 328 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 329 tg.wait(); 330 *(this->m_pRes) = x + y; 331 } 332 }; 333 334 // Helper functions 335 template<class fib_task> 336 std::uintptr_t RunFibTask(std::uintptr_t n) { 337 std::uintptr_t res = ~0u; 338 fib_task(&res, n)(); 339 return res; 340 } 341 342 template<typename fib_task> 343 void RunFibTest() { 344 FIB_TEST_PROLOGUE(); 345 std::uintptr_t sum = 0; 346 for( unsigned i = 0; i < numRepeats; ++i ) 347 sum += RunFibTask<fib_task>(N); 348 FIB_TEST_EPILOGUE(sum); 349 } 350 351 template<typename fib_task> 352 void FibFunctionNoArgs() { 353 g_Sum += RunFibTask<fib_task>(N); 354 } 355 356 template<typename task_group_type> 357 void TestFibWithLambdas() { 358 FIB_TEST_PROLOGUE(); 359 atomic_t sum; 360 sum = 0; 361 task_group_type tg; 362 for( unsigned i = 0; i < numRepeats; ++i ) 363 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 364 tg.wait(); 365 FIB_TEST_EPILOGUE(sum); 366 } 367 368 template<typename task_group_type> 369 void TestFibWithFunctor() { 370 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 371 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 372 } 373 374 template<typename task_group_type> 375 void TestFibWithFunctionPtr() { 376 FIB_TEST_PROLOGUE(); 377 g_Sum = 0; 378 task_group_type tg; 379 for( unsigned i = 0; i < numRepeats; ++i ) 380 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 381 tg.wait(); 382 FIB_TEST_EPILOGUE(g_Sum); 383 } 384 385 template<typename task_group_type> 386 void RunFibonacciTests() { 387 TestFibWithLambdas<task_group_type>(); 388 TestFibWithFunctor<task_group_type>(); 389 TestFibWithFunctionPtr<task_group_type>(); 390 } 391 392 class test_exception : public std::exception 393 { 394 const char* m_strDescription; 395 public: 396 test_exception ( const char* descr ) : m_strDescription(descr) {} 397 398 const char* what() const throw() override { return m_strDescription; } 399 }; 400 401 using TestException = test_exception; 402 403 #include <string.h> 404 405 #define NUM_CHORES 512 406 #define NUM_GROUPS 64 407 #define SKIP_CHORES (NUM_CHORES/4) 408 #define SKIP_GROUPS (NUM_GROUPS/4) 409 #define EXCEPTION_DESCR1 "Test exception 1" 410 #define EXCEPTION_DESCR2 "Test exception 2" 411 412 atomic_t g_ExceptionCount; 413 atomic_t g_TaskCount; 414 unsigned g_ExecutedAtCancellation; 415 bool g_Rethrow; 416 bool g_Throw; 417 418 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 419 atomic_t &m_TaskCount; 420 public: 421 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 422 void operator() () const { 423 utils::ConcurrencyTracker ct; 424 AssertLive(); 425 if ( g_Throw ) { 426 if ( ++m_TaskCount == SKIP_CHORES ) 427 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 428 utils::yield(); 429 } 430 else { 431 ++g_TaskCount; 432 while( !tbb::is_current_task_group_canceling() ) 433 utils::yield(); 434 } 435 } 436 }; 437 438 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 439 g_Throw = bThrow; 440 g_Rethrow = bRethrow; 441 g_ExceptionCount = 0; 442 g_TaskCount = 0; 443 utils::ConcurrencyTracker::Reset(); 444 } 445 446 template<typename task_group_type> 447 void LaunchChildrenWithFunctor () { 448 atomic_t count; 449 count = 0; 450 task_group_type g; 451 for (unsigned i = 0; i < NUM_CHORES; ++i) { 452 if (i % 2 == 1) { 453 g.run(g.defer(ThrowingTask(count))); 454 } else 455 { 456 g.run(ThrowingTask(count)); 457 } 458 } 459 #if TBB_USE_EXCEPTIONS 460 tbb::task_group_status status = tbb::not_complete; 461 bool exceptionCaught = false; 462 try { 463 status = g.wait(); 464 } catch ( TestException& e ) { 465 CHECK_MESSAGE( e.what(), "Empty what() string" ); 466 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 467 exceptionCaught = true; 468 ++g_ExceptionCount; 469 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 470 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 471 CHECK_MESSAGE(false, "No exception in the child task group"); 472 } 473 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 474 throw test_exception(EXCEPTION_DESCR2); 475 } 476 #else 477 g.wait(); 478 #endif 479 } 480 481 // Tests for cancellation and exception handling behavior 482 template<typename task_group_type> 483 void TestManualCancellationWithFunctor () { 484 ResetGlobals( false, false ); 485 task_group_type tg; 486 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 487 // TBB version does not require taking function address 488 if (i % 2 == 0) { 489 auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>); 490 tg.run(std::move(h)); 491 } else 492 { 493 tg.run(&LaunchChildrenWithFunctor<task_group_type>); 494 } 495 } 496 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 497 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 498 utils::yield(); 499 tg.cancel(); 500 g_ExecutedAtCancellation = int(g_TaskCount); 501 tbb::task_group_status status = tg.wait(); 502 CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." ); 503 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 504 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 505 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 506 } 507 508 #if TBB_USE_EXCEPTIONS 509 template<typename task_group_type> 510 void TestExceptionHandling1 () { 511 ResetGlobals( true, false ); 512 task_group_type tg; 513 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 514 // TBB version does not require taking function address 515 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 516 try { 517 tg.wait(); 518 } catch ( ... ) { 519 CHECK_MESSAGE( false, "Unexpected exception" ); 520 } 521 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 522 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 523 } 524 525 template<typename task_group_type> 526 void TestExceptionHandling2 () { 527 ResetGlobals( true, true ); 528 task_group_type tg; 529 bool exceptionCaught = false; 530 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 531 // TBB version does not require taking function address 532 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 533 } 534 try { 535 tg.wait(); 536 } catch ( TestException& e ) { 537 CHECK_MESSAGE( e.what(), "Empty what() string" ); 538 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 539 exceptionCaught = true; 540 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 541 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 542 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 543 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 544 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 545 } 546 547 template <typename task_group_type> 548 void TestExceptionHandling3() { 549 task_group_type tg; 550 try { 551 tg.run_and_wait([]() { 552 volatile bool suppress_unreachable_code_warning = true; 553 if (suppress_unreachable_code_warning) { 554 throw 1; 555 } 556 }); 557 } catch (int error) { 558 CHECK(error == 1); 559 } catch ( ... ) { 560 CHECK_MESSAGE( false, "Unexpected exception" ); 561 } 562 } 563 564 template<typename task_group_type> 565 class LaunchChildrenDriver { 566 public: 567 void Launch(task_group_type& tg) { 568 ResetGlobals(false, false); 569 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 570 tg.run(LaunchChildrenWithFunctor<task_group_type>); 571 } 572 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 573 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 574 utils::yield(); 575 } 576 577 void Finish() { 578 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 579 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 580 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 581 } 582 }; // LaunchChildrenWithTaskHandleDriver 583 584 template<typename task_group_type, bool Throw> 585 void TestMissingWait () { 586 bool exception_occurred = false, 587 unexpected_exception = false; 588 LaunchChildrenDriver<task_group_type> driver; 589 try { 590 task_group_type tg; 591 driver.Launch( tg ); 592 volatile bool suppress_unreachable_code_warning = Throw; 593 if (suppress_unreachable_code_warning) { 594 throw int(); // Initiate stack unwinding 595 } 596 } 597 catch ( const tbb::missing_wait& e ) { 598 CHECK_MESSAGE( e.what(), "Error message is absent" ); 599 exception_occurred = true; 600 unexpected_exception = Throw; 601 } 602 catch ( int ) { 603 exception_occurred = true; 604 unexpected_exception = !Throw; 605 } 606 catch ( ... ) { 607 exception_occurred = unexpected_exception = true; 608 } 609 CHECK( exception_occurred ); 610 CHECK( !unexpected_exception ); 611 driver.Finish(); 612 } 613 #endif 614 615 template<typename task_group_type> 616 void RunCancellationAndExceptionHandlingTests() { 617 TestManualCancellationWithFunctor<task_group_type>(); 618 #if TBB_USE_EXCEPTIONS 619 TestExceptionHandling1<task_group_type>(); 620 TestExceptionHandling2<task_group_type>(); 621 TestExceptionHandling3<task_group_type>(); 622 TestMissingWait<task_group_type, true>(); 623 TestMissingWait<task_group_type, false>(); 624 #endif 625 } 626 627 void EmptyFunction () {} 628 629 struct TestFunctor { 630 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 631 void operator()() const { /* library requires this overload only */ } 632 }; 633 634 template<typename task_group_type> 635 void TestConstantFunctorRequirement() { 636 task_group_type g; 637 TestFunctor tf; 638 g.run( tf ); g.wait(); 639 g.run_and_wait( tf ); 640 } 641 642 //------------------------------------------------------------------------ 643 namespace TestMoveSemanticsNS { 644 struct TestFunctor { 645 void operator()() const {}; 646 }; 647 648 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 649 MoveOnlyFunctor() : utils::MoveOnly() {}; 650 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 651 }; 652 653 struct MovePreferableFunctor : utils::Movable, TestFunctor { 654 MovePreferableFunctor() : utils::Movable() {}; 655 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 656 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 657 }; 658 659 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 660 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 661 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 662 private: 663 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 664 }; 665 666 template<typename task_group_type> 667 void TestBareFunctors() { 668 task_group_type tg; 669 MovePreferableFunctor mpf; 670 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 671 tg.run_and_wait( NoMoveNoCopyFunctor() ); 672 673 tg.run( MoveOnlyFunctor() ); 674 tg.wait(); 675 676 tg.run( mpf ); 677 tg.wait(); 678 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 679 mpf.Reset(); 680 681 tg.run( std::move(mpf) ); 682 tg.wait(); 683 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 684 mpf.Reset(); 685 } 686 } 687 688 template<typename task_group_type> 689 void TestMoveSemantics() { 690 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 691 } 692 //------------------------------------------------------------------------ 693 694 // TODO: TBB_REVAMP_TODO - enable when ETS is available 695 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 696 namespace TestIsolationNS { 697 class DummyFunctor { 698 public: 699 DummyFunctor() {} 700 void operator()() const { 701 for ( volatile int j = 0; j < 10; ++j ) {} 702 } 703 }; 704 705 template<typename task_group_type> 706 class ParForBody { 707 task_group_type& m_tg; 708 std::atomic<bool>& m_preserved; 709 tbb::enumerable_thread_specific<int>& m_ets; 710 public: 711 ParForBody( 712 task_group_type& tg, 713 std::atomic<bool>& preserved, 714 tbb::enumerable_thread_specific<int>& ets 715 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 716 717 void operator()(int) const { 718 if (++m_ets.local() > 1) m_preserved = false; 719 720 for (int i = 0; i < 1000; ++i) 721 m_tg.run(DummyFunctor()); 722 m_tg.wait(); 723 m_tg.run_and_wait(DummyFunctor()); 724 725 --m_ets.local(); 726 } 727 }; 728 729 template<typename task_group_type> 730 void CheckIsolation(bool isolation_is_expected) { 731 task_group_type tg; 732 std::atomic<bool> isolation_is_preserved; 733 isolation_is_preserved = true; 734 tbb::enumerable_thread_specific<int> ets(0); 735 736 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 737 738 ASSERT( 739 isolation_is_expected == isolation_is_preserved, 740 "Actual and expected isolation-related behaviours are different" 741 ); 742 } 743 744 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 745 void TestIsolation() { 746 CheckIsolation<tbb::task_group>(false); 747 CheckIsolation<tbb::isolated_task_group>(true); 748 } 749 } 750 #endif 751 752 #if __TBB_USE_ADDRESS_SANITIZER 753 //! Test for thread safety for the task_group 754 //! \brief \ref error_guessing \ref resource_usage 755 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 756 #else 757 //! Test for thread safety for the task_group 758 //! \brief \ref error_guessing \ref resource_usage 759 TEST_CASE("Thread safety test for the task group") { 760 if (tbb::this_task_arena::max_concurrency() < 2) { 761 // The test requires more than one thread to check thread safety 762 return; 763 } 764 for (unsigned p=MinThread; p <= MaxThread; ++p) { 765 if (p < 2) { 766 continue; 767 } 768 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 769 g_MaxConcurrency = p; 770 TestThreadSafety<tbb::task_group>(); 771 } 772 } 773 #endif 774 775 //! Fibonacci test for task group 776 //! \brief \ref interface \ref requirement 777 TEST_CASE("Fibonacci test for the task group") { 778 for (unsigned p=MinThread; p <= MaxThread; ++p) { 779 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 780 g_MaxConcurrency = p; 781 RunFibonacciTests<tbb::task_group>(); 782 } 783 } 784 785 //! Cancellation and exception test for the task group 786 //! \brief \ref interface \ref requirement 787 TEST_CASE("Cancellation and exception test for the task group") { 788 for (unsigned p = MinThread; p <= MaxThread; ++p) { 789 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 790 tbb::task_arena a(p); 791 g_MaxConcurrency = p; 792 a.execute([] { 793 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 794 }); 795 } 796 } 797 798 //! Constant functor test for the task group 799 //! \brief \ref interface \ref negative 800 TEST_CASE("Constant functor test for the task group") { 801 for (unsigned p=MinThread; p <= MaxThread; ++p) { 802 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 803 g_MaxConcurrency = p; 804 TestConstantFunctorRequirement<tbb::task_group>(); 805 } 806 } 807 808 //! Move semantics test for the task group 809 //! \brief \ref interface \ref requirement 810 TEST_CASE("Move semantics test for the task group") { 811 for (unsigned p=MinThread; p <= MaxThread; ++p) { 812 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 813 g_MaxConcurrency = p; 814 TestMoveSemantics<tbb::task_group>(); 815 } 816 } 817 818 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 819 820 #if __TBB_USE_ADDRESS_SANITIZER 821 //! Test for thread safety for the isolated_task_group 822 //! \brief \ref error_guessing 823 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 824 #else 825 //! Test for thread safety for the isolated_task_group 826 //! \brief \ref error_guessing 827 TEST_CASE("Thread safety test for the isolated task group") { 828 if (tbb::this_task_arena::max_concurrency() < 2) { 829 // The test requires more than one thread to check thread safety 830 return; 831 } 832 for (unsigned p=MinThread; p <= MaxThread; ++p) { 833 if (p < 2) { 834 continue; 835 } 836 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 837 g_MaxConcurrency = p; 838 TestThreadSafety<tbb::isolated_task_group>(); 839 } 840 } 841 #endif 842 843 //! Cancellation and exception test for the isolated task group 844 //! \brief \ref interface \ref requirement 845 TEST_CASE("Fibonacci test for the isolated task group") { 846 for (unsigned p=MinThread; p <= MaxThread; ++p) { 847 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 848 g_MaxConcurrency = p; 849 RunFibonacciTests<tbb::isolated_task_group>(); 850 } 851 } 852 853 //! Cancellation and exception test for the isolated task group 854 //! \brief \ref interface \ref requirement 855 TEST_CASE("Cancellation and exception test for the isolated task group") { 856 for (unsigned p=MinThread; p <= MaxThread; ++p) { 857 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 858 g_MaxConcurrency = p; 859 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 860 } 861 } 862 863 //! Constant functor test for the isolated task group. 864 //! \brief \ref interface \ref negative 865 TEST_CASE("Constant functor test for the isolated task group") { 866 for (unsigned p=MinThread; p <= MaxThread; ++p) { 867 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 868 g_MaxConcurrency = p; 869 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 870 } 871 } 872 873 //! Move semantics test for the isolated task group. 874 //! \brief \ref interface \ref requirement 875 TEST_CASE("Move semantics test for the isolated task group") { 876 for (unsigned p=MinThread; p <= MaxThread; ++p) { 877 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 878 g_MaxConcurrency = p; 879 TestMoveSemantics<tbb::isolated_task_group>(); 880 } 881 } 882 883 //TODO: add test void isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::::run_and_wait(d2::task_handle&& h) 884 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 885 886 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 887 for (int i = 0; i < num_tasks; ++i) { 888 tg2.run([&tg1, &tasks_executed] { 889 volatile char consume_stack[1000]{}; 890 ++tasks_executed; 891 tg1.wait(); 892 utils::suppress_unused_warning(consume_stack); 893 }); 894 } 895 } 896 897 // TODO: move to the conformance test 898 //! Test for stack overflow avoidance mechanism. 899 //! \brief \ref requirement 900 TEST_CASE("Test for stack overflow avoidance mechanism") { 901 if (tbb::this_task_arena::max_concurrency() < 2) { 902 return; 903 } 904 905 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 906 tbb::task_group tg1; 907 tbb::task_group tg2; 908 std::atomic<int> tasks_executed{}; 909 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 910 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 911 while (tasks_executed < 100) { 912 // Some stealing is expected to happen. 913 utils::yield(); 914 } 915 CHECK(tasks_executed < 10000); 916 }); 917 tg2.wait(); 918 CHECK(tasks_executed == 10000); 919 } 920 921 //! Test for stack overflow avoidance mechanism. 922 //! \brief \ref error_guessing 923 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 924 if (tbb::this_task_arena::max_concurrency() < 2) { 925 return; 926 } 927 928 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 929 tbb::task_group tg1; 930 tbb::task_group tg2; 931 std::atomic<int> tasks_executed{}; 932 933 // Determine nested task execution limit. 934 int second_thread_executed{}; 935 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 936 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 937 do { 938 second_thread_executed = tasks_executed; 939 utils::Sleep(10); 940 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 941 CHECK(tasks_executed < 10000); 942 }); 943 tg2.wait(); 944 CHECK(tasks_executed == 10000); 945 946 tasks_executed = 0; 947 tbb::task_arena a(2, 2); 948 tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] { 949 run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed); 950 while (tasks_executed < second_thread_executed-1) { 951 // Wait until the second thread near the limit. 952 utils::yield(); 953 } 954 tg2.run([&a, &tg1, &tasks_executed] { 955 a.execute([&tg1, &tasks_executed] { 956 volatile char consume_stack[1000]{}; 957 ++tasks_executed; 958 tg1.wait(); 959 utils::suppress_unused_warning(consume_stack); 960 }); 961 }); 962 while (tasks_executed < second_thread_executed) { 963 // Wait until the second joins the arena. 964 utils::yield(); 965 } 966 a.execute([&tg1, &tg2, &tasks_executed] { 967 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 968 }); 969 int currently_executed{}; 970 do { 971 currently_executed = tasks_executed; 972 utils::Sleep(10); 973 } while (currently_executed != tasks_executed); 974 CHECK(tasks_executed < 10000 + second_thread_executed); 975 }); 976 a.execute([&tg2] { 977 tg2.wait(); 978 }); 979 CHECK(tasks_executed == 10000 + second_thread_executed); 980 } 981 982 //! Test checks that we can submit work to task_group asynchronously with waiting. 983 //! \brief \ref regression 984 TEST_CASE("Async task group") { 985 int num_threads = tbb::this_task_arena::max_concurrency(); 986 if (num_threads < 3) { 987 // The test requires at least 2 worker threads 988 return; 989 } 990 tbb::task_arena a(2*num_threads, num_threads); 991 utils::SpinBarrier barrier(num_threads + 2); 992 tbb::task_group tg[2]; 993 std::atomic<bool> finished[2]{}; 994 finished[0] = false; finished[1] = false; 995 for (int i = 0; i < 2; ++i) { 996 a.enqueue([i, &tg, &finished, &barrier] { 997 barrier.wait(); 998 for (int j = 0; j < 10000; ++j) { 999 tg[i].run([] {}); 1000 utils::yield(); 1001 } 1002 finished[i] = true; 1003 }); 1004 } 1005 utils::NativeParallelFor(num_threads, [&](int idx) { 1006 barrier.wait(); 1007 a.execute([idx, &tg, &finished] { 1008 std::size_t counter{}; 1009 while (!finished[idx%2]) { 1010 tg[idx%2].wait(); 1011 if (counter++ % 16 == 0) utils::yield(); 1012 } 1013 tg[idx%2].wait(); 1014 }); 1015 }); 1016 } 1017 1018 struct SelfRunner { 1019 tbb::task_group& m_tg; 1020 std::atomic<unsigned>& count; 1021 void operator()() const { 1022 unsigned previous_count = count.fetch_sub(1); 1023 if (previous_count > 1) 1024 m_tg.run( *this ); 1025 } 1026 }; 1027 1028 //! Submit work to single task_group instance from inside the work 1029 //! \brief \ref error_guessing 1030 TEST_CASE("Run self using same task_group instance") { 1031 const unsigned num = 10; 1032 std::atomic<unsigned> count{num}; 1033 tbb::task_group tg; 1034 SelfRunner uf{tg, count}; 1035 tg.run( uf ); 1036 tg.wait(); 1037 CHECK_MESSAGE( 1038 count == 0, 1039 "Not all tasks were spawned from inside the functor running within task_group." 1040 ); 1041 } 1042 1043 //TODO: move to some common place to share with conformance tests 1044 namespace accept_task_group_context { 1045 1046 template <typename TaskGroup, typename CancelF, typename WaitF> 1047 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) { 1048 std::atomic<bool> outer_cancelled{false}; 1049 std::atomic<unsigned> count{13}; 1050 1051 tbb::task_group_context inner_ctx(tbb::task_group_context::isolated); 1052 TaskGroup inner_tg(inner_ctx); 1053 1054 tbb::task_group outer_tg; 1055 auto outer_tg_task = [&] { 1056 inner_tg.run([&] { 1057 utils::SpinWaitUntilEq(outer_cancelled, true); 1058 inner_tg.run( SelfRunner{inner_tg, count} ); 1059 }); 1060 1061 utils::try_call([&] { 1062 std::forward<CancelF>(cancel)(outer_tg); 1063 }).on_completion([&] { 1064 outer_cancelled = true; 1065 }); 1066 }; 1067 1068 auto check = [&] { 1069 tbb::task_group_status outer_status = tbb::task_group_status::not_complete; 1070 outer_status = std::forward<WaitF>(wait)(outer_tg); 1071 CHECK_MESSAGE( 1072 outer_status == tbb::task_group_status::canceled, 1073 "Outer task group should have been cancelled." 1074 ); 1075 1076 tbb::task_group_status inner_status = inner_tg.wait(); 1077 CHECK_MESSAGE( 1078 inner_status == tbb::task_group_status::complete, 1079 "Inner task group should have completed despite the cancellation of the outer one." 1080 ); 1081 1082 CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed."); 1083 }; 1084 1085 outer_tg.run(outer_tg_task); 1086 check(); 1087 } 1088 1089 template <typename TaskGroup> 1090 void test() { 1091 run_cancellation_use_case<TaskGroup>( 1092 [](tbb::task_group& outer) { outer.cancel(); }, 1093 [](tbb::task_group& outer) { return outer.wait(); } 1094 ); 1095 1096 #if TBB_USE_EXCEPTIONS 1097 run_cancellation_use_case<TaskGroup>( 1098 [](tbb::task_group& /*outer*/) { 1099 volatile bool suppress_unreachable_code_warning = true; 1100 if (suppress_unreachable_code_warning) { 1101 throw int(); 1102 } 1103 }, 1104 [](tbb::task_group& outer) { 1105 try { 1106 outer.wait(); 1107 return tbb::task_group_status::complete; 1108 } catch(const int&) { 1109 return tbb::task_group_status::canceled; 1110 } 1111 } 1112 ); 1113 #endif 1114 } 1115 1116 } // namespace accept_task_group_context 1117 1118 //! Respect task_group_context passed from outside 1119 //! \brief \ref interface \ref requirement 1120 TEST_CASE("Respect task_group_context passed from outside") { 1121 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 1122 accept_task_group_context::test<tbb::isolated_task_group>(); 1123 #endif 1124 } 1125 1126 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1127 //! The test for task_handle inside other task waiting with run 1128 //! \brief \ref requirement 1129 TEST_CASE("Task handle for scheduler bypass"){ 1130 tbb::task_group tg; 1131 std::atomic<bool> run {false}; 1132 1133 tg.run([&]{ 1134 return tg.defer([&]{ 1135 run = true; 1136 }); 1137 }); 1138 1139 tg.wait(); 1140 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1141 } 1142 1143 //! The test for task_handle inside other task waiting with run_and_wait 1144 //! \brief \ref requirement 1145 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){ 1146 tbb::task_group tg; 1147 std::atomic<bool> run {false}; 1148 1149 tg.run_and_wait([&]{ 1150 return tg.defer([&]{ 1151 run = true; 1152 }); 1153 }); 1154 1155 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1156 } 1157 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1158 1159 #if TBB_USE_EXCEPTIONS 1160 //As these tests are against behavior marked by spec as undefined, they can not be put into conformance tests 1161 1162 //! The test for error in scheduling empty task_handle 1163 //! \brief \ref requirement 1164 TEST_CASE("Empty task_handle cannot be scheduled" 1165 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1166 * doctest::skip() //skip the test for now, to not pollute the test log 1167 ){ 1168 tbb::task_group tg; 1169 1170 CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error); 1171 } 1172 1173 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1174 //! \brief \ref requirement 1175 TEST_CASE("task_handle cannot be scheduled into different task_group" 1176 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1177 * doctest::skip() //skip the test for now, to not pollute the test log 1178 ){ 1179 tbb::task_group tg; 1180 tbb::task_group tg1; 1181 1182 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1183 } 1184 1185 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1186 //! \brief \ref requirement 1187 TEST_CASE("task_handle cannot be scheduled into other task_group of the same context" 1188 * doctest::should_fail() //Implementation is no there yet, as it is not clear that is the expected behavior 1189 * doctest::skip() //skip the test for now, to not pollute the test log 1190 ) 1191 { 1192 tbb::task_group_context ctx; 1193 1194 tbb::task_group tg(ctx); 1195 tbb::task_group tg1(ctx); 1196 1197 CHECK_NOTHROW(tg.run(tg.defer([]{}))); 1198 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1199 } 1200 1201 #endif // TBB_USE_EXCEPTIONS 1202 1203