/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/concurrency_tracker.h"
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/task_group.h"

#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <thread>
#include <atomic>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification

//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change those.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
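            // The default label (reached when i&3 == 3) exercises the full cycle: construct with a
            // random concurrency, initialize() without arguments (keeping that concurrency), then
            // terminate() explicitly.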
58 default: { 59 tbb::task_arena arena( std::rand() % maxthread + 1 ); 60 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 61 arena.initialize(); 62 CHECK(arena.is_active()); 63 arena.terminate(); 64 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 65 break; 66 } 67 case 0: { 68 tbb::task_arena arena( 1 ); 69 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 70 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 71 CHECK(arena.is_active()); 72 break; 73 } 74 case 1: { 75 tbb::task_arena arena( tbb::task_arena::automatic ); 76 CHECK(!arena.is_active()); 77 arena.initialize(); 78 CHECK(arena.is_active()); 79 break; 80 } 81 case 2: { 82 tbb::task_arena arena; 83 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 84 arena.initialize( std::rand() % maxthread + 1 ); 85 CHECK(arena.is_active()); 86 arena.terminate(); 87 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 88 break; 89 } 90 } 91 } 92 } 93 94 //--------------------------------------------------// 95 96 // Definitions used in more than one test 97 typedef tbb::blocked_range<int> Range; 98 99 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 100 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 101 102 void ResetTLS() { 103 local_id.clear(); 104 old_id.clear(); 105 slot_id.clear(); 106 } 107 108 class ArenaObserver : public tbb::task_scheduler_observer { 109 int myId; // unique observer/arena id within a test 110 int myMaxConcurrency; // concurrency of the associated arena 111 int myNumReservedSlots; // reserved slots in the associated arena 112 void on_scheduler_entry( bool is_worker ) override { 113 int current_index = tbb::this_task_arena::current_thread_index(); 114 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 115 if (is_worker) { 116 CHECK(current_index >= myNumReservedSlots); 117 } 118 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 119 old_id.local() = local_id.local(); 120 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 121 local_id.local() = myId; 122 slot_id.local() = current_index; 123 } 124 void on_scheduler_exit( bool /*is_worker*/ ) override { 125 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 126 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 127 slot_id.local() = -2; 128 local_id.local() = old_id.local(); 129 old_id.local() = 0; 130 } 131 public: 132 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 133 : tbb::task_scheduler_observer(a) 134 , myId(id) 135 , myMaxConcurrency(maxConcurrency) 136 , myNumReservedSlots(numReservedSlots) { 137 CHECK(myId); 138 observe(true); 139 } 140 ~ArenaObserver () { 141 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 142 } 143 }; 144 145 struct IndexTrackingBody { // Must be used together with ArenaObserver 146 void operator() ( const Range& ) const { 147 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 148 utils::doDummyWork(50000); 149 } 150 }; 151 152 struct AsynchronousWork { 153 utils::SpinBarrier &my_barrier; 154 bool my_is_blocking; 155 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 156 : my_barrier(a_barrier), my_is_blocking(blocking) {} 157 void operator()() const { 158 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 159 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 160 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 161 else my_barrier.signalNoWait(); 162 } 163 }; 164 165 //-----------------------------------------------------------------------------------------// 166 167 // Test that task_arenas might be created and used from multiple application threads. 168 // Also tests arena observers. The parameter p is the index of an app thread running this test. 
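// The per-thread usage pattern exercised below is roughly the following (a sketch of the calls
// that follow, not an additional API):
//   tbb::task_arena arena(max_concurrency, reserved_slots);
//   ArenaObserver observer(arena, max_concurrency, reserved_slots, unique_id);
//   arena.enqueue(functor);  // asynchronous submission, picked up by a worker
//   arena.execute(functor);  // synchronous execution by the calling thread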
169 void TestConcurrentArenasFunc(int idx) { 170 // A regression test for observer activation order: 171 // check that arena observer can be activated before local observer 172 struct LocalObserver : public tbb::task_scheduler_observer { 173 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 174 LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) { 175 observe(true); 176 } 177 }; 178 179 tbb::task_arena a1; 180 a1.initialize(1,0); 181 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 182 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 183 184 tbb::task_arena a2(2,1); 185 ArenaObserver o2(a2, 2, 1, idx*2+2); 186 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 187 188 LocalObserver lo1; 189 CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated"); 190 191 tbb::task_arena a3(1, 0); 192 LocalObserver lo2(a3); 193 CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated"); 194 195 utils::SpinBarrier barrier(2); 196 AsynchronousWork work(barrier); 197 198 a1.enqueue(work); // put async work 199 barrier.wait(); 200 201 a2.enqueue(work); // another work 202 a2.execute(work); 203 204 a3.execute([] { 205 utils::doDummyWork(100); 206 }); 207 208 a1.debug_wait_until_empty(); 209 a2.debug_wait_until_empty(); 210 } 211 212 void TestConcurrentArenas(int p) { 213 // TODO REVAMP fix with global control 214 ResetTLS(); 215 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 216 } 217 //--------------------------------------------------// 218 // Test multiple application threads working with a single arena at the same time. 219 class MultipleMastersPart1 : utils::NoAssign { 220 tbb::task_arena &my_a; 221 utils::SpinBarrier &my_b1, &my_b2; 222 public: 223 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 224 : my_a(a), my_b1(b1), my_b2(b2) {} 225 void operator()(int) const { 226 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 227 my_b1.wait(); 228 // A regression test for bugs 1954 & 1971 229 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 230 } 231 }; 232 233 class MultipleMastersPart2 : utils::NoAssign { 234 tbb::task_arena &my_a; 235 utils::SpinBarrier &my_b; 236 public: 237 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 238 void operator()(int) const { 239 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 240 } 241 }; 242 243 class MultipleMastersPart3 : utils::NoAssign { 244 tbb::task_arena &my_a; 245 utils::SpinBarrier &my_b; 246 using wait_context = tbb::detail::d1::wait_context; 247 248 struct Runner : NoAssign { 249 wait_context& myWait; 250 Runner(wait_context& w) : myWait(w) {} 251 void operator()() const { 252 utils::doDummyWork(10000); 253 myWait.release(); 254 } 255 }; 256 257 struct Waiter : NoAssign { 258 wait_context& myWait; 259 Waiter(wait_context& w) : myWait(w) {} 260 void operator()() const { 261 tbb::task_group_context ctx; 262 tbb::detail::d1::wait(myWait, ctx); 263 } 264 }; 265 266 public: 267 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 268 : my_a(a), my_b(b) {} 269 void operator()(int) const { 270 wait_context wait(0); 271 my_b.wait(); // increases chances for task_arena initialization contention 272 for( int i=0; i<100; ++i) { 273 wait.reserve(); 274 my_a.enqueue(Runner(wait)); 275 my_a.execute(Waiter(wait)); 276 } 277 my_b.wait(); 278 } 279 }; 280 281 void 
TestMultipleMasters(int p) { 282 { 283 ResetTLS(); 284 tbb::task_arena a(1,0); 285 a.initialize(); 286 ArenaObserver o(a, 1, 0, 1); 287 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 288 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 289 barrier2.wait(); 290 a.debug_wait_until_empty(); 291 } { 292 ResetTLS(); 293 tbb::task_arena a(2,1); 294 ArenaObserver o(a, 2, 1, 2); 295 utils::SpinBarrier barrier(p+2); 296 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 297 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 298 utils::Sleep(10); 299 NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); 300 barrier.wait(); 301 a.debug_wait_until_empty(); 302 } { 303 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) 304 tbb::task_arena a(p,1); 305 utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs 306 // "Oversubscribe" the arena by 1 external thread 307 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); 308 a.debug_wait_until_empty(); 309 } 310 } 311 312 //--------------------------------------------------// 313 // TODO: explain what TestArenaEntryConsistency does 314 #include <sstream> 315 #include <stdexcept> 316 #include "oneapi/tbb/detail/_exception.h" 317 #include "common/fp_control.h" 318 319 struct TestArenaEntryBody : FPModeContext { 320 std::atomic<int> &my_stage; // each execute increases it 321 std::stringstream my_id; 322 bool is_caught, is_expected; 323 enum { arenaFPMode = 1 }; 324 325 TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance 326 : FPModeContext(idx+i) 327 , my_stage(s) 328 , is_caught(false) 329 #if TBB_USE_EXCEPTIONS 330 , is_expected( (idx&(1<<i)) != 0 ) 331 #else 332 , is_expected(false) 333 #endif 334 { 335 my_id << idx << ':' << i << '@'; 336 } 337 void operator()() { // inside task_arena::execute() 338 // synchronize with other stages 339 int stage = my_stage++; 340 int slot = tbb::this_task_arena::current_thread_index(); 341 CHECK(slot >= 0); 342 CHECK(slot <= 1); 343 // wait until the third stage is delegated and then starts on slot 0 344 while(my_stage < 2+slot) utils::yield(); 345 // deduct its entry type and put it into id, it helps to find source of a problem 346 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? 347 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master") 348 : stage == 3? 
"nested_same_ctx" : "nested_alien_ctx"); 349 AssertFPMode(arenaFPMode); 350 if (is_expected) { 351 TBB_TEST_THROW(std::logic_error(my_id.str())); 352 } 353 // no code can be put here since exceptions can be thrown 354 } 355 void on_exception(const char *e) { // outside arena, in catch block 356 is_caught = true; 357 CHECK(my_id.str() == e); 358 assertFPMode(); 359 } 360 void after_execute() { // outside arena and catch block 361 CHECK(is_caught == is_expected); 362 assertFPMode(); 363 } 364 }; 365 366 class ForEachArenaEntryBody : utils::NoAssign { 367 tbb::task_arena &my_a; // expected task_arena(2,1) 368 std::atomic<int> &my_stage; // each execute increases it 369 int my_idx; 370 371 public: 372 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 373 : my_a(a), my_stage(c), my_idx(0) {} 374 375 void test(int idx) { 376 my_idx = idx; 377 my_stage = 0; 378 NativeParallelFor(3, *this); // test cross-arena calls 379 CHECK(my_stage == 3); 380 my_a.execute(*this); // test nested calls 381 CHECK(my_stage == 5); 382 } 383 384 // task_arena functor for nested tests 385 void operator()() const { 386 test_arena_entry(3); // in current task group context 387 tbb::parallel_for(4, 5, *this); // in different context 388 } 389 390 // NativeParallelFor & parallel_for functor 391 void operator()(int i) const { 392 test_arena_entry(i); 393 } 394 395 private: 396 void test_arena_entry(int i) const { 397 GetRoundingMode(); 398 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 399 GetRoundingMode(); 400 #if TBB_USE_EXCEPTIONS 401 try { 402 my_a.execute(scoped_functor); 403 } catch(std::logic_error &e) { 404 scoped_functor.on_exception(e.what()); 405 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 406 #else 407 my_a.execute(scoped_functor); 408 #endif 409 scoped_functor.after_execute(); 410 } 411 }; 412 413 void TestArenaEntryConsistency() { 414 tbb::task_arena a(2, 1); 415 std::atomic<int> c; 416 ForEachArenaEntryBody body(a, c); 417 418 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 419 a.initialize(); // capture FP settings to arena 420 fp_scope.setNextFPMode(); 421 422 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 423 body.test(i); 424 } 425 //-------------------------------------------------- 426 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 427 class TestArenaConcurrencyBody : utils::NoAssign { 428 tbb::task_arena &my_a; 429 int my_max_concurrency; 430 int my_reserved_slots; 431 utils::SpinBarrier *my_barrier; 432 utils::SpinBarrier *my_worker_barrier; 433 public: 434 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 435 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 436 // NativeParallelFor's functor 437 void operator()( int ) const { 438 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 439 local_id.local() = 1; 440 my_a.execute( *this ); 441 } 442 // Arena's functor 443 void operator()() const { 444 int idx = tbb::this_task_arena::current_thread_index(); 445 CHECK( idx < (my_max_concurrency > 1 ? 
                     my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
490 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 491 #endif 492 #if TBB_TEST_LOW_WORKLOAD 493 if ( p <= 16 ) 494 #endif 495 { 496 ResetTLS(); 497 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 498 a.debug_wait_until_empty(); 499 } 500 } 501 } 502 } 503 504 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 505 utils::SpinBarrier& m_barrier; 506 507 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 508 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 509 observe(true); 510 } 511 void on_scheduler_exit(bool worker) override { 512 if (worker) { 513 m_barrier.wait(); 514 } 515 } 516 }; 517 518 void TestMandatoryConcurrency() { 519 tbb::task_arena a(1); 520 a.execute([&a] { 521 int n_threads = 4; 522 utils::SpinBarrier exit_barrier(2); 523 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 524 for (int j = 0; j < 5; ++j) { 525 utils::ExactConcurrencyLevel::check(1); 526 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 527 utils::SpinBarrier barrier(n_threads); 528 utils::NativeParallelFor(n_threads, [&](int) { 529 for (int i = 0; i < 5; ++i) { 530 barrier.wait(); 531 a.enqueue([&] { 532 CHECK(tbb::this_task_arena::max_concurrency() == 2); 533 CHECK(a.max_concurrency() == 2); 534 ++curr_tasks; 535 CHECK(curr_tasks == 1); 536 utils::doDummyWork(1000); 537 CHECK(curr_tasks == 1); 538 --curr_tasks; 539 ++num_tasks; 540 }); 541 barrier.wait(); 542 } 543 }); 544 do { 545 exit_barrier.wait(); 546 } while (num_tasks < n_threads * 5); 547 } 548 observer.observe(false); 549 }); 550 } 551 552 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 553 TestMandatoryConcurrency(); 554 InitializeAndTerminate(max_thread_num); 555 for (int p = min_thread_num; p <= max_thread_num; ++p) { 556 TestConcurrentArenas(p); 557 TestMultipleMasters(p); 558 TestArenaConcurrency(p); 559 } 560 } 561 562 //--------------------------------------------------// 563 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 564 // This part of the test uses the knowledge of task_arena internals 565 566 struct TaskArenaValidator { 567 int my_slot_at_construction; 568 const tbb::task_arena& my_arena; 569 TaskArenaValidator( const tbb::task_arena& other ) 570 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 571 , my_arena(other) 572 {} 573 // Inspect the internal state 574 int concurrency() { return my_arena.debug_max_concurrency(); } 575 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 576 577 // This method should be called in task_arena::execute() for a captured arena 578 // by the same thread that created the validator. 
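    // When a thread calls execute() on an arena it already occupies, the functor is expected to
    // run in place, in the same slot, which is what the check below relies on.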
579 void operator()() { 580 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 581 "Current thread index has changed since the validator construction" ); 582 } 583 }; 584 585 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 586 int expect_concurrency, int expect_masters ) { 587 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 588 if( arena.is_active() ) { 589 TaskArenaValidator validator( arena ); 590 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 591 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 592 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 593 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 594 // for threads already in arena, check that the thread index remains the same 595 arena.execute( validator ); 596 } else { // not_initialized 597 // Test the deprecated method 598 CHECK(tbb::this_task_arena::current_thread_index() == -1); 599 } 600 601 // Ideally, there should be a check for having the same internal arena object, 602 // but that object is not easily accessible for implicit arenas. 603 } 604 } 605 606 struct TestAttachBody : utils::NoAssign { 607 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 608 const int maxthread; 609 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 610 611 // The functor body for NativeParallelFor 612 void operator()( int idx ) const { 613 my_idx = idx; 614 615 int default_threads = tbb::this_task_arena::max_concurrency(); 616 617 tbb::task_arena arena{tbb::task_arena::attach()}; 618 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 619 620 arena.terminate(); 621 ValidateAttachedArena( arena, false, -1, -1 ); 622 623 // attach to an auto-initialized arena 624 tbb::parallel_for(0, 1, [](int) {}); 625 626 tbb::task_arena arena2{tbb::task_arena::attach()}; 627 ValidateAttachedArena( arena2, true, default_threads, 1 ); 628 629 tbb::task_arena arena3; 630 arena3.initialize(tbb::attach()); 631 ValidateAttachedArena( arena3, true, default_threads, 1 ); 632 633 634 // attach to another task_arena 635 arena.initialize( maxthread, std::min(maxthread,idx) ); 636 arena.execute( *this ); 637 } 638 639 // The functor body for task_arena::execute above 640 void operator()() const { 641 tbb::task_arena arena2{tbb::task_arena::attach()}; 642 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 643 } 644 645 // The functor body for tbb::parallel_for 646 void operator()( const Range& r ) const { 647 for( int i = r.begin(); i<r.end(); ++i ) { 648 tbb::task_arena arena2{tbb::task_arena::attach()}; 649 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 650 } 651 } 652 }; 653 654 thread_local int TestAttachBody::my_idx; 655 656 void TestAttach( int maxthread ) { 657 // Externally concurrent, but no concurrency within a thread 658 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 659 // Concurrent within the current arena; may also serve as a stress test 660 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 661 } 662 663 //--------------------------------------------------// 664 665 // Test that task_arena::enqueue does not tolerate a non-const functor. 666 // TODO: can it be reworked as SFINAE-based compile-time check? 
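// A possible compile-time variant (only a sketch, static_assert-based rather than SFINAE-based,
// and not what the run-time check below does):
//   static_assert(std::is_invocable_v<const TestFunctor&>,
//                 "task_arena::enqueue requires a functor that is callable as const");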
667 struct TestFunctor { 668 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 669 void operator()() const { /* library requires this overload only */ } 670 }; 671 672 void TestConstantFunctorRequirement() { 673 tbb::task_arena a; 674 TestFunctor tf; 675 a.enqueue( tf ); 676 } 677 678 //--------------------------------------------------// 679 680 #include "tbb/parallel_reduce.h" 681 #include "tbb/parallel_invoke.h" 682 683 // Test this_task_arena::isolate 684 namespace TestIsolatedExecuteNS { 685 template <typename NestedPartitioner> 686 class NestedParFor : utils::NoAssign { 687 public: 688 NestedParFor() {} 689 void operator()() const { 690 NestedPartitioner p; 691 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 692 } 693 }; 694 695 template <typename NestedPartitioner> 696 class ParForBody : utils::NoAssign { 697 bool myOuterIsolation; 698 tbb::enumerable_thread_specific<int> &myEts; 699 std::atomic<bool> &myIsStolen; 700 public: 701 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 702 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 703 void operator()( int ) const { 704 int &e = myEts.local(); 705 if ( e++ > 0 ) myIsStolen = true; 706 if ( myOuterIsolation ) 707 NestedParFor<NestedPartitioner>()(); 708 else 709 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 710 --e; 711 } 712 }; 713 714 template <typename OuterPartitioner, typename NestedPartitioner> 715 class OuterParFor : utils::NoAssign { 716 bool myOuterIsolation; 717 std::atomic<bool> &myIsStolen; 718 public: 719 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 720 void operator()() const { 721 tbb::enumerable_thread_specific<int> ets( 0 ); 722 OuterPartitioner p; 723 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 724 } 725 }; 726 727 template <typename OuterPartitioner, typename NestedPartitioner> 728 void TwoLoopsTest( bool outer_isolation ) { 729 std::atomic<bool> is_stolen; 730 is_stolen = false; 731 const int max_repeats = 100; 732 if ( outer_isolation ) { 733 for ( int i = 0; i <= max_repeats; ++i ) { 734 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 735 if ( is_stolen ) break; 736 } 737 // TODO: was ASSERT_WARNING 738 if (!is_stolen) { 739 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 740 } 741 } else { 742 for ( int i = 0; i <= max_repeats; ++i ) { 743 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 744 } 745 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 746 } 747 } 748 749 void TwoLoopsTest( bool outer_isolation ) { 750 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 751 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 752 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 753 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 754 } 755 756 void TwoLoopsTest() { 757 TwoLoopsTest( true ); 758 TwoLoopsTest( false ); 759 } 760 //--------------------------------------------------// 761 class HeavyMixTestBody : utils::NoAssign { 762 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 763 
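    // Per-thread: the nesting level at which this_task_arena::isolate() was most recently
    // entered (0 when the thread is not running under isolation).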
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 764 int myNestedLevel; 765 766 template <typename Partitioner, typename Body> 767 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 768 if ( rnd.get() % 2 ) { 769 if (ctx ) 770 tbb::parallel_for( 0, 2, body, p, *ctx ); 771 else 772 tbb::parallel_for( 0, 2, body, p ); 773 } else { 774 tbb::parallel_invoke( body, body ); 775 } 776 } 777 778 template <typename Partitioner> 779 class IsolatedBody : utils::NoAssign { 780 const HeavyMixTestBody &myHeavyMixTestBody; 781 Partitioner &myPartitioner; 782 public: 783 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 784 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 785 void operator()() const { 786 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 787 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 788 myHeavyMixTestBody.myNestedLevel + 1 ), 789 myPartitioner ); 790 } 791 }; 792 793 template <typename Partitioner> 794 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 795 Partitioner p; 796 switch ( rnd.get() % 2 ) { 797 case 0: { 798 // No features 799 tbb::task_group_context ctx; 800 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 801 break; 802 } 803 case 1: { 804 // Isolation 805 int previous_isolation = isolated_level; 806 isolated_level = myNestedLevel; 807 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 808 isolated_level = previous_isolation; 809 break; 810 } 811 } 812 } 813 public: 814 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 815 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 816 : myRandom( random ), myIsolatedLevel( isolated_level ) 817 , myNestedLevel( nested_level ) {} 818 void operator()() const { 819 int &isolated_level = myIsolatedLevel.local(); 820 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 821 if ( myNestedLevel == 20 ) 822 return; 823 utils::FastRandom<>& rnd = myRandom.local(); 824 if ( rnd.get() % 2 == 1 ) { 825 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 826 } else { 827 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 828 } 829 } 830 void operator()(int) const { 831 this->operator()(); 832 } 833 }; 834 835 struct RandomInitializer { 836 utils::FastRandom<> operator()() { 837 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 838 } 839 }; 840 841 void HeavyMixTest() { 842 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 843 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 844 845 RandomInitializer init_random; 846 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 847 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 848 for ( int i = 0; i < 5; ++i ) { 849 HeavyMixTestBody b( random, isolated_level, 1 ); 850 b( 0 ); 851 } 852 } 853 854 //--------------------------------------------------// 855 #if TBB_USE_EXCEPTIONS 856 struct MyException {}; 857 struct IsolatedBodyThrowsException { 858 void operator()() const { 859 #if _MSC_VER && !__INTEL_COMPILER 860 // Workaround an unreachable code warning in task_arena_function. 
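            // (reading a volatile flag keeps the compiler from proving that the throw below is
            // always reached)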
861 volatile bool workaround = true; 862 if (workaround) 863 #endif 864 { 865 throw MyException(); 866 } 867 } 868 }; 869 struct ExceptionTestBody : utils::NoAssign { 870 tbb::enumerable_thread_specific<int>& myEts; 871 std::atomic<bool>& myIsStolen; 872 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 873 : myEts( ets ), myIsStolen( is_stolen ) {} 874 void operator()( int i ) const { 875 try { 876 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 877 REQUIRE_MESSAGE( false, "The exception has been lost" ); 878 } 879 catch ( MyException ) {} 880 catch ( ... ) { 881 REQUIRE_MESSAGE( false, "Unexpected exception" ); 882 } 883 // Check that nested algorithms can steal outer-level tasks 884 int &e = myEts.local(); 885 if ( e++ > 0 ) myIsStolen = true; 886 // work imbalance increases chances for stealing 887 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 888 --e; 889 } 890 }; 891 892 #endif /* TBB_USE_EXCEPTIONS */ 893 void ExceptionTest() { 894 #if TBB_USE_EXCEPTIONS 895 tbb::enumerable_thread_specific<int> ets; 896 std::atomic<bool> is_stolen; 897 is_stolen = false; 898 for ( ;; ) { 899 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 900 if ( is_stolen ) break; 901 } 902 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 903 #endif /* TBB_USE_EXCEPTIONS */ 904 } 905 906 struct NonConstBody { 907 unsigned int state; 908 void operator()() { 909 state ^= ~0u; 910 } 911 }; 912 913 void TestNonConstBody() { 914 NonConstBody body; 915 body.state = 0x6c97d5ed; 916 tbb::this_task_arena::isolate(body); 917 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 918 } 919 920 // TODO: Consider tbb::task_group instead of explicit task API. 921 class TestEnqueueTask : public tbb::detail::d1::task { 922 using wait_context = tbb::detail::d1::wait_context; 923 924 tbb::enumerable_thread_specific<bool>& executed; 925 std::atomic<int>& completed; 926 927 public: 928 wait_context& waiter; 929 tbb::task_arena& arena; 930 static const int N = 100; 931 932 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 933 : executed(exe), completed(c), waiter(w), arena(a) {} 934 935 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 936 for (int i = 0; i < N; ++i) { 937 arena.enqueue([&]() { 938 executed.local() = true; 939 ++completed; 940 for (int j = 0; j < 100; j++) utils::yield(); 941 waiter.release(1); 942 }); 943 } 944 return nullptr; 945 } 946 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 947 }; 948 949 class TestEnqueueIsolateBody : utils::NoCopy { 950 tbb::enumerable_thread_specific<bool>& executed; 951 std::atomic<int>& completed; 952 tbb::task_arena& arena; 953 public: 954 static const int N = 100; 955 956 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 957 : executed(exe), completed(c), arena(a) {} 958 void operator()() { 959 tbb::task_group_context ctx; 960 tbb::detail::d1::wait_context waiter(N); 961 962 TestEnqueueTask root(executed, completed, waiter, arena); 963 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 964 } 965 }; 966 967 void TestEnqueue() { 968 tbb::enumerable_thread_specific<bool> executed(false); 969 std::atomic<int> completed; 970 tbb::task_arena arena{tbb::task_arena::attach()}; 971 972 // Check that the main thread can process enqueued tasks. 
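    // TestEnqueueIsolateBody submits TestEnqueueTask::N tasks to the arena and blocks in
    // execute_and_wait() until all of them release the wait_context; while blocked, the calling
    // (main) thread may itself pick up some of the enqueued tasks.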
973 completed = 0; 974 TestEnqueueIsolateBody b1(executed, completed, arena); 975 b1(); 976 977 if (!executed.local()) { 978 REPORT("Warning: No one enqueued task has executed by the main thread.\n"); 979 } 980 981 executed.local() = false; 982 completed = 0; 983 const int N = 100; 984 // Create enqueued tasks out of isolation. 985 986 tbb::task_group_context ctx; 987 tbb::detail::d1::wait_context waiter(N); 988 for (int i = 0; i < N; ++i) { 989 arena.enqueue([&]() { 990 executed.local() = true; 991 ++completed; 992 utils::yield(); 993 waiter.release(1); 994 }); 995 } 996 TestEnqueueIsolateBody b2(executed, completed, arena); 997 tbb::this_task_arena::isolate(b2); 998 REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate."); 999 1000 tbb::detail::d1::wait(waiter, ctx); 1001 // while (completed < TestEnqueueTask::N + N) utils::yield(); 1002 } 1003 } 1004 1005 void TestIsolatedExecute() { 1006 // At least 3 threads (owner + 2 thieves) are required to reproduce a situation when the owner steals outer 1007 // level task on a nested level. If we have only one thief then it will execute outer level tasks first and 1008 // the owner will not have a possibility to steal outer level tasks. 1009 int platform_max_thread = tbb::this_task_arena::max_concurrency(); 1010 int num_threads = utils::min( platform_max_thread, 3 ); 1011 { 1012 // Too many threads require too many work to reproduce the stealing from outer level. 1013 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7)); 1014 TestIsolatedExecuteNS::TwoLoopsTest(); 1015 TestIsolatedExecuteNS::HeavyMixTest(); 1016 TestIsolatedExecuteNS::ExceptionTest(); 1017 } 1018 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 1019 TestIsolatedExecuteNS::HeavyMixTest(); 1020 TestIsolatedExecuteNS::TestNonConstBody(); 1021 TestIsolatedExecuteNS::TestEnqueue(); 1022 } 1023 1024 //-----------------------------------------------------------------------------------------// 1025 1026 class TestDelegatedSpawnWaitBody : utils::NoAssign { 1027 tbb::task_arena &my_a; 1028 utils::SpinBarrier &my_b1, &my_b2; 1029 public: 1030 TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 1031 : my_a(a), my_b1(b1), my_b2(b2) {} 1032 // NativeParallelFor's functor 1033 void operator()(int idx) const { 1034 if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang) 1035 for (int i = 0; i < 2; ++i) { 1036 my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers 1037 } 1038 tbb::task_group tg; 1039 my_b1.wait(); // sync with the workers 1040 for( int i=0; i<100000; ++i) { 1041 my_a.execute([&tg] { tg.run([] {}); }); 1042 } 1043 my_a.execute([&tg] {tg.wait(); }); 1044 } 1045 1046 my_b2.wait(); // sync both threads 1047 } 1048 }; 1049 1050 void TestDelegatedSpawnWait() { 1051 if (tbb::this_task_arena::max_concurrency() < 3) { 1052 // The test requires at least 2 worker threads 1053 return; 1054 } 1055 // Regression test for a bug with missed wakeup notification from a delegated task 1056 tbb::task_arena a(2,0); 1057 a.initialize(); 1058 utils::SpinBarrier barrier1(3), barrier2(2); 1059 utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) ); 1060 a.debug_wait_until_empty(); 1061 } 1062 1063 //-----------------------------------------------------------------------------------------// 1064 1065 class TestMultipleWaitsArenaWait : utils::NoAssign { 1066 using 
wait_context = tbb::detail::d1::wait_context; 1067 public: 1068 TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1069 : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1070 void operator()() const { 1071 ++my_processed; 1072 // Wait for all tasks 1073 if ( my_idx < my_num_tasks ) { 1074 tbb::detail::d1::wait(*my_waiters[my_idx], my_context); 1075 } 1076 // Signal waiting tasks 1077 if ( my_idx >= my_bunch_size ) { 1078 my_waiters[my_idx-my_bunch_size]->release(); 1079 } 1080 } 1081 private: 1082 int my_idx; 1083 int my_bunch_size; 1084 int my_num_tasks; 1085 std::vector<wait_context*>& my_waiters; 1086 std::atomic<int>& my_processed; 1087 tbb::task_group_context& my_context; 1088 }; 1089 1090 class TestMultipleWaitsThreadBody : utils::NoAssign { 1091 using wait_context = tbb::detail::d1::wait_context; 1092 public: 1093 TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1094 : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1095 void operator()( int idx ) const { 1096 my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) ); 1097 --my_processed; 1098 } 1099 private: 1100 int my_bunch_size; 1101 int my_num_tasks; 1102 tbb::task_arena& my_arena; 1103 std::vector<wait_context*>& my_waiters; 1104 std::atomic<int>& my_processed; 1105 tbb::task_group_context& my_context; 1106 }; 1107 1108 void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) { 1109 tbb::task_arena a( num_threads ); 1110 const int num_tasks = (num_bunches-1)*bunch_size; 1111 1112 tbb::task_group_context tgc; 1113 std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks); 1114 for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0); 1115 1116 std::atomic<int> processed(0); 1117 for ( int repeats = 0; repeats<10; ++repeats ) { 1118 int idx = 0; 1119 for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) { 1120 // Sync with the previous bunch of waiters to prevent "false" nested dependicies (when a nested task waits for an outer task). 1121 while ( processed < bunch*bunch_size ) utils::yield(); 1122 // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters. 1123 for ( int i = 0; i<bunch_size; ++i ) { 1124 waiters[idx]->reserve(); 1125 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1126 } 1127 } 1128 // No sync because the threads of the last bunch do not call wait_for_all. 1129 // Run the last bunch of threads. 1130 for ( int i = 0; i<bunch_size; ++i ) 1131 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1132 while ( processed ) utils::yield(); 1133 } 1134 for (auto w : waiters) delete w; 1135 } 1136 1137 void TestMultipleWaits() { 1138 // Limit the number of threads to prevent heavy oversubscription. 
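    // (each TestMultipleWaits(threads, num_bunches, bunch_size) call below detaches roughly
    // num_bunches * bunch_size std::threads per repetition)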
1139 #if TBB_TEST_LOW_WORKLOAD 1140 const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() ); 1141 #else 1142 const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() ); 1143 #endif 1144 1145 utils::FastRandom<> rnd(1234); 1146 for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) { 1147 for ( int i = 0; i<3; ++i ) { 1148 const int num_bunches = 3 + rnd.get()%3; 1149 const int bunch_size = max_threads + rnd.get()%max_threads; 1150 TestMultipleWaits( threads, num_bunches, bunch_size ); 1151 } 1152 } 1153 } 1154 1155 //--------------------------------------------------// 1156 1157 void TestSmallStackSize() { 1158 tbb::global_control gc(tbb::global_control::thread_stack_size, 1159 tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 ); 1160 // The test produces the warning (not a error) if fails. So the test is run many times 1161 // to make the log annoying (to force to consider it as an error). 1162 for (int i = 0; i < 100; ++i) { 1163 tbb::task_arena a; 1164 a.initialize(); 1165 } 1166 } 1167 1168 //--------------------------------------------------// 1169 1170 namespace TestMoveSemanticsNS { 1171 struct TestFunctor { 1172 void operator()() const {}; 1173 }; 1174 1175 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 1176 MoveOnlyFunctor() : utils::MoveOnly() {}; 1177 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 1178 }; 1179 1180 struct MovePreferableFunctor : utils::Movable, TestFunctor { 1181 MovePreferableFunctor() : utils::Movable() {}; 1182 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {}; 1183 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 1184 }; 1185 1186 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 1187 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 1188 // mv ctor is not allowed as cp ctor from parent NoCopy 1189 private: 1190 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 1191 }; 1192 1193 void TestFunctors() { 1194 tbb::task_arena ta; 1195 MovePreferableFunctor mpf; 1196 // execute() doesn't have any copies or moves of arguments inside the impl 1197 ta.execute( NoMoveNoCopyFunctor() ); 1198 1199 ta.enqueue( MoveOnlyFunctor() ); 1200 ta.enqueue( mpf ); 1201 REQUIRE_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 1202 mpf.Reset(); 1203 ta.enqueue( std::move(mpf) ); 1204 REQUIRE_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 1205 mpf.Reset(); 1206 } 1207 } 1208 1209 void TestMoveSemantics() { 1210 TestMoveSemanticsNS::TestFunctors(); 1211 } 1212 1213 //--------------------------------------------------// 1214 1215 #include <vector> 1216 1217 #include "common/state_trackable.h" 1218 1219 namespace TestReturnValueNS { 1220 struct noDefaultTag {}; 1221 class ReturnType : public StateTrackable<> { 1222 static const int SIZE = 42; 1223 std::vector<int> data; 1224 public: 1225 ReturnType(noDefaultTag) : StateTrackable<>(0) {} 1226 // Define copy constructor to test that it is never called 1227 ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {} 1228 ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {} 1229 1230 void fill() { 1231 for (int i = 0; i < SIZE; ++i) 1232 data.push_back(i); 1233 } 1234 void check() { 1235 REQUIRE(data.size() == unsigned(SIZE)); 1236 for (int i = 0; i < SIZE; ++i) 1237 REQUIRE(data[i] == i); 1238 StateTrackableCounters::counters_type& cnts = 
StateTrackableCounters::counters; 1239 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1240 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1241 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1242 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1243 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1244 // The number of copies/moves should not exceed 3 if copy elision takes a place: 1245 // function return, store to an internal storage, acquire internal storage. 1246 // For compilation, without copy elision, this number may be grown up to 7. 1247 REQUIRE((copied == 0 && moved <= 7)); 1248 WARN_MESSAGE(moved <= 3, 1249 "Warning: The number of copies/moves should not exceed 3 if copy elision takes a place." 1250 "Take an attention to this warning only if copy elision is enabled." 1251 ); 1252 } 1253 }; 1254 1255 template <typename R> 1256 R function() { 1257 noDefaultTag tag; 1258 R r(tag); 1259 r.fill(); 1260 return r; 1261 } 1262 1263 template <> 1264 void function<void>() {} 1265 1266 template <typename R> 1267 struct Functor { 1268 R operator()() const { 1269 return function<R>(); 1270 } 1271 }; 1272 1273 tbb::task_arena& arena() { 1274 static tbb::task_arena a; 1275 return a; 1276 } 1277 1278 template <typename F> 1279 void TestExecute(F &f) { 1280 StateTrackableCounters::reset(); 1281 ReturnType r{arena().execute(f)}; 1282 r.check(); 1283 } 1284 1285 template <typename F> 1286 void TestExecute(const F &f) { 1287 StateTrackableCounters::reset(); 1288 ReturnType r{arena().execute(f)}; 1289 r.check(); 1290 } 1291 template <typename F> 1292 void TestIsolate(F &f) { 1293 StateTrackableCounters::reset(); 1294 ReturnType r{tbb::this_task_arena::isolate(f)}; 1295 r.check(); 1296 } 1297 1298 template <typename F> 1299 void TestIsolate(const F &f) { 1300 StateTrackableCounters::reset(); 1301 ReturnType r{tbb::this_task_arena::isolate(f)}; 1302 r.check(); 1303 } 1304 1305 void Test() { 1306 TestExecute(Functor<ReturnType>()); 1307 Functor<ReturnType> f1; 1308 TestExecute(f1); 1309 TestExecute(function<ReturnType>); 1310 1311 arena().execute(Functor<void>()); 1312 Functor<void> f2; 1313 arena().execute(f2); 1314 arena().execute(function<void>); 1315 TestIsolate(Functor<ReturnType>()); 1316 TestIsolate(f1); 1317 TestIsolate(function<ReturnType>); 1318 tbb::this_task_arena::isolate(Functor<void>()); 1319 tbb::this_task_arena::isolate(f2); 1320 tbb::this_task_arena::isolate(function<void>); 1321 } 1322 } 1323 1324 void TestReturnValue() { 1325 TestReturnValueNS::Test(); 1326 } 1327 1328 //--------------------------------------------------// 1329 1330 // MyObserver checks if threads join to the same arena 1331 struct MyObserver: public tbb::task_scheduler_observer { 1332 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1333 tbb::task_arena& my_arena; 1334 std::atomic<int>& my_failure_counter; 1335 std::atomic<int>& my_counter; 1336 utils::SpinBarrier& m_barrier; 1337 1338 MyObserver(tbb::task_arena& a, 1339 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1340 std::atomic<int>& failure_counter, 1341 std::atomic<int>& counter, 1342 utils::SpinBarrier& barrier) 1343 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1344 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) { 1345 observe(true); 1346 } 1347 void on_scheduler_entry(bool worker) override { 1348 if (worker) { 1349 ++my_counter; 1350 tbb::task_arena*& cur_arena = my_tls.local(); 1351 if (cur_arena != 0 && cur_arena != 
&my_arena) { 1352 ++my_failure_counter; 1353 } 1354 cur_arena = &my_arena; 1355 m_barrier.wait(); 1356 } 1357 } 1358 void on_scheduler_exit(bool worker) override { 1359 if (worker) { 1360 m_barrier.wait(); // before wakeup 1361 m_barrier.wait(); // after wakeup 1362 } 1363 } 1364 }; 1365 1366 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { 1367 if (n_threads == 0) { 1368 n_threads = tbb::this_task_arena::max_concurrency(); 1369 } 1370 1371 const int max_n_arenas = 8; 1372 int n_arenas = 2; 1373 if(n_threads > 16) { 1374 n_arenas = max_n_arenas; 1375 } else if (n_threads > 8) { 1376 n_arenas = 4; 1377 } 1378 1379 int n_workers = n_threads - 1; 1380 n_workers = n_arenas * (n_workers / n_arenas); 1381 if (n_workers == 0) { 1382 return; 1383 } 1384 1385 n_threads = n_workers + 1; 1386 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads); 1387 1388 const int n_repetitions = 20; 1389 const int n_outer_repetitions = 100; 1390 std::multiset<float> failure_ratio; // for median calculating 1391 utils::SpinBarrier barrier(n_threads); 1392 utils::SpinBarrier worker_barrier(n_workers); 1393 MyObserver* observer[max_n_arenas]; 1394 std::vector<tbb::task_arena> arenas(n_arenas); 1395 std::atomic<int> failure_counter; 1396 std::atomic<int> counter; 1397 tbb::enumerable_thread_specific<tbb::task_arena*> tls; 1398 1399 for (int i = 0; i < n_arenas; ++i) { 1400 arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master 1401 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier); 1402 } 1403 1404 int ii = 0; 1405 for (; ii < n_outer_repetitions; ++ii) { 1406 failure_counter = 0; 1407 counter = 0; 1408 1409 // Main code 1410 auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); }; 1411 wakeup(); 1412 for (int j = 0; j < n_repetitions; ++j) { 1413 barrier.wait(); // entry 1414 barrier.wait(); // exit1 1415 wakeup(); 1416 barrier.wait(); // exit2 1417 } 1418 barrier.wait(); // entry 1419 barrier.wait(); // exit1 1420 barrier.wait(); // exit2 1421 1422 failure_ratio.insert(float(failure_counter) / counter); 1423 tls.clear(); 1424 // collect 3 elements in failure_ratio before calculating median 1425 if (ii > 1) { 1426 std::multiset<float>::iterator it = failure_ratio.begin(); 1427 std::advance(it, failure_ratio.size() / 2); 1428 if (*it < 0.02) 1429 break; 1430 } 1431 } 1432 for (int i = 0; i < n_arenas; ++i) { 1433 delete observer[i]; 1434 } 1435 // check if median is so big 1436 std::multiset<float>::iterator it = failure_ratio.begin(); 1437 std::advance(it, failure_ratio.size() / 2); 1438 // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas 1439 if (*it > 0.05) { 1440 REPORT("Warning: So many cases when threads join to different arenas.\n"); 1441 REQUIRE_MESSAGE(*it <= 0.3, "A lot of cases when threads join to different arenas.\n"); 1442 } 1443 } 1444 1445 void TestArenaWorkersMigration() { 1446 TestArenaWorkersMigrationWithNumThreads(4); 1447 if (tbb::this_task_arena::max_concurrency() != 4) { 1448 TestArenaWorkersMigrationWithNumThreads(); 1449 } 1450 } 1451 1452 //--------------------------------------------------// 1453 void TestDefaultCreatedWorkersAmount() { 1454 int threads = tbb::this_task_arena::max_concurrency(); 1455 utils::NativeParallelFor(1, [threads](int idx) { 1456 REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS"); 1457 utils::SpinBarrier barrier(threads); 1458 ResetTLS(); 1459 for (auto blocked : { false, true }) { 1460 for (int trail = 0; trail < 
                                     (blocked ? 10 : 10000); ++trail) {
                tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
                    CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "the concurrency level is not equal to the specified thread number");
                    CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than specified by default");
                    local_id.local() = 1;
                    if (blocked) {
                        // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                        utils::Sleep(1);
                        barrier.wait();
                    }
                }, tbb::simple_partitioner());
                REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "the number of created threads exceeds the default");
                if (blocked) {
                    REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default");
                }
            }
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Check only a subset of the possible reserved-external-thread counts:
    // 0 and 1 reserved threads are the important cases, but it is also useful
    // to collect some statistics for other counts without spending the whole
    // test session time checking each one.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    int arena_concurrency = static_cast<int>(thread_number) / 2;
    tbb::task_arena test_arena(arena_concurrency, arena_concurrency);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ?
myMaxConcurrency : 2)); 1540 if (is_worker) { 1541 CHECK(current_index >= myNumReservedSlots); 1542 } 1543 } 1544 void on_scheduler_exit( bool /*is_worker*/ ) override 1545 {} 1546 public: 1547 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1548 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1549 , myMaxConcurrency(maxConcurrency) 1550 , myNumReservedSlots(numReservedSlots) { 1551 observe(true); 1552 } 1553 1554 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1555 return lhs.my_idx < rhs.my_idx; 1556 } 1557 }; 1558 1559 std::atomic<int> simple_observer::idx_counter{}; 1560 1561 struct arena_handler { 1562 enum arena_status { 1563 alive, 1564 deleting, 1565 deleted 1566 }; 1567 1568 tbb::task_arena* arena; 1569 1570 std::atomic<arena_status> status{alive}; 1571 tbb::spin_rw_mutex arena_in_use{}; 1572 1573 tbb::concurrent_set<simple_observer> observers; 1574 1575 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1576 {} 1577 1578 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1579 return lhs.arena < rhs.arena; 1580 } 1581 }; 1582 1583 // TODO: Add observer operations 1584 void StressTestMixFunctionality() { 1585 enum operation_type { 1586 create_arena, 1587 delete_arena, 1588 attach_observer, 1589 detach_observer, 1590 arena_execute, 1591 enqueue_task, 1592 last_operation_marker 1593 }; 1594 1595 std::size_t operations_number = last_operation_marker; 1596 std::size_t thread_number = utils::get_platform_max_threads(); 1597 utils::FastRandom<> operation_rnd(42); 1598 tbb::spin_mutex random_operation_guard; 1599 1600 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1601 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1602 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1603 }; 1604 1605 utils::FastRandom<> arena_rnd(42); 1606 tbb::spin_mutex random_arena_guard; 1607 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1608 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1609 return arena_rnd.get(); 1610 }; 1611 1612 tbb::concurrent_set<arena_handler> arenas_pool; 1613 1614 std::vector<std::thread> thread_pool; 1615 1616 utils::SpinBarrier thread_barrier(thread_number); 1617 std::size_t max_operations = 20000; 1618 std::atomic<std::size_t> curr_operation{}; 1619 1620 auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) { 1621 for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) { 1622 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1623 if (curr_arena->status == arena_handler::alive) { 1624 return curr_arena; 1625 } 1626 else { 1627 lock.release(); 1628 } 1629 } 1630 } 1631 return arenas_pool.end(); 1632 }; 1633 1634 auto thread_func = [&] () { 1635 arenas_pool.emplace(new tbb::task_arena()); 1636 thread_barrier.wait(); 1637 while (curr_operation++ < max_operations) { 1638 switch (get_random_operation()) { 1639 case create_arena : 1640 { 1641 arenas_pool.emplace(new tbb::task_arena()); 1642 break; 1643 } 1644 case delete_arena : 1645 { 1646 auto curr_arena = arenas_pool.begin(); 1647 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1648 arena_handler::arena_status curr_status = arena_handler::alive; 1649 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1650 break; 1651 } 1652 } 1653 1654 if (curr_arena == arenas_pool.end()) break; 1655 1656 
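                    // Take the arena mutex as a writer so that no other thread is inside
                    // execute()/enqueue() on this arena (they hold it as readers) while it is deleted.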

struct arena_handler {
    enum arena_status {
        alive,
        deleting,
        deleted
    };

    tbb::task_arena* arena;

    std::atomic<arena_status> status{alive};
    tbb::spin_rw_mutex arena_in_use{};

    tbb::concurrent_set<simple_observer> observers;

    arena_handler(tbb::task_arena* ptr) : arena(ptr)
    {}

    friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
        return lhs.arena < rhs.arena;
    }
};

// TODO: Add observer operations
void StressTestMixFunctionality() {
    enum operation_type {
        create_arena,
        delete_arena,
        attach_observer,
        detach_observer,
        arena_execute,
        enqueue_task,
        last_operation_marker
    };

    std::size_t operations_number = last_operation_marker;
    std::size_t thread_number = utils::get_platform_max_threads();
    utils::FastRandom<> operation_rnd(42);
    tbb::spin_mutex random_operation_guard;

    auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
        tbb::spin_mutex::scoped_lock lock(random_operation_guard);
        return static_cast<operation_type>(operation_rnd.get() % operations_number);
    };

    utils::FastRandom<> arena_rnd(42);
    tbb::spin_mutex random_arena_guard;
    auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
        tbb::spin_mutex::scoped_lock lock(random_arena_guard);
        return arena_rnd.get();
    };

    tbb::concurrent_set<arena_handler> arenas_pool;

    std::vector<std::thread> thread_pool;

    utils::SpinBarrier thread_barrier(thread_number);
    std::size_t max_operations = 20000;
    std::atomic<std::size_t> curr_operation{};

    auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) {
        for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) {
            if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                if (curr_arena->status == arena_handler::alive) {
                    return curr_arena;
                }
                else {
                    lock.release();
                }
            }
        }
        return arenas_pool.end();
    };

    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
            case create_arena :
            {
                arenas_pool.emplace(new tbb::task_arena());
                break;
            }
            case delete_arena :
            {
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    arena_handler::arena_status curr_status = arena_handler::alive;
                    if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                        break;
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};

                auto curr_arena = find_arena(lock);
                if (curr_arena != arenas_pool.end()) {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }
                break;
            }
            case detach_observer:
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->execute([]() {
                        tbb::affinity_partitioner aff;
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
                    });
                }

                break;
            }
            case enqueue_task:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
                }

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets , std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};
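
// enqueue_test_helper forms a chain of fire-and-forget tasks: each invocation re-enqueues
// a copy of itself into the same arena until the shared counter reaches 100000, which keeps
// the arena supplied with work without any thread blocking in a wait. A sketch of how the
// "Workers oversubscription" test below seeds such a chain (illustration only):
//
//     std::atomic<std::size_t> task_counter{0};
//     arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
//     while (task_counter < 100000) utils::yield();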

//--------------------------------------------------//

// This test requires TBB in an uninitialized state
//! \brief \ref requirement
TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
    REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was already initialized");
    tbb::enumerable_thread_specific<int> ets;

    tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
    arena.execute([&ets] {
        tbb::parallel_for(0, 10000000, [&ets](int){
            ets.local() = 1;
            utils::doDummyWork(100);
        });
    });

    CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
}

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("The destructor should not have been called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS
//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}
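
// The "Workers oversubscription" test below lifts the soft limit on worker threads above the
// hardware concurrency with tbb::global_control and sizes the arena to match, so the barrier
// of num_threads * 2 participants can only be passed if the oversubscribed workers actually
// arrive. A sketch of the idiom (values are illustrative):
//
//     std::size_t hw_threads = utils::get_platform_max_threads();
//     tbb::global_control gl(tbb::global_control::max_allowed_parallelism, hw_threads * 2);
//     tbb::task_arena arena(static_cast<int>(hw_threads) * 2);
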
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}
#if TBB_USE_EXCEPTIONS
//! The test for error in scheduling empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
    * doctest::should_fail() // Test needs to be revised, as the implementation uses assertions instead of exceptions
    * doctest::skip() // Skip the test for now so it does not pollute the test log
){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! The test for is_inside_task in arena::execute when inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute when inside another task") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS