1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "oneapi/tbb/detail/_config.h" 20 #include "tbb/global_control.h" 21 22 #include "tbb/task_group.h" 23 24 #include "common/concurrency_tracker.h" 25 26 #include <atomic> 27 #include <stdexcept> 28 29 //! \file test_task_group.cpp 30 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 31 32 unsigned g_MaxConcurrency = 4; 33 using atomic_t = std::atomic<std::uintptr_t>; 34 unsigned MinThread = 1; 35 unsigned MaxThread = 4; 36 37 //------------------------------------------------------------------------ 38 // Tests for the thread safety of the task_group manipulations 39 //------------------------------------------------------------------------ 40 41 #include "common/spin_barrier.h" 42 43 enum SharingMode { 44 VagabondGroup = 1, 45 ParallelWait = 2 46 }; 47 48 template<typename task_group_type> 49 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 50 static const std::uintptr_t c_numTasks0 = 4096, 51 c_numTasks1 = 1024; 52 53 const std::uintptr_t m_numThreads; 54 const std::uintptr_t m_sharingMode; 55 56 task_group_type *m_taskGroup; 57 atomic_t m_tasksSpawned, 58 m_threadsReady; 59 utils::SpinBarrier m_barrier; 60 61 static atomic_t s_tasksExecuted; 62 63 struct TaskFunctor { 64 SharedGroupBodyImpl *m_pOwner; 65 void operator () () const { 66 if ( m_pOwner->m_sharingMode & ParallelWait ) { 67 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 68 utils::yield(); 69 } 70 ++s_tasksExecuted; 71 } 72 }; 73 74 TaskFunctor m_taskFunctor; 75 76 void Spawn ( std::uintptr_t numTasks ) { 77 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 78 ++m_tasksSpawned; 79 utils::ConcurrencyTracker ct; 80 m_taskGroup->run( m_taskFunctor ); 81 } 82 ++m_threadsReady; 83 } 84 85 void DeleteTaskGroup () { 86 delete m_taskGroup; 87 m_taskGroup = nullptr; 88 } 89 90 void Wait () { 91 while ( m_threadsReady != m_numThreads ) 92 utils::yield(); 93 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 94 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 95 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 96 if ( m_sharingMode & ParallelWait ) { 97 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 98 { 99 utils::ConcurrencyTracker ct; 100 m_taskGroup->wait(); 101 } 102 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) { 103 const char* msg = "Warning: No parallel waiting detected in TestParallelWait"; 104 WARN( msg ); 105 } 106 m_barrier.wait(); 107 } 108 else 109 m_taskGroup->wait(); 110 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 111 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 112 } 113 114 public: 115 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 116 : m_numThreads(numThreads) 117 , m_sharingMode(sharingMode) 118 , m_taskGroup(nullptr) 119 , m_barrier(numThreads) 120 { 121 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 122 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 123 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 124 } 125 utils::ConcurrencyTracker::Reset(); 126 s_tasksExecuted = 0; 127 m_tasksSpawned = 0; 128 m_threadsReady = 0; 129 m_taskFunctor.m_pOwner = this; 130 } 131 132 void Run ( std::uintptr_t idx ) { 133 AssertLive(); 134 if ( idx == 0 ) { 135 if (m_taskGroup || m_tasksSpawned) { 136 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 137 } 138 m_taskGroup = new task_group_type; 139 Spawn( c_numTasks0 ); 140 Wait(); 141 if ( m_sharingMode & VagabondGroup ) 142 m_barrier.wait(); 143 else 144 DeleteTaskGroup(); 145 } 146 else { 147 while ( m_tasksSpawned == 0 ) 148 utils::yield(); 149 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 150 Spawn (c_numTasks1); 151 if ( m_sharingMode & ParallelWait ) 152 Wait(); 153 if ( m_sharingMode & VagabondGroup ) { 154 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 155 m_barrier.wait(); 156 DeleteTaskGroup(); 157 } 158 } 159 AssertLive(); 160 } 161 }; 162 163 template<typename task_group_type> 164 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 165 166 template<typename task_group_type> 167 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 168 bool m_bOwner; 169 SharedGroupBodyImpl<task_group_type> *m_pImpl; 170 public: 171 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 172 : utils::NoAssign() 173 , utils::NoAfterlife() 174 , m_bOwner(true) 175 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 176 {} 177 SharedGroupBody ( const SharedGroupBody& src ) 178 : utils::NoAssign() 179 , utils::NoAfterlife() 180 , m_bOwner(false) 181 , m_pImpl(src.m_pImpl) 182 {} 183 ~SharedGroupBody () { 184 if ( m_bOwner ) 185 delete m_pImpl; 186 } 187 void operator() ( std::uintptr_t idx ) const { 188 // Wrap the functior into additional task group to enforce bounding. 189 task_group_type tg; 190 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 191 } 192 }; 193 194 template<typename task_group_type> 195 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 196 utils::SpinBarrier& m_barrier; 197 std::atomic<bool>& m_completed; 198 task_group_type& m_tg; 199 public: 200 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 201 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 202 203 void operator()() const { 204 m_barrier.wait(); 205 utils::doDummyWork(100000); 206 m_completed = true; 207 } 208 209 void operator()(int id) const { 210 if (id == 0) { 211 m_tg.run_and_wait(*this); 212 } else { 213 m_barrier.wait(); 214 m_tg.wait(); 215 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 216 } 217 } 218 }; 219 220 template<typename task_group_type> 221 void TestParallelSpawn () { 222 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 223 } 224 225 template<typename task_group_type> 226 void TestParallelWait () { 227 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 228 229 utils::SpinBarrier barrier(g_MaxConcurrency); 230 std::atomic<bool> completed; 231 completed = false; 232 task_group_type tg; 233 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 234 NativeParallelFor( g_MaxConcurrency, b ); 235 } 236 237 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 238 template<typename task_group_type> 239 void TestVagabondGroup () { 240 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 241 } 242 243 #include "common/memory_usage.h" 244 245 template<typename task_group_type> 246 void TestThreadSafety() { 247 auto tests = [] { 248 for (int trail = 0; trail < 10; ++trail) { 249 TestParallelSpawn<task_group_type>(); 250 TestParallelWait<task_group_type>(); 251 TestVagabondGroup<task_group_type>(); 252 } 253 }; 254 255 // Test and warm up allocator. 256 tests(); 257 258 // Ensure that cosumption is stabilized. 259 std::size_t initial = utils::GetMemoryUsage(); 260 for (;;) { 261 tests(); 262 std::size_t current = utils::GetMemoryUsage(); 263 if (current <= initial) { 264 return; 265 } 266 initial = current; 267 } 268 } 269 //------------------------------------------------------------------------ 270 // Common requisites of the Fibonacci tests 271 //------------------------------------------------------------------------ 272 273 const std::uintptr_t N = 20; 274 const std::uintptr_t F = 6765; 275 276 atomic_t g_Sum; 277 278 #define FIB_TEST_PROLOGUE() \ 279 const unsigned numRepeats = g_MaxConcurrency * 4; \ 280 utils::ConcurrencyTracker::Reset() 281 282 #define FIB_TEST_EPILOGUE(sum) \ 283 CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \ 284 CHECK( sum == numRepeats * F ); 285 286 287 // Fibonacci tasks specified as functors 288 template<class task_group_type> 289 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 290 protected: 291 std::uintptr_t* m_pRes; 292 mutable std::uintptr_t m_Num; 293 virtual void impl() const = 0; 294 public: 295 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 296 void operator()() const { 297 utils::ConcurrencyTracker ct; 298 AssertLive(); 299 if( m_Num < 2 ) { 300 *m_pRes = m_Num; 301 } else { 302 impl(); 303 } 304 } 305 virtual ~FibTaskBase() {} 306 }; 307 308 template<class task_group_type> 309 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 310 public: 311 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 312 virtual void impl() const override { 313 std::uintptr_t x = ~0u; 314 task_group_type tg; 315 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 316 this->m_Num -= 2; tg.run_and_wait( *this ); 317 *(this->m_pRes) += x; 318 } 319 }; 320 321 template<class task_group_type> 322 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 323 public: 324 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 325 virtual void impl() const override { 326 std::uintptr_t x = ~0u, 327 y = ~0u; 328 task_group_type tg; 329 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 330 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 331 tg.wait(); 332 *(this->m_pRes) = x + y; 333 } 334 }; 335 336 // Helper functions 337 template<class fib_task> 338 std::uintptr_t RunFibTask(std::uintptr_t n) { 339 std::uintptr_t res = ~0u; 340 fib_task(&res, n)(); 341 return res; 342 } 343 344 template<typename fib_task> 345 void RunFibTest() { 346 FIB_TEST_PROLOGUE(); 347 std::uintptr_t sum = 0; 348 for( unsigned i = 0; i < numRepeats; ++i ) 349 sum += RunFibTask<fib_task>(N); 350 FIB_TEST_EPILOGUE(sum); 351 } 352 353 template<typename fib_task> 354 void FibFunctionNoArgs() { 355 g_Sum += RunFibTask<fib_task>(N); 356 } 357 358 template<typename task_group_type> 359 void TestFibWithLambdas() { 360 FIB_TEST_PROLOGUE(); 361 atomic_t sum; 362 sum = 0; 363 task_group_type tg; 364 for( unsigned i = 0; i < numRepeats; ++i ) 365 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 366 tg.wait(); 367 FIB_TEST_EPILOGUE(sum); 368 } 369 370 template<typename task_group_type> 371 void TestFibWithFunctor() { 372 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 373 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 374 } 375 376 template<typename task_group_type> 377 void TestFibWithFunctionPtr() { 378 FIB_TEST_PROLOGUE(); 379 g_Sum = 0; 380 task_group_type tg; 381 for( unsigned i = 0; i < numRepeats; ++i ) 382 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 383 tg.wait(); 384 FIB_TEST_EPILOGUE(g_Sum); 385 } 386 387 template<typename task_group_type> 388 void RunFibonacciTests() { 389 TestFibWithLambdas<task_group_type>(); 390 TestFibWithFunctor<task_group_type>(); 391 TestFibWithFunctionPtr<task_group_type>(); 392 } 393 394 class test_exception : public std::exception 395 { 396 const char* m_strDescription; 397 public: 398 test_exception ( const char* descr ) : m_strDescription(descr) {} 399 400 const char* what() const throw() override { return m_strDescription; } 401 }; 402 403 using TestException = test_exception; 404 405 #include <string.h> 406 407 #define NUM_CHORES 512 408 #define NUM_GROUPS 64 409 #define SKIP_CHORES (NUM_CHORES/4) 410 #define SKIP_GROUPS (NUM_GROUPS/4) 411 #define EXCEPTION_DESCR1 "Test exception 1" 412 #define EXCEPTION_DESCR2 "Test exception 2" 413 414 atomic_t g_ExceptionCount; 415 atomic_t g_TaskCount; 416 unsigned g_ExecutedAtCancellation; 417 bool g_Rethrow; 418 bool g_Throw; 419 420 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 421 atomic_t &m_TaskCount; 422 public: 423 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 424 void operator() () const { 425 utils::ConcurrencyTracker ct; 426 AssertLive(); 427 if ( g_Throw ) { 428 if ( ++m_TaskCount == SKIP_CHORES ) 429 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 430 utils::yield(); 431 } 432 else { 433 ++g_TaskCount; 434 while( !tbb::is_current_task_group_canceling() ) 435 utils::yield(); 436 } 437 } 438 }; 439 440 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 441 g_Throw = bThrow; 442 g_Rethrow = bRethrow; 443 g_ExceptionCount = 0; 444 g_TaskCount = 0; 445 utils::ConcurrencyTracker::Reset(); 446 } 447 448 template<typename task_group_type> 449 void LaunchChildrenWithFunctor () { 450 atomic_t count; 451 count = 0; 452 task_group_type g; 453 for (unsigned i = 0; i < NUM_CHORES; ++i) { 454 if (i % 2 == 1) { 455 g.run(g.defer(ThrowingTask(count))); 456 } else 457 { 458 g.run(ThrowingTask(count)); 459 } 460 } 461 #if TBB_USE_EXCEPTIONS 462 tbb::task_group_status status = tbb::not_complete; 463 bool exceptionCaught = false; 464 try { 465 status = g.wait(); 466 } catch ( TestException& e ) { 467 CHECK_MESSAGE( e.what(), "Empty what() string" ); 468 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 469 exceptionCaught = true; 470 ++g_ExceptionCount; 471 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 472 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 473 CHECK_MESSAGE(false, "No exception in the child task group"); 474 } 475 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 476 throw test_exception(EXCEPTION_DESCR2); 477 } 478 #else 479 g.wait(); 480 #endif 481 } 482 483 // Tests for cancellation and exception handling behavior 484 template<typename task_group_type> 485 void TestManualCancellationWithFunctor () { 486 ResetGlobals( false, false ); 487 task_group_type tg; 488 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 489 // TBB version does not require taking function address 490 if (i % 2 == 0) { 491 auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>); 492 tg.run(std::move(h)); 493 } else 494 { 495 tg.run(&LaunchChildrenWithFunctor<task_group_type>); 496 } 497 } 498 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 499 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 500 utils::yield(); 501 tg.cancel(); 502 g_ExecutedAtCancellation = int(g_TaskCount); 503 tbb::task_group_status status = tg.wait(); 504 CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." ); 505 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 506 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 507 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 508 } 509 510 #if TBB_USE_EXCEPTIONS 511 template<typename task_group_type> 512 void TestExceptionHandling1 () { 513 ResetGlobals( true, false ); 514 task_group_type tg; 515 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 516 // TBB version does not require taking function address 517 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 518 try { 519 tg.wait(); 520 } catch ( ... ) { 521 CHECK_MESSAGE( false, "Unexpected exception" ); 522 } 523 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 524 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 525 } 526 527 template<typename task_group_type> 528 void TestExceptionHandling2 () { 529 ResetGlobals( true, true ); 530 task_group_type tg; 531 bool exceptionCaught = false; 532 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 533 // TBB version does not require taking function address 534 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 535 } 536 try { 537 tg.wait(); 538 } catch ( TestException& e ) { 539 CHECK_MESSAGE( e.what(), "Empty what() string" ); 540 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 541 exceptionCaught = true; 542 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 543 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 544 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 545 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 546 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 547 } 548 549 template <typename task_group_type> 550 void TestExceptionHandling3() { 551 task_group_type tg; 552 try { 553 tg.run_and_wait([]() { 554 volatile bool suppress_unreachable_code_warning = true; 555 if (suppress_unreachable_code_warning) { 556 throw 1; 557 } 558 }); 559 } catch (int error) { 560 CHECK(error == 1); 561 } catch ( ... ) { 562 CHECK_MESSAGE( false, "Unexpected exception" ); 563 } 564 } 565 566 template<typename task_group_type> 567 class LaunchChildrenDriver { 568 public: 569 void Launch(task_group_type& tg) { 570 ResetGlobals(false, false); 571 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 572 tg.run(LaunchChildrenWithFunctor<task_group_type>); 573 } 574 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 575 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 576 utils::yield(); 577 } 578 579 void Finish() { 580 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 581 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 582 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 583 } 584 }; // LaunchChildrenWithTaskHandleDriver 585 586 template<typename task_group_type, bool Throw> 587 void TestMissingWait () { 588 bool exception_occurred = false, 589 unexpected_exception = false; 590 LaunchChildrenDriver<task_group_type> driver; 591 try { 592 task_group_type tg; 593 driver.Launch( tg ); 594 volatile bool suppress_unreachable_code_warning = Throw; 595 if (suppress_unreachable_code_warning) { 596 throw int(); // Initiate stack unwinding 597 } 598 } 599 catch ( const tbb::missing_wait& e ) { 600 CHECK_MESSAGE( e.what(), "Error message is absent" ); 601 exception_occurred = true; 602 unexpected_exception = Throw; 603 } 604 catch ( int ) { 605 exception_occurred = true; 606 unexpected_exception = !Throw; 607 } 608 catch ( ... ) { 609 exception_occurred = unexpected_exception = true; 610 } 611 CHECK( exception_occurred ); 612 CHECK( !unexpected_exception ); 613 driver.Finish(); 614 } 615 #endif 616 617 template<typename task_group_type> 618 void RunCancellationAndExceptionHandlingTests() { 619 TestManualCancellationWithFunctor<task_group_type>(); 620 #if TBB_USE_EXCEPTIONS 621 TestExceptionHandling1<task_group_type>(); 622 TestExceptionHandling2<task_group_type>(); 623 TestExceptionHandling3<task_group_type>(); 624 TestMissingWait<task_group_type, true>(); 625 TestMissingWait<task_group_type, false>(); 626 #endif 627 } 628 629 void EmptyFunction () {} 630 631 struct TestFunctor { 632 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 633 void operator()() const { /* library requires this overload only */ } 634 }; 635 636 template<typename task_group_type> 637 void TestConstantFunctorRequirement() { 638 task_group_type g; 639 TestFunctor tf; 640 g.run( tf ); g.wait(); 641 g.run_and_wait( tf ); 642 } 643 644 //------------------------------------------------------------------------ 645 namespace TestMoveSemanticsNS { 646 struct TestFunctor { 647 void operator()() const {}; 648 }; 649 650 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 651 MoveOnlyFunctor() : utils::MoveOnly() {}; 652 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 653 }; 654 655 struct MovePreferableFunctor : utils::Movable, TestFunctor { 656 MovePreferableFunctor() : utils::Movable() {}; 657 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 658 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 659 }; 660 661 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 662 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 663 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 664 private: 665 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 666 }; 667 668 template<typename task_group_type> 669 void TestBareFunctors() { 670 task_group_type tg; 671 MovePreferableFunctor mpf; 672 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 673 tg.run_and_wait( NoMoveNoCopyFunctor() ); 674 675 tg.run( MoveOnlyFunctor() ); 676 tg.wait(); 677 678 tg.run( mpf ); 679 tg.wait(); 680 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 681 mpf.Reset(); 682 683 tg.run( std::move(mpf) ); 684 tg.wait(); 685 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 686 mpf.Reset(); 687 } 688 } 689 690 template<typename task_group_type> 691 void TestMoveSemantics() { 692 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 693 } 694 //------------------------------------------------------------------------ 695 696 // TODO: TBB_REVAMP_TODO - enable when ETS is available 697 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 698 namespace TestIsolationNS { 699 class DummyFunctor { 700 public: 701 DummyFunctor() {} 702 void operator()() const { 703 for ( volatile int j = 0; j < 10; ++j ) {} 704 } 705 }; 706 707 template<typename task_group_type> 708 class ParForBody { 709 task_group_type& m_tg; 710 std::atomic<bool>& m_preserved; 711 tbb::enumerable_thread_specific<int>& m_ets; 712 public: 713 ParForBody( 714 task_group_type& tg, 715 std::atomic<bool>& preserved, 716 tbb::enumerable_thread_specific<int>& ets 717 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 718 719 void operator()(int) const { 720 if (++m_ets.local() > 1) m_preserved = false; 721 722 for (int i = 0; i < 1000; ++i) 723 m_tg.run(DummyFunctor()); 724 m_tg.wait(); 725 m_tg.run_and_wait(DummyFunctor()); 726 727 --m_ets.local(); 728 } 729 }; 730 731 template<typename task_group_type> 732 void CheckIsolation(bool isolation_is_expected) { 733 task_group_type tg; 734 std::atomic<bool> isolation_is_preserved; 735 isolation_is_preserved = true; 736 tbb::enumerable_thread_specific<int> ets(0); 737 738 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 739 740 ASSERT( 741 isolation_is_expected == isolation_is_preserved, 742 "Actual and expected isolation-related behaviours are different" 743 ); 744 } 745 746 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 747 void TestIsolation() { 748 CheckIsolation<tbb::task_group>(false); 749 CheckIsolation<tbb::isolated_task_group>(true); 750 } 751 } 752 #endif 753 754 #if __TBB_USE_ADDRESS_SANITIZER 755 //! Test for thread safety for the task_group 756 //! \brief \ref error_guessing \ref resource_usage 757 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 758 #else 759 //! Test for thread safety for the task_group 760 //! \brief \ref error_guessing \ref resource_usage 761 TEST_CASE("Thread safety test for the task group") { 762 if (tbb::this_task_arena::max_concurrency() < 2) { 763 // The test requires more than one thread to check thread safety 764 return; 765 } 766 for (unsigned p=MinThread; p <= MaxThread; ++p) { 767 if (p < 2) { 768 continue; 769 } 770 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 771 g_MaxConcurrency = p; 772 TestThreadSafety<tbb::task_group>(); 773 } 774 } 775 #endif 776 777 //! Fibonacci test for task group 778 //! \brief \ref interface \ref requirement 779 TEST_CASE("Fibonacci test for the task group") { 780 for (unsigned p=MinThread; p <= MaxThread; ++p) { 781 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 782 g_MaxConcurrency = p; 783 RunFibonacciTests<tbb::task_group>(); 784 } 785 } 786 787 //! Cancellation and exception test for the task group 788 //! \brief \ref interface \ref requirement 789 TEST_CASE("Cancellation and exception test for the task group") { 790 for (unsigned p = MinThread; p <= MaxThread; ++p) { 791 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 792 tbb::task_arena a(p); 793 g_MaxConcurrency = p; 794 a.execute([] { 795 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 796 }); 797 } 798 } 799 800 //! Constant functor test for the task group 801 //! \brief \ref interface \ref negative 802 TEST_CASE("Constant functor test for the task group") { 803 for (unsigned p=MinThread; p <= MaxThread; ++p) { 804 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 805 g_MaxConcurrency = p; 806 TestConstantFunctorRequirement<tbb::task_group>(); 807 } 808 } 809 810 //! Move semantics test for the task group 811 //! \brief \ref interface \ref requirement 812 TEST_CASE("Move semantics test for the task group") { 813 for (unsigned p=MinThread; p <= MaxThread; ++p) { 814 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 815 g_MaxConcurrency = p; 816 TestMoveSemantics<tbb::task_group>(); 817 } 818 } 819 820 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 821 822 #if __TBB_USE_ADDRESS_SANITIZER 823 //! Test for thread safety for the isolated_task_group 824 //! \brief \ref error_guessing 825 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 826 #else 827 //! Test for thread safety for the isolated_task_group 828 //! \brief \ref error_guessing 829 TEST_CASE("Thread safety test for the isolated task group") { 830 if (tbb::this_task_arena::max_concurrency() < 2) { 831 // The test requires more than one thread to check thread safety 832 return; 833 } 834 for (unsigned p=MinThread; p <= MaxThread; ++p) { 835 if (p < 2) { 836 continue; 837 } 838 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 839 g_MaxConcurrency = p; 840 TestThreadSafety<tbb::isolated_task_group>(); 841 } 842 } 843 #endif 844 845 //! Cancellation and exception test for the isolated task group 846 //! \brief \ref interface \ref requirement 847 TEST_CASE("Fibonacci test for the isolated task group") { 848 for (unsigned p=MinThread; p <= MaxThread; ++p) { 849 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 850 g_MaxConcurrency = p; 851 RunFibonacciTests<tbb::isolated_task_group>(); 852 } 853 } 854 855 //! Cancellation and exception test for the isolated task group 856 //! \brief \ref interface \ref requirement 857 TEST_CASE("Cancellation and exception test for the isolated task group") { 858 for (unsigned p=MinThread; p <= MaxThread; ++p) { 859 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 860 g_MaxConcurrency = p; 861 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 862 } 863 } 864 865 //! Constant functor test for the isolated task group. 866 //! \brief \ref interface \ref negative 867 TEST_CASE("Constant functor test for the isolated task group") { 868 for (unsigned p=MinThread; p <= MaxThread; ++p) { 869 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 870 g_MaxConcurrency = p; 871 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 872 } 873 } 874 875 //! Move semantics test for the isolated task group. 876 //! \brief \ref interface \ref requirement 877 TEST_CASE("Move semantics test for the isolated task group") { 878 for (unsigned p=MinThread; p <= MaxThread; ++p) { 879 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 880 g_MaxConcurrency = p; 881 TestMoveSemantics<tbb::isolated_task_group>(); 882 } 883 } 884 885 //TODO: add test void isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::::run_and_wait(d2::task_handle&& h) 886 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 887 888 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 889 for (int i = 0; i < num_tasks; ++i) { 890 tg2.run([&tg1, &tasks_executed] { 891 volatile char consume_stack[1000]{}; 892 ++tasks_executed; 893 tg1.wait(); 894 utils::suppress_unused_warning(consume_stack); 895 }); 896 } 897 } 898 899 // TODO: move to the conformance test 900 //! Test for stack overflow avoidance mechanism. 901 //! \brief \ref requirement 902 TEST_CASE("Test for stack overflow avoidance mechanism") { 903 if (tbb::this_task_arena::max_concurrency() < 2) { 904 return; 905 } 906 907 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 908 tbb::task_group tg1; 909 tbb::task_group tg2; 910 std::atomic<int> tasks_executed{}; 911 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 912 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 913 while (tasks_executed < 100) { 914 // Some stealing is expected to happen. 915 utils::yield(); 916 } 917 CHECK(tasks_executed < 10000); 918 }); 919 tg2.wait(); 920 CHECK(tasks_executed == 10000); 921 } 922 923 //! Test for stack overflow avoidance mechanism. 924 //! \brief \ref error_guessing 925 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 926 if (tbb::this_task_arena::max_concurrency() < 2) { 927 return; 928 } 929 930 tbb::task_arena a1(2, 1); 931 a1.execute([] { 932 tbb::task_group tg1; 933 tbb::task_group tg2; 934 std::atomic<int> tasks_executed{}; 935 936 // Determine nested task execution limit. 937 int second_thread_executed{}; 938 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 939 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 940 do { 941 second_thread_executed = tasks_executed; 942 utils::Sleep(10); 943 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 944 CHECK(tasks_executed < 10000); 945 }); 946 tg2.wait(); 947 CHECK(tasks_executed == 10000); 948 949 tasks_executed = 0; 950 tbb::task_arena a2(2, 2); 951 tg1.run_and_wait([&a2, &tg1, &tg2, &tasks_executed, second_thread_executed] { 952 run_deep_stealing(tg1, tg2, second_thread_executed - 1, tasks_executed); 953 while (tasks_executed < second_thread_executed - 1) { 954 // Wait until the second thread near the limit. 955 utils::yield(); 956 } 957 tg2.run([&a2, &tg1, &tasks_executed] { 958 a2.execute([&tg1, &tasks_executed] { 959 volatile char consume_stack[1000]{}; 960 ++tasks_executed; 961 tg1.wait(); 962 utils::suppress_unused_warning(consume_stack); 963 }); 964 }); 965 while (tasks_executed < second_thread_executed) { 966 // Wait until the second joins the arena. 967 utils::yield(); 968 } 969 a2.execute([&tg1, &tg2, &tasks_executed] { 970 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 971 }); 972 int currently_executed{}; 973 do { 974 currently_executed = tasks_executed; 975 utils::Sleep(10); 976 } while (currently_executed != tasks_executed); 977 CHECK(tasks_executed < 10000 + second_thread_executed); 978 }); 979 a2.execute([&tg2] { 980 tg2.wait(); 981 }); 982 CHECK(tasks_executed == 10000 + second_thread_executed); 983 }); 984 } 985 986 //! Test checks that we can submit work to task_group asynchronously with waiting. 987 //! \brief \ref regression 988 TEST_CASE("Async task group") { 989 int num_threads = tbb::this_task_arena::max_concurrency(); 990 if (num_threads < 3) { 991 // The test requires at least 2 worker threads 992 return; 993 } 994 tbb::task_arena a(2*num_threads, num_threads); 995 utils::SpinBarrier barrier(num_threads + 2); 996 tbb::task_group tg[2]; 997 std::atomic<bool> finished[2]{}; 998 finished[0] = false; finished[1] = false; 999 for (int i = 0; i < 2; ++i) { 1000 a.enqueue([i, &tg, &finished, &barrier] { 1001 barrier.wait(); 1002 for (int j = 0; j < 10000; ++j) { 1003 tg[i].run([] {}); 1004 utils::yield(); 1005 } 1006 finished[i] = true; 1007 }); 1008 } 1009 utils::NativeParallelFor(num_threads, [&](int idx) { 1010 barrier.wait(); 1011 a.execute([idx, &tg, &finished] { 1012 std::size_t counter{}; 1013 while (!finished[idx%2]) { 1014 tg[idx%2].wait(); 1015 if (counter++ % 16 == 0) utils::yield(); 1016 } 1017 tg[idx%2].wait(); 1018 }); 1019 }); 1020 } 1021 1022 struct SelfRunner { 1023 tbb::task_group& m_tg; 1024 std::atomic<unsigned>& count; 1025 void operator()() const { 1026 unsigned previous_count = count.fetch_sub(1); 1027 if (previous_count > 1) 1028 m_tg.run( *this ); 1029 } 1030 }; 1031 1032 //! Submit work to single task_group instance from inside the work 1033 //! \brief \ref error_guessing 1034 TEST_CASE("Run self using same task_group instance") { 1035 const unsigned num = 10; 1036 std::atomic<unsigned> count{num}; 1037 tbb::task_group tg; 1038 SelfRunner uf{tg, count}; 1039 tg.run( uf ); 1040 tg.wait(); 1041 CHECK_MESSAGE( 1042 count == 0, 1043 "Not all tasks were spawned from inside the functor running within task_group." 1044 ); 1045 } 1046 1047 //TODO: move to some common place to share with conformance tests 1048 namespace accept_task_group_context { 1049 1050 template <typename TaskGroup, typename CancelF, typename WaitF> 1051 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) { 1052 std::atomic<bool> outer_cancelled{false}; 1053 std::atomic<unsigned> count{13}; 1054 1055 tbb::task_group_context inner_ctx(tbb::task_group_context::isolated); 1056 TaskGroup inner_tg(inner_ctx); 1057 1058 tbb::task_group outer_tg; 1059 auto outer_tg_task = [&] { 1060 inner_tg.run([&] { 1061 utils::SpinWaitUntilEq(outer_cancelled, true); 1062 inner_tg.run( SelfRunner{inner_tg, count} ); 1063 }); 1064 1065 utils::try_call([&] { 1066 std::forward<CancelF>(cancel)(outer_tg); 1067 }).on_completion([&] { 1068 outer_cancelled = true; 1069 }); 1070 }; 1071 1072 auto check = [&] { 1073 tbb::task_group_status outer_status = tbb::task_group_status::not_complete; 1074 outer_status = std::forward<WaitF>(wait)(outer_tg); 1075 CHECK_MESSAGE( 1076 outer_status == tbb::task_group_status::canceled, 1077 "Outer task group should have been cancelled." 1078 ); 1079 1080 tbb::task_group_status inner_status = inner_tg.wait(); 1081 CHECK_MESSAGE( 1082 inner_status == tbb::task_group_status::complete, 1083 "Inner task group should have completed despite the cancellation of the outer one." 1084 ); 1085 1086 CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed."); 1087 }; 1088 1089 outer_tg.run(outer_tg_task); 1090 check(); 1091 } 1092 1093 template <typename TaskGroup> 1094 void test() { 1095 run_cancellation_use_case<TaskGroup>( 1096 [](tbb::task_group& outer) { outer.cancel(); }, 1097 [](tbb::task_group& outer) { return outer.wait(); } 1098 ); 1099 1100 #if TBB_USE_EXCEPTIONS 1101 run_cancellation_use_case<TaskGroup>( 1102 [](tbb::task_group& /*outer*/) { 1103 volatile bool suppress_unreachable_code_warning = true; 1104 if (suppress_unreachable_code_warning) { 1105 throw int(); 1106 } 1107 }, 1108 [](tbb::task_group& outer) { 1109 try { 1110 outer.wait(); 1111 return tbb::task_group_status::complete; 1112 } catch(const int&) { 1113 return tbb::task_group_status::canceled; 1114 } 1115 } 1116 ); 1117 #endif 1118 } 1119 1120 } // namespace accept_task_group_context 1121 1122 //! Respect task_group_context passed from outside 1123 //! \brief \ref interface \ref requirement 1124 TEST_CASE("Respect task_group_context passed from outside") { 1125 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 1126 accept_task_group_context::test<tbb::isolated_task_group>(); 1127 #endif 1128 } 1129 1130 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1131 //! The test for task_handle inside other task waiting with run 1132 //! \brief \ref requirement 1133 TEST_CASE("Task handle for scheduler bypass"){ 1134 tbb::task_group tg; 1135 std::atomic<bool> run {false}; 1136 1137 tg.run([&]{ 1138 return tg.defer([&]{ 1139 run = true; 1140 }); 1141 }); 1142 1143 tg.wait(); 1144 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1145 } 1146 1147 //! The test for task_handle inside other task waiting with run_and_wait 1148 //! \brief \ref requirement 1149 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){ 1150 tbb::task_group tg; 1151 std::atomic<bool> run {false}; 1152 1153 tg.run_and_wait([&]{ 1154 return tg.defer([&]{ 1155 run = true; 1156 }); 1157 }); 1158 1159 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1160 } 1161 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1162 1163 #if TBB_USE_EXCEPTIONS 1164 //As these tests are against behavior marked by spec as undefined, they can not be put into conformance tests 1165 1166 //! The test for error in scheduling empty task_handle 1167 //! \brief \ref requirement 1168 TEST_CASE("Empty task_handle cannot be scheduled" 1169 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1170 * doctest::skip() //skip the test for now, to not pollute the test log 1171 ){ 1172 tbb::task_group tg; 1173 1174 CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error); 1175 } 1176 1177 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1178 //! \brief \ref requirement 1179 TEST_CASE("task_handle cannot be scheduled into different task_group" 1180 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1181 * doctest::skip() //skip the test for now, to not pollute the test log 1182 ){ 1183 tbb::task_group tg; 1184 tbb::task_group tg1; 1185 1186 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1187 } 1188 1189 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1190 //! \brief \ref requirement 1191 TEST_CASE("task_handle cannot be scheduled into other task_group of the same context" 1192 * doctest::should_fail() //Implementation is no there yet, as it is not clear that is the expected behavior 1193 * doctest::skip() //skip the test for now, to not pollute the test log 1194 ) 1195 { 1196 tbb::task_group_context ctx; 1197 1198 tbb::task_group tg(ctx); 1199 tbb::task_group tg1(ctx); 1200 1201 CHECK_NOTHROW(tg.run(tg.defer([]{}))); 1202 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1203 } 1204 1205 #endif // TBB_USE_EXCEPTIONS 1206 1207