1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __TBB_CPF_BUILD 18 #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 19 #endif 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "oneapi/tbb/detail/_config.h" 24 #include "tbb/global_control.h" 25 26 #include "tbb/task_group.h" 27 28 #include "common/concurrency_tracker.h" 29 30 #include <atomic> 31 32 //! \file test_task_group.cpp 33 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 34 35 unsigned g_MaxConcurrency = 4; 36 using atomic_t = std::atomic<std::uintptr_t>; 37 unsigned MinThread = 1; 38 unsigned MaxThread = 4; 39 40 //------------------------------------------------------------------------ 41 // Tests for the thread safety of the task_group manipulations 42 //------------------------------------------------------------------------ 43 44 #include "common/spin_barrier.h" 45 46 enum SharingMode { 47 VagabondGroup = 1, 48 ParallelWait = 2 49 }; 50 51 template<typename task_group_type> 52 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 53 static const std::uintptr_t c_numTasks0 = 4096, 54 c_numTasks1 = 1024; 55 56 const std::uintptr_t m_numThreads; 57 const std::uintptr_t m_sharingMode; 58 59 task_group_type *m_taskGroup; 60 atomic_t m_tasksSpawned, 61 m_threadsReady; 62 utils::SpinBarrier m_barrier; 63 64 static atomic_t s_tasksExecuted; 65 66 struct TaskFunctor { 67 SharedGroupBodyImpl *m_pOwner; 68 void operator () () const { 69 if ( m_pOwner->m_sharingMode & ParallelWait ) { 70 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 71 utils::yield(); 72 } 73 ++s_tasksExecuted; 74 } 75 }; 76 77 TaskFunctor m_taskFunctor; 78 79 void Spawn ( std::uintptr_t numTasks ) { 80 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 81 ++m_tasksSpawned; 82 utils::ConcurrencyTracker ct; 83 m_taskGroup->run( m_taskFunctor ); 84 } 85 ++m_threadsReady; 86 } 87 88 void DeleteTaskGroup () { 89 delete m_taskGroup; 90 m_taskGroup = NULL; 91 } 92 93 void Wait () { 94 while ( m_threadsReady != m_numThreads ) 95 utils::yield(); 96 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 97 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 98 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 99 if ( m_sharingMode & ParallelWait ) { 100 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 101 { 102 utils::ConcurrencyTracker ct; 103 m_taskGroup->wait(); 104 } 105 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) 106 WARN( "Warning: No parallel waiting detected in TestParallelWait" ); 107 m_barrier.wait(); 108 } 109 else 110 m_taskGroup->wait(); 111 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 112 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 113 } 114 115 public: 116 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 117 : m_numThreads(numThreads) 118 , m_sharingMode(sharingMode) 119 , m_taskGroup(NULL) 120 , m_barrier(numThreads) 121 { 122 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 123 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 124 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 125 } 126 utils::ConcurrencyTracker::Reset(); 127 s_tasksExecuted = 0; 128 m_tasksSpawned = 0; 129 m_threadsReady = 0; 130 m_taskFunctor.m_pOwner = this; 131 } 132 133 void Run ( std::uintptr_t idx ) { 134 AssertLive(); 135 if ( idx == 0 ) { 136 if (m_taskGroup || m_tasksSpawned) { 137 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 138 } 139 m_taskGroup = new task_group_type; 140 Spawn( c_numTasks0 ); 141 Wait(); 142 if ( m_sharingMode & VagabondGroup ) 143 m_barrier.wait(); 144 else 145 DeleteTaskGroup(); 146 } 147 else { 148 while ( m_tasksSpawned == 0 ) 149 utils::yield(); 150 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 151 Spawn (c_numTasks1); 152 if ( m_sharingMode & ParallelWait ) 153 Wait(); 154 if ( m_sharingMode & VagabondGroup ) { 155 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 156 m_barrier.wait(); 157 DeleteTaskGroup(); 158 } 159 } 160 AssertLive(); 161 } 162 }; 163 164 template<typename task_group_type> 165 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 166 167 template<typename task_group_type> 168 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 169 bool m_bOwner; 170 SharedGroupBodyImpl<task_group_type> *m_pImpl; 171 public: 172 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 173 : utils::NoAssign() 174 , utils::NoAfterlife() 175 , m_bOwner(true) 176 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 177 {} 178 SharedGroupBody ( const SharedGroupBody& src ) 179 : utils::NoAssign() 180 , utils::NoAfterlife() 181 , m_bOwner(false) 182 , m_pImpl(src.m_pImpl) 183 {} 184 ~SharedGroupBody () { 185 if ( m_bOwner ) 186 delete m_pImpl; 187 } 188 void operator() ( std::uintptr_t idx ) const { 189 // Wrap the functior into additional task group to enforce bounding. 190 task_group_type tg; 191 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 192 } 193 }; 194 195 template<typename task_group_type> 196 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 197 utils::SpinBarrier& m_barrier; 198 std::atomic<bool>& m_completed; 199 task_group_type& m_tg; 200 public: 201 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 202 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 203 204 void operator()() const { 205 m_barrier.wait(); 206 utils::doDummyWork(100000); 207 m_completed = true; 208 } 209 210 void operator()(int id) const { 211 if (id == 0) { 212 m_tg.run_and_wait(*this); 213 } else { 214 m_barrier.wait(); 215 m_tg.wait(); 216 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 217 } 218 } 219 }; 220 221 template<typename task_group_type> 222 void TestParallelSpawn () { 223 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 224 } 225 226 template<typename task_group_type> 227 void TestParallelWait () { 228 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 229 230 utils::SpinBarrier barrier(g_MaxConcurrency); 231 std::atomic<bool> completed; 232 completed = false; 233 task_group_type tg; 234 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 235 NativeParallelFor( g_MaxConcurrency, b ); 236 } 237 238 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 239 template<typename task_group_type> 240 void TestVagabondGroup () { 241 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 242 } 243 244 #include "common/memory_usage.h" 245 246 template<typename task_group_type> 247 void TestThreadSafety() { 248 auto tests = [] { 249 for (int trail = 0; trail < 10; ++trail) { 250 TestParallelSpawn<task_group_type>(); 251 TestParallelWait<task_group_type>(); 252 TestVagabondGroup<task_group_type>(); 253 } 254 }; 255 256 // Test and warm up allocator. 257 tests(); 258 259 // Ensure that cosumption is stabilized. 260 std::size_t initial = utils::GetMemoryUsage(); 261 for (;;) { 262 tests(); 263 std::size_t current = utils::GetMemoryUsage(); 264 if (current <= initial) { 265 return; 266 } 267 initial = current; 268 } 269 } 270 //------------------------------------------------------------------------ 271 // Common requisites of the Fibonacci tests 272 //------------------------------------------------------------------------ 273 274 const std::uintptr_t N = 20; 275 const std::uintptr_t F = 6765; 276 277 atomic_t g_Sum; 278 279 #define FIB_TEST_PROLOGUE() \ 280 const unsigned numRepeats = g_MaxConcurrency * 4; \ 281 utils::ConcurrencyTracker::Reset() 282 283 #define FIB_TEST_EPILOGUE(sum) \ 284 CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \ 285 CHECK( sum == numRepeats * F ); 286 287 288 // Fibonacci tasks specified as functors 289 template<class task_group_type> 290 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 291 protected: 292 std::uintptr_t* m_pRes; 293 mutable std::uintptr_t m_Num; 294 virtual void impl() const = 0; 295 public: 296 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 297 void operator()() const { 298 utils::ConcurrencyTracker ct; 299 AssertLive(); 300 if( m_Num < 2 ) { 301 *m_pRes = m_Num; 302 } else { 303 impl(); 304 } 305 } 306 virtual ~FibTaskBase() {} 307 }; 308 309 template<class task_group_type> 310 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 311 public: 312 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 313 virtual void impl() const override { 314 std::uintptr_t x = ~0u; 315 task_group_type tg; 316 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 317 this->m_Num -= 2; tg.run_and_wait( *this ); 318 *(this->m_pRes) += x; 319 } 320 }; 321 322 template<class task_group_type> 323 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 324 public: 325 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 326 virtual void impl() const override { 327 std::uintptr_t x = ~0u, 328 y = ~0u; 329 task_group_type tg; 330 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 331 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 332 tg.wait(); 333 *(this->m_pRes) = x + y; 334 } 335 }; 336 337 // Helper functions 338 template<class fib_task> 339 std::uintptr_t RunFibTask(std::uintptr_t n) { 340 std::uintptr_t res = ~0u; 341 fib_task(&res, n)(); 342 return res; 343 } 344 345 template<typename fib_task> 346 void RunFibTest() { 347 FIB_TEST_PROLOGUE(); 348 std::uintptr_t sum = 0; 349 for( unsigned i = 0; i < numRepeats; ++i ) 350 sum += RunFibTask<fib_task>(N); 351 FIB_TEST_EPILOGUE(sum); 352 } 353 354 template<typename fib_task> 355 void FibFunctionNoArgs() { 356 g_Sum += RunFibTask<fib_task>(N); 357 } 358 359 template<typename task_group_type> 360 void TestFibWithLambdas() { 361 FIB_TEST_PROLOGUE(); 362 atomic_t sum; 363 sum = 0; 364 task_group_type tg; 365 for( unsigned i = 0; i < numRepeats; ++i ) 366 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 367 tg.wait(); 368 FIB_TEST_EPILOGUE(sum); 369 } 370 371 template<typename task_group_type> 372 void TestFibWithFunctor() { 373 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 374 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 375 } 376 377 template<typename task_group_type> 378 void TestFibWithFunctionPtr() { 379 FIB_TEST_PROLOGUE(); 380 g_Sum = 0; 381 task_group_type tg; 382 for( unsigned i = 0; i < numRepeats; ++i ) 383 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 384 tg.wait(); 385 FIB_TEST_EPILOGUE(g_Sum); 386 } 387 388 template<typename task_group_type> 389 void RunFibonacciTests() { 390 TestFibWithLambdas<task_group_type>(); 391 TestFibWithFunctor<task_group_type>(); 392 TestFibWithFunctionPtr<task_group_type>(); 393 } 394 395 class test_exception : public std::exception 396 { 397 const char* m_strDescription; 398 public: 399 test_exception ( const char* descr ) : m_strDescription(descr) {} 400 401 const char* what() const throw() override { return m_strDescription; } 402 }; 403 404 #if TBB_USE_CAPTURED_EXCEPTION 405 #include "tbb/tbb_exception.h" 406 typedef tbb::captured_exception TestException; 407 #else 408 typedef test_exception TestException; 409 #endif 410 411 #include <string.h> 412 413 #define NUM_CHORES 512 414 #define NUM_GROUPS 64 415 #define SKIP_CHORES (NUM_CHORES/4) 416 #define SKIP_GROUPS (NUM_GROUPS/4) 417 #define EXCEPTION_DESCR1 "Test exception 1" 418 #define EXCEPTION_DESCR2 "Test exception 2" 419 420 atomic_t g_ExceptionCount; 421 atomic_t g_TaskCount; 422 unsigned g_ExecutedAtCancellation; 423 bool g_Rethrow; 424 bool g_Throw; 425 426 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 427 atomic_t &m_TaskCount; 428 public: 429 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 430 void operator() () const { 431 utils::ConcurrencyTracker ct; 432 AssertLive(); 433 if ( g_Throw ) { 434 if ( ++m_TaskCount == SKIP_CHORES ) 435 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 436 utils::yield(); 437 } 438 else { 439 ++g_TaskCount; 440 while( !tbb::is_current_task_group_canceling() ) 441 utils::yield(); 442 } 443 } 444 }; 445 446 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 447 g_Throw = bThrow; 448 g_Rethrow = bRethrow; 449 g_ExceptionCount = 0; 450 g_TaskCount = 0; 451 utils::ConcurrencyTracker::Reset(); 452 } 453 454 template<typename task_group_type> 455 void LaunchChildrenWithFunctor () { 456 atomic_t count; 457 count = 0; 458 task_group_type g; 459 for( unsigned i = 0; i < NUM_CHORES; ++i ) 460 g.run( ThrowingTask(count) ); 461 #if TBB_USE_EXCEPTIONS 462 tbb::task_group_status status = tbb::not_complete; 463 bool exceptionCaught = false; 464 try { 465 status = g.wait(); 466 } catch ( TestException& e ) { 467 CHECK_MESSAGE( e.what(), "Empty what() string" ); 468 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 469 exceptionCaught = true; 470 ++g_ExceptionCount; 471 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 472 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 473 CHECK_MESSAGE(false, "No exception in the child task group"); 474 } 475 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 476 throw test_exception(EXCEPTION_DESCR2); 477 } 478 #else 479 g.wait(); 480 #endif 481 } 482 483 // Tests for cancellation and exception handling behavior 484 template<typename task_group_type> 485 void TestManualCancellationWithFunctor () { 486 ResetGlobals( false, false ); 487 task_group_type tg; 488 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 489 // TBB version does not require taking function address 490 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 491 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 492 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 493 utils::yield(); 494 tg.cancel(); 495 g_ExecutedAtCancellation = int(g_TaskCount); 496 tbb::task_group_status status = tg.wait(); 497 CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." ); 498 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 499 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 500 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 501 } 502 503 #if TBB_USE_EXCEPTIONS 504 template<typename task_group_type> 505 void TestExceptionHandling1 () { 506 ResetGlobals( true, false ); 507 task_group_type tg; 508 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 509 // TBB version does not require taking function address 510 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 511 try { 512 tg.wait(); 513 } catch ( ... ) { 514 CHECK_MESSAGE( false, "Unexpected exception" ); 515 } 516 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 517 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 518 } 519 520 template<typename task_group_type> 521 void TestExceptionHandling2 () { 522 ResetGlobals( true, true ); 523 task_group_type tg; 524 bool exceptionCaught = false; 525 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 526 // TBB version does not require taking function address 527 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 528 } 529 try { 530 tg.wait(); 531 } catch ( TestException& e ) { 532 CHECK_MESSAGE( e.what(), "Empty what() string" ); 533 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 534 exceptionCaught = true; 535 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 536 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 537 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 538 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 539 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 540 } 541 542 template <typename task_group_type> 543 void TestExceptionHandling3() { 544 task_group_type tg; 545 try { 546 tg.run_and_wait([]() { 547 volatile bool suppress_unreachable_code_warning = true; 548 if (suppress_unreachable_code_warning) { 549 throw 1; 550 } 551 }); 552 } catch (int error) { 553 CHECK(error == 1); 554 } catch ( ... ) { 555 CHECK_MESSAGE( false, "Unexpected exception" ); 556 } 557 } 558 559 template<typename task_group_type> 560 class LaunchChildrenDriver { 561 public: 562 void Launch(task_group_type& tg) { 563 ResetGlobals(false, false); 564 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 565 tg.run(LaunchChildrenWithFunctor<task_group_type>); 566 } 567 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 568 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 569 utils::yield(); 570 } 571 572 void Finish() { 573 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 574 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 575 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 576 } 577 }; // LaunchChildrenWithTaskHandleDriver 578 579 template<typename task_group_type, bool Throw> 580 void TestMissingWait () { 581 bool exception_occurred = false, 582 unexpected_exception = false; 583 LaunchChildrenDriver<task_group_type> driver; 584 try { 585 task_group_type tg; 586 driver.Launch( tg ); 587 volatile bool suppress_unreachable_code_warning = Throw; 588 if (suppress_unreachable_code_warning) { 589 throw int(); // Initiate stack unwinding 590 } 591 } 592 catch ( const tbb::missing_wait& e ) { 593 CHECK_MESSAGE( e.what(), "Error message is absent" ); 594 exception_occurred = true; 595 unexpected_exception = Throw; 596 } 597 catch ( int ) { 598 exception_occurred = true; 599 unexpected_exception = !Throw; 600 } 601 catch ( ... ) { 602 exception_occurred = unexpected_exception = true; 603 } 604 CHECK( exception_occurred ); 605 CHECK( !unexpected_exception ); 606 driver.Finish(); 607 } 608 #endif 609 610 template<typename task_group_type> 611 void RunCancellationAndExceptionHandlingTests() { 612 TestManualCancellationWithFunctor<task_group_type>(); 613 #if TBB_USE_EXCEPTIONS 614 TestExceptionHandling1<task_group_type>(); 615 TestExceptionHandling2<task_group_type>(); 616 TestExceptionHandling3<task_group_type>(); 617 TestMissingWait<task_group_type, true>(); 618 TestMissingWait<task_group_type, false>(); 619 #endif 620 } 621 622 void EmptyFunction () {} 623 624 struct TestFunctor { 625 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 626 void operator()() const { /* library requires this overload only */ } 627 }; 628 629 template<typename task_group_type> 630 void TestConstantFunctorRequirement() { 631 task_group_type g; 632 TestFunctor tf; 633 g.run( tf ); g.wait(); 634 g.run_and_wait( tf ); 635 } 636 637 //------------------------------------------------------------------------ 638 namespace TestMoveSemanticsNS { 639 struct TestFunctor { 640 void operator()() const {}; 641 }; 642 643 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 644 MoveOnlyFunctor() : utils::MoveOnly() {}; 645 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 646 }; 647 648 struct MovePreferableFunctor : utils::Movable, TestFunctor { 649 MovePreferableFunctor() : utils::Movable() {}; 650 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 651 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 652 }; 653 654 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 655 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 656 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 657 private: 658 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 659 }; 660 661 template<typename task_group_type> 662 void TestBareFunctors() { 663 task_group_type tg; 664 MovePreferableFunctor mpf; 665 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 666 tg.run_and_wait( NoMoveNoCopyFunctor() ); 667 668 tg.run( MoveOnlyFunctor() ); 669 tg.wait(); 670 671 tg.run( mpf ); 672 tg.wait(); 673 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 674 mpf.Reset(); 675 676 tg.run( std::move(mpf) ); 677 tg.wait(); 678 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 679 mpf.Reset(); 680 } 681 } 682 683 template<typename task_group_type> 684 void TestMoveSemantics() { 685 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 686 } 687 //------------------------------------------------------------------------ 688 689 // TODO: TBB_REVAMP_TODO - enable when ETS is available 690 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 691 namespace TestIsolationNS { 692 class DummyFunctor { 693 public: 694 DummyFunctor() {} 695 void operator()() const { 696 for ( volatile int j = 0; j < 10; ++j ) {} 697 } 698 }; 699 700 template<typename task_group_type> 701 class ParForBody { 702 task_group_type& m_tg; 703 std::atomic<bool>& m_preserved; 704 tbb::enumerable_thread_specific<int>& m_ets; 705 public: 706 ParForBody( 707 task_group_type& tg, 708 std::atomic<bool>& preserved, 709 tbb::enumerable_thread_specific<int>& ets 710 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 711 712 void operator()(int) const { 713 if (++m_ets.local() > 1) m_preserved = false; 714 715 for (int i = 0; i < 1000; ++i) 716 m_tg.run(DummyFunctor()); 717 m_tg.wait(); 718 m_tg.run_and_wait(DummyFunctor()); 719 720 --m_ets.local(); 721 } 722 }; 723 724 template<typename task_group_type> 725 void CheckIsolation(bool isolation_is_expected) { 726 task_group_type tg; 727 std::atomic<bool> isolation_is_preserved; 728 isolation_is_preserved = true; 729 tbb::enumerable_thread_specific<int> ets(0); 730 731 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 732 733 ASSERT( 734 isolation_is_expected == isolation_is_preserved, 735 "Actual and expected isolation-related behaviours are different" 736 ); 737 } 738 739 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 740 void TestIsolation() { 741 CheckIsolation<tbb::task_group>(false); 742 CheckIsolation<tbb::isolated_task_group>(true); 743 } 744 } 745 #endif 746 747 //! Test for thread safety for the task_group 748 //! \brief \ref error_guessing \ref resource_usage 749 TEST_CASE("Thread safety test for the task group") { 750 for (unsigned p=MinThread; p <= MaxThread; ++p) { 751 if (p < 2) { 752 continue; 753 } 754 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 755 g_MaxConcurrency = p; 756 TestThreadSafety<tbb::task_group>(); 757 } 758 } 759 760 //! Fibonacci test for task group 761 //! \brief \ref interface \ref requirement 762 TEST_CASE("Fibonacci test for the task group") { 763 for (unsigned p=MinThread; p <= MaxThread; ++p) { 764 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 765 g_MaxConcurrency = p; 766 RunFibonacciTests<tbb::task_group>(); 767 } 768 } 769 770 //! Cancellation and exception test for the task group 771 //! \brief \ref interface \ref requirement 772 TEST_CASE("Cancellation and exception test for the task group") { 773 for (unsigned p = MinThread; p <= MaxThread; ++p) { 774 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 775 g_MaxConcurrency = p; 776 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 777 } 778 } 779 780 //! Constant functor test for the task group 781 //! \brief \ref interface \ref negative 782 TEST_CASE("Constant functor test for the task group") { 783 for (unsigned p=MinThread; p <= MaxThread; ++p) { 784 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 785 g_MaxConcurrency = p; 786 TestConstantFunctorRequirement<tbb::task_group>(); 787 } 788 } 789 790 //! Move semantics test for the task group 791 //! \brief \ref interface \ref requirement 792 TEST_CASE("Move semantics test for the task group") { 793 for (unsigned p=MinThread; p <= MaxThread; ++p) { 794 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 795 g_MaxConcurrency = p; 796 TestMoveSemantics<tbb::task_group>(); 797 } 798 } 799 800 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 801 //! Test for thread safety for the isolated_task_group 802 //! \brief \ref error_guessing 803 TEST_CASE("Thread safety test for the isolated task group") { 804 for (unsigned p=MinThread; p <= MaxThread; ++p) { 805 if (p < 2) { 806 continue; 807 } 808 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 809 g_MaxConcurrency = p; 810 TestThreadSafety<tbb::isolated_task_group>(); 811 } 812 } 813 814 //! Cancellation and exception test for the isolated task group 815 //! \brief \ref interface \ref requirement 816 TEST_CASE("Fibonacci test for the isolated task group") { 817 for (unsigned p=MinThread; p <= MaxThread; ++p) { 818 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 819 g_MaxConcurrency = p; 820 RunFibonacciTests<tbb::isolated_task_group>(); 821 } 822 } 823 824 //! Cancellation and exception test for the isolated task group 825 //! \brief \ref interface \ref requirement 826 TEST_CASE("Cancellation and exception test for the isolated task group") { 827 for (unsigned p=MinThread; p <= MaxThread; ++p) { 828 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 829 g_MaxConcurrency = p; 830 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 831 } 832 } 833 834 //! Constant functor test for the isolated task group. 835 //! \brief \ref interface \ref negative 836 TEST_CASE("Constant functor test for the isolated task group") { 837 for (unsigned p=MinThread; p <= MaxThread; ++p) { 838 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 839 g_MaxConcurrency = p; 840 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 841 } 842 } 843 844 //! Move semantics test for the isolated task group. 845 //! \brief \ref interface \ref requirement 846 TEST_CASE("Move semantics test for the isolated task group") { 847 for (unsigned p=MinThread; p <= MaxThread; ++p) { 848 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 849 g_MaxConcurrency = p; 850 TestMoveSemantics<tbb::isolated_task_group>(); 851 } 852 } 853 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 854 855 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 856 for (int i = 0; i < num_tasks; ++i) { 857 tg2.run([&tg1, &tasks_executed] { 858 volatile char consume_stack[1000]{}; 859 ++tasks_executed; 860 tg1.wait(); 861 utils::suppress_unused_warning(consume_stack); 862 }); 863 } 864 } 865 866 // TODO: move to the conformance test 867 //! Test for stack overflow avoidance mechanism. 868 //! \brief \ref requirement 869 TEST_CASE("Test for stack overflow avoidance mechanism") { 870 if (tbb::this_task_arena::max_concurrency() < 2) { 871 return; 872 } 873 874 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 875 tbb::task_group tg1; 876 tbb::task_group tg2; 877 std::atomic<int> tasks_executed{}; 878 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 879 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 880 while (tasks_executed < 100) { 881 // Some stealing is expected to happen. 882 utils::yield(); 883 } 884 CHECK(tasks_executed < 10000); 885 }); 886 tg2.wait(); 887 CHECK(tasks_executed == 10000); 888 } 889 890 //! Test for stack overflow avoidance mechanism. 891 //! \brief \ref error_guessing 892 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 893 if (tbb::this_task_arena::max_concurrency() < 2) { 894 return; 895 } 896 897 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 898 tbb::task_group tg1; 899 tbb::task_group tg2; 900 std::atomic<int> tasks_executed{}; 901 902 // Determine nested task execution limit. 903 int second_thread_executed{}; 904 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 905 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 906 do { 907 second_thread_executed = tasks_executed; 908 utils::Sleep(10); 909 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 910 CHECK(tasks_executed < 10000); 911 }); 912 tg2.wait(); 913 CHECK(tasks_executed == 10000); 914 915 tasks_executed = 0; 916 tbb::task_arena a(2, 2); 917 tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] { 918 run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed); 919 while (tasks_executed < second_thread_executed-1) { 920 // Wait until the second thread near the limit. 921 utils::yield(); 922 } 923 tg2.run([&a, &tg1, &tasks_executed] { 924 a.execute([&tg1, &tasks_executed] { 925 volatile char consume_stack[1000]{}; 926 ++tasks_executed; 927 tg1.wait(); 928 utils::suppress_unused_warning(consume_stack); 929 }); 930 }); 931 while (tasks_executed < second_thread_executed) { 932 // Wait until the second joins the arena. 933 utils::yield(); 934 } 935 a.execute([&tg1, &tg2, &tasks_executed] { 936 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 937 }); 938 int currently_executed{}; 939 do { 940 currently_executed = tasks_executed; 941 utils::Sleep(10); 942 } while (currently_executed != tasks_executed); 943 CHECK(tasks_executed < 10000 + second_thread_executed); 944 }); 945 a.execute([&tg2] { 946 tg2.wait(); 947 }); 948 CHECK(tasks_executed == 10000 + second_thread_executed); 949 } 950 951 //! Test checks that we can submit work to task_group asynchronously with waiting. 952 //! \brief \ref regression 953 TEST_CASE("Async task group") { 954 int num_threads = tbb::this_task_arena::max_concurrency(); 955 tbb::task_arena a(2*num_threads, num_threads); 956 utils::SpinBarrier barrier(num_threads + 2); 957 tbb::task_group tg[2]; 958 std::atomic<bool> finished[2]{}; 959 finished[0] = false; finished[1] = false; 960 for (int i = 0; i < 2; ++i) { 961 a.enqueue([i, &tg, &finished, &barrier] { 962 barrier.wait(); 963 for (int j = 0; j < 10000; ++j) { 964 tg[i].run([] {}); 965 utils::yield(); 966 } 967 finished[i] = true; 968 }); 969 } 970 utils::NativeParallelFor(num_threads, [&](int idx) { 971 barrier.wait(); 972 a.execute([idx, &tg, &finished] { 973 std::size_t counter{}; 974 while (!finished[idx%2]) { 975 tg[idx%2].wait(); 976 if (counter++ % 16 == 0) utils::yield(); 977 } 978 tg[idx%2].wait(); 979 }); 980 }); 981 } 982