xref: /oneTBB/test/tbb/test_task_arena.cpp (revision c4a799df)
/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/concurrency_tracker.h"
#include "common/cpu_usertime.h"
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/task_group.h"

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <stdexcept>
#include <thread>
#include <vector>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification

//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change them.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
            default: {
                tbb::task_arena arena( std::rand() % maxthread + 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize();
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
            case 0: {
                tbb::task_arena arena( 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                CHECK(arena.is_active());
                break;
            }
            case 1: {
                tbb::task_arena arena( tbb::task_arena::automatic );
                CHECK(!arena.is_active());
                arena.initialize();
                CHECK(arena.is_active());
                break;
            }
            case 2: {
                tbb::task_arena arena;
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 );
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
        }
    }
}

//--------------------------------------------------//

// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id value: -1 is reserved by current_thread_index(), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}
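
// How the TLS variables above are used by ArenaObserver (below):
// - local_id holds the id of the arena the current thread is presently observed in;
// - old_id saves the previous local_id so that nested arena entries can be unwound
//   in on_scheduler_exit() in LIFO order;
// - slot_id records the thread's slot index at entry and is cross-checked against
//   this_task_arena::current_thread_index() at exit.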

class ArenaObserver : public tbb::task_scheduler_observer {
    int myId;               // unique observer/arena id within a test
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        observe(false);
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        utils::doDummyWork(50000);
    }
};

struct AsynchronousWork {
    utils::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
    : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
        if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
        else my_barrier.signalNoWait();
    }
};
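
// Note on the two modes above: the blocking variant parks the executing thread on the
// barrier, forcing a rendezvous with another participant, while signalNoWait() only bumps
// the barrier counter, so an enqueued copy of this work can finish without blocking a worker.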

//-----------------------------------------------------------------------------------------//

// Test that task_arenas can be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of an app thread running this test.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that an arena observer can be activated before a local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
        LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
            observe(true);
        }
        ~LocalObserver() {
            observe(false);
        }
    };

    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");

    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");

    LocalObserver lo1;
    CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated");

    tbb::task_arena a3(1, 0);
    LocalObserver lo2(a3);
    CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated");

    utils::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);

    a1.enqueue(work); // put async work
    barrier.wait();

    a2.enqueue(work); // another work
    a2.execute(work);

    a3.execute([] {
        utils::doDummyWork(100);
    });

    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    // TODO REVAMP fix with global control
    ResetTLS();
    utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
}
//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait();
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            utils::doDummyWork(10000);
            myWait.release();
        }
    };

    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0);
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve();
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait));
        }
        my_b.wait();
    }
};
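
// Note on the internal (detail::d1) wait_context protocol used above: wait_context is a
// reference counter for waiting. reserve() increments it before each Runner is enqueued,
// Runner::operator() decrements it via release(), and Waiter blocks in d1::wait() until
// the counter drops back to zero; the same mechanism sits underneath public waiting
// interfaces such as task_group::wait().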

void TestMultipleMasters(int p) {
    {
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. Worker threads need time to occupy the slot to prevent an external thread from taking an enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TODO: explain what TestArenaEntryConsistency does
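// (A sketch of what the code below covers, inferred from reading it: a task_arena(2,1) is
// entered via execute() from three native threads plus nested calls, and each invocation
// is classified by its entry type: direct, delegated_to_worker, delegated_to_master,
// nested_same_ctx, or nested_alien_ctx. For every combination the test verifies that the
// FP settings captured by the arena are applied inside execute(), that exceptions thrown
// by the functor propagate to the caller, and that the caller's FP settings are restored.)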
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i)  // init thread-specific instance
    :   FPModeContext(idx+i)
    ,   my_stage(s)
    ,   is_caught(false)
#if TBB_USE_EXCEPTIONS
    ,   is_expected( (idx&(1<<i)) != 0 )
#else
    ,   is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduce the entry type and put it into the id; it helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                              "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                            : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode);
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
    : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in the current task group context
        tbb::parallel_for(4, 5, *this); // in a different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        GetRoundingMode();
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    tbb::task_arena a(2, 1);
    std::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings into the arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // at least 32 = 2^5 iterations to cover all combinations of entry types
        body.test(i);
}
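
// Note on the iteration count: TestArenaEntryBody::is_expected is (idx & (1<<i)) != 0,
// i.e. bit i of idx decides whether entry type i throws. With five entry types, idx
// values 0..31 enumerate every throw/no-throw combination, hence the "at least 32" above.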
//--------------------------------------------------//
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    utils::SpinBarrier *my_barrier;
    utils::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
    : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        REQUIRE( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        REQUIRE( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        REQUIRE( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}

struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
    utils::SpinBarrier& m_barrier;

    TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), m_barrier(barrier) {
        observe(true);
    }
    ~TestMandatoryConcurrencyObserver() {
        observe(false);
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait();
        }
    }
};

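// Background for the test below (based on oneTBB behavior): an arena created with
// max_concurrency 1 whose only slot is occupied by the thread inside execute() must
// still make progress on enqueued tasks, so the scheduler temporarily injects a worker
// ("mandatory concurrency"); the checks below expect max_concurrency() to report 2 in
// that state.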
void TestMandatoryConcurrency() {
    tbb::task_arena a(1);
    a.execute([&a] {
        int n_threads = 4;
        utils::SpinBarrier exit_barrier(2);
        TestMandatoryConcurrencyObserver observer(a, exit_barrier);
        for (int j = 0; j < 5; ++j) {
            utils::ExactConcurrencyLevel::check(1);
            std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
            utils::SpinBarrier barrier(n_threads);
            utils::NativeParallelFor(n_threads, [&](int) {
                for (int i = 0; i < 5; ++i) {
                    barrier.wait();
                    a.enqueue([&] {
                        CHECK(tbb::this_task_arena::max_concurrency() == 2);
                        CHECK(a.max_concurrency() == 2);
                        ++curr_tasks;
                        CHECK(curr_tasks == 1);
                        utils::doDummyWork(1000);
                        CHECK(curr_tasks == 1);
                        --curr_tasks;
                        ++num_tasks;
                    });
                    barrier.wait();
                }
            });
            do {
                exit_barrier.wait();
            } while (num_tasks < n_threads * 5);
        }
    });
}

void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
    TestMandatoryConcurrency();
    InitializeAndTerminate(max_thread_num);
    for (int p = min_thread_num; p <= max_thread_num; ++p) {
        TestConcurrentArenas(p);
        TestMultipleMasters(p);
        TestArenaConcurrency(p);
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test relies on knowledge of task_arena internals.

struct TaskArenaValidator {
    int my_slot_at_construction;
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}

struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        tbb::task_arena arena{tbb::task_arena::attach()};
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        tbb::task_arena arena3;
        arena3.initialize(tbb::attach());
        ValidateAttachedArena( arena3, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2{tbb::task_arena::attach()};
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};

thread_local int TestAttachBody::my_idx;

void TestAttach( int maxthread ) {
    // Externally concurrent, but no concurrency within a thread
    utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}

//--------------------------------------------------//

// Test that task_arena::enqueue invokes only the const overload of the functor's operator().
// TODO: can it be reworked as an SFINAE-based compile-time check?
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
}
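
// One possible direction for the TODO above (an untested sketch, not part of the suite):
// a compile-time check could assert that the const-qualified call operator is selectable, e.g.
//     static_assert(std::is_invocable_v<const TestFunctor&>,
//                   "enqueue requires a const-callable functor");
// A full negative test (enqueue rejecting a const-uncallable functor) would need an
// expected-compile-failure harness, which this suite does not provide.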

//--------------------------------------------------//

#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"

// Test this_task_arena::isolate
namespace TestIsolatedExecuteNS {
    template <typename NestedPartitioner>
    class NestedParFor : utils::NoAssign {
    public:
        NestedParFor() {}
        void operator()() const {
            NestedPartitioner p;
            tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
        }
    };

    template <typename NestedPartitioner>
    class ParForBody : utils::NoAssign {
        bool myOuterIsolation;
        tbb::enumerable_thread_specific<int> &myEts;
        std::atomic<bool> &myIsStolen;
    public:
        ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
            : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            if ( myOuterIsolation )
                NestedParFor<NestedPartitioner>()();
            else
                tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
            --e;
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    class OuterParFor : utils::NoAssign {
        bool myOuterIsolation;
        std::atomic<bool> &myIsStolen;
    public:
        OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
        void operator()() const {
            tbb::enumerable_thread_specific<int> ets( 0 );
            OuterPartitioner p;
            tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    void TwoLoopsTest( bool outer_isolation ) {
        std::atomic<bool> is_stolen;
        is_stolen = false;
        const int max_repeats = 100;
        if ( outer_isolation ) {
            for ( int i = 0; i <= max_repeats; ++i ) {
                tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
                if ( is_stolen ) break;
            }
            // TODO: was ASSERT_WARNING
            if (!is_stolen) {
                REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
            }
        } else {
            for ( int i = 0; i <= max_repeats; ++i ) {
                OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
            }
            REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
        }
    }

    void TwoLoopsTest( bool outer_isolation ) {
        TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
    }

    void TwoLoopsTest() {
        TwoLoopsTest( true );
        TwoLoopsTest( false );
    }
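
    // What the two-loops test relies on: this_task_arena::isolate() limits the tasks a
    // thread may take while waiting inside the isolated region to tasks spawned within
    // that region. The detection trick is the per-thread counter e: it exceeds 1 only if
    // the thread, blocked in the nested loop, picks up another iteration of the outer
    // loop, i.e. an outer-level task was "stolen" into the nested level.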
    //--------------------------------------------------//
    class HeavyMixTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
        tbb::enumerable_thread_specific<int>& myIsolatedLevel;
        int myNestedLevel;

        template <typename Partitioner, typename Body>
        static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = nullptr ) {
            if ( rnd.get() % 2 ) {
                if ( ctx )
                    tbb::parallel_for( 0, 2, body, p, *ctx );
                else
                    tbb::parallel_for( 0, 2, body, p );
            } else {
                tbb::parallel_invoke( body, body );
            }
        }

        template <typename Partitioner>
        class IsolatedBody : utils::NoAssign {
            const HeavyMixTestBody &myHeavyMixTestBody;
            Partitioner &myPartitioner;
        public:
            IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
                : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
            void operator()() const {
                RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                    HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                        myHeavyMixTestBody.myNestedLevel + 1 ),
                    myPartitioner );
            }
        };

        template <typename Partitioner>
        void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
            Partitioner p;
            switch ( rnd.get() % 2 ) {
                case 0: {
                    // No features
                    tbb::task_group_context ctx;
                    RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
                    break;
                }
                case 1: {
                    // Isolation
                    int previous_isolation = isolated_level;
                    isolated_level = myNestedLevel;
                    tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
                    isolated_level = previous_isolation;
                    break;
                }
            }
        }
    public:
        HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
            tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
            : myRandom( random ), myIsolatedLevel( isolated_level )
            , myNestedLevel( nested_level ) {}
        void operator()() const {
            int &isolated_level = myIsolatedLevel.local();
            CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "An outer-level task should not be stolen on an isolated level" );
            if ( myNestedLevel == 20 )
                return;
            utils::FastRandom<>& rnd = myRandom.local();
            if ( rnd.get() % 2 == 1 ) {
                RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
            } else {
                RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
            }
        }
        void operator()(int) const {
            this->operator()();
        }
    };

    struct RandomInitializer {
        utils::FastRandom<> operator()() {
            return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
        }
    };

    void HeavyMixTest() {
        std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);

        RandomInitializer init_random;
        tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
        tbb::enumerable_thread_specific<int> isolated_level( 0 );
        for ( int i = 0; i < 5; ++i ) {
            HeavyMixTestBody b( random, isolated_level, 1 );
            b( 0 );
        }
    }

    //--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
    struct MyException {};
    struct IsolatedBodyThrowsException {
        void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
            // Workaround an unreachable code warning in task_arena_function.
            volatile bool workaround = true;
            if (workaround)
#endif
            {
                throw MyException();
            }
        }
    };
    struct ExceptionTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<int>& myEts;
        std::atomic<bool>& myIsStolen;
        ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
            : myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int i ) const {
            try {
                tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
                REQUIRE_MESSAGE( false, "The exception has been lost" );
            }
            catch ( MyException ) {}
            catch ( ... ) {
                REQUIRE_MESSAGE( false, "Unexpected exception" );
            }
            // Check that nested algorithms can steal outer-level tasks
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            // work imbalance increases chances for stealing
            tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
            --e;
        }
    };

#endif /* TBB_USE_EXCEPTIONS */
    void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
        tbb::enumerable_thread_specific<int> ets;
        std::atomic<bool> is_stolen;
        is_stolen = false;
        for ( ;; ) {
            tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
            if ( is_stolen ) break;
        }
        REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
    }

    struct NonConstBody {
        unsigned int state;
        void operator()() {
            state ^= ~0u;
        }
    };

    void TestNonConstBody() {
        NonConstBody body;
        body.state = 0x6c97d5ed;
        tbb::this_task_arena::isolate(body);
        REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
    }
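
    // Arithmetic check for the constants above: state ^= ~0u is a bitwise NOT, and
    // ~0x6c97d5ed == 0x93682a12, so a passing run proves isolate() invoked the non-const
    // operator() on the original body object rather than on a copy.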

    // TODO: Consider tbb::task_group instead of explicit task API.
    class TestEnqueueTask : public tbb::detail::d1::task {
        using wait_context = tbb::detail::d1::wait_context;

        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;

    public:
        wait_context& waiter;
        tbb::task_arena& arena;
        static const int N = 100;

        TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
            : executed(exe), completed(c), waiter(w), arena(a) {}

        tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
            for (int i = 0; i < N; ++i) {
                arena.enqueue([&]() {
                    executed.local() = true;
                    ++completed;
                    for (int j = 0; j < 100; j++) utils::yield();
                    waiter.release(1);
                });
            }
            return nullptr;
        }
        tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
    };

    class TestEnqueueIsolateBody : utils::NoCopy {
        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;
        tbb::task_arena& arena;
    public:
        static const int N = 100;

        TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
            : executed(exe), completed(c), arena(a) {}
        void operator()() {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait_context waiter(N);

            TestEnqueueTask root(executed, completed, waiter, arena);
            tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
        }
    };

    void TestEnqueue() {
        tbb::enumerable_thread_specific<bool> executed(false);
        std::atomic<int> completed;
        tbb::task_arena arena{tbb::task_arena::attach()};

        // Check that the main thread can process enqueued tasks.
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: no enqueued task has been executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.
        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner
    // steals an outer-level task on a nested level. With only one thief, it executes outer-level
    // tasks first, and the owner gets no opportunity to steal them.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] { tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with a missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};
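
// Dependency structure built above: thread idx blocks on waiters[idx] and, on its way out,
// releases waiters[idx - bunch_size], i.e. each bunch of waiting threads is woken by the
// next bunch. Threads of the last bunch (idx >= num_tasks, where num_tasks =
// (num_bunches-1)*bunch_size) do not wait at all; they only signal, which unwinds the chain.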

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
            tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log noisy enough that the warning is treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // the move ctor is disabled, like the copy ctor inherited from NoCopy
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() doesn't make any copies or moves of its argument inside the implementation
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when it was passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when it was passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}
1220 
1221 //--------------------------------------------------//
1222 
1223 #include <vector>
1224 
1225 #include "common/state_trackable.h"
1226 
1227 namespace TestReturnValueNS {
1228     struct noDefaultTag {};
1229     class ReturnType : public StateTrackable<> {
1230         static const int SIZE = 42;
1231         std::vector<int> data;
1232     public:
1233         ReturnType(noDefaultTag) : StateTrackable<>(0) {}
1234         // Define copy constructor to test that it is never called
1235         ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
1236         ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}
1237 
1238         void fill() {
1239             for (int i = 0; i < SIZE; ++i)
1240                 data.push_back(i);
1241         }
1242         void check() {
1243             REQUIRE(data.size() == unsigned(SIZE));
1244             for (int i = 0; i < SIZE; ++i)
1245                 REQUIRE(data[i] == i);
1246             StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
1247             REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
1248             REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
1249             std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
1250             std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
1251             REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
1252             // The number of copies/moves should not exceed 3 if copy elision takes place:
1253             // function return, store to internal storage, acquire from internal storage.
1254             // For a compilation without copy elision, this number may grow up to 7.
1255             REQUIRE((copied == 0 && moved <= 7));
1256             WARN_MESSAGE(moved <= 3,
1257                 "Warning: The number of copies/moves should not exceed 3 if copy elision takes place. "
1258                 "Pay attention to this warning only if copy elision is enabled."
1259             );
1260         }
1261     };
1262 
1263     template <typename R>
1264     R function() {
1265         noDefaultTag tag;
1266         R r(tag);
1267         r.fill();
1268         return r;
1269     }
1270 
1271     template <>
1272     void function<void>() {}
1273 
1274     template <typename R>
1275     struct Functor {
1276         R operator()() const {
1277             return function<R>();
1278         }
1279     };
1280 
1281     tbb::task_arena& arena() {
1282         static tbb::task_arena a;
1283         return a;
1284     }
1285 
1286     template <typename F>
1287     void TestExecute(F &f) {
1288         StateTrackableCounters::reset();
1289         ReturnType r{arena().execute(f)};
1290         r.check();
1291     }
1292 
1293     template <typename F>
1294     void TestExecute(const F &f) {
1295         StateTrackableCounters::reset();
1296         ReturnType r{arena().execute(f)};
1297         r.check();
1298     }
1299     template <typename F>
1300     void TestIsolate(F &f) {
1301         StateTrackableCounters::reset();
1302         ReturnType r{tbb::this_task_arena::isolate(f)};
1303         r.check();
1304     }
1305 
1306     template <typename F>
1307     void TestIsolate(const F &f) {
1308         StateTrackableCounters::reset();
1309         ReturnType r{tbb::this_task_arena::isolate(f)};
1310         r.check();
1311     }
1312 
1313     void Test() {
1314         TestExecute(Functor<ReturnType>());
1315         Functor<ReturnType> f1;
1316         TestExecute(f1);
1317         TestExecute(function<ReturnType>);
1318 
1319         arena().execute(Functor<void>());
1320         Functor<void> f2;
1321         arena().execute(f2);
1322         arena().execute(function<void>);
1323         TestIsolate(Functor<ReturnType>());
1324         TestIsolate(f1);
1325         TestIsolate(function<ReturnType>);
1326         tbb::this_task_arena::isolate(Functor<void>());
1327         tbb::this_task_arena::isolate(f2);
1328         tbb::this_task_arena::isolate(function<void>);
1329     }
1330 }
1331 
1332 void TestReturnValue() {
1333     TestReturnValueNS::Test();
1334 }
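
// A minimal sketch (not part of the test): task_arena::execute() forwards the
// functor's return value to the caller, which is what the counting above
// verifies stays copy-free when copy elision applies.
void ExecuteReturnSketch() {
    tbb::task_arena a;
    std::vector<int> v = a.execute([] { return std::vector<int>(8, 42); });
    CHECK(v.size() == 8u); // the result crossed the arena boundary
}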
1335 
1336 //--------------------------------------------------//
1337 
1338 // MyObserver checks whether worker threads keep joining the same arena
1339 struct MyObserver: public tbb::task_scheduler_observer {
1340     tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
1341     tbb::task_arena& my_arena;
1342     std::atomic<int>& my_failure_counter;
1343     std::atomic<int>& my_counter;
1344     utils::SpinBarrier& m_barrier;
1345 
1346     MyObserver(tbb::task_arena& a,
1347         tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
1348         std::atomic<int>& failure_counter,
1349         std::atomic<int>& counter,
1350         utils::SpinBarrier& barrier)
1351         : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1352         my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
1353         observe(true);
1354     }
1355     ~MyObserver(){
1356         observe(false);
1357     }
1358     void on_scheduler_entry(bool worker) override {
1359         if (worker) {
1360             ++my_counter;
1361             tbb::task_arena*& cur_arena = my_tls.local();
1362             if (cur_arena != nullptr && cur_arena != &my_arena) {
1363                 ++my_failure_counter;
1364             }
1365             cur_arena = &my_arena;
1366             m_barrier.wait();
1367         }
1368     }
1369     void on_scheduler_exit(bool worker) override {
1370         if (worker) {
1371             m_barrier.wait(); // before wakeup
1372             m_barrier.wait(); // after wakeup
1373         }
1374     }
1375 };
1376 
1377 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
1378     if (n_threads == 0) {
1379         n_threads = tbb::this_task_arena::max_concurrency();
1380     }
1381 
1382     const int max_n_arenas = 8;
1383     int n_arenas = 2;
1384     if(n_threads > 16) {
1385         n_arenas = max_n_arenas;
1386     } else if (n_threads > 8) {
1387         n_arenas = 4;
1388     }
1389 
1390     int n_workers = n_threads - 1;
1391     n_workers = n_arenas * (n_workers / n_arenas);
1392     if (n_workers == 0) {
1393         return;
1394     }
1395 
1396     n_threads = n_workers + 1;
1397     tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);
1398 
1399     const int n_repetitions = 20;
1400     const int n_outer_repetitions = 100;
1401     std::multiset<float> failure_ratio; // for median calculation
1402     utils::SpinBarrier barrier(n_threads);
1403     utils::SpinBarrier worker_barrier(n_workers);
1404     MyObserver* observer[max_n_arenas];
1405     std::vector<tbb::task_arena> arenas(n_arenas);
1406     std::atomic<int> failure_counter;
1407     std::atomic<int> counter;
1408     tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1409 
1410     for (int i = 0; i < n_arenas; ++i) {
1411         arenas[i].initialize(n_workers / n_arenas + 1); // +1 slot for the external (master) thread
1412         observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
1413     }
1414 
1415     int ii = 0;
1416     for (; ii < n_outer_repetitions; ++ii) {
1417         failure_counter = 0;
1418         counter = 0;
1419 
1420         // Main code
1421         auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
1422         wakeup();
1423         for (int j = 0; j < n_repetitions; ++j) {
1424             barrier.wait(); // entry
1425             barrier.wait(); // exit1
1426             wakeup();
1427             barrier.wait(); // exit2
1428         }
1429         barrier.wait(); // entry
1430         barrier.wait(); // exit1
1431         barrier.wait(); // exit2
1432 
1433         failure_ratio.insert(float(failure_counter) / counter);
1434         tls.clear();
1435         // collect at least 3 samples in failure_ratio before calculating the median
1436         if (ii > 1) {
1437             std::multiset<float>::iterator it = failure_ratio.begin();
1438             std::advance(it, failure_ratio.size() / 2);
1439             if (*it < 0.02)
1440                 break;
1441         }
1442     }
1443     for (int i = 0; i < n_arenas; ++i) {
1444         delete observer[i];
1445     }
1446     // check whether the median is too big
1447     std::multiset<float>::iterator it = failure_ratio.begin();
1448     std::advance(it, failure_ratio.size() / 2);
1449     // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
1450     if (*it > 0.05) {
1451         REPORT("Warning: Too many cases where threads join different arenas.\n");
1452         REQUIRE_MESSAGE(*it <= 0.3, "Too many cases where threads join different arenas.\n");
1453     }
1454 }
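
// A minimal helper sketch (not part of the test) of the median computation used
// above: for a sorted container such as std::multiset, the (upper) median is
// reached by advancing an iterator half the container size.
float MedianSketch(const std::multiset<float>& samples) {
    std::multiset<float>::const_iterator it = samples.begin();
    std::advance(it, samples.size() / 2);
    return *it;
}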
1455 
1456 void TestArenaWorkersMigration() {
1457     TestArenaWorkersMigrationWithNumThreads(4);
1458     if (tbb::this_task_arena::max_concurrency() != 4) {
1459         TestArenaWorkersMigrationWithNumThreads();
1460     }
1461 }
1462 
1463 //--------------------------------------------------//
1464 void TestDefaultCreatedWorkersAmount() {
1465     int threads = tbb::this_task_arena::max_concurrency();
1466     utils::NativeParallelFor(1, [threads](int idx) {
1467         REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
1468         utils::SpinBarrier barrier(threads);
1469         ResetTLS();
1470         for (auto blocked : { false, true }) {
1471             for (int trial = 0; trial < (blocked ? 10 : 10000); ++trial) {
1472                 tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
1473                     CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified thread number");
1474                     CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than the default allows");
1475                     local_id.local() = 1;
1476                     if (blocked) {
1477                         // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
1478                         utils::Sleep(1);
1479                         barrier.wait();
1480                     }
1481                 }, tbb::simple_partitioner());
1482                 REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "more threads were created than the default number");
1483                 if (blocked) {
1484                     REQUIRE_MESSAGE(local_id.size() == size_t(threads), "number of created threads is not equal to the default number");
1485                 }
1486             }
1487         }
1488     });
1489 }
1490 
1491 void TestAbilityToCreateWorkers(int thread_num) {
1492     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
1493     // Checks only a subset of reserved-external-thread counts:
1494     // 0 and 1 reserved threads are the important cases, but it is also useful
1495     // to collect some statistics for other counts without spending
1496     // the whole test session checking every possible count.
1497     TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
1498     TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
1499 }
1500 
1501 void TestDefaultWorkersLimit() {
1502     TestDefaultCreatedWorkersAmount();
1503 #if TBB_TEST_LOW_WORKLOAD
1504     TestAbilityToCreateWorkers(24);
1505 #else
1506     TestAbilityToCreateWorkers(256);
1507 #endif
1508 }
1509 
1510 #if TBB_USE_EXCEPTIONS
1511 
1512 void ExceptionInExecute() {
1513     std::size_t thread_number = utils::get_platform_max_threads();
1514     int arena_concurrency = static_cast<int>(thread_number) / 2;
1515     tbb::task_arena test_arena(arena_concurrency, arena_concurrency);
1516 
1517     std::atomic<int> canceled_task{};
1518 
1519     auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
1520         for (std::size_t i = 0; i < 1000; ++i) {
1521             try {
1522                 test_arena.execute([] {
1523                     volatile bool suppress_unreachable_code_warning = true;
1524                     if (suppress_unreachable_code_warning) {
1525                         throw -1;
1526                     }
1527                 });
1528                 FAIL("An exception should have thrown.");
1529             } catch (int) {
1530                 ++canceled_task;
1531             } catch (...) {
1532                 FAIL("Wrong type of exception.");
1533             }
1534         }
1535     };
1536 
1537     utils::NativeParallelFor(thread_number, parallel_func);
1538     CHECK(canceled_task == thread_number * 1000);
1539 }
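
// A minimal sketch (not part of the test) of the behavior the loop above relies
// on: an exception thrown inside execute() propagates to the thread that called
// execute().
void ExceptionPropagationSketch() {
    tbb::task_arena a;
    try {
        a.execute([] { throw std::runtime_error("example"); });
    } catch (const std::runtime_error&) {
        // the exception crossed the arena boundary intact
    }
}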
1540 
1541 #endif // TBB_USE_EXCEPTIONS
1542 
1543 class simple_observer : public tbb::task_scheduler_observer {
1544     static std::atomic<int> idx_counter;
1545     int my_idx;
1546     int myMaxConcurrency;   // concurrency of the associated arena
1547     int myNumReservedSlots; // reserved slots in the associated arena
1548     void on_scheduler_entry( bool is_worker ) override {
1549         int current_index = tbb::this_task_arena::current_thread_index();
1550         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
1551         if (is_worker) {
1552             CHECK(current_index >= myNumReservedSlots);
1553         }
1554     }
1555     void on_scheduler_exit( bool /*is_worker*/ ) override
1556     {}
1557 public:
1558     simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
1559         : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
1560         , myMaxConcurrency(maxConcurrency)
1561         , myNumReservedSlots(numReservedSlots) {
1562         observe(true);
1563     }
1564 
1565     ~simple_observer(){
1566         observe(false);
1567     }
1568 
1569     friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
1570         return lhs.my_idx < rhs.my_idx;
1571     }
1572 };
1573 
1574 std::atomic<int> simple_observer::idx_counter{};
1575 
1576 struct arena_handler {
1577     enum arena_status {
1578         alive,
1579         deleting,
1580         deleted
1581     };
1582 
1583     tbb::task_arena* arena;
1584 
1585     std::atomic<arena_status> status{alive};
1586     tbb::spin_rw_mutex arena_in_use{};
1587 
1588     tbb::concurrent_set<simple_observer> observers;
1589 
1590     arena_handler(tbb::task_arena* ptr) : arena(ptr)
1591     {}
1592 
1593     friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
1594         return lhs.arena < rhs.arena;
1595     }
1596 };
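
// A minimal sketch (not part of the test) of the locking protocol used below:
// users take the reader side of arena_in_use with try_acquire(), while deletion
// takes the writer side, so an arena cannot be deleted while it is in use.
bool TryUseArenaSketch(arena_handler& handler) {
    tbb::spin_rw_mutex::scoped_lock lock;
    if (!lock.try_acquire(handler.arena_in_use, /*writer*/ false))
        return false; // a writer (deletion) holds the lock; skip this arena
    if (handler.status != arena_handler::alive)
        return false; // already claimed for deletion; lock released by destructor
    handler.arena->enqueue([] {});
    return true; // lock released by the scoped_lock destructor
}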
1597 
1598 // TODO: Add observer operations
1599 void StressTestMixFunctionality() {
1600     enum operation_type {
1601         create_arena,
1602         delete_arena,
1603         attach_observer,
1604         detach_observer,
1605         arena_execute,
1606         enqueue_task,
1607         last_operation_marker
1608     };
1609 
1610     std::size_t operations_number = last_operation_marker;
1611     std::size_t thread_number = utils::get_platform_max_threads();
1612     utils::FastRandom<> operation_rnd(42);
1613     tbb::spin_mutex random_operation_guard;
1614 
1615     auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
1616         tbb::spin_mutex::scoped_lock lock(random_operation_guard);
1617         return static_cast<operation_type>(operation_rnd.get() % operations_number);
1618     };
1619 
1620     utils::FastRandom<> arena_rnd(42);
1621     tbb::spin_mutex random_arena_guard;
1622     auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
1623         tbb::spin_mutex::scoped_lock lock(random_arena_guard);
1624         return arena_rnd.get();
1625     };
1626 
1627     tbb::concurrent_set<arena_handler> arenas_pool;
1628 
1629     std::vector<std::thread> thread_pool;
1630 
1631     utils::SpinBarrier thread_barrier(thread_number);
1632     std::size_t max_operations = 20000;
1633     std::atomic<std::size_t> curr_operation{};
1634 
1635     auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) {
1636         for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) {
1637             if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1638                 if (curr_arena->status == arena_handler::alive) {
1639                     return curr_arena;
1640                 }
1641                 else {
1642                     lock.release();
1643                 }
1644             }
1645         }
1646         return arenas_pool.end();
1647     };
1648 
1649     auto thread_func = [&] () {
1650         arenas_pool.emplace(new tbb::task_arena());
1651         thread_barrier.wait();
1652         while (curr_operation++ < max_operations) {
1653             switch (get_random_operation()) {
1654                 case create_arena :
1655                 {
1656                     arenas_pool.emplace(new tbb::task_arena());
1657                     break;
1658                 }
1659                 case delete_arena :
1660                 {
1661                     auto curr_arena = arenas_pool.begin();
1662                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1663                         arena_handler::arena_status curr_status = arena_handler::alive;
1664                         if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
1665                             break;
1666                         }
1667                     }
1668 
1669                     if (curr_arena == arenas_pool.end()) break;
1670 
1671                     tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);
1672 
1673                     delete curr_arena->arena;
1674                     curr_arena->status.store(arena_handler::deleted);
1675 
1676                     break;
1677                 }
1678                 case attach_observer :
1679                 {
1680                     tbb::spin_rw_mutex::scoped_lock lock{};
1681 
1682                     auto curr_arena = find_arena(lock);
1683                     if (curr_arena != arenas_pool.end()) {
1684                         curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
1685                     }
1686                     break;
1687                 }
1688                 case detach_observer:
1689                 {
1690                     auto arena_number = get_random_arena() % arenas_pool.size();
1691                     auto curr_arena = arenas_pool.begin();
1692                     std::advance(curr_arena, arena_number);
1693 
1694                     for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
1695                         if (it->is_observing()) {
1696                             it->observe(false);
1697                             break;
1698                         }
1699                     }
1700 
1701                     break;
1702                 }
1703                 case arena_execute:
1704                 {
1705                     tbb::spin_rw_mutex::scoped_lock lock{};
1706                     auto curr_arena = find_arena(lock);
1707 
1708                     if (curr_arena != arenas_pool.end()) {
1709                         curr_arena->arena->execute([]() {
1710                             tbb::affinity_partitioner aff;
1711                             tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
1712                             tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
1713                         });
1714                     }
1715 
1716                     break;
1717                 }
1718                 case enqueue_task:
1719                 {
1720                     tbb::spin_rw_mutex::scoped_lock lock{};
1721                     auto curr_arena = find_arena(lock);
1722 
1723                     if (curr_arena != arenas_pool.end()) {
1724                         curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
1725                     }
1726 
1727                     break;
1728                 }
1729                 case last_operation_marker :
1730                 break;
1731             }
1732         }
1733     };
1734 
1735     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1736         thread_pool.emplace_back(thread_func);
1737     }
1738 
1739     thread_func();
1740 
1741     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1742         if (thread_pool[i].joinable()) thread_pool[i].join();
1743     }
1744 
1745     for (auto& handler : arenas_pool) {
1746         if (handler.status != arena_handler::deleted) delete handler.arena;
1747     }
1748 }
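
// A minimal sketch (not part of the test) of the claim protocol used in the
// delete_arena case above: compare_exchange_strong lets exactly one thread move
// an arena from alive to deleting, so each arena is deleted only once.
bool TryClaimForDeletionSketch(arena_handler& handler) {
    arena_handler::arena_status expected = arena_handler::alive;
    // Succeeds for exactly one caller; others observe deleting/deleted and fail.
    return handler.status.compare_exchange_strong(expected, arena_handler::deleting);
}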
1749 
1750 struct enqueue_test_helper {
1751     enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
1752         : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
1753     {}
1754 
1755     enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
1756     {}
1757 
1758     void operator() () const {
1759         CHECK(my_ets.local());
1760         if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
1761         utils::yield();
1762     }
1763 
1764     tbb::task_arena& my_arena;
1765     tbb::enumerable_thread_specific<bool>& my_ets;
1766     std::atomic<std::size_t>& my_task_counter;
1767 };
1768 
1769 void test_threads_sleep(int concurrency, int reserved_slots, int num_external_threads) {
1770     tbb::task_arena a(concurrency, reserved_slots);
1771     std::mutex m;
1772     std::condition_variable cond_var;
1773     bool completed{ false };
1774     utils::SpinBarrier barrier( concurrency - reserved_slots + 1 );
1775 
1776     auto body = [&] {
1777         std::unique_lock<std::mutex> lock(m);
1778         cond_var.wait(lock, [&] { return completed == true; });
1779     };
1780 
1781     for (int i = 0; i < concurrency - reserved_slots; ++i) {
1782         a.enqueue([&] {
1783             body();
1784             barrier.signalNoWait();
1785         });
1786     }
1787     std::vector<std::thread> threads;
1788     for (int i = 0; i < num_external_threads; ++i) {
1789         threads.emplace_back([&]() { a.execute(body); });
1790     }
1791     TestCPUUserTime(concurrency);
1792 
1793     {
1794         std::lock_guard<std::mutex> lock(m);
1795         completed = true;
1796         cond_var.notify_all();
1797     }
1798     for (auto& t : threads) {
1799         t.join();
1800     }
1801     barrier.wait();
1802 }
1803 
1804 void test_threads_sleep(int concurrency, int reserved_slots) {
1805     test_threads_sleep(concurrency, reserved_slots, reserved_slots);
1806     test_threads_sleep(concurrency, reserved_slots, 2 * concurrency);
1807 }
1808 
1809 //--------------------------------------------------//
1810 
1811 // This test requires TBB in an uninitialized state
1812 //! \brief \ref requirement
1813 TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
1814     REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was already in an initialized state");
1815     tbb::enumerable_thread_specific<int> ets;
1816 
1817     tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
1818     arena.execute([&ets] {
1819         tbb::parallel_for(0, 10000000, [&ets](int){
1820             ets.local() = 1;
1821             utils::doDummyWork(100);
1822         });
1823     });
1824 
1825     CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
1826 }
1827 
1828 //! Test for task arena in concurrent cases
1829 //! \brief \ref requirement
1830 TEST_CASE("Test for concurrent functionality") {
1831     TestConcurrentFunctionality();
1832 }
1833 
1834 #if !EMSCRIPTEN
1835 //! On Emscripten, the FPU control state is not set up correctly
1836 //! Test for arena entry consistency
1837 //! \brief \ref requirement \ref error_guessing
1838 TEST_CASE("Test for task arena entry consistency") {
1839     TestArenaEntryConsistency();
1840 }
1841 #endif
1842 
1843 //! Test for task arena attach functionality
1844 //! \brief \ref requirement \ref interface
1845 TEST_CASE("Test for the attach functionality") {
1846     TestAttach(4);
1847 }
1848 
1849 //! Test for constant functor requirements
1850 //! \brief \ref requirement \ref interface
1851 TEST_CASE("Test for constant functor requirement") {
1852     TestConstantFunctorRequirement();
1853 }
1854 
1855 //! Test for move semantics support
1856 //! \brief \ref requirement \ref interface
1857 TEST_CASE("Move semantics support") {
1858     TestMoveSemantics();
1859 }
1860 
1861 //! Test for different return value types
1862 //! \brief \ref requirement \ref interface
1863 TEST_CASE("Return value test") {
1864     TestReturnValue();
1865 }
1866 
1867 //! Test for delegated task spawn in case of unsuccessful slot attach
1868 //! \brief \ref error_guessing
1869 TEST_CASE("Delegated spawn wait") {
1870     TestDelegatedSpawnWait();
1871 }
1872 
1873 #if !EMSCRIPTEN
1874 //! On Emscripten, the FPU control state is not set up correctly
1875 //! Test task arena isolation functionality
1876 //! \brief \ref requirement \ref interface
1877 TEST_CASE("Isolated execute") {
1878     // Isolation test cases are valid only for more than 2 threads
1879     if (tbb::this_task_arena::max_concurrency() > 2) {
1880         TestIsolatedExecute();
1881     }
1882 }
1883 #endif
1884 
1885 //! Test for TBB Workers creation limits
1886 //! \brief \ref requirement
1887 TEST_CASE("Default workers limit") {
1888     TestDefaultWorkersLimit();
1889 }
1890 
1891 //! Test for workers migration between arenas
1892 //! \brief \ref error_guessing \ref stress
1893 TEST_CASE("Arena workers migration") {
1894     TestArenaWorkersMigration();
1895 }
1896 
1897 #if !EMSCRIPTEN
1898 //! On Emscripten, the FPU control state is not set up correctly
1899 //! Test for multiple waits, threads should not block each other
1900 //! \brief \ref requirement
1901 TEST_CASE("Multiple waits") {
1902     TestMultipleWaits();
1903 }
1904 #endif
1905 
1906 //! Test for small stack size settings and arena initialization
1907 //! \brief \ref error_guessing
1908 TEST_CASE("Small stack size") {
1909     TestSmallStackSize();
1910 }
1911 
1912 #if TBB_USE_EXCEPTIONS
1913 //! \brief \ref requirement \ref stress
1914 TEST_CASE("Test for exceptions during execute.") {
1915     ExceptionInExecute();
1916 }
1917 
1918 //! \brief \ref error_guessing
1919 TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
1920     struct throwing_obj {
1921         throwing_obj() {
1922             volatile bool flag = true;
1923             if (flag) throw std::exception{};
1924         }
1925         throwing_obj(const throwing_obj&) = default;
1926         ~throwing_obj() { FAIL("A destructor was called."); }
1927     };
1928 
1929     tbb::task_arena arena;
1930 
1931     REQUIRE_THROWS_AS( [&] {
1932         arena.execute([] {
1933             return throwing_obj{};
1934         });
1935     }(), std::exception );
1936 }
1937 #endif // TBB_USE_EXCEPTIONS
1938 
1939 //! \brief \ref stress
1940 TEST_CASE("Stress test with mixing functionality") {
1941     StressTestMixFunctionality();
1942 }
1943 
1944 //! \brief \ref stress
1945 TEST_CASE("Workers oversubscription") {
1946     std::size_t num_threads = utils::get_platform_max_threads();
1947     tbb::enumerable_thread_specific<bool> ets;
1948     tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
1949     tbb::task_arena arena(static_cast<int>(num_threads) * 2);
1950 
1951     utils::SpinBarrier barrier(num_threads * 2);
1952 
1953     arena.execute([&] {
1954         tbb::parallel_for(std::size_t(0), num_threads * 2,
1955             [&] (const std::size_t&) {
1956                 ets.local() = true;
1957                 barrier.wait();
1958             }
1959         );
1960     });
1961 
1962     utils::yield();
1963 
1964     std::atomic<std::size_t> task_counter{0};
1965     for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
1966         arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
1967     }
1968 
1969     while (task_counter < 100000) utils::yield();
1970 
1971     arena.execute([&] {
1972         tbb::parallel_for(std::size_t(0), num_threads * 2,
1973             [&] (const std::size_t&) {
1974                 CHECK(ets.local());
1975                 barrier.wait();
1976             }
1977         );
1978     });
1979 }
1980 
1981 #if TBB_USE_EXCEPTIONS
1982 //! The test for error in scheduling empty task_handle
1983 //! \brief \ref requirement
1984 TEST_CASE("Empty task_handle cannot be scheduled"
1985         * doctest::should_fail()    // Test needs to be revised: the implementation uses assertions instead of exceptions
1986         * doctest::skip()           // skip the test for now so as not to pollute the test log
1987 ){
1988     tbb::task_arena ta;
1989 
1990     CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}),                    "Attempt to schedule empty task_handle", std::runtime_error);
1991     CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
1992 }
1993 #endif
1994 
1995 #if !EMSCRIPTEN
1996 //! On Emscripten, the FPU control state is not set up correctly
1997 //! \brief \ref error_guessing
1998 TEST_CASE("Test threads sleep") {
1999     for (auto concurrency_level : utils::concurrency_range()) {
2000         int conc = int(concurrency_level);
2001         test_threads_sleep(conc, 0);
2002         test_threads_sleep(conc, 1);
2003         test_threads_sleep(conc, conc/2);
2004         test_threads_sleep(conc, conc);
2005     }
2006 }
2007 #endif
2008 
2009 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
2010 
2011 //! Basic test for is_inside_task in task_group
2012 //! \brief \ref interface \ref requirement
2013 TEST_CASE("is_inside_task in task_group"){
2014     CHECK( false == tbb::is_inside_task());
2015 
2016     tbb::task_group tg;
2017     tg.run_and_wait([&]{
2018         CHECK( true == tbb::is_inside_task());
2019     });
2020 }
2021 
2022 //! Basic test for is_inside_task in arena::execute
2023 //! \brief \ref interface \ref requirement
2024 TEST_CASE("is_inside_task in arena::execute"){
2025     CHECK( false == tbb::is_inside_task());
2026 
2027     tbb::task_arena arena;
2028 
2029     arena.execute([&]{
2030         // The execute method is processed outside of any task
2031         CHECK( false == tbb::is_inside_task());
2032     });
2033 }
2034 
2035 //! The test for is_inside_task in arena::execute when inside other task
2036 //! \brief \ref error_guessing
2037 TEST_CASE("is_inside_task in arena::execute") {
2038     CHECK(false == tbb::is_inside_task());
2039 
2040     tbb::task_arena arena;
2041     tbb::task_group tg;
2042     tg.run_and_wait([&] {
2043         arena.execute([&] {
2044             // The execute method is processed outside of any task
2045             CHECK(false == tbb::is_inside_task());
2046         });
2047     });
2048 }
2049 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
2050 
2051 //! \brief \ref interface \ref requirement \ref regression
2052 TEST_CASE("worker threads occupy slots in correct range") {
2053     std::vector<tbb::task_arena> arenas(42);
2054     for (auto& arena : arenas) {
2055         arena.initialize(1, 0);
2056     }
2057 
2058     std::atomic<int> counter{0};
2059     for (auto& arena : arenas) {
2060         arena.enqueue([&] {
2061             CHECK(tbb::this_task_arena::current_thread_index() == 0);
2062             ++counter;
2063         });
2064     }
2065 
2066     while (counter < 42) { utils::yield(); }
2067 }
2068