1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/concurrency_tracker.h" 21 #include "common/spin_barrier.h" 22 #include "common/utils.h" 23 #include "common/utils_report.h" 24 #include "common/utils_concurrency_limit.h" 25 26 #include "tbb/task_arena.h" 27 #include "tbb/task_scheduler_observer.h" 28 #include "tbb/enumerable_thread_specific.h" 29 #include "tbb/parallel_for.h" 30 #include "tbb/global_control.h" 31 #include "tbb/concurrent_set.h" 32 #include "tbb/spin_mutex.h" 33 #include "tbb/spin_rw_mutex.h" 34 35 #include <stdexcept> 36 #include <cstdlib> 37 #include <cstdio> 38 #include <vector> 39 #include <thread> 40 #include <atomic> 41 42 //#include "harness_fp.h" 43 44 //! \file test_task_arena.cpp 45 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 46 47 //--------------------------------------------------// 48 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 49 /* maxthread is treated as the biggest possible concurrency level. */ 50 void InitializeAndTerminate( int maxthread ) { 51 for( int i=0; i<200; ++i ) { 52 switch( i&3 ) { 53 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 54 // Explicit initialization can either keep the original values or change those. 55 // Arena termination can be explicit or implicit (in the destructor). 56 // TODO: extend with concurrency level checks if such a method is added. 
57 default: { 58 tbb::task_arena arena( std::rand() % maxthread + 1 ); 59 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 60 arena.initialize(); 61 CHECK(arena.is_active()); 62 arena.terminate(); 63 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 64 break; 65 } 66 case 0: { 67 tbb::task_arena arena( 1 ); 68 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 69 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 70 CHECK(arena.is_active()); 71 break; 72 } 73 case 1: { 74 tbb::task_arena arena( tbb::task_arena::automatic ); 75 CHECK(!arena.is_active()); 76 arena.initialize(); 77 CHECK(arena.is_active()); 78 break; 79 } 80 case 2: { 81 tbb::task_arena arena; 82 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 83 arena.initialize( std::rand() % maxthread + 1 ); 84 CHECK(arena.is_active()); 85 arena.terminate(); 86 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 87 break; 88 } 89 } 90 } 91 } 92 93 //--------------------------------------------------// 94 95 // Definitions used in more than one test 96 typedef tbb::blocked_range<int> Range; 97 98 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 99 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 100 101 void ResetTLS() { 102 local_id.clear(); 103 old_id.clear(); 104 slot_id.clear(); 105 } 106 107 class ArenaObserver : public tbb::task_scheduler_observer { 108 int myId; // unique observer/arena id within a test 109 int myMaxConcurrency; // concurrency of the associated arena 110 int myNumReservedSlots; // reserved slots in the associated arena 111 void on_scheduler_entry( bool is_worker ) override { 112 int current_index = tbb::this_task_arena::current_thread_index(); 113 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 114 if (is_worker) { 115 CHECK(current_index >= myNumReservedSlots); 116 } 117 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 118 old_id.local() = local_id.local(); 119 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 120 local_id.local() = myId; 121 slot_id.local() = current_index; 122 } 123 void on_scheduler_exit( bool /*is_worker*/ ) override { 124 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 125 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 126 slot_id.local() = -2; 127 local_id.local() = old_id.local(); 128 old_id.local() = 0; 129 } 130 public: 131 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 132 : tbb::task_scheduler_observer(a) 133 , myId(id) 134 , myMaxConcurrency(maxConcurrency) 135 , myNumReservedSlots(numReservedSlots) { 136 CHECK(myId); 137 observe(true); 138 } 139 ~ArenaObserver () { 140 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 141 } 142 }; 143 144 struct IndexTrackingBody { // Must be used together with ArenaObserver 145 void operator() ( const Range& ) const { 146 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 147 utils::doDummyWork(50000); 148 } 149 }; 150 151 struct AsynchronousWork { 152 utils::SpinBarrier &my_barrier; 153 bool my_is_blocking; 154 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 155 : my_barrier(a_barrier), my_is_blocking(blocking) {} 156 void operator()() const { 157 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 158 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 159 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 160 else my_barrier.signalNoWait(); 161 } 162 }; 163 164 //-----------------------------------------------------------------------------------------// 165 166 // Test that task_arenas might be created and used from multiple application threads. 167 // Also tests arena observers. The parameter p is the index of an app thread running this test. 168 void TestConcurrentArenasFunc(int idx) { 169 // A regression test for observer activation order: 170 // check that arena observer can be activated before local observer 171 struct LocalObserver : public tbb::task_scheduler_observer { 172 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 173 }; 174 tbb::task_arena a1; 175 a1.initialize(1,0); 176 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 177 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 178 LocalObserver lo; 179 CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated"); 180 tbb::task_arena a2(2,1); 181 ArenaObserver o2(a2, 2, 1, idx*2+2); 182 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 183 utils::SpinBarrier barrier(2); 184 AsynchronousWork work(barrier); 185 a1.enqueue(work); // put async work 186 barrier.wait(); 187 a2.enqueue(work); // another work 188 a2.execute(work); 189 a1.debug_wait_until_empty(); 190 a2.debug_wait_until_empty(); 191 } 192 193 void TestConcurrentArenas(int p) { 194 // TODO REVAMP fix with global control 195 ResetTLS(); 196 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 197 } 198 //--------------------------------------------------// 199 // Test multiple application threads working with a single arena at the same time. 
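// Part1 calls execute() and then enqueue() from each external thread, Part2 uses execute() only,
// and Part3 pairs enqueue() with an explicit wait_context that is waited on inside execute().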
200 class MultipleMastersPart1 : utils::NoAssign { 201 tbb::task_arena &my_a; 202 utils::SpinBarrier &my_b1, &my_b2; 203 public: 204 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 205 : my_a(a), my_b1(b1), my_b2(b2) {} 206 void operator()(int) const { 207 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 208 my_b1.wait(); 209 // A regression test for bugs 1954 & 1971 210 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 211 } 212 }; 213 214 class MultipleMastersPart2 : utils::NoAssign { 215 tbb::task_arena &my_a; 216 utils::SpinBarrier &my_b; 217 public: 218 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 219 void operator()(int) const { 220 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 221 } 222 }; 223 224 class MultipleMastersPart3 : utils::NoAssign { 225 tbb::task_arena &my_a; 226 utils::SpinBarrier &my_b; 227 using wait_context = tbb::detail::d1::wait_context; 228 229 struct Runner : NoAssign { 230 wait_context& myWait; 231 Runner(wait_context& w) : myWait(w) {} 232 void operator()() const { 233 utils::doDummyWork(10000); 234 myWait.release(); 235 } 236 }; 237 238 struct Waiter : NoAssign { 239 wait_context& myWait; 240 Waiter(wait_context& w) : myWait(w) {} 241 void operator()() const { 242 tbb::task_group_context ctx; 243 tbb::detail::d1::wait(myWait, ctx); 244 } 245 }; 246 247 public: 248 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 249 : my_a(a), my_b(b) {} 250 void operator()(int) const { 251 wait_context wait(0); 252 my_b.wait(); // increases chances for task_arena initialization contention 253 for( int i=0; i<100; ++i) { 254 wait.reserve(); 255 my_a.enqueue(Runner(wait)); 256 my_a.execute(Waiter(wait)); 257 } 258 my_b.wait(); 259 } 260 }; 261 262 void TestMultipleMasters(int p) { 263 { 264 ResetTLS(); 265 tbb::task_arena a(1,0); 266 a.initialize(); 267 ArenaObserver o(a, 1, 0, 1); 268 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 269 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 270 barrier2.wait(); 271 a.debug_wait_until_empty(); 272 } { 273 ResetTLS(); 274 tbb::task_arena a(2,1); 275 ArenaObserver o(a, 2, 1, 2); 276 utils::SpinBarrier barrier(p+2); 277 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 278 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 
279 utils::Sleep(10); 280 NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); 281 barrier.wait(); 282 a.debug_wait_until_empty(); 283 } { 284 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) 285 tbb::task_arena a(p,1); 286 utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs 287 // "Oversubscribe" the arena by 1 external thread 288 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); 289 a.debug_wait_until_empty(); 290 } 291 } 292 293 //--------------------------------------------------// 294 // TODO: explain what TestArenaEntryConsistency does 295 #include <sstream> 296 #include <stdexcept> 297 #include "oneapi/tbb/detail/_exception.h" 298 #include "common/fp_control.h" 299 300 struct TestArenaEntryBody : FPModeContext { 301 std::atomic<int> &my_stage; // each execute increases it 302 std::stringstream my_id; 303 bool is_caught, is_expected; 304 enum { arenaFPMode = 1 }; 305 306 TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance 307 : FPModeContext(idx+i) 308 , my_stage(s) 309 , is_caught(false) 310 #if TBB_USE_EXCEPTIONS 311 , is_expected( (idx&(1<<i)) != 0 ) 312 #else 313 , is_expected(false) 314 #endif 315 { 316 my_id << idx << ':' << i << '@'; 317 } 318 void operator()() { // inside task_arena::execute() 319 // synchronize with other stages 320 int stage = my_stage++; 321 int slot = tbb::this_task_arena::current_thread_index(); 322 CHECK(slot >= 0); 323 CHECK(slot <= 1); 324 // wait until the third stage is delegated and then starts on slot 0 325 while(my_stage < 2+slot) utils::yield(); 326 // deduct its entry type and put it into id, it helps to find source of a problem 327 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? 328 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master") 329 : stage == 3? 
"nested_same_ctx" : "nested_alien_ctx"); 330 AssertFPMode(arenaFPMode); 331 if (is_expected) { 332 TBB_TEST_THROW(std::logic_error(my_id.str())); 333 } 334 // no code can be put here since exceptions can be thrown 335 } 336 void on_exception(const char *e) { // outside arena, in catch block 337 is_caught = true; 338 CHECK(my_id.str() == e); 339 assertFPMode(); 340 } 341 void after_execute() { // outside arena and catch block 342 CHECK(is_caught == is_expected); 343 assertFPMode(); 344 } 345 }; 346 347 class ForEachArenaEntryBody : utils::NoAssign { 348 tbb::task_arena &my_a; // expected task_arena(2,1) 349 std::atomic<int> &my_stage; // each execute increases it 350 int my_idx; 351 352 public: 353 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 354 : my_a(a), my_stage(c), my_idx(0) {} 355 356 void test(int idx) { 357 my_idx = idx; 358 my_stage = 0; 359 NativeParallelFor(3, *this); // test cross-arena calls 360 CHECK(my_stage == 3); 361 my_a.execute(*this); // test nested calls 362 CHECK(my_stage == 5); 363 } 364 365 // task_arena functor for nested tests 366 void operator()() const { 367 test_arena_entry(3); // in current task group context 368 tbb::parallel_for(4, 5, *this); // in different context 369 } 370 371 // NativeParallelFor & parallel_for functor 372 void operator()(int i) const { 373 test_arena_entry(i); 374 } 375 376 private: 377 void test_arena_entry(int i) const { 378 GetRoundingMode(); 379 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 380 GetRoundingMode(); 381 #if TBB_USE_EXCEPTIONS 382 try { 383 my_a.execute(scoped_functor); 384 } catch(std::logic_error &e) { 385 scoped_functor.on_exception(e.what()); 386 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 387 #else 388 my_a.execute(scoped_functor); 389 #endif 390 scoped_functor.after_execute(); 391 } 392 }; 393 394 void TestArenaEntryConsistency() { 395 tbb::task_arena a(2, 1); 396 std::atomic<int> c; 397 ForEachArenaEntryBody body(a, c); 398 399 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 400 a.initialize(); // capture FP settings to arena 401 fp_scope.setNextFPMode(); 402 403 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 404 body.test(i); 405 } 406 //-------------------------------------------------- 407 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 408 class TestArenaConcurrencyBody : utils::NoAssign { 409 tbb::task_arena &my_a; 410 int my_max_concurrency; 411 int my_reserved_slots; 412 utils::SpinBarrier *my_barrier; 413 utils::SpinBarrier *my_worker_barrier; 414 public: 415 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 416 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 417 // NativeParallelFor's functor 418 void operator()( int ) const { 419 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 420 local_id.local() = 1; 421 my_a.execute( *this ); 422 } 423 // Arena's functor 424 void operator()() const { 425 int idx = tbb::this_task_arena::current_thread_index(); 426 CHECK( idx < (my_max_concurrency > 1 ? 
                     my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1 ) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        { // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i )
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
470 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 471 #endif 472 #if TBB_TEST_LOW_WORKLOAD 473 if ( p <= 16 ) 474 #endif 475 { 476 ResetTLS(); 477 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 478 a.debug_wait_until_empty(); 479 } 480 } 481 } 482 } 483 484 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 485 utils::SpinBarrier& m_barrier; 486 487 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 488 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 489 observe(true); 490 } 491 void on_scheduler_exit(bool worker) override { 492 if (worker) { 493 m_barrier.wait(); 494 } 495 } 496 }; 497 498 void TestMandatoryConcurrency() { 499 tbb::task_arena a(1); 500 a.execute([&a] { 501 int n_threads = 4; 502 utils::SpinBarrier exit_barrier(2); 503 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 504 for (int j = 0; j < 5; ++j) { 505 utils::ExactConcurrencyLevel::check(1); 506 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 507 utils::SpinBarrier barrier(n_threads); 508 utils::NativeParallelFor(n_threads, [&](int) { 509 for (int i = 0; i < 5; ++i) { 510 barrier.wait(); 511 a.enqueue([&] { 512 CHECK(tbb::this_task_arena::max_concurrency() == 2); 513 CHECK(a.max_concurrency() == 2); 514 ++curr_tasks; 515 CHECK(curr_tasks == 1); 516 utils::doDummyWork(1000); 517 CHECK(curr_tasks == 1); 518 --curr_tasks; 519 ++num_tasks; 520 }); 521 barrier.wait(); 522 } 523 }); 524 do { 525 exit_barrier.wait(); 526 } while (num_tasks < n_threads * 5); 527 } 528 observer.observe(false); 529 }); 530 } 531 532 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 533 TestMandatoryConcurrency(); 534 InitializeAndTerminate(max_thread_num); 535 for (int p = min_thread_num; p <= max_thread_num; ++p) { 536 TestConcurrentArenas(p); 537 TestMultipleMasters(p); 538 TestArenaConcurrency(p); 539 } 540 } 541 542 //--------------------------------------------------// 543 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 544 // This part of the test uses the knowledge of task_arena internals 545 546 struct TaskArenaValidator { 547 int my_slot_at_construction; 548 const tbb::task_arena& my_arena; 549 TaskArenaValidator( const tbb::task_arena& other ) 550 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 551 , my_arena(other) 552 {} 553 // Inspect the internal state 554 int concurrency() { return my_arena.debug_max_concurrency(); } 555 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 556 557 // This method should be called in task_arena::execute() for a captured arena 558 // by the same thread that created the validator. 
559 void operator()() { 560 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 561 "Current thread index has changed since the validator construction" ); 562 } 563 }; 564 565 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 566 int expect_concurrency, int expect_masters ) { 567 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 568 if( arena.is_active() ) { 569 TaskArenaValidator validator( arena ); 570 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 571 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 572 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 573 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 574 // for threads already in arena, check that the thread index remains the same 575 arena.execute( validator ); 576 } else { // not_initialized 577 // Test the deprecated method 578 CHECK(tbb::this_task_arena::current_thread_index() == -1); 579 } 580 581 // Ideally, there should be a check for having the same internal arena object, 582 // but that object is not easily accessible for implicit arenas. 583 } 584 } 585 586 struct TestAttachBody : utils::NoAssign { 587 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 588 const int maxthread; 589 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 590 591 // The functor body for NativeParallelFor 592 void operator()( int idx ) const { 593 my_idx = idx; 594 595 int default_threads = tbb::this_task_arena::max_concurrency(); 596 597 tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() ); 598 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 599 600 arena.terminate(); 601 ValidateAttachedArena( arena, false, -1, -1 ); 602 603 // attach to an auto-initialized arena 604 tbb::parallel_for(0, 1, [](int) {}); 605 606 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 607 ValidateAttachedArena( arena2, true, default_threads, 1 ); 608 609 // attach to another task_arena 610 arena.initialize( maxthread, std::min(maxthread,idx) ); 611 arena.execute( *this ); 612 } 613 614 // The functor body for task_arena::execute above 615 void operator()() const { 616 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 617 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 618 } 619 620 // The functor body for tbb::parallel_for 621 void operator()( const Range& r ) const { 622 for( int i = r.begin(); i<r.end(); ++i ) { 623 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 624 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 625 } 626 } 627 }; 628 629 thread_local int TestAttachBody::my_idx; 630 631 void TestAttach( int maxthread ) { 632 // Externally concurrent, but no concurrency within a thread 633 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 634 // Concurrent within the current arena; may also serve as a stress test 635 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 636 } 637 638 //--------------------------------------------------// 639 640 // Test that task_arena::enqueue does not tolerate a non-const functor. 641 // TODO: can it be reworked as SFINAE-based compile-time check? 
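// Only the const overload of operator() below is expected to be invoked by the library;
// the non-const overload fails the test if it is ever selected.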
642 struct TestFunctor { 643 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 644 void operator()() const { /* library requires this overload only */ } 645 }; 646 647 void TestConstantFunctorRequirement() { 648 tbb::task_arena a; 649 TestFunctor tf; 650 a.enqueue( tf ); 651 } 652 653 //--------------------------------------------------// 654 655 #include "tbb/parallel_reduce.h" 656 #include "tbb/parallel_invoke.h" 657 658 // Test this_task_arena::isolate 659 namespace TestIsolatedExecuteNS { 660 template <typename NestedPartitioner> 661 class NestedParFor : utils::NoAssign { 662 public: 663 NestedParFor() {} 664 void operator()() const { 665 NestedPartitioner p; 666 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 667 } 668 }; 669 670 template <typename NestedPartitioner> 671 class ParForBody : utils::NoAssign { 672 bool myOuterIsolation; 673 tbb::enumerable_thread_specific<int> &myEts; 674 std::atomic<bool> &myIsStolen; 675 public: 676 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 677 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 678 void operator()( int ) const { 679 int &e = myEts.local(); 680 if ( e++ > 0 ) myIsStolen = true; 681 if ( myOuterIsolation ) 682 NestedParFor<NestedPartitioner>()(); 683 else 684 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 685 --e; 686 } 687 }; 688 689 template <typename OuterPartitioner, typename NestedPartitioner> 690 class OuterParFor : utils::NoAssign { 691 bool myOuterIsolation; 692 std::atomic<bool> &myIsStolen; 693 public: 694 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 695 void operator()() const { 696 tbb::enumerable_thread_specific<int> ets( 0 ); 697 OuterPartitioner p; 698 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 699 } 700 }; 701 702 template <typename OuterPartitioner, typename NestedPartitioner> 703 void TwoLoopsTest( bool outer_isolation ) { 704 std::atomic<bool> is_stolen; 705 is_stolen = false; 706 const int max_repeats = 100; 707 if ( outer_isolation ) { 708 for ( int i = 0; i <= max_repeats; ++i ) { 709 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 710 if ( is_stolen ) break; 711 } 712 // TODO: was ASSERT_WARNING 713 if (!is_stolen) { 714 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 715 } 716 } else { 717 for ( int i = 0; i <= max_repeats; ++i ) { 718 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 719 } 720 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 721 } 722 } 723 724 void TwoLoopsTest( bool outer_isolation ) { 725 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 726 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 727 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 728 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 729 } 730 731 void TwoLoopsTest() { 732 TwoLoopsTest( true ); 733 TwoLoopsTest( false ); 734 } 735 //--------------------------------------------------// 736 class HeavyMixTestBody : utils::NoAssign { 737 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 738 
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 739 int myNestedLevel; 740 741 template <typename Partitioner, typename Body> 742 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 743 if ( rnd.get() % 2 ) { 744 if (ctx ) 745 tbb::parallel_for( 0, 2, body, p, *ctx ); 746 else 747 tbb::parallel_for( 0, 2, body, p ); 748 } else { 749 tbb::parallel_invoke( body, body ); 750 } 751 } 752 753 template <typename Partitioner> 754 class IsolatedBody : utils::NoAssign { 755 const HeavyMixTestBody &myHeavyMixTestBody; 756 Partitioner &myPartitioner; 757 public: 758 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 759 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 760 void operator()() const { 761 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 762 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 763 myHeavyMixTestBody.myNestedLevel + 1 ), 764 myPartitioner ); 765 } 766 }; 767 768 template <typename Partitioner> 769 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 770 Partitioner p; 771 switch ( rnd.get() % 2 ) { 772 case 0: { 773 // No features 774 tbb::task_group_context ctx; 775 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 776 break; 777 } 778 case 1: { 779 // Isolation 780 int previous_isolation = isolated_level; 781 isolated_level = myNestedLevel; 782 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 783 isolated_level = previous_isolation; 784 break; 785 } 786 } 787 } 788 public: 789 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 790 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 791 : myRandom( random ), myIsolatedLevel( isolated_level ) 792 , myNestedLevel( nested_level ) {} 793 void operator()() const { 794 int &isolated_level = myIsolatedLevel.local(); 795 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 796 if ( myNestedLevel == 20 ) 797 return; 798 utils::FastRandom<>& rnd = myRandom.local(); 799 if ( rnd.get() % 2 == 1 ) { 800 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 801 } else { 802 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 803 } 804 } 805 void operator()(int) const { 806 this->operator()(); 807 } 808 }; 809 810 struct RandomInitializer { 811 utils::FastRandom<> operator()() { 812 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 813 } 814 }; 815 816 void HeavyMixTest() { 817 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 818 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 819 820 RandomInitializer init_random; 821 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 822 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 823 for ( int i = 0; i < 5; ++i ) { 824 HeavyMixTestBody b( random, isolated_level, 1 ); 825 b( 0 ); 826 } 827 } 828 829 //--------------------------------------------------// 830 #if TBB_USE_EXCEPTIONS 831 struct MyException {}; 832 struct IsolatedBodyThrowsException { 833 void operator()() const { 834 #if _MSC_VER && !__INTEL_COMPILER 835 // Workaround an unreachable code warning in task_arena_function. 
836 volatile bool workaround = true; 837 if (workaround) 838 #endif 839 { 840 throw MyException(); 841 } 842 } 843 }; 844 struct ExceptionTestBody : utils::NoAssign { 845 tbb::enumerable_thread_specific<int>& myEts; 846 std::atomic<bool>& myIsStolen; 847 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 848 : myEts( ets ), myIsStolen( is_stolen ) {} 849 void operator()( int i ) const { 850 try { 851 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 852 REQUIRE_MESSAGE( false, "The exception has been lost" ); 853 } 854 catch ( MyException ) {} 855 catch ( ... ) { 856 REQUIRE_MESSAGE( false, "Unexpected exception" ); 857 } 858 // Check that nested algorithms can steal outer-level tasks 859 int &e = myEts.local(); 860 if ( e++ > 0 ) myIsStolen = true; 861 // work imbalance increases chances for stealing 862 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 863 --e; 864 } 865 }; 866 867 #endif /* TBB_USE_EXCEPTIONS */ 868 void ExceptionTest() { 869 #if TBB_USE_EXCEPTIONS 870 tbb::enumerable_thread_specific<int> ets; 871 std::atomic<bool> is_stolen; 872 is_stolen = false; 873 for ( ;; ) { 874 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 875 if ( is_stolen ) break; 876 } 877 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 878 #endif /* TBB_USE_EXCEPTIONS */ 879 } 880 881 struct NonConstBody { 882 unsigned int state; 883 void operator()() { 884 state ^= ~0u; 885 } 886 }; 887 888 void TestNonConstBody() { 889 NonConstBody body; 890 body.state = 0x6c97d5ed; 891 tbb::this_task_arena::isolate(body); 892 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 893 } 894 895 // TODO: Consider tbb::task_group instead of explicit task API. 896 class TestEnqueueTask : public tbb::detail::d1::task { 897 using wait_context = tbb::detail::d1::wait_context; 898 899 tbb::enumerable_thread_specific<bool>& executed; 900 std::atomic<int>& completed; 901 902 public: 903 wait_context& waiter; 904 tbb::task_arena& arena; 905 static const int N = 100; 906 907 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 908 : executed(exe), completed(c), waiter(w), arena(a) {} 909 910 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 911 for (int i = 0; i < N; ++i) { 912 arena.enqueue([&]() { 913 executed.local() = true; 914 ++completed; 915 for (int j = 0; j < 100; j++) utils::yield(); 916 waiter.release(1); 917 }); 918 } 919 return nullptr; 920 } 921 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 922 }; 923 924 class TestEnqueueIsolateBody : utils::NoCopy { 925 tbb::enumerable_thread_specific<bool>& executed; 926 std::atomic<int>& completed; 927 tbb::task_arena& arena; 928 public: 929 static const int N = 100; 930 931 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 932 : executed(exe), completed(c), arena(a) {} 933 void operator()() { 934 tbb::task_group_context ctx; 935 tbb::detail::d1::wait_context waiter(N); 936 937 TestEnqueueTask root(executed, completed, waiter, arena); 938 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 939 } 940 }; 941 942 void TestEnqueue() { 943 tbb::enumerable_thread_specific<bool> executed(false); 944 std::atomic<int> completed; 945 tbb::task_arena arena = tbb::task_arena(tbb::task_arena::attach()); 946 947 // Check that the main thread can process enqueued tasks. 
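// TestEnqueueIsolateBody waits for all TestEnqueueTask::N enqueued tasks inside execute_and_wait,
// so the calling thread may pick up some of them while it waits.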
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: No enqueued task has been executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner steals an
    // outer-level task on a nested level. With only one thief, the thief executes the outer-level tasks first,
    // and the owner gets no chance to steal them.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent a test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with the workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] { tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    // Regression test for a bug with a missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>&
waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1040 : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1041 void operator()() const { 1042 ++my_processed; 1043 // Wait for all tasks 1044 if ( my_idx < my_num_tasks ) { 1045 tbb::detail::d1::wait(*my_waiters[my_idx], my_context); 1046 } 1047 // Signal waiting tasks 1048 if ( my_idx >= my_bunch_size ) { 1049 my_waiters[my_idx-my_bunch_size]->release(); 1050 } 1051 } 1052 private: 1053 int my_idx; 1054 int my_bunch_size; 1055 int my_num_tasks; 1056 std::vector<wait_context*>& my_waiters; 1057 std::atomic<int>& my_processed; 1058 tbb::task_group_context& my_context; 1059 }; 1060 1061 class TestMultipleWaitsThreadBody : utils::NoAssign { 1062 using wait_context = tbb::detail::d1::wait_context; 1063 public: 1064 TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1065 : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1066 void operator()( int idx ) const { 1067 my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) ); 1068 --my_processed; 1069 } 1070 private: 1071 int my_bunch_size; 1072 int my_num_tasks; 1073 tbb::task_arena& my_arena; 1074 std::vector<wait_context*>& my_waiters; 1075 std::atomic<int>& my_processed; 1076 tbb::task_group_context& my_context; 1077 }; 1078 1079 void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) { 1080 tbb::task_arena a( num_threads ); 1081 const int num_tasks = (num_bunches-1)*bunch_size; 1082 1083 tbb::task_group_context tgc; 1084 std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks); 1085 for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0); 1086 1087 std::atomic<int> processed(0); 1088 for ( int repeats = 0; repeats<10; ++repeats ) { 1089 int idx = 0; 1090 for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) { 1091 // Sync with the previous bunch of waiters to prevent "false" nested dependicies (when a nested task waits for an outer task). 1092 while ( processed < bunch*bunch_size ) utils::yield(); 1093 // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters. 1094 for ( int i = 0; i<bunch_size; ++i ) { 1095 waiters[idx]->reserve(); 1096 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1097 } 1098 } 1099 // No sync because the threads of the last bunch do not call wait_for_all. 1100 // Run the last bunch of threads. 1101 for ( int i = 0; i<bunch_size; ++i ) 1102 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1103 while ( processed ) utils::yield(); 1104 } 1105 for (auto w : waiters) delete w; 1106 } 1107 1108 void TestMultipleWaits() { 1109 // Limit the number of threads to prevent heavy oversubscription. 
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
        tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the warning flood the log (effectively forcing it to be treated as an error).
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // the move constructor is not allowed, just like the copy constructor inherited from the NoCopy parent
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() makes no copies or moves of its argument inside the implementation
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
    struct noDefaultTag {};
    class ReturnType : public StateTrackable<> {
        static const int SIZE = 42;
        std::vector<int> data;
    public:
        ReturnType(noDefaultTag) : StateTrackable<>(0) {}
        // Define the copy constructor to test that it is never called
        ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
        ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

        void fill() {
            for (int i = 0; i < SIZE; ++i)
                data.push_back(i);
        }
        void check() {
            REQUIRE(data.size() == unsigned(SIZE));
            for (int i = 0; i < SIZE; ++i)
                REQUIRE(data[i] == i);
            StateTrackableCounters::counters_type& cnts =
StateTrackableCounters::counters; 1210 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1211 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1212 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1213 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1214 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1215 // The number of copies/moves should not exceed 3: function return, store to an internal storage, 1216 // acquire internal storage. 1217 REQUIRE((copied == 0 && moved <=3)); 1218 } 1219 }; 1220 1221 template <typename R> 1222 R function() { 1223 noDefaultTag tag; 1224 R r(tag); 1225 r.fill(); 1226 return r; 1227 } 1228 1229 template <> 1230 void function<void>() {} 1231 1232 template <typename R> 1233 struct Functor { 1234 R operator()() const { 1235 return function<R>(); 1236 } 1237 }; 1238 1239 tbb::task_arena& arena() { 1240 static tbb::task_arena a; 1241 return a; 1242 } 1243 1244 template <typename F> 1245 void TestExecute(F &f) { 1246 StateTrackableCounters::reset(); 1247 ReturnType r = arena().execute(f); 1248 r.check(); 1249 } 1250 1251 template <typename F> 1252 void TestExecute(const F &f) { 1253 StateTrackableCounters::reset(); 1254 ReturnType r = arena().execute(f); 1255 r.check(); 1256 } 1257 template <typename F> 1258 void TestIsolate(F &f) { 1259 StateTrackableCounters::reset(); 1260 ReturnType r = tbb::this_task_arena::isolate(f); 1261 r.check(); 1262 } 1263 1264 template <typename F> 1265 void TestIsolate(const F &f) { 1266 StateTrackableCounters::reset(); 1267 ReturnType r = tbb::this_task_arena::isolate(f); 1268 r.check(); 1269 } 1270 1271 void Test() { 1272 TestExecute(Functor<ReturnType>()); 1273 Functor<ReturnType> f1; 1274 TestExecute(f1); 1275 TestExecute(function<ReturnType>); 1276 1277 arena().execute(Functor<void>()); 1278 Functor<void> f2; 1279 arena().execute(f2); 1280 arena().execute(function<void>); 1281 TestIsolate(Functor<ReturnType>()); 1282 TestIsolate(f1); 1283 TestIsolate(function<ReturnType>); 1284 tbb::this_task_arena::isolate(Functor<void>()); 1285 tbb::this_task_arena::isolate(f2); 1286 tbb::this_task_arena::isolate(function<void>); 1287 } 1288 } 1289 1290 void TestReturnValue() { 1291 TestReturnValueNS::Test(); 1292 } 1293 1294 //--------------------------------------------------// 1295 1296 // MyObserver checks if threads join to the same arena 1297 struct MyObserver: public tbb::task_scheduler_observer { 1298 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1299 tbb::task_arena& my_arena; 1300 std::atomic<int>& my_failure_counter; 1301 std::atomic<int>& my_counter; 1302 utils::SpinBarrier& m_barrier; 1303 1304 MyObserver(tbb::task_arena& a, 1305 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1306 std::atomic<int>& failure_counter, 1307 std::atomic<int>& counter, 1308 utils::SpinBarrier& barrier) 1309 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1310 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) { 1311 observe(true); 1312 } 1313 void on_scheduler_entry(bool worker) override { 1314 if (worker) { 1315 ++my_counter; 1316 tbb::task_arena*& cur_arena = my_tls.local(); 1317 if (cur_arena != 0 && cur_arena != &my_arena) { 1318 ++my_failure_counter; 1319 } 1320 cur_arena = &my_arena; 1321 m_barrier.wait(); 1322 } 1323 } 1324 void on_scheduler_exit(bool worker) override { 1325 if (worker) { 1326 m_barrier.wait(); // before wakeup 1327 m_barrier.wait(); // after wakeup 1328 } 1329 } 1330 }; 1331 1332 void 
TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
    if (n_threads == 0) {
        n_threads = tbb::this_task_arena::max_concurrency();
    }

    const int max_n_arenas = 8;
    int n_arenas = 2;
    if (n_threads > 16) {
        n_arenas = max_n_arenas;
    } else if (n_threads > 8) {
        n_arenas = 4;
    }

    int n_workers = n_threads - 1;
    n_workers = n_arenas * (n_workers / n_arenas);
    if (n_workers == 0) {
        return;
    }

    n_threads = n_workers + 1;
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);

    const int n_repetitions = 20;
    const int n_outer_repetitions = 100;
    std::multiset<float> failure_ratio; // for median calculation
    utils::SpinBarrier barrier(n_threads);
    utils::SpinBarrier worker_barrier(n_workers);
    MyObserver* observer[max_n_arenas];
    std::vector<tbb::task_arena> arenas(n_arenas);
    std::atomic<int> failure_counter;
    std::atomic<int> counter;
    tbb::enumerable_thread_specific<tbb::task_arena*> tls;

    for (int i = 0; i < n_arenas; ++i) {
        arenas[i].initialize(n_workers / n_arenas + 1); // +1 for the master thread
        observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
    }

    int ii = 0;
    for (; ii < n_outer_repetitions; ++ii) {
        failure_counter = 0;
        counter = 0;

        // Main code
        auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
        wakeup();
        for (int j = 0; j < n_repetitions; ++j) {
            barrier.wait(); // entry
            barrier.wait(); // exit1
            wakeup();
            barrier.wait(); // exit2
        }
        barrier.wait(); // entry
        barrier.wait(); // exit1
        barrier.wait(); // exit2

        failure_ratio.insert(float(failure_counter) / counter);
        tls.clear();
        // collect at least 3 elements in failure_ratio before calculating the median
        if (ii > 1) {
            std::multiset<float>::iterator it = failure_ratio.begin();
            std::advance(it, failure_ratio.size() / 2);
            if (*it < 0.02)
                break;
        }
    }
    for (int i = 0; i < n_arenas; ++i) {
        delete observer[i];
    }
    // check whether the median is too big
    std::multiset<float>::iterator it = failure_ratio.begin();
    std::advance(it, failure_ratio.size() / 2);
    // TODO: decrease the constants 0.05 and 0.3 by tuning the ratio between n_threads and n_arenas
    if (*it > 0.05) {
        REPORT("Warning: Too many cases when threads join different arenas.\n");
        REQUIRE_MESSAGE(*it <= 0.3, "Far too many cases when threads join different arenas.\n");
    }
}

void TestArenaWorkersMigration() {
    TestArenaWorkersMigrationWithNumThreads(4);
    if (tbb::this_task_arena::max_concurrency() != 4) {
        TestArenaWorkersMigrationWithNumThreads();
    }
}

//--------------------------------------------------//
void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset the TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (int trail = 0; trail < 10; ++trail) {
            tbb::parallel_for(0, threads, [threads, &barrier](int) {
                REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "the concurrency level is not equal to the specified number of threads");
                REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(),
                    "the number of created threads is greater than specified by default");
                local_id.local() = 1;
                // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                utils::Sleep(1);
                barrier.wait();
            }, tbb::simple_partitioner());
            REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default");
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Checks only a part of the possible numbers of reserved external slots:
    // 0 and 1 reserved slots are the important cases, but it is also useful
    // to collect some statistics for other values without spending
    // the whole test session checking every count.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    tbb::task_arena test_arena(thread_number / 2, thread_number / 2);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ?
myMaxConcurrency : 2)); 1498 if (is_worker) { 1499 CHECK(current_index >= myNumReservedSlots); 1500 } 1501 } 1502 void on_scheduler_exit( bool /*is_worker*/ ) override 1503 {} 1504 public: 1505 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1506 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1507 , myMaxConcurrency(maxConcurrency) 1508 , myNumReservedSlots(numReservedSlots) { 1509 observe(true); 1510 } 1511 1512 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1513 return lhs.my_idx < rhs.my_idx; 1514 } 1515 }; 1516 1517 std::atomic<int> simple_observer::idx_counter{}; 1518 1519 struct arena_handler { 1520 enum arena_status { 1521 alive, 1522 deleting, 1523 deleted 1524 }; 1525 1526 tbb::task_arena* arena; 1527 1528 std::atomic<arena_status> status{alive}; 1529 tbb::spin_rw_mutex arena_in_use{}; 1530 1531 tbb::concurrent_set<simple_observer> observers; 1532 1533 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1534 {} 1535 1536 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1537 return lhs.arena < rhs.arena; 1538 } 1539 }; 1540 1541 // TODO: Add observer operations 1542 void StressTestMixFunctionality() { 1543 enum operation_type { 1544 create_arena, 1545 delete_arena, 1546 attach_observer, 1547 detach_observer, 1548 arena_execute, 1549 enqueue_task, 1550 last_operation_marker 1551 }; 1552 1553 std::size_t operations_number = last_operation_marker; 1554 std::size_t thread_number = utils::get_platform_max_threads(); 1555 utils::FastRandom<> operation_rnd(42); 1556 tbb::spin_mutex random_operation_guard; 1557 1558 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1559 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1560 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1561 }; 1562 1563 utils::FastRandom<> arena_rnd(42); 1564 tbb::spin_mutex random_arena_guard; 1565 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1566 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1567 return arena_rnd.get(); 1568 }; 1569 1570 tbb::concurrent_set<arena_handler> arenas_pool; 1571 1572 std::vector<std::thread> thread_pool; 1573 1574 utils::SpinBarrier thread_barrier(thread_number); 1575 std::size_t max_operations = 20000; 1576 std::atomic<std::size_t> curr_operation{}; 1577 auto thread_func = [&] () { 1578 arenas_pool.emplace(new tbb::task_arena()); 1579 thread_barrier.wait(); 1580 while (curr_operation++ < max_operations) { 1581 switch (get_random_operation()) { 1582 case create_arena : 1583 { 1584 arenas_pool.emplace(new tbb::task_arena()); 1585 break; 1586 } 1587 case delete_arena : 1588 { 1589 auto curr_arena = arenas_pool.begin(); 1590 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1591 arena_handler::arena_status curr_status = arena_handler::alive; 1592 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1593 break; 1594 } 1595 } 1596 1597 if (curr_arena == arenas_pool.end()) break; 1598 1599 tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true); 1600 1601 delete curr_arena->arena; 1602 curr_arena->status.store(arena_handler::deleted); 1603 1604 break; 1605 } 1606 case attach_observer : 1607 { 1608 tbb::spin_rw_mutex::scoped_lock lock{}; 1609 auto curr_arena = arenas_pool.begin(); 1610 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1611 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1612 if 

// TODO: Add observer operations
void StressTestMixFunctionality() {
    enum operation_type {
        create_arena,
        delete_arena,
        attach_observer,
        detach_observer,
        arena_execute,
        enqueue_task,
        last_operation_marker
    };

    std::size_t operations_number = last_operation_marker;
    std::size_t thread_number = utils::get_platform_max_threads();
    utils::FastRandom<> operation_rnd(42);
    tbb::spin_mutex random_operation_guard;

    auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
        tbb::spin_mutex::scoped_lock lock(random_operation_guard);
        return static_cast<operation_type>(operation_rnd.get() % operations_number);
    };

    utils::FastRandom<> arena_rnd(42);
    tbb::spin_mutex random_arena_guard;
    auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
        tbb::spin_mutex::scoped_lock lock(random_arena_guard);
        return arena_rnd.get();
    };

    tbb::concurrent_set<arena_handler> arenas_pool;

    std::vector<std::thread> thread_pool;

    utils::SpinBarrier thread_barrier(thread_number);
    std::size_t max_operations = 20000;
    std::atomic<std::size_t> curr_operation{};
    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
            case create_arena :
            {
                arenas_pool.emplace(new tbb::task_arena());
                break;
            }
            case delete_arena :
            {
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    arena_handler::arena_status curr_status = arena_handler::alive;
                    if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                        break;
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                        if (curr_arena->status == arena_handler::alive) {
                            break;
                        } else {
                            lock.release();
                        }
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }

                break;
            }
            case detach_observer :
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                        if (curr_arena->status == arena_handler::alive) {
                            break;
                        } else {
                            lock.release();
                        }
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                curr_arena->arena->execute([] () {
                    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) {
                        std::atomic<int> sum{};
                        // Make some work
                        for (; sum < 10; ++sum) ;
                    });
                });

                break;
            }
            case enqueue_task :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                        if (curr_arena->status == arena_handler::alive) {
                            break;
                        } else {
                            lock.release();
                        }
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                curr_arena->arena->enqueue([] {
                    std::atomic<int> sum{};
                    // Make some work
                    for (; sum < 1000; ++sum) ;
                });

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};
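
// Illustrative sketch (not part of the test flow): task_arena::enqueue() is
// fire-and-forget - it submits a copy of the functor for asynchronous execution in the
// arena and returns without waiting for it. enqueue_test_helper above relies on this to
// keep re-submitting itself until the shared counter reaches its limit:
//
//     tbb::task_arena arena;
//     arena.enqueue([] { /* runs asynchronously inside the arena */ });
//     // the caller continues immediately; nothing here waits for the enqueued task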

//--------------------------------------------------//

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}
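
// Background for the test below (a sketch of the intent, not a formal statement):
// global_control::max_allowed_parallelism caps the total number of threads TBB may use,
// so raising the limit to 2 * num_threads is what lets an arena of that size actually be
// populated with more threads than the hardware concurrency, e.g.
//
//     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 2 * n);
//     tbb::task_arena arena(2 * n); // may now be joined by up to 2 * n threads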

//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(num_threads * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}