1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __TBB_CPF_BUILD 18 #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 19 #endif 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "oneapi/tbb/detail/_config.h" 24 #include "tbb/global_control.h" 25 26 #include "tbb/task_group.h" 27 28 #include "common/concurrency_tracker.h" 29 30 #include <atomic> 31 32 //! \file test_task_group.cpp 33 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 34 35 unsigned g_MaxConcurrency = 4; 36 using atomic_t = std::atomic<std::uintptr_t>; 37 unsigned MinThread = 1; 38 unsigned MaxThread = 4; 39 40 //------------------------------------------------------------------------ 41 // Tests for the thread safety of the task_group manipulations 42 //------------------------------------------------------------------------ 43 44 #include "common/spin_barrier.h" 45 46 enum SharingMode { 47 VagabondGroup = 1, 48 ParallelWait = 2 49 }; 50 51 template<typename task_group_type> 52 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 53 static const std::uintptr_t c_numTasks0 = 4096, 54 c_numTasks1 = 1024; 55 56 const std::uintptr_t m_numThreads; 57 const std::uintptr_t m_sharingMode; 58 59 task_group_type *m_taskGroup; 60 atomic_t m_tasksSpawned, 61 m_threadsReady; 62 utils::SpinBarrier m_barrier; 63 64 static atomic_t s_tasksExecuted; 65 66 struct TaskFunctor { 67 SharedGroupBodyImpl *m_pOwner; 68 void operator () () const { 69 if ( m_pOwner->m_sharingMode & ParallelWait ) { 70 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 71 utils::yield(); 72 } 73 ++s_tasksExecuted; 74 } 75 }; 76 77 TaskFunctor m_taskFunctor; 78 79 void Spawn ( std::uintptr_t numTasks ) { 80 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 81 ++m_tasksSpawned; 82 utils::ConcurrencyTracker ct; 83 m_taskGroup->run( m_taskFunctor ); 84 } 85 ++m_threadsReady; 86 } 87 88 void DeleteTaskGroup () { 89 delete m_taskGroup; 90 m_taskGroup = NULL; 91 } 92 93 void Wait () { 94 while ( m_threadsReady != m_numThreads ) 95 utils::yield(); 96 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 97 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 98 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 99 if ( m_sharingMode & ParallelWait ) { 100 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 101 { 102 utils::ConcurrencyTracker ct; 103 m_taskGroup->wait(); 104 } 105 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) 106 WARN( "Warning: No parallel waiting detected in TestParallelWait" ); 107 m_barrier.wait(); 108 } 109 else 110 m_taskGroup->wait(); 111 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 112 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 113 } 114 115 public: 116 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 117 : m_numThreads(numThreads) 118 , m_sharingMode(sharingMode) 119 , m_taskGroup(NULL) 120 , m_barrier(numThreads) 121 { 122 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 123 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 124 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 125 } 126 utils::ConcurrencyTracker::Reset(); 127 s_tasksExecuted = 0; 128 m_tasksSpawned = 0; 129 m_threadsReady = 0; 130 m_taskFunctor.m_pOwner = this; 131 } 132 133 void Run ( std::uintptr_t idx ) { 134 AssertLive(); 135 if ( idx == 0 ) { 136 if (m_taskGroup || m_tasksSpawned) { 137 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 138 } 139 m_taskGroup = new task_group_type; 140 Spawn( c_numTasks0 ); 141 Wait(); 142 if ( m_sharingMode & VagabondGroup ) 143 m_barrier.wait(); 144 else 145 DeleteTaskGroup(); 146 } 147 else { 148 while ( m_tasksSpawned == 0 ) 149 utils::yield(); 150 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 151 Spawn (c_numTasks1); 152 if ( m_sharingMode & ParallelWait ) 153 Wait(); 154 if ( m_sharingMode & VagabondGroup ) { 155 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 156 m_barrier.wait(); 157 DeleteTaskGroup(); 158 } 159 } 160 AssertLive(); 161 } 162 }; 163 164 template<typename task_group_type> 165 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 166 167 template<typename task_group_type> 168 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 169 bool m_bOwner; 170 SharedGroupBodyImpl<task_group_type> *m_pImpl; 171 public: 172 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 173 : utils::NoAssign() 174 , utils::NoAfterlife() 175 , m_bOwner(true) 176 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 177 {} 178 SharedGroupBody ( const SharedGroupBody& src ) 179 : utils::NoAssign() 180 , utils::NoAfterlife() 181 , m_bOwner(false) 182 , m_pImpl(src.m_pImpl) 183 {} 184 ~SharedGroupBody () { 185 if ( m_bOwner ) 186 delete m_pImpl; 187 } 188 void operator() ( std::uintptr_t idx ) const { 189 // Wrap the functior into additional task group to enforce bounding. 190 task_group_type tg; 191 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 192 } 193 }; 194 195 template<typename task_group_type> 196 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 197 utils::SpinBarrier& m_barrier; 198 std::atomic<bool>& m_completed; 199 task_group_type& m_tg; 200 public: 201 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 202 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 203 204 void operator()() const { 205 m_barrier.wait(); 206 utils::doDummyWork(100000); 207 m_completed = true; 208 } 209 210 void operator()(int id) const { 211 if (id == 0) { 212 m_tg.run_and_wait(*this); 213 } else { 214 m_barrier.wait(); 215 m_tg.wait(); 216 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 217 } 218 } 219 }; 220 221 template<typename task_group_type> 222 void TestParallelSpawn () { 223 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 224 } 225 226 template<typename task_group_type> 227 void TestParallelWait () { 228 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 229 230 utils::SpinBarrier barrier(g_MaxConcurrency); 231 std::atomic<bool> completed; 232 completed = false; 233 task_group_type tg; 234 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 235 NativeParallelFor( g_MaxConcurrency, b ); 236 } 237 238 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 239 template<typename task_group_type> 240 void TestVagabondGroup () { 241 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 242 } 243 244 #include "common/memory_usage.h" 245 246 template<typename task_group_type> 247 void TestThreadSafety() { 248 auto tests = [] { 249 for (int trail = 0; trail < 10; ++trail) { 250 TestParallelSpawn<task_group_type>(); 251 TestParallelWait<task_group_type>(); 252 TestVagabondGroup<task_group_type>(); 253 } 254 }; 255 256 // Test and warm up allocator. 257 tests(); 258 259 // Ensure that cosumption is stabilized. 260 std::size_t initial = utils::GetMemoryUsage(); 261 for (;;) { 262 tests(); 263 std::size_t current = utils::GetMemoryUsage(); 264 if (current <= initial) { 265 return; 266 } 267 initial = current; 268 } 269 } 270 //------------------------------------------------------------------------ 271 // Common requisites of the Fibonacci tests 272 //------------------------------------------------------------------------ 273 274 const std::uintptr_t N = 20; 275 const std::uintptr_t F = 6765; 276 277 atomic_t g_Sum; 278 279 #define FIB_TEST_PROLOGUE() \ 280 const unsigned numRepeats = g_MaxConcurrency * (TBB_USE_DEBUG ? 4 : 16); \ 281 utils::ConcurrencyTracker::Reset() 282 283 #define FIB_TEST_EPILOGUE(sum) \ 284 CHECK( sum == numRepeats * F ); 285 286 287 // Fibonacci tasks specified as functors 288 template<class task_group_type> 289 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 290 protected: 291 std::uintptr_t* m_pRes; 292 mutable std::uintptr_t m_Num; 293 virtual void impl() const = 0; 294 public: 295 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 296 void operator()() const { 297 utils::ConcurrencyTracker ct; 298 AssertLive(); 299 if( m_Num < 2 ) { 300 *m_pRes = m_Num; 301 } else { 302 impl(); 303 } 304 } 305 virtual ~FibTaskBase() {} 306 }; 307 308 template<class task_group_type> 309 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 310 public: 311 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 312 virtual void impl() const override { 313 std::uintptr_t x = ~0u; 314 task_group_type tg; 315 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 316 this->m_Num -= 2; tg.run_and_wait( *this ); 317 *(this->m_pRes) += x; 318 } 319 }; 320 321 template<class task_group_type> 322 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 323 public: 324 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 325 virtual void impl() const override { 326 std::uintptr_t x = ~0u, 327 y = ~0u; 328 task_group_type tg; 329 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 330 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 331 tg.wait(); 332 *(this->m_pRes) = x + y; 333 } 334 }; 335 336 // Helper functions 337 template<class fib_task> 338 std::uintptr_t RunFibTask(std::uintptr_t n) { 339 std::uintptr_t res = ~0u; 340 fib_task(&res, n)(); 341 return res; 342 } 343 344 template<typename fib_task> 345 void RunFibTest() { 346 FIB_TEST_PROLOGUE(); 347 std::uintptr_t sum = 0; 348 for( unsigned i = 0; i < numRepeats; ++i ) 349 sum += RunFibTask<fib_task>(N); 350 FIB_TEST_EPILOGUE(sum); 351 } 352 353 template<typename fib_task> 354 void FibFunctionNoArgs() { 355 g_Sum += RunFibTask<fib_task>(N); 356 } 357 358 template<typename task_group_type> 359 void TestFibWithLambdas() { 360 FIB_TEST_PROLOGUE(); 361 atomic_t sum; 362 sum = 0; 363 task_group_type tg; 364 for( unsigned i = 0; i < numRepeats; ++i ) 365 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 366 tg.wait(); 367 FIB_TEST_EPILOGUE(sum); 368 } 369 370 template<typename task_group_type> 371 void TestFibWithFunctor() { 372 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 373 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 374 } 375 376 template<typename task_group_type> 377 void TestFibWithFunctionPtr() { 378 FIB_TEST_PROLOGUE(); 379 g_Sum = 0; 380 task_group_type tg; 381 for( unsigned i = 0; i < numRepeats; ++i ) 382 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 383 tg.wait(); 384 FIB_TEST_EPILOGUE(g_Sum); 385 } 386 387 template<typename task_group_type> 388 void RunFibonacciTests() { 389 TestFibWithLambdas<task_group_type>(); 390 TestFibWithFunctor<task_group_type>(); 391 TestFibWithFunctionPtr<task_group_type>(); 392 } 393 394 class test_exception : public std::exception 395 { 396 const char* m_strDescription; 397 public: 398 test_exception ( const char* descr ) : m_strDescription(descr) {} 399 400 const char* what() const throw() override { return m_strDescription; } 401 }; 402 403 #if TBB_USE_CAPTURED_EXCEPTION 404 #include "tbb/tbb_exception.h" 405 typedef tbb::captured_exception TestException; 406 #else 407 typedef test_exception TestException; 408 #endif 409 410 #include <string.h> 411 412 #define NUM_CHORES 512 413 #define NUM_GROUPS 64 414 #define SKIP_CHORES (NUM_CHORES/4) 415 #define SKIP_GROUPS (NUM_GROUPS/4) 416 #define EXCEPTION_DESCR1 "Test exception 1" 417 #define EXCEPTION_DESCR2 "Test exception 2" 418 419 atomic_t g_ExceptionCount; 420 atomic_t g_TaskCount; 421 unsigned g_ExecutedAtCancellation; 422 bool g_Rethrow; 423 bool g_Throw; 424 425 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 426 atomic_t &m_TaskCount; 427 public: 428 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 429 void operator() () const { 430 utils::ConcurrencyTracker ct; 431 AssertLive(); 432 if ( g_Throw ) { 433 if ( ++m_TaskCount == SKIP_CHORES ) 434 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 435 utils::yield(); 436 } 437 else { 438 ++g_TaskCount; 439 while( !tbb::is_current_task_group_canceling() ) 440 utils::yield(); 441 } 442 } 443 }; 444 445 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 446 g_Throw = bThrow; 447 g_Rethrow = bRethrow; 448 g_ExceptionCount = 0; 449 g_TaskCount = 0; 450 utils::ConcurrencyTracker::Reset(); 451 } 452 453 template<typename task_group_type> 454 void LaunchChildrenWithFunctor () { 455 atomic_t count; 456 count = 0; 457 task_group_type g; 458 for( unsigned i = 0; i < NUM_CHORES; ++i ) 459 g.run( ThrowingTask(count) ); 460 #if TBB_USE_EXCEPTIONS 461 tbb::task_group_status status = tbb::not_complete; 462 bool exceptionCaught = false; 463 try { 464 status = g.wait(); 465 } catch ( TestException& e ) { 466 CHECK_MESSAGE( e.what(), "Empty what() string" ); 467 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 468 exceptionCaught = true; 469 ++g_ExceptionCount; 470 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 471 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 472 CHECK_MESSAGE(false, "No exception in the child task group"); 473 } 474 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 475 throw test_exception(EXCEPTION_DESCR2); 476 } 477 #else 478 g.wait(); 479 #endif 480 } 481 482 // Tests for cancellation and exception handling behavior 483 template<typename task_group_type> 484 void TestManualCancellationWithFunctor () { 485 ResetGlobals( false, false ); 486 task_group_type tg; 487 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 488 // TBB version does not require taking function address 489 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 490 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 491 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 492 utils::yield(); 493 tg.cancel(); 494 g_ExecutedAtCancellation = int(g_TaskCount); 495 tbb::task_group_status status = tg.wait(); 496 CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." ); 497 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 498 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 499 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 500 } 501 502 #if TBB_USE_EXCEPTIONS 503 template<typename task_group_type> 504 void TestExceptionHandling1 () { 505 ResetGlobals( true, false ); 506 task_group_type tg; 507 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 508 // TBB version does not require taking function address 509 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 510 try { 511 tg.wait(); 512 } catch ( ... ) { 513 CHECK_MESSAGE( false, "Unexpected exception" ); 514 } 515 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 516 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 517 } 518 519 template<typename task_group_type> 520 void TestExceptionHandling2 () { 521 ResetGlobals( true, true ); 522 task_group_type tg; 523 bool exceptionCaught = false; 524 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 525 // TBB version does not require taking function address 526 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 527 } 528 try { 529 tg.wait(); 530 } catch ( TestException& e ) { 531 CHECK_MESSAGE( e.what(), "Empty what() string" ); 532 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 533 exceptionCaught = true; 534 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 535 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 536 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 537 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 538 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 539 } 540 541 template <typename task_group_type> 542 void TestExceptionHandling3() { 543 task_group_type tg; 544 try { 545 tg.run_and_wait([]() { 546 volatile bool suppress_unreachable_code_warning = true; 547 if (suppress_unreachable_code_warning) { 548 throw 1; 549 } 550 }); 551 } catch (int error) { 552 CHECK(error == 1); 553 } catch ( ... ) { 554 CHECK_MESSAGE( false, "Unexpected exception" ); 555 } 556 } 557 558 template<typename task_group_type> 559 class LaunchChildrenDriver { 560 public: 561 void Launch(task_group_type& tg) { 562 ResetGlobals(false, false); 563 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 564 tg.run(LaunchChildrenWithFunctor<task_group_type>); 565 } 566 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 567 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 568 utils::yield(); 569 } 570 571 void Finish() { 572 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 573 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 574 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 575 } 576 }; // LaunchChildrenWithTaskHandleDriver 577 578 template<typename task_group_type, bool Throw> 579 void TestMissingWait () { 580 bool exception_occurred = false, 581 unexpected_exception = false; 582 LaunchChildrenDriver<task_group_type> driver; 583 try { 584 task_group_type tg; 585 driver.Launch( tg ); 586 volatile bool suppress_unreachable_code_warning = Throw; 587 if (suppress_unreachable_code_warning) { 588 throw int(); // Initiate stack unwinding 589 } 590 } 591 catch ( const tbb::missing_wait& e ) { 592 CHECK_MESSAGE( e.what(), "Error message is absent" ); 593 exception_occurred = true; 594 unexpected_exception = Throw; 595 } 596 catch ( int ) { 597 exception_occurred = true; 598 unexpected_exception = !Throw; 599 } 600 catch ( ... ) { 601 exception_occurred = unexpected_exception = true; 602 } 603 CHECK( exception_occurred ); 604 CHECK( !unexpected_exception ); 605 driver.Finish(); 606 } 607 #endif 608 609 template<typename task_group_type> 610 void RunCancellationAndExceptionHandlingTests() { 611 TestManualCancellationWithFunctor<task_group_type>(); 612 #if TBB_USE_EXCEPTIONS 613 TestExceptionHandling1<task_group_type>(); 614 TestExceptionHandling2<task_group_type>(); 615 TestExceptionHandling3<task_group_type>(); 616 TestMissingWait<task_group_type, true>(); 617 TestMissingWait<task_group_type, false>(); 618 #endif 619 } 620 621 void EmptyFunction () {} 622 623 struct TestFunctor { 624 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 625 void operator()() const { /* library requires this overload only */ } 626 }; 627 628 template<typename task_group_type> 629 void TestConstantFunctorRequirement() { 630 task_group_type g; 631 TestFunctor tf; 632 g.run( tf ); g.wait(); 633 g.run_and_wait( tf ); 634 } 635 636 //------------------------------------------------------------------------ 637 namespace TestMoveSemanticsNS { 638 struct TestFunctor { 639 void operator()() const {}; 640 }; 641 642 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 643 MoveOnlyFunctor() : utils::MoveOnly() {}; 644 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 645 }; 646 647 struct MovePreferableFunctor : utils::Movable, TestFunctor { 648 MovePreferableFunctor() : utils::Movable() {}; 649 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 650 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 651 }; 652 653 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 654 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 655 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 656 private: 657 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 658 }; 659 660 template<typename task_group_type> 661 void TestBareFunctors() { 662 task_group_type tg; 663 MovePreferableFunctor mpf; 664 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 665 tg.run_and_wait( NoMoveNoCopyFunctor() ); 666 667 tg.run( MoveOnlyFunctor() ); 668 tg.wait(); 669 670 tg.run( mpf ); 671 tg.wait(); 672 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 673 mpf.Reset(); 674 675 tg.run( std::move(mpf) ); 676 tg.wait(); 677 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 678 mpf.Reset(); 679 } 680 } 681 682 template<typename task_group_type> 683 void TestMoveSemantics() { 684 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 685 } 686 //------------------------------------------------------------------------ 687 688 // TODO: TBB_REVAMP_TODO - enable when ETS is available 689 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 690 namespace TestIsolationNS { 691 class DummyFunctor { 692 public: 693 DummyFunctor() {} 694 void operator()() const { 695 for ( volatile int j = 0; j < 10; ++j ) {} 696 } 697 }; 698 699 template<typename task_group_type> 700 class ParForBody { 701 task_group_type& m_tg; 702 std::atomic<bool>& m_preserved; 703 tbb::enumerable_thread_specific<int>& m_ets; 704 public: 705 ParForBody( 706 task_group_type& tg, 707 std::atomic<bool>& preserved, 708 tbb::enumerable_thread_specific<int>& ets 709 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 710 711 void operator()(int) const { 712 if (++m_ets.local() > 1) m_preserved = false; 713 714 for (int i = 0; i < 1000; ++i) 715 m_tg.run(DummyFunctor()); 716 m_tg.wait(); 717 m_tg.run_and_wait(DummyFunctor()); 718 719 --m_ets.local(); 720 } 721 }; 722 723 template<typename task_group_type> 724 void CheckIsolation(bool isolation_is_expected) { 725 task_group_type tg; 726 std::atomic<bool> isolation_is_preserved; 727 isolation_is_preserved = true; 728 tbb::enumerable_thread_specific<int> ets(0); 729 730 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 731 732 ASSERT( 733 isolation_is_expected == isolation_is_preserved, 734 "Actual and expected isolation-related behaviours are different" 735 ); 736 } 737 738 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 739 void TestIsolation() { 740 CheckIsolation<tbb::task_group>(false); 741 CheckIsolation<tbb::isolated_task_group>(true); 742 } 743 } 744 #endif 745 746 //! Test for thread safety for the task_group 747 //! \brief \ref error_guessing \ref resource_usage 748 TEST_CASE("Thread safety test for the task group") { 749 for (unsigned p=MinThread; p <= MaxThread; ++p) { 750 if (p < 2) { 751 continue; 752 } 753 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 754 g_MaxConcurrency = p; 755 TestThreadSafety<tbb::task_group>(); 756 } 757 } 758 759 //! Fibonacci test for task group 760 //! \brief \ref interface \ref requirement 761 TEST_CASE("Fibonacci test for the task group") { 762 for (unsigned p=MinThread; p <= MaxThread; ++p) { 763 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 764 g_MaxConcurrency = p; 765 RunFibonacciTests<tbb::task_group>(); 766 } 767 } 768 769 //! Cancellation and exception test for the task group 770 //! \brief \ref interface \ref requirement 771 TEST_CASE("Cancellation and exception test for the task group") { 772 for (unsigned p = MinThread; p <= MaxThread; ++p) { 773 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 774 g_MaxConcurrency = p; 775 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 776 } 777 } 778 779 //! Constant functor test for the task group 780 //! \brief \ref interface \ref negative 781 TEST_CASE("Constant functor test for the task group") { 782 for (unsigned p=MinThread; p <= MaxThread; ++p) { 783 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 784 g_MaxConcurrency = p; 785 TestConstantFunctorRequirement<tbb::task_group>(); 786 } 787 } 788 789 //! Move semantics test for the task group 790 //! \brief \ref interface \ref requirement 791 TEST_CASE("Move semantics test for the task group") { 792 for (unsigned p=MinThread; p <= MaxThread; ++p) { 793 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 794 g_MaxConcurrency = p; 795 TestMoveSemantics<tbb::task_group>(); 796 } 797 } 798 799 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 800 //! Test for thread safety for the isolated_task_group 801 //! \brief \ref error_guessing 802 TEST_CASE("Thread safety test for the isolated task group") { 803 for (unsigned p=MinThread; p <= MaxThread; ++p) { 804 if (p < 2) { 805 continue; 806 } 807 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 808 g_MaxConcurrency = p; 809 TestThreadSafety<tbb::isolated_task_group>(); 810 } 811 } 812 813 //! Cancellation and exception test for the isolated task group 814 //! \brief \ref interface \ref requirement 815 TEST_CASE("Fibonacci test for the isolated task group") { 816 for (unsigned p=MinThread; p <= MaxThread; ++p) { 817 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 818 g_MaxConcurrency = p; 819 RunFibonacciTests<tbb::isolated_task_group>(); 820 } 821 } 822 823 //! Cancellation and exception test for the isolated task group 824 //! \brief \ref interface \ref requirement 825 TEST_CASE("Cancellation and exception test for the isolated task group") { 826 for (unsigned p=MinThread; p <= MaxThread; ++p) { 827 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 828 g_MaxConcurrency = p; 829 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 830 } 831 } 832 833 //! Constant functor test for the isolated task group. 834 //! \brief \ref interface \ref negative 835 TEST_CASE("Constant functor test for the isolated task group") { 836 for (unsigned p=MinThread; p <= MaxThread; ++p) { 837 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 838 g_MaxConcurrency = p; 839 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 840 } 841 } 842 843 //! Move semantics test for the isolated task group. 844 //! \brief \ref interface \ref requirement 845 TEST_CASE("Move semantics test for the isolated task group") { 846 for (unsigned p=MinThread; p <= MaxThread; ++p) { 847 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 848 g_MaxConcurrency = p; 849 TestMoveSemantics<tbb::isolated_task_group>(); 850 } 851 } 852 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 853 854 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 855 for (int i = 0; i < num_tasks; ++i) { 856 tg2.run([&tg1, &tasks_executed] { 857 volatile char consume_stack[1000]{}; 858 ++tasks_executed; 859 tg1.wait(); 860 utils::suppress_unused_warning(consume_stack); 861 }); 862 } 863 } 864 865 // TODO: move to the conformance test 866 //! Test for stack overflow avoidance mechanism. 867 //! \brief \ref requirement 868 TEST_CASE("Test for stack overflow avoidance mechanism") { 869 if (tbb::this_task_arena::max_concurrency() < 2) { 870 return; 871 } 872 873 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 874 tbb::task_group tg1; 875 tbb::task_group tg2; 876 std::atomic<int> tasks_executed{}; 877 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 878 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 879 while (tasks_executed < 100) { 880 // Some stealing is expected to happen. 881 utils::yield(); 882 } 883 CHECK(tasks_executed < 10000); 884 }); 885 tg2.wait(); 886 CHECK(tasks_executed == 10000); 887 } 888 889 //! Test for stack overflow avoidance mechanism. 890 //! \brief \ref error_guessing 891 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 892 if (tbb::this_task_arena::max_concurrency() < 2) { 893 return; 894 } 895 896 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 897 tbb::task_group tg1; 898 tbb::task_group tg2; 899 std::atomic<int> tasks_executed{}; 900 901 // Determine nested task execution limit. 902 int second_thread_executed{}; 903 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 904 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 905 do { 906 second_thread_executed = tasks_executed; 907 utils::Sleep(10); 908 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 909 CHECK(tasks_executed < 10000); 910 }); 911 tg2.wait(); 912 CHECK(tasks_executed == 10000); 913 914 tasks_executed = 0; 915 tbb::task_arena a(2, 2); 916 tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] { 917 run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed); 918 while (tasks_executed < second_thread_executed-1) { 919 // Wait until the second thread near the limit. 920 utils::yield(); 921 } 922 tg2.run([&a, &tg1, &tasks_executed] { 923 a.execute([&tg1, &tasks_executed] { 924 volatile char consume_stack[1000]{}; 925 ++tasks_executed; 926 tg1.wait(); 927 utils::suppress_unused_warning(consume_stack); 928 }); 929 }); 930 while (tasks_executed < second_thread_executed) { 931 // Wait until the second joins the arena. 932 utils::yield(); 933 } 934 a.execute([&tg1, &tg2, &tasks_executed] { 935 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 936 }); 937 int currently_executed{}; 938 do { 939 currently_executed = tasks_executed; 940 utils::Sleep(10); 941 } while (currently_executed != tasks_executed); 942 CHECK(tasks_executed < 10000 + second_thread_executed); 943 }); 944 a.execute([&tg2] { 945 tg2.wait(); 946 }); 947 CHECK(tasks_executed == 10000 + second_thread_executed); 948 } 949 950 //! Test checks that we can submit work to task_group asynchronously with waiting. 951 //! \brief \ref regression 952 TEST_CASE("Async task group") { 953 int num_threads = tbb::this_task_arena::max_concurrency(); 954 tbb::task_arena a(2*num_threads, num_threads); 955 utils::SpinBarrier barrier(num_threads + 2); 956 tbb::task_group tg[2]; 957 std::atomic<bool> finished[2]{}; 958 finished[0] = false; finished[1] = false; 959 for (int i = 0; i < 2; ++i) { 960 a.enqueue([i, &tg, &finished, &barrier] { 961 barrier.wait(); 962 for (int j = 0; j < 10000; ++j) { 963 tg[i].run([] {}); 964 utils::yield(); 965 } 966 finished[i] = true; 967 }); 968 } 969 utils::NativeParallelFor(num_threads, [&](int idx) { 970 barrier.wait(); 971 a.execute([idx, &tg, &finished] { 972 std::size_t counter{}; 973 while (!finished[idx%2]) { 974 tg[idx%2].wait(); 975 if (counter++ % 16 == 0) utils::yield(); 976 } 977 tg[idx%2].wait(); 978 }); 979 }); 980 } 981