1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 19 #define __TBB_EXTRA_DEBUG 1 20 #include "common/concurrency_tracker.h" 21 #include "common/spin_barrier.h" 22 #include "common/utils.h" 23 #include "common/utils_report.h" 24 #include "common/utils_concurrency_limit.h" 25 26 #include "tbb/task_arena.h" 27 #include "tbb/task_scheduler_observer.h" 28 #include "tbb/enumerable_thread_specific.h" 29 #include "tbb/parallel_for.h" 30 #include "tbb/global_control.h" 31 #include "tbb/concurrent_set.h" 32 #include "tbb/spin_mutex.h" 33 #include "tbb/spin_rw_mutex.h" 34 #include "tbb/task_group.h" 35 36 #include <stdexcept> 37 #include <cstdlib> 38 #include <cstdio> 39 #include <vector> 40 #include <thread> 41 #include <atomic> 42 43 //#include "harness_fp.h" 44 45 //! \file test_task_arena.cpp 46 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification 47 48 //--------------------------------------------------// 49 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else. 50 /* maxthread is treated as the biggest possible concurrency level. */ 51 void InitializeAndTerminate( int maxthread ) { 52 for( int i=0; i<200; ++i ) { 53 switch( i&3 ) { 54 // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions. 55 // Explicit initialization can either keep the original values or change those. 56 // Arena termination can be explicit or implicit (in the destructor). 57 // TODO: extend with concurrency level checks if such a method is added. 
58 default: { 59 tbb::task_arena arena( std::rand() % maxthread + 1 ); 60 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 61 arena.initialize(); 62 CHECK(arena.is_active()); 63 arena.terminate(); 64 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 65 break; 66 } 67 case 0: { 68 tbb::task_arena arena( 1 ); 69 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 70 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters 71 CHECK(arena.is_active()); 72 break; 73 } 74 case 1: { 75 tbb::task_arena arena( tbb::task_arena::automatic ); 76 CHECK(!arena.is_active()); 77 arena.initialize(); 78 CHECK(arena.is_active()); 79 break; 80 } 81 case 2: { 82 tbb::task_arena arena; 83 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized"); 84 arena.initialize( std::rand() % maxthread + 1 ); 85 CHECK(arena.is_active()); 86 arena.terminate(); 87 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated"); 88 break; 89 } 90 } 91 } 92 } 93 94 //--------------------------------------------------// 95 96 // Definitions used in more than one test 97 typedef tbb::blocked_range<int> Range; 98 99 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below 100 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3); 101 102 void ResetTLS() { 103 local_id.clear(); 104 old_id.clear(); 105 slot_id.clear(); 106 } 107 108 class ArenaObserver : public tbb::task_scheduler_observer { 109 int myId; // unique observer/arena id within a test 110 int myMaxConcurrency; // concurrency of the associated arena 111 int myNumReservedSlots; // reserved slots in the associated arena 112 void on_scheduler_entry( bool is_worker ) override { 113 int current_index = tbb::this_task_arena::current_thread_index(); 114 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 115 if (is_worker) { 116 CHECK(current_index >= myNumReservedSlots); 117 } 118 CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry"); 119 old_id.local() = local_id.local(); 120 CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena"); 121 local_id.local() = myId; 122 slot_id.local() = current_index; 123 } 124 void on_scheduler_exit( bool /*is_worker*/ ) override { 125 CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken"); 126 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 127 slot_id.local() = -2; 128 local_id.local() = old_id.local(); 129 old_id.local() = 0; 130 } 131 public: 132 ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id) 133 : tbb::task_scheduler_observer(a) 134 , myId(id) 135 , myMaxConcurrency(maxConcurrency) 136 , myNumReservedSlots(numReservedSlots) { 137 CHECK(myId); 138 observe(true); 139 } 140 ~ArenaObserver () { 141 CHECK_MESSAGE(!old_id.local(), "inconsistent observer state"); 142 } 143 }; 144 145 struct IndexTrackingBody { // Must be used together with ArenaObserver 146 void operator() ( const Range& ) const { 147 CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index()); 148 utils::doDummyWork(50000); 149 } 150 }; 151 152 struct AsynchronousWork { 153 utils::SpinBarrier &my_barrier; 154 bool my_is_blocking; 155 AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true) 156 : my_barrier(a_barrier), my_is_blocking(blocking) {} 157 void operator()() const { 158 CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena"); 159 tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner()); 160 if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread 161 else my_barrier.signalNoWait(); 162 } 163 }; 164 165 //-----------------------------------------------------------------------------------------// 166 167 // Test that task_arenas might be created and used from multiple application threads. 168 // Also tests arena observers. The parameter p is the index of an app thread running this test. 169 void TestConcurrentArenasFunc(int idx) { 170 // A regression test for observer activation order: 171 // check that arena observer can be activated before local observer 172 struct LocalObserver : public tbb::task_scheduler_observer { 173 LocalObserver() : tbb::task_scheduler_observer() { observe(true); } 174 }; 175 tbb::task_arena a1; 176 a1.initialize(1,0); 177 ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test 178 CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated"); 179 LocalObserver lo; 180 CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated"); 181 tbb::task_arena a2(2,1); 182 ArenaObserver o2(a2, 2, 1, idx*2+2); 183 CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated"); 184 utils::SpinBarrier barrier(2); 185 AsynchronousWork work(barrier); 186 a1.enqueue(work); // put async work 187 barrier.wait(); 188 a2.enqueue(work); // another work 189 a2.execute(work); 190 a1.debug_wait_until_empty(); 191 a2.debug_wait_until_empty(); 192 } 193 194 void TestConcurrentArenas(int p) { 195 // TODO REVAMP fix with global control 196 ResetTLS(); 197 utils::NativeParallelFor( p, &TestConcurrentArenasFunc ); 198 } 199 //--------------------------------------------------// 200 // Test multiple application threads working with a single arena at the same time. 
201 class MultipleMastersPart1 : utils::NoAssign { 202 tbb::task_arena &my_a; 203 utils::SpinBarrier &my_b1, &my_b2; 204 public: 205 MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 206 : my_a(a), my_b1(b1), my_b2(b2) {} 207 void operator()(int) const { 208 my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false)); 209 my_b1.wait(); 210 // A regression test for bugs 1954 & 1971 211 my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false)); 212 } 213 }; 214 215 class MultipleMastersPart2 : utils::NoAssign { 216 tbb::task_arena &my_a; 217 utils::SpinBarrier &my_b; 218 public: 219 MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {} 220 void operator()(int) const { 221 my_a.execute(AsynchronousWork(my_b, /*blocking=*/false)); 222 } 223 }; 224 225 class MultipleMastersPart3 : utils::NoAssign { 226 tbb::task_arena &my_a; 227 utils::SpinBarrier &my_b; 228 using wait_context = tbb::detail::d1::wait_context; 229 230 struct Runner : NoAssign { 231 wait_context& myWait; 232 Runner(wait_context& w) : myWait(w) {} 233 void operator()() const { 234 utils::doDummyWork(10000); 235 myWait.release(); 236 } 237 }; 238 239 struct Waiter : NoAssign { 240 wait_context& myWait; 241 Waiter(wait_context& w) : myWait(w) {} 242 void operator()() const { 243 tbb::task_group_context ctx; 244 tbb::detail::d1::wait(myWait, ctx); 245 } 246 }; 247 248 public: 249 MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b) 250 : my_a(a), my_b(b) {} 251 void operator()(int) const { 252 wait_context wait(0); 253 my_b.wait(); // increases chances for task_arena initialization contention 254 for( int i=0; i<100; ++i) { 255 wait.reserve(); 256 my_a.enqueue(Runner(wait)); 257 my_a.execute(Waiter(wait)); 258 } 259 my_b.wait(); 260 } 261 }; 262 263 void TestMultipleMasters(int p) { 264 { 265 ResetTLS(); 266 tbb::task_arena a(1,0); 267 a.initialize(); 268 ArenaObserver o(a, 1, 0, 1); 269 utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier 270 NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) ); 271 barrier2.wait(); 272 a.debug_wait_until_empty(); 273 } { 274 ResetTLS(); 275 tbb::task_arena a(2,1); 276 ArenaObserver o(a, 2, 1, 2); 277 utils::SpinBarrier barrier(p+2); 278 a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981 279 // TODO: buggy test. A worker threads need time to occupy the slot to prevent an external thread from taking an enqueue task. 
280 utils::Sleep(10); 281 NativeParallelFor( p, MultipleMastersPart2(a, barrier) ); 282 barrier.wait(); 283 a.debug_wait_until_empty(); 284 } { 285 // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task) 286 tbb::task_arena a(p,1); 287 utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs 288 // "Oversubscribe" the arena by 1 external thread 289 NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) ); 290 a.debug_wait_until_empty(); 291 } 292 } 293 294 //--------------------------------------------------// 295 // TODO: explain what TestArenaEntryConsistency does 296 #include <sstream> 297 #include <stdexcept> 298 #include "oneapi/tbb/detail/_exception.h" 299 #include "common/fp_control.h" 300 301 struct TestArenaEntryBody : FPModeContext { 302 std::atomic<int> &my_stage; // each execute increases it 303 std::stringstream my_id; 304 bool is_caught, is_expected; 305 enum { arenaFPMode = 1 }; 306 307 TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance 308 : FPModeContext(idx+i) 309 , my_stage(s) 310 , is_caught(false) 311 #if TBB_USE_EXCEPTIONS 312 , is_expected( (idx&(1<<i)) != 0 ) 313 #else 314 , is_expected(false) 315 #endif 316 { 317 my_id << idx << ':' << i << '@'; 318 } 319 void operator()() { // inside task_arena::execute() 320 // synchronize with other stages 321 int stage = my_stage++; 322 int slot = tbb::this_task_arena::current_thread_index(); 323 CHECK(slot >= 0); 324 CHECK(slot <= 1); 325 // wait until the third stage is delegated and then starts on slot 0 326 while(my_stage < 2+slot) utils::yield(); 327 // deduct its entry type and put it into id, it helps to find source of a problem 328 my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()? 329 "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master") 330 : stage == 3? 
"nested_same_ctx" : "nested_alien_ctx"); 331 AssertFPMode(arenaFPMode); 332 if (is_expected) { 333 TBB_TEST_THROW(std::logic_error(my_id.str())); 334 } 335 // no code can be put here since exceptions can be thrown 336 } 337 void on_exception(const char *e) { // outside arena, in catch block 338 is_caught = true; 339 CHECK(my_id.str() == e); 340 assertFPMode(); 341 } 342 void after_execute() { // outside arena and catch block 343 CHECK(is_caught == is_expected); 344 assertFPMode(); 345 } 346 }; 347 348 class ForEachArenaEntryBody : utils::NoAssign { 349 tbb::task_arena &my_a; // expected task_arena(2,1) 350 std::atomic<int> &my_stage; // each execute increases it 351 int my_idx; 352 353 public: 354 ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c) 355 : my_a(a), my_stage(c), my_idx(0) {} 356 357 void test(int idx) { 358 my_idx = idx; 359 my_stage = 0; 360 NativeParallelFor(3, *this); // test cross-arena calls 361 CHECK(my_stage == 3); 362 my_a.execute(*this); // test nested calls 363 CHECK(my_stage == 5); 364 } 365 366 // task_arena functor for nested tests 367 void operator()() const { 368 test_arena_entry(3); // in current task group context 369 tbb::parallel_for(4, 5, *this); // in different context 370 } 371 372 // NativeParallelFor & parallel_for functor 373 void operator()(int i) const { 374 test_arena_entry(i); 375 } 376 377 private: 378 void test_arena_entry(int i) const { 379 GetRoundingMode(); 380 TestArenaEntryBody scoped_functor(my_stage, my_idx, i); 381 GetRoundingMode(); 382 #if TBB_USE_EXCEPTIONS 383 try { 384 my_a.execute(scoped_functor); 385 } catch(std::logic_error &e) { 386 scoped_functor.on_exception(e.what()); 387 } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); } 388 #else 389 my_a.execute(scoped_functor); 390 #endif 391 scoped_functor.after_execute(); 392 } 393 }; 394 395 void TestArenaEntryConsistency() { 396 tbb::task_arena a(2, 1); 397 std::atomic<int> c; 398 ForEachArenaEntryBody body(a, c); 399 400 FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode); 401 a.initialize(); // capture FP settings to arena 402 fp_scope.setNextFPMode(); 403 404 for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types 405 body.test(i); 406 } 407 //-------------------------------------------------- 408 // Test that the requested degree of concurrency for task_arena is achieved in various conditions 409 class TestArenaConcurrencyBody : utils::NoAssign { 410 tbb::task_arena &my_a; 411 int my_max_concurrency; 412 int my_reserved_slots; 413 utils::SpinBarrier *my_barrier; 414 utils::SpinBarrier *my_worker_barrier; 415 public: 416 TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL ) 417 : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {} 418 // NativeParallelFor's functor 419 void operator()( int ) const { 420 CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" ); 421 local_id.local() = 1; 422 my_a.execute( *this ); 423 } 424 // Arena's functor 425 void operator()() const { 426 int idx = tbb::this_task_arena::current_thread_index(); 427 CHECK( idx < (my_max_concurrency > 1 ? 
my_max_concurrency : 2) ); 428 CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() ); 429 int max_arena_concurrency = tbb::this_task_arena::max_concurrency(); 430 CHECK( max_arena_concurrency == my_max_concurrency ); 431 if ( my_worker_barrier ) { 432 if ( local_id.local() == 1 ) { 433 // External thread in a reserved slot 434 CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" ); 435 } else { 436 // Worker thread 437 CHECK( idx >= my_reserved_slots ); 438 my_worker_barrier->wait(); 439 } 440 } else if ( my_barrier ) 441 CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" ); 442 if ( my_barrier ) my_barrier->wait(); 443 else utils::Sleep( 1 ); 444 } 445 }; 446 447 void TestArenaConcurrency( int p, int reserved = 0, int step = 1) { 448 for (; reserved <= p; reserved += step) { 449 tbb::task_arena a( p, reserved ); 450 if (p - reserved < tbb::this_task_arena::max_concurrency()) { 451 // Check concurrency with worker & reserved external threads. 452 ResetTLS(); 453 utils::SpinBarrier b( p ); 454 utils::SpinBarrier wb( p-reserved ); 455 TestArenaConcurrencyBody test( a, p, reserved, &b, &wb ); 456 for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads 457 a.enqueue( test ); 458 if ( reserved==1 ) 459 test( 0 ); // calls execute() 460 else 461 utils::NativeParallelFor( reserved, test ); 462 a.debug_wait_until_empty(); 463 } { // Check if multiple external threads alone can achieve maximum concurrency. 464 ResetTLS(); 465 utils::SpinBarrier b( p ); 466 utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) ); 467 a.debug_wait_until_empty(); 468 } { // Check oversubscription by external threads. 469 #if !_WIN32 || !_WIN64 470 // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms 471 // that makes impossible to create more than ~500 threads. 
472 if ( !(sizeof(std::size_t) == 4 && p > 200) ) 473 #endif 474 #if TBB_TEST_LOW_WORKLOAD 475 if ( p <= 16 ) 476 #endif 477 { 478 ResetTLS(); 479 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved)); 480 a.debug_wait_until_empty(); 481 } 482 } 483 } 484 } 485 486 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer { 487 utils::SpinBarrier& m_barrier; 488 489 TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier) 490 : tbb::task_scheduler_observer(a), m_barrier(barrier) { 491 observe(true); 492 } 493 void on_scheduler_exit(bool worker) override { 494 if (worker) { 495 m_barrier.wait(); 496 } 497 } 498 }; 499 500 void TestMandatoryConcurrency() { 501 tbb::task_arena a(1); 502 a.execute([&a] { 503 int n_threads = 4; 504 utils::SpinBarrier exit_barrier(2); 505 TestMandatoryConcurrencyObserver observer(a, exit_barrier); 506 for (int j = 0; j < 5; ++j) { 507 utils::ExactConcurrencyLevel::check(1); 508 std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 }; 509 utils::SpinBarrier barrier(n_threads); 510 utils::NativeParallelFor(n_threads, [&](int) { 511 for (int i = 0; i < 5; ++i) { 512 barrier.wait(); 513 a.enqueue([&] { 514 CHECK(tbb::this_task_arena::max_concurrency() == 2); 515 CHECK(a.max_concurrency() == 2); 516 ++curr_tasks; 517 CHECK(curr_tasks == 1); 518 utils::doDummyWork(1000); 519 CHECK(curr_tasks == 1); 520 --curr_tasks; 521 ++num_tasks; 522 }); 523 barrier.wait(); 524 } 525 }); 526 do { 527 exit_barrier.wait(); 528 } while (num_tasks < n_threads * 5); 529 } 530 observer.observe(false); 531 }); 532 } 533 534 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) { 535 TestMandatoryConcurrency(); 536 InitializeAndTerminate(max_thread_num); 537 for (int p = min_thread_num; p <= max_thread_num; ++p) { 538 TestConcurrentArenas(p); 539 TestMultipleMasters(p); 540 TestArenaConcurrency(p); 541 } 542 } 543 544 //--------------------------------------------------// 545 // Test creation/initialization of a task_arena that references an existing arena (aka attach). 546 // This part of the test uses the knowledge of task_arena internals 547 548 struct TaskArenaValidator { 549 int my_slot_at_construction; 550 const tbb::task_arena& my_arena; 551 TaskArenaValidator( const tbb::task_arena& other ) 552 : my_slot_at_construction(tbb::this_task_arena::current_thread_index()) 553 , my_arena(other) 554 {} 555 // Inspect the internal state 556 int concurrency() { return my_arena.debug_max_concurrency(); } 557 int reserved_for_masters() { return my_arena.debug_reserved_slots(); } 558 559 // This method should be called in task_arena::execute() for a captured arena 560 // by the same thread that created the validator. 
561 void operator()() { 562 CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction, 563 "Current thread index has changed since the validator construction" ); 564 } 565 }; 566 567 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated, 568 int expect_concurrency, int expect_masters ) { 569 CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" ); 570 if( arena.is_active() ) { 571 TaskArenaValidator validator( arena ); 572 CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" ); 573 CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" ); 574 if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) { 575 CHECK(tbb::this_task_arena::current_thread_index() >= 0); 576 // for threads already in arena, check that the thread index remains the same 577 arena.execute( validator ); 578 } else { // not_initialized 579 // Test the deprecated method 580 CHECK(tbb::this_task_arena::current_thread_index() == -1); 581 } 582 583 // Ideally, there should be a check for having the same internal arena object, 584 // but that object is not easily accessible for implicit arenas. 585 } 586 } 587 588 struct TestAttachBody : utils::NoAssign { 589 static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor 590 const int maxthread; 591 TestAttachBody( int max_thr ) : maxthread(max_thr) {} 592 593 // The functor body for NativeParallelFor 594 void operator()( int idx ) const { 595 my_idx = idx; 596 597 int default_threads = tbb::this_task_arena::max_concurrency(); 598 599 tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() ); 600 ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to 601 602 arena.terminate(); 603 ValidateAttachedArena( arena, false, -1, -1 ); 604 605 // attach to an auto-initialized arena 606 tbb::parallel_for(0, 1, [](int) {}); 607 608 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 609 ValidateAttachedArena( arena2, true, default_threads, 1 ); 610 611 // attach to another task_arena 612 arena.initialize( maxthread, std::min(maxthread,idx) ); 613 arena.execute( *this ); 614 } 615 616 // The functor body for task_arena::execute above 617 void operator()() const { 618 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 619 ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) ); 620 } 621 622 // The functor body for tbb::parallel_for 623 void operator()( const Range& r ) const { 624 for( int i = r.begin(); i<r.end(); ++i ) { 625 tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() ); 626 ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 ); 627 } 628 } 629 }; 630 631 thread_local int TestAttachBody::my_idx; 632 633 void TestAttach( int maxthread ) { 634 // Externally concurrent, but no concurrency within a thread 635 utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) ); 636 // Concurrent within the current arena; may also serve as a stress test 637 tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) ); 638 } 639 640 //--------------------------------------------------// 641 642 // Test that task_arena::enqueue does not tolerate a non-const functor. 643 // TODO: can it be reworked as SFINAE-based compile-time check? 
644 struct TestFunctor { 645 void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); } 646 void operator()() const { /* library requires this overload only */ } 647 }; 648 649 void TestConstantFunctorRequirement() { 650 tbb::task_arena a; 651 TestFunctor tf; 652 a.enqueue( tf ); 653 } 654 655 //--------------------------------------------------// 656 657 #include "tbb/parallel_reduce.h" 658 #include "tbb/parallel_invoke.h" 659 660 // Test this_task_arena::isolate 661 namespace TestIsolatedExecuteNS { 662 template <typename NestedPartitioner> 663 class NestedParFor : utils::NoAssign { 664 public: 665 NestedParFor() {} 666 void operator()() const { 667 NestedPartitioner p; 668 tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p ); 669 } 670 }; 671 672 template <typename NestedPartitioner> 673 class ParForBody : utils::NoAssign { 674 bool myOuterIsolation; 675 tbb::enumerable_thread_specific<int> &myEts; 676 std::atomic<bool> &myIsStolen; 677 public: 678 ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen ) 679 : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {} 680 void operator()( int ) const { 681 int &e = myEts.local(); 682 if ( e++ > 0 ) myIsStolen = true; 683 if ( myOuterIsolation ) 684 NestedParFor<NestedPartitioner>()(); 685 else 686 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() ); 687 --e; 688 } 689 }; 690 691 template <typename OuterPartitioner, typename NestedPartitioner> 692 class OuterParFor : utils::NoAssign { 693 bool myOuterIsolation; 694 std::atomic<bool> &myIsStolen; 695 public: 696 OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {} 697 void operator()() const { 698 tbb::enumerable_thread_specific<int> ets( 0 ); 699 OuterPartitioner p; 700 tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p ); 701 } 702 }; 703 704 template <typename OuterPartitioner, typename NestedPartitioner> 705 void TwoLoopsTest( bool outer_isolation ) { 706 std::atomic<bool> is_stolen; 707 is_stolen = false; 708 const int max_repeats = 100; 709 if ( outer_isolation ) { 710 for ( int i = 0; i <= max_repeats; ++i ) { 711 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) ); 712 if ( is_stolen ) break; 713 } 714 // TODO: was ASSERT_WARNING 715 if (!is_stolen) { 716 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n"); 717 } 718 } else { 719 for ( int i = 0; i <= max_repeats; ++i ) { 720 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )(); 721 } 722 REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer leves" ); 723 } 724 } 725 726 void TwoLoopsTest( bool outer_isolation ) { 727 TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation ); 728 TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation ); 729 TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation ); 730 TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation ); 731 } 732 733 void TwoLoopsTest() { 734 TwoLoopsTest( true ); 735 TwoLoopsTest( false ); 736 } 737 //--------------------------------------------------// 738 class HeavyMixTestBody : utils::NoAssign { 739 tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom; 740 
tbb::enumerable_thread_specific<int>& myIsolatedLevel; 741 int myNestedLevel; 742 743 template <typename Partitioner, typename Body> 744 static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) { 745 if ( rnd.get() % 2 ) { 746 if (ctx ) 747 tbb::parallel_for( 0, 2, body, p, *ctx ); 748 else 749 tbb::parallel_for( 0, 2, body, p ); 750 } else { 751 tbb::parallel_invoke( body, body ); 752 } 753 } 754 755 template <typename Partitioner> 756 class IsolatedBody : utils::NoAssign { 757 const HeavyMixTestBody &myHeavyMixTestBody; 758 Partitioner &myPartitioner; 759 public: 760 IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner ) 761 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {} 762 void operator()() const { 763 RunTwoBodies( myHeavyMixTestBody.myRandom.local(), 764 HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel, 765 myHeavyMixTestBody.myNestedLevel + 1 ), 766 myPartitioner ); 767 } 768 }; 769 770 template <typename Partitioner> 771 void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const { 772 Partitioner p; 773 switch ( rnd.get() % 2 ) { 774 case 0: { 775 // No features 776 tbb::task_group_context ctx; 777 RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx ); 778 break; 779 } 780 case 1: { 781 // Isolation 782 int previous_isolation = isolated_level; 783 isolated_level = myNestedLevel; 784 tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) ); 785 isolated_level = previous_isolation; 786 break; 787 } 788 } 789 } 790 public: 791 HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random, 792 tbb::enumerable_thread_specific<int>& isolated_level, int nested_level ) 793 : myRandom( random ), myIsolatedLevel( isolated_level ) 794 , myNestedLevel( nested_level ) {} 795 void operator()() const { 796 int &isolated_level = myIsolatedLevel.local(); 797 CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" ); 798 if ( myNestedLevel == 20 ) 799 return; 800 utils::FastRandom<>& rnd = myRandom.local(); 801 if ( rnd.get() % 2 == 1 ) { 802 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level ); 803 } else { 804 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level ); 805 } 806 } 807 void operator()(int) const { 808 this->operator()(); 809 } 810 }; 811 812 struct RandomInitializer { 813 utils::FastRandom<> operator()() { 814 return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() ); 815 } 816 }; 817 818 void HeavyMixTest() { 819 std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency(); 820 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 821 822 RandomInitializer init_random; 823 tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random ); 824 tbb::enumerable_thread_specific<int> isolated_level( 0 ); 825 for ( int i = 0; i < 5; ++i ) { 826 HeavyMixTestBody b( random, isolated_level, 1 ); 827 b( 0 ); 828 } 829 } 830 831 //--------------------------------------------------// 832 #if TBB_USE_EXCEPTIONS 833 struct MyException {}; 834 struct IsolatedBodyThrowsException { 835 void operator()() const { 836 #if _MSC_VER && !__INTEL_COMPILER 837 // Workaround an unreachable code warning in task_arena_function. 
838 volatile bool workaround = true; 839 if (workaround) 840 #endif 841 { 842 throw MyException(); 843 } 844 } 845 }; 846 struct ExceptionTestBody : utils::NoAssign { 847 tbb::enumerable_thread_specific<int>& myEts; 848 std::atomic<bool>& myIsStolen; 849 ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen ) 850 : myEts( ets ), myIsStolen( is_stolen ) {} 851 void operator()( int i ) const { 852 try { 853 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() ); 854 REQUIRE_MESSAGE( false, "The exception has been lost" ); 855 } 856 catch ( MyException ) {} 857 catch ( ... ) { 858 REQUIRE_MESSAGE( false, "Unexpected exception" ); 859 } 860 // Check that nested algorithms can steal outer-level tasks 861 int &e = myEts.local(); 862 if ( e++ > 0 ) myIsStolen = true; 863 // work imbalance increases chances for stealing 864 tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) ); 865 --e; 866 } 867 }; 868 869 #endif /* TBB_USE_EXCEPTIONS */ 870 void ExceptionTest() { 871 #if TBB_USE_EXCEPTIONS 872 tbb::enumerable_thread_specific<int> ets; 873 std::atomic<bool> is_stolen; 874 is_stolen = false; 875 for ( ;; ) { 876 tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) ); 877 if ( is_stolen ) break; 878 } 879 REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" ); 880 #endif /* TBB_USE_EXCEPTIONS */ 881 } 882 883 struct NonConstBody { 884 unsigned int state; 885 void operator()() { 886 state ^= ~0u; 887 } 888 }; 889 890 void TestNonConstBody() { 891 NonConstBody body; 892 body.state = 0x6c97d5ed; 893 tbb::this_task_arena::isolate(body); 894 REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state"); 895 } 896 897 // TODO: Consider tbb::task_group instead of explicit task API. 898 class TestEnqueueTask : public tbb::detail::d1::task { 899 using wait_context = tbb::detail::d1::wait_context; 900 901 tbb::enumerable_thread_specific<bool>& executed; 902 std::atomic<int>& completed; 903 904 public: 905 wait_context& waiter; 906 tbb::task_arena& arena; 907 static const int N = 100; 908 909 TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a) 910 : executed(exe), completed(c), waiter(w), arena(a) {} 911 912 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override { 913 for (int i = 0; i < N; ++i) { 914 arena.enqueue([&]() { 915 executed.local() = true; 916 ++completed; 917 for (int j = 0; j < 100; j++) utils::yield(); 918 waiter.release(1); 919 }); 920 } 921 return nullptr; 922 } 923 tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; } 924 }; 925 926 class TestEnqueueIsolateBody : utils::NoCopy { 927 tbb::enumerable_thread_specific<bool>& executed; 928 std::atomic<int>& completed; 929 tbb::task_arena& arena; 930 public: 931 static const int N = 100; 932 933 TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a) 934 : executed(exe), completed(c), arena(a) {} 935 void operator()() { 936 tbb::task_group_context ctx; 937 tbb::detail::d1::wait_context waiter(N); 938 939 TestEnqueueTask root(executed, completed, waiter, arena); 940 tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx); 941 } 942 }; 943 944 void TestEnqueue() { 945 tbb::enumerable_thread_specific<bool> executed(false); 946 std::atomic<int> completed; 947 tbb::task_arena arena = tbb::task_arena(tbb::task_arena::attach()); 948 949 // Check that the main thread can process enqueued tasks. 
950 completed = 0; 951 TestEnqueueIsolateBody b1(executed, completed, arena); 952 b1(); 953 954 if (!executed.local()) { 955 REPORT("Warning: No one enqueued task has executed by the main thread.\n"); 956 } 957 958 executed.local() = false; 959 completed = 0; 960 const int N = 100; 961 // Create enqueued tasks out of isolation. 962 963 tbb::task_group_context ctx; 964 tbb::detail::d1::wait_context waiter(N); 965 for (int i = 0; i < N; ++i) { 966 arena.enqueue([&]() { 967 executed.local() = true; 968 ++completed; 969 utils::yield(); 970 waiter.release(1); 971 }); 972 } 973 TestEnqueueIsolateBody b2(executed, completed, arena); 974 tbb::this_task_arena::isolate(b2); 975 REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate."); 976 977 tbb::detail::d1::wait(waiter, ctx); 978 // while (completed < TestEnqueueTask::N + N) utils::yield(); 979 } 980 } 981 982 void TestIsolatedExecute() { 983 // At least 3 threads (owner + 2 thieves) are required to reproduce a situation when the owner steals outer 984 // level task on a nested level. If we have only one thief then it will execute outer level tasks first and 985 // the owner will not have a possibility to steal outer level tasks. 986 int platform_max_thread = tbb::this_task_arena::max_concurrency(); 987 int num_threads = utils::min( platform_max_thread, 3 ); 988 { 989 // Too many threads require too many work to reproduce the stealing from outer level. 990 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7)); 991 TestIsolatedExecuteNS::TwoLoopsTest(); 992 TestIsolatedExecuteNS::HeavyMixTest(); 993 TestIsolatedExecuteNS::ExceptionTest(); 994 } 995 tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads); 996 TestIsolatedExecuteNS::HeavyMixTest(); 997 TestIsolatedExecuteNS::TestNonConstBody(); 998 TestIsolatedExecuteNS::TestEnqueue(); 999 } 1000 1001 //-----------------------------------------------------------------------------------------// 1002 1003 class TestDelegatedSpawnWaitBody : utils::NoAssign { 1004 tbb::task_arena &my_a; 1005 utils::SpinBarrier &my_b1, &my_b2; 1006 public: 1007 TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2) 1008 : my_a(a), my_b1(b1), my_b2(b2) {} 1009 // NativeParallelFor's functor 1010 void operator()(int idx) const { 1011 if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang) 1012 for (int i = 0; i < 2; ++i) { 1013 my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers 1014 } 1015 tbb::task_group tg; 1016 my_b1.wait(); // sync with the workers 1017 for( int i=0; i<100000; ++i) { 1018 my_a.execute([&tg] { tg.run([] {}); }); 1019 } 1020 my_a.execute([&tg] {tg.wait(); }); 1021 } 1022 1023 my_b2.wait(); // sync both threads 1024 } 1025 }; 1026 1027 void TestDelegatedSpawnWait() { 1028 if (tbb::this_task_arena::max_concurrency() < 3) { 1029 // The test requires at least 2 worker threads 1030 return; 1031 } 1032 // Regression test for a bug with missed wakeup notification from a delegated task 1033 tbb::task_arena a(2,0); 1034 a.initialize(); 1035 utils::SpinBarrier barrier1(3), barrier2(2); 1036 utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) ); 1037 a.debug_wait_until_empty(); 1038 } 1039 1040 //-----------------------------------------------------------------------------------------// 1041 1042 class TestMultipleWaitsArenaWait : utils::NoAssign { 1043 using wait_context = 
tbb::detail::d1::wait_context; 1044 public: 1045 TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1046 : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1047 void operator()() const { 1048 ++my_processed; 1049 // Wait for all tasks 1050 if ( my_idx < my_num_tasks ) { 1051 tbb::detail::d1::wait(*my_waiters[my_idx], my_context); 1052 } 1053 // Signal waiting tasks 1054 if ( my_idx >= my_bunch_size ) { 1055 my_waiters[my_idx-my_bunch_size]->release(); 1056 } 1057 } 1058 private: 1059 int my_idx; 1060 int my_bunch_size; 1061 int my_num_tasks; 1062 std::vector<wait_context*>& my_waiters; 1063 std::atomic<int>& my_processed; 1064 tbb::task_group_context& my_context; 1065 }; 1066 1067 class TestMultipleWaitsThreadBody : utils::NoAssign { 1068 using wait_context = tbb::detail::d1::wait_context; 1069 public: 1070 TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc ) 1071 : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {} 1072 void operator()( int idx ) const { 1073 my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) ); 1074 --my_processed; 1075 } 1076 private: 1077 int my_bunch_size; 1078 int my_num_tasks; 1079 tbb::task_arena& my_arena; 1080 std::vector<wait_context*>& my_waiters; 1081 std::atomic<int>& my_processed; 1082 tbb::task_group_context& my_context; 1083 }; 1084 1085 void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) { 1086 tbb::task_arena a( num_threads ); 1087 const int num_tasks = (num_bunches-1)*bunch_size; 1088 1089 tbb::task_group_context tgc; 1090 std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks); 1091 for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0); 1092 1093 std::atomic<int> processed(0); 1094 for ( int repeats = 0; repeats<10; ++repeats ) { 1095 int idx = 0; 1096 for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) { 1097 // Sync with the previous bunch of waiters to prevent "false" nested dependicies (when a nested task waits for an outer task). 1098 while ( processed < bunch*bunch_size ) utils::yield(); 1099 // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters. 1100 for ( int i = 0; i<bunch_size; ++i ) { 1101 waiters[idx]->reserve(); 1102 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1103 } 1104 } 1105 // No sync because the threads of the last bunch do not call wait_for_all. 1106 // Run the last bunch of threads. 1107 for ( int i = 0; i<bunch_size; ++i ) 1108 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach(); 1109 while ( processed ) utils::yield(); 1110 } 1111 for (auto w : waiters) delete w; 1112 } 1113 1114 void TestMultipleWaits() { 1115 // Limit the number of threads to prevent heavy oversubscription. 
1116 #if TBB_TEST_LOW_WORKLOAD 1117 const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() ); 1118 #else 1119 const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() ); 1120 #endif 1121 1122 utils::FastRandom<> rnd(1234); 1123 for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) { 1124 for ( int i = 0; i<3; ++i ) { 1125 const int num_bunches = 3 + rnd.get()%3; 1126 const int bunch_size = max_threads + rnd.get()%max_threads; 1127 TestMultipleWaits( threads, num_bunches, bunch_size ); 1128 } 1129 } 1130 } 1131 1132 //--------------------------------------------------// 1133 1134 void TestSmallStackSize() { 1135 tbb::global_control gc(tbb::global_control::thread_stack_size, 1136 tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 ); 1137 // The test produces the warning (not a error) if fails. So the test is run many times 1138 // to make the log annoying (to force to consider it as an error). 1139 for (int i = 0; i < 100; ++i) { 1140 tbb::task_arena a; 1141 a.initialize(); 1142 } 1143 } 1144 1145 //--------------------------------------------------// 1146 1147 namespace TestMoveSemanticsNS { 1148 struct TestFunctor { 1149 void operator()() const {}; 1150 }; 1151 1152 struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor { 1153 MoveOnlyFunctor() : utils::MoveOnly() {}; 1154 MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {}; 1155 }; 1156 1157 struct MovePreferableFunctor : utils::Movable, TestFunctor { 1158 MovePreferableFunctor() : utils::Movable() {}; 1159 MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {}; 1160 MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {}; 1161 }; 1162 1163 struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor { 1164 NoMoveNoCopyFunctor() : utils::NoCopy() {}; 1165 // mv ctor is not allowed as cp ctor from parent NoCopy 1166 private: 1167 NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&); 1168 }; 1169 1170 void TestFunctors() { 1171 tbb::task_arena ta; 1172 MovePreferableFunctor mpf; 1173 // execute() doesn't have any copies or moves of arguments inside the impl 1174 ta.execute( NoMoveNoCopyFunctor() ); 1175 1176 ta.enqueue( MoveOnlyFunctor() ); 1177 ta.enqueue( mpf ); 1178 REQUIRE_MESSAGE(mpf.alive, "object was moved when was passed by lval"); 1179 mpf.Reset(); 1180 ta.enqueue( std::move(mpf) ); 1181 REQUIRE_MESSAGE(!mpf.alive, "object was copied when was passed by rval"); 1182 mpf.Reset(); 1183 } 1184 } 1185 1186 void TestMoveSemantics() { 1187 TestMoveSemanticsNS::TestFunctors(); 1188 } 1189 1190 //--------------------------------------------------// 1191 1192 #include <vector> 1193 1194 #include "common/state_trackable.h" 1195 1196 namespace TestReturnValueNS { 1197 struct noDefaultTag {}; 1198 class ReturnType : public StateTrackable<> { 1199 static const int SIZE = 42; 1200 std::vector<int> data; 1201 public: 1202 ReturnType(noDefaultTag) : StateTrackable<>(0) {} 1203 // Define copy constructor to test that it is never called 1204 ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {} 1205 ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {} 1206 1207 void fill() { 1208 for (int i = 0; i < SIZE; ++i) 1209 data.push_back(i); 1210 } 1211 void check() { 1212 REQUIRE(data.size() == unsigned(SIZE)); 1213 for (int i = 0; i < SIZE; ++i) 1214 REQUIRE(data[i] == i); 1215 StateTrackableCounters::counters_type& cnts = 
StateTrackableCounters::counters; 1216 REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0); 1217 REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1); 1218 std::size_t copied = cnts[StateTrackableBase::CopyInitialized]; 1219 std::size_t moved = cnts[StateTrackableBase::MoveInitialized]; 1220 REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved); 1221 // The number of copies/moves should not exceed 3: function return, store to an internal storage, 1222 // acquire internal storage. 1223 REQUIRE((copied == 0 && moved <=3)); 1224 } 1225 }; 1226 1227 template <typename R> 1228 R function() { 1229 noDefaultTag tag; 1230 R r(tag); 1231 r.fill(); 1232 return r; 1233 } 1234 1235 template <> 1236 void function<void>() {} 1237 1238 template <typename R> 1239 struct Functor { 1240 R operator()() const { 1241 return function<R>(); 1242 } 1243 }; 1244 1245 tbb::task_arena& arena() { 1246 static tbb::task_arena a; 1247 return a; 1248 } 1249 1250 template <typename F> 1251 void TestExecute(F &f) { 1252 StateTrackableCounters::reset(); 1253 ReturnType r = arena().execute(f); 1254 r.check(); 1255 } 1256 1257 template <typename F> 1258 void TestExecute(const F &f) { 1259 StateTrackableCounters::reset(); 1260 ReturnType r = arena().execute(f); 1261 r.check(); 1262 } 1263 template <typename F> 1264 void TestIsolate(F &f) { 1265 StateTrackableCounters::reset(); 1266 ReturnType r = tbb::this_task_arena::isolate(f); 1267 r.check(); 1268 } 1269 1270 template <typename F> 1271 void TestIsolate(const F &f) { 1272 StateTrackableCounters::reset(); 1273 ReturnType r = tbb::this_task_arena::isolate(f); 1274 r.check(); 1275 } 1276 1277 void Test() { 1278 TestExecute(Functor<ReturnType>()); 1279 Functor<ReturnType> f1; 1280 TestExecute(f1); 1281 TestExecute(function<ReturnType>); 1282 1283 arena().execute(Functor<void>()); 1284 Functor<void> f2; 1285 arena().execute(f2); 1286 arena().execute(function<void>); 1287 TestIsolate(Functor<ReturnType>()); 1288 TestIsolate(f1); 1289 TestIsolate(function<ReturnType>); 1290 tbb::this_task_arena::isolate(Functor<void>()); 1291 tbb::this_task_arena::isolate(f2); 1292 tbb::this_task_arena::isolate(function<void>); 1293 } 1294 } 1295 1296 void TestReturnValue() { 1297 TestReturnValueNS::Test(); 1298 } 1299 1300 //--------------------------------------------------// 1301 1302 // MyObserver checks if threads join to the same arena 1303 struct MyObserver: public tbb::task_scheduler_observer { 1304 tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls; 1305 tbb::task_arena& my_arena; 1306 std::atomic<int>& my_failure_counter; 1307 std::atomic<int>& my_counter; 1308 utils::SpinBarrier& m_barrier; 1309 1310 MyObserver(tbb::task_arena& a, 1311 tbb::enumerable_thread_specific<tbb::task_arena*>& tls, 1312 std::atomic<int>& failure_counter, 1313 std::atomic<int>& counter, 1314 utils::SpinBarrier& barrier) 1315 : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a), 1316 my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) { 1317 observe(true); 1318 } 1319 void on_scheduler_entry(bool worker) override { 1320 if (worker) { 1321 ++my_counter; 1322 tbb::task_arena*& cur_arena = my_tls.local(); 1323 if (cur_arena != 0 && cur_arena != &my_arena) { 1324 ++my_failure_counter; 1325 } 1326 cur_arena = &my_arena; 1327 m_barrier.wait(); 1328 } 1329 } 1330 void on_scheduler_exit(bool worker) override { 1331 if (worker) { 1332 m_barrier.wait(); // before wakeup 1333 m_barrier.wait(); // after wakeup 1334 } 1335 } 1336 }; 1337 1338 void 
TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) { 1339 if (n_threads == 0) { 1340 n_threads = tbb::this_task_arena::max_concurrency(); 1341 } 1342 1343 const int max_n_arenas = 8; 1344 int n_arenas = 2; 1345 if(n_threads > 16) { 1346 n_arenas = max_n_arenas; 1347 } else if (n_threads > 8) { 1348 n_arenas = 4; 1349 } 1350 1351 int n_workers = n_threads - 1; 1352 n_workers = n_arenas * (n_workers / n_arenas); 1353 if (n_workers == 0) { 1354 return; 1355 } 1356 1357 n_threads = n_workers + 1; 1358 tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads); 1359 1360 const int n_repetitions = 20; 1361 const int n_outer_repetitions = 100; 1362 std::multiset<float> failure_ratio; // for median calculating 1363 utils::SpinBarrier barrier(n_threads); 1364 utils::SpinBarrier worker_barrier(n_workers); 1365 MyObserver* observer[max_n_arenas]; 1366 std::vector<tbb::task_arena> arenas(n_arenas); 1367 std::atomic<int> failure_counter; 1368 std::atomic<int> counter; 1369 tbb::enumerable_thread_specific<tbb::task_arena*> tls; 1370 1371 for (int i = 0; i < n_arenas; ++i) { 1372 arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master 1373 observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier); 1374 } 1375 1376 int ii = 0; 1377 for (; ii < n_outer_repetitions; ++ii) { 1378 failure_counter = 0; 1379 counter = 0; 1380 1381 // Main code 1382 auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); }; 1383 wakeup(); 1384 for (int j = 0; j < n_repetitions; ++j) { 1385 barrier.wait(); // entry 1386 barrier.wait(); // exit1 1387 wakeup(); 1388 barrier.wait(); // exit2 1389 } 1390 barrier.wait(); // entry 1391 barrier.wait(); // exit1 1392 barrier.wait(); // exit2 1393 1394 failure_ratio.insert(float(failure_counter) / counter); 1395 tls.clear(); 1396 // collect 3 elements in failure_ratio before calculating median 1397 if (ii > 1) { 1398 std::multiset<float>::iterator it = failure_ratio.begin(); 1399 std::advance(it, failure_ratio.size() / 2); 1400 if (*it < 0.02) 1401 break; 1402 } 1403 } 1404 for (int i = 0; i < n_arenas; ++i) { 1405 delete observer[i]; 1406 } 1407 // check if median is so big 1408 std::multiset<float>::iterator it = failure_ratio.begin(); 1409 std::advance(it, failure_ratio.size() / 2); 1410 // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas 1411 if (*it > 0.05) { 1412 REPORT("Warning: So many cases when threads join to different arenas.\n"); 1413 REQUIRE_MESSAGE(*it <= 0.3, "A lot of cases when threads join to different arenas.\n"); 1414 } 1415 } 1416 1417 void TestArenaWorkersMigration() { 1418 TestArenaWorkersMigrationWithNumThreads(4); 1419 if (tbb::this_task_arena::max_concurrency() != 4) { 1420 TestArenaWorkersMigrationWithNumThreads(); 1421 } 1422 } 1423 1424 //--------------------------------------------------// 1425 void TestDefaultCreatedWorkersAmount() { 1426 int threads = tbb::this_task_arena::max_concurrency(); 1427 utils::NativeParallelFor(1, [threads](int idx) { 1428 REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS"); 1429 utils::SpinBarrier barrier(threads); 1430 ResetTLS(); 1431 for (int trail = 0; trail < 10; ++trail) { 1432 tbb::parallel_for(0, threads, [threads, &barrier](int) { 1433 REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal specified threadnum"); 1434 REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "amount of created threads is 
more than specified by default"); 1435 local_id.local() = 1; 1436 // If there is more threads than expected, 'sleep' gives a chance to join unexpected threads. 1437 utils::Sleep(1); 1438 barrier.wait(); 1439 }, tbb::simple_partitioner()); 1440 REQUIRE_MESSAGE(local_id.size() == size_t(threads), "amount of created threads is not equal to default num"); 1441 } 1442 }); 1443 } 1444 1445 void TestAbilityToCreateWorkers(int thread_num) { 1446 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num); 1447 // Checks only some part of reserved-external threads amount: 1448 // 0 and 1 reserved threads are important cases but it is also needed 1449 // to collect some statistic data with other amount and to not consume 1450 // whole test sesion time checking each amount 1451 TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72)); 1452 TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14)); 1453 } 1454 1455 void TestDefaultWorkersLimit() { 1456 TestDefaultCreatedWorkersAmount(); 1457 #if TBB_TEST_LOW_WORKLOAD 1458 TestAbilityToCreateWorkers(24); 1459 #else 1460 TestAbilityToCreateWorkers(256); 1461 #endif 1462 } 1463 1464 #if TBB_USE_EXCEPTIONS 1465 1466 void ExceptionInExecute() { 1467 std::size_t thread_number = utils::get_platform_max_threads(); 1468 tbb::task_arena test_arena(thread_number / 2, thread_number / 2); 1469 1470 std::atomic<int> canceled_task{}; 1471 1472 auto parallel_func = [&test_arena, &canceled_task] (std::size_t) { 1473 for (std::size_t i = 0; i < 1000; ++i) { 1474 try { 1475 test_arena.execute([] { 1476 volatile bool suppress_unreachable_code_warning = true; 1477 if (suppress_unreachable_code_warning) { 1478 throw -1; 1479 } 1480 }); 1481 FAIL("An exception should have thrown."); 1482 } catch (int) { 1483 ++canceled_task; 1484 } catch (...) { 1485 FAIL("Wrong type of exception."); 1486 } 1487 } 1488 }; 1489 1490 utils::NativeParallelFor(thread_number, parallel_func); 1491 CHECK(canceled_task == thread_number * 1000); 1492 } 1493 1494 #endif // TBB_USE_EXCEPTIONS 1495 1496 class simple_observer : public tbb::task_scheduler_observer { 1497 static std::atomic<int> idx_counter; 1498 int my_idx; 1499 int myMaxConcurrency; // concurrency of the associated arena 1500 int myNumReservedSlots; // reserved slots in the associated arena 1501 void on_scheduler_entry( bool is_worker ) override { 1502 int current_index = tbb::this_task_arena::current_thread_index(); 1503 CHECK(current_index < (myMaxConcurrency > 1 ? 
myMaxConcurrency : 2)); 1504 if (is_worker) { 1505 CHECK(current_index >= myNumReservedSlots); 1506 } 1507 } 1508 void on_scheduler_exit( bool /*is_worker*/ ) override 1509 {} 1510 public: 1511 simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots) 1512 : tbb::task_scheduler_observer(a), my_idx(idx_counter++) 1513 , myMaxConcurrency(maxConcurrency) 1514 , myNumReservedSlots(numReservedSlots) { 1515 observe(true); 1516 } 1517 1518 friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) { 1519 return lhs.my_idx < rhs.my_idx; 1520 } 1521 }; 1522 1523 std::atomic<int> simple_observer::idx_counter{}; 1524 1525 struct arena_handler { 1526 enum arena_status { 1527 alive, 1528 deleting, 1529 deleted 1530 }; 1531 1532 tbb::task_arena* arena; 1533 1534 std::atomic<arena_status> status{alive}; 1535 tbb::spin_rw_mutex arena_in_use{}; 1536 1537 tbb::concurrent_set<simple_observer> observers; 1538 1539 arena_handler(tbb::task_arena* ptr) : arena(ptr) 1540 {} 1541 1542 friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) { 1543 return lhs.arena < rhs.arena; 1544 } 1545 }; 1546 1547 // TODO: Add observer operations 1548 void StressTestMixFunctionality() { 1549 enum operation_type { 1550 create_arena, 1551 delete_arena, 1552 attach_observer, 1553 detach_observer, 1554 arena_execute, 1555 enqueue_task, 1556 last_operation_marker 1557 }; 1558 1559 std::size_t operations_number = last_operation_marker; 1560 std::size_t thread_number = utils::get_platform_max_threads(); 1561 utils::FastRandom<> operation_rnd(42); 1562 tbb::spin_mutex random_operation_guard; 1563 1564 auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () { 1565 tbb::spin_mutex::scoped_lock lock(random_operation_guard); 1566 return static_cast<operation_type>(operation_rnd.get() % operations_number); 1567 }; 1568 1569 utils::FastRandom<> arena_rnd(42); 1570 tbb::spin_mutex random_arena_guard; 1571 auto get_random_arena = [&arena_rnd, &random_arena_guard] () { 1572 tbb::spin_mutex::scoped_lock lock(random_arena_guard); 1573 return arena_rnd.get(); 1574 }; 1575 1576 tbb::concurrent_set<arena_handler> arenas_pool; 1577 1578 std::vector<std::thread> thread_pool; 1579 1580 utils::SpinBarrier thread_barrier(thread_number); 1581 std::size_t max_operations = 20000; 1582 std::atomic<std::size_t> curr_operation{}; 1583 auto thread_func = [&] () { 1584 arenas_pool.emplace(new tbb::task_arena()); 1585 thread_barrier.wait(); 1586 while (curr_operation++ < max_operations) { 1587 switch (get_random_operation()) { 1588 case create_arena : 1589 { 1590 arenas_pool.emplace(new tbb::task_arena()); 1591 break; 1592 } 1593 case delete_arena : 1594 { 1595 auto curr_arena = arenas_pool.begin(); 1596 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1597 arena_handler::arena_status curr_status = arena_handler::alive; 1598 if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) { 1599 break; 1600 } 1601 } 1602 1603 if (curr_arena == arenas_pool.end()) break; 1604 1605 tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true); 1606 1607 delete curr_arena->arena; 1608 curr_arena->status.store(arena_handler::deleted); 1609 1610 break; 1611 } 1612 case attach_observer : 1613 { 1614 tbb::spin_rw_mutex::scoped_lock lock{}; 1615 auto curr_arena = arenas_pool.begin(); 1616 for (; curr_arena != arenas_pool.end(); ++curr_arena) { 1617 if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) { 1618 if 
            case attach_observer :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                        if (curr_arena->status == arena_handler::alive) {
                            break;
                        } else {
                            lock.release();
                        }
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                {
                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
                }

                break;
            }
            case detach_observer :
            {
                auto arena_number = get_random_arena() % arenas_pool.size();
                auto curr_arena = arenas_pool.begin();
                std::advance(curr_arena, arena_number);

                // Deactivate the first observer that is still observing, if any.
                for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                    if (it->is_observing()) {
                        it->observe(false);
                        break;
                    }
                }

                break;
            }
            case arena_execute :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                        if (curr_arena->status == arena_handler::alive) {
                            break;
                        } else {
                            lock.release();
                        }
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                curr_arena->arena->execute([] () {
                    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) {
                        std::atomic<int> sum{};
                        // Make some work
                        for (; sum < 10; ++sum) ;
                    });
                });

                break;
            }
            case enqueue_task :
            {
                tbb::spin_rw_mutex::scoped_lock lock{};
                auto curr_arena = arenas_pool.begin();
                for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                    if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                        if (curr_arena->status == arena_handler::alive) {
                            break;
                        } else {
                            lock.release();
                        }
                    }
                }

                if (curr_arena == arenas_pool.end()) break;

                curr_arena->arena->enqueue([] {
                    std::atomic<int> sum{};
                    // Make some work
                    for (; sum < 1000; ++sum) ;
                });

                break;
            }
            case last_operation_marker :
                break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}
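
// Functor for the "Workers oversubscription" test below: each invocation checks that the
// executing thread has already been marked in the thread-specific container and re-enqueues
// a copy of itself into the arena until roughly 100000 tasks have been submitted.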
struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};

//--------------------------------------------------//

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}
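
//! Test that tasks enqueued to an oversubscribed arena are executed by threads that have already joined it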
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(num_threads * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Basic test for arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    auto task_handle = tg.defer([&]{ run = true; });

    arena.enqueue(std::move(task_handle));
    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    arena.execute([&]{
        auto task_handle = tg.defer([&]{ run = true; });

        tbb::this_task_arena::enqueue(std::move(task_handle));
    });

    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with functor
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue function") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};
    // Block the task_group so that tg.wait() below has something to wait for
    auto task_handle = tg.defer([]{});

    arena.execute([&]{
        tbb::this_task_arena::enqueue([&]{
            run = true;
            // Release the task_group by resetting the deferred task handle
            task_handle = tbb::task_handle{};
        });
    });

    tg.wait();

    CHECK(run == true);
}

#if TBB_USE_EXCEPTIONS
//! Basic test for exceptions in task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;
    tbb::task_arena arena;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    arena.enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}
//! Basic test for exceptions in this_task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    tbb::this_task_arena::enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! The test for error in scheduling empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif // TBB_USE_EXCEPTIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! The test for is_inside_task in arena::execute when inside other task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS