1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/concurrency_tracker.h" 21 #include "common/cpu_usertime.h" 22 #include "common/spin_barrier.h" 23 #include "common/utils.h" 24 #include "common/utils_report.h" 25 #include "common/utils_concurrency_limit.h" 26 27 #include "tbb/task_arena.h" 28 #include "tbb/task_scheduler_observer.h" 29 #include "tbb/enumerable_thread_specific.h" 30 #include "tbb/parallel_for.h" 31 #include "tbb/global_control.h" 32 #include "tbb/concurrent_set.h" 33 #include "tbb/spin_mutex.h" 34 #include "tbb/spin_rw_mutex.h" 35 #include "tbb/task_group.h" 36 37 #include <atomic> 38 #include <condition_variable> 39 #include <cstdio> 40 #include <cstdlib> 41 #include <stdexcept> 42 #include <thread> 43 #include <vector> 44 45 //#include "harness_fp.h" 46 47 //! \file test_task_arena.cpp 48 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 49 50 //--------------------------------------------------// 51 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 52 /* maxthread is treated as the biggest possible concurrency level. */ 53 void InitializeAndTerminate( int maxthread ) { 54 for( int i=0; i<200; ++i ) { 55 switch( i&3 ) { 56 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 57 // Explicit initialization can either keep the original values or change those. 58 // Arena termination can be explicit or implicit (in the destructor). 59 // TODO: extend with concurrency level checks if such a method is added. 
60 default: { 61 tbb::task_arena arena( std::rand() % maxthread + 1 ); 62 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 63 arena.initialize(); 64 CHECK(arena.is_active()); 65 arena.terminate(); 66 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 67 break; 68 } 69 case 0: { 70 tbb::task_arena arena( 1 ); 71 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 72 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 73 CHECK(arena.is_active()); 74 break; 75 } 76 case 1: { 77 tbb::task_arena arena( tbb::task_arena::automatic ); 78 CHECK(!arena.is_active()); 79 arena.initialize(); 80 CHECK(arena.is_active()); 81 break; 82 } 83 case 2: { 84 tbb::task_arena arena; 85 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 86 arena.initialize( std::rand() % maxthread + 1 ); 87 CHECK(arena.is_active()); 88 arena.terminate(); 89 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 90 break; 91 } 92 } 93 } 94 } 95 96 //--------------------------------------------------// 97 98 // Definitions used in more than one test 99 typedef tbb::blocked_range<int> Range; 100 101 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 102 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 103 104 void ResetTLS() { 105 local_id.clear(); 106 old_id.clear(); 107 slot_id.clear(); 108 } 109 110 class ArenaObserver : public tbb::task_scheduler_observer { 111 int myId; // unique observer/arena id within a test 112 int myMaxConcurrency; // concurrency of the associated arena 113 int myNumReservedSlots; // reserved slots in the associated arena 114 void on_scheduler_entry( bool is_worker ) override { 115 int current_index = tbb::this_task_arena::current_thread_index(); 116 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 117 if (is_worker) { 118 CHECK(current_index >= myNumReservedSlots); 119 } 120 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 121 old_id.local() = local_id.local(); 122 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 123 local_id.local() = myId; 124 slot_id.local() = current_index; 125 } 126 void on_scheduler_exit( bool /*is_worker*/ ) override { 127 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 128 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 129 slot_id.local() = -2; 130 local_id.local() = old_id.local(); 131 old_id.local() = 0; 132 } 133 public: 134 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 135 : tbb::task_scheduler_observer(a) 136 , myId(id) 137 , myMaxConcurrency(maxConcurrency) 138 , myNumReservedSlots(numReservedSlots) { 139 CHECK(myId); 140 observe(true); 141 } 142 ~ArenaObserver () { 143 observe(false); 144 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 145 } 146 }; 147 148 struct IndexTrackingBody { // Must be used together with ArenaObserver 149 void operator() ( const Range& ) const { 150 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 151 utils::doDummyWork(50000); 152 } 153 }; 154 155 struct AsynchronousWork { 156 utils::SpinBarrier &my_barrier; 157 bool my_is_blocking; 158 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 159 : my_barrier(a_barrier), my_is_blocking(blocking) {} 160 void operator()() const { 161 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 162 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 163 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 164 else my_barrier.signalNoWait(); 165 } 166 }; 167 168 //-----------------------------------------------------------------------------------------// 169 170 // Test that task_arenas might be created and used from multiple application threads. 171 // Also tests arena observers. The parameter p is the index of an app thread running this test. 
172 void TestConcurrentArenasFunc(int idx) { 173 // A regression test for observer activation order: 174 // check that arena observer can be activated before local observer 175 struct LocalObserver : public tbb::task_scheduler_observer { 176 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 177 LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) { 178 observe(true); 179 } 180 ~LocalObserver() { 181 observe(false); 182 } 183 }; 184 185 tbb::task_arena a1; 186 a1.initialize(1,0); 187 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 188 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 189 190 tbb::task_arena a2(2,1); 191 ArenaObserver o2(a2, 2, 1, idx*2+2); 192 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 193 194 LocalObserver lo1; 195 CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated"); 196 197 tbb::task_arena a3(1, 0); 198 LocalObserver lo2(a3); 199 CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated"); 200 201 utils::SpinBarrier barrier(2); 202 AsynchronousWork work(barrier); 203 204 a1.enqueue(work); // put async work 205 barrier.wait(); 206 207 a2.enqueue(work); // another work 208 a2.execute(work); 209 210 a3.execute([] { 211 utils::doDummyWork(100); 212 }); 213 214 a1.debug_wait_until_empty(); 215 a2.debug_wait_until_empty(); 216 } 217 218 void TestConcurrentArenas(int p) { 219 // TODO REVAMP fix with global control 220 ResetTLS(); 221 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 222 } 223 //--------------------------------------------------// 224 // Test multiple application threads working with a single arena at the same time. 225 class MultipleMastersPart1 : utils::NoAssign { 226 tbb::task_arena &my_a; 227 utils::SpinBarrier &my_b1, &my_b2; 228 public: 229 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 230 : my_a(a), my_b1(b1), my_b2(b2) {} 231 void operator()(int) const { 232 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 233 my_b1.wait(); 234 // A regression test for bugs 1954 & 1971 235 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 236 } 237 }; 238 239 class MultipleMastersPart2 : utils::NoAssign { 240 tbb::task_arena &my_a; 241 utils::SpinBarrier &my_b; 242 public: 243 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 244 void operator()(int) const { 245 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 246 } 247 }; 248 249 class MultipleMastersPart3 : utils::NoAssign { 250 tbb::task_arena &my_a; 251 utils::SpinBarrier &my_b; 252 using wait_context = tbb::detail::d1::wait_context; 253 254 struct Runner : NoAssign { 255 wait_context& myWait; 256 Runner(wait_context& w) : myWait(w) {} 257 void operator()() const { 258 utils::doDummyWork(10000); 259 myWait.release(); 260 } 261 }; 262 263 struct Waiter : NoAssign { 264 wait_context& myWait; 265 Waiter(wait_context& w) : myWait(w) {} 266 void operator()() const { 267 tbb::task_group_context ctx; 268 tbb::detail::d1::wait(myWait, ctx); 269 } 270 }; 271 272 public: 273 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 274 : my_a(a), my_b(b) {} 275 void operator()(int) const { 276 wait_context wait(0); 277 my_b.wait(); // increases chances for task_arena initialization contention 278 for( int i=0; i<100; ++i) { 279 wait.reserve(); 280 my_a.enqueue(Runner(wait)); 281 my_a.execute(Waiter(wait)); 282 } 283 
my_b.wait(); 284 } 285 }; 286 287 void TestMultipleMasters(int p) { 288 { 289 ResetTLS(); 290 tbb::task_arena a(1,0); 291 a.initialize(); 292 ArenaObserver o(a, 1, 0, 1); 293 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 294 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 295 barrier2.wait(); 296 a.debug_wait_until_empty(); 297 } { 298 ResetTLS(); 299 tbb::task_arena a(2,1); 300 ArenaObserver o(a, 2, 1, 2); 301 utils::SpinBarrier barrier(p+2); 302 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 303 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 304 utils::Sleep(10); 305 NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); 306 barrier.wait(); 307 a.debug_wait_until_empty(); 308 } { 309 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) 310 tbb::task_arena a(p,1); 311 utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs 312 // "Oversubscribe" the arena by 1 external thread 313 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); 314 a.debug_wait_until_empty(); 315 } 316 } 317 318 //--------------------------------------------------// 319 // TODO: explain what TestArenaEntryConsistency does 320 #include <sstream> 321 #include <stdexcept> 322 #include "oneapi/tbb/detail/_exception.h" 323 #include "common/fp_control.h" 324 325 struct TestArenaEntryBody : FPModeContext { 326 std::atomic<int> &my_stage; // each execute increases it 327 std::stringstream my_id; 328 bool is_caught, is_expected; 329 enum { arenaFPMode = 1 }; 330 331 TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance 332 : FPModeContext(idx+i) 333 , my_stage(s) 334 , is_caught(false) 335 #if TBB_USE_EXCEPTIONS 336 , is_expected( (idx&(1<<i)) != 0 ) 337 #else 338 , is_expected(false) 339 #endif 340 { 341 my_id << idx << ':' << i << '@'; 342 } 343 void operator()() { // inside task_arena::execute() 344 // synchronize with other stages 345 int stage = my_stage++; 346 int slot = tbb::this_task_arena::current_thread_index(); 347 CHECK(slot >= 0); 348 CHECK(slot <= 1); 349 // wait until the third stage is delegated and then starts on slot 0 350 while(my_stage < 2+slot) utils::yield(); 351 // deduct its entry type and put it into id, it helps to find source of a problem 352 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? 353 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master") 354 : stage == 3? 
"nested_same_ctx" : "nested_alien_ctx"); 355 AssertFPMode(arenaFPMode); 356 if (is_expected) { 357 TBB_TEST_THROW(std::logic_error(my_id.str())); 358 } 359 // no code can be put here since exceptions can be thrown 360 } 361 void on_exception(const char *e) { // outside arena, in catch block 362 is_caught = true; 363 CHECK(my_id.str() == e); 364 assertFPMode(); 365 } 366 void after_execute() { // outside arena and catch block 367 CHECK(is_caught == is_expected); 368 assertFPMode(); 369 } 370 }; 371 372 class ForEachArenaEntryBody : utils::NoAssign { 373 tbb::task_arena &my_a; // expected task_arena(2,1) 374 std::atomic<int> &my_stage; // each execute increases it 375 int my_idx; 376 377 public: 378 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 379 : my_a(a), my_stage(c), my_idx(0) {} 380 381 void test(int idx) { 382 my_idx = idx; 383 my_stage = 0; 384 NativeParallelFor(3, *this); // test cross-arena calls 385 CHECK(my_stage == 3); 386 my_a.execute(*this); // test nested calls 387 CHECK(my_stage == 5); 388 } 389 390 // task_arena functor for nested tests 391 void operator()() const { 392 test_arena_entry(3); // in current task group context 393 tbb::parallel_for(4, 5, *this); // in different context 394 } 395 396 // NativeParallelFor & parallel_for functor 397 void operator()(int i) const { 398 test_arena_entry(i); 399 } 400 401 private: 402 void test_arena_entry(int i) const { 403 GetRoundingMode(); 404 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 405 GetRoundingMode(); 406 #if TBB_USE_EXCEPTIONS 407 try { 408 my_a.execute(scoped_functor); 409 } catch(std::logic_error &e) { 410 scoped_functor.on_exception(e.what()); 411 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 412 #else 413 my_a.execute(scoped_functor); 414 #endif 415 scoped_functor.after_execute(); 416 } 417 }; 418 419 void TestArenaEntryConsistency() { 420 tbb::task_arena a(2, 1); 421 std::atomic<int> c; 422 ForEachArenaEntryBody body(a, c); 423 424 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 425 a.initialize(); // capture FP settings to arena 426 fp_scope.setNextFPMode(); 427 428 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 429 body.test(i); 430 } 431 //-------------------------------------------------- 432 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 433 class TestArenaConcurrencyBody : utils::NoAssign { 434 tbb::task_arena &my_a; 435 int my_max_concurrency; 436 int my_reserved_slots; 437 utils::SpinBarrier *my_barrier; 438 utils::SpinBarrier *my_worker_barrier; 439 public: 440 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr ) 441 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 442 // NativeParallelFor's functor 443 void operator()( int ) const { 444 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 445 local_id.local() = 1; 446 my_a.execute( *this ); 447 } 448 // Arena's functor 449 void operator()() const { 450 int idx = tbb::this_task_arena::current_thread_index(); 451 REQUIRE( idx < (my_max_concurrency > 1 ? 
my_max_concurrency : 2) ); 452 REQUIRE( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() ); 453 int max_arena_concurrency = tbb::this_task_arena::max_concurrency(); 454 REQUIRE( max_arena_concurrency == my_max_concurrency ); 455 if ( my_worker_barrier ) { 456 if ( local_id.local() == 1 ) { 457 // External thread in a reserved slot 458 CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" ); 459 } else { 460 // Worker thread 461 CHECK( idx >= my_reserved_slots ); 462 my_worker_barrier->wait(); 463 } 464 } else if ( my_barrier ) 465 CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" ); 466 if ( my_barrier ) my_barrier->wait(); 467 else utils::Sleep( 1 ); 468 } 469 }; 470 471 void TestArenaConcurrency( int p, int reserved = 0, int step = 1) { 472 for (; reserved <= p; reserved += step) { 473 tbb::task_arena a( p, reserved ); 474 if (p - reserved < tbb::this_task_arena::max_concurrency()) { 475 // Check concurrency with worker & reserved external threads. 476 ResetTLS(); 477 utils::SpinBarrier b( p ); 478 utils::SpinBarrier wb( p-reserved ); 479 TestArenaConcurrencyBody test( a, p, reserved, &b, &wb ); 480 for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads 481 a.enqueue( test ); 482 if ( reserved==1 ) 483 test( 0 ); // calls execute() 484 else 485 utils::NativeParallelFor( reserved, test ); 486 a.debug_wait_until_empty(); 487 } { // Check if multiple external threads alone can achieve maximum concurrency. 488 ResetTLS(); 489 utils::SpinBarrier b( p ); 490 utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) ); 491 a.debug_wait_until_empty(); 492 } { // Check oversubscription by external threads. 493 #if !_WIN32 || !_WIN64 494 // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms 495 // which makes it impossible to create more than ~500 threads.
496 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 497 #endif 498 #if TBB_TEST_LOW_WORKLOAD 499 if ( p <= 16 ) 500 #endif 501 { 502 ResetTLS(); 503 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 504 a.debug_wait_until_empty(); 505 } 506 } 507 } 508 } 509 510 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 511 utils::SpinBarrier& m_barrier; 512 513 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 514 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 515 observe(true); 516 } 517 ~TestMandatoryConcurrencyObserver() { 518 observe(false); 519 } 520 void on_scheduler_exit(bool worker) override { 521 if (worker) { 522 m_barrier.wait(); 523 } 524 } 525 }; 526 527 void TestMandatoryConcurrency() { 528 tbb::task_arena a(1); 529 a.execute([&a] { 530 int n_threads = 4; 531 utils::SpinBarrier exit_barrier(2); 532 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 533 for (int j = 0; j < 5; ++j) { 534 utils::ExactConcurrencyLevel::check(1); 535 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 536 utils::SpinBarrier barrier(n_threads); 537 utils::NativeParallelFor(n_threads, [&](int) { 538 for (int i = 0; i < 5; ++i) { 539 barrier.wait(); 540 a.enqueue([&] { 541 CHECK(tbb::this_task_arena::max_concurrency() == 2); 542 CHECK(a.max_concurrency() == 2); 543 ++curr_tasks; 544 CHECK(curr_tasks == 1); 545 utils::doDummyWork(1000); 546 CHECK(curr_tasks == 1); 547 --curr_tasks; 548 ++num_tasks; 549 }); 550 barrier.wait(); 551 } 552 }); 553 do { 554 exit_barrier.wait(); 555 } while (num_tasks < n_threads * 5); 556 } 557 }); 558 } 559 560 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 561 TestMandatoryConcurrency(); 562 InitializeAndTerminate(max_thread_num); 563 for (int p = min_thread_num; p <= max_thread_num; ++p) { 564 TestConcurrentArenas(p); 565 TestMultipleMasters(p); 566 TestArenaConcurrency(p); 567 } 568 } 569 570 //--------------------------------------------------// 571 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 572 // This part of the test uses the knowledge of task_arena internals 573 574 struct TaskArenaValidator { 575 int my_slot_at_construction; 576 const tbb::task_arena& my_arena; 577 TaskArenaValidator( const tbb::task_arena& other ) 578 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 579 , my_arena(other) 580 {} 581 // Inspect the internal state 582 int concurrency() { return my_arena.debug_max_concurrency(); } 583 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 584 585 // This method should be called in task_arena::execute() for a captured arena 586 // by the same thread that created the validator. 
587 void operator()() { 588 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 589 "Current thread index has changed since the validator construction" ); 590 } 591 }; 592 593 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 594 int expect_concurrency, int expect_masters ) { 595 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 596 if( arena.is_active() ) { 597 TaskArenaValidator validator( arena ); 598 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 599 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 600 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 601 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 602 // for threads already in arena, check that the thread index remains the same 603 arena.execute( validator ); 604 } else { // not_initialized 605 // Test the deprecated method 606 CHECK(tbb::this_task_arena::current_thread_index() == -1); 607 } 608 609 // Ideally, there should be a check for having the same internal arena object, 610 // but that object is not easily accessible for implicit arenas. 611 } 612 } 613 614 struct TestAttachBody : utils::NoAssign { 615 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 616 const int maxthread; 617 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 618 619 // The functor body for NativeParallelFor 620 void operator()( int idx ) const { 621 my_idx = idx; 622 623 int default_threads = tbb::this_task_arena::max_concurrency(); 624 625 tbb::task_arena arena{tbb::task_arena::attach()}; 626 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 627 628 arena.terminate(); 629 ValidateAttachedArena( arena, false, -1, -1 ); 630 631 // attach to an auto-initialized arena 632 tbb::parallel_for(0, 1, [](int) {}); 633 634 tbb::task_arena arena2{tbb::task_arena::attach()}; 635 ValidateAttachedArena( arena2, true, default_threads, 1 ); 636 637 tbb::task_arena arena3; 638 arena3.initialize(tbb::attach()); 639 ValidateAttachedArena( arena3, true, default_threads, 1 ); 640 641 642 // attach to another task_arena 643 arena.initialize( maxthread, std::min(maxthread,idx) ); 644 arena.execute( *this ); 645 } 646 647 // The functor body for task_arena::execute above 648 void operator()() const { 649 tbb::task_arena arena2{tbb::task_arena::attach()}; 650 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 651 } 652 653 // The functor body for tbb::parallel_for 654 void operator()( const Range& r ) const { 655 for( int i = r.begin(); i<r.end(); ++i ) { 656 tbb::task_arena arena2{tbb::task_arena::attach()}; 657 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 658 } 659 } 660 }; 661 662 thread_local int TestAttachBody::my_idx; 663 664 void TestAttach( int maxthread ) { 665 // Externally concurrent, but no concurrency within a thread 666 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 667 // Concurrent within the current arena; may also serve as a stress test 668 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 669 } 670 671 //--------------------------------------------------// 672 673 // Test that task_arena::enqueue does not tolerate a non-const functor. 674 // TODO: can it be reworked as SFINAE-based compile-time check? 
675 struct TestFunctor { 676 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 677 void operator()() const { /* library requires this overload only */ } 678 }; 679 680 void TestConstantFunctorRequirement() { 681 tbb::task_arena a; 682 TestFunctor tf; 683 a.enqueue( tf ); 684 } 685 686 //--------------------------------------------------// 687 688 #include "tbb/parallel_reduce.h" 689 #include "tbb/parallel_invoke.h" 690 691 // Test this_task_arena::isolate 692 namespace TestIsolatedExecuteNS { 693 template <typename NestedPartitioner> 694 class NestedParFor : utils::NoAssign { 695 public: 696 NestedParFor() {} 697 void operator()() const { 698 NestedPartitioner p; 699 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 700 } 701 }; 702 703 template <typename NestedPartitioner> 704 class ParForBody : utils::NoAssign { 705 bool myOuterIsolation; 706 tbb::enumerable_thread_specific<int> &myEts; 707 std::atomic<bool> &myIsStolen; 708 public: 709 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 710 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 711 void operator()( int ) const { 712 int &e = myEts.local(); 713 if ( e++ > 0 ) myIsStolen = true; 714 if ( myOuterIsolation ) 715 NestedParFor<NestedPartitioner>()(); 716 else 717 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 718 --e; 719 } 720 }; 721 722 template <typename OuterPartitioner, typename NestedPartitioner> 723 class OuterParFor : utils::NoAssign { 724 bool myOuterIsolation; 725 std::atomic<bool> &myIsStolen; 726 public: 727 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 728 void operator()() const { 729 tbb::enumerable_thread_specific<int> ets( 0 ); 730 OuterPartitioner p; 731 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 732 } 733 }; 734 735 template <typename OuterPartitioner, typename NestedPartitioner> 736 void TwoLoopsTest( bool outer_isolation ) { 737 std::atomic<bool> is_stolen; 738 is_stolen = false; 739 const int max_repeats = 100; 740 if ( outer_isolation ) { 741 for ( int i = 0; i <= max_repeats; ++i ) { 742 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 743 if ( is_stolen ) break; 744 } 745 // TODO: was ASSERT_WARNING 746 if (!is_stolen) { 747 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 748 } 749 } else { 750 for ( int i = 0; i <= max_repeats; ++i ) { 751 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 752 } 753 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" ); 754 } 755 } 756 757 void TwoLoopsTest( bool outer_isolation ) { 758 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 759 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 760 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 761 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 762 } 763 764 void TwoLoopsTest() { 765 TwoLoopsTest( true ); 766 TwoLoopsTest( false ); 767 } 768 //--------------------------------------------------// 769 class HeavyMixTestBody : utils::NoAssign { 770 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 771
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 772 int myNestedLevel; 773 774 template <typename Partitioner, typename Body> 775 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = nullptr ) { 776 if ( rnd.get() % 2 ) { 777 if (ctx ) 778 tbb::parallel_for( 0, 2, body, p, *ctx ); 779 else 780 tbb::parallel_for( 0, 2, body, p ); 781 } else { 782 tbb::parallel_invoke( body, body ); 783 } 784 } 785 786 template <typename Partitioner> 787 class IsolatedBody : utils::NoAssign { 788 const HeavyMixTestBody &myHeavyMixTestBody; 789 Partitioner &myPartitioner; 790 public: 791 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 792 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 793 void operator()() const { 794 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 795 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 796 myHeavyMixTestBody.myNestedLevel + 1 ), 797 myPartitioner ); 798 } 799 }; 800 801 template <typename Partitioner> 802 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 803 Partitioner p; 804 switch ( rnd.get() % 2 ) { 805 case 0: { 806 // No features 807 tbb::task_group_context ctx; 808 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 809 break; 810 } 811 case 1: { 812 // Isolation 813 int previous_isolation = isolated_level; 814 isolated_level = myNestedLevel; 815 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 816 isolated_level = previous_isolation; 817 break; 818 } 819 } 820 } 821 public: 822 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 823 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 824 : myRandom( random ), myIsolatedLevel( isolated_level ) 825 , myNestedLevel( nested_level ) {} 826 void operator()() const { 827 int &isolated_level = myIsolatedLevel.local(); 828 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 829 if ( myNestedLevel == 20 ) 830 return; 831 utils::FastRandom<>& rnd = myRandom.local(); 832 if ( rnd.get() % 2 == 1 ) { 833 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 834 } else { 835 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 836 } 837 } 838 void operator()(int) const { 839 this->operator()(); 840 } 841 }; 842 843 struct RandomInitializer { 844 utils::FastRandom<> operator()() { 845 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 846 } 847 }; 848 849 void HeavyMixTest() { 850 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 851 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 852 853 RandomInitializer init_random; 854 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 855 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 856 for ( int i = 0; i < 5; ++i ) { 857 HeavyMixTestBody b( random, isolated_level, 1 ); 858 b( 0 ); 859 } 860 } 861 862 //--------------------------------------------------// 863 #if TBB_USE_EXCEPTIONS 864 struct MyException {}; 865 struct IsolatedBodyThrowsException { 866 void operator()() const { 867 #if _MSC_VER && !__INTEL_COMPILER 868 // Workaround an unreachable code warning in task_arena_function. 
869 volatile bool workaround = true; 870 if (workaround) 871 #endif 872 { 873 throw MyException(); 874 } 875 } 876 }; 877 struct ExceptionTestBody : utils::NoAssign { 878 tbb::enumerable_thread_specific<int>& myEts; 879 std::atomic<bool>& myIsStolen; 880 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 881 : myEts( ets ), myIsStolen( is_stolen ) {} 882 void operator()( int i ) const { 883 try { 884 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 885 REQUIRE_MESSAGE( false, "The exception has been lost" ); 886 } 887 catch ( MyException ) {} 888 catch ( ... ) { 889 REQUIRE_MESSAGE( false, "Unexpected exception" ); 890 } 891 // Check that nested algorithms can steal outer-level tasks 892 int &e = myEts.local(); 893 if ( e++ > 0 ) myIsStolen = true; 894 // work imbalance increases chances for stealing 895 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 896 --e; 897 } 898 }; 899 900 #endif /* TBB_USE_EXCEPTIONS */ 901 void ExceptionTest() { 902 #if TBB_USE_EXCEPTIONS 903 tbb::enumerable_thread_specific<int> ets; 904 std::atomic<bool> is_stolen; 905 is_stolen = false; 906 for ( ;; ) { 907 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 908 if ( is_stolen ) break; 909 } 910 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 911 #endif /* TBB_USE_EXCEPTIONS */ 912 } 913 914 struct NonConstBody { 915 unsigned int state; 916 void operator()() { 917 state ^= ~0u; 918 } 919 }; 920 921 void TestNonConstBody() { 922 NonConstBody body; 923 body.state = 0x6c97d5ed; 924 tbb::this_task_arena::isolate(body); 925 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 926 } 927 928 // TODO: Consider tbb::task_group instead of explicit task API. 929 class TestEnqueueTask : public tbb::detail::d1::task { 930 using wait_context = tbb::detail::d1::wait_context; 931 932 tbb::enumerable_thread_specific<bool>& executed; 933 std::atomic<int>& completed; 934 935 public: 936 wait_context& waiter; 937 tbb::task_arena& arena; 938 static const int N = 100; 939 940 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 941 : executed(exe), completed(c), waiter(w), arena(a) {} 942 943 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 944 for (int i = 0; i < N; ++i) { 945 arena.enqueue([&]() { 946 executed.local() = true; 947 ++completed; 948 for (int j = 0; j < 100; j++) utils::yield(); 949 waiter.release(1); 950 }); 951 } 952 return nullptr; 953 } 954 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 955 }; 956 957 class TestEnqueueIsolateBody : utils::NoCopy { 958 tbb::enumerable_thread_specific<bool>& executed; 959 std::atomic<int>& completed; 960 tbb::task_arena& arena; 961 public: 962 static const int N = 100; 963 964 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 965 : executed(exe), completed(c), arena(a) {} 966 void operator()() { 967 tbb::task_group_context ctx; 968 tbb::detail::d1::wait_context waiter(N); 969 970 TestEnqueueTask root(executed, completed, waiter, arena); 971 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 972 } 973 }; 974 975 void TestEnqueue() { 976 tbb::enumerable_thread_specific<bool> executed(false); 977 std::atomic<int> completed; 978 tbb::task_arena arena{tbb::task_arena::attach()}; 979 980 // Check that the main thread can process enqueued tasks. 
981 completed = 0; 982 TestEnqueueIsolateBody b1(executed, completed, arena); 983 b1(); 984 985 if (!executed.local()) { 986 REPORT("Warning: None of the enqueued tasks were executed by the main thread.\n"); 987 } 988 989 executed.local() = false; 990 completed = 0; 991 const int N = 100; 992 // Create enqueued tasks out of isolation. 993 994 tbb::task_group_context ctx; 995 tbb::detail::d1::wait_context waiter(N); 996 for (int i = 0; i < N; ++i) { 997 arena.enqueue([&]() { 998 executed.local() = true; 999 ++completed; 1000 utils::yield(); 1001 waiter.release(1); 1002 }); 1003 } 1004 TestEnqueueIsolateBody b2(executed, completed, arena); 1005 tbb::this_task_arena::isolate(b2); 1006 REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate."); 1007 1008 tbb::detail::d1::wait(waiter, ctx); 1009 // while (completed < TestEnqueueTask::N + N) utils::yield(); 1010 } 1011 } 1012 1013 void TestIsolatedExecute() { 1014 // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner steals an outer 1015 // level task on a nested level. If we have only one thief, it will execute outer level tasks first and 1016 // the owner will not have a chance to steal outer level tasks. 1017 int platform_max_thread = tbb::this_task_arena::max_concurrency(); 1018 int num_threads = utils::min( platform_max_thread, 3 ); 1019 { 1020 // Too many threads require too much work to reproduce stealing from the outer level. 1021 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7)); 1022 TestIsolatedExecuteNS::TwoLoopsTest(); 1023 TestIsolatedExecuteNS::HeavyMixTest(); 1024 TestIsolatedExecuteNS::ExceptionTest(); 1025 } 1026 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 1027 TestIsolatedExecuteNS::HeavyMixTest(); 1028 TestIsolatedExecuteNS::TestNonConstBody(); 1029 TestIsolatedExecuteNS::TestEnqueue(); 1030 } 1031 1032 //-----------------------------------------------------------------------------------------// 1033 1034 class TestDelegatedSpawnWaitBody : utils::NoAssign { 1035 tbb::task_arena &my_a; 1036 utils::SpinBarrier &my_b1, &my_b2; 1037 public: 1038 TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 1039 : my_a(a), my_b1(b1), my_b2(b2) {} 1040 // NativeParallelFor's functor 1041 void operator()(int idx) const { 1042 if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang) 1043 for (int i = 0; i < 2; ++i) { 1044 my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers 1045 } 1046 tbb::task_group tg; 1047 my_b1.wait(); // sync with the workers 1048 for( int i=0; i<100000; ++i) { 1049 my_a.execute([&tg] { tg.run([] {}); }); 1050 } 1051 my_a.execute([&tg] {tg.wait(); }); 1052 } 1053 1054 my_b2.wait(); // sync both threads 1055 } 1056 }; 1057 1058 void TestDelegatedSpawnWait() { 1059 if (tbb::this_task_arena::max_concurrency() < 3) { 1060 // The test requires at least 2 worker threads 1061 return; 1062 } 1063 // Regression test for a bug with missed wakeup notification from a delegated task 1064 tbb::task_arena a(2,0); 1065 a.initialize(); 1066 utils::SpinBarrier barrier1(3), barrier2(2); 1067 utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) ); 1068 a.debug_wait_until_empty(); 1069 } 1070 1071 //-----------------------------------------------------------------------------------------// 1072 1073 class TestMultipleWaitsArenaWait : utils::NoAssign { 1074 using
wait_context = tbb::detail::d1::wait_context; 1075 public: 1076 TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1077 : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1078 void operator()() const { 1079 ++my_processed; 1080 // Wait for all tasks 1081 if ( my_idx < my_num_tasks ) { 1082 tbb::detail::d1::wait(*my_waiters[my_idx], my_context); 1083 } 1084 // Signal waiting tasks 1085 if ( my_idx >= my_bunch_size ) { 1086 my_waiters[my_idx-my_bunch_size]->release(); 1087 } 1088 } 1089 private: 1090 int my_idx; 1091 int my_bunch_size; 1092 int my_num_tasks; 1093 std::vector<wait_context*>& my_waiters; 1094 std::atomic<int>& my_processed; 1095 tbb::task_group_context& my_context; 1096 }; 1097 1098 class TestMultipleWaitsThreadBody : utils::NoAssign { 1099 using wait_context = tbb::detail::d1::wait_context; 1100 public: 1101 TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1102 : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1103 void operator()( int idx ) const { 1104 my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) ); 1105 --my_processed; 1106 } 1107 private: 1108 int my_bunch_size; 1109 int my_num_tasks; 1110 tbb::task_arena& my_arena; 1111 std::vector<wait_context*>& my_waiters; 1112 std::atomic<int>& my_processed; 1113 tbb::task_group_context& my_context; 1114 }; 1115 1116 void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) { 1117 tbb::task_arena a( num_threads ); 1118 const int num_tasks = (num_bunches-1)*bunch_size; 1119 1120 tbb::task_group_context tgc; 1121 std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks); 1122 for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0); 1123 1124 std::atomic<int> processed(0); 1125 for ( int repeats = 0; repeats<10; ++repeats ) { 1126 int idx = 0; 1127 for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) { 1128 // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task). 1129 while ( processed < bunch*bunch_size ) utils::yield(); 1130 // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters. 1131 for ( int i = 0; i<bunch_size; ++i ) { 1132 waiters[idx]->reserve(); 1133 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1134 } 1135 } 1136 // No sync because the threads of the last bunch do not call wait_for_all. 1137 // Run the last bunch of threads. 1138 for ( int i = 0; i<bunch_size; ++i ) 1139 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1140 while ( processed ) utils::yield(); 1141 } 1142 for (auto w : waiters) delete w; 1143 } 1144 1145 void TestMultipleWaits() { 1146 // Limit the number of threads to prevent heavy oversubscription.
1147 #if TBB_TEST_LOW_WORKLOAD 1148 const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() ); 1149 #else 1150 const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() ); 1151 #endif 1152 1153 utils::FastRandom<> rnd(1234); 1154 for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) { 1155 for ( int i = 0; i<3; ++i ) { 1156 const int num_bunches = 3 + rnd.get()%3; 1157 const int bunch_size = max_threads + rnd.get()%max_threads; 1158 TestMultipleWaits( threads, num_bunches, bunch_size ); 1159 } 1160 } 1161 } 1162 1163 //--------------------------------------------------// 1164 1165 void TestSmallStackSize() { 1166 tbb::global_control gc(tbb::global_control::thread_stack_size, 1167 tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 ); 1168 // The test produces the warning (not a error) if fails. So the test is run many times 1169 // to make the log annoying (to force to consider it as an error). 1170 for (int i = 0; i < 100; ++i) { 1171 tbb::task_arena a; 1172 a.initialize(); 1173 } 1174 } 1175 1176 //--------------------------------------------------// 1177 1178 namespace TestMoveSemanticsNS { 1179 struct TestFunctor { 1180 void operator()() const {}; 1181 }; 1182 1183 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 1184 MoveOnlyFunctor() : utils::MoveOnly() {}; 1185 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 1186 }; 1187 1188 struct MovePreferableFunctor : utils::Movable, TestFunctor { 1189 MovePreferableFunctor() : utils::Movable() {}; 1190 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {}; 1191 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 1192 }; 1193 1194 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 1195 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 1196 // mv ctor is not allowed as cp ctor from parent NoCopy 1197 private: 1198 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 1199 }; 1200 1201 void TestFunctors() { 1202 tbb::task_arena ta; 1203 MovePreferableFunctor mpf; 1204 // execute() doesn't have any copies or moves of arguments inside the impl 1205 ta.execute( NoMoveNoCopyFunctor() ); 1206 1207 ta.enqueue( MoveOnlyFunctor() ); 1208 ta.enqueue( mpf ); 1209 REQUIRE_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 1210 mpf.Reset(); 1211 ta.enqueue( std::move(mpf) ); 1212 REQUIRE_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 1213 mpf.Reset(); 1214 } 1215 } 1216 1217 void TestMoveSemantics() { 1218 TestMoveSemanticsNS::TestFunctors(); 1219 } 1220 1221 //--------------------------------------------------// 1222 1223 #include <vector> 1224 1225 #include "common/state_trackable.h" 1226 1227 namespace TestReturnValueNS { 1228 struct noDefaultTag {}; 1229 class ReturnType : public StateTrackable<> { 1230 static const int SIZE = 42; 1231 std::vector<int> data; 1232 public: 1233 ReturnType(noDefaultTag) : StateTrackable<>(0) {} 1234 // Define copy constructor to test that it is never called 1235 ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {} 1236 ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {} 1237 1238 void fill() { 1239 for (int i = 0; i < SIZE; ++i) 1240 data.push_back(i); 1241 } 1242 void check() { 1243 REQUIRE(data.size() == unsigned(SIZE)); 1244 for (int i = 0; i < SIZE; ++i) 1245 REQUIRE(data[i] == i); 1246 StateTrackableCounters::counters_type& cnts = 
StateTrackableCounters::counters; 1247 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1248 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1249 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1250 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1251 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1252 // The number of copies/moves should not exceed 3 if copy elision takes a place: 1253 // function return, store to an internal storage, acquire internal storage. 1254 // For compilation, without copy elision, this number may be grown up to 7. 1255 REQUIRE((copied == 0 && moved <= 7)); 1256 WARN_MESSAGE(moved <= 3, 1257 "Warning: The number of copies/moves should not exceed 3 if copy elision takes a place." 1258 "Take an attention to this warning only if copy elision is enabled." 1259 ); 1260 } 1261 }; 1262 1263 template <typename R> 1264 R function() { 1265 noDefaultTag tag; 1266 R r(tag); 1267 r.fill(); 1268 return r; 1269 } 1270 1271 template <> 1272 void function<void>() {} 1273 1274 template <typename R> 1275 struct Functor { 1276 R operator()() const { 1277 return function<R>(); 1278 } 1279 }; 1280 1281 tbb::task_arena& arena() { 1282 static tbb::task_arena a; 1283 return a; 1284 } 1285 1286 template <typename F> 1287 void TestExecute(F &f) { 1288 StateTrackableCounters::reset(); 1289 ReturnType r{arena().execute(f)}; 1290 r.check(); 1291 } 1292 1293 template <typename F> 1294 void TestExecute(const F &f) { 1295 StateTrackableCounters::reset(); 1296 ReturnType r{arena().execute(f)}; 1297 r.check(); 1298 } 1299 template <typename F> 1300 void TestIsolate(F &f) { 1301 StateTrackableCounters::reset(); 1302 ReturnType r{tbb::this_task_arena::isolate(f)}; 1303 r.check(); 1304 } 1305 1306 template <typename F> 1307 void TestIsolate(const F &f) { 1308 StateTrackableCounters::reset(); 1309 ReturnType r{tbb::this_task_arena::isolate(f)}; 1310 r.check(); 1311 } 1312 1313 void Test() { 1314 TestExecute(Functor<ReturnType>()); 1315 Functor<ReturnType> f1; 1316 TestExecute(f1); 1317 TestExecute(function<ReturnType>); 1318 1319 arena().execute(Functor<void>()); 1320 Functor<void> f2; 1321 arena().execute(f2); 1322 arena().execute(function<void>); 1323 TestIsolate(Functor<ReturnType>()); 1324 TestIsolate(f1); 1325 TestIsolate(function<ReturnType>); 1326 tbb::this_task_arena::isolate(Functor<void>()); 1327 tbb::this_task_arena::isolate(f2); 1328 tbb::this_task_arena::isolate(function<void>); 1329 } 1330 } 1331 1332 void TestReturnValue() { 1333 TestReturnValueNS::Test(); 1334 } 1335 1336 //--------------------------------------------------// 1337 1338 // MyObserver checks if threads join to the same arena 1339 struct MyObserver: public tbb::task_scheduler_observer { 1340 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1341 tbb::task_arena& my_arena; 1342 std::atomic<int>& my_failure_counter; 1343 std::atomic<int>& my_counter; 1344 utils::SpinBarrier& m_barrier; 1345 1346 MyObserver(tbb::task_arena& a, 1347 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1348 std::atomic<int>& failure_counter, 1349 std::atomic<int>& counter, 1350 utils::SpinBarrier& barrier) 1351 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1352 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) { 1353 observe(true); 1354 } 1355 ~MyObserver(){ 1356 observe(false); 1357 } 1358 void on_scheduler_entry(bool worker) override { 1359 if (worker) { 1360 ++my_counter; 1361 tbb::task_arena*& cur_arena = 
my_tls.local(); 1362 if (cur_arena != nullptr && cur_arena != &my_arena) { 1363 ++my_failure_counter; 1364 } 1365 cur_arena = &my_arena; 1366 m_barrier.wait(); 1367 } 1368 } 1369 void on_scheduler_exit(bool worker) override { 1370 if (worker) { 1371 m_barrier.wait(); // before wakeup 1372 m_barrier.wait(); // after wakeup 1373 } 1374 } 1375 }; 1376 1377 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { 1378 if (n_threads == 0) { 1379 n_threads = tbb::this_task_arena::max_concurrency(); 1380 } 1381 1382 const int max_n_arenas = 8; 1383 int n_arenas = 2; 1384 if(n_threads > 16) { 1385 n_arenas = max_n_arenas; 1386 } else if (n_threads > 8) { 1387 n_arenas = 4; 1388 } 1389 1390 int n_workers = n_threads - 1; 1391 n_workers = n_arenas * (n_workers / n_arenas); 1392 if (n_workers == 0) { 1393 return; 1394 } 1395 1396 n_threads = n_workers + 1; 1397 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads); 1398 1399 const int n_repetitions = 20; 1400 const int n_outer_repetitions = 100; 1401 std::multiset<float> failure_ratio; // for median calculation 1402 utils::SpinBarrier barrier(n_threads); 1403 utils::SpinBarrier worker_barrier(n_workers); 1404 MyObserver* observer[max_n_arenas]; 1405 std::vector<tbb::task_arena> arenas(n_arenas); 1406 std::atomic<int> failure_counter; 1407 std::atomic<int> counter; 1408 tbb::enumerable_thread_specific<tbb::task_arena*> tls; 1409 1410 for (int i = 0; i < n_arenas; ++i) { 1411 arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master 1412 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier); 1413 } 1414 1415 int ii = 0; 1416 for (; ii < n_outer_repetitions; ++ii) { 1417 failure_counter = 0; 1418 counter = 0; 1419 1420 // Main code 1421 auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); }; 1422 wakeup(); 1423 for (int j = 0; j < n_repetitions; ++j) { 1424 barrier.wait(); // entry 1425 barrier.wait(); // exit1 1426 wakeup(); 1427 barrier.wait(); // exit2 1428 } 1429 barrier.wait(); // entry 1430 barrier.wait(); // exit1 1431 barrier.wait(); // exit2 1432 1433 failure_ratio.insert(float(failure_counter) / counter); 1434 tls.clear(); 1435 // collect 3 elements in failure_ratio before calculating median 1436 if (ii > 1) { 1437 std::multiset<float>::iterator it = failure_ratio.begin(); 1438 std::advance(it, failure_ratio.size() / 2); 1439 if (*it < 0.02) 1440 break; 1441 } 1442 } 1443 for (int i = 0; i < n_arenas; ++i) { 1444 delete observer[i]; 1445 } 1446 // check whether the median is too big 1447 std::multiset<float>::iterator it = failure_ratio.begin(); 1448 std::advance(it, failure_ratio.size() / 2); 1449 // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas 1450 if (*it > 0.05) { 1451 REPORT("Warning: Too many cases where threads joined different arenas.\n"); 1452 REQUIRE_MESSAGE(*it <= 0.3, "Too many cases where threads joined different arenas.\n"); 1453 } 1454 } 1455 1456 void TestArenaWorkersMigration() { 1457 TestArenaWorkersMigrationWithNumThreads(4); 1458 if (tbb::this_task_arena::max_concurrency() != 4) { 1459 TestArenaWorkersMigrationWithNumThreads(); 1460 } 1461 } 1462 1463 //--------------------------------------------------// 1464 void TestDefaultCreatedWorkersAmount() { 1465 int threads = tbb::this_task_arena::max_concurrency(); 1466 utils::NativeParallelFor(1, [threads](int idx) { 1467 REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS"); 1468 utils::SpinBarrier barrier(threads); 1469 ResetTLS(); 1470 for (auto
blocked : { false, true }) { 1471 for (int trail = 0; trail < (blocked ? 10 : 10000); ++trail) { 1472 tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) { 1473 CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified number of threads"); 1474 CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "number of created threads is more than specified by default"); 1475 local_id.local() = 1; 1476 if (blocked) { 1477 // If there are more threads than expected, 'sleep' gives unexpected threads a chance to join. 1478 utils::Sleep(1); 1479 barrier.wait(); 1480 } 1481 }, tbb::simple_partitioner()); 1482 REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "number of created threads exceeds the default"); 1483 if (blocked) { 1484 REQUIRE_MESSAGE(local_id.size() == size_t(threads), "number of created threads is not equal to the default"); 1485 } 1486 } 1487 } 1488 }); 1489 } 1490 1491 void TestAbilityToCreateWorkers(int thread_num) { 1492 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num); 1493 // Checks only a subset of reserved external thread counts: 1494 // 0 and 1 reserved threads are the important cases, but it is also useful 1495 // to collect some statistics for other counts without consuming 1496 // the whole test session time checking each one 1497 TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72)); 1498 TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14)); 1499 } 1500 1501 void TestDefaultWorkersLimit() { 1502 TestDefaultCreatedWorkersAmount(); 1503 #if TBB_TEST_LOW_WORKLOAD 1504 TestAbilityToCreateWorkers(24); 1505 #else 1506 TestAbilityToCreateWorkers(256); 1507 #endif 1508 } 1509 1510 #if TBB_USE_EXCEPTIONS 1511 1512 void ExceptionInExecute() { 1513 std::size_t thread_number = utils::get_platform_max_threads(); 1514 int arena_concurrency = static_cast<int>(thread_number) / 2; 1515 tbb::task_arena test_arena(arena_concurrency, arena_concurrency); 1516 1517 std::atomic<int> canceled_task{}; 1518 1519 auto parallel_func = [&test_arena, &canceled_task] (std::size_t) { 1520 for (std::size_t i = 0; i < 1000; ++i) { 1521 try { 1522 test_arena.execute([] { 1523 volatile bool suppress_unreachable_code_warning = true; 1524 if (suppress_unreachable_code_warning) { 1525 throw -1; 1526 } 1527 }); 1528 FAIL("An exception should have been thrown."); 1529 } catch (int) { 1530 ++canceled_task; 1531 } catch (...) { 1532 FAIL("Wrong type of exception."); 1533 } 1534 } 1535 }; 1536 1537 utils::NativeParallelFor(thread_number, parallel_func); 1538 CHECK(canceled_task == thread_number * 1000); 1539 } 1540 1541 #endif // TBB_USE_EXCEPTIONS 1542 1543 class simple_observer : public tbb::task_scheduler_observer { 1544 static std::atomic<int> idx_counter; 1545 int my_idx; 1546 int myMaxConcurrency; // concurrency of the associated arena 1547 int myNumReservedSlots; // reserved slots in the associated arena 1548 void on_scheduler_entry( bool is_worker ) override { 1549 int current_index = tbb::this_task_arena::current_thread_index(); 1550 CHECK(current_index < (myMaxConcurrency > 1 ?
myMaxConcurrency : 2)); 1551 if (is_worker) { 1552 CHECK(current_index >= myNumReservedSlots); 1553 } 1554 } 1555 void on_scheduler_exit( bool /*is_worker*/ ) override 1556 {} 1557 public: 1558 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1559 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1560 , myMaxConcurrency(maxConcurrency) 1561 , myNumReservedSlots(numReservedSlots) { 1562 observe(true); 1563 } 1564 1565 ~simple_observer(){ 1566 observe(false); 1567 } 1568 1569 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1570 return lhs.my_idx < rhs.my_idx; 1571 } 1572 }; 1573 1574 std::atomic<int> simple_observer::idx_counter{}; 1575 1576 struct arena_handler { 1577 enum arena_status { 1578 alive, 1579 deleting, 1580 deleted 1581 }; 1582 1583 tbb::task_arena* arena; 1584 1585 std::atomic<arena_status> status{alive}; 1586 tbb::spin_rw_mutex arena_in_use{}; 1587 1588 tbb::concurrent_set<simple_observer> observers; 1589 1590 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1591 {} 1592 1593 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1594 return lhs.arena < rhs.arena; 1595 } 1596 }; 1597 1598 // TODO: Add observer operations 1599 void StressTestMixFunctionality() { 1600 enum operation_type { 1601 create_arena, 1602 delete_arena, 1603 attach_observer, 1604 detach_observer, 1605 arena_execute, 1606 enqueue_task, 1607 last_operation_marker 1608 }; 1609 1610 std::size_t operations_number = last_operation_marker; 1611 std::size_t thread_number = utils::get_platform_max_threads(); 1612 utils::FastRandom<> operation_rnd(42); 1613 tbb::spin_mutex random_operation_guard; 1614 1615 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1616 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1617 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1618 }; 1619 1620 utils::FastRandom<> arena_rnd(42); 1621 tbb::spin_mutex random_arena_guard; 1622 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1623 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1624 return arena_rnd.get(); 1625 }; 1626 1627 tbb::concurrent_set<arena_handler> arenas_pool; 1628 1629 std::vector<std::thread> thread_pool; 1630 1631 utils::SpinBarrier thread_barrier(thread_number); 1632 std::size_t max_operations = 20000; 1633 std::atomic<std::size_t> curr_operation{}; 1634 1635 auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) { 1636 for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) { 1637 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1638 if (curr_arena->status == arena_handler::alive) { 1639 return curr_arena; 1640 } 1641 else { 1642 lock.release(); 1643 } 1644 } 1645 } 1646 return arenas_pool.end(); 1647 }; 1648 1649 auto thread_func = [&] () { 1650 arenas_pool.emplace(new tbb::task_arena()); 1651 thread_barrier.wait(); 1652 while (curr_operation++ < max_operations) { 1653 switch (get_random_operation()) { 1654 case create_arena : 1655 { 1656 arenas_pool.emplace(new tbb::task_arena()); 1657 break; 1658 } 1659 case delete_arena : 1660 { 1661 auto curr_arena = arenas_pool.begin(); 1662 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1663 arena_handler::arena_status curr_status = arena_handler::alive; 1664 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1665 break; 1666 } 1667 } 1668 
    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
            case create_arena :
            {
                arenas_pool.emplace(new tbb::task_arena());
                break;
            }
            case delete_arena :
            {
                // Claim an alive arena for deletion; the write lock below waits for readers to finish.
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    arena_handler::arena_status curr_status = arena_handler::alive;
                    if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                        break;
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};

                auto curr_arena = find_arena(lock);
                if (curr_arena != arenas_pool.end()) {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }
                break;
            }
            case detach_observer:
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->execute([]() {
                        tbb::affinity_partitioner aff;
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
                    });
                }

                break;
            }
            case enqueue_task:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
                }

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    // Arenas destroyed by delete_arena operations are marked 'deleted'; clean up the rest.
    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

// Functor that re-enqueues a copy of itself into the arena until the shared counter reaches
// 100000, and checks that every executing thread was marked in the ETS beforehand.
// Used by the "Workers oversubscription" test to keep the arena busy.
struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};
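
// Block the arena's non-reserved slots and some external threads on a condition variable,
// then use TestCPUUserTime to verify that the waiting threads actually sleep rather than
// busy-wait and burn CPU time.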
void test_threads_sleep(int concurrency, int reserved_slots, int num_external_threads) {
    tbb::task_arena a(concurrency, reserved_slots);
    std::mutex m;
    std::condition_variable cond_var;
    bool completed{ false };
    utils::SpinBarrier barrier( concurrency - reserved_slots + 1 );

    auto body = [&] {
        std::unique_lock<std::mutex> lock(m);
        cond_var.wait(lock, [&] { return completed == true; });
    };

    for (int i = 0; i < concurrency - reserved_slots; ++i) {
        a.enqueue([&] {
            body();
            barrier.signalNoWait();
        });
    }
    std::vector<std::thread> threads;
    for (int i = 0; i < num_external_threads; ++i) {
        threads.emplace_back([&]() { a.execute(body); });
    }
    TestCPUUserTime(concurrency);

    {
        std::lock_guard<std::mutex> lock(m);
        completed = true;
        cond_var.notify_all();
    }
    for (auto& t : threads) {
        t.join();
    }
    barrier.wait();
}

void test_threads_sleep(int concurrency, int reserved_slots) {
    test_threads_sleep(concurrency, reserved_slots, reserved_slots);
    test_threads_sleep(concurrency, reserved_slots, 2 * concurrency);
}

//--------------------------------------------------//

// This test requires TBB to be in an uninitialized state
//! \brief \ref requirement
TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
    REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was already in an initialized state");
    tbb::enumerable_thread_specific<int> ets;

    tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
    arena.execute([&ets] {
        tbb::parallel_for(0, 10000000, [&ets](int){
            ets.local() = 1;
            utils::doDummyWork(100);
        });
    });

    CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
}

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}
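
// The next two test cases cover exception propagation out of task_arena::execute:
// the first checks that a user exception reaches every calling thread, the second
// checks that no return-value object is constructed (and hence destroyed) when the
// executed functor throws.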
#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}

//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}

#if TBB_USE_EXCEPTIONS
//! The test for error in scheduling empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
    * doctest::should_fail() // Test needs to be revised, as the implementation uses assertions instead of exceptions
    * doctest::skip()        // Skip the test for now to not pollute the test log
){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif

//! \brief \ref error_guessing
TEST_CASE("Test threads sleep") {
    for (auto concurrency_level : utils::concurrency_range()) {
        int conc = int(concurrency_level);
        test_threads_sleep(conc, 0);
        test_threads_sleep(conc, 1);
        test_threads_sleep(conc, conc/2);
        test_threads_sleep(conc, conc);
    }
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}
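
// Unlike task_group::run_and_wait, task_arena::execute invokes its functor directly on
// the calling thread without wrapping it in a task, so is_inside_task() is expected to
// stay false inside the functor in both cases below.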
//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! The test for is_inside_task in arena::execute when inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! \brief \ref interface \ref requirement \ref regression
TEST_CASE("worker threads occupy slots in correct range") {
    std::vector<tbb::task_arena> arenas(42);
    for (auto& arena : arenas) {
        arena.initialize(1, 0);
    }

    std::atomic<int> counter{0};
    for (auto& arena : arenas) {
        arena.enqueue([&] {
            CHECK(tbb::this_task_arena::current_thread_index() == 0);
            ++counter;
        });
    }

    while (counter < 42) { utils::yield(); }
}