1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/concurrency_tracker.h" 21 #include "common/spin_barrier.h" 22 #include "common/utils.h" 23 #include "common/utils_report.h" 24 #include "common/utils_concurrency_limit.h" 25 26 #include "tbb/task_arena.h" 27 #include "tbb/task_scheduler_observer.h" 28 #include "tbb/enumerable_thread_specific.h" 29 #include "tbb/parallel_for.h" 30 #include "tbb/global_control.h" 31 #include "tbb/concurrent_set.h" 32 #include "tbb/spin_mutex.h" 33 #include "tbb/spin_rw_mutex.h" 34 #include "tbb/task_group.h" 35 36 #include <stdexcept> 37 #include <cstdlib> 38 #include <cstdio> 39 #include <vector> 40 #include <thread> 41 #include <atomic> 42 43 //#include "harness_fp.h" 44 45 //! \file test_task_arena.cpp 46 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 47 48 //--------------------------------------------------// 49 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 50 /* maxthread is treated as the biggest possible concurrency level. */ 51 void InitializeAndTerminate( int maxthread ) { 52 for( int i=0; i<200; ++i ) { 53 switch( i&3 ) { 54 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 55 // Explicit initialization can either keep the original values or change those. 56 // Arena termination can be explicit or implicit (in the destructor). 57 // TODO: extend with concurrency level checks if such a method is added. 
58 default: { 59 tbb::task_arena arena( std::rand() % maxthread + 1 ); 60 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 61 arena.initialize(); 62 CHECK(arena.is_active()); 63 arena.terminate(); 64 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 65 break; 66 } 67 case 0: { 68 tbb::task_arena arena( 1 ); 69 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 70 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 71 CHECK(arena.is_active()); 72 break; 73 } 74 case 1: { 75 tbb::task_arena arena( tbb::task_arena::automatic ); 76 CHECK(!arena.is_active()); 77 arena.initialize(); 78 CHECK(arena.is_active()); 79 break; 80 } 81 case 2: { 82 tbb::task_arena arena; 83 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 84 arena.initialize( std::rand() % maxthread + 1 ); 85 CHECK(arena.is_active()); 86 arena.terminate(); 87 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 88 break; 89 } 90 } 91 } 92 } 93 94 //--------------------------------------------------// 95 96 // Definitions used in more than one test 97 typedef tbb::blocked_range<int> Range; 98 99 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 100 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 101 102 void ResetTLS() { 103 local_id.clear(); 104 old_id.clear(); 105 slot_id.clear(); 106 } 107 108 class ArenaObserver : public tbb::task_scheduler_observer { 109 int myId; // unique observer/arena id within a test 110 int myMaxConcurrency; // concurrency of the associated arena 111 int myNumReservedSlots; // reserved slots in the associated arena 112 void on_scheduler_entry( bool is_worker ) override { 113 int current_index = tbb::this_task_arena::current_thread_index(); 114 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 115 if (is_worker) { 116 CHECK(current_index >= myNumReservedSlots); 117 } 118 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 119 old_id.local() = local_id.local(); 120 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 121 local_id.local() = myId; 122 slot_id.local() = current_index; 123 } 124 void on_scheduler_exit( bool /*is_worker*/ ) override { 125 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 126 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 127 slot_id.local() = -2; 128 local_id.local() = old_id.local(); 129 old_id.local() = 0; 130 } 131 public: 132 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 133 : tbb::task_scheduler_observer(a) 134 , myId(id) 135 , myMaxConcurrency(maxConcurrency) 136 , myNumReservedSlots(numReservedSlots) { 137 CHECK(myId); 138 observe(true); 139 } 140 ~ArenaObserver () { 141 observe(false); 142 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 143 } 144 }; 145 146 struct IndexTrackingBody { // Must be used together with ArenaObserver 147 void operator() ( const Range& ) const { 148 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 149 utils::doDummyWork(50000); 150 } 151 }; 152 153 struct AsynchronousWork { 154 utils::SpinBarrier &my_barrier; 155 bool my_is_blocking; 156 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 157 : my_barrier(a_barrier), my_is_blocking(blocking) {} 158 void operator()() const { 159 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 160 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 161 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 162 else my_barrier.signalNoWait(); 163 } 164 }; 165 166 //-----------------------------------------------------------------------------------------// 167 168 // Test that task_arenas might be created and used from multiple application threads. 169 // Also tests arena observers. The parameter p is the index of an app thread running this test. 
170 void TestConcurrentArenasFunc(int idx) { 171 // A regression test for observer activation order: 172 // check that arena observer can be activated before local observer 173 struct LocalObserver : public tbb::task_scheduler_observer { 174 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 175 LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) { 176 observe(true); 177 } 178 ~LocalObserver() { 179 observe(false); 180 } 181 }; 182 183 tbb::task_arena a1; 184 a1.initialize(1,0); 185 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 186 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 187 188 tbb::task_arena a2(2,1); 189 ArenaObserver o2(a2, 2, 1, idx*2+2); 190 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 191 192 LocalObserver lo1; 193 CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated"); 194 195 tbb::task_arena a3(1, 0); 196 LocalObserver lo2(a3); 197 CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated"); 198 199 utils::SpinBarrier barrier(2); 200 AsynchronousWork work(barrier); 201 202 a1.enqueue(work); // put async work 203 barrier.wait(); 204 205 a2.enqueue(work); // another work 206 a2.execute(work); 207 208 a3.execute([] { 209 utils::doDummyWork(100); 210 }); 211 212 a1.debug_wait_until_empty(); 213 a2.debug_wait_until_empty(); 214 } 215 216 void TestConcurrentArenas(int p) { 217 // TODO REVAMP fix with global control 218 ResetTLS(); 219 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 220 } 221 //--------------------------------------------------// 222 // Test multiple application threads working with a single arena at the same time. 223 class MultipleMastersPart1 : utils::NoAssign { 224 tbb::task_arena &my_a; 225 utils::SpinBarrier &my_b1, &my_b2; 226 public: 227 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 228 : my_a(a), my_b1(b1), my_b2(b2) {} 229 void operator()(int) const { 230 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 231 my_b1.wait(); 232 // A regression test for bugs 1954 & 1971 233 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 234 } 235 }; 236 237 class MultipleMastersPart2 : utils::NoAssign { 238 tbb::task_arena &my_a; 239 utils::SpinBarrier &my_b; 240 public: 241 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 242 void operator()(int) const { 243 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 244 } 245 }; 246 247 class MultipleMastersPart3 : utils::NoAssign { 248 tbb::task_arena &my_a; 249 utils::SpinBarrier &my_b; 250 using wait_context = tbb::detail::d1::wait_context; 251 252 struct Runner : NoAssign { 253 wait_context& myWait; 254 Runner(wait_context& w) : myWait(w) {} 255 void operator()() const { 256 utils::doDummyWork(10000); 257 myWait.release(); 258 } 259 }; 260 261 struct Waiter : NoAssign { 262 wait_context& myWait; 263 Waiter(wait_context& w) : myWait(w) {} 264 void operator()() const { 265 tbb::task_group_context ctx; 266 tbb::detail::d1::wait(myWait, ctx); 267 } 268 }; 269 270 public: 271 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 272 : my_a(a), my_b(b) {} 273 void operator()(int) const { 274 wait_context wait(0); 275 my_b.wait(); // increases chances for task_arena initialization contention 276 for( int i=0; i<100; ++i) { 277 wait.reserve(); 278 my_a.enqueue(Runner(wait)); 279 my_a.execute(Waiter(wait)); 280 } 281 
my_b.wait(); 282 } 283 }; 284 285 void TestMultipleMasters(int p) { 286 { 287 ResetTLS(); 288 tbb::task_arena a(1,0); 289 a.initialize(); 290 ArenaObserver o(a, 1, 0, 1); 291 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 292 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 293 barrier2.wait(); 294 a.debug_wait_until_empty(); 295 } { 296 ResetTLS(); 297 tbb::task_arena a(2,1); 298 ArenaObserver o(a, 2, 1, 2); 299 utils::SpinBarrier barrier(p+2); 300 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 301 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 302 utils::Sleep(10); 303 NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); 304 barrier.wait(); 305 a.debug_wait_until_empty(); 306 } { 307 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) 308 tbb::task_arena a(p,1); 309 utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs 310 // "Oversubscribe" the arena by 1 external thread 311 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); 312 a.debug_wait_until_empty(); 313 } 314 } 315 316 //--------------------------------------------------// 317 // TODO: explain what TestArenaEntryConsistency does 318 #include <sstream> 319 #include <stdexcept> 320 #include "oneapi/tbb/detail/_exception.h" 321 #include "common/fp_control.h" 322 323 struct TestArenaEntryBody : FPModeContext { 324 std::atomic<int> &my_stage; // each execute increases it 325 std::stringstream my_id; 326 bool is_caught, is_expected; 327 enum { arenaFPMode = 1 }; 328 329 TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance 330 : FPModeContext(idx+i) 331 , my_stage(s) 332 , is_caught(false) 333 #if TBB_USE_EXCEPTIONS 334 , is_expected( (idx&(1<<i)) != 0 ) 335 #else 336 , is_expected(false) 337 #endif 338 { 339 my_id << idx << ':' << i << '@'; 340 } 341 void operator()() { // inside task_arena::execute() 342 // synchronize with other stages 343 int stage = my_stage++; 344 int slot = tbb::this_task_arena::current_thread_index(); 345 CHECK(slot >= 0); 346 CHECK(slot <= 1); 347 // wait until the third stage is delegated and then starts on slot 0 348 while(my_stage < 2+slot) utils::yield(); 349 // deduct its entry type and put it into id, it helps to find source of a problem 350 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? 351 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master") 352 : stage == 3? 
"nested_same_ctx" : "nested_alien_ctx"); 353 AssertFPMode(arenaFPMode); 354 if (is_expected) { 355 TBB_TEST_THROW(std::logic_error(my_id.str())); 356 } 357 // no code can be put here since exceptions can be thrown 358 } 359 void on_exception(const char *e) { // outside arena, in catch block 360 is_caught = true; 361 CHECK(my_id.str() == e); 362 assertFPMode(); 363 } 364 void after_execute() { // outside arena and catch block 365 CHECK(is_caught == is_expected); 366 assertFPMode(); 367 } 368 }; 369 370 class ForEachArenaEntryBody : utils::NoAssign { 371 tbb::task_arena &my_a; // expected task_arena(2,1) 372 std::atomic<int> &my_stage; // each execute increases it 373 int my_idx; 374 375 public: 376 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 377 : my_a(a), my_stage(c), my_idx(0) {} 378 379 void test(int idx) { 380 my_idx = idx; 381 my_stage = 0; 382 NativeParallelFor(3, *this); // test cross-arena calls 383 CHECK(my_stage == 3); 384 my_a.execute(*this); // test nested calls 385 CHECK(my_stage == 5); 386 } 387 388 // task_arena functor for nested tests 389 void operator()() const { 390 test_arena_entry(3); // in current task group context 391 tbb::parallel_for(4, 5, *this); // in different context 392 } 393 394 // NativeParallelFor & parallel_for functor 395 void operator()(int i) const { 396 test_arena_entry(i); 397 } 398 399 private: 400 void test_arena_entry(int i) const { 401 GetRoundingMode(); 402 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 403 GetRoundingMode(); 404 #if TBB_USE_EXCEPTIONS 405 try { 406 my_a.execute(scoped_functor); 407 } catch(std::logic_error &e) { 408 scoped_functor.on_exception(e.what()); 409 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 410 #else 411 my_a.execute(scoped_functor); 412 #endif 413 scoped_functor.after_execute(); 414 } 415 }; 416 417 void TestArenaEntryConsistency() { 418 tbb::task_arena a(2, 1); 419 std::atomic<int> c; 420 ForEachArenaEntryBody body(a, c); 421 422 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 423 a.initialize(); // capture FP settings to arena 424 fp_scope.setNextFPMode(); 425 426 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 427 body.test(i); 428 } 429 //-------------------------------------------------- 430 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 431 class TestArenaConcurrencyBody : utils::NoAssign { 432 tbb::task_arena &my_a; 433 int my_max_concurrency; 434 int my_reserved_slots; 435 utils::SpinBarrier *my_barrier; 436 utils::SpinBarrier *my_worker_barrier; 437 public: 438 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 439 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 440 // NativeParallelFor's functor 441 void operator()( int ) const { 442 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 443 local_id.local() = 1; 444 my_a.execute( *this ); 445 } 446 // Arena's functor 447 void operator()() const { 448 int idx = tbb::this_task_arena::current_thread_index(); 449 CHECK( idx < (my_max_concurrency > 1 ? 
my_max_concurrency : 2) ); 450 CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() ); 451 int max_arena_concurrency = tbb::this_task_arena::max_concurrency(); 452 CHECK( max_arena_concurrency == my_max_concurrency ); 453 if ( my_worker_barrier ) { 454 if ( local_id.local() == 1 ) { 455 // External thread in a reserved slot 456 CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" ); 457 } else { 458 // Worker thread 459 CHECK( idx >= my_reserved_slots ); 460 my_worker_barrier->wait(); 461 } 462 } else if ( my_barrier ) 463 CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" ); 464 if ( my_barrier ) my_barrier->wait(); 465 else utils::Sleep( 1 ); 466 } 467 }; 468 469 void TestArenaConcurrency( int p, int reserved = 0, int step = 1) { 470 for (; reserved <= p; reserved += step) { 471 tbb::task_arena a( p, reserved ); 472 if (p - reserved < tbb::this_task_arena::max_concurrency()) { 473 // Check concurrency with worker & reserved external threads. 474 ResetTLS(); 475 utils::SpinBarrier b( p ); 476 utils::SpinBarrier wb( p-reserved ); 477 TestArenaConcurrencyBody test( a, p, reserved, &b, &wb ); 478 for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads 479 a.enqueue( test ); 480 if ( reserved==1 ) 481 test( 0 ); // calls execute() 482 else 483 utils::NativeParallelFor( reserved, test ); 484 a.debug_wait_until_empty(); 485 } { // Check if multiple external threads alone can achieve maximum concurrency. 486 ResetTLS(); 487 utils::SpinBarrier b( p ); 488 utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) ); 489 a.debug_wait_until_empty(); 490 } { // Check oversubscription by external threads. 491 #if !_WIN32 || !_WIN64 492 // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms 493 // that makes impossible to create more than ~500 threads. 
494 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 495 #endif 496 #if TBB_TEST_LOW_WORKLOAD 497 if ( p <= 16 ) 498 #endif 499 { 500 ResetTLS(); 501 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 502 a.debug_wait_until_empty(); 503 } 504 } 505 } 506 } 507 508 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 509 utils::SpinBarrier& m_barrier; 510 511 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 512 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 513 observe(true); 514 } 515 ~TestMandatoryConcurrencyObserver() { 516 observe(false); 517 } 518 void on_scheduler_exit(bool worker) override { 519 if (worker) { 520 m_barrier.wait(); 521 } 522 } 523 }; 524 525 void TestMandatoryConcurrency() { 526 tbb::task_arena a(1); 527 a.execute([&a] { 528 int n_threads = 4; 529 utils::SpinBarrier exit_barrier(2); 530 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 531 for (int j = 0; j < 5; ++j) { 532 utils::ExactConcurrencyLevel::check(1); 533 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 534 utils::SpinBarrier barrier(n_threads); 535 utils::NativeParallelFor(n_threads, [&](int) { 536 for (int i = 0; i < 5; ++i) { 537 barrier.wait(); 538 a.enqueue([&] { 539 CHECK(tbb::this_task_arena::max_concurrency() == 2); 540 CHECK(a.max_concurrency() == 2); 541 ++curr_tasks; 542 CHECK(curr_tasks == 1); 543 utils::doDummyWork(1000); 544 CHECK(curr_tasks == 1); 545 --curr_tasks; 546 ++num_tasks; 547 }); 548 barrier.wait(); 549 } 550 }); 551 do { 552 exit_barrier.wait(); 553 } while (num_tasks < n_threads * 5); 554 } 555 }); 556 } 557 558 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 559 TestMandatoryConcurrency(); 560 InitializeAndTerminate(max_thread_num); 561 for (int p = min_thread_num; p <= max_thread_num; ++p) { 562 TestConcurrentArenas(p); 563 TestMultipleMasters(p); 564 TestArenaConcurrency(p); 565 } 566 } 567 568 //--------------------------------------------------// 569 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 570 // This part of the test uses the knowledge of task_arena internals 571 572 struct TaskArenaValidator { 573 int my_slot_at_construction; 574 const tbb::task_arena& my_arena; 575 TaskArenaValidator( const tbb::task_arena& other ) 576 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 577 , my_arena(other) 578 {} 579 // Inspect the internal state 580 int concurrency() { return my_arena.debug_max_concurrency(); } 581 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 582 583 // This method should be called in task_arena::execute() for a captured arena 584 // by the same thread that created the validator. 
585 void operator()() { 586 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 587 "Current thread index has changed since the validator construction" ); 588 } 589 }; 590 591 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 592 int expect_concurrency, int expect_masters ) { 593 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 594 if( arena.is_active() ) { 595 TaskArenaValidator validator( arena ); 596 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 597 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 598 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 599 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 600 // for threads already in arena, check that the thread index remains the same 601 arena.execute( validator ); 602 } else { // not_initialized 603 // Test the deprecated method 604 CHECK(tbb::this_task_arena::current_thread_index() == -1); 605 } 606 607 // Ideally, there should be a check for having the same internal arena object, 608 // but that object is not easily accessible for implicit arenas. 609 } 610 } 611 612 struct TestAttachBody : utils::NoAssign { 613 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 614 const int maxthread; 615 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 616 617 // The functor body for NativeParallelFor 618 void operator()( int idx ) const { 619 my_idx = idx; 620 621 int default_threads = tbb::this_task_arena::max_concurrency(); 622 623 tbb::task_arena arena{tbb::task_arena::attach()}; 624 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 625 626 arena.terminate(); 627 ValidateAttachedArena( arena, false, -1, -1 ); 628 629 // attach to an auto-initialized arena 630 tbb::parallel_for(0, 1, [](int) {}); 631 632 tbb::task_arena arena2{tbb::task_arena::attach()}; 633 ValidateAttachedArena( arena2, true, default_threads, 1 ); 634 635 tbb::task_arena arena3; 636 arena3.initialize(tbb::attach()); 637 ValidateAttachedArena( arena3, true, default_threads, 1 ); 638 639 640 // attach to another task_arena 641 arena.initialize( maxthread, std::min(maxthread,idx) ); 642 arena.execute( *this ); 643 } 644 645 // The functor body for task_arena::execute above 646 void operator()() const { 647 tbb::task_arena arena2{tbb::task_arena::attach()}; 648 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 649 } 650 651 // The functor body for tbb::parallel_for 652 void operator()( const Range& r ) const { 653 for( int i = r.begin(); i<r.end(); ++i ) { 654 tbb::task_arena arena2{tbb::task_arena::attach()}; 655 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 656 } 657 } 658 }; 659 660 thread_local int TestAttachBody::my_idx; 661 662 void TestAttach( int maxthread ) { 663 // Externally concurrent, but no concurrency within a thread 664 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 665 // Concurrent within the current arena; may also serve as a stress test 666 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 667 } 668 669 //--------------------------------------------------// 670 671 // Test that task_arena::enqueue does not tolerate a non-const functor. 672 // TODO: can it be reworked as SFINAE-based compile-time check? 
673 struct TestFunctor { 674 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 675 void operator()() const { /* library requires this overload only */ } 676 }; 677 678 void TestConstantFunctorRequirement() { 679 tbb::task_arena a; 680 TestFunctor tf; 681 a.enqueue( tf ); 682 } 683 684 //--------------------------------------------------// 685 686 #include "tbb/parallel_reduce.h" 687 #include "tbb/parallel_invoke.h" 688 689 // Test this_task_arena::isolate 690 namespace TestIsolatedExecuteNS { 691 template <typename NestedPartitioner> 692 class NestedParFor : utils::NoAssign { 693 public: 694 NestedParFor() {} 695 void operator()() const { 696 NestedPartitioner p; 697 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 698 } 699 }; 700 701 template <typename NestedPartitioner> 702 class ParForBody : utils::NoAssign { 703 bool myOuterIsolation; 704 tbb::enumerable_thread_specific<int> &myEts; 705 std::atomic<bool> &myIsStolen; 706 public: 707 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 708 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 709 void operator()( int ) const { 710 int &e = myEts.local(); 711 if ( e++ > 0 ) myIsStolen = true; 712 if ( myOuterIsolation ) 713 NestedParFor<NestedPartitioner>()(); 714 else 715 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 716 --e; 717 } 718 }; 719 720 template <typename OuterPartitioner, typename NestedPartitioner> 721 class OuterParFor : utils::NoAssign { 722 bool myOuterIsolation; 723 std::atomic<bool> &myIsStolen; 724 public: 725 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 726 void operator()() const { 727 tbb::enumerable_thread_specific<int> ets( 0 ); 728 OuterPartitioner p; 729 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 730 } 731 }; 732 733 template <typename OuterPartitioner, typename NestedPartitioner> 734 void TwoLoopsTest( bool outer_isolation ) { 735 std::atomic<bool> is_stolen; 736 is_stolen = false; 737 const int max_repeats = 100; 738 if ( outer_isolation ) { 739 for ( int i = 0; i <= max_repeats; ++i ) { 740 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 741 if ( is_stolen ) break; 742 } 743 // TODO: was ASSERT_WARNING 744 if (!is_stolen) { 745 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 746 } 747 } else { 748 for ( int i = 0; i <= max_repeats; ++i ) { 749 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 750 } 751 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 752 } 753 } 754 755 void TwoLoopsTest( bool outer_isolation ) { 756 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 757 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 758 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 759 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 760 } 761 762 void TwoLoopsTest() { 763 TwoLoopsTest( true ); 764 TwoLoopsTest( false ); 765 } 766 //--------------------------------------------------// 767 class HeavyMixTestBody : utils::NoAssign { 768 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 769 
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 770 int myNestedLevel; 771 772 template <typename Partitioner, typename Body> 773 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 774 if ( rnd.get() % 2 ) { 775 if (ctx ) 776 tbb::parallel_for( 0, 2, body, p, *ctx ); 777 else 778 tbb::parallel_for( 0, 2, body, p ); 779 } else { 780 tbb::parallel_invoke( body, body ); 781 } 782 } 783 784 template <typename Partitioner> 785 class IsolatedBody : utils::NoAssign { 786 const HeavyMixTestBody &myHeavyMixTestBody; 787 Partitioner &myPartitioner; 788 public: 789 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 790 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 791 void operator()() const { 792 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 793 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 794 myHeavyMixTestBody.myNestedLevel + 1 ), 795 myPartitioner ); 796 } 797 }; 798 799 template <typename Partitioner> 800 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 801 Partitioner p; 802 switch ( rnd.get() % 2 ) { 803 case 0: { 804 // No features 805 tbb::task_group_context ctx; 806 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 807 break; 808 } 809 case 1: { 810 // Isolation 811 int previous_isolation = isolated_level; 812 isolated_level = myNestedLevel; 813 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 814 isolated_level = previous_isolation; 815 break; 816 } 817 } 818 } 819 public: 820 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 821 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 822 : myRandom( random ), myIsolatedLevel( isolated_level ) 823 , myNestedLevel( nested_level ) {} 824 void operator()() const { 825 int &isolated_level = myIsolatedLevel.local(); 826 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 827 if ( myNestedLevel == 20 ) 828 return; 829 utils::FastRandom<>& rnd = myRandom.local(); 830 if ( rnd.get() % 2 == 1 ) { 831 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 832 } else { 833 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 834 } 835 } 836 void operator()(int) const { 837 this->operator()(); 838 } 839 }; 840 841 struct RandomInitializer { 842 utils::FastRandom<> operator()() { 843 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 844 } 845 }; 846 847 void HeavyMixTest() { 848 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 849 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 850 851 RandomInitializer init_random; 852 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 853 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 854 for ( int i = 0; i < 5; ++i ) { 855 HeavyMixTestBody b( random, isolated_level, 1 ); 856 b( 0 ); 857 } 858 } 859 860 //--------------------------------------------------// 861 #if TBB_USE_EXCEPTIONS 862 struct MyException {}; 863 struct IsolatedBodyThrowsException { 864 void operator()() const { 865 #if _MSC_VER && !__INTEL_COMPILER 866 // Workaround an unreachable code warning in task_arena_function. 
867 volatile bool workaround = true; 868 if (workaround) 869 #endif 870 { 871 throw MyException(); 872 } 873 } 874 }; 875 struct ExceptionTestBody : utils::NoAssign { 876 tbb::enumerable_thread_specific<int>& myEts; 877 std::atomic<bool>& myIsStolen; 878 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 879 : myEts( ets ), myIsStolen( is_stolen ) {} 880 void operator()( int i ) const { 881 try { 882 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 883 REQUIRE_MESSAGE( false, "The exception has been lost" ); 884 } 885 catch ( MyException ) {} 886 catch ( ... ) { 887 REQUIRE_MESSAGE( false, "Unexpected exception" ); 888 } 889 // Check that nested algorithms can steal outer-level tasks 890 int &e = myEts.local(); 891 if ( e++ > 0 ) myIsStolen = true; 892 // work imbalance increases chances for stealing 893 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 894 --e; 895 } 896 }; 897 898 #endif /* TBB_USE_EXCEPTIONS */ 899 void ExceptionTest() { 900 #if TBB_USE_EXCEPTIONS 901 tbb::enumerable_thread_specific<int> ets; 902 std::atomic<bool> is_stolen; 903 is_stolen = false; 904 for ( ;; ) { 905 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 906 if ( is_stolen ) break; 907 } 908 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 909 #endif /* TBB_USE_EXCEPTIONS */ 910 } 911 912 struct NonConstBody { 913 unsigned int state; 914 void operator()() { 915 state ^= ~0u; 916 } 917 }; 918 919 void TestNonConstBody() { 920 NonConstBody body; 921 body.state = 0x6c97d5ed; 922 tbb::this_task_arena::isolate(body); 923 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 924 } 925 926 // TODO: Consider tbb::task_group instead of explicit task API. 927 class TestEnqueueTask : public tbb::detail::d1::task { 928 using wait_context = tbb::detail::d1::wait_context; 929 930 tbb::enumerable_thread_specific<bool>& executed; 931 std::atomic<int>& completed; 932 933 public: 934 wait_context& waiter; 935 tbb::task_arena& arena; 936 static const int N = 100; 937 938 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 939 : executed(exe), completed(c), waiter(w), arena(a) {} 940 941 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 942 for (int i = 0; i < N; ++i) { 943 arena.enqueue([&]() { 944 executed.local() = true; 945 ++completed; 946 for (int j = 0; j < 100; j++) utils::yield(); 947 waiter.release(1); 948 }); 949 } 950 return nullptr; 951 } 952 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 953 }; 954 955 class TestEnqueueIsolateBody : utils::NoCopy { 956 tbb::enumerable_thread_specific<bool>& executed; 957 std::atomic<int>& completed; 958 tbb::task_arena& arena; 959 public: 960 static const int N = 100; 961 962 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 963 : executed(exe), completed(c), arena(a) {} 964 void operator()() { 965 tbb::task_group_context ctx; 966 tbb::detail::d1::wait_context waiter(N); 967 968 TestEnqueueTask root(executed, completed, waiter, arena); 969 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 970 } 971 }; 972 973 void TestEnqueue() { 974 tbb::enumerable_thread_specific<bool> executed(false); 975 std::atomic<int> completed; 976 tbb::task_arena arena{tbb::task_arena::attach()}; 977 978 // Check that the main thread can process enqueued tasks. 
979 completed = 0; 980 TestEnqueueIsolateBody b1(executed, completed, arena); 981 b1(); 982 983 if (!executed.local()) { 984 REPORT("Warning: No one enqueued task has executed by the main thread.\n"); 985 } 986 987 executed.local() = false; 988 completed = 0; 989 const int N = 100; 990 // Create enqueued tasks out of isolation. 991 992 tbb::task_group_context ctx; 993 tbb::detail::d1::wait_context waiter(N); 994 for (int i = 0; i < N; ++i) { 995 arena.enqueue([&]() { 996 executed.local() = true; 997 ++completed; 998 utils::yield(); 999 waiter.release(1); 1000 }); 1001 } 1002 TestEnqueueIsolateBody b2(executed, completed, arena); 1003 tbb::this_task_arena::isolate(b2); 1004 REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate."); 1005 1006 tbb::detail::d1::wait(waiter, ctx); 1007 // while (completed < TestEnqueueTask::N + N) utils::yield(); 1008 } 1009 } 1010 1011 void TestIsolatedExecute() { 1012 // At least 3 threads (owner + 2 thieves) are required to reproduce a situation when the owner steals outer 1013 // level task on a nested level. If we have only one thief then it will execute outer level tasks first and 1014 // the owner will not have a possibility to steal outer level tasks. 1015 int platform_max_thread = tbb::this_task_arena::max_concurrency(); 1016 int num_threads = utils::min( platform_max_thread, 3 ); 1017 { 1018 // Too many threads require too many work to reproduce the stealing from outer level. 1019 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7)); 1020 TestIsolatedExecuteNS::TwoLoopsTest(); 1021 TestIsolatedExecuteNS::HeavyMixTest(); 1022 TestIsolatedExecuteNS::ExceptionTest(); 1023 } 1024 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 1025 TestIsolatedExecuteNS::HeavyMixTest(); 1026 TestIsolatedExecuteNS::TestNonConstBody(); 1027 TestIsolatedExecuteNS::TestEnqueue(); 1028 } 1029 1030 //-----------------------------------------------------------------------------------------// 1031 1032 class TestDelegatedSpawnWaitBody : utils::NoAssign { 1033 tbb::task_arena &my_a; 1034 utils::SpinBarrier &my_b1, &my_b2; 1035 public: 1036 TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 1037 : my_a(a), my_b1(b1), my_b2(b2) {} 1038 // NativeParallelFor's functor 1039 void operator()(int idx) const { 1040 if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang) 1041 for (int i = 0; i < 2; ++i) { 1042 my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers 1043 } 1044 tbb::task_group tg; 1045 my_b1.wait(); // sync with the workers 1046 for( int i=0; i<100000; ++i) { 1047 my_a.execute([&tg] { tg.run([] {}); }); 1048 } 1049 my_a.execute([&tg] {tg.wait(); }); 1050 } 1051 1052 my_b2.wait(); // sync both threads 1053 } 1054 }; 1055 1056 void TestDelegatedSpawnWait() { 1057 if (tbb::this_task_arena::max_concurrency() < 3) { 1058 // The test requires at least 2 worker threads 1059 return; 1060 } 1061 // Regression test for a bug with missed wakeup notification from a delegated task 1062 tbb::task_arena a(2,0); 1063 a.initialize(); 1064 utils::SpinBarrier barrier1(3), barrier2(2); 1065 utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) ); 1066 a.debug_wait_until_empty(); 1067 } 1068 1069 //-----------------------------------------------------------------------------------------// 1070 1071 class TestMultipleWaitsArenaWait : utils::NoAssign { 1072 using 
wait_context = tbb::detail::d1::wait_context; 1073 public: 1074 TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1075 : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1076 void operator()() const { 1077 ++my_processed; 1078 // Wait for all tasks 1079 if ( my_idx < my_num_tasks ) { 1080 tbb::detail::d1::wait(*my_waiters[my_idx], my_context); 1081 } 1082 // Signal waiting tasks 1083 if ( my_idx >= my_bunch_size ) { 1084 my_waiters[my_idx-my_bunch_size]->release(); 1085 } 1086 } 1087 private: 1088 int my_idx; 1089 int my_bunch_size; 1090 int my_num_tasks; 1091 std::vector<wait_context*>& my_waiters; 1092 std::atomic<int>& my_processed; 1093 tbb::task_group_context& my_context; 1094 }; 1095 1096 class TestMultipleWaitsThreadBody : utils::NoAssign { 1097 using wait_context = tbb::detail::d1::wait_context; 1098 public: 1099 TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1100 : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1101 void operator()( int idx ) const { 1102 my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) ); 1103 --my_processed; 1104 } 1105 private: 1106 int my_bunch_size; 1107 int my_num_tasks; 1108 tbb::task_arena& my_arena; 1109 std::vector<wait_context*>& my_waiters; 1110 std::atomic<int>& my_processed; 1111 tbb::task_group_context& my_context; 1112 }; 1113 1114 void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) { 1115 tbb::task_arena a( num_threads ); 1116 const int num_tasks = (num_bunches-1)*bunch_size; 1117 1118 tbb::task_group_context tgc; 1119 std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks); 1120 for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0); 1121 1122 std::atomic<int> processed(0); 1123 for ( int repeats = 0; repeats<10; ++repeats ) { 1124 int idx = 0; 1125 for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) { 1126 // Sync with the previous bunch of waiters to prevent "false" nested dependicies (when a nested task waits for an outer task). 1127 while ( processed < bunch*bunch_size ) utils::yield(); 1128 // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters. 1129 for ( int i = 0; i<bunch_size; ++i ) { 1130 waiters[idx]->reserve(); 1131 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1132 } 1133 } 1134 // No sync because the threads of the last bunch do not call wait_for_all. 1135 // Run the last bunch of threads. 1136 for ( int i = 0; i<bunch_size; ++i ) 1137 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1138 while ( processed ) utils::yield(); 1139 } 1140 for (auto w : waiters) delete w; 1141 } 1142 1143 void TestMultipleWaits() { 1144 // Limit the number of threads to prevent heavy oversubscription. 
1145 #if TBB_TEST_LOW_WORKLOAD 1146 const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() ); 1147 #else 1148 const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() ); 1149 #endif 1150 1151 utils::FastRandom<> rnd(1234); 1152 for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) { 1153 for ( int i = 0; i<3; ++i ) { 1154 const int num_bunches = 3 + rnd.get()%3; 1155 const int bunch_size = max_threads + rnd.get()%max_threads; 1156 TestMultipleWaits( threads, num_bunches, bunch_size ); 1157 } 1158 } 1159 } 1160 1161 //--------------------------------------------------// 1162 1163 void TestSmallStackSize() { 1164 tbb::global_control gc(tbb::global_control::thread_stack_size, 1165 tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 ); 1166 // The test produces the warning (not a error) if fails. So the test is run many times 1167 // to make the log annoying (to force to consider it as an error). 1168 for (int i = 0; i < 100; ++i) { 1169 tbb::task_arena a; 1170 a.initialize(); 1171 } 1172 } 1173 1174 //--------------------------------------------------// 1175 1176 namespace TestMoveSemanticsNS { 1177 struct TestFunctor { 1178 void operator()() const {}; 1179 }; 1180 1181 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 1182 MoveOnlyFunctor() : utils::MoveOnly() {}; 1183 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 1184 }; 1185 1186 struct MovePreferableFunctor : utils::Movable, TestFunctor { 1187 MovePreferableFunctor() : utils::Movable() {}; 1188 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {}; 1189 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 1190 }; 1191 1192 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 1193 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 1194 // mv ctor is not allowed as cp ctor from parent NoCopy 1195 private: 1196 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 1197 }; 1198 1199 void TestFunctors() { 1200 tbb::task_arena ta; 1201 MovePreferableFunctor mpf; 1202 // execute() doesn't have any copies or moves of arguments inside the impl 1203 ta.execute( NoMoveNoCopyFunctor() ); 1204 1205 ta.enqueue( MoveOnlyFunctor() ); 1206 ta.enqueue( mpf ); 1207 REQUIRE_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 1208 mpf.Reset(); 1209 ta.enqueue( std::move(mpf) ); 1210 REQUIRE_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 1211 mpf.Reset(); 1212 } 1213 } 1214 1215 void TestMoveSemantics() { 1216 TestMoveSemanticsNS::TestFunctors(); 1217 } 1218 1219 //--------------------------------------------------// 1220 1221 #include <vector> 1222 1223 #include "common/state_trackable.h" 1224 1225 namespace TestReturnValueNS { 1226 struct noDefaultTag {}; 1227 class ReturnType : public StateTrackable<> { 1228 static const int SIZE = 42; 1229 std::vector<int> data; 1230 public: 1231 ReturnType(noDefaultTag) : StateTrackable<>(0) {} 1232 // Define copy constructor to test that it is never called 1233 ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {} 1234 ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {} 1235 1236 void fill() { 1237 for (int i = 0; i < SIZE; ++i) 1238 data.push_back(i); 1239 } 1240 void check() { 1241 REQUIRE(data.size() == unsigned(SIZE)); 1242 for (int i = 0; i < SIZE; ++i) 1243 REQUIRE(data[i] == i); 1244 StateTrackableCounters::counters_type& cnts = 
StateTrackableCounters::counters; 1245 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1246 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1247 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1248 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1249 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1250 // The number of copies/moves should not exceed 3 if copy elision takes a place: 1251 // function return, store to an internal storage, acquire internal storage. 1252 // For compilation, without copy elision, this number may be grown up to 7. 1253 REQUIRE((copied == 0 && moved <= 7)); 1254 WARN_MESSAGE(moved <= 3, 1255 "Warning: The number of copies/moves should not exceed 3 if copy elision takes a place." 1256 "Take an attention to this warning only if copy elision is enabled." 1257 ); 1258 } 1259 }; 1260 1261 template <typename R> 1262 R function() { 1263 noDefaultTag tag; 1264 R r(tag); 1265 r.fill(); 1266 return r; 1267 } 1268 1269 template <> 1270 void function<void>() {} 1271 1272 template <typename R> 1273 struct Functor { 1274 R operator()() const { 1275 return function<R>(); 1276 } 1277 }; 1278 1279 tbb::task_arena& arena() { 1280 static tbb::task_arena a; 1281 return a; 1282 } 1283 1284 template <typename F> 1285 void TestExecute(F &f) { 1286 StateTrackableCounters::reset(); 1287 ReturnType r{arena().execute(f)}; 1288 r.check(); 1289 } 1290 1291 template <typename F> 1292 void TestExecute(const F &f) { 1293 StateTrackableCounters::reset(); 1294 ReturnType r{arena().execute(f)}; 1295 r.check(); 1296 } 1297 template <typename F> 1298 void TestIsolate(F &f) { 1299 StateTrackableCounters::reset(); 1300 ReturnType r{tbb::this_task_arena::isolate(f)}; 1301 r.check(); 1302 } 1303 1304 template <typename F> 1305 void TestIsolate(const F &f) { 1306 StateTrackableCounters::reset(); 1307 ReturnType r{tbb::this_task_arena::isolate(f)}; 1308 r.check(); 1309 } 1310 1311 void Test() { 1312 TestExecute(Functor<ReturnType>()); 1313 Functor<ReturnType> f1; 1314 TestExecute(f1); 1315 TestExecute(function<ReturnType>); 1316 1317 arena().execute(Functor<void>()); 1318 Functor<void> f2; 1319 arena().execute(f2); 1320 arena().execute(function<void>); 1321 TestIsolate(Functor<ReturnType>()); 1322 TestIsolate(f1); 1323 TestIsolate(function<ReturnType>); 1324 tbb::this_task_arena::isolate(Functor<void>()); 1325 tbb::this_task_arena::isolate(f2); 1326 tbb::this_task_arena::isolate(function<void>); 1327 } 1328 } 1329 1330 void TestReturnValue() { 1331 TestReturnValueNS::Test(); 1332 } 1333 1334 //--------------------------------------------------// 1335 1336 // MyObserver checks if threads join to the same arena 1337 struct MyObserver: public tbb::task_scheduler_observer { 1338 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1339 tbb::task_arena& my_arena; 1340 std::atomic<int>& my_failure_counter; 1341 std::atomic<int>& my_counter; 1342 utils::SpinBarrier& m_barrier; 1343 1344 MyObserver(tbb::task_arena& a, 1345 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1346 std::atomic<int>& failure_counter, 1347 std::atomic<int>& counter, 1348 utils::SpinBarrier& barrier) 1349 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1350 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) { 1351 observe(true); 1352 } 1353 ~MyObserver(){ 1354 observe(false); 1355 } 1356 void on_scheduler_entry(bool worker) override { 1357 if (worker) { 1358 ++my_counter; 1359 tbb::task_arena*& cur_arena = 
my_tls.local(); 1360 if (cur_arena != 0 && cur_arena != &my_arena) { 1361 ++my_failure_counter; 1362 } 1363 cur_arena = &my_arena; 1364 m_barrier.wait(); 1365 } 1366 } 1367 void on_scheduler_exit(bool worker) override { 1368 if (worker) { 1369 m_barrier.wait(); // before wakeup 1370 m_barrier.wait(); // after wakeup 1371 } 1372 } 1373 }; 1374 1375 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { 1376 if (n_threads == 0) { 1377 n_threads = tbb::this_task_arena::max_concurrency(); 1378 } 1379 1380 const int max_n_arenas = 8; 1381 int n_arenas = 2; 1382 if(n_threads > 16) { 1383 n_arenas = max_n_arenas; 1384 } else if (n_threads > 8) { 1385 n_arenas = 4; 1386 } 1387 1388 int n_workers = n_threads - 1; 1389 n_workers = n_arenas * (n_workers / n_arenas); 1390 if (n_workers == 0) { 1391 return; 1392 } 1393 1394 n_threads = n_workers + 1; 1395 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads); 1396 1397 const int n_repetitions = 20; 1398 const int n_outer_repetitions = 100; 1399 std::multiset<float> failure_ratio; // for median calculating 1400 utils::SpinBarrier barrier(n_threads); 1401 utils::SpinBarrier worker_barrier(n_workers); 1402 MyObserver* observer[max_n_arenas]; 1403 std::vector<tbb::task_arena> arenas(n_arenas); 1404 std::atomic<int> failure_counter; 1405 std::atomic<int> counter; 1406 tbb::enumerable_thread_specific<tbb::task_arena*> tls; 1407 1408 for (int i = 0; i < n_arenas; ++i) { 1409 arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master 1410 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier); 1411 } 1412 1413 int ii = 0; 1414 for (; ii < n_outer_repetitions; ++ii) { 1415 failure_counter = 0; 1416 counter = 0; 1417 1418 // Main code 1419 auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); }; 1420 wakeup(); 1421 for (int j = 0; j < n_repetitions; ++j) { 1422 barrier.wait(); // entry 1423 barrier.wait(); // exit1 1424 wakeup(); 1425 barrier.wait(); // exit2 1426 } 1427 barrier.wait(); // entry 1428 barrier.wait(); // exit1 1429 barrier.wait(); // exit2 1430 1431 failure_ratio.insert(float(failure_counter) / counter); 1432 tls.clear(); 1433 // collect 3 elements in failure_ratio before calculating median 1434 if (ii > 1) { 1435 std::multiset<float>::iterator it = failure_ratio.begin(); 1436 std::advance(it, failure_ratio.size() / 2); 1437 if (*it < 0.02) 1438 break; 1439 } 1440 } 1441 for (int i = 0; i < n_arenas; ++i) { 1442 delete observer[i]; 1443 } 1444 // check if median is so big 1445 std::multiset<float>::iterator it = failure_ratio.begin(); 1446 std::advance(it, failure_ratio.size() / 2); 1447 // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas 1448 if (*it > 0.05) { 1449 REPORT("Warning: So many cases when threads join to different arenas.\n"); 1450 REQUIRE_MESSAGE(*it <= 0.3, "A lot of cases when threads join to different arenas.\n"); 1451 } 1452 } 1453 1454 void TestArenaWorkersMigration() { 1455 TestArenaWorkersMigrationWithNumThreads(4); 1456 if (tbb::this_task_arena::max_concurrency() != 4) { 1457 TestArenaWorkersMigrationWithNumThreads(); 1458 } 1459 } 1460 1461 //--------------------------------------------------// 1462 void TestDefaultCreatedWorkersAmount() { 1463 int threads = tbb::this_task_arena::max_concurrency(); 1464 utils::NativeParallelFor(1, [threads](int idx) { 1465 REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS"); 1466 utils::SpinBarrier barrier(threads); 1467 ResetTLS(); 1468 for (auto 
blocked : { false, true }) { 1469 for (int trail = 0; trail < (blocked ? 10 : 10000); ++trail) { 1470 tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) { 1471 CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal specified threadnum"); 1472 CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "amount of created threads is more than specified by default"); 1473 local_id.local() = 1; 1474 if (blocked) { 1475 // If there is more threads than expected, 'sleep' gives a chance to join unexpected threads. 1476 utils::Sleep(1); 1477 barrier.wait(); 1478 } 1479 }, tbb::simple_partitioner()); 1480 REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "amount of created threads is not equal to default num"); 1481 if (blocked) { 1482 REQUIRE_MESSAGE(local_id.size() == size_t(threads), "amount of created threads is not equal to default num"); 1483 } 1484 } 1485 } 1486 }); 1487 } 1488 1489 void TestAbilityToCreateWorkers(int thread_num) { 1490 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num); 1491 // Checks only some part of reserved-external threads amount: 1492 // 0 and 1 reserved threads are important cases but it is also needed 1493 // to collect some statistic data with other amount and to not consume 1494 // whole test sesion time checking each amount 1495 TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72)); 1496 TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14)); 1497 } 1498 1499 void TestDefaultWorkersLimit() { 1500 TestDefaultCreatedWorkersAmount(); 1501 #if TBB_TEST_LOW_WORKLOAD 1502 TestAbilityToCreateWorkers(24); 1503 #else 1504 TestAbilityToCreateWorkers(256); 1505 #endif 1506 } 1507 1508 #if TBB_USE_EXCEPTIONS 1509 1510 void ExceptionInExecute() { 1511 std::size_t thread_number = utils::get_platform_max_threads(); 1512 int arena_concurrency = static_cast<int>(thread_number) / 2; 1513 tbb::task_arena test_arena(arena_concurrency, arena_concurrency); 1514 1515 std::atomic<int> canceled_task{}; 1516 1517 auto parallel_func = [&test_arena, &canceled_task] (std::size_t) { 1518 for (std::size_t i = 0; i < 1000; ++i) { 1519 try { 1520 test_arena.execute([] { 1521 volatile bool suppress_unreachable_code_warning = true; 1522 if (suppress_unreachable_code_warning) { 1523 throw -1; 1524 } 1525 }); 1526 FAIL("An exception should have thrown."); 1527 } catch (int) { 1528 ++canceled_task; 1529 } catch (...) { 1530 FAIL("Wrong type of exception."); 1531 } 1532 } 1533 }; 1534 1535 utils::NativeParallelFor(thread_number, parallel_func); 1536 CHECK(canceled_task == thread_number * 1000); 1537 } 1538 1539 #endif // TBB_USE_EXCEPTIONS 1540 1541 class simple_observer : public tbb::task_scheduler_observer { 1542 static std::atomic<int> idx_counter; 1543 int my_idx; 1544 int myMaxConcurrency; // concurrency of the associated arena 1545 int myNumReservedSlots; // reserved slots in the associated arena 1546 void on_scheduler_entry( bool is_worker ) override { 1547 int current_index = tbb::this_task_arena::current_thread_index(); 1548 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 1549 if (is_worker) { 1550 CHECK(current_index >= myNumReservedSlots); 1551 } 1552 } 1553 void on_scheduler_exit( bool /*is_worker*/ ) override 1554 {} 1555 public: 1556 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1557 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1558 , myMaxConcurrency(maxConcurrency) 1559 , myNumReservedSlots(numReservedSlots) { 1560 observe(true); 1561 } 1562 1563 ~simple_observer(){ 1564 observe(false); 1565 } 1566 1567 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1568 return lhs.my_idx < rhs.my_idx; 1569 } 1570 }; 1571 1572 std::atomic<int> simple_observer::idx_counter{}; 1573 1574 struct arena_handler { 1575 enum arena_status { 1576 alive, 1577 deleting, 1578 deleted 1579 }; 1580 1581 tbb::task_arena* arena; 1582 1583 std::atomic<arena_status> status{alive}; 1584 tbb::spin_rw_mutex arena_in_use{}; 1585 1586 tbb::concurrent_set<simple_observer> observers; 1587 1588 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1589 {} 1590 1591 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1592 return lhs.arena < rhs.arena; 1593 } 1594 }; 1595 1596 // TODO: Add observer operations 1597 void StressTestMixFunctionality() { 1598 enum operation_type { 1599 create_arena, 1600 delete_arena, 1601 attach_observer, 1602 detach_observer, 1603 arena_execute, 1604 enqueue_task, 1605 last_operation_marker 1606 }; 1607 1608 std::size_t operations_number = last_operation_marker; 1609 std::size_t thread_number = utils::get_platform_max_threads(); 1610 utils::FastRandom<> operation_rnd(42); 1611 tbb::spin_mutex random_operation_guard; 1612 1613 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1614 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1615 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1616 }; 1617 1618 utils::FastRandom<> arena_rnd(42); 1619 tbb::spin_mutex random_arena_guard; 1620 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1621 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1622 return arena_rnd.get(); 1623 }; 1624 1625 tbb::concurrent_set<arena_handler> arenas_pool; 1626 1627 std::vector<std::thread> thread_pool; 1628 1629 utils::SpinBarrier thread_barrier(thread_number); 1630 std::size_t max_operations = 20000; 1631 std::atomic<std::size_t> curr_operation{}; 1632 1633 auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) { 1634 for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) { 1635 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1636 if (curr_arena->status == arena_handler::alive) { 1637 return curr_arena; 1638 } 1639 else { 1640 lock.release(); 1641 } 1642 } 1643 } 1644 return arenas_pool.end(); 1645 }; 1646 1647 auto thread_func = [&] () { 1648 arenas_pool.emplace(new tbb::task_arena()); 1649 thread_barrier.wait(); 1650 while (curr_operation++ < max_operations) { 1651 switch (get_random_operation()) { 1652 case create_arena : 1653 { 1654 arenas_pool.emplace(new tbb::task_arena()); 1655 break; 1656 } 1657 case delete_arena : 1658 { 1659 auto curr_arena = arenas_pool.begin(); 1660 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1661 arena_handler::arena_status curr_status = arena_handler::alive; 1662 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1663 break; 1664 } 1665 } 1666 
                if (curr_arena == arenas_pool.end()) break;

                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};

                auto curr_arena = find_arena(lock);
                if (curr_arena != arenas_pool.end()) {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }
                break;
            }
            case detach_observer:
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->execute([]() {
                        tbb::affinity_partitioner aff;
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
                    });
                }

                break;
            }
            case enqueue_task:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
                }

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};
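
// enqueue_test_helper keeps an arena saturated with fire-and-forget work: each
// invocation checks that the executing thread has already been marked in the ETS,
// re-enqueues a copy of itself until the shared counter reaches 100000, and then
// yields. It is used by the "Workers oversubscription" test case below.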

//--------------------------------------------------//

// This test requires TBB in an uninitialized state
//! \brief \ref requirement
TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
    REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was already initialized");
    tbb::enumerable_thread_specific<int> ets;

    tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
    arena.execute([&ets] {
        tbb::parallel_for(0, 10000000, [&ets](int){
            ets.local() = 1;
            utils::doDummyWork(100);
        });
    });

    CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
}

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB worker creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}
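
// Both exception tests in this block rely on the same guarantee: an exception
// thrown by the functor passed to tbb::task_arena::execute() is propagated to the
// calling thread. A minimal sketch of that behavior (illustrative only, not part
// of the test suite):
//
//     tbb::task_arena a;
//     try {
//         a.execute([] { throw std::runtime_error("boom"); });
//     } catch (const std::runtime_error&) {
//         // the exception crosses the execute() boundary
//     }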
//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("The destructor must not be called; construction always throws."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS
//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}
#if TBB_USE_EXCEPTIONS
//! Test for the error when scheduling an empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
          * doctest::should_fail() // Test needs to be revised as the implementation uses assertions instead of exceptions
          * doctest::skip()        // skip the test for now to not pollute the test log
){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}
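
// The next case combines both contexts checked above: execute() is invoked from
// inside a task (task_group::run_and_wait), yet the functor it runs still reports
// being outside of a task, because execute() processes the functor directly rather
// than wrapping it into a task.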

//! The test for is_inside_task in arena::execute when inside other task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute when inside other task") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS