/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/concurrency_tracker.h"
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/task_group.h"

#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <thread>
#include <atomic>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification

//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change those.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
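            // The default case (i & 3 == 3) exercises explicit initialize() and terminate() with a random concurrency level.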
            default: {
                tbb::task_arena arena( std::rand() % maxthread + 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize();
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
            case 0: {
                tbb::task_arena arena( 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                CHECK(arena.is_active());
                break;
            }
            case 1: {
                tbb::task_arena arena( tbb::task_arena::automatic );
                CHECK(!arena.is_active());
                arena.initialize();
                CHECK(arena.is_active());
                break;
            }
            case 2: {
                tbb::task_arena arena;
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 );
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
        }
    }
}

//--------------------------------------------------//

// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}

class ArenaObserver : public tbb::task_scheduler_observer {
    int myId; // unique observer/arena id within a test
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        utils::doDummyWork(50000);
    }
};

struct AsynchronousWork {
    utils::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
        : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
        if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
        else my_barrier.signalNoWait();
    }
};

//-----------------------------------------------------------------------------------------//

// Test that task_arenas might be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of an app thread running this test.
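// Each thread creates its own arenas and observers; the ids idx*2+1 and idx*2+2 keep the observers unique across threads.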
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that arena observer can be activated before local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
        LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
            observe(true);
        }
    };

    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");

    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");

    LocalObserver lo1;
    CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated");

    tbb::task_arena a3(1, 0);
    LocalObserver lo2(a3);
    CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated");

    utils::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);

    a1.enqueue(work); // put async work
    barrier.wait();

    a2.enqueue(work); // another work
    a2.execute(work);

    a3.execute([] {
        utils::doDummyWork(100);
    });

    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    // TODO REVAMP fix with global control
    ResetTLS();
    utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
}
//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait();
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            utils::doDummyWork(10000);
            myWait.release();
        }
    };

    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0);
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve();
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait));
        }
        my_b.wait();
    }
};

void TestMultipleMasters(int p) {
    {
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. Worker threads need time to occupy the slot to prevent an external thread from taking an enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TODO: explain what TestArenaEntryConsistency does
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance
        : FPModeContext(idx+i)
        , my_stage(s)
        , is_caught(false)
#if TBB_USE_EXCEPTIONS
        , is_expected( (idx&(1<<i)) != 0 )
#else
        , is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduce its entry type and put it into the id; it helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                  "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                  : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode);
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
        : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in current task group context
        tbb::parallel_for(4, 5, *this); // in different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        GetRoundingMode();
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    tbb::task_arena a(2, 1);
    std::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings to the arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // not less than 32 = 2^5 combinations of entry types
        body.test(i);
}
//--------------------------------------------------
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    utils::SpinBarrier *my_barrier;
    utils::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL )
        : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        CHECK( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
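            // Hence, the 2*p oversubscription run below is skipped for large p on 32-bit builds.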
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}

struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
    utils::SpinBarrier& m_barrier;

    TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), m_barrier(barrier) {
        observe(true);
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait();
        }
    }
};

void TestMandatoryConcurrency() {
    tbb::task_arena a(1);
    a.execute([&a] {
        int n_threads = 4;
        utils::SpinBarrier exit_barrier(2);
        TestMandatoryConcurrencyObserver observer(a, exit_barrier);
        for (int j = 0; j < 5; ++j) {
            utils::ExactConcurrencyLevel::check(1);
            std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
            utils::SpinBarrier barrier(n_threads);
            utils::NativeParallelFor(n_threads, [&](int) {
                for (int i = 0; i < 5; ++i) {
                    barrier.wait();
                    a.enqueue([&] {
                        CHECK(tbb::this_task_arena::max_concurrency() == 2);
                        CHECK(a.max_concurrency() == 2);
                        ++curr_tasks;
                        CHECK(curr_tasks == 1);
                        utils::doDummyWork(1000);
                        CHECK(curr_tasks == 1);
                        --curr_tasks;
                        ++num_tasks;
                    });
                    barrier.wait();
                }
            });
            do {
                exit_barrier.wait();
            } while (num_tasks < n_threads * 5);
        }
        observer.observe(false);
    });
}

void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
    TestMandatoryConcurrency();
    InitializeAndTerminate(max_thread_num);
    for (int p = min_thread_num; p <= max_thread_num; ++p) {
        TestConcurrentArenas(p);
        TestMultipleMasters(p);
        TestArenaConcurrency(p);
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test uses the knowledge of task_arena internals

struct TaskArenaValidator {
    int my_slot_at_construction;
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                       "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}

struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        tbb::task_arena arena{tbb::task_arena::attach()};
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2{tbb::task_arena::attach()};
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};

thread_local int TestAttachBody::my_idx;

void TestAttach( int maxthread ) {
    // Externally concurrent, but no concurrency within a thread
    utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}

//--------------------------------------------------//

// Test that task_arena::enqueue does not tolerate a non-const functor.
// TODO: can it be reworked as SFINAE-based compile-time check?
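// Only the const overload of TestFunctor::operator() is expected to be invoked; the non-const overload fails the test if called.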
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
}

//--------------------------------------------------//

#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"

// Test this_task_arena::isolate
namespace TestIsolatedExecuteNS {
template <typename NestedPartitioner>
class NestedParFor : utils::NoAssign {
public:
    NestedParFor() {}
    void operator()() const {
        NestedPartitioner p;
        tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
    }
};

template <typename NestedPartitioner>
class ParForBody : utils::NoAssign {
    bool myOuterIsolation;
    tbb::enumerable_thread_specific<int> &myEts;
    std::atomic<bool> &myIsStolen;
public:
    ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
        : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
    void operator()( int ) const {
        int &e = myEts.local();
        if ( e++ > 0 ) myIsStolen = true;
        if ( myOuterIsolation )
            NestedParFor<NestedPartitioner>()();
        else
            tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
        --e;
    }
};

template <typename OuterPartitioner, typename NestedPartitioner>
class OuterParFor : utils::NoAssign {
    bool myOuterIsolation;
    std::atomic<bool> &myIsStolen;
public:
    OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
    void operator()() const {
        tbb::enumerable_thread_specific<int> ets( 0 );
        OuterPartitioner p;
        tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
    }
};

template <typename OuterPartitioner, typename NestedPartitioner>
void TwoLoopsTest( bool outer_isolation ) {
    std::atomic<bool> is_stolen;
    is_stolen = false;
    const int max_repeats = 100;
    if ( outer_isolation ) {
        for ( int i = 0; i <= max_repeats; ++i ) {
            tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
            if ( is_stolen ) break;
        }
        // TODO: was ASSERT_WARNING
        if (!is_stolen) {
            REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
        }
    } else {
        for ( int i = 0; i <= max_repeats; ++i ) {
            OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
        }
        REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
    }
}

void TwoLoopsTest( bool outer_isolation ) {
    TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
    TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
    TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
    TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
}

void TwoLoopsTest() {
    TwoLoopsTest( true );
    TwoLoopsTest( false );
}
//--------------------------------------------------//
class HeavyMixTestBody : utils::NoAssign {
    tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
    tbb::enumerable_thread_specific<int>& myIsolatedLevel;
    int myNestedLevel;

    template <typename Partitioner, typename Body>
    static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
        if ( rnd.get() % 2 ) {
            if (ctx )
                tbb::parallel_for( 0, 2, body, p, *ctx );
            else
                tbb::parallel_for( 0, 2, body, p );
        } else {
            tbb::parallel_invoke( body, body );
        }
    }

    template <typename Partitioner>
    class IsolatedBody : utils::NoAssign {
        const HeavyMixTestBody &myHeavyMixTestBody;
        Partitioner &myPartitioner;
    public:
        IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
            : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
        void operator()() const {
            RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                                  myHeavyMixTestBody.myNestedLevel + 1 ),
                myPartitioner );
        }
    };

    template <typename Partitioner>
    void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
        Partitioner p;
        switch ( rnd.get() % 2 ) {
        case 0: {
            // No features
            tbb::task_group_context ctx;
            RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
            break;
        }
        case 1: {
            // Isolation
            int previous_isolation = isolated_level;
            isolated_level = myNestedLevel;
            tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
            isolated_level = previous_isolation;
            break;
        }
        }
    }
public:
    HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
                      tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
        : myRandom( random ), myIsolatedLevel( isolated_level )
        , myNestedLevel( nested_level ) {}
    void operator()() const {
        int &isolated_level = myIsolatedLevel.local();
        CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" );
        if ( myNestedLevel == 20 )
            return;
        utils::FastRandom<>& rnd = myRandom.local();
        if ( rnd.get() % 2 == 1 ) {
            RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
        } else {
            RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
        }
    }
    void operator()(int) const {
        this->operator()();
    }
};

struct RandomInitializer {
    utils::FastRandom<> operator()() {
        return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
    }
};

void HeavyMixTest() {
    std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);

    RandomInitializer init_random;
    tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
    tbb::enumerable_thread_specific<int> isolated_level( 0 );
    for ( int i = 0; i < 5; ++i ) {
        HeavyMixTestBody b( random, isolated_level, 1 );
        b( 0 );
    }
}

//--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
struct MyException {};
struct IsolatedBodyThrowsException {
    void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
        // Workaround an unreachable code warning in task_arena_function.
        volatile bool workaround = true;
        if (workaround)
#endif
        {
            throw MyException();
        }
    }
};
struct ExceptionTestBody : utils::NoAssign {
    tbb::enumerable_thread_specific<int>& myEts;
    std::atomic<bool>& myIsStolen;
    ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
        : myEts( ets ), myIsStolen( is_stolen ) {}
    void operator()( int i ) const {
        try {
            tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
            REQUIRE_MESSAGE( false, "The exception has been lost" );
        }
        catch ( MyException ) {}
        catch ( ... ) {
            REQUIRE_MESSAGE( false, "Unexpected exception" );
        }
        // Check that nested algorithms can steal outer-level tasks
        int &e = myEts.local();
        if ( e++ > 0 ) myIsStolen = true;
        // work imbalance increases chances for stealing
        tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
        --e;
    }
};

#endif /* TBB_USE_EXCEPTIONS */
void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
    tbb::enumerable_thread_specific<int> ets;
    std::atomic<bool> is_stolen;
    is_stolen = false;
    for ( ;; ) {
        tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
        if ( is_stolen ) break;
    }
    REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
}

struct NonConstBody {
    unsigned int state;
    void operator()() {
        state ^= ~0u;
    }
};

void TestNonConstBody() {
    NonConstBody body;
    body.state = 0x6c97d5ed;
    tbb::this_task_arena::isolate(body);
    REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
}

// TODO: Consider tbb::task_group instead of explicit task API.
class TestEnqueueTask : public tbb::detail::d1::task {
    using wait_context = tbb::detail::d1::wait_context;

    tbb::enumerable_thread_specific<bool>& executed;
    std::atomic<int>& completed;

public:
    wait_context& waiter;
    tbb::task_arena& arena;
    static const int N = 100;

    TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
        : executed(exe), completed(c), waiter(w), arena(a) {}

    tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                for (int j = 0; j < 100; j++) utils::yield();
                waiter.release(1);
            });
        }
        return nullptr;
    }
    tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
};

class TestEnqueueIsolateBody : utils::NoCopy {
    tbb::enumerable_thread_specific<bool>& executed;
    std::atomic<int>& completed;
    tbb::task_arena& arena;
public:
    static const int N = 100;

    TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
        : executed(exe), completed(c), arena(a) {}
    void operator()() {
        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);

        TestEnqueueTask root(executed, completed, waiter, arena);
        tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
    }
};

void TestEnqueue() {
    tbb::enumerable_thread_specific<bool> executed(false);
    std::atomic<int> completed;
    tbb::task_arena arena{tbb::task_arena::attach()};

    // Check that the main thread can process enqueued tasks.
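    // TestEnqueueTask enqueues N tasks, each releasing the wait_context once, so execute_and_wait returns only after all of them have run.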
    completed = 0;
    TestEnqueueIsolateBody b1(executed, completed, arena);
    b1();

    if (!executed.local()) {
        REPORT("Warning: None of the enqueued tasks was executed by the main thread.\n");
    }

    executed.local() = false;
    completed = 0;
    const int N = 100;
    // Create enqueued tasks outside of isolation.

    tbb::task_group_context ctx;
    tbb::detail::d1::wait_context waiter(N);
    for (int i = 0; i < N; ++i) {
        arena.enqueue([&]() {
            executed.local() = true;
            ++completed;
            utils::yield();
            waiter.release(1);
        });
    }
    TestEnqueueIsolateBody b2(executed, completed, arena);
    tbb::this_task_arena::isolate(b2);
    REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

    tbb::detail::d1::wait(waiter, ctx);
    // while (completed < TestEnqueueTask::N + N) utils::yield();
}
} // namespace TestIsolatedExecuteNS

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner steals an
    // outer-level task on a nested level. If there is only one thief, it will execute outer-level tasks first and
    // the owner will not have a chance to steal outer-level tasks.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce the stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent a test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] {tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with a missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
        tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails. So the test is run many times
    // to make the log annoying (to force considering it as an error).
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
struct TestFunctor {
    void operator()() const {};
};

struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
    MoveOnlyFunctor() : utils::MoveOnly() {};
    MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
};

struct MovePreferableFunctor : utils::Movable, TestFunctor {
    MovePreferableFunctor() : utils::Movable() {};
    MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
    MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
};

struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
    NoMoveNoCopyFunctor() : utils::NoCopy() {};
    // the move constructor is not allowed, just like the copy constructor inherited from NoCopy
private:
    NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
};

void TestFunctors() {
    tbb::task_arena ta;
    MovePreferableFunctor mpf;
    // execute() doesn't have any copies or moves of arguments inside the impl
    ta.execute( NoMoveNoCopyFunctor() );

    ta.enqueue( MoveOnlyFunctor() );
    ta.enqueue( mpf );
    REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
    mpf.Reset();
    ta.enqueue( std::move(mpf) );
    REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
    mpf.Reset();
}
} // namespace TestMoveSemanticsNS

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
struct noDefaultTag {};
class ReturnType : public StateTrackable<> {
    static const int SIZE = 42;
    std::vector<int> data;
public:
    ReturnType(noDefaultTag) : StateTrackable<>(0) {}
    // Define the copy constructor to test that it is never called
    ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
    ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

    void fill() {
        for (int i = 0; i < SIZE; ++i)
            data.push_back(i);
    }
    void check() {
        REQUIRE(data.size() == unsigned(SIZE));
        for (int i = 0; i < SIZE; ++i)
            REQUIRE(data[i] == i);
        StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
        REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
        REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
        std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
        std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
        REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
        // The number of copies/moves should not exceed 3 if copy elision takes place:
        // function return, store to an internal storage, acquire internal storage.
        // Without copy elision, this number may grow up to 7.
        REQUIRE((copied == 0 && moved <= 7));
        WARN_MESSAGE(moved <= 3,
            "Warning: The number of copies/moves should not exceed 3 if copy elision takes place. "
            "Pay attention to this warning only if copy elision is enabled."
        );
    }
};

template <typename R>
R function() {
    noDefaultTag tag;
    R r(tag);
    r.fill();
    return r;
}

template <>
void function<void>() {}

template <typename R>
struct Functor {
    R operator()() const {
        return function<R>();
    }
};

tbb::task_arena& arena() {
    static tbb::task_arena a;
    return a;
}

template <typename F>
void TestExecute(F &f) {
    StateTrackableCounters::reset();
    ReturnType r{arena().execute(f)};
    r.check();
}

template <typename F>
void TestExecute(const F &f) {
    StateTrackableCounters::reset();
    ReturnType r{arena().execute(f)};
    r.check();
}

template <typename F>
void TestIsolate(F &f) {
    StateTrackableCounters::reset();
    ReturnType r{tbb::this_task_arena::isolate(f)};
    r.check();
}

template <typename F>
void TestIsolate(const F &f) {
    StateTrackableCounters::reset();
    ReturnType r{tbb::this_task_arena::isolate(f)};
    r.check();
}

void Test() {
    TestExecute(Functor<ReturnType>());
    Functor<ReturnType> f1;
    TestExecute(f1);
    TestExecute(function<ReturnType>);

    arena().execute(Functor<void>());
    Functor<void> f2;
    arena().execute(f2);
    arena().execute(function<void>);
    TestIsolate(Functor<ReturnType>());
    TestIsolate(f1);
    TestIsolate(function<ReturnType>);
    tbb::this_task_arena::isolate(Functor<void>());
    tbb::this_task_arena::isolate(f2);
    tbb::this_task_arena::isolate(function<void>);
}
} // namespace TestReturnValueNS

void TestReturnValue() {
    TestReturnValueNS::Test();
}

//--------------------------------------------------//

// MyObserver checks if threads join the same arena
struct MyObserver: public tbb::task_scheduler_observer {
    tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
    tbb::task_arena& my_arena;
    std::atomic<int>& my_failure_counter;
    std::atomic<int>& my_counter;
    utils::SpinBarrier& m_barrier;

    MyObserver(tbb::task_arena& a,
               tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
               std::atomic<int>& failure_counter,
               std::atomic<int>& counter,
               utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
          my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
        observe(true);
    }
    void on_scheduler_entry(bool worker) override {
        if (worker) {
            ++my_counter;
            tbb::task_arena*& cur_arena = my_tls.local();
            if (cur_arena != 0 && cur_arena != &my_arena) {
                ++my_failure_counter;
            }
            cur_arena = &my_arena;
            m_barrier.wait();
        }
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait(); // before wakeup
            m_barrier.wait(); // after wakeup
        }
    }
};

void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
    if (n_threads == 0) {
        n_threads = tbb::this_task_arena::max_concurrency();
    }

    const int max_n_arenas = 8;
    int n_arenas = 2;
    if(n_threads > 16) {
        n_arenas = max_n_arenas;
    } else if (n_threads > 8) {
        n_arenas = 4;
    }

    int n_workers = n_threads - 1;
    n_workers = n_arenas * (n_workers / n_arenas);
    if (n_workers == 0) {
        return;
    }

    n_threads = n_workers + 1;
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);

    const int n_repetitions = 20;
    const int n_outer_repetitions = 100;
    std::multiset<float> failure_ratio; // for calculating the median
    utils::SpinBarrier barrier(n_threads);
    utils::SpinBarrier worker_barrier(n_workers);
    MyObserver* observer[max_n_arenas];
    std::vector<tbb::task_arena> arenas(n_arenas);
    std::atomic<int> failure_counter;
    std::atomic<int> counter;
    tbb::enumerable_thread_specific<tbb::task_arena*> tls;

    for (int i = 0; i < n_arenas; ++i) {
        arenas[i].initialize(n_workers / n_arenas + 1); // +1 for the master
        observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
    }

    int ii = 0;
    for (; ii < n_outer_repetitions; ++ii) {
        failure_counter = 0;
        counter = 0;

        // Main code
        auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
        wakeup();
        for (int j = 0; j < n_repetitions; ++j) {
            barrier.wait(); // entry
            barrier.wait(); // exit1
            wakeup();
            barrier.wait(); // exit2
        }
        barrier.wait(); // entry
        barrier.wait(); // exit1
        barrier.wait(); // exit2

        failure_ratio.insert(float(failure_counter) / counter);
        tls.clear();
        // collect 3 elements in failure_ratio before calculating the median
        if (ii > 1) {
            std::multiset<float>::iterator it = failure_ratio.begin();
            std::advance(it, failure_ratio.size() / 2);
            if (*it < 0.02)
                break;
        }
    }
    for (int i = 0; i < n_arenas; ++i) {
        delete observer[i];
    }
    // check whether the median is too big
    std::multiset<float>::iterator it = failure_ratio.begin();
    std::advance(it, failure_ratio.size() / 2);
    // TODO: decrease the constants 0.05 and 0.3 by tuning the ratio between n_threads and n_arenas
    if (*it > 0.05) {
        REPORT("Warning: too many cases when threads join different arenas.\n");
        REQUIRE_MESSAGE(*it <= 0.3, "A lot of cases when threads join different arenas.\n");
    }
}

void TestArenaWorkersMigration() {
    TestArenaWorkersMigrationWithNumThreads(4);
    if (tbb::this_task_arena::max_concurrency() != 4) {
        TestArenaWorkersMigrationWithNumThreads();
    }
}

//--------------------------------------------------//
void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (auto blocked : { false, true }) {
            for (int trail = 0; trail < (blocked ? 10 : 10000); ++trail) {
                tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
                    CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "the concurrency level is not equal to the specified thread number");
                    CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than specified by default");
                    local_id.local() = 1;
                    if (blocked) {
                        // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                        utils::Sleep(1);
                        barrier.wait();
                    }
                }, tbb::simple_partitioner());
                REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "the number of created threads is not equal to the default");
                if (blocked) {
                    REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default");
                }
            }
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Checks only a part of the possible numbers of reserved external threads:
    // 0 and 1 reserved threads are the important cases, but it is also useful
    // to collect some statistics with other values without spending
    // the whole test session time on checking each of them.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    int arena_concurrency = static_cast<int>(thread_number) / 2;
    tbb::task_arena test_arena(arena_concurrency, arena_concurrency);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override
    {}
public:
    simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
        : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        observe(true);
    }

    friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
        return lhs.my_idx < rhs.my_idx;
    }
};

std::atomic<int> simple_observer::idx_counter{};

struct arena_handler {
    enum arena_status {
        alive,
        deleting,
        deleted
    };

    tbb::task_arena* arena;

    std::atomic<arena_status> status{alive};
    tbb::spin_rw_mutex arena_in_use{};

    tbb::concurrent_set<simple_observer> observers;

    arena_handler(tbb::task_arena* ptr) : arena(ptr)
    {}

    friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
        return lhs.arena < rhs.arena;
    }
};

// TODO: Add observer operations
void StressTestMixFunctionality() {
    enum operation_type {
        create_arena,
        delete_arena,
        attach_observer,
        detach_observer,
        arena_execute,
        enqueue_task,
        last_operation_marker
    };

    std::size_t operations_number = last_operation_marker;
    std::size_t thread_number = utils::get_platform_max_threads();
    utils::FastRandom<> operation_rnd(42);
    tbb::spin_mutex random_operation_guard;

    auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
        tbb::spin_mutex::scoped_lock lock(random_operation_guard);
        return static_cast<operation_type>(operation_rnd.get() % operations_number);
    };

    utils::FastRandom<> arena_rnd(42);
    tbb::spin_mutex random_arena_guard;
    auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
        tbb::spin_mutex::scoped_lock lock(random_arena_guard);
        return arena_rnd.get();
    };

    tbb::concurrent_set<arena_handler> arenas_pool;

    std::vector<std::thread> thread_pool;

    utils::SpinBarrier thread_barrier(thread_number);
    std::size_t max_operations = 20000;
    std::atomic<std::size_t> curr_operation{};

    auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) {
        for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) {
            if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                if (curr_arena->status == arena_handler::alive) {
                    return curr_arena;
                }
                else {
                    lock.release();
                }
            }
        }
        return arenas_pool.end();
    };

    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
            case create_arena :
            {
                arenas_pool.emplace(new tbb::task_arena());
                break;
            }
            case delete_arena :
            {
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    arena_handler::arena_status curr_status = arena_handler::alive;
                    if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                        break;
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};

                auto curr_arena = find_arena(lock);
                if (curr_arena != arenas_pool.end()) {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }
                break;
            }
            case detach_observer:
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->execute([]() {
                        static tbb::affinity_partitioner aff;
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
                    });
                }

                break;
            }
            case enqueue_task:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
                }

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};

//--------------------------------------------------//
//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}

//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Basic test for arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    auto task_handle = tg.defer([&]{ run = true; });

    arena.enqueue(std::move(task_handle));
    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    arena.execute([&]{
        auto task_handle = tg.defer([&]{ run = true; });

        tbb::this_task_arena::enqueue(std::move(task_handle));
    });

    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with functor
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue function") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};
    // block the task_group to wait on it
    auto task_handle = tg.defer([]{});

    arena.execute([&]{
        tbb::this_task_arena::enqueue([&]{
            run = true;
            // release the task_group
            task_handle = tbb::task_handle{};
        });
    });

    tg.wait();

    CHECK(run == true);
}

#if TBB_USE_EXCEPTIONS
//! Basic test for exceptions in task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;
    tbb::task_arena arena;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    arena.enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! Basic test for exceptions in this_task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    tbb::this_task_arena::enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! Test for the error raised when scheduling an empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif // TBB_USE_EXCEPTIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! Test for is_inside_task in arena::execute when called from inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute when inside other task") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task,
            // even when execute() itself is called from inside a task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS