1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/concurrency_tracker.h" 21 #include "common/spin_barrier.h" 22 #include "common/utils.h" 23 #include "common/utils_report.h" 24 #include "common/utils_concurrency_limit.h" 25 26 #include "tbb/task_arena.h" 27 #include "tbb/task_scheduler_observer.h" 28 #include "tbb/enumerable_thread_specific.h" 29 #include "tbb/parallel_for.h" 30 #include "tbb/global_control.h" 31 #include "tbb/concurrent_set.h" 32 #include "tbb/spin_mutex.h" 33 #include "tbb/spin_rw_mutex.h" 34 #include "tbb/task_group.h" 35 36 #include <stdexcept> 37 #include <cstdlib> 38 #include <cstdio> 39 #include <vector> 40 #include <thread> 41 #include <atomic> 42 43 //#include "harness_fp.h" 44 45 //! \file test_task_arena.cpp 46 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 47 48 //--------------------------------------------------// 49 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 50 /* maxthread is treated as the biggest possible concurrency level. */ 51 void InitializeAndTerminate( int maxthread ) { 52 for( int i=0; i<200; ++i ) { 53 switch( i&3 ) { 54 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 55 // Explicit initialization can either keep the original values or change those. 56 // Arena termination can be explicit or implicit (in the destructor). 57 // TODO: extend with concurrency level checks if such a method is added. 
58 default: { 59 tbb::task_arena arena( std::rand() % maxthread + 1 ); 60 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 61 arena.initialize(); 62 CHECK(arena.is_active()); 63 arena.terminate(); 64 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 65 break; 66 } 67 case 0: { 68 tbb::task_arena arena( 1 ); 69 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 70 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 71 CHECK(arena.is_active()); 72 break; 73 } 74 case 1: { 75 tbb::task_arena arena( tbb::task_arena::automatic ); 76 CHECK(!arena.is_active()); 77 arena.initialize(); 78 CHECK(arena.is_active()); 79 break; 80 } 81 case 2: { 82 tbb::task_arena arena; 83 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 84 arena.initialize( std::rand() % maxthread + 1 ); 85 CHECK(arena.is_active()); 86 arena.terminate(); 87 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 88 break; 89 } 90 } 91 } 92 } 93 94 //--------------------------------------------------// 95 96 // Definitions used in more than one test 97 typedef tbb::blocked_range<int> Range; 98 99 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 100 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 101 102 void ResetTLS() { 103 local_id.clear(); 104 old_id.clear(); 105 slot_id.clear(); 106 } 107 108 class ArenaObserver : public tbb::task_scheduler_observer { 109 int myId; // unique observer/arena id within a test 110 int myMaxConcurrency; // concurrency of the associated arena 111 int myNumReservedSlots; // reserved slots in the associated arena 112 void on_scheduler_entry( bool is_worker ) override { 113 int current_index = tbb::this_task_arena::current_thread_index(); 114 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 115 if (is_worker) { 116 CHECK(current_index >= myNumReservedSlots); 117 } 118 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 119 old_id.local() = local_id.local(); 120 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 121 local_id.local() = myId; 122 slot_id.local() = current_index; 123 } 124 void on_scheduler_exit( bool /*is_worker*/ ) override { 125 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 126 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 127 slot_id.local() = -2; 128 local_id.local() = old_id.local(); 129 old_id.local() = 0; 130 } 131 public: 132 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 133 : tbb::task_scheduler_observer(a) 134 , myId(id) 135 , myMaxConcurrency(maxConcurrency) 136 , myNumReservedSlots(numReservedSlots) { 137 CHECK(myId); 138 observe(true); 139 } 140 ~ArenaObserver () { 141 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 142 } 143 }; 144 145 struct IndexTrackingBody { // Must be used together with ArenaObserver 146 void operator() ( const Range& ) const { 147 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 148 utils::doDummyWork(50000); 149 } 150 }; 151 152 struct AsynchronousWork { 153 utils::SpinBarrier &my_barrier; 154 bool my_is_blocking; 155 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 156 : my_barrier(a_barrier), my_is_blocking(blocking) {} 157 void operator()() const { 158 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 159 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 160 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 161 else my_barrier.signalNoWait(); 162 } 163 }; 164 165 //-----------------------------------------------------------------------------------------// 166 167 // Test that task_arenas might be created and used from multiple application threads. 168 // Also tests arena observers. The parameter p is the index of an app thread running this test. 169 void TestConcurrentArenasFunc(int idx) { 170 // A regression test for observer activation order: 171 // check that arena observer can be activated before local observer 172 struct LocalObserver : public tbb::task_scheduler_observer { 173 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 174 }; 175 tbb::task_arena a1; 176 a1.initialize(1,0); 177 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 178 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 179 LocalObserver lo; 180 CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated"); 181 tbb::task_arena a2(2,1); 182 ArenaObserver o2(a2, 2, 1, idx*2+2); 183 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 184 utils::SpinBarrier barrier(2); 185 AsynchronousWork work(barrier); 186 a1.enqueue(work); // put async work 187 barrier.wait(); 188 a2.enqueue(work); // another work 189 a2.execute(work); 190 a1.debug_wait_until_empty(); 191 a2.debug_wait_until_empty(); 192 } 193 194 void TestConcurrentArenas(int p) { 195 // TODO REVAMP fix with global control 196 ResetTLS(); 197 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 198 } 199 //--------------------------------------------------// 200 // Test multiple application threads working with a single arena at the same time. 
201 class MultipleMastersPart1 : utils::NoAssign { 202 tbb::task_arena &my_a; 203 utils::SpinBarrier &my_b1, &my_b2; 204 public: 205 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 206 : my_a(a), my_b1(b1), my_b2(b2) {} 207 void operator()(int) const { 208 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 209 my_b1.wait(); 210 // A regression test for bugs 1954 & 1971 211 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 212 } 213 }; 214 215 class MultipleMastersPart2 : utils::NoAssign { 216 tbb::task_arena &my_a; 217 utils::SpinBarrier &my_b; 218 public: 219 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 220 void operator()(int) const { 221 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 222 } 223 }; 224 225 class MultipleMastersPart3 : utils::NoAssign { 226 tbb::task_arena &my_a; 227 utils::SpinBarrier &my_b; 228 using wait_context = tbb::detail::d1::wait_context; 229 230 struct Runner : NoAssign { 231 wait_context& myWait; 232 Runner(wait_context& w) : myWait(w) {} 233 void operator()() const { 234 utils::doDummyWork(10000); 235 myWait.release(); 236 } 237 }; 238 239 struct Waiter : NoAssign { 240 wait_context& myWait; 241 Waiter(wait_context& w) : myWait(w) {} 242 void operator()() const { 243 tbb::task_group_context ctx; 244 tbb::detail::d1::wait(myWait, ctx); 245 } 246 }; 247 248 public: 249 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 250 : my_a(a), my_b(b) {} 251 void operator()(int) const { 252 wait_context wait(0); 253 my_b.wait(); // increases chances for task_arena initialization contention 254 for( int i=0; i<100; ++i) { 255 wait.reserve(); 256 my_a.enqueue(Runner(wait)); 257 my_a.execute(Waiter(wait)); 258 } 259 my_b.wait(); 260 } 261 }; 262 263 void TestMultipleMasters(int p) { 264 { 265 ResetTLS(); 266 tbb::task_arena a(1,0); 267 a.initialize(); 268 ArenaObserver o(a, 1, 0, 1); 269 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 270 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 271 barrier2.wait(); 272 a.debug_wait_until_empty(); 273 } { 274 ResetTLS(); 275 tbb::task_arena a(2,1); 276 ArenaObserver o(a, 2, 1, 2); 277 utils::SpinBarrier barrier(p+2); 278 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 279 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 
280 utils::Sleep(10); 281 NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); 282 barrier.wait(); 283 a.debug_wait_until_empty(); 284 } { 285 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) 286 tbb::task_arena a(p,1); 287 utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs 288 // "Oversubscribe" the arena by 1 external thread 289 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); 290 a.debug_wait_until_empty(); 291 } 292 } 293 294 //--------------------------------------------------// 295 // TODO: explain what TestArenaEntryConsistency does 296 #include <sstream> 297 #include <stdexcept> 298 #include "oneapi/tbb/detail/_exception.h" 299 #include "common/fp_control.h" 300 301 struct TestArenaEntryBody : FPModeContext { 302 std::atomic<int> &my_stage; // each execute increases it 303 std::stringstream my_id; 304 bool is_caught, is_expected; 305 enum { arenaFPMode = 1 }; 306 307 TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance 308 : FPModeContext(idx+i) 309 , my_stage(s) 310 , is_caught(false) 311 #if TBB_USE_EXCEPTIONS 312 , is_expected( (idx&(1<<i)) != 0 ) 313 #else 314 , is_expected(false) 315 #endif 316 { 317 my_id << idx << ':' << i << '@'; 318 } 319 void operator()() { // inside task_arena::execute() 320 // synchronize with other stages 321 int stage = my_stage++; 322 int slot = tbb::this_task_arena::current_thread_index(); 323 CHECK(slot >= 0); 324 CHECK(slot <= 1); 325 // wait until the third stage is delegated and then starts on slot 0 326 while(my_stage < 2+slot) utils::yield(); 327 // deduct its entry type and put it into id, it helps to find source of a problem 328 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? 329 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master") 330 : stage == 3? 
"nested_same_ctx" : "nested_alien_ctx"); 331 AssertFPMode(arenaFPMode); 332 if (is_expected) { 333 TBB_TEST_THROW(std::logic_error(my_id.str())); 334 } 335 // no code can be put here since exceptions can be thrown 336 } 337 void on_exception(const char *e) { // outside arena, in catch block 338 is_caught = true; 339 CHECK(my_id.str() == e); 340 assertFPMode(); 341 } 342 void after_execute() { // outside arena and catch block 343 CHECK(is_caught == is_expected); 344 assertFPMode(); 345 } 346 }; 347 348 class ForEachArenaEntryBody : utils::NoAssign { 349 tbb::task_arena &my_a; // expected task_arena(2,1) 350 std::atomic<int> &my_stage; // each execute increases it 351 int my_idx; 352 353 public: 354 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 355 : my_a(a), my_stage(c), my_idx(0) {} 356 357 void test(int idx) { 358 my_idx = idx; 359 my_stage = 0; 360 NativeParallelFor(3, *this); // test cross-arena calls 361 CHECK(my_stage == 3); 362 my_a.execute(*this); // test nested calls 363 CHECK(my_stage == 5); 364 } 365 366 // task_arena functor for nested tests 367 void operator()() const { 368 test_arena_entry(3); // in current task group context 369 tbb::parallel_for(4, 5, *this); // in different context 370 } 371 372 // NativeParallelFor & parallel_for functor 373 void operator()(int i) const { 374 test_arena_entry(i); 375 } 376 377 private: 378 void test_arena_entry(int i) const { 379 GetRoundingMode(); 380 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 381 GetRoundingMode(); 382 #if TBB_USE_EXCEPTIONS 383 try { 384 my_a.execute(scoped_functor); 385 } catch(std::logic_error &e) { 386 scoped_functor.on_exception(e.what()); 387 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 388 #else 389 my_a.execute(scoped_functor); 390 #endif 391 scoped_functor.after_execute(); 392 } 393 }; 394 395 void TestArenaEntryConsistency() { 396 tbb::task_arena a(2, 1); 397 std::atomic<int> c; 398 ForEachArenaEntryBody body(a, c); 399 400 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 401 a.initialize(); // capture FP settings to arena 402 fp_scope.setNextFPMode(); 403 404 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 405 body.test(i); 406 } 407 //-------------------------------------------------- 408 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 409 class TestArenaConcurrencyBody : utils::NoAssign { 410 tbb::task_arena &my_a; 411 int my_max_concurrency; 412 int my_reserved_slots; 413 utils::SpinBarrier *my_barrier; 414 utils::SpinBarrier *my_worker_barrier; 415 public: 416 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 417 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 418 // NativeParallelFor's functor 419 void operator()( int ) const { 420 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 421 local_id.local() = 1; 422 my_a.execute( *this ); 423 } 424 // Arena's functor 425 void operator()() const { 426 int idx = tbb::this_task_arena::current_thread_index(); 427 CHECK( idx < (my_max_concurrency > 1 ? 
my_max_concurrency : 2) ); 428 CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() ); 429 int max_arena_concurrency = tbb::this_task_arena::max_concurrency(); 430 CHECK( max_arena_concurrency == my_max_concurrency ); 431 if ( my_worker_barrier ) { 432 if ( local_id.local() == 1 ) { 433 // External thread in a reserved slot 434 CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" ); 435 } else { 436 // Worker thread 437 CHECK( idx >= my_reserved_slots ); 438 my_worker_barrier->wait(); 439 } 440 } else if ( my_barrier ) 441 CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" ); 442 if ( my_barrier ) my_barrier->wait(); 443 else utils::Sleep( 1 ); 444 } 445 }; 446 447 void TestArenaConcurrency( int p, int reserved = 0, int step = 1) { 448 for (; reserved <= p; reserved += step) { 449 tbb::task_arena a( p, reserved ); 450 if (p - reserved < tbb::this_task_arena::max_concurrency()) { 451 // Check concurrency with worker & reserved external threads. 452 ResetTLS(); 453 utils::SpinBarrier b( p ); 454 utils::SpinBarrier wb( p-reserved ); 455 TestArenaConcurrencyBody test( a, p, reserved, &b, &wb ); 456 for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads 457 a.enqueue( test ); 458 if ( reserved==1 ) 459 test( 0 ); // calls execute() 460 else 461 utils::NativeParallelFor( reserved, test ); 462 a.debug_wait_until_empty(); 463 } { // Check if multiple external threads alone can achieve maximum concurrency. 464 ResetTLS(); 465 utils::SpinBarrier b( p ); 466 utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) ); 467 a.debug_wait_until_empty(); 468 } { // Check oversubscription by external threads. 469 #if !_WIN32 || !_WIN64 470 // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms 471 // that makes impossible to create more than ~500 threads. 
472 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 473 #endif 474 #if TBB_TEST_LOW_WORKLOAD 475 if ( p <= 16 ) 476 #endif 477 { 478 ResetTLS(); 479 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 480 a.debug_wait_until_empty(); 481 } 482 } 483 } 484 } 485 486 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 487 utils::SpinBarrier& m_barrier; 488 489 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 490 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 491 observe(true); 492 } 493 void on_scheduler_exit(bool worker) override { 494 if (worker) { 495 m_barrier.wait(); 496 } 497 } 498 }; 499 500 void TestMandatoryConcurrency() { 501 tbb::task_arena a(1); 502 a.execute([&a] { 503 int n_threads = 4; 504 utils::SpinBarrier exit_barrier(2); 505 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 506 for (int j = 0; j < 5; ++j) { 507 utils::ExactConcurrencyLevel::check(1); 508 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 509 utils::SpinBarrier barrier(n_threads); 510 utils::NativeParallelFor(n_threads, [&](int) { 511 for (int i = 0; i < 5; ++i) { 512 barrier.wait(); 513 a.enqueue([&] { 514 CHECK(tbb::this_task_arena::max_concurrency() == 2); 515 CHECK(a.max_concurrency() == 2); 516 ++curr_tasks; 517 CHECK(curr_tasks == 1); 518 utils::doDummyWork(1000); 519 CHECK(curr_tasks == 1); 520 --curr_tasks; 521 ++num_tasks; 522 }); 523 barrier.wait(); 524 } 525 }); 526 do { 527 exit_barrier.wait(); 528 } while (num_tasks < n_threads * 5); 529 } 530 observer.observe(false); 531 }); 532 } 533 534 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 535 TestMandatoryConcurrency(); 536 InitializeAndTerminate(max_thread_num); 537 for (int p = min_thread_num; p <= max_thread_num; ++p) { 538 TestConcurrentArenas(p); 539 TestMultipleMasters(p); 540 TestArenaConcurrency(p); 541 } 542 } 543 544 //--------------------------------------------------// 545 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 546 // This part of the test uses the knowledge of task_arena internals 547 548 struct TaskArenaValidator { 549 int my_slot_at_construction; 550 const tbb::task_arena& my_arena; 551 TaskArenaValidator( const tbb::task_arena& other ) 552 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 553 , my_arena(other) 554 {} 555 // Inspect the internal state 556 int concurrency() { return my_arena.debug_max_concurrency(); } 557 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 558 559 // This method should be called in task_arena::execute() for a captured arena 560 // by the same thread that created the validator. 
561 void operator()() { 562 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 563 "Current thread index has changed since the validator construction" ); 564 } 565 }; 566 567 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 568 int expect_concurrency, int expect_masters ) { 569 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 570 if( arena.is_active() ) { 571 TaskArenaValidator validator( arena ); 572 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 573 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 574 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 575 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 576 // for threads already in arena, check that the thread index remains the same 577 arena.execute( validator ); 578 } else { // not_initialized 579 // Test the deprecated method 580 CHECK(tbb::this_task_arena::current_thread_index() == -1); 581 } 582 583 // Ideally, there should be a check for having the same internal arena object, 584 // but that object is not easily accessible for implicit arenas. 585 } 586 } 587 588 struct TestAttachBody : utils::NoAssign { 589 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 590 const int maxthread; 591 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 592 593 // The functor body for NativeParallelFor 594 void operator()( int idx ) const { 595 my_idx = idx; 596 597 int default_threads = tbb::this_task_arena::max_concurrency(); 598 599 tbb::task_arena arena{tbb::task_arena::attach()}; 600 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 601 602 arena.terminate(); 603 ValidateAttachedArena( arena, false, -1, -1 ); 604 605 // attach to an auto-initialized arena 606 tbb::parallel_for(0, 1, [](int) {}); 607 608 tbb::task_arena arena2{tbb::task_arena::attach()}; 609 ValidateAttachedArena( arena2, true, default_threads, 1 ); 610 611 // attach to another task_arena 612 arena.initialize( maxthread, std::min(maxthread,idx) ); 613 arena.execute( *this ); 614 } 615 616 // The functor body for task_arena::execute above 617 void operator()() const { 618 tbb::task_arena arena2{tbb::task_arena::attach()}; 619 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 620 } 621 622 // The functor body for tbb::parallel_for 623 void operator()( const Range& r ) const { 624 for( int i = r.begin(); i<r.end(); ++i ) { 625 tbb::task_arena arena2{tbb::task_arena::attach()}; 626 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 627 } 628 } 629 }; 630 631 thread_local int TestAttachBody::my_idx; 632 633 void TestAttach( int maxthread ) { 634 // Externally concurrent, but no concurrency within a thread 635 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 636 // Concurrent within the current arena; may also serve as a stress test 637 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 638 } 639 640 //--------------------------------------------------// 641 642 // Test that task_arena::enqueue does not tolerate a non-const functor. 643 // TODO: can it be reworked as SFINAE-based compile-time check? 
644 struct TestFunctor { 645 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 646 void operator()() const { /* library requires this overload only */ } 647 }; 648 649 void TestConstantFunctorRequirement() { 650 tbb::task_arena a; 651 TestFunctor tf; 652 a.enqueue( tf ); 653 } 654 655 //--------------------------------------------------// 656 657 #include "tbb/parallel_reduce.h" 658 #include "tbb/parallel_invoke.h" 659 660 // Test this_task_arena::isolate 661 namespace TestIsolatedExecuteNS { 662 template <typename NestedPartitioner> 663 class NestedParFor : utils::NoAssign { 664 public: 665 NestedParFor() {} 666 void operator()() const { 667 NestedPartitioner p; 668 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 669 } 670 }; 671 672 template <typename NestedPartitioner> 673 class ParForBody : utils::NoAssign { 674 bool myOuterIsolation; 675 tbb::enumerable_thread_specific<int> &myEts; 676 std::atomic<bool> &myIsStolen; 677 public: 678 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 679 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 680 void operator()( int ) const { 681 int &e = myEts.local(); 682 if ( e++ > 0 ) myIsStolen = true; 683 if ( myOuterIsolation ) 684 NestedParFor<NestedPartitioner>()(); 685 else 686 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 687 --e; 688 } 689 }; 690 691 template <typename OuterPartitioner, typename NestedPartitioner> 692 class OuterParFor : utils::NoAssign { 693 bool myOuterIsolation; 694 std::atomic<bool> &myIsStolen; 695 public: 696 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 697 void operator()() const { 698 tbb::enumerable_thread_specific<int> ets( 0 ); 699 OuterPartitioner p; 700 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 701 } 702 }; 703 704 template <typename OuterPartitioner, typename NestedPartitioner> 705 void TwoLoopsTest( bool outer_isolation ) { 706 std::atomic<bool> is_stolen; 707 is_stolen = false; 708 const int max_repeats = 100; 709 if ( outer_isolation ) { 710 for ( int i = 0; i <= max_repeats; ++i ) { 711 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 712 if ( is_stolen ) break; 713 } 714 // TODO: was ASSERT_WARNING 715 if (!is_stolen) { 716 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 717 } 718 } else { 719 for ( int i = 0; i <= max_repeats; ++i ) { 720 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 721 } 722 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 723 } 724 } 725 726 void TwoLoopsTest( bool outer_isolation ) { 727 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 728 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 729 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 730 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 731 } 732 733 void TwoLoopsTest() { 734 TwoLoopsTest( true ); 735 TwoLoopsTest( false ); 736 } 737 //--------------------------------------------------// 738 class HeavyMixTestBody : utils::NoAssign { 739 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 740 
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 741 int myNestedLevel; 742 743 template <typename Partitioner, typename Body> 744 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 745 if ( rnd.get() % 2 ) { 746 if (ctx ) 747 tbb::parallel_for( 0, 2, body, p, *ctx ); 748 else 749 tbb::parallel_for( 0, 2, body, p ); 750 } else { 751 tbb::parallel_invoke( body, body ); 752 } 753 } 754 755 template <typename Partitioner> 756 class IsolatedBody : utils::NoAssign { 757 const HeavyMixTestBody &myHeavyMixTestBody; 758 Partitioner &myPartitioner; 759 public: 760 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 761 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 762 void operator()() const { 763 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 764 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 765 myHeavyMixTestBody.myNestedLevel + 1 ), 766 myPartitioner ); 767 } 768 }; 769 770 template <typename Partitioner> 771 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 772 Partitioner p; 773 switch ( rnd.get() % 2 ) { 774 case 0: { 775 // No features 776 tbb::task_group_context ctx; 777 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 778 break; 779 } 780 case 1: { 781 // Isolation 782 int previous_isolation = isolated_level; 783 isolated_level = myNestedLevel; 784 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 785 isolated_level = previous_isolation; 786 break; 787 } 788 } 789 } 790 public: 791 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 792 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 793 : myRandom( random ), myIsolatedLevel( isolated_level ) 794 , myNestedLevel( nested_level ) {} 795 void operator()() const { 796 int &isolated_level = myIsolatedLevel.local(); 797 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 798 if ( myNestedLevel == 20 ) 799 return; 800 utils::FastRandom<>& rnd = myRandom.local(); 801 if ( rnd.get() % 2 == 1 ) { 802 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 803 } else { 804 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 805 } 806 } 807 void operator()(int) const { 808 this->operator()(); 809 } 810 }; 811 812 struct RandomInitializer { 813 utils::FastRandom<> operator()() { 814 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 815 } 816 }; 817 818 void HeavyMixTest() { 819 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 820 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 821 822 RandomInitializer init_random; 823 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 824 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 825 for ( int i = 0; i < 5; ++i ) { 826 HeavyMixTestBody b( random, isolated_level, 1 ); 827 b( 0 ); 828 } 829 } 830 831 //--------------------------------------------------// 832 #if TBB_USE_EXCEPTIONS 833 struct MyException {}; 834 struct IsolatedBodyThrowsException { 835 void operator()() const { 836 #if _MSC_VER && !__INTEL_COMPILER 837 // Workaround an unreachable code warning in task_arena_function. 
838 volatile bool workaround = true; 839 if (workaround) 840 #endif 841 { 842 throw MyException(); 843 } 844 } 845 }; 846 struct ExceptionTestBody : utils::NoAssign { 847 tbb::enumerable_thread_specific<int>& myEts; 848 std::atomic<bool>& myIsStolen; 849 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 850 : myEts( ets ), myIsStolen( is_stolen ) {} 851 void operator()( int i ) const { 852 try { 853 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 854 REQUIRE_MESSAGE( false, "The exception has been lost" ); 855 } 856 catch ( MyException ) {} 857 catch ( ... ) { 858 REQUIRE_MESSAGE( false, "Unexpected exception" ); 859 } 860 // Check that nested algorithms can steal outer-level tasks 861 int &e = myEts.local(); 862 if ( e++ > 0 ) myIsStolen = true; 863 // work imbalance increases chances for stealing 864 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 865 --e; 866 } 867 }; 868 869 #endif /* TBB_USE_EXCEPTIONS */ 870 void ExceptionTest() { 871 #if TBB_USE_EXCEPTIONS 872 tbb::enumerable_thread_specific<int> ets; 873 std::atomic<bool> is_stolen; 874 is_stolen = false; 875 for ( ;; ) { 876 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 877 if ( is_stolen ) break; 878 } 879 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 880 #endif /* TBB_USE_EXCEPTIONS */ 881 } 882 883 struct NonConstBody { 884 unsigned int state; 885 void operator()() { 886 state ^= ~0u; 887 } 888 }; 889 890 void TestNonConstBody() { 891 NonConstBody body; 892 body.state = 0x6c97d5ed; 893 tbb::this_task_arena::isolate(body); 894 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 895 } 896 897 // TODO: Consider tbb::task_group instead of explicit task API. 898 class TestEnqueueTask : public tbb::detail::d1::task { 899 using wait_context = tbb::detail::d1::wait_context; 900 901 tbb::enumerable_thread_specific<bool>& executed; 902 std::atomic<int>& completed; 903 904 public: 905 wait_context& waiter; 906 tbb::task_arena& arena; 907 static const int N = 100; 908 909 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 910 : executed(exe), completed(c), waiter(w), arena(a) {} 911 912 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 913 for (int i = 0; i < N; ++i) { 914 arena.enqueue([&]() { 915 executed.local() = true; 916 ++completed; 917 for (int j = 0; j < 100; j++) utils::yield(); 918 waiter.release(1); 919 }); 920 } 921 return nullptr; 922 } 923 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 924 }; 925 926 class TestEnqueueIsolateBody : utils::NoCopy { 927 tbb::enumerable_thread_specific<bool>& executed; 928 std::atomic<int>& completed; 929 tbb::task_arena& arena; 930 public: 931 static const int N = 100; 932 933 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 934 : executed(exe), completed(c), arena(a) {} 935 void operator()() { 936 tbb::task_group_context ctx; 937 tbb::detail::d1::wait_context waiter(N); 938 939 TestEnqueueTask root(executed, completed, waiter, arena); 940 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 941 } 942 }; 943 944 void TestEnqueue() { 945 tbb::enumerable_thread_specific<bool> executed(false); 946 std::atomic<int> completed; 947 tbb::task_arena arena{tbb::task_arena::attach()}; 948 949 // Check that the main thread can process enqueued tasks. 
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: None of the enqueued tasks were executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner
    // steals an outer-level task on a nested level. With only one thief, it would execute the
    // outer-level tasks first, and the owner would have no chance to steal an outer-level task.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads would require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent a test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] { tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with a missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//
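
// A minimal sketch (not part of the test suite) of the usage pattern that TestDelegatedSpawnWait
// exercises above: work is delegated into an explicit arena via execute(), spawned through a
// task_group, and waited for via another delegated call. The function name is illustrative only.
inline void ExampleDelegatedSpawnWaitUsage() {
    tbb::task_arena arena(2);
    tbb::task_group tg;
    arena.execute([&tg] { tg.run([] { utils::doDummyWork(100); }); }); // delegate a spawn into the arena
    arena.execute([&tg] { tg.wait(); });                               // delegate the wait as well
}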

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
        tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log noisy enough that the failure is treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // The move constructor is disallowed, like the copy constructor inherited from NoCopy
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() does not copy or move its argument inside the implementation
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
    struct noDefaultTag {};
    class ReturnType : public StateTrackable<> {
        static const int SIZE = 42;
        std::vector<int> data;
    public:
        ReturnType(noDefaultTag) : StateTrackable<>(0) {}
        // Define copy constructor to test that it is never called
        ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
        ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

        void fill() {
            for (int i = 0; i < SIZE; ++i)
                data.push_back(i);
        }
        void check() {
            REQUIRE(data.size() == unsigned(SIZE));
            for (int i = 0; i < SIZE; ++i)
                REQUIRE(data[i] == i);
            StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
            REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
            REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
            std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
            std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
            REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
            // The number of copies/moves should not exceed 3 if copy elision takes place:
            // function return, store to internal storage, acquire from internal storage.
            // Without copy elision, this number may grow up to 7.
            REQUIRE((copied == 0 && moved <= 7));
            WARN_MESSAGE(moved <= 3,
                "Warning: The number of copies/moves should not exceed 3 if copy elision takes place. "
                "Pay attention to this warning only if copy elision is enabled."
            );
        }
    };

    template <typename R>
    R function() {
        noDefaultTag tag;
        R r(tag);
        r.fill();
        return r;
    }

    template <>
    void function<void>() {}

    template <typename R>
    struct Functor {
        R operator()() const {
            return function<R>();
        }
    };

    tbb::task_arena& arena() {
        static tbb::task_arena a;
        return a;
    }

    template <typename F>
    void TestExecute(F &f) {
        StateTrackableCounters::reset();
        ReturnType r{arena().execute(f)};
        r.check();
    }

    template <typename F>
    void TestExecute(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r{arena().execute(f)};
        r.check();
    }

    template <typename F>
    void TestIsolate(F &f) {
        StateTrackableCounters::reset();
        ReturnType r{tbb::this_task_arena::isolate(f)};
        r.check();
    }

    template <typename F>
    void TestIsolate(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r{tbb::this_task_arena::isolate(f)};
        r.check();
    }

    void Test() {
        TestExecute(Functor<ReturnType>());
        Functor<ReturnType> f1;
        TestExecute(f1);
        TestExecute(function<ReturnType>);

        arena().execute(Functor<void>());
        Functor<void> f2;
        arena().execute(f2);
        arena().execute(function<void>);
        TestIsolate(Functor<ReturnType>());
        TestIsolate(f1);
        TestIsolate(function<ReturnType>);
        tbb::this_task_arena::isolate(Functor<void>());
        tbb::this_task_arena::isolate(f2);
        tbb::this_task_arena::isolate(function<void>);
    }
}

void TestReturnValue() {
    TestReturnValueNS::Test();
}

//--------------------------------------------------//
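
// A minimal sketch (not part of the test suite) of what the checks above count copies/moves for:
// task_arena::execute() and this_task_arena::isolate() forward the functor's return value to the
// caller, which is the behavior TestReturnValueNS verifies. The function name is illustrative only.
inline std::vector<int> ExampleExecuteReturnValue(tbb::task_arena& a) {
    return a.execute([] { return std::vector<int>(42, 1); }); // the returned vector is forwarded out of execute()
}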

// MyObserver checks whether threads join the same arena
struct MyObserver: public tbb::task_scheduler_observer {
    tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
    tbb::task_arena& my_arena;
    std::atomic<int>& my_failure_counter;
    std::atomic<int>& my_counter;
    utils::SpinBarrier& m_barrier;

    MyObserver(tbb::task_arena& a,
               tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
               std::atomic<int>& failure_counter,
               std::atomic<int>& counter,
               utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
          my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
        observe(true);
    }
    void on_scheduler_entry(bool worker) override {
        if (worker) {
            ++my_counter;
            tbb::task_arena*& cur_arena = my_tls.local();
            if (cur_arena != 0 && cur_arena != &my_arena) {
                ++my_failure_counter;
            }
            cur_arena = &my_arena;
            m_barrier.wait();
        }
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait(); // before wakeup
            m_barrier.wait(); // after wakeup
        }
    }
};

void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
    if (n_threads == 0) {
        n_threads = tbb::this_task_arena::max_concurrency();
    }

    const int max_n_arenas = 8;
    int n_arenas = 2;
    if (n_threads > 16) {
        n_arenas = max_n_arenas;
    } else if (n_threads > 8) {
        n_arenas = 4;
    }

    int n_workers = n_threads - 1;
    n_workers = n_arenas * (n_workers / n_arenas);
    if (n_workers == 0) {
        return;
    }

    n_threads = n_workers + 1;
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);

    const int n_repetitions = 20;
    const int n_outer_repetitions = 100;
    std::multiset<float> failure_ratio; // for median calculation
    utils::SpinBarrier barrier(n_threads);
    utils::SpinBarrier worker_barrier(n_workers);
    MyObserver* observer[max_n_arenas];
    std::vector<tbb::task_arena> arenas(n_arenas);
    std::atomic<int> failure_counter;
    std::atomic<int> counter;
    tbb::enumerable_thread_specific<tbb::task_arena*> tls;

    for (int i = 0; i < n_arenas; ++i) {
        arenas[i].initialize(n_workers / n_arenas + 1); // +1 for the external (master) thread
        observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
    }

    int ii = 0;
    for (; ii < n_outer_repetitions; ++ii) {
        failure_counter = 0;
        counter = 0;

        // Main code
        auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
        wakeup();
        for (int j = 0; j < n_repetitions; ++j) {
            barrier.wait(); // entry
            barrier.wait(); // exit1
            wakeup();
            barrier.wait(); // exit2
        }
        barrier.wait(); // entry
        barrier.wait(); // exit1
        barrier.wait(); // exit2

        failure_ratio.insert(float(failure_counter) / counter);
        tls.clear();
        // collect 3 elements in failure_ratio before calculating the median
        if (ii > 1) {
            std::multiset<float>::iterator it = failure_ratio.begin();
            std::advance(it, failure_ratio.size() / 2);
            if (*it < 0.02)
                break;
        }
    }
    for (int i = 0; i < n_arenas; ++i) {
        delete observer[i];
    }
    // check whether the median is too big
    std::multiset<float>::iterator it = failure_ratio.begin();
    std::advance(it, failure_ratio.size() / 2);
    // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
    if (*it > 0.05) {
        REPORT("Warning: too many cases where threads join different arenas.\n");
        REQUIRE_MESSAGE(*it <= 0.3, "Too many cases where threads join different arenas.\n");
    }
}

void TestArenaWorkersMigration() {
    TestArenaWorkersMigrationWithNumThreads(4);
    if (tbb::this_task_arena::max_concurrency() != 4) {
        TestArenaWorkersMigrationWithNumThreads();
    }
}

//--------------------------------------------------//
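
// A minimal sketch (not part of the test suite) of the arena-specific observer pattern that
// MyObserver above builds on: the observer is bound to one arena and starts observing in its
// constructor. The struct name is illustrative only.
struct ExampleArenaObserver : tbb::task_scheduler_observer {
    ExampleArenaObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) { observe(true); }
    void on_scheduler_entry(bool /*is_worker*/) override {} // called when a thread joins the arena
    void on_scheduler_exit(bool /*is_worker*/) override {}  // called when a thread leaves the arena
};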

void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (auto blocked : { false, true }) {
            for (int trail = 0; trail < (blocked ? 10 : 10000); ++trail) {
                tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
                    CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "The concurrency level is not equal to the specified number of threads");
                    CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "More threads were created than the default allows");
                    local_id.local() = 1;
                    if (blocked) {
                        // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                        utils::Sleep(1);
                        barrier.wait();
                    }
                }, tbb::simple_partitioner());
                REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "More threads were created than the default number");
                if (blocked) {
                    REQUIRE_MESSAGE(local_id.size() == size_t(threads), "The number of created threads is not equal to the default number");
                }
            }
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Check only a part of the possible reserved-external-thread amounts:
    // 0 and 1 reserved threads are the important cases, but it is also useful
    // to collect some statistics for other amounts without spending the whole
    // test session checking each of them.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    int arena_concurrency = static_cast<int>(thread_number) / 2;
    tbb::task_arena test_arena(arena_concurrency, arena_concurrency);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ?
myMaxConcurrency : 2)); 1517 if (is_worker) { 1518 CHECK(current_index >= myNumReservedSlots); 1519 } 1520 } 1521 void on_scheduler_exit( bool /*is_worker*/ ) override 1522 {} 1523 public: 1524 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1525 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1526 , myMaxConcurrency(maxConcurrency) 1527 , myNumReservedSlots(numReservedSlots) { 1528 observe(true); 1529 } 1530 1531 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1532 return lhs.my_idx < rhs.my_idx; 1533 } 1534 }; 1535 1536 std::atomic<int> simple_observer::idx_counter{}; 1537 1538 struct arena_handler { 1539 enum arena_status { 1540 alive, 1541 deleting, 1542 deleted 1543 }; 1544 1545 tbb::task_arena* arena; 1546 1547 std::atomic<arena_status> status{alive}; 1548 tbb::spin_rw_mutex arena_in_use{}; 1549 1550 tbb::concurrent_set<simple_observer> observers; 1551 1552 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1553 {} 1554 1555 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1556 return lhs.arena < rhs.arena; 1557 } 1558 }; 1559 1560 // TODO: Add observer operations 1561 void StressTestMixFunctionality() { 1562 enum operation_type { 1563 create_arena, 1564 delete_arena, 1565 attach_observer, 1566 detach_observer, 1567 arena_execute, 1568 enqueue_task, 1569 last_operation_marker 1570 }; 1571 1572 std::size_t operations_number = last_operation_marker; 1573 std::size_t thread_number = utils::get_platform_max_threads(); 1574 utils::FastRandom<> operation_rnd(42); 1575 tbb::spin_mutex random_operation_guard; 1576 1577 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1578 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1579 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1580 }; 1581 1582 utils::FastRandom<> arena_rnd(42); 1583 tbb::spin_mutex random_arena_guard; 1584 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1585 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1586 return arena_rnd.get(); 1587 }; 1588 1589 tbb::concurrent_set<arena_handler> arenas_pool; 1590 1591 std::vector<std::thread> thread_pool; 1592 1593 utils::SpinBarrier thread_barrier(thread_number); 1594 std::size_t max_operations = 20000; 1595 std::atomic<std::size_t> curr_operation{}; 1596 1597 auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) { 1598 for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) { 1599 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1600 if (curr_arena->status == arena_handler::alive) { 1601 return curr_arena; 1602 } 1603 else { 1604 lock.release(); 1605 } 1606 } 1607 } 1608 return arenas_pool.end(); 1609 }; 1610 1611 auto thread_func = [&] () { 1612 arenas_pool.emplace(new tbb::task_arena()); 1613 thread_barrier.wait(); 1614 while (curr_operation++ < max_operations) { 1615 switch (get_random_operation()) { 1616 case create_arena : 1617 { 1618 arenas_pool.emplace(new tbb::task_arena()); 1619 break; 1620 } 1621 case delete_arena : 1622 { 1623 auto curr_arena = arenas_pool.begin(); 1624 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1625 arena_handler::arena_status curr_status = arena_handler::alive; 1626 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1627 break; 1628 } 1629 } 1630 1631 if (curr_arena == arenas_pool.end()) break; 1632 1633 
                tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                delete curr_arena->arena;
                curr_arena->status.store(arena_handler::deleted);

                break;
            }
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};

                auto curr_arena = find_arena(lock);
                if (curr_arena != arenas_pool.end()) {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }
                break;
            }
            case detach_observer:
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->execute([]() {
                        static tbb::affinity_partitioner aff;
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
                        tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
                    });
                }

                break;
            }
            case enqueue_task:
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = find_arena(lock);

                if (curr_arena != arenas_pool.end()) {
                    curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
                }

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};

//--------------------------------------------------//
//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}
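
// Illustrative sketch only (not an additional test case): the test cases below exercise
// task_arena::execute with constant functors and with non-void return types. The basic API
// shape they rely on is that execute runs the functor in the arena's context and returns
// its result. A minimal sketch with hypothetical names:
//
//     tbb::task_arena sketch_arena;
//     int result = sketch_arena.execute([] { return 6 * 7; }); // returns the functor's result
//     // result == 42
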
//! Test for constant functor requirement
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // The isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("The destructor should not have been called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}
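
// The "Workers oversubscription" test below drives work through enqueue_test_helper, which
// re-enqueues copies of itself until my_task_counter reaches its limit. The underlying
// pattern is fire-and-forget enqueue plus polling, since enqueue returns no handle to wait
// on. A minimal sketch with hypothetical names (illustrative only):
//
//     tbb::task_arena sketch_arena;
//     std::atomic<std::size_t> done{0};
//     sketch_arena.enqueue([&done] { ++done; }); // detached task, no waiting handle
//     while (done == 0) utils::yield();          // poll, as the test does with task_counter
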
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Basic test for arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    auto task_handle = tg.defer([&]{ run = true; });

    arena.enqueue(std::move(task_handle));
    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    arena.execute([&]{
        auto task_handle = tg.defer([&]{ run = true; });

        tbb::this_task_arena::enqueue(std::move(task_handle));
    });

    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with functor
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue function") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};
    // Block the task_group so that tg.wait() below has something to wait on.
    auto task_handle = tg.defer([]{});

    arena.execute([&]{
        tbb::this_task_arena::enqueue([&]{
            run = true;
            // Release the task_group.
            task_handle = tbb::task_handle{};
        });
    });

    tg.wait();

    CHECK(run == true);
}

#if TBB_USE_EXCEPTIONS
//! Basic test for exceptions in task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;
    tbb::task_arena arena;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    arena.enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}
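
// In the exception-propagation tests above and below, tg.defer() only creates a task_handle;
// the deferred body does not run until the handle is submitted via enqueue, and an exception
// escaping that body is rethrown from tg.wait(), which is what CHECK_THROWS_AS verifies.
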
//! Basic test for exceptions in this_task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    tbb::this_task_arena::enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! Test that scheduling an empty task_handle is an error
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif // TBB_USE_EXCEPTIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The functor passed to execute is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! The test for is_inside_task in arena::execute when called from inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute called from a task") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The functor passed to execute is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS