1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/concurrency_tracker.h" 21 #include "common/spin_barrier.h" 22 #include "common/utils.h" 23 #include "common/utils_report.h" 24 #include "common/utils_concurrency_limit.h" 25 26 #include "tbb/task_arena.h" 27 #include "tbb/task_scheduler_observer.h" 28 #include "tbb/enumerable_thread_specific.h" 29 #include "tbb/parallel_for.h" 30 #include "tbb/global_control.h" 31 #include "tbb/concurrent_set.h" 32 #include "tbb/spin_mutex.h" 33 #include "tbb/spin_rw_mutex.h" 34 #include "tbb/task_group.h" 35 36 #include <stdexcept> 37 #include <cstdlib> 38 #include <cstdio> 39 #include <vector> 40 #include <thread> 41 #include <atomic> 42 43 //#include "harness_fp.h" 44 45 //! \file test_task_arena.cpp 46 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 47 48 //--------------------------------------------------// 49 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 50 /* maxthread is treated as the biggest possible concurrency level. */ 51 void InitializeAndTerminate( int maxthread ) { 52 for( int i=0; i<200; ++i ) { 53 switch( i&3 ) { 54 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 55 // Explicit initialization can either keep the original values or change those. 56 // Arena termination can be explicit or implicit (in the destructor). 57 // TODO: extend with concurrency level checks if such a method is added. 
58 default: { 59 tbb::task_arena arena( std::rand() % maxthread + 1 ); 60 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 61 arena.initialize(); 62 CHECK(arena.is_active()); 63 arena.terminate(); 64 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 65 break; 66 } 67 case 0: { 68 tbb::task_arena arena( 1 ); 69 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 70 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 71 CHECK(arena.is_active()); 72 break; 73 } 74 case 1: { 75 tbb::task_arena arena( tbb::task_arena::automatic ); 76 CHECK(!arena.is_active()); 77 arena.initialize(); 78 CHECK(arena.is_active()); 79 break; 80 } 81 case 2: { 82 tbb::task_arena arena; 83 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 84 arena.initialize( std::rand() % maxthread + 1 ); 85 CHECK(arena.is_active()); 86 arena.terminate(); 87 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 88 break; 89 } 90 } 91 } 92 } 93 94 //--------------------------------------------------// 95 96 // Definitions used in more than one test 97 typedef tbb::blocked_range<int> Range; 98 99 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 100 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 101 102 void ResetTLS() { 103 local_id.clear(); 104 old_id.clear(); 105 slot_id.clear(); 106 } 107 108 class ArenaObserver : public tbb::task_scheduler_observer { 109 int myId; // unique observer/arena id within a test 110 int myMaxConcurrency; // concurrency of the associated arena 111 int myNumReservedSlots; // reserved slots in the associated arena 112 void on_scheduler_entry( bool is_worker ) override { 113 int current_index = tbb::this_task_arena::current_thread_index(); 114 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 115 if (is_worker) { 116 CHECK(current_index >= myNumReservedSlots); 117 } 118 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 119 old_id.local() = local_id.local(); 120 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 121 local_id.local() = myId; 122 slot_id.local() = current_index; 123 } 124 void on_scheduler_exit( bool /*is_worker*/ ) override { 125 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 126 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 127 slot_id.local() = -2; 128 local_id.local() = old_id.local(); 129 old_id.local() = 0; 130 } 131 public: 132 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 133 : tbb::task_scheduler_observer(a) 134 , myId(id) 135 , myMaxConcurrency(maxConcurrency) 136 , myNumReservedSlots(numReservedSlots) { 137 CHECK(myId); 138 observe(true); 139 } 140 ~ArenaObserver () { 141 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 142 } 143 }; 144 145 struct IndexTrackingBody { // Must be used together with ArenaObserver 146 void operator() ( const Range& ) const { 147 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 148 utils::doDummyWork(50000); 149 } 150 }; 151 152 struct AsynchronousWork { 153 utils::SpinBarrier &my_barrier; 154 bool my_is_blocking; 155 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 156 : my_barrier(a_barrier), my_is_blocking(blocking) {} 157 void operator()() const { 158 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 159 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 160 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 161 else my_barrier.signalNoWait(); 162 } 163 }; 164 165 //-----------------------------------------------------------------------------------------// 166 167 // Test that task_arenas might be created and used from multiple application threads. 168 // Also tests arena observers. The parameter p is the index of an app thread running this test. 169 void TestConcurrentArenasFunc(int idx) { 170 // A regression test for observer activation order: 171 // check that arena observer can be activated before local observer 172 struct LocalObserver : public tbb::task_scheduler_observer { 173 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 174 }; 175 tbb::task_arena a1; 176 a1.initialize(1,0); 177 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 178 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 179 LocalObserver lo; 180 CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated"); 181 tbb::task_arena a2(2,1); 182 ArenaObserver o2(a2, 2, 1, idx*2+2); 183 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 184 utils::SpinBarrier barrier(2); 185 AsynchronousWork work(barrier); 186 a1.enqueue(work); // put async work 187 barrier.wait(); 188 a2.enqueue(work); // another work 189 a2.execute(work); 190 a1.debug_wait_until_empty(); 191 a2.debug_wait_until_empty(); 192 } 193 194 void TestConcurrentArenas(int p) { 195 // TODO REVAMP fix with global control 196 ResetTLS(); 197 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 198 } 199 //--------------------------------------------------// 200 // Test multiple application threads working with a single arena at the same time. 
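// Illustrative sketch only (not part of the test; the local names are made up): the pattern
// exercised below is several application threads sharing one explicit arena, e.g.
//     tbb::task_arena shared(/*max_concurrency=*/2, /*reserved_for_masters=*/1);
//     std::thread t1([&shared] { shared.execute([] { /* blocking parallel work */ }); });
//     std::thread t2([&shared] { shared.enqueue([] { /* fire-and-forget work */ }); });
//     t1.join(); t2.join();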
201 class MultipleMastersPart1 : utils::NoAssign { 202 tbb::task_arena &my_a; 203 utils::SpinBarrier &my_b1, &my_b2; 204 public: 205 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 206 : my_a(a), my_b1(b1), my_b2(b2) {} 207 void operator()(int) const { 208 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 209 my_b1.wait(); 210 // A regression test for bugs 1954 & 1971 211 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 212 } 213 }; 214 215 class MultipleMastersPart2 : utils::NoAssign { 216 tbb::task_arena &my_a; 217 utils::SpinBarrier &my_b; 218 public: 219 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 220 void operator()(int) const { 221 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 222 } 223 }; 224 225 class MultipleMastersPart3 : utils::NoAssign { 226 tbb::task_arena &my_a; 227 utils::SpinBarrier &my_b; 228 using wait_context = tbb::detail::d1::wait_context; 229 230 struct Runner : NoAssign { 231 wait_context& myWait; 232 Runner(wait_context& w) : myWait(w) {} 233 void operator()() const { 234 utils::doDummyWork(10000); 235 myWait.release(); 236 } 237 }; 238 239 struct Waiter : NoAssign { 240 wait_context& myWait; 241 Waiter(wait_context& w) : myWait(w) {} 242 void operator()() const { 243 tbb::task_group_context ctx; 244 tbb::detail::d1::wait(myWait, ctx); 245 } 246 }; 247 248 public: 249 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 250 : my_a(a), my_b(b) {} 251 void operator()(int) const { 252 wait_context wait(0); 253 my_b.wait(); // increases chances for task_arena initialization contention 254 for( int i=0; i<100; ++i) { 255 wait.reserve(); 256 my_a.enqueue(Runner(wait)); 257 my_a.execute(Waiter(wait)); 258 } 259 my_b.wait(); 260 } 261 }; 262 263 void TestMultipleMasters(int p) { 264 { 265 ResetTLS(); 266 tbb::task_arena a(1,0); 267 a.initialize(); 268 ArenaObserver o(a, 1, 0, 1); 269 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 270 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 271 barrier2.wait(); 272 a.debug_wait_until_empty(); 273 } { 274 ResetTLS(); 275 tbb::task_arena a(2,1); 276 ArenaObserver o(a, 2, 1, 2); 277 utils::SpinBarrier barrier(p+2); 278 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 279 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TODO: explain what TestArenaEntryConsistency does
// (In short: it exercises different entry paths into task_arena::execute() - direct, delegated
// to a worker or to an external thread, nested in the same or in an alien context - from several
// threads at once, and checks FP-settings capturing and exception propagation for each path.)
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance
        : FPModeContext(idx+i)
        , my_stage(s)
        , is_caught(false)
#if TBB_USE_EXCEPTIONS
        , is_expected( (idx&(1<<i)) != 0 )
#else
        , is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduce its entry type and record it in the id; this helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
            "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
            : stage == 3?
"nested_same_ctx" : "nested_alien_ctx"); 331 AssertFPMode(arenaFPMode); 332 if (is_expected) { 333 TBB_TEST_THROW(std::logic_error(my_id.str())); 334 } 335 // no code can be put here since exceptions can be thrown 336 } 337 void on_exception(const char *e) { // outside arena, in catch block 338 is_caught = true; 339 CHECK(my_id.str() == e); 340 assertFPMode(); 341 } 342 void after_execute() { // outside arena and catch block 343 CHECK(is_caught == is_expected); 344 assertFPMode(); 345 } 346 }; 347 348 class ForEachArenaEntryBody : utils::NoAssign { 349 tbb::task_arena &my_a; // expected task_arena(2,1) 350 std::atomic<int> &my_stage; // each execute increases it 351 int my_idx; 352 353 public: 354 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 355 : my_a(a), my_stage(c), my_idx(0) {} 356 357 void test(int idx) { 358 my_idx = idx; 359 my_stage = 0; 360 NativeParallelFor(3, *this); // test cross-arena calls 361 CHECK(my_stage == 3); 362 my_a.execute(*this); // test nested calls 363 CHECK(my_stage == 5); 364 } 365 366 // task_arena functor for nested tests 367 void operator()() const { 368 test_arena_entry(3); // in current task group context 369 tbb::parallel_for(4, 5, *this); // in different context 370 } 371 372 // NativeParallelFor & parallel_for functor 373 void operator()(int i) const { 374 test_arena_entry(i); 375 } 376 377 private: 378 void test_arena_entry(int i) const { 379 GetRoundingMode(); 380 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 381 GetRoundingMode(); 382 #if TBB_USE_EXCEPTIONS 383 try { 384 my_a.execute(scoped_functor); 385 } catch(std::logic_error &e) { 386 scoped_functor.on_exception(e.what()); 387 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 388 #else 389 my_a.execute(scoped_functor); 390 #endif 391 scoped_functor.after_execute(); 392 } 393 }; 394 395 void TestArenaEntryConsistency() { 396 tbb::task_arena a(2, 1); 397 std::atomic<int> c; 398 ForEachArenaEntryBody body(a, c); 399 400 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 401 a.initialize(); // capture FP settings to arena 402 fp_scope.setNextFPMode(); 403 404 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 405 body.test(i); 406 } 407 //-------------------------------------------------- 408 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 409 class TestArenaConcurrencyBody : utils::NoAssign { 410 tbb::task_arena &my_a; 411 int my_max_concurrency; 412 int my_reserved_slots; 413 utils::SpinBarrier *my_barrier; 414 utils::SpinBarrier *my_worker_barrier; 415 public: 416 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 417 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 418 // NativeParallelFor's functor 419 void operator()( int ) const { 420 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 421 local_id.local() = 1; 422 my_a.execute( *this ); 423 } 424 // Arena's functor 425 void operator()() const { 426 int idx = tbb::this_task_arena::current_thread_index(); 427 CHECK( idx < (my_max_concurrency > 1 ? 
            my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms,
            // which makes it impossible to create more than ~500 threads.
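            // (Rough arithmetic: ~500 threads * 8 MB of stack each is about 4 GB, which already
            // exhausts a 32-bit address space.)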
472 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 473 #endif 474 #if TBB_TEST_LOW_WORKLOAD 475 if ( p <= 16 ) 476 #endif 477 { 478 ResetTLS(); 479 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 480 a.debug_wait_until_empty(); 481 } 482 } 483 } 484 } 485 486 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 487 utils::SpinBarrier& m_barrier; 488 489 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 490 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 491 observe(true); 492 } 493 void on_scheduler_exit(bool worker) override { 494 if (worker) { 495 m_barrier.wait(); 496 } 497 } 498 }; 499 500 void TestMandatoryConcurrency() { 501 tbb::task_arena a(1); 502 a.execute([&a] { 503 int n_threads = 4; 504 utils::SpinBarrier exit_barrier(2); 505 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 506 for (int j = 0; j < 5; ++j) { 507 utils::ExactConcurrencyLevel::check(1); 508 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 509 utils::SpinBarrier barrier(n_threads); 510 utils::NativeParallelFor(n_threads, [&](int) { 511 for (int i = 0; i < 5; ++i) { 512 barrier.wait(); 513 a.enqueue([&] { 514 CHECK(tbb::this_task_arena::max_concurrency() == 2); 515 CHECK(a.max_concurrency() == 2); 516 ++curr_tasks; 517 CHECK(curr_tasks == 1); 518 utils::doDummyWork(1000); 519 CHECK(curr_tasks == 1); 520 --curr_tasks; 521 ++num_tasks; 522 }); 523 barrier.wait(); 524 } 525 }); 526 do { 527 exit_barrier.wait(); 528 } while (num_tasks < n_threads * 5); 529 } 530 observer.observe(false); 531 }); 532 } 533 534 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 535 TestMandatoryConcurrency(); 536 InitializeAndTerminate(max_thread_num); 537 for (int p = min_thread_num; p <= max_thread_num; ++p) { 538 TestConcurrentArenas(p); 539 TestMultipleMasters(p); 540 TestArenaConcurrency(p); 541 } 542 } 543 544 //--------------------------------------------------// 545 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 546 // This part of the test uses the knowledge of task_arena internals 547 548 struct TaskArenaValidator { 549 int my_slot_at_construction; 550 const tbb::task_arena& my_arena; 551 TaskArenaValidator( const tbb::task_arena& other ) 552 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 553 , my_arena(other) 554 {} 555 // Inspect the internal state 556 int concurrency() { return my_arena.debug_max_concurrency(); } 557 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 558 559 // This method should be called in task_arena::execute() for a captured arena 560 // by the same thread that created the validator. 
561 void operator()() { 562 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 563 "Current thread index has changed since the validator construction" ); 564 } 565 }; 566 567 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 568 int expect_concurrency, int expect_masters ) { 569 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 570 if( arena.is_active() ) { 571 TaskArenaValidator validator( arena ); 572 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 573 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 574 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 575 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 576 // for threads already in arena, check that the thread index remains the same 577 arena.execute( validator ); 578 } else { // not_initialized 579 // Test the deprecated method 580 CHECK(tbb::this_task_arena::current_thread_index() == -1); 581 } 582 583 // Ideally, there should be a check for having the same internal arena object, 584 // but that object is not easily accessible for implicit arenas. 585 } 586 } 587 588 struct TestAttachBody : utils::NoAssign { 589 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 590 const int maxthread; 591 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 592 593 // The functor body for NativeParallelFor 594 void operator()( int idx ) const { 595 my_idx = idx; 596 597 int default_threads = tbb::this_task_arena::max_concurrency(); 598 599 tbb::task_arena arena{tbb::task_arena::attach()}; 600 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 601 602 arena.terminate(); 603 ValidateAttachedArena( arena, false, -1, -1 ); 604 605 // attach to an auto-initialized arena 606 tbb::parallel_for(0, 1, [](int) {}); 607 608 tbb::task_arena arena2{tbb::task_arena::attach()}; 609 ValidateAttachedArena( arena2, true, default_threads, 1 ); 610 611 // attach to another task_arena 612 arena.initialize( maxthread, std::min(maxthread,idx) ); 613 arena.execute( *this ); 614 } 615 616 // The functor body for task_arena::execute above 617 void operator()() const { 618 tbb::task_arena arena2{tbb::task_arena::attach()}; 619 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 620 } 621 622 // The functor body for tbb::parallel_for 623 void operator()( const Range& r ) const { 624 for( int i = r.begin(); i<r.end(); ++i ) { 625 tbb::task_arena arena2{tbb::task_arena::attach()}; 626 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 627 } 628 } 629 }; 630 631 thread_local int TestAttachBody::my_idx; 632 633 void TestAttach( int maxthread ) { 634 // Externally concurrent, but no concurrency within a thread 635 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 636 // Concurrent within the current arena; may also serve as a stress test 637 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 638 } 639 640 //--------------------------------------------------// 641 642 // Test that task_arena::enqueue does not tolerate a non-const functor. 643 // TODO: can it be reworked as SFINAE-based compile-time check? 
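// A possible compile-time formulation of the same requirement (an untested sketch, assuming
// C++17 <type_traits>; this is not what the library itself does):
//     static_assert(std::is_invocable_v<const TestFunctor&>,
//                   "task_arena::enqueue requires a functor that is callable through a const reference");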
644 struct TestFunctor { 645 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 646 void operator()() const { /* library requires this overload only */ } 647 }; 648 649 void TestConstantFunctorRequirement() { 650 tbb::task_arena a; 651 TestFunctor tf; 652 a.enqueue( tf ); 653 } 654 655 //--------------------------------------------------// 656 657 #include "tbb/parallel_reduce.h" 658 #include "tbb/parallel_invoke.h" 659 660 // Test this_task_arena::isolate 661 namespace TestIsolatedExecuteNS { 662 template <typename NestedPartitioner> 663 class NestedParFor : utils::NoAssign { 664 public: 665 NestedParFor() {} 666 void operator()() const { 667 NestedPartitioner p; 668 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 669 } 670 }; 671 672 template <typename NestedPartitioner> 673 class ParForBody : utils::NoAssign { 674 bool myOuterIsolation; 675 tbb::enumerable_thread_specific<int> &myEts; 676 std::atomic<bool> &myIsStolen; 677 public: 678 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 679 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 680 void operator()( int ) const { 681 int &e = myEts.local(); 682 if ( e++ > 0 ) myIsStolen = true; 683 if ( myOuterIsolation ) 684 NestedParFor<NestedPartitioner>()(); 685 else 686 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 687 --e; 688 } 689 }; 690 691 template <typename OuterPartitioner, typename NestedPartitioner> 692 class OuterParFor : utils::NoAssign { 693 bool myOuterIsolation; 694 std::atomic<bool> &myIsStolen; 695 public: 696 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 697 void operator()() const { 698 tbb::enumerable_thread_specific<int> ets( 0 ); 699 OuterPartitioner p; 700 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 701 } 702 }; 703 704 template <typename OuterPartitioner, typename NestedPartitioner> 705 void TwoLoopsTest( bool outer_isolation ) { 706 std::atomic<bool> is_stolen; 707 is_stolen = false; 708 const int max_repeats = 100; 709 if ( outer_isolation ) { 710 for ( int i = 0; i <= max_repeats; ++i ) { 711 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 712 if ( is_stolen ) break; 713 } 714 // TODO: was ASSERT_WARNING 715 if (!is_stolen) { 716 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 717 } 718 } else { 719 for ( int i = 0; i <= max_repeats; ++i ) { 720 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 721 } 722 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 723 } 724 } 725 726 void TwoLoopsTest( bool outer_isolation ) { 727 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 728 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 729 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 730 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 731 } 732 733 void TwoLoopsTest() { 734 TwoLoopsTest( true ); 735 TwoLoopsTest( false ); 736 } 737 //--------------------------------------------------// 738 class HeavyMixTestBody : utils::NoAssign { 739 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 740 
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 741 int myNestedLevel; 742 743 template <typename Partitioner, typename Body> 744 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 745 if ( rnd.get() % 2 ) { 746 if (ctx ) 747 tbb::parallel_for( 0, 2, body, p, *ctx ); 748 else 749 tbb::parallel_for( 0, 2, body, p ); 750 } else { 751 tbb::parallel_invoke( body, body ); 752 } 753 } 754 755 template <typename Partitioner> 756 class IsolatedBody : utils::NoAssign { 757 const HeavyMixTestBody &myHeavyMixTestBody; 758 Partitioner &myPartitioner; 759 public: 760 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 761 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 762 void operator()() const { 763 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 764 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 765 myHeavyMixTestBody.myNestedLevel + 1 ), 766 myPartitioner ); 767 } 768 }; 769 770 template <typename Partitioner> 771 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 772 Partitioner p; 773 switch ( rnd.get() % 2 ) { 774 case 0: { 775 // No features 776 tbb::task_group_context ctx; 777 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 778 break; 779 } 780 case 1: { 781 // Isolation 782 int previous_isolation = isolated_level; 783 isolated_level = myNestedLevel; 784 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 785 isolated_level = previous_isolation; 786 break; 787 } 788 } 789 } 790 public: 791 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 792 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 793 : myRandom( random ), myIsolatedLevel( isolated_level ) 794 , myNestedLevel( nested_level ) {} 795 void operator()() const { 796 int &isolated_level = myIsolatedLevel.local(); 797 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 798 if ( myNestedLevel == 20 ) 799 return; 800 utils::FastRandom<>& rnd = myRandom.local(); 801 if ( rnd.get() % 2 == 1 ) { 802 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 803 } else { 804 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 805 } 806 } 807 void operator()(int) const { 808 this->operator()(); 809 } 810 }; 811 812 struct RandomInitializer { 813 utils::FastRandom<> operator()() { 814 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 815 } 816 }; 817 818 void HeavyMixTest() { 819 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 820 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 821 822 RandomInitializer init_random; 823 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 824 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 825 for ( int i = 0; i < 5; ++i ) { 826 HeavyMixTestBody b( random, isolated_level, 1 ); 827 b( 0 ); 828 } 829 } 830 831 //--------------------------------------------------// 832 #if TBB_USE_EXCEPTIONS 833 struct MyException {}; 834 struct IsolatedBodyThrowsException { 835 void operator()() const { 836 #if _MSC_VER && !__INTEL_COMPILER 837 // Workaround an unreachable code warning in task_arena_function. 
838 volatile bool workaround = true; 839 if (workaround) 840 #endif 841 { 842 throw MyException(); 843 } 844 } 845 }; 846 struct ExceptionTestBody : utils::NoAssign { 847 tbb::enumerable_thread_specific<int>& myEts; 848 std::atomic<bool>& myIsStolen; 849 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 850 : myEts( ets ), myIsStolen( is_stolen ) {} 851 void operator()( int i ) const { 852 try { 853 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 854 REQUIRE_MESSAGE( false, "The exception has been lost" ); 855 } 856 catch ( MyException ) {} 857 catch ( ... ) { 858 REQUIRE_MESSAGE( false, "Unexpected exception" ); 859 } 860 // Check that nested algorithms can steal outer-level tasks 861 int &e = myEts.local(); 862 if ( e++ > 0 ) myIsStolen = true; 863 // work imbalance increases chances for stealing 864 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 865 --e; 866 } 867 }; 868 869 #endif /* TBB_USE_EXCEPTIONS */ 870 void ExceptionTest() { 871 #if TBB_USE_EXCEPTIONS 872 tbb::enumerable_thread_specific<int> ets; 873 std::atomic<bool> is_stolen; 874 is_stolen = false; 875 for ( ;; ) { 876 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 877 if ( is_stolen ) break; 878 } 879 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 880 #endif /* TBB_USE_EXCEPTIONS */ 881 } 882 883 struct NonConstBody { 884 unsigned int state; 885 void operator()() { 886 state ^= ~0u; 887 } 888 }; 889 890 void TestNonConstBody() { 891 NonConstBody body; 892 body.state = 0x6c97d5ed; 893 tbb::this_task_arena::isolate(body); 894 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 895 } 896 897 // TODO: Consider tbb::task_group instead of explicit task API. 898 class TestEnqueueTask : public tbb::detail::d1::task { 899 using wait_context = tbb::detail::d1::wait_context; 900 901 tbb::enumerable_thread_specific<bool>& executed; 902 std::atomic<int>& completed; 903 904 public: 905 wait_context& waiter; 906 tbb::task_arena& arena; 907 static const int N = 100; 908 909 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 910 : executed(exe), completed(c), waiter(w), arena(a) {} 911 912 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 913 for (int i = 0; i < N; ++i) { 914 arena.enqueue([&]() { 915 executed.local() = true; 916 ++completed; 917 for (int j = 0; j < 100; j++) utils::yield(); 918 waiter.release(1); 919 }); 920 } 921 return nullptr; 922 } 923 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 924 }; 925 926 class TestEnqueueIsolateBody : utils::NoCopy { 927 tbb::enumerable_thread_specific<bool>& executed; 928 std::atomic<int>& completed; 929 tbb::task_arena& arena; 930 public: 931 static const int N = 100; 932 933 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 934 : executed(exe), completed(c), arena(a) {} 935 void operator()() { 936 tbb::task_group_context ctx; 937 tbb::detail::d1::wait_context waiter(N); 938 939 TestEnqueueTask root(executed, completed, waiter, arena); 940 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 941 } 942 }; 943 944 void TestEnqueue() { 945 tbb::enumerable_thread_specific<bool> executed(false); 946 std::atomic<int> completed; 947 tbb::task_arena arena{tbb::task_arena::attach()}; 948 949 // Check that the main thread can process enqueued tasks. 
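        // (TestEnqueueIsolateBody blocks in a wait() that is only released by the tasks enqueued in
        // TestEnqueueTask, so while waiting the calling thread is expected to take some of them itself.)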
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: None of the enqueued tasks were executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner
    // steals an outer-level task on a nested level. If we have only one thief, it will execute
    // outer-level tasks first and the owner will not have a chance to steal outer-level tasks.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] {tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context =
        tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
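    // (Each call to TestMultipleWaits() below detaches num_bunches * bunch_size short-lived
    // std::threads per repetition on top of the arena workers, so the caps here keep that total manageable.)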
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
        tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log annoying enough to be treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // the move constructor is not allowed, just like the copy constructor inherited from NoCopy
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() does not copy or move its argument inside the implementation
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
    struct noDefaultTag {};
    class ReturnType : public StateTrackable<> {
        static const int SIZE = 42;
        std::vector<int> data;
    public:
        ReturnType(noDefaultTag) : StateTrackable<>(0) {}
        // Define copy constructor to test that it is never called
        ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
        ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

        void fill() {
            for (int i = 0; i < SIZE; ++i)
                data.push_back(i);
        }
        void check() {
            REQUIRE(data.size() == unsigned(SIZE));
            for (int i = 0; i < SIZE; ++i)
                REQUIRE(data[i] == i);
            StateTrackableCounters::counters_type& cnts =
StateTrackableCounters::counters; 1216 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1217 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1218 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1219 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1220 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1221 // The number of copies/moves should not exceed 3 if copy elision takes a place: 1222 // function return, store to an internal storage, acquire internal storage. 1223 // For compilation, without copy elision, this number may be grown up to 7. 1224 REQUIRE((copied == 0 && moved <= 7)); 1225 WARN_MESSAGE(moved <= 3, 1226 "Warning: The number of copies/moves should not exceed 3 if copy elision takes a place." 1227 "Take an attention to this warning only if copy elision is enabled." 1228 ); 1229 } 1230 }; 1231 1232 template <typename R> 1233 R function() { 1234 noDefaultTag tag; 1235 R r(tag); 1236 r.fill(); 1237 return r; 1238 } 1239 1240 template <> 1241 void function<void>() {} 1242 1243 template <typename R> 1244 struct Functor { 1245 R operator()() const { 1246 return function<R>(); 1247 } 1248 }; 1249 1250 tbb::task_arena& arena() { 1251 static tbb::task_arena a; 1252 return a; 1253 } 1254 1255 template <typename F> 1256 void TestExecute(F &f) { 1257 StateTrackableCounters::reset(); 1258 ReturnType r{arena().execute(f)}; 1259 r.check(); 1260 } 1261 1262 template <typename F> 1263 void TestExecute(const F &f) { 1264 StateTrackableCounters::reset(); 1265 ReturnType r{arena().execute(f)}; 1266 r.check(); 1267 } 1268 template <typename F> 1269 void TestIsolate(F &f) { 1270 StateTrackableCounters::reset(); 1271 ReturnType r{tbb::this_task_arena::isolate(f)}; 1272 r.check(); 1273 } 1274 1275 template <typename F> 1276 void TestIsolate(const F &f) { 1277 StateTrackableCounters::reset(); 1278 ReturnType r{tbb::this_task_arena::isolate(f)}; 1279 r.check(); 1280 } 1281 1282 void Test() { 1283 TestExecute(Functor<ReturnType>()); 1284 Functor<ReturnType> f1; 1285 TestExecute(f1); 1286 TestExecute(function<ReturnType>); 1287 1288 arena().execute(Functor<void>()); 1289 Functor<void> f2; 1290 arena().execute(f2); 1291 arena().execute(function<void>); 1292 TestIsolate(Functor<ReturnType>()); 1293 TestIsolate(f1); 1294 TestIsolate(function<ReturnType>); 1295 tbb::this_task_arena::isolate(Functor<void>()); 1296 tbb::this_task_arena::isolate(f2); 1297 tbb::this_task_arena::isolate(function<void>); 1298 } 1299 } 1300 1301 void TestReturnValue() { 1302 TestReturnValueNS::Test(); 1303 } 1304 1305 //--------------------------------------------------// 1306 1307 // MyObserver checks if threads join to the same arena 1308 struct MyObserver: public tbb::task_scheduler_observer { 1309 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1310 tbb::task_arena& my_arena; 1311 std::atomic<int>& my_failure_counter; 1312 std::atomic<int>& my_counter; 1313 utils::SpinBarrier& m_barrier; 1314 1315 MyObserver(tbb::task_arena& a, 1316 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1317 std::atomic<int>& failure_counter, 1318 std::atomic<int>& counter, 1319 utils::SpinBarrier& barrier) 1320 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1321 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) { 1322 observe(true); 1323 } 1324 void on_scheduler_entry(bool worker) override { 1325 if (worker) { 1326 ++my_counter; 1327 tbb::task_arena*& cur_arena = my_tls.local(); 1328 if (cur_arena != 0 && cur_arena != 
                    &my_arena) {
                ++my_failure_counter;
            }
            cur_arena = &my_arena;
            m_barrier.wait();
        }
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait(); // before wakeup
            m_barrier.wait(); // after wakeup
        }
    }
};

void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
    if (n_threads == 0) {
        n_threads = tbb::this_task_arena::max_concurrency();
    }

    const int max_n_arenas = 8;
    int n_arenas = 2;
    if(n_threads > 16) {
        n_arenas = max_n_arenas;
    } else if (n_threads > 8) {
        n_arenas = 4;
    }

    int n_workers = n_threads - 1;
    n_workers = n_arenas * (n_workers / n_arenas);
    if (n_workers == 0) {
        return;
    }

    n_threads = n_workers + 1;
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);

    const int n_repetitions = 20;
    const int n_outer_repetitions = 100;
    std::multiset<float> failure_ratio; // for median calculation
    utils::SpinBarrier barrier(n_threads);
    utils::SpinBarrier worker_barrier(n_workers);
    MyObserver* observer[max_n_arenas];
    std::vector<tbb::task_arena> arenas(n_arenas);
    std::atomic<int> failure_counter;
    std::atomic<int> counter;
    tbb::enumerable_thread_specific<tbb::task_arena*> tls;

    for (int i = 0; i < n_arenas; ++i) {
        arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master
        observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
    }

    int ii = 0;
    for (; ii < n_outer_repetitions; ++ii) {
        failure_counter = 0;
        counter = 0;

        // Main code
        auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
        wakeup();
        for (int j = 0; j < n_repetitions; ++j) {
            barrier.wait(); // entry
            barrier.wait(); // exit1
            wakeup();
            barrier.wait(); // exit2
        }
        barrier.wait(); // entry
        barrier.wait(); // exit1
        barrier.wait(); // exit2

        failure_ratio.insert(float(failure_counter) / counter);
        tls.clear();
        // collect 3 elements in failure_ratio before calculating the median
        if (ii > 1) {
            std::multiset<float>::iterator it = failure_ratio.begin();
            std::advance(it, failure_ratio.size() / 2);
            if (*it < 0.02)
                break;
        }
    }
    for (int i = 0; i < n_arenas; ++i) {
        delete observer[i];
    }
    // check whether the median is too big
    std::multiset<float>::iterator it = failure_ratio.begin();
    std::advance(it, failure_ratio.size() / 2);
    // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
    if (*it > 0.05) {
        REPORT("Warning: Too many cases where threads join different arenas.\n");
        REQUIRE_MESSAGE(*it <= 0.3, "Too many cases where threads join different arenas.\n");
    }
}

void TestArenaWorkersMigration() {
    TestArenaWorkersMigrationWithNumThreads(4);
    if (tbb::this_task_arena::max_concurrency() != 4) {
        TestArenaWorkersMigrationWithNumThreads();
    }
}

//--------------------------------------------------//
void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (int trial = 0; trial < 10; ++trial) {
            tbb::parallel_for(0,
                threads, [threads, &barrier](int) {
                REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified thread number");
                REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "the number of created threads is greater than specified by default");
                local_id.local() = 1;
                // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
                utils::Sleep(1);
                barrier.wait();
            }, tbb::simple_partitioner());
            REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default");
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Checks only a subset of the reserved-external-thread counts:
    // 0 and 1 reserved threads are the important cases, but it is also useful
    // to collect some statistics with other counts without consuming
    // the whole test session time by checking every count.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    tbb::task_arena test_arena(thread_number / 2, thread_number / 2);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(canceled_task == thread_number * 1000);
}

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ?
myMaxConcurrency : 2)); 1509 if (is_worker) { 1510 CHECK(current_index >= myNumReservedSlots); 1511 } 1512 } 1513 void on_scheduler_exit( bool /*is_worker*/ ) override 1514 {} 1515 public: 1516 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1517 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1518 , myMaxConcurrency(maxConcurrency) 1519 , myNumReservedSlots(numReservedSlots) { 1520 observe(true); 1521 } 1522 1523 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1524 return lhs.my_idx < rhs.my_idx; 1525 } 1526 }; 1527 1528 std::atomic<int> simple_observer::idx_counter{}; 1529 1530 struct arena_handler { 1531 enum arena_status { 1532 alive, 1533 deleting, 1534 deleted 1535 }; 1536 1537 tbb::task_arena* arena; 1538 1539 std::atomic<arena_status> status{alive}; 1540 tbb::spin_rw_mutex arena_in_use{}; 1541 1542 tbb::concurrent_set<simple_observer> observers; 1543 1544 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1545 {} 1546 1547 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1548 return lhs.arena < rhs.arena; 1549 } 1550 }; 1551 1552 // TODO: Add observer operations 1553 void StressTestMixFunctionality() { 1554 enum operation_type { 1555 create_arena, 1556 delete_arena, 1557 attach_observer, 1558 detach_observer, 1559 arena_execute, 1560 enqueue_task, 1561 last_operation_marker 1562 }; 1563 1564 std::size_t operations_number = last_operation_marker; 1565 std::size_t thread_number = utils::get_platform_max_threads(); 1566 utils::FastRandom<> operation_rnd(42); 1567 tbb::spin_mutex random_operation_guard; 1568 1569 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1570 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1571 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1572 }; 1573 1574 utils::FastRandom<> arena_rnd(42); 1575 tbb::spin_mutex random_arena_guard; 1576 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1577 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1578 return arena_rnd.get(); 1579 }; 1580 1581 tbb::concurrent_set<arena_handler> arenas_pool; 1582 1583 std::vector<std::thread> thread_pool; 1584 1585 utils::SpinBarrier thread_barrier(thread_number); 1586 std::size_t max_operations = 20000; 1587 std::atomic<std::size_t> curr_operation{}; 1588 auto thread_func = [&] () { 1589 arenas_pool.emplace(new tbb::task_arena()); 1590 thread_barrier.wait(); 1591 while (curr_operation++ < max_operations) { 1592 switch (get_random_operation()) { 1593 case create_arena : 1594 { 1595 arenas_pool.emplace(new tbb::task_arena()); 1596 break; 1597 } 1598 case delete_arena : 1599 { 1600 auto curr_arena = arenas_pool.begin(); 1601 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1602 arena_handler::arena_status curr_status = arena_handler::alive; 1603 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1604 break; 1605 } 1606 } 1607 1608 if (curr_arena == arenas_pool.end()) break; 1609 1610 tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true); 1611 1612 delete curr_arena->arena; 1613 curr_arena->status.store(arena_handler::deleted); 1614 1615 break; 1616 } 1617 case attach_observer : 1618 { 1619 tbb::spin_rw_mutex::scoped_lock lock{}; 1620 auto curr_arena = arenas_pool.begin(); 1621 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1622 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1623 if 
// TODO: Add observer operations
void StressTestMixFunctionality() {
    enum operation_type {
        create_arena,
        delete_arena,
        attach_observer,
        detach_observer,
        arena_execute,
        enqueue_task,
        last_operation_marker
    };

    std::size_t operations_number = last_operation_marker;
    std::size_t thread_number = utils::get_platform_max_threads();
    utils::FastRandom<> operation_rnd(42);
    tbb::spin_mutex random_operation_guard;

    auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
        tbb::spin_mutex::scoped_lock lock(random_operation_guard);
        return static_cast<operation_type>(operation_rnd.get() % operations_number);
    };

    utils::FastRandom<> arena_rnd(42);
    tbb::spin_mutex random_arena_guard;
    auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
        tbb::spin_mutex::scoped_lock lock(random_arena_guard);
        return arena_rnd.get();
    };

    tbb::concurrent_set<arena_handler> arenas_pool;

    std::vector<std::thread> thread_pool;

    utils::SpinBarrier thread_barrier(thread_number);
    std::size_t max_operations = 20000;
    std::atomic<std::size_t> curr_operation{};
    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
                case create_arena :
                {
                    arenas_pool.emplace(new tbb::task_arena());
                    break;
                }
                case delete_arena :
                {
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        arena_handler::arena_status curr_status = arena_handler::alive;
                        if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                            break;
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                    delete curr_arena->arena;
                    curr_arena->status.store(arena_handler::deleted);

                    break;
                }
                case attach_observer :
                {
                    tbb::spin_rw_mutex::scoped_lock lock{};
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                            if (curr_arena->status == arena_handler::alive) {
                                break;
                            } else {
                                lock.release();
                            }
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    {
                        curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                    }

                    break;
                }
                case detach_observer :
                {
                    auto arena_number = get_random_arena() % arenas_pool.size();
                    auto curr_arena = arenas_pool.begin();
                    std::advance(curr_arena, arena_number);

                    for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                        if (it->is_observing()) {
                            it->observe(false);
                            break;
                        }
                    }

                    break;
                }
                case arena_execute :
                {
                    tbb::spin_rw_mutex::scoped_lock lock{};
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                            if (curr_arena->status == arena_handler::alive) {
                                break;
                            } else {
                                lock.release();
                            }
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    curr_arena->arena->execute([] () {
                        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) {
                            std::atomic<int> sum{};
                            // Make some work
                            for (; sum < 10; ++sum) ;
                        });
                    });

                    break;
                }
                case enqueue_task :
                {
                    tbb::spin_rw_mutex::scoped_lock lock{};
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                            if (curr_arena->status == arena_handler::alive) {
                                break;
                            } else {
                                lock.release();
                            }
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    curr_arena->arena->enqueue([] {
                        std::atomic<int> sum{};
                        // Make some work
                        for (; sum < 1000; ++sum) ;
                    });

                    break;
                }
                case last_operation_marker :
                    break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}

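// enqueue_test_helper (below) is a self-reposting functor: each execution
// re-enqueues a copy of itself into the same arena until the shared counter
// reaches 100000. The "Workers oversubscription" test uses it to check that
// enqueued work keeps landing on threads that have already joined the arena,
// i.e. threads whose thread-specific flag in my_ets is set.
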
struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};

//--------------------------------------------------//

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor should not have been called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}

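// The next test oversubscribes the machine on purpose: both the global limit
// and the arena are set to twice the platform concurrency, an initial
// parallel_for marks every joining thread in an enumerable_thread_specific
// flag, and the self-reposting enqueue_test_helper tasks then verify that the
// later enqueued work still runs only on threads marked during that first pass.
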
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(num_threads * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}

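// The remaining tests cover the preview extensions that let a task_group task
// be submitted through an arena. The common pattern they exercise (sketch only;
// the feature is gated by __TBB_PREVIEW_TASK_GROUP_EXTENSIONS):
//
//     tbb::task_group tg;
//     tbb::task_arena arena;
//     tbb::task_handle h = tg.defer([]{ /* work */ });
//     arena.enqueue(std::move(h));   // or tbb::this_task_arena::enqueue(std::move(h))
//     tg.wait();                     // waits for the deferred task and rethrows its exception
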
#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Basic test for arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    auto task_handle = tg.defer([&]{ run = true; });

    arena.enqueue(std::move(task_handle));
    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    arena.execute([&]{
        auto task_handle = tg.defer([&]{ run = true; });

        tbb::this_task_arena::enqueue(std::move(task_handle));
    });

    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with functor
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue function") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};
    // Block the task_group so that there is something to wait on
    auto task_handle = tg.defer([]{});

    arena.execute([&]{
        tbb::this_task_arena::enqueue([&]{
            run = true;
            // Release the task_group
            task_handle = tbb::task_handle{};
        });
    });

    tg.wait();

    CHECK(run == true);
}

#if TBB_USE_EXCEPTIONS
//! Basic test for exceptions in task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;
    tbb::task_arena arena;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    arena.enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! Basic test for exceptions in this_task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    tbb::this_task_arena::enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! Test that scheduling an empty task_handle is an error
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif // TBB_USE_EXCEPTIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! Test for is_inside_task in arena::execute when called from inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS