1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __TBB_CPF_BUILD 18 #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1 19 #define TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1 20 #endif 21 22 #include "common/test.h" 23 #include "common/utils.h" 24 #include "oneapi/tbb/detail/_config.h" 25 #include "tbb/global_control.h" 26 27 #include "tbb/task_group.h" 28 29 #include "common/concurrency_tracker.h" 30 31 #include <atomic> 32 #include <stdexcept> 33 34 //! \file test_task_group.cpp 35 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification 36 37 unsigned g_MaxConcurrency = 4; 38 using atomic_t = std::atomic<std::uintptr_t>; 39 unsigned MinThread = 1; 40 unsigned MaxThread = 4; 41 42 //------------------------------------------------------------------------ 43 // Tests for the thread safety of the task_group manipulations 44 //------------------------------------------------------------------------ 45 46 #include "common/spin_barrier.h" 47 48 enum SharingMode { 49 VagabondGroup = 1, 50 ParallelWait = 2 51 }; 52 53 template<typename task_group_type> 54 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife { 55 static const std::uintptr_t c_numTasks0 = 4096, 56 c_numTasks1 = 1024; 57 58 const std::uintptr_t m_numThreads; 59 const std::uintptr_t m_sharingMode; 60 61 task_group_type *m_taskGroup; 62 atomic_t m_tasksSpawned, 63 m_threadsReady; 64 utils::SpinBarrier m_barrier; 65 66 static atomic_t s_tasksExecuted; 67 68 struct TaskFunctor { 69 SharedGroupBodyImpl *m_pOwner; 70 void operator () () const { 71 if ( m_pOwner->m_sharingMode & ParallelWait ) { 72 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads ) 73 utils::yield(); 74 } 75 ++s_tasksExecuted; 76 } 77 }; 78 79 TaskFunctor m_taskFunctor; 80 81 void Spawn ( std::uintptr_t numTasks ) { 82 for ( std::uintptr_t i = 0; i < numTasks; ++i ) { 83 ++m_tasksSpawned; 84 utils::ConcurrencyTracker ct; 85 m_taskGroup->run( m_taskFunctor ); 86 } 87 ++m_threadsReady; 88 } 89 90 void DeleteTaskGroup () { 91 delete m_taskGroup; 92 m_taskGroup = NULL; 93 } 94 95 void Wait () { 96 while ( m_threadsReady != m_numThreads ) 97 utils::yield(); 98 const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1); 99 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" ); 100 INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency); 101 if ( m_sharingMode & ParallelWait ) { 102 m_barrier.wait( &utils::ConcurrencyTracker::Reset ); 103 { 104 utils::ConcurrencyTracker ct; 105 m_taskGroup->wait(); 106 } 107 if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) 108 WARN( "Warning: No parallel waiting detected in TestParallelWait" ); 109 m_barrier.wait(); 110 } 111 else 112 m_taskGroup->wait(); 113 CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" ); 114 CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" ); 115 } 116 117 public: 118 SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 119 : m_numThreads(numThreads) 120 , m_sharingMode(sharingMode) 121 , m_taskGroup(NULL) 122 , m_barrier(numThreads) 123 { 124 CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" ); 125 if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) { 126 CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only"); 127 } 128 utils::ConcurrencyTracker::Reset(); 129 s_tasksExecuted = 0; 130 m_tasksSpawned = 0; 131 m_threadsReady = 0; 132 m_taskFunctor.m_pOwner = this; 133 } 134 135 void Run ( std::uintptr_t idx ) { 136 AssertLive(); 137 if ( idx == 0 ) { 138 if (m_taskGroup || m_tasksSpawned) { 139 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse"); 140 } 141 m_taskGroup = new task_group_type; 142 Spawn( c_numTasks0 ); 143 Wait(); 144 if ( m_sharingMode & VagabondGroup ) 145 m_barrier.wait(); 146 else 147 DeleteTaskGroup(); 148 } 149 else { 150 while ( m_tasksSpawned == 0 ) 151 utils::yield(); 152 CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized"); 153 Spawn (c_numTasks1); 154 if ( m_sharingMode & ParallelWait ) 155 Wait(); 156 if ( m_sharingMode & VagabondGroup ) { 157 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" ); 158 m_barrier.wait(); 159 DeleteTaskGroup(); 160 } 161 } 162 AssertLive(); 163 } 164 }; 165 166 template<typename task_group_type> 167 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted; 168 169 template<typename task_group_type> 170 class SharedGroupBody : utils::NoAssign, utils::NoAfterlife { 171 bool m_bOwner; 172 SharedGroupBodyImpl<task_group_type> *m_pImpl; 173 public: 174 SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 ) 175 : utils::NoAssign() 176 , utils::NoAfterlife() 177 , m_bOwner(true) 178 , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) ) 179 {} 180 SharedGroupBody ( const SharedGroupBody& src ) 181 : utils::NoAssign() 182 , utils::NoAfterlife() 183 , m_bOwner(false) 184 , m_pImpl(src.m_pImpl) 185 {} 186 ~SharedGroupBody () { 187 if ( m_bOwner ) 188 delete m_pImpl; 189 } 190 void operator() ( std::uintptr_t idx ) const { 191 // Wrap the functior into additional task group to enforce bounding. 192 task_group_type tg; 193 tg.run_and_wait([&] { m_pImpl->Run(idx); }); 194 } 195 }; 196 197 template<typename task_group_type> 198 class RunAndWaitSyncronizationTestBody : utils::NoAssign { 199 utils::SpinBarrier& m_barrier; 200 std::atomic<bool>& m_completed; 201 task_group_type& m_tg; 202 public: 203 RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg) 204 : m_barrier(barrier), m_completed(completed), m_tg(tg) {} 205 206 void operator()() const { 207 m_barrier.wait(); 208 utils::doDummyWork(100000); 209 m_completed = true; 210 } 211 212 void operator()(int id) const { 213 if (id == 0) { 214 m_tg.run_and_wait(*this); 215 } else { 216 m_barrier.wait(); 217 m_tg.wait(); 218 CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished"); 219 } 220 } 221 }; 222 223 template<typename task_group_type> 224 void TestParallelSpawn () { 225 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) ); 226 } 227 228 template<typename task_group_type> 229 void TestParallelWait () { 230 NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) ); 231 232 utils::SpinBarrier barrier(g_MaxConcurrency); 233 std::atomic<bool> completed; 234 completed = false; 235 task_group_type tg; 236 RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg); 237 NativeParallelFor( g_MaxConcurrency, b ); 238 } 239 240 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other) 241 template<typename task_group_type> 242 void TestVagabondGroup () { 243 NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) ); 244 } 245 246 #include "common/memory_usage.h" 247 248 template<typename task_group_type> 249 void TestThreadSafety() { 250 auto tests = [] { 251 for (int trail = 0; trail < 10; ++trail) { 252 TestParallelSpawn<task_group_type>(); 253 TestParallelWait<task_group_type>(); 254 TestVagabondGroup<task_group_type>(); 255 } 256 }; 257 258 // Test and warm up allocator. 259 tests(); 260 261 // Ensure that cosumption is stabilized. 262 std::size_t initial = utils::GetMemoryUsage(); 263 for (;;) { 264 tests(); 265 std::size_t current = utils::GetMemoryUsage(); 266 if (current <= initial) { 267 return; 268 } 269 initial = current; 270 } 271 } 272 //------------------------------------------------------------------------ 273 // Common requisites of the Fibonacci tests 274 //------------------------------------------------------------------------ 275 276 const std::uintptr_t N = 20; 277 const std::uintptr_t F = 6765; 278 279 atomic_t g_Sum; 280 281 #define FIB_TEST_PROLOGUE() \ 282 const unsigned numRepeats = g_MaxConcurrency * 4; \ 283 utils::ConcurrencyTracker::Reset() 284 285 #define FIB_TEST_EPILOGUE(sum) \ 286 CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \ 287 CHECK( sum == numRepeats * F ); 288 289 290 // Fibonacci tasks specified as functors 291 template<class task_group_type> 292 class FibTaskBase : utils::NoAssign, utils::NoAfterlife { 293 protected: 294 std::uintptr_t* m_pRes; 295 mutable std::uintptr_t m_Num; 296 virtual void impl() const = 0; 297 public: 298 FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {} 299 void operator()() const { 300 utils::ConcurrencyTracker ct; 301 AssertLive(); 302 if( m_Num < 2 ) { 303 *m_pRes = m_Num; 304 } else { 305 impl(); 306 } 307 } 308 virtual ~FibTaskBase() {} 309 }; 310 311 template<class task_group_type> 312 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 313 public: 314 FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 315 virtual void impl() const override { 316 std::uintptr_t x = ~0u; 317 task_group_type tg; 318 tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) ); 319 this->m_Num -= 2; tg.run_and_wait( *this ); 320 *(this->m_pRes) += x; 321 } 322 }; 323 324 template<class task_group_type> 325 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> { 326 public: 327 FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {} 328 virtual void impl() const override { 329 std::uintptr_t x = ~0u, 330 y = ~0u; 331 task_group_type tg; 332 tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) ); 333 tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) ); 334 tg.wait(); 335 *(this->m_pRes) = x + y; 336 } 337 }; 338 339 // Helper functions 340 template<class fib_task> 341 std::uintptr_t RunFibTask(std::uintptr_t n) { 342 std::uintptr_t res = ~0u; 343 fib_task(&res, n)(); 344 return res; 345 } 346 347 template<typename fib_task> 348 void RunFibTest() { 349 FIB_TEST_PROLOGUE(); 350 std::uintptr_t sum = 0; 351 for( unsigned i = 0; i < numRepeats; ++i ) 352 sum += RunFibTask<fib_task>(N); 353 FIB_TEST_EPILOGUE(sum); 354 } 355 356 template<typename fib_task> 357 void FibFunctionNoArgs() { 358 g_Sum += RunFibTask<fib_task>(N); 359 } 360 361 template<typename task_group_type> 362 void TestFibWithLambdas() { 363 FIB_TEST_PROLOGUE(); 364 atomic_t sum; 365 sum = 0; 366 task_group_type tg; 367 for( unsigned i = 0; i < numRepeats; ++i ) 368 tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} ); 369 tg.wait(); 370 FIB_TEST_EPILOGUE(sum); 371 } 372 373 template<typename task_group_type> 374 void TestFibWithFunctor() { 375 RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >(); 376 RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >(); 377 } 378 379 template<typename task_group_type> 380 void TestFibWithFunctionPtr() { 381 FIB_TEST_PROLOGUE(); 382 g_Sum = 0; 383 task_group_type tg; 384 for( unsigned i = 0; i < numRepeats; ++i ) 385 tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > ); 386 tg.wait(); 387 FIB_TEST_EPILOGUE(g_Sum); 388 } 389 390 template<typename task_group_type> 391 void RunFibonacciTests() { 392 TestFibWithLambdas<task_group_type>(); 393 TestFibWithFunctor<task_group_type>(); 394 TestFibWithFunctionPtr<task_group_type>(); 395 } 396 397 class test_exception : public std::exception 398 { 399 const char* m_strDescription; 400 public: 401 test_exception ( const char* descr ) : m_strDescription(descr) {} 402 403 const char* what() const throw() override { return m_strDescription; } 404 }; 405 406 using TestException = test_exception; 407 408 #include <string.h> 409 410 #define NUM_CHORES 512 411 #define NUM_GROUPS 64 412 #define SKIP_CHORES (NUM_CHORES/4) 413 #define SKIP_GROUPS (NUM_GROUPS/4) 414 #define EXCEPTION_DESCR1 "Test exception 1" 415 #define EXCEPTION_DESCR2 "Test exception 2" 416 417 atomic_t g_ExceptionCount; 418 atomic_t g_TaskCount; 419 unsigned g_ExecutedAtCancellation; 420 bool g_Rethrow; 421 bool g_Throw; 422 423 class ThrowingTask : utils::NoAssign, utils::NoAfterlife { 424 atomic_t &m_TaskCount; 425 public: 426 ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {} 427 void operator() () const { 428 utils::ConcurrencyTracker ct; 429 AssertLive(); 430 if ( g_Throw ) { 431 if ( ++m_TaskCount == SKIP_CHORES ) 432 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1)); 433 utils::yield(); 434 } 435 else { 436 ++g_TaskCount; 437 while( !tbb::is_current_task_group_canceling() ) 438 utils::yield(); 439 } 440 } 441 }; 442 443 inline void ResetGlobals ( bool bThrow, bool bRethrow ) { 444 g_Throw = bThrow; 445 g_Rethrow = bRethrow; 446 g_ExceptionCount = 0; 447 g_TaskCount = 0; 448 utils::ConcurrencyTracker::Reset(); 449 } 450 451 template<typename task_group_type> 452 void LaunchChildrenWithFunctor () { 453 atomic_t count; 454 count = 0; 455 task_group_type g; 456 for (unsigned i = 0; i < NUM_CHORES; ++i) { 457 if (i % 2 == 1) { 458 g.run(g.defer(ThrowingTask(count))); 459 } else 460 { 461 g.run(ThrowingTask(count)); 462 } 463 } 464 #if TBB_USE_EXCEPTIONS 465 tbb::task_group_status status = tbb::not_complete; 466 bool exceptionCaught = false; 467 try { 468 status = g.wait(); 469 } catch ( TestException& e ) { 470 CHECK_MESSAGE( e.what(), "Empty what() string" ); 471 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" ); 472 exceptionCaught = true; 473 ++g_ExceptionCount; 474 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 475 if (g_Throw && !exceptionCaught && status != tbb::canceled) { 476 CHECK_MESSAGE(false, "No exception in the child task group"); 477 } 478 if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) { 479 throw test_exception(EXCEPTION_DESCR2); 480 } 481 #else 482 g.wait(); 483 #endif 484 } 485 486 // Tests for cancellation and exception handling behavior 487 template<typename task_group_type> 488 void TestManualCancellationWithFunctor () { 489 ResetGlobals( false, false ); 490 task_group_type tg; 491 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 492 // TBB version does not require taking function address 493 if (i % 2 == 0) { 494 auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>); 495 tg.run(std::move(h)); 496 } else 497 { 498 tg.run(&LaunchChildrenWithFunctor<task_group_type>); 499 } 500 } 501 CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" ); 502 while ( g_MaxConcurrency > 1 && g_TaskCount == 0 ) 503 utils::yield(); 504 tg.cancel(); 505 g_ExecutedAtCancellation = int(g_TaskCount); 506 tbb::task_group_status status = tg.wait(); 507 CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." ); 508 CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" ); 509 CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" ); 510 CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" ); 511 } 512 513 #if TBB_USE_EXCEPTIONS 514 template<typename task_group_type> 515 void TestExceptionHandling1 () { 516 ResetGlobals( true, false ); 517 task_group_type tg; 518 for( unsigned i = 0; i < NUM_GROUPS; ++i ) 519 // TBB version does not require taking function address 520 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 521 try { 522 tg.wait(); 523 } catch ( ... ) { 524 CHECK_MESSAGE( false, "Unexpected exception" ); 525 } 526 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 527 CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" ); 528 } 529 530 template<typename task_group_type> 531 void TestExceptionHandling2 () { 532 ResetGlobals( true, true ); 533 task_group_type tg; 534 bool exceptionCaught = false; 535 for( unsigned i = 0; i < NUM_GROUPS; ++i ) { 536 // TBB version does not require taking function address 537 tg.run( &LaunchChildrenWithFunctor<task_group_type> ); 538 } 539 try { 540 tg.wait(); 541 } catch ( TestException& e ) { 542 CHECK_MESSAGE( e.what(), "Empty what() string" ); 543 CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" ); 544 exceptionCaught = true; 545 } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); } 546 CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" ); 547 CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" ); 548 CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" ); 549 CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" ); 550 } 551 552 template <typename task_group_type> 553 void TestExceptionHandling3() { 554 task_group_type tg; 555 try { 556 tg.run_and_wait([]() { 557 volatile bool suppress_unreachable_code_warning = true; 558 if (suppress_unreachable_code_warning) { 559 throw 1; 560 } 561 }); 562 } catch (int error) { 563 CHECK(error == 1); 564 } catch ( ... ) { 565 CHECK_MESSAGE( false, "Unexpected exception" ); 566 } 567 } 568 569 template<typename task_group_type> 570 class LaunchChildrenDriver { 571 public: 572 void Launch(task_group_type& tg) { 573 ResetGlobals(false, false); 574 for (unsigned i = 0; i < NUM_GROUPS; ++i) { 575 tg.run(LaunchChildrenWithFunctor<task_group_type>); 576 } 577 CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation"); 578 while (g_MaxConcurrency > 1 && g_TaskCount == 0) 579 utils::yield(); 580 } 581 582 void Finish() { 583 CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken"); 584 CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?"); 585 CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation"); 586 } 587 }; // LaunchChildrenWithTaskHandleDriver 588 589 template<typename task_group_type, bool Throw> 590 void TestMissingWait () { 591 bool exception_occurred = false, 592 unexpected_exception = false; 593 LaunchChildrenDriver<task_group_type> driver; 594 try { 595 task_group_type tg; 596 driver.Launch( tg ); 597 volatile bool suppress_unreachable_code_warning = Throw; 598 if (suppress_unreachable_code_warning) { 599 throw int(); // Initiate stack unwinding 600 } 601 } 602 catch ( const tbb::missing_wait& e ) { 603 CHECK_MESSAGE( e.what(), "Error message is absent" ); 604 exception_occurred = true; 605 unexpected_exception = Throw; 606 } 607 catch ( int ) { 608 exception_occurred = true; 609 unexpected_exception = !Throw; 610 } 611 catch ( ... ) { 612 exception_occurred = unexpected_exception = true; 613 } 614 CHECK( exception_occurred ); 615 CHECK( !unexpected_exception ); 616 driver.Finish(); 617 } 618 #endif 619 620 template<typename task_group_type> 621 void RunCancellationAndExceptionHandlingTests() { 622 TestManualCancellationWithFunctor<task_group_type>(); 623 #if TBB_USE_EXCEPTIONS 624 TestExceptionHandling1<task_group_type>(); 625 TestExceptionHandling2<task_group_type>(); 626 TestExceptionHandling3<task_group_type>(); 627 TestMissingWait<task_group_type, true>(); 628 TestMissingWait<task_group_type, false>(); 629 #endif 630 } 631 632 void EmptyFunction () {} 633 634 struct TestFunctor { 635 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 636 void operator()() const { /* library requires this overload only */ } 637 }; 638 639 template<typename task_group_type> 640 void TestConstantFunctorRequirement() { 641 task_group_type g; 642 TestFunctor tf; 643 g.run( tf ); g.wait(); 644 g.run_and_wait( tf ); 645 } 646 647 //------------------------------------------------------------------------ 648 namespace TestMoveSemanticsNS { 649 struct TestFunctor { 650 void operator()() const {}; 651 }; 652 653 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 654 MoveOnlyFunctor() : utils::MoveOnly() {}; 655 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 656 }; 657 658 struct MovePreferableFunctor : utils::Movable, TestFunctor { 659 MovePreferableFunctor() : utils::Movable() {}; 660 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {}; 661 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 662 }; 663 664 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 665 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 666 // mv ctor is not allowed as cp ctor from parent utils::NoCopy 667 private: 668 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 669 }; 670 671 template<typename task_group_type> 672 void TestBareFunctors() { 673 task_group_type tg; 674 MovePreferableFunctor mpf; 675 // run_and_wait() doesn't have any copies or moves of arguments inside the impl 676 tg.run_and_wait( NoMoveNoCopyFunctor() ); 677 678 tg.run( MoveOnlyFunctor() ); 679 tg.wait(); 680 681 tg.run( mpf ); 682 tg.wait(); 683 CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 684 mpf.Reset(); 685 686 tg.run( std::move(mpf) ); 687 tg.wait(); 688 CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 689 mpf.Reset(); 690 } 691 } 692 693 template<typename task_group_type> 694 void TestMoveSemantics() { 695 TestMoveSemanticsNS::TestBareFunctors<task_group_type>(); 696 } 697 //------------------------------------------------------------------------ 698 699 // TODO: TBB_REVAMP_TODO - enable when ETS is available 700 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP 701 namespace TestIsolationNS { 702 class DummyFunctor { 703 public: 704 DummyFunctor() {} 705 void operator()() const { 706 for ( volatile int j = 0; j < 10; ++j ) {} 707 } 708 }; 709 710 template<typename task_group_type> 711 class ParForBody { 712 task_group_type& m_tg; 713 std::atomic<bool>& m_preserved; 714 tbb::enumerable_thread_specific<int>& m_ets; 715 public: 716 ParForBody( 717 task_group_type& tg, 718 std::atomic<bool>& preserved, 719 tbb::enumerable_thread_specific<int>& ets 720 ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {} 721 722 void operator()(int) const { 723 if (++m_ets.local() > 1) m_preserved = false; 724 725 for (int i = 0; i < 1000; ++i) 726 m_tg.run(DummyFunctor()); 727 m_tg.wait(); 728 m_tg.run_and_wait(DummyFunctor()); 729 730 --m_ets.local(); 731 } 732 }; 733 734 template<typename task_group_type> 735 void CheckIsolation(bool isolation_is_expected) { 736 task_group_type tg; 737 std::atomic<bool> isolation_is_preserved; 738 isolation_is_preserved = true; 739 tbb::enumerable_thread_specific<int> ets(0); 740 741 tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets)); 742 743 ASSERT( 744 isolation_is_expected == isolation_is_preserved, 745 "Actual and expected isolation-related behaviours are different" 746 ); 747 } 748 749 // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place 750 void TestIsolation() { 751 CheckIsolation<tbb::task_group>(false); 752 CheckIsolation<tbb::isolated_task_group>(true); 753 } 754 } 755 #endif 756 757 #if __TBB_USE_ADDRESS_SANITIZER 758 //! Test for thread safety for the task_group 759 //! \brief \ref error_guessing \ref resource_usage 760 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 761 #else 762 //! Test for thread safety for the task_group 763 //! \brief \ref error_guessing \ref resource_usage 764 TEST_CASE("Thread safety test for the task group") { 765 if (tbb::this_task_arena::max_concurrency() < 2) { 766 // The test requires more than one thread to check thread safety 767 return; 768 } 769 for (unsigned p=MinThread; p <= MaxThread; ++p) { 770 if (p < 2) { 771 continue; 772 } 773 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 774 g_MaxConcurrency = p; 775 TestThreadSafety<tbb::task_group>(); 776 } 777 } 778 #endif 779 780 //! Fibonacci test for task group 781 //! \brief \ref interface \ref requirement 782 TEST_CASE("Fibonacci test for the task group") { 783 for (unsigned p=MinThread; p <= MaxThread; ++p) { 784 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 785 g_MaxConcurrency = p; 786 RunFibonacciTests<tbb::task_group>(); 787 } 788 } 789 790 //! Cancellation and exception test for the task group 791 //! \brief \ref interface \ref requirement 792 TEST_CASE("Cancellation and exception test for the task group") { 793 for (unsigned p = MinThread; p <= MaxThread; ++p) { 794 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 795 tbb::task_arena a(p); 796 g_MaxConcurrency = p; 797 a.execute([] { 798 RunCancellationAndExceptionHandlingTests<tbb::task_group>(); 799 }); 800 } 801 } 802 803 //! Constant functor test for the task group 804 //! \brief \ref interface \ref negative 805 TEST_CASE("Constant functor test for the task group") { 806 for (unsigned p=MinThread; p <= MaxThread; ++p) { 807 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 808 g_MaxConcurrency = p; 809 TestConstantFunctorRequirement<tbb::task_group>(); 810 } 811 } 812 813 //! Move semantics test for the task group 814 //! \brief \ref interface \ref requirement 815 TEST_CASE("Move semantics test for the task group") { 816 for (unsigned p=MinThread; p <= MaxThread; ++p) { 817 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 818 g_MaxConcurrency = p; 819 TestMoveSemantics<tbb::task_group>(); 820 } 821 } 822 823 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 824 825 #if __TBB_USE_ADDRESS_SANITIZER 826 //! Test for thread safety for the isolated_task_group 827 //! \brief \ref error_guessing 828 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {} 829 #else 830 //! Test for thread safety for the isolated_task_group 831 //! \brief \ref error_guessing 832 TEST_CASE("Thread safety test for the isolated task group") { 833 if (tbb::this_task_arena::max_concurrency() < 2) { 834 // The test requires more than one thread to check thread safety 835 return; 836 } 837 for (unsigned p=MinThread; p <= MaxThread; ++p) { 838 if (p < 2) { 839 continue; 840 } 841 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 842 g_MaxConcurrency = p; 843 TestThreadSafety<tbb::isolated_task_group>(); 844 } 845 } 846 #endif 847 848 //! Cancellation and exception test for the isolated task group 849 //! \brief \ref interface \ref requirement 850 TEST_CASE("Fibonacci test for the isolated task group") { 851 for (unsigned p=MinThread; p <= MaxThread; ++p) { 852 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 853 g_MaxConcurrency = p; 854 RunFibonacciTests<tbb::isolated_task_group>(); 855 } 856 } 857 858 //! Cancellation and exception test for the isolated task group 859 //! \brief \ref interface \ref requirement 860 TEST_CASE("Cancellation and exception test for the isolated task group") { 861 for (unsigned p=MinThread; p <= MaxThread; ++p) { 862 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 863 g_MaxConcurrency = p; 864 RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>(); 865 } 866 } 867 868 //! Constant functor test for the isolated task group. 869 //! \brief \ref interface \ref negative 870 TEST_CASE("Constant functor test for the isolated task group") { 871 for (unsigned p=MinThread; p <= MaxThread; ++p) { 872 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 873 g_MaxConcurrency = p; 874 TestConstantFunctorRequirement<tbb::isolated_task_group>(); 875 } 876 } 877 878 //! Move semantics test for the isolated task group. 879 //! \brief \ref interface \ref requirement 880 TEST_CASE("Move semantics test for the isolated task group") { 881 for (unsigned p=MinThread; p <= MaxThread; ++p) { 882 tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p); 883 g_MaxConcurrency = p; 884 TestMoveSemantics<tbb::isolated_task_group>(); 885 } 886 } 887 888 //TODO: add test void isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::::run_and_wait(d2::task_handle&& h) 889 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */ 890 891 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) { 892 for (int i = 0; i < num_tasks; ++i) { 893 tg2.run([&tg1, &tasks_executed] { 894 volatile char consume_stack[1000]{}; 895 ++tasks_executed; 896 tg1.wait(); 897 utils::suppress_unused_warning(consume_stack); 898 }); 899 } 900 } 901 902 // TODO: move to the conformance test 903 //! Test for stack overflow avoidance mechanism. 904 //! \brief \ref requirement 905 TEST_CASE("Test for stack overflow avoidance mechanism") { 906 if (tbb::this_task_arena::max_concurrency() < 2) { 907 return; 908 } 909 910 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 911 tbb::task_group tg1; 912 tbb::task_group tg2; 913 std::atomic<int> tasks_executed{}; 914 tg1.run_and_wait([&tg1, &tg2, &tasks_executed] { 915 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 916 while (tasks_executed < 100) { 917 // Some stealing is expected to happen. 918 utils::yield(); 919 } 920 CHECK(tasks_executed < 10000); 921 }); 922 tg2.wait(); 923 CHECK(tasks_executed == 10000); 924 } 925 926 //! Test for stack overflow avoidance mechanism. 927 //! \brief \ref error_guessing 928 TEST_CASE("Test for stack overflow avoidance mechanism within arena") { 929 if (tbb::this_task_arena::max_concurrency() < 2) { 930 return; 931 } 932 933 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2); 934 tbb::task_group tg1; 935 tbb::task_group tg2; 936 std::atomic<int> tasks_executed{}; 937 938 // Determine nested task execution limit. 939 int second_thread_executed{}; 940 tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] { 941 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 942 do { 943 second_thread_executed = tasks_executed; 944 utils::Sleep(10); 945 } while (second_thread_executed < 100 || second_thread_executed != tasks_executed); 946 CHECK(tasks_executed < 10000); 947 }); 948 tg2.wait(); 949 CHECK(tasks_executed == 10000); 950 951 tasks_executed = 0; 952 tbb::task_arena a(2, 2); 953 tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] { 954 run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed); 955 while (tasks_executed < second_thread_executed-1) { 956 // Wait until the second thread near the limit. 957 utils::yield(); 958 } 959 tg2.run([&a, &tg1, &tasks_executed] { 960 a.execute([&tg1, &tasks_executed] { 961 volatile char consume_stack[1000]{}; 962 ++tasks_executed; 963 tg1.wait(); 964 utils::suppress_unused_warning(consume_stack); 965 }); 966 }); 967 while (tasks_executed < second_thread_executed) { 968 // Wait until the second joins the arena. 969 utils::yield(); 970 } 971 a.execute([&tg1, &tg2, &tasks_executed] { 972 run_deep_stealing(tg1, tg2, 10000, tasks_executed); 973 }); 974 int currently_executed{}; 975 do { 976 currently_executed = tasks_executed; 977 utils::Sleep(10); 978 } while (currently_executed != tasks_executed); 979 CHECK(tasks_executed < 10000 + second_thread_executed); 980 }); 981 a.execute([&tg2] { 982 tg2.wait(); 983 }); 984 CHECK(tasks_executed == 10000 + second_thread_executed); 985 } 986 987 //! Test checks that we can submit work to task_group asynchronously with waiting. 988 //! \brief \ref regression 989 TEST_CASE("Async task group") { 990 int num_threads = tbb::this_task_arena::max_concurrency(); 991 if (num_threads < 3) { 992 // The test requires at least 2 worker threads 993 return; 994 } 995 tbb::task_arena a(2*num_threads, num_threads); 996 utils::SpinBarrier barrier(num_threads + 2); 997 tbb::task_group tg[2]; 998 std::atomic<bool> finished[2]{}; 999 finished[0] = false; finished[1] = false; 1000 for (int i = 0; i < 2; ++i) { 1001 a.enqueue([i, &tg, &finished, &barrier] { 1002 barrier.wait(); 1003 for (int j = 0; j < 10000; ++j) { 1004 tg[i].run([] {}); 1005 utils::yield(); 1006 } 1007 finished[i] = true; 1008 }); 1009 } 1010 utils::NativeParallelFor(num_threads, [&](int idx) { 1011 barrier.wait(); 1012 a.execute([idx, &tg, &finished] { 1013 std::size_t counter{}; 1014 while (!finished[idx%2]) { 1015 tg[idx%2].wait(); 1016 if (counter++ % 16 == 0) utils::yield(); 1017 } 1018 tg[idx%2].wait(); 1019 }); 1020 }); 1021 } 1022 1023 struct SelfRunner { 1024 tbb::task_group& m_tg; 1025 std::atomic<unsigned>& count; 1026 void operator()() const { 1027 unsigned previous_count = count.fetch_sub(1); 1028 if (previous_count > 1) 1029 m_tg.run( *this ); 1030 } 1031 }; 1032 1033 //! Submit work to single task_group instance from inside the work 1034 //! \brief \ref error_guessing 1035 TEST_CASE("Run self using same task_group instance") { 1036 const unsigned num = 10; 1037 std::atomic<unsigned> count{num}; 1038 tbb::task_group tg; 1039 SelfRunner uf{tg, count}; 1040 tg.run( uf ); 1041 tg.wait(); 1042 CHECK_MESSAGE( 1043 count == 0, 1044 "Not all tasks were spawned from inside the functor running within task_group." 1045 ); 1046 } 1047 1048 //TODO: move to some common place to share with conformance tests 1049 namespace accept_task_group_context { 1050 1051 template <typename TaskGroup, typename CancelF, typename WaitF> 1052 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) { 1053 std::atomic<bool> outer_cancelled{false}; 1054 std::atomic<unsigned> count{13}; 1055 1056 tbb::task_group_context inner_ctx(tbb::task_group_context::isolated); 1057 TaskGroup inner_tg(inner_ctx); 1058 1059 tbb::task_group outer_tg; 1060 auto outer_tg_task = [&] { 1061 inner_tg.run([&] { 1062 utils::SpinWaitUntilEq(outer_cancelled, true); 1063 inner_tg.run( SelfRunner{inner_tg, count} ); 1064 }); 1065 1066 utils::try_call([&] { 1067 std::forward<CancelF>(cancel)(outer_tg); 1068 }).on_completion([&] { 1069 outer_cancelled = true; 1070 }); 1071 }; 1072 1073 auto check = [&] { 1074 tbb::task_group_status outer_status = tbb::task_group_status::not_complete; 1075 outer_status = std::forward<WaitF>(wait)(outer_tg); 1076 CHECK_MESSAGE( 1077 outer_status == tbb::task_group_status::canceled, 1078 "Outer task group should have been cancelled." 1079 ); 1080 1081 tbb::task_group_status inner_status = inner_tg.wait(); 1082 CHECK_MESSAGE( 1083 inner_status == tbb::task_group_status::complete, 1084 "Inner task group should have completed despite the cancellation of the outer one." 1085 ); 1086 1087 CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed."); 1088 }; 1089 1090 outer_tg.run(outer_tg_task); 1091 check(); 1092 } 1093 1094 template <typename TaskGroup> 1095 void test() { 1096 run_cancellation_use_case<TaskGroup>( 1097 [](tbb::task_group& outer) { outer.cancel(); }, 1098 [](tbb::task_group& outer) { return outer.wait(); } 1099 ); 1100 1101 #if TBB_USE_EXCEPTIONS 1102 run_cancellation_use_case<TaskGroup>( 1103 [](tbb::task_group& /*outer*/) { 1104 volatile bool suppress_unreachable_code_warning = true; 1105 if (suppress_unreachable_code_warning) { 1106 throw int(); 1107 } 1108 }, 1109 [](tbb::task_group& outer) { 1110 try { 1111 outer.wait(); 1112 return tbb::task_group_status::complete; 1113 } catch(const int&) { 1114 return tbb::task_group_status::canceled; 1115 } 1116 } 1117 ); 1118 #endif 1119 } 1120 1121 } // namespace accept_task_group_context 1122 1123 //! Respect task_group_context passed from outside 1124 //! \brief \ref interface \ref requirement 1125 TEST_CASE("Respect task_group_context passed from outside") { 1126 #if TBB_PREVIEW_ISOLATED_TASK_GROUP 1127 accept_task_group_context::test<tbb::isolated_task_group>(); 1128 #endif 1129 } 1130 1131 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1132 //! The test for task_handle inside other task waiting with run 1133 //! \brief \ref requirement 1134 TEST_CASE("Task handle for scheduler bypass"){ 1135 tbb::task_group tg; 1136 std::atomic<bool> run {false}; 1137 1138 tg.run([&]{ 1139 return tg.defer([&]{ 1140 run = true; 1141 }); 1142 }); 1143 1144 tg.wait(); 1145 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1146 } 1147 1148 //! The test for task_handle inside other task waiting with run_and_wait 1149 //! \brief \ref requirement 1150 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){ 1151 tbb::task_group tg; 1152 std::atomic<bool> run {false}; 1153 1154 tg.run_and_wait([&]{ 1155 return tg.defer([&]{ 1156 run = true; 1157 }); 1158 }); 1159 1160 CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run"); 1161 } 1162 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1163 1164 #if TBB_USE_EXCEPTIONS 1165 //As these tests are against behavior marked by spec as undefined, they can not be put into conformance tests 1166 1167 //! The test for error in scheduling empty task_handle 1168 //! \brief \ref requirement 1169 TEST_CASE("Empty task_handle cannot be scheduled" 1170 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1171 * doctest::skip() //skip the test for now, to not pollute the test log 1172 ){ 1173 tbb::task_group tg; 1174 1175 CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error); 1176 } 1177 1178 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1179 //! \brief \ref requirement 1180 TEST_CASE("task_handle cannot be scheduled into different task_group" 1181 * doctest::should_fail() //Test needs to revised as implementation uses assertions instead of exceptions 1182 * doctest::skip() //skip the test for now, to not pollute the test log 1183 ){ 1184 tbb::task_group tg; 1185 tbb::task_group tg1; 1186 1187 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1188 } 1189 1190 //! The test for error in task_handle being scheduled into task_group different from one it was created from 1191 //! \brief \ref requirement 1192 TEST_CASE("task_handle cannot be scheduled into other task_group of the same context" 1193 * doctest::should_fail() //Implementation is no there yet, as it is not clear that is the expected behavior 1194 * doctest::skip() //skip the test for now, to not pollute the test log 1195 ) 1196 { 1197 tbb::task_group_context ctx; 1198 1199 tbb::task_group tg(ctx); 1200 tbb::task_group tg1(ctx); 1201 1202 CHECK_NOTHROW(tg.run(tg.defer([]{}))); 1203 CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error); 1204 } 1205 1206 #endif // TBB_USE_EXCEPTIONS 1207 1208