1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __TBB_CPF_BUILD 18 #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 19 #endif 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "tbb/detail/_config.h" 24 #include "tbb/global_control.h" 25 26 #include "tbb/task_group.h" 27 28 #include "common/concurrency_tracker.h" 29 30 #include <atomic> 31 32 //! \file test_task_group.cpp 33 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 34 35 unsigned g_MaxConcurrency = 4; 36 using atomic_t = std::atomic<std::uintptr_t>; 37 unsigned MinThread = 1; 38 unsigned MaxThread = 4; 39 40 //------------------------------------------------------------------------ 41 // Tests for the thread safety of the task_group manipulations 42 //------------------------------------------------------------------------ 43 44 #include "common/spin_barrier.h" 45 46 enum SharingMode { 47 VagabondGroup = 1, 48 ParallelWait = 2 49 }; 50 51 template<typename task_group_type> 52 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 53 static const std::uintptr_t c_numTasks0 = 4096, 54 c_numTasks1 = 1024; 55 56 const std::uintptr_t m_numThreads; 57 const std::uintptr_t m_sharingMode; 58 59 task_group_type *m_taskGroup; 60 atomic_t m_tasksSpawned, 61 m_threadsReady; 62 utils::SpinBarrier m_barrier; 63 64 static atomic_t s_tasksExecuted; 65 66 struct TaskFunctor { 67 SharedGroupBodyImpl *m_pOwner; 68 void operator () () const { 69 if ( m_pOwner->m_sharingMode & ParallelWait ) { 70 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 71 std::this_thread::yield(); 72 } 73 ++s_tasksExecuted; 74 } 75 }; 76 77 TaskFunctor m_taskFunctor; 78 79 void Spawn ( std::uintptr_t numTasks ) { 80 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 81 ++m_tasksSpawned; 82 utils::ConcurrencyTracker ct; 83 m_taskGroup->run( m_taskFunctor ); 84 } 85 ++m_threadsReady; 86 } 87 88 void DeleteTaskGroup () { 89 delete m_taskGroup; 90 m_taskGroup = NULL; 91 } 92 93 void Wait () { 94 while ( m_threadsReady != m_numThreads ) 95 std::this_thread::yield(); 96 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 97 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 98 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 99 if ( m_sharingMode & ParallelWait ) { 100 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 101 { 102 utils::ConcurrencyTracker ct; 103 m_taskGroup->wait(); 104 } 105 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) 106 WARN( "Warning: No parallel waiting detected in TestParallelWait" ); 107 m_barrier.wait(); 108 } 109 else 110 m_taskGroup->wait(); 111 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 112 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 113 } 114 115 public: 116 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 117 : m_numThreads(numThreads) 118 , m_sharingMode(sharingMode) 119 , m_taskGroup(NULL) 120 , m_barrier(numThreads) 121 { 122 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 123 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 124 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 125 } 126 utils::ConcurrencyTracker::Reset(); 127 s_tasksExecuted = 0; 128 m_tasksSpawned = 0; 129 m_threadsReady = 0; 130 m_taskFunctor.m_pOwner = this; 131 } 132 133 void Run ( std::uintptr_t idx ) { 134 AssertLive(); 135 if ( idx == 0 ) { 136 if (m_taskGroup || m_tasksSpawned) { 137 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 138 } 139 #if _MSC_VER && !defined(__INTEL_COMPILER) 140 #pragma warning(push) 141 #pragma warning(disable:4316) // object allocated on the heap may not be aligned 142 #endif 143 #if __TBB_GCC_VERSION >= 70000 && !defined(__clang__) && !defined(__INTEL_COMPILER) 144 #pragma GCC diagnostic push 145 #pragma GCC diagnostic ignored "-Waligned-new=none" 146 #endif 147 m_taskGroup = new task_group_type; 148 #if __TBB_GCC_VERSION >= 70000 && !defined(__clang__) && !defined(__INTEL_COMPILER) 149 #pragma GCC diagnostic pop 150 #endif 151 #if _MSC_VER && !defined(__INTEL_COMPILER) 152 #pragma warning(pop) 153 #endif 154 Spawn( c_numTasks0 ); 155 Wait(); 156 if ( m_sharingMode & VagabondGroup ) 157 m_barrier.wait(); 158 else 159 DeleteTaskGroup(); 160 } 161 else { 162 while ( m_tasksSpawned == 0 ) 163 std::this_thread::yield(); 164 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 165 Spawn (c_numTasks1); 166 if ( m_sharingMode & ParallelWait ) 167 Wait(); 168 if ( m_sharingMode & VagabondGroup ) { 169 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 170 m_barrier.wait(); 171 DeleteTaskGroup(); 172 } 173 } 174 AssertLive(); 175 } 176 }; 177 178 template<typename task_group_type> 179 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 180 181 template<typename task_group_type> 182 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 183 bool m_bOwner; 184 SharedGroupBodyImpl<task_group_type> *m_pImpl; 185 public: 186 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 187 : utils::NoAssign() 188 , utils::NoAfterlife() 189 , m_bOwner(true) 190 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 191 {} 192 SharedGroupBody ( const SharedGroupBody& src ) 193 : utils::NoAssign() 194 , utils::NoAfterlife() 195 , m_bOwner(false) 196 , m_pImpl(src.m_pImpl) 197 {} 198 ~SharedGroupBody () { 199 if ( m_bOwner ) 200 delete m_pImpl; 201 } 202 void operator() ( std::uintptr_t idx ) const { 203 // Wrap the functior into additional task group to enforce bounding. 204 task_group_type tg; 205 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 206 } 207 }; 208 209 template<typename task_group_type> 210 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 211 utils::SpinBarrier& m_barrier; 212 std::atomic<bool>& m_completed; 213 task_group_type& m_tg; 214 public: 215 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 216 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 217 218 void operator()() const { 219 m_barrier.wait(); 220 for (volatile int i = 0; i < 100000; ++i) {} 221 m_completed = true; 222 } 223 224 void operator()(int id) const { 225 if (id == 0) { 226 m_tg.run_and_wait(*this); 227 } else { 228 m_barrier.wait(); 229 m_tg.wait(); 230 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 231 } 232 } 233 }; 234 235 template<typename task_group_type> 236 void TestParallelSpawn () { 237 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 238 } 239 240 template<typename task_group_type> 241 void TestParallelWait () { 242 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 243 244 utils::SpinBarrier barrier(g_MaxConcurrency); 245 std::atomic<bool> completed; 246 completed = false; 247 task_group_type tg; 248 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 249 NativeParallelFor( g_MaxConcurrency, b ); 250 } 251 252 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 253 template<typename task_group_type> 254 void TestVagabondGroup () { 255 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 256 } 257 258 #include "common/memory_usage.h" 259 260 template<typename task_group_type> 261 void TestThreadSafety() { 262 auto tests = [] { 263 for (int trail = 0; trail < 10; ++trail) { 264 TestParallelSpawn<task_group_type>(); 265 TestParallelWait<task_group_type>(); 266 TestVagabondGroup<task_group_type>(); 267 } 268 }; 269 270 // Test and warm up allocator. 271 tests(); 272 273 // Ensure that cosumption is stabilized. 274 std::size_t initial = utils::GetMemoryUsage(); 275 for (int trail = 0; trail < 20; ++trail) { 276 tests(); 277 std::size_t current = utils::GetMemoryUsage(); 278 if (current <= initial) { 279 return; 280 } 281 initial = current; 282 } 283 CHECK_MESSAGE(false, "Memory leak is detected"); 284 } 285 //------------------------------------------------------------------------ 286 // Common requisites of the Fibonacci tests 287 //------------------------------------------------------------------------ 288 289 const std::uintptr_t N = 20; 290 const std::uintptr_t F = 6765; 291 292 atomic_t g_Sum; 293 294 #define FIB_TEST_PROLOGUE() \ 295 const unsigned numRepeats = g_MaxConcurrency * (TBB_USE_DEBUG ? 4 : 16); \ 296 utils::ConcurrencyTracker::Reset() 297 298 #define FIB_TEST_EPILOGUE(sum) \ 299 CHECK( sum == numRepeats * F ); 300 301 302 // Fibonacci tasks specified as functors 303 template<class task_group_type> 304 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 305 protected: 306 std::uintptr_t* m_pRes; 307 mutable std::uintptr_t m_Num; 308 virtual void impl() const = 0; 309 public: 310 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 311 void operator()() const { 312 utils::ConcurrencyTracker ct; 313 AssertLive(); 314 if( m_Num < 2 ) { 315 *m_pRes = m_Num; 316 } else { 317 impl(); 318 } 319 } 320 virtual ~FibTaskBase() {} 321 }; 322 323 template<class task_group_type> 324 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 325 public: 326 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 327 virtual void impl() const override { 328 std::uintptr_t x = ~0u; 329 task_group_type tg; 330 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 331 this->m_Num -= 2; tg.run_and_wait( *this ); 332 *(this->m_pRes) += x; 333 } 334 }; 335 336 template<class task_group_type> 337 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 338 public: 339 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 340 virtual void impl() const override { 341 std::uintptr_t x = ~0u, 342 y = ~0u; 343 task_group_type tg; 344 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 345 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 346 tg.wait(); 347 *(this->m_pRes) = x + y; 348 } 349 }; 350 351 // Helper functions 352 template<class fib_task> 353 std::uintptr_t RunFibTask(std::uintptr_t n) { 354 std::uintptr_t res = ~0u; 355 fib_task(&res, n)(); 356 return res; 357 } 358 359 template<typename fib_task> 360 void RunFibTest() { 361 FIB_TEST_PROLOGUE(); 362 std::uintptr_t sum = 0; 363 for( unsigned i = 0; i < numRepeats; ++i ) 364 sum += RunFibTask<fib_task>(N); 365 FIB_TEST_EPILOGUE(sum); 366 } 367 368 template<typename fib_task> 369 void FibFunctionNoArgs() { 370 g_Sum += RunFibTask<fib_task>(N); 371 } 372 373 template<typename task_group_type> 374 void TestFibWithLambdas() { 375 FIB_TEST_PROLOGUE(); 376 atomic_t sum; 377 sum = 0; 378 task_group_type tg; 379 for( unsigned i = 0; i < numRepeats; ++i ) 380 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 381 tg.wait(); 382 FIB_TEST_EPILOGUE(sum); 383 } 384 385 template<typename task_group_type> 386 void TestFibWithFunctor() { 387 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 388 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 389 } 390 391 template<typename task_group_type> 392 void TestFibWithFunctionPtr() { 393 FIB_TEST_PROLOGUE(); 394 g_Sum = 0; 395 task_group_type tg; 396 for( unsigned i = 0; i < numRepeats; ++i ) 397 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 398 tg.wait(); 399 FIB_TEST_EPILOGUE(g_Sum); 400 } 401 402 template<typename task_group_type> 403 void RunFibonacciTests() { 404 TestFibWithLambdas<task_group_type>(); 405 TestFibWithFunctor<task_group_type>(); 406 TestFibWithFunctionPtr<task_group_type>(); 407 } 408 409 class test_exception : public std::exception 410 { 411 const char* m_strDescription; 412 public: 413 test_exception ( const char* descr ) : m_strDescription(descr) {} 414 415 const char* what() const throw() override { return m_strDescription; } 416 }; 417 418 #if TBB_USE_CAPTURED_EXCEPTION 419 #include "tbb/tbb_exception.h" 420 typedef tbb::captured_exception TestException; 421 #else 422 typedef test_exception TestException; 423 #endif 424 425 #include <string.h> 426 427 #define NUM_CHORES 512 428 #define NUM_GROUPS 64 429 #define SKIP_CHORES (NUM_CHORES/4) 430 #define SKIP_GROUPS (NUM_GROUPS/4) 431 #define EXCEPTION_DESCR1 "Test exception 1" 432 #define EXCEPTION_DESCR2 "Test exception 2" 433 434 atomic_t g_ExceptionCount; 435 atomic_t g_TaskCount; 436 unsigned g_ExecutedAtCancellation; 437 bool g_Rethrow; 438 bool g_Throw; 439 440 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 441 atomic_t &m_TaskCount; 442 public: 443 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 444 void operator() () const { 445 utils::ConcurrencyTracker ct; 446 AssertLive(); 447 if ( g_Throw ) { 448 if ( ++m_TaskCount == SKIP_CHORES ) 449 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 450 std::this_thread::yield(); 451 } 452 else { 453 ++g_TaskCount; 454 while( !tbb::is_current_task_group_canceling() ) 455 std::this_thread::yield(); 456 } 457 } 458 }; 459 460 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 461 g_Throw = bThrow; 462 g_Rethrow = bRethrow; 463 g_ExceptionCount = 0; 464 g_TaskCount = 0; 465 utils::ConcurrencyTracker::Reset(); 466 } 467 468 template<typename task_group_type> 469 void LaunchChildrenWithFunctor () { 470 atomic_t count; 471 count = 0; 472 task_group_type g; 473 for( unsigned i = 0; i < NUM_CHORES; ++i ) 474 g.run( ThrowingTask(count) ); 475 #if TBB_USE_EXCEPTIONS 476 tbb::task_group_status status = tbb::not_complete; 477 bool exceptionCaught = false; 478 try { 479 status = g.wait(); 480 } catch ( TestException& e ) { 481 CHECK_MESSAGE( e.what(), "Empty what() string" ); 482 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 483 exceptionCaught = true; 484 ++g_ExceptionCount; 485 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 486 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 487 CHECK_MESSAGE(false, "No exception in the child task group"); 488 } 489 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 490 throw test_exception(EXCEPTION_DESCR2); 491 } 492 #else 493 g.wait(); 494 #endif 495 } 496 497 // Tests for cancellation and exception handling behavior 498 template<typename task_group_type> 499 void TestManualCancellationWithFunctor () { 500 ResetGlobals( false, false ); 501 task_group_type tg; 502 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 503 // TBB version does not require taking function address 504 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 505 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 506 CHECK_MESSAGE ( !tg.is_canceling(), "Unexpected cancellation" ); 507 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 508 std::this_thread::yield(); 509 tg.cancel(); 510 g_ExecutedAtCancellation = int(g_TaskCount); 511 CHECK_MESSAGE ( tg.is_canceling(), "No cancellation reported" ); 512 tg.wait(); 513 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 514 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 515 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 516 } 517 518 #if TBB_USE_EXCEPTIONS 519 template<typename task_group_type> 520 void TestExceptionHandling1 () { 521 ResetGlobals( true, false ); 522 task_group_type tg; 523 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 524 // TBB version does not require taking function address 525 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 526 try { 527 tg.wait(); 528 } catch ( ... ) { 529 CHECK_MESSAGE( false, "Unexpected exception" ); 530 } 531 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 532 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 533 } 534 535 template<typename task_group_type> 536 void TestExceptionHandling2 () { 537 ResetGlobals( true, true ); 538 task_group_type tg; 539 bool exceptionCaught = false; 540 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 541 // TBB version does not require taking function address 542 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 543 } 544 try { 545 tg.wait(); 546 } catch ( TestException& e ) { 547 CHECK_MESSAGE( e.what(), "Empty what() string" ); 548 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 549 CHECK_MESSAGE ( !tg.is_canceling(), "wait() has not reset cancellation state" ); 550 exceptionCaught = true; 551 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 552 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 553 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 554 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 555 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 556 } 557 558 template <typename task_group_type> 559 void TestExceptionHandling3() { 560 task_group_type tg; 561 try { 562 tg.run_and_wait([]() { 563 volatile bool suppress_unreachable_code_warning = true; 564 if (suppress_unreachable_code_warning) { 565 throw 1; 566 } 567 }); 568 } catch (int error) { 569 CHECK(error == 1); 570 } catch ( ... ) { 571 CHECK_MESSAGE( false, "Unexpected exception" ); 572 } 573 } 574 575 template<typename task_group_type> 576 class LaunchChildrenDriver { 577 public: 578 void Launch(task_group_type& tg) { 579 ResetGlobals(false, false); 580 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 581 tg.run(LaunchChildrenWithFunctor<task_group_type>); 582 } 583 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 584 CHECK_MESSAGE(!tg.is_canceling(), "Unexpected cancellation"); 585 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 586 std::this_thread::yield(); 587 } 588 589 void Finish() { 590 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 591 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 592 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 593 } 594 }; // LaunchChildrenWithTaskHandleDriver 595 596 template<typename task_group_type, bool Throw> 597 void TestMissingWait () { 598 bool exception_occurred = false, 599 unexpected_exception = false; 600 LaunchChildrenDriver<task_group_type> driver; 601 try { 602 task_group_type tg; 603 driver.Launch( tg ); 604 volatile bool suppress_unreachable_code_warning = Throw; 605 if (suppress_unreachable_code_warning) { 606 throw int(); // Initiate stack unwinding 607 } 608 } 609 catch ( const tbb::missing_wait& e ) { 610 CHECK_MESSAGE( e.what(), "Error message is absent" ); 611 exception_occurred = true; 612 unexpected_exception = Throw; 613 } 614 catch ( int ) { 615 exception_occurred = true; 616 unexpected_exception = !Throw; 617 } 618 catch ( ... ) { 619 exception_occurred = unexpected_exception = true; 620 } 621 CHECK( exception_occurred ); 622 CHECK( !unexpected_exception ); 623 driver.Finish(); 624 } 625 #endif 626 627 template<typename task_group_type> 628 void RunCancellationAndExceptionHandlingTests() { 629 TestManualCancellationWithFunctor<task_group_type>(); 630 #if TBB_USE_EXCEPTIONS 631 TestExceptionHandling1<task_group_type>(); 632 TestExceptionHandling2<task_group_type>(); 633 TestExceptionHandling3<task_group_type>(); 634 TestMissingWait<task_group_type, true>(); 635 TestMissingWait<task_group_type, false>(); 636 #endif 637 } 638 639 void EmptyFunction () {} 640 641 struct TestFunctor { 642 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 643 void operator()() const { /* library requires this overload only */ } 644 }; 645 646 template<typename task_group_type> 647 void TestConstantFunctorRequirement() { 648 task_group_type g; 649 TestFunctor tf; 650 g.run( tf ); g.wait(); 651 g.run_and_wait( tf ); 652 } 653 654 //------------------------------------------------------------------------ 655 namespace TestMoveSemanticsNS { 656 struct TestFunctor { 657 void operator()() const {}; 658 }; 659 660 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 661 MoveOnlyFunctor() : utils::MoveOnly() {}; 662 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 663 }; 664 665 struct MovePreferableFunctor : utils::Movable, TestFunctor { 666 MovePreferableFunctor() : utils::Movable() {}; 667 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 668 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 669 }; 670 671 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 672 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 673 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 674 private: 675 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 676 }; 677 678 template<typename task_group_type> 679 void TestBareFunctors() { 680 task_group_type tg; 681 MovePreferableFunctor mpf; 682 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 683 tg.run_and_wait( NoMoveNoCopyFunctor() ); 684 685 tg.run( MoveOnlyFunctor() ); 686 tg.wait(); 687 688 tg.run( mpf ); 689 tg.wait(); 690 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 691 mpf.Reset(); 692 693 tg.run( std::move(mpf) ); 694 tg.wait(); 695 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 696 mpf.Reset(); 697 } 698 } 699 700 template<typename task_group_type> 701 void TestMoveSemantics() { 702 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 703 } 704 //------------------------------------------------------------------------ 705 706 // TODO: TBB_REVAMP_TODO - enable when ETS is available 707 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 708 namespace TestIsolationNS { 709 class DummyFunctor { 710 public: 711 DummyFunctor() {} 712 void operator()() const { 713 for ( volatile int j = 0; j < 10; ++j ) {} 714 } 715 }; 716 717 template<typename task_group_type> 718 class ParForBody { 719 task_group_type& m_tg; 720 std::atomic<bool>& m_preserved; 721 tbb::enumerable_thread_specific<int>& m_ets; 722 public: 723 ParForBody( 724 task_group_type& tg, 725 std::atomic<bool>& preserved, 726 tbb::enumerable_thread_specific<int>& ets 727 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 728 729 void operator()(int) const { 730 if (++m_ets.local() > 1) m_preserved = false; 731 732 for (int i = 0; i < 1000; ++i) 733 m_tg.run(DummyFunctor()); 734 m_tg.wait(); 735 m_tg.run_and_wait(DummyFunctor()); 736 737 --m_ets.local(); 738 } 739 }; 740 741 template<typename task_group_type> 742 void CheckIsolation(bool isolation_is_expected) { 743 task_group_type tg; 744 std::atomic<bool> isolation_is_preserved; 745 isolation_is_preserved = true; 746 tbb::enumerable_thread_specific<int> ets(0); 747 748 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 749 750 ASSERT( 751 isolation_is_expected == isolation_is_preserved, 752 "Actual and expected isolation-related behaviours are different" 753 ); 754 } 755 756 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 757 void TestIsolation() { 758 CheckIsolation<tbb::task_group>(false); 759 CheckIsolation<tbb::isolated_task_group>(true); 760 } 761 } 762 #endif 763 764 //! Test for thread safety for the task_group 765 //! \brief \ref error_guessing \ref resource_usage 766 TEST_CASE("Thread safety test for the task group") { 767 for (unsigned p=MinThread; p <= MaxThread; ++p) { 768 if (p < 2) { 769 continue; 770 } 771 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 772 g_MaxConcurrency = p; 773 TestThreadSafety<tbb::task_group>(); 774 } 775 } 776 777 //! Fibonacci test for task group 778 //! \brief \ref interface \ref requirement 779 TEST_CASE("Fibonacci test for the task group") { 780 for (unsigned p=MinThread; p <= MaxThread; ++p) { 781 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 782 g_MaxConcurrency = p; 783 RunFibonacciTests<tbb::task_group>(); 784 } 785 } 786 787 //! Cancellation and exception test for the task group 788 //! \brief \ref interface \ref requirement 789 TEST_CASE("Cancellation and exception test for the task group") { 790 for (unsigned p = MinThread; p <= MaxThread; ++p) { 791 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 792 g_MaxConcurrency = p; 793 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 794 } 795 } 796 797 //! Constant functor test for the task group 798 //! \brief \ref interface \ref negative 799 TEST_CASE("Constant functor test for the task group") { 800 for (unsigned p=MinThread; p <= MaxThread; ++p) { 801 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 802 g_MaxConcurrency = p; 803 TestConstantFunctorRequirement<tbb::task_group>(); 804 } 805 } 806 807 //! Move semantics test for the task group 808 //! \brief \ref interface \ref requirement 809 TEST_CASE("Move semantics test for the task group") { 810 for (unsigned p=MinThread; p <= MaxThread; ++p) { 811 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 812 g_MaxConcurrency = p; 813 TestMoveSemantics<tbb::task_group>(); 814 } 815 } 816 817 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 818 //! Test for thread safety for the isolated_task_group 819 //! \brief \ref error_guessing 820 TEST_CASE("Thread safety test for the isolated task group") { 821 for (unsigned p=MinThread; p <= MaxThread; ++p) { 822 if (p < 2) { 823 continue; 824 } 825 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 826 g_MaxConcurrency = p; 827 TestThreadSafety<tbb::isolated_task_group>(); 828 } 829 } 830 831 //! Cancellation and exception test for the isolated task group 832 //! \brief \ref interface \ref requirement 833 TEST_CASE("Fibonacci test for the isolated task group") { 834 for (unsigned p=MinThread; p <= MaxThread; ++p) { 835 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 836 g_MaxConcurrency = p; 837 RunFibonacciTests<tbb::isolated_task_group>(); 838 } 839 } 840 841 //! Cancellation and exception test for the isolated task group 842 //! \brief \ref interface \ref requirement 843 TEST_CASE("Cancellation and exception test for the isolated task group") { 844 for (unsigned p=MinThread; p <= MaxThread; ++p) { 845 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 846 g_MaxConcurrency = p; 847 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 848 } 849 } 850 851 //! Constant functor test for the isolated task group. 852 //! \brief \ref interface \ref negative 853 TEST_CASE("Constant functor test for the isolated task group") { 854 for (unsigned p=MinThread; p <= MaxThread; ++p) { 855 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 856 g_MaxConcurrency = p; 857 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 858 } 859 } 860 861 //! Move semantics test for the isolated task group. 862 //! \brief \ref interface \ref requirement 863 TEST_CASE("Move semantics test for the isolated task group") { 864 for (unsigned p=MinThread; p <= MaxThread; ++p) { 865 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 866 g_MaxConcurrency = p; 867 TestMoveSemantics<tbb::isolated_task_group>(); 868 } 869 } 870 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 871 872 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 873 for (int i = 0; i < num_tasks; ++i) { 874 tg2.run([&tg1, &tasks_executed] { 875 volatile char consume_stack[1000]{}; 876 ++tasks_executed; 877 tg1.wait(); 878 utils::suppress_unused_warning(consume_stack); 879 }); 880 } 881 } 882 883 // TODO: move to the conformance test 884 //! Test for stack overflow avoidance mechanism. 885 //! \brief \ref requirement 886 TEST_CASE("Test for stack overflow avoidance mechanism") { 887 if (tbb::this_task_arena::max_concurrency() < 2) { 888 return; 889 } 890 891 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 892 tbb::task_group tg1; 893 tbb::task_group tg2; 894 std::atomic<int> tasks_executed{}; 895 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 896 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 897 while (tasks_executed < 100) { 898 // Some stealing is expected to happen. 899 std::this_thread::yield(); 900 } 901 CHECK(tasks_executed < 10000); 902 }); 903 tg2.wait(); 904 CHECK(tasks_executed == 10000); 905 } 906 907 //! Test for stack overflow avoidance mechanism. 908 //! \brief \ref error_guessing 909 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 910 if (tbb::this_task_arena::max_concurrency() < 2) { 911 return; 912 } 913 914 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 915 tbb::task_group tg1; 916 tbb::task_group tg2; 917 std::atomic<int> tasks_executed{}; 918 919 // Determine nested task execution limit. 920 int second_thread_executed{}; 921 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 922 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 923 do { 924 second_thread_executed = tasks_executed; 925 utils::Sleep(10); 926 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 927 CHECK(tasks_executed < 10000); 928 }); 929 tg2.wait(); 930 CHECK(tasks_executed == 10000); 931 932 tasks_executed = 0; 933 tbb::task_arena a(2, 2); 934 tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] { 935 run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed); 936 while (tasks_executed < second_thread_executed-1) { 937 // Wait until the second thread near the limit. 938 std::this_thread::yield(); 939 } 940 tg2.run([&a, &tg1, &tasks_executed] { 941 a.execute([&tg1, &tasks_executed] { 942 volatile char consume_stack[1000]{}; 943 ++tasks_executed; 944 tg1.wait(); 945 utils::suppress_unused_warning(consume_stack); 946 }); 947 }); 948 while (tasks_executed < second_thread_executed) { 949 // Wait until the second joins the arena. 950 std::this_thread::yield(); 951 } 952 a.execute([&tg1, &tg2, &tasks_executed] { 953 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 954 }); 955 int currently_executed{}; 956 do { 957 currently_executed = tasks_executed; 958 utils::Sleep(10); 959 } while (currently_executed != tasks_executed); 960 CHECK(tasks_executed < 10000 + second_thread_executed); 961 }); 962 a.execute([&tg2] { 963 tg2.wait(); 964 }); 965 CHECK(tasks_executed == 10000 + second_thread_executed); 966 } 967 968 //! Test checks that we can submit work to task_group asynchronously with waiting. 969 //! \brief \ref regression 970 TEST_CASE("Async task group") { 971 int num_threads = tbb::this_task_arena::max_concurrency(); 972 tbb::task_arena a(2*num_threads, num_threads); 973 utils::SpinBarrier barrier(num_threads + 2); 974 tbb::task_group tg[2]; 975 std::atomic<bool> finished[2]{}; 976 finished[0] = false; finished[1] = false; 977 for (int i = 0; i < 2; ++i) { 978 a.enqueue([i, &tg, &finished, &barrier] { 979 barrier.wait(); 980 for (int j = 0; j < 10000; ++j) { 981 tg[i].run([] {}); 982 std::this_thread::yield(); 983 } 984 finished[i] = true; 985 }); 986 } 987 utils::NativeParallelFor(num_threads, [&](int idx) { 988 barrier.wait(); 989 a.execute([idx, &tg, &finished] { 990 while (!finished[idx%2]) { 991 tg[idx%2].wait(); 992 } 993 tg[idx%2].wait(); 994 }); 995 }); 996 } 997