1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/spin_barrier.h" 21 #include "common/utils.h" 22 #include "common/utils_report.h" 23 #include "common/utils_concurrency_limit.h" 24 25 #include "tbb/task_arena.h" 26 #include "tbb/task_scheduler_observer.h" 27 #include "tbb/enumerable_thread_specific.h" 28 #include "tbb/parallel_for.h" 29 #include "tbb/global_control.h" 30 #include "tbb/concurrent_set.h" 31 #include "tbb/spin_mutex.h" 32 #include "tbb/spin_rw_mutex.h" 33 34 #include <stdexcept> 35 #include <cstdlib> 36 #include <cstdio> 37 #include <vector> 38 #include <thread> 39 #include <atomic> 40 41 //#include "harness_fp.h" 42 43 //! \file test_task_arena.cpp 44 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 45 46 //--------------------------------------------------// 47 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 48 /* maxthread is treated as the biggest possible concurrency level. */ 49 void InitializeAndTerminate( int maxthread ) { 50 for( int i=0; i<200; ++i ) { 51 switch( i&3 ) { 52 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 53 // Explicit initialization can either keep the original values or change those. 54 // Arena termination can be explicit or implicit (in the destructor). 55 // TODO: extend with concurrency level checks if such a method is added. 
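            // Lifecycle sketch of what the cases below exercise (illustrative comment only):
            //   tbb::task_arena a(n);   // constructed inactive
            //   a.initialize();         // explicit activation
            //   a.terminate();          // deactivation; the destructor also terminates an active arena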
56 default: { 57 tbb::task_arena arena( std::rand() % maxthread + 1 ); 58 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 59 arena.initialize(); 60 CHECK(arena.is_active()); 61 arena.terminate(); 62 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 63 break; 64 } 65 case 0: { 66 tbb::task_arena arena( 1 ); 67 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 68 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 69 CHECK(arena.is_active()); 70 break; 71 } 72 case 1: { 73 tbb::task_arena arena( tbb::task_arena::automatic ); 74 CHECK(!arena.is_active()); 75 arena.initialize(); 76 CHECK(arena.is_active()); 77 break; 78 } 79 case 2: { 80 tbb::task_arena arena; 81 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 82 arena.initialize( std::rand() % maxthread + 1 ); 83 CHECK(arena.is_active()); 84 arena.terminate(); 85 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 86 break; 87 } 88 } 89 } 90 } 91 92 //--------------------------------------------------// 93 94 // Definitions used in more than one test 95 typedef tbb::blocked_range<int> Range; 96 97 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 98 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 99 100 void ResetTLS() { 101 local_id.clear(); 102 old_id.clear(); 103 slot_id.clear(); 104 } 105 106 class ArenaObserver : public tbb::task_scheduler_observer { 107 int myId; // unique observer/arena id within a test 108 int myMaxConcurrency; // concurrency of the associated arena 109 int myNumReservedSlots; // reserved slots in the associated arena 110 void on_scheduler_entry( bool is_worker ) override { 111 int current_index = tbb::this_task_arena::current_thread_index(); 112 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 113 if (is_worker) { 114 CHECK(current_index >= myNumReservedSlots); 115 } 116 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 117 old_id.local() = local_id.local(); 118 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 119 local_id.local() = myId; 120 slot_id.local() = current_index; 121 } 122 void on_scheduler_exit( bool /*is_worker*/ ) override { 123 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 124 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 125 slot_id.local() = -2; 126 local_id.local() = old_id.local(); 127 old_id.local() = 0; 128 } 129 public: 130 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 131 : tbb::task_scheduler_observer(a) 132 , myId(id) 133 , myMaxConcurrency(maxConcurrency) 134 , myNumReservedSlots(numReservedSlots) { 135 CHECK(myId); 136 observe(true); 137 } 138 ~ArenaObserver () { 139 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 140 } 141 }; 142 143 struct IndexTrackingBody { // Must be used together with ArenaObserver 144 void operator() ( const Range& ) const { 145 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 146 for ( volatile int i = 0; i < 50000; ++i ) 147 ; 148 } 149 }; 150 151 struct AsynchronousWork { 152 utils::SpinBarrier &my_barrier; 153 bool my_is_blocking; 154 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 155 : my_barrier(a_barrier), my_is_blocking(blocking) {} 156 void operator()() const { 157 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 158 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 159 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to master thread 160 else my_barrier.signalNoWait(); 161 } 162 }; 163 164 //-----------------------------------------------------------------------------------------// 165 166 // Test that task_arenas might be created and used from multiple application threads. 167 // Also tests arena observers. The parameter p is the index of an app thread running this test. 168 void TestConcurrentArenasFunc(int idx) { 169 // A regression test for observer activation order: 170 // check that arena observer can be activated before local observer 171 struct LocalObserver : public tbb::task_scheduler_observer { 172 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 173 }; 174 tbb::task_arena a1; 175 a1.initialize(1,0); 176 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 177 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 178 LocalObserver lo; 179 CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated"); 180 tbb::task_arena a2(2,1); 181 ArenaObserver o2(a2, 2, 1, idx*2+2); 182 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 183 utils::SpinBarrier barrier(2); 184 AsynchronousWork work(barrier); 185 a1.enqueue(work); // put async work 186 barrier.wait(); 187 a2.enqueue(work); // another work 188 a2.execute(work); 189 a1.debug_wait_until_empty(); 190 a2.debug_wait_until_empty(); 191 } 192 193 void TestConcurrentArenas(int p) { 194 // TODO REVAMP fix with global control 195 ResetTLS(); 196 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 197 } 198 //--------------------------------------------------// 199 // Test multiple application threads working with a single arena at the same time. 
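// An illustrative sketch (not exercised directly) of the pattern the classes below test,
// assuming one arena shared by several application threads:
//   tbb::task_arena shared(2, 1);                  // 2 slots, 1 reserved for application threads
//   shared.execute([]{ /* synchronous work inside the arena */ });
//   shared.enqueue([]{ /* fire-and-forget work */ });
// Several threads may call execute()/enqueue() on the same arena object concurrently.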
200 class MultipleMastersPart1 : utils::NoAssign { 201 tbb::task_arena &my_a; 202 utils::SpinBarrier &my_b1, &my_b2; 203 public: 204 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 205 : my_a(a), my_b1(b1), my_b2(b2) {} 206 void operator()(int) const { 207 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 208 my_b1.wait(); 209 // A regression test for bugs 1954 & 1971 210 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 211 } 212 }; 213 214 class MultipleMastersPart2 : utils::NoAssign { 215 tbb::task_arena &my_a; 216 utils::SpinBarrier &my_b; 217 public: 218 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 219 void operator()(int) const { 220 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 221 } 222 }; 223 224 class MultipleMastersPart3 : utils::NoAssign { 225 tbb::task_arena &my_a; 226 utils::SpinBarrier &my_b; 227 using wait_context = tbb::detail::d1::wait_context; 228 229 struct Runner : NoAssign { 230 wait_context& myWait; 231 Runner(wait_context& w) : myWait(w) {} 232 void operator()() const { 233 for ( volatile int i = 0; i < 10000; ++i ) 234 ; 235 myWait.release(); 236 } 237 }; 238 239 struct Waiter : NoAssign { 240 wait_context& myWait; 241 Waiter(wait_context& w) : myWait(w) {} 242 void operator()() const { 243 tbb::task_group_context ctx; 244 tbb::detail::d1::wait(myWait, ctx); 245 } 246 }; 247 248 public: 249 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 250 : my_a(a), my_b(b) {} 251 void operator()(int) const { 252 wait_context wait(0); 253 my_b.wait(); // increases chances for task_arena initialization contention 254 for( int i=0; i<100; ++i) { 255 wait.reserve(); 256 my_a.enqueue(Runner(wait)); 257 my_a.execute(Waiter(wait)); 258 } 259 my_b.wait(); 260 } 261 }; 262 263 void TestMultipleMasters(int p) { 264 { 265 ResetTLS(); 266 tbb::task_arena a(1,0); 267 a.initialize(); 268 ArenaObserver o(a, 1, 0, 1); 269 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 270 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 271 barrier2.wait(); 272 a.debug_wait_until_empty(); 273 } { 274 ResetTLS(); 275 tbb::task_arena a(2,1); 276 ArenaObserver o(a, 2, 1, 2); 277 utils::SpinBarrier barrier(p+2); 278 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 279 // TODO: buggy test. A worker threads need time to occupy the slot to prevent a master from taking an enqueue task. 
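        // Give the worker thread a chance to occupy its slot before the masters join (see the TODO above).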
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for masters to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 master thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TODO: explain what TestArenaEntryConsistency does
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance
        : FPModeContext(idx+i)
        , my_stage(s)
        , is_caught(false)
#if TBB_USE_EXCEPTIONS
        , is_expected( (idx&(1<<i)) != 0 )
#else
        , is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) std::this_thread::yield();
        // deduce the entry type and record it in the id; this helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
            "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
            : stage == 3?
"nested_same_ctx" : "nested_alien_ctx"); 331 AssertFPMode(arenaFPMode); 332 if (is_expected) { 333 TBB_TEST_THROW(std::logic_error(my_id.str())); 334 } 335 // no code can be put here since exceptions can be thrown 336 } 337 void on_exception(const char *e) { // outside arena, in catch block 338 is_caught = true; 339 CHECK(my_id.str() == e); 340 assertFPMode(); 341 } 342 void after_execute() { // outside arena and catch block 343 CHECK(is_caught == is_expected); 344 assertFPMode(); 345 } 346 }; 347 348 class ForEachArenaEntryBody : utils::NoAssign { 349 tbb::task_arena &my_a; // expected task_arena(2,1) 350 std::atomic<int> &my_stage; // each execute increases it 351 int my_idx; 352 353 public: 354 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 355 : my_a(a), my_stage(c), my_idx(0) {} 356 357 void test(int idx) { 358 my_idx = idx; 359 my_stage = 0; 360 NativeParallelFor(3, *this); // test cross-arena calls 361 CHECK(my_stage == 3); 362 my_a.execute(*this); // test nested calls 363 CHECK(my_stage == 5); 364 } 365 366 // task_arena functor for nested tests 367 void operator()() const { 368 test_arena_entry(3); // in current task group context 369 tbb::parallel_for(4, 5, *this); // in different context 370 } 371 372 // NativeParallelFor & parallel_for functor 373 void operator()(int i) const { 374 test_arena_entry(i); 375 } 376 377 private: 378 void test_arena_entry(int i) const { 379 GetRoundingMode(); 380 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 381 GetRoundingMode(); 382 #if TBB_USE_EXCEPTIONS 383 try { 384 my_a.execute(scoped_functor); 385 } catch(std::logic_error &e) { 386 scoped_functor.on_exception(e.what()); 387 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 388 #else 389 my_a.execute(scoped_functor); 390 #endif 391 scoped_functor.after_execute(); 392 } 393 }; 394 395 void TestArenaEntryConsistency() { 396 tbb::task_arena a(2, 1); 397 std::atomic<int> c; 398 ForEachArenaEntryBody body(a, c); 399 400 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 401 a.initialize(); // capture FP settings to arena 402 fp_scope.setNextFPMode(); 403 404 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 405 body.test(i); 406 } 407 //-------------------------------------------------- 408 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 409 class TestArenaConcurrencyBody : utils::NoAssign { 410 tbb::task_arena &my_a; 411 int my_max_concurrency; 412 int my_reserved_slots; 413 utils::SpinBarrier *my_barrier; 414 utils::SpinBarrier *my_worker_barrier; 415 public: 416 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 417 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 418 // NativeParallelFor's functor 419 void operator()( int ) const { 420 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 421 local_id.local() = 1; 422 my_a.execute( *this ); 423 } 424 // Arena's functor 425 void operator()() const { 426 int idx = tbb::this_task_arena::current_thread_index(); 427 CHECK( idx < (my_max_concurrency > 1 ? 
my_max_concurrency : 2) ); 428 CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() ); 429 int max_arena_concurrency = tbb::this_task_arena::max_concurrency(); 430 CHECK( max_arena_concurrency == my_max_concurrency ); 431 if ( my_worker_barrier ) { 432 if ( local_id.local() == 1 ) { 433 // Master thread in a reserved slot 434 CHECK_MESSAGE( idx < my_reserved_slots, "Masters are supposed to use only reserved slots in this test" ); 435 } else { 436 // Worker thread 437 CHECK( idx >= my_reserved_slots ); 438 my_worker_barrier->wait(); 439 } 440 } else if ( my_barrier ) 441 CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" ); 442 if ( my_barrier ) my_barrier->wait(); 443 else utils::Sleep( 10 ); 444 } 445 }; 446 447 void TestArenaConcurrency( int p, int reserved = 0, int step = 1) { 448 for (; reserved <= p; reserved += step) { 449 tbb::task_arena a( p, reserved ); 450 { // Check concurrency with worker & reserved master threads. 451 ResetTLS(); 452 utils::SpinBarrier b( p ); 453 utils::SpinBarrier wb( p-reserved ); 454 TestArenaConcurrencyBody test( a, p, reserved, &b, &wb ); 455 for ( int i = reserved; i < p; ++i ) 456 a.enqueue( test ); 457 if ( reserved==1 ) 458 test( 0 ); // calls execute() 459 else 460 utils::NativeParallelFor( reserved, test ); 461 a.debug_wait_until_empty(); 462 } { // Check if multiple masters alone can achieve maximum concurrency. 463 ResetTLS(); 464 utils::SpinBarrier b( p ); 465 utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) ); 466 a.debug_wait_until_empty(); 467 } { // Check oversubscription by masters. 468 #if !_WIN32 || !_WIN64 469 // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms 470 // that makes impossible to create more than ~500 threads. 471 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 472 #endif 473 #if TBB_TEST_LOW_WORKLOAD 474 if ( p <= 16 ) 475 #endif 476 { 477 ResetTLS(); 478 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 479 a.debug_wait_until_empty(); 480 } 481 } 482 } 483 } 484 485 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 486 InitializeAndTerminate(max_thread_num); 487 for (int p = min_thread_num; p <= max_thread_num; ++p) { 488 TestConcurrentArenas(p); 489 TestMultipleMasters(p); 490 TestArenaConcurrency(p); 491 } 492 } 493 494 //--------------------------------------------------// 495 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 496 // This part of the test uses the knowledge of task_arena internals 497 498 struct TaskArenaValidator { 499 int my_slot_at_construction; 500 const tbb::task_arena& my_arena; 501 TaskArenaValidator( const tbb::task_arena& other ) 502 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 503 , my_arena(other) 504 {} 505 // Inspect the internal state 506 int concurrency() { return my_arena.debug_max_concurrency(); } 507 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 508 509 // This method should be called in task_arena::execute() for a captured arena 510 // by the same thread that created the validator. 
511 void operator()() { 512 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 513 "Current thread index has changed since the validator construction" ); 514 } 515 }; 516 517 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 518 int expect_concurrency, int expect_masters ) { 519 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 520 if( arena.is_active() ) { 521 TaskArenaValidator validator( arena ); 522 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 523 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 524 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 525 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 526 // for threads already in arena, check that the thread index remains the same 527 arena.execute( validator ); 528 } else { // not_initialized 529 // Test the deprecated method 530 CHECK(tbb::this_task_arena::current_thread_index() == -1); 531 } 532 533 // Ideally, there should be a check for having the same internal arena object, 534 // but that object is not easily accessible for implicit arenas. 535 } 536 } 537 538 struct TestAttachBody : utils::NoAssign { 539 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 540 const int maxthread; 541 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 542 543 // The functor body for NativeParallelFor 544 void operator()( int idx ) const { 545 my_idx = idx; 546 547 int default_threads = tbb::this_task_arena::max_concurrency(); 548 549 tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() ); 550 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 551 552 arena.terminate(); 553 ValidateAttachedArena( arena, false, -1, -1 ); 554 555 // attach to an auto-initialized arena 556 tbb::parallel_for(0, 1, [](int) {}); 557 558 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 559 ValidateAttachedArena( arena2, true, default_threads, 1 ); 560 561 // attach to another task_arena 562 arena.initialize( maxthread, std::min(maxthread,idx) ); 563 arena.execute( *this ); 564 } 565 566 // The functor body for task_arena::execute above 567 void operator()() const { 568 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 569 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 570 } 571 572 // The functor body for tbb::parallel_for 573 void operator()( const Range& r ) const { 574 for( int i = r.begin(); i<r.end(); ++i ) { 575 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 576 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 577 } 578 } 579 }; 580 581 thread_local int TestAttachBody::my_idx; 582 583 void TestAttach( int maxthread ) { 584 // Externally concurrent, but no concurrency within a thread 585 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 586 // Concurrent within the current arena; may also serve as a stress test 587 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 588 } 589 590 //--------------------------------------------------// 591 592 // Test that task_arena::enqueue does not tolerate a non-const functor. 593 // TODO: can it be reworked as SFINAE-based compile-time check? 
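// One possible direction for the TODO above (a sketch, not used here): with C++17 the
// requirement could be checked at compile time, e.g.
//   static_assert(std::is_invocable_v<const TestFunctor&>, "enqueue requires a const operator()");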
594 struct TestFunctor { 595 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 596 void operator()() const { /* library requires this overload only */ } 597 }; 598 599 void TestConstantFunctorRequirement() { 600 tbb::task_arena a; 601 TestFunctor tf; 602 a.enqueue( tf ); 603 } 604 605 //--------------------------------------------------// 606 607 #include "tbb/parallel_reduce.h" 608 #include "tbb/parallel_invoke.h" 609 610 // Test this_task_arena::isolate 611 namespace TestIsolatedExecuteNS { 612 template <typename NestedPartitioner> 613 class NestedParFor : utils::NoAssign { 614 public: 615 NestedParFor() {} 616 void operator()() const { 617 NestedPartitioner p; 618 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 619 } 620 }; 621 622 template <typename NestedPartitioner> 623 class ParForBody : utils::NoAssign { 624 bool myOuterIsolation; 625 tbb::enumerable_thread_specific<int> &myEts; 626 std::atomic<bool> &myIsStolen; 627 public: 628 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 629 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 630 void operator()( int ) const { 631 int &e = myEts.local(); 632 if ( e++ > 0 ) myIsStolen = true; 633 if ( myOuterIsolation ) 634 NestedParFor<NestedPartitioner>()(); 635 else 636 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 637 --e; 638 } 639 }; 640 641 template <typename OuterPartitioner, typename NestedPartitioner> 642 class OuterParFor : utils::NoAssign { 643 bool myOuterIsolation; 644 std::atomic<bool> &myIsStolen; 645 public: 646 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 647 void operator()() const { 648 tbb::enumerable_thread_specific<int> ets( 0 ); 649 OuterPartitioner p; 650 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 651 } 652 }; 653 654 template <typename OuterPartitioner, typename NestedPartitioner> 655 void TwoLoopsTest( bool outer_isolation ) { 656 std::atomic<bool> is_stolen; 657 is_stolen = false; 658 const int max_repeats = 100; 659 if ( outer_isolation ) { 660 for ( int i = 0; i <= max_repeats; ++i ) { 661 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 662 if ( is_stolen ) break; 663 } 664 // TODO: was ASSERT_WARNING 665 if (!is_stolen) { 666 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 667 } 668 } else { 669 for ( int i = 0; i <= max_repeats; ++i ) { 670 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 671 } 672 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 673 } 674 } 675 676 void TwoLoopsTest( bool outer_isolation ) { 677 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 678 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 679 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 680 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 681 } 682 683 void TwoLoopsTest() { 684 TwoLoopsTest( true ); 685 TwoLoopsTest( false ); 686 } 687 //--------------------------------------------------// 688 class HeavyMixTestBody : utils::NoAssign { 689 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 690 
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 691 int myNestedLevel; 692 693 template <typename Partitioner, typename Body> 694 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 695 if ( rnd.get() % 2 ) { 696 if (ctx ) 697 tbb::parallel_for( 0, 2, body, p, *ctx ); 698 else 699 tbb::parallel_for( 0, 2, body, p ); 700 } else { 701 tbb::parallel_invoke( body, body ); 702 } 703 } 704 705 template <typename Partitioner> 706 class IsolatedBody : utils::NoAssign { 707 const HeavyMixTestBody &myHeavyMixTestBody; 708 Partitioner &myPartitioner; 709 public: 710 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 711 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 712 void operator()() const { 713 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 714 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 715 myHeavyMixTestBody.myNestedLevel + 1 ), 716 myPartitioner ); 717 } 718 }; 719 720 template <typename Partitioner> 721 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 722 Partitioner p; 723 switch ( rnd.get() % 2 ) { 724 case 0: { 725 // No features 726 tbb::task_group_context ctx; 727 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 728 break; 729 } 730 case 1: { 731 // Isolation 732 int previous_isolation = isolated_level; 733 isolated_level = myNestedLevel; 734 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 735 isolated_level = previous_isolation; 736 break; 737 } 738 } 739 } 740 public: 741 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 742 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 743 : myRandom( random ), myIsolatedLevel( isolated_level ) 744 , myNestedLevel( nested_level ) {} 745 void operator()() const { 746 int &isolated_level = myIsolatedLevel.local(); 747 REQUIRE_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 748 if ( myNestedLevel == 20 ) 749 return; 750 utils::FastRandom<>& rnd = myRandom.local(); 751 if ( rnd.get() % 2 == 1 ) { 752 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 753 } else { 754 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 755 } 756 } 757 void operator()(int) const { 758 this->operator()(); 759 } 760 }; 761 762 struct RandomInitializer { 763 utils::FastRandom<> operator()() { 764 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 765 } 766 }; 767 768 void HeavyMixTest() { 769 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 770 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 771 772 RandomInitializer init_random; 773 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 774 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 775 for ( int i = 0; i < 5; ++i ) { 776 HeavyMixTestBody b( random, isolated_level, 1 ); 777 b( 0 ); 778 } 779 } 780 781 //--------------------------------------------------// 782 #if TBB_USE_EXCEPTIONS 783 struct MyException {}; 784 struct IsolatedBodyThrowsException { 785 void operator()() const { 786 #if _MSC_VER && !__INTEL_COMPILER 787 // Workaround an unreachable code warning in task_arena_function. 
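            // The volatile flag keeps the compiler from proving that the throw below is unconditional.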
788 volatile bool workaround = true; 789 if (workaround) 790 #endif 791 { 792 throw MyException(); 793 } 794 } 795 }; 796 struct ExceptionTestBody : utils::NoAssign { 797 tbb::enumerable_thread_specific<int>& myEts; 798 std::atomic<bool>& myIsStolen; 799 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 800 : myEts( ets ), myIsStolen( is_stolen ) {} 801 void operator()( int i ) const { 802 try { 803 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 804 REQUIRE_MESSAGE( false, "The exception has been lost" ); 805 } 806 catch ( MyException ) {} 807 catch ( ... ) { 808 REQUIRE_MESSAGE( false, "Unexpected exception" ); 809 } 810 // Check that nested algorithms can steal outer-level tasks 811 int &e = myEts.local(); 812 if ( e++ > 0 ) myIsStolen = true; 813 // work imbalance increases chances for stealing 814 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 815 --e; 816 } 817 }; 818 819 #endif /* TBB_USE_EXCEPTIONS */ 820 void ExceptionTest() { 821 #if TBB_USE_EXCEPTIONS 822 tbb::enumerable_thread_specific<int> ets; 823 std::atomic<bool> is_stolen; 824 is_stolen = false; 825 for ( ;; ) { 826 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 827 if ( is_stolen ) break; 828 } 829 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 830 #endif /* TBB_USE_EXCEPTIONS */ 831 } 832 833 struct NonConstBody { 834 unsigned int state; 835 void operator()() { 836 state ^= ~0u; 837 } 838 }; 839 840 void TestNonConstBody() { 841 NonConstBody body; 842 body.state = 0x6c97d5ed; 843 tbb::this_task_arena::isolate(body); 844 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 845 } 846 847 // TODO: Consider tbb::task_group instead of explicit task API. 848 class TestEnqueueTask : public tbb::detail::d1::task { 849 using wait_context = tbb::detail::d1::wait_context; 850 851 tbb::enumerable_thread_specific<bool>& executed; 852 std::atomic<int>& completed; 853 854 public: 855 wait_context& waiter; 856 tbb::task_arena& arena; 857 static const int N = 100; 858 859 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 860 : executed(exe), completed(c), waiter(w), arena(a) {} 861 862 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 863 for (int i = 0; i < N; ++i) { 864 arena.enqueue([&]() { 865 executed.local() = true; 866 ++completed; 867 for (volatile int j = 0; j < 100; j++) std::this_thread::yield(); 868 waiter.release(1); 869 }); 870 } 871 return nullptr; 872 } 873 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 874 }; 875 876 class TestEnqueueIsolateBody : utils::NoCopy { 877 tbb::enumerable_thread_specific<bool>& executed; 878 std::atomic<int>& completed; 879 tbb::task_arena& arena; 880 public: 881 static const int N = 100; 882 883 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 884 : executed(exe), completed(c), arena(a) {} 885 void operator()() { 886 tbb::task_group_context ctx; 887 tbb::detail::d1::wait_context waiter(N); 888 889 TestEnqueueTask root(executed, completed, waiter, arena); 890 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 891 } 892 }; 893 894 void TestEnqueue() { 895 tbb::enumerable_thread_specific<bool> executed(false); 896 std::atomic<int> completed; 897 tbb::task_arena arena = tbb::task_arena(tbb::task_arena::attach()); 898 899 // Check that the main thread can process 
enqueued tasks. 900 completed = 0; 901 TestEnqueueIsolateBody b1(executed, completed, arena); 902 b1(); 903 904 if (!executed.local()) { 905 REPORT("Warning: No one enqueued task has executed by the main thread.\n"); 906 } 907 908 executed.local() = false; 909 completed = 0; 910 const int N = 100; 911 // Create enqueued tasks out of isolation. 912 913 tbb::task_group_context ctx; 914 tbb::detail::d1::wait_context waiter(N); 915 for (int i = 0; i < N; ++i) { 916 arena.enqueue([&]() { 917 executed.local() = true; 918 ++completed; 919 std::this_thread::yield(); 920 waiter.release(1); 921 }); 922 } 923 TestEnqueueIsolateBody b2(executed, completed, arena); 924 tbb::this_task_arena::isolate(b2); 925 REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate."); 926 927 tbb::detail::d1::wait(waiter, ctx); 928 // while (completed < TestEnqueueTask::N + N) std::this_thread::yield(); 929 } 930 } 931 932 void TestIsolatedExecute() { 933 // At least 3 threads (owner + 2 thieves) are required to reproduce a situation when the owner steals outer 934 // level task on a nested level. If we have only one thief then it will execute outer level tasks first and 935 // the owner will not have a possibility to steal outer level tasks. 936 int platform_max_thread = tbb::this_task_arena::max_concurrency(); 937 int num_threads = utils::min( platform_max_thread, 3 ); 938 { 939 // Too many threads require too many work to reproduce the stealing from outer level. 940 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7)); 941 TestIsolatedExecuteNS::TwoLoopsTest(); 942 TestIsolatedExecuteNS::HeavyMixTest(); 943 TestIsolatedExecuteNS::ExceptionTest(); 944 } 945 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 946 TestIsolatedExecuteNS::HeavyMixTest(); 947 TestIsolatedExecuteNS::TestNonConstBody(); 948 TestIsolatedExecuteNS::TestEnqueue(); 949 } 950 951 //-----------------------------------------------------------------------------------------// 952 953 class TestDelegatedSpawnWaitBody : utils::NoAssign { 954 tbb::task_arena &my_a; 955 utils::SpinBarrier &my_b1, &my_b2; 956 public: 957 TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 958 : my_a(a), my_b1(b1), my_b2(b2) {} 959 // NativeParallelFor's functor 960 void operator()(int idx) const { 961 if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang) 962 for (int i = 0; i < 2; ++i) { 963 my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers 964 } 965 tbb::task_group tg; 966 my_b1.wait(); // sync with the workers 967 for( int i=0; i<100000; ++i) { 968 my_a.execute([&tg] { tg.run([] {}); }); 969 } 970 my_a.execute([&tg] {tg.wait(); }); 971 } 972 973 my_b2.wait(); // sync both threads 974 } 975 }; 976 977 void TestDelegatedSpawnWait() { 978 // Regression test for a bug with missed wakeup notification from a delegated task 979 tbb::task_arena a(2,0); 980 a.initialize(); 981 utils::SpinBarrier barrier1(3), barrier2(2); 982 utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) ); 983 a.debug_wait_until_empty(); 984 } 985 986 //-----------------------------------------------------------------------------------------// 987 988 class TestMultipleWaitsArenaWait : utils::NoAssign { 989 using wait_context = tbb::detail::d1::wait_context; 990 public: 991 TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& 
 waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) std::this_thread::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) std::this_thread::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
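    // Each bunch detaches real std::threads that block inside task_arena::execute(), so the counts are kept modest.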
1062 #if TBB_TEST_LOW_WORKLOAD 1063 const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() ); 1064 #else 1065 const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() ); 1066 #endif 1067 1068 utils::FastRandom<> rnd(1234); 1069 for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) { 1070 for ( int i = 0; i<3; ++i ) { 1071 const int num_bunches = 3 + rnd.get()%3; 1072 const int bunch_size = max_threads + rnd.get()%max_threads; 1073 TestMultipleWaits( threads, num_bunches, bunch_size ); 1074 } 1075 } 1076 } 1077 1078 //--------------------------------------------------// 1079 1080 void TestSmallStackSize() { 1081 tbb::global_control gc(tbb::global_control::thread_stack_size, 1082 tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 ); 1083 // The test produces the warning (not a error) if fails. So the test is run many times 1084 // to make the log annoying (to force to consider it as an error). 1085 for (int i = 0; i < 100; ++i) { 1086 tbb::task_arena a; 1087 a.initialize(); 1088 } 1089 } 1090 1091 //--------------------------------------------------// 1092 1093 namespace TestMoveSemanticsNS { 1094 struct TestFunctor { 1095 void operator()() const {}; 1096 }; 1097 1098 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 1099 MoveOnlyFunctor() : utils::MoveOnly() {}; 1100 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 1101 }; 1102 1103 struct MovePreferableFunctor : utils::Movable, TestFunctor { 1104 MovePreferableFunctor() : utils::Movable() {}; 1105 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {}; 1106 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 1107 }; 1108 1109 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 1110 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 1111 // mv ctor is not allowed as cp ctor from parent NoCopy 1112 private: 1113 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 1114 }; 1115 1116 void TestFunctors() { 1117 tbb::task_arena ta; 1118 MovePreferableFunctor mpf; 1119 // execute() doesn't have any copies or moves of arguments inside the impl 1120 ta.execute( NoMoveNoCopyFunctor() ); 1121 1122 ta.enqueue( MoveOnlyFunctor() ); 1123 ta.enqueue( mpf ); 1124 REQUIRE_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 1125 mpf.Reset(); 1126 ta.enqueue( std::move(mpf) ); 1127 REQUIRE_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 1128 mpf.Reset(); 1129 } 1130 } 1131 1132 void TestMoveSemantics() { 1133 TestMoveSemanticsNS::TestFunctors(); 1134 } 1135 1136 //--------------------------------------------------// 1137 1138 #include <vector> 1139 1140 #include "common/state_trackable.h" 1141 1142 namespace TestReturnValueNS { 1143 struct noDefaultTag {}; 1144 class ReturnType : public StateTrackable<> { 1145 static const int SIZE = 42; 1146 std::vector<int> data; 1147 public: 1148 ReturnType(noDefaultTag) : StateTrackable<>(0) {} 1149 // Define copy constructor to test that it is never called 1150 ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {} 1151 ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {} 1152 1153 void fill() { 1154 for (int i = 0; i < SIZE; ++i) 1155 data.push_back(i); 1156 } 1157 void check() { 1158 REQUIRE(data.size() == unsigned(SIZE)); 1159 for (int i = 0; i < SIZE; ++i) 1160 REQUIRE(data[i] == i); 1161 StateTrackableCounters::counters_type& cnts = 
StateTrackableCounters::counters; 1162 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1163 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1164 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1165 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1166 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1167 // The number of copies/moves should not exceed 3: function return, store to an internal storage, 1168 // acquire internal storage. 1169 REQUIRE((copied == 0 && moved <=3)); 1170 } 1171 }; 1172 1173 template <typename R> 1174 R function() { 1175 noDefaultTag tag; 1176 R r(tag); 1177 r.fill(); 1178 return r; 1179 } 1180 1181 template <> 1182 void function<void>() {} 1183 1184 template <typename R> 1185 struct Functor { 1186 R operator()() const { 1187 return function<R>(); 1188 } 1189 }; 1190 1191 tbb::task_arena& arena() { 1192 static tbb::task_arena a; 1193 return a; 1194 } 1195 1196 template <typename F> 1197 void TestExecute(F &f) { 1198 StateTrackableCounters::reset(); 1199 ReturnType r = arena().execute(f); 1200 r.check(); 1201 } 1202 1203 template <typename F> 1204 void TestExecute(const F &f) { 1205 StateTrackableCounters::reset(); 1206 ReturnType r = arena().execute(f); 1207 r.check(); 1208 } 1209 template <typename F> 1210 void TestIsolate(F &f) { 1211 StateTrackableCounters::reset(); 1212 ReturnType r = tbb::this_task_arena::isolate(f); 1213 r.check(); 1214 } 1215 1216 template <typename F> 1217 void TestIsolate(const F &f) { 1218 StateTrackableCounters::reset(); 1219 ReturnType r = tbb::this_task_arena::isolate(f); 1220 r.check(); 1221 } 1222 1223 void Test() { 1224 TestExecute(Functor<ReturnType>()); 1225 Functor<ReturnType> f1; 1226 TestExecute(f1); 1227 TestExecute(function<ReturnType>); 1228 1229 arena().execute(Functor<void>()); 1230 Functor<void> f2; 1231 arena().execute(f2); 1232 arena().execute(function<void>); 1233 TestIsolate(Functor<ReturnType>()); 1234 TestIsolate(f1); 1235 TestIsolate(function<ReturnType>); 1236 tbb::this_task_arena::isolate(Functor<void>()); 1237 tbb::this_task_arena::isolate(f2); 1238 tbb::this_task_arena::isolate(function<void>); 1239 } 1240 } 1241 1242 void TestReturnValue() { 1243 TestReturnValueNS::Test(); 1244 } 1245 1246 //--------------------------------------------------// 1247 1248 // MyObserver checks if threads join to the same arena 1249 struct MyObserver: public tbb::task_scheduler_observer { 1250 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1251 tbb::task_arena& my_arena; 1252 std::atomic<int>& my_failure_counter; 1253 std::atomic<int>& my_counter; 1254 1255 MyObserver(tbb::task_arena& a, 1256 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1257 std::atomic<int>& failure_counter, 1258 std::atomic<int>& counter) 1259 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1260 my_failure_counter(failure_counter), my_counter(counter) { 1261 observe(true); 1262 } 1263 void on_scheduler_entry(bool worker) override { 1264 if (worker) { 1265 ++my_counter; 1266 tbb::task_arena*& cur_arena = my_tls.local(); 1267 if (cur_arena != 0 && cur_arena != &my_arena) { 1268 ++my_failure_counter; 1269 } 1270 cur_arena = &my_arena; 1271 } 1272 } 1273 }; 1274 1275 struct MyLoopBody { 1276 utils::SpinBarrier& m_barrier; 1277 MyLoopBody(utils::SpinBarrier& b):m_barrier(b) { } 1278 void operator()(int) const { 1279 m_barrier.wait(); 1280 } 1281 }; 1282 1283 struct TaskForArenaExecute { 1284 utils::SpinBarrier& m_barrier; 1285 
TaskForArenaExecute(utils::SpinBarrier& b):m_barrier(b) { } 1286 void operator()() const { 1287 tbb::parallel_for(0, tbb::this_task_arena::max_concurrency(), 1288 MyLoopBody(m_barrier), tbb::simple_partitioner() 1289 ); 1290 } 1291 }; 1292 1293 struct ExecuteParallelFor { 1294 int n_per_thread; 1295 int n_repetitions; 1296 std::vector<tbb::task_arena>& arenas; 1297 utils::SpinBarrier& arena_barrier; 1298 utils::SpinBarrier& master_barrier; 1299 ExecuteParallelFor(const int n_per_thread_, const int n_repetitions_, 1300 std::vector<tbb::task_arena>& arenas_, 1301 utils::SpinBarrier& arena_barrier_, utils::SpinBarrier& master_barrier_) 1302 : n_per_thread(n_per_thread_), n_repetitions(n_repetitions_), arenas(arenas_), 1303 arena_barrier(arena_barrier_), master_barrier(master_barrier_){ } 1304 void operator()(int i) const { 1305 for (int j = 0; j < n_repetitions; ++j) { 1306 arenas[i].execute(TaskForArenaExecute(arena_barrier)); 1307 for(volatile int k = 0; k < n_per_thread; ++k){/* waiting until workers fall asleep */} 1308 master_barrier.wait(); 1309 } 1310 } 1311 }; 1312 1313 // if n_threads == -1 then global_control initialized with default value 1314 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { 1315 if (n_threads == 0) { 1316 n_threads = tbb::this_task_arena::max_concurrency(); 1317 } 1318 const int max_n_arenas = 8; 1319 int n_arenas = 2; 1320 if(n_threads >= 16) 1321 n_arenas = max_n_arenas; 1322 else if (n_threads >= 8) 1323 n_arenas = 4; 1324 n_threads = n_arenas * (n_threads / n_arenas); 1325 const int n_per_thread = 10000000; 1326 const int n_repetitions = 100; 1327 const int n_outer_repetitions = 20; 1328 std::multiset<float> failure_ratio; // for median calculating 1329 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads - (n_arenas - 1)); 1330 utils::SpinBarrier master_barrier(n_arenas); 1331 utils::SpinBarrier arena_barrier(n_threads); 1332 MyObserver* observer[max_n_arenas]; 1333 std::vector<tbb::task_arena> arenas(n_arenas); 1334 std::atomic<int> failure_counter; 1335 std::atomic<int> counter; 1336 tbb::enumerable_thread_specific<tbb::task_arena*> tls; 1337 for (int i = 0; i < n_arenas; ++i) { 1338 arenas[i].initialize(n_threads / n_arenas); 1339 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter); 1340 } 1341 int ii = 0; 1342 for (; ii < n_outer_repetitions; ++ii) { 1343 failure_counter = 0; 1344 counter = 0; 1345 // Main code 1346 utils::NativeParallelFor(n_arenas, ExecuteParallelFor(n_per_thread, n_repetitions, 1347 arenas, arena_barrier, master_barrier)); 1348 // TODO: get rid of check below by setting ratio between n_threads and n_arenas 1349 failure_ratio.insert((counter != 0 ? 
                float(failure_counter) / counter : 1.0f));
        tls.clear();
        // collect 3 elements in failure_ratio before calculating the median
        if (ii > 1) {
            std::multiset<float>::iterator it = failure_ratio.begin();
            std::advance(it, failure_ratio.size() / 2);
            if (*it < 0.02)
                break;
        }
    }
    for (int i = 0; i < n_arenas; ++i) {
        delete observer[i];
    }
    // check whether the median is too big
    std::multiset<float>::iterator it = failure_ratio.begin();
    std::advance(it, failure_ratio.size() / 2);
    // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
    if (*it > 0.05) {
        REPORT("Warning: Too many cases when threads join different arenas.\n");
        REQUIRE_MESSAGE(*it <= 0.3, "Too many cases of threads joining different arenas.\n");
    }
}

void TestArenaWorkersMigration() {
    TestArenaWorkersMigrationWithNumThreads(4);
    if (tbb::this_task_arena::max_concurrency() != 4) {
        TestArenaWorkersMigrationWithNumThreads();
    }
}

//--------------------------------------------------//
void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (int trail = 0; trail < 10; ++trail) {
            tbb::parallel_for(0, threads, [threads, &barrier](int) {
                REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified thread number");
                REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than specified by default");
                local_id.local() = 1;
                // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                utils::Sleep(1);
                barrier.wait();
            }, tbb::simple_partitioner());
            REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default number");
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Check only a subset of the reserved-master thread counts:
    // 0 and 1 reserved threads are the important cases, but some statistics with
    // other counts are also collected without spending the whole test session
    // time on checking each count.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    tbb::task_arena test_arena(thread_number / 2, thread_number / 2);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ?
myMaxConcurrency : 2)); 1459 if (is_worker) { 1460 CHECK(current_index >= myNumReservedSlots); 1461 } 1462 } 1463 void on_scheduler_exit( bool /*is_worker*/ ) override 1464 {} 1465 public: 1466 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1467 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1468 , myMaxConcurrency(maxConcurrency) 1469 , myNumReservedSlots(numReservedSlots) { 1470 observe(true); 1471 } 1472 1473 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1474 return lhs.my_idx < rhs.my_idx; 1475 } 1476 }; 1477 1478 std::atomic<int> simple_observer::idx_counter{}; 1479 1480 struct arena_handler { 1481 enum arena_status { 1482 alive, 1483 deleting, 1484 deleted 1485 }; 1486 1487 tbb::task_arena* arena; 1488 1489 std::atomic<arena_status> status{alive}; 1490 tbb::spin_rw_mutex arena_in_use{}; 1491 1492 tbb::concurrent_set<simple_observer> observers; 1493 1494 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1495 {} 1496 1497 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1498 return lhs.arena < rhs.arena; 1499 } 1500 }; 1501 1502 // TODO: Add observer operations 1503 void StressTestMixFunctionality() { 1504 enum operation_type { 1505 create_arena, 1506 delete_arena, 1507 attach_observer, 1508 detach_observer, 1509 arena_execute, 1510 enqueue_task, 1511 last_operation_marker 1512 }; 1513 1514 std::size_t operations_number = last_operation_marker; 1515 std::size_t thread_number = utils::get_platform_max_threads(); 1516 utils::FastRandom<> operation_rnd(42); 1517 tbb::spin_mutex random_operation_guard; 1518 1519 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1520 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1521 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1522 }; 1523 1524 utils::FastRandom<> arena_rnd(42); 1525 tbb::spin_mutex random_arena_guard; 1526 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1527 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1528 return arena_rnd.get(); 1529 }; 1530 1531 tbb::concurrent_set<arena_handler> arenas_pool; 1532 1533 std::vector<std::thread> thread_pool; 1534 1535 utils::SpinBarrier thread_barrier(thread_number); 1536 std::size_t max_operations = 10000; 1537 std::atomic<std::size_t> curr_operation{}; 1538 auto thread_func = [&] () { 1539 arenas_pool.emplace(new tbb::task_arena()); 1540 thread_barrier.wait(); 1541 while (curr_operation++ < max_operations) { 1542 switch (get_random_operation()) { 1543 case create_arena : 1544 { 1545 arenas_pool.emplace(new tbb::task_arena()); 1546 break; 1547 } 1548 case delete_arena : 1549 { 1550 auto curr_arena = arenas_pool.begin(); 1551 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1552 arena_handler::arena_status curr_status = arena_handler::alive; 1553 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1554 break; 1555 } 1556 } 1557 1558 if (curr_arena == arenas_pool.end()) break; 1559 1560 tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true); 1561 1562 delete curr_arena->arena; 1563 curr_arena->status.store(arena_handler::deleted); 1564 1565 break; 1566 } 1567 case attach_observer : 1568 { 1569 tbb::spin_rw_mutex::scoped_lock lock{}; 1570 auto curr_arena = arenas_pool.begin(); 1571 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1572 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1573 if 
(curr_arena->status == arena_handler::alive) { 1574 break; 1575 } else { 1576 lock.release(); 1577 } 1578 } 1579 } 1580 1581 if (curr_arena == arenas_pool.end()) break; 1582 1583 { 1584 curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1); 1585 } 1586 1587 break; 1588 } 1589 case detach_observer : 1590 { 1591 auto arena_number = get_random_arena() % arenas_pool.size(); 1592 auto curr_arena = arenas_pool.begin(); 1593 std::advance(curr_arena, arena_number); 1594 1595 for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) { 1596 if (it->is_observing()) { 1597 it->observe(false); 1598 break; 1599 } 1600 } 1601 1602 break; 1603 } 1604 case arena_execute : 1605 { 1606 tbb::spin_rw_mutex::scoped_lock lock{}; 1607 auto curr_arena = arenas_pool.begin(); 1608 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1609 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1610 if (curr_arena->status == arena_handler::alive) { 1611 break; 1612 } else { 1613 lock.release(); 1614 } 1615 } 1616 } 1617 1618 if (curr_arena == arenas_pool.end()) break; 1619 1620 curr_arena->arena->execute([] () { 1621 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) { 1622 std::atomic<int> sum{}; 1623 // Make some work 1624 for (; sum < 100; ++sum) ; 1625 }); 1626 }); 1627 1628 break; 1629 } 1630 case enqueue_task : 1631 { 1632 tbb::spin_rw_mutex::scoped_lock lock{}; 1633 auto curr_arena = arenas_pool.begin(); 1634 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1635 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1636 if (curr_arena->status == arena_handler::alive) { 1637 break; 1638 } else { 1639 lock.release(); 1640 } 1641 } 1642 } 1643 1644 if (curr_arena == arenas_pool.end()) break; 1645 1646 curr_arena->arena->enqueue([] { 1647 std::atomic<int> sum{}; 1648 // Make some work 1649 for (; sum < 100000; ++sum) ; 1650 }); 1651 1652 break; 1653 } 1654 case last_operation_marker : 1655 break; 1656 } 1657 } 1658 }; 1659 1660 for (std::size_t i = 0; i < thread_number - 1; ++i) { 1661 thread_pool.emplace_back(thread_func); 1662 } 1663 1664 thread_func(); 1665 1666 for (std::size_t i = 0; i < thread_number - 1; ++i) { 1667 if (thread_pool[i].joinable()) thread_pool[i].join(); 1668 } 1669 1670 for (auto& handler : arenas_pool) { 1671 if (handler.status != arena_handler::deleted) delete handler.arena; 1672 } 1673 } 1674 1675 struct enqueue_test_helper { 1676 enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets , std::atomic<std::size_t>& task_counter) 1677 : my_arena(arena), my_ets(ets), my_task_counter(task_counter) 1678 {} 1679 1680 enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter) 1681 {} 1682 1683 void operator() () const { 1684 CHECK(my_ets.local()); 1685 if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter)); 1686 std::this_thread::yield(); 1687 } 1688 1689 tbb::task_arena& my_arena; 1690 tbb::enumerable_thread_specific<bool>& my_ets; 1691 std::atomic<std::size_t>& my_task_counter; 1692 }; 1693 1694 //--------------------------------------------------// 1695 1696 //! Test for task arena in concurrent cases 1697 //! \brief \ref requirement 1698 TEST_CASE("Test for concurrent functionality") { 1699 TestConcurrentFunctionality(); 1700 } 1701 1702 //! Test for arena entry consistency 1703 //! 
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only when more than 2 threads are available
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! Test for exception propagation out of task_arena::execute
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! Stress test mixing arena creation/destruction, observers, execute and enqueue
//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}

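//! Test that tasks enqueued into an oversubscribed arena are executed by the same pool of worker threads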
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads * 2));

    utils::SpinBarrier barrier(num_threads * 2);

    // Mark every thread that joins the arena so that later checks can verify
    // the same workers come back.
    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    std::this_thread::yield();

    // Keep the arena supplied with self-re-enqueueing tasks; each task checks
    // that it runs on a thread already marked above.
    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) std::this_thread::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}