/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"

#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <thread>
#include <atomic>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification

//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
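// A minimal lifecycle sketch of the API exercised below (illustration only, not a test):
//   tbb::task_arena a(4);  // constructed inactive; no threads are associated yet
//   a.initialize();        // the arena becomes active with the requested concurrency
//   a.terminate();         // inactive again; the destructor also terminates an active arena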
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change those.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
            default: {
                tbb::task_arena arena( std::rand() % maxthread + 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize();
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
            case 0: {
                tbb::task_arena arena( 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                CHECK(arena.is_active());
                break;
            }
            case 1: {
                tbb::task_arena arena( tbb::task_arena::automatic );
                CHECK(!arena.is_active());
                arena.initialize();
                CHECK(arena.is_active());
                break;
            }
            case 2: {
                tbb::task_arena arena;
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 );
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
        }
    }
}

//--------------------------------------------------//

// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}

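// ArenaObserver pairs each on_scheduler_entry with a matching on_scheduler_exit via the TLS
// variables above: local_id holds the id of the arena the thread is currently in, old_id the
// id it entered from, and slot_id the slot index observed at entry. Any mismatch indicates
// broken arena nesting or an unbalanced entry/exit notification.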
class ArenaObserver : public tbb::task_scheduler_observer {
    int myId;               // unique observer/arena id within a test
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        for ( volatile int i = 0; i < 50000; ++i )
            ;
    }
};

struct AsynchronousWork {
    utils::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
    : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
        if(my_is_blocking) my_barrier.wait(); // must be asynchronous to the master thread
        else my_barrier.signalNoWait();
    }
};
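
// A note on the two submission paths used throughout this file: task_arena::enqueue() is
// fire-and-forget (the caller does not wait for the task), while task_arena::execute() runs
// the functor inside the arena and blocks the calling thread until it returns.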

//-----------------------------------------------------------------------------------------//

// Test that task_arenas can be created and used concurrently from multiple application threads.
// Also tests arena observers. The parameter idx is the index of the application thread running this test.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that arena observer can be activated before local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
    };
    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");
    LocalObserver lo;
    CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated");
    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");
    utils::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);
    a1.enqueue(work); // put async work
    barrier.wait();
    a2.enqueue(work); // another work
    a2.execute(work);
    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    // TODO REVAMP fix with global control
    ResetTLS();
    utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
}
//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait();
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

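// MultipleMastersPart3 below relies on the internal d1::wait_context counting protocol:
// reserve() registers a unit of pending work before each enqueue, the enqueued Runner calls
// release() when done, and d1::wait() blocks until the counter drops back to zero.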
class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            for ( volatile int i = 0; i < 10000; ++i )
                ;
            myWait.release();
        }
    };

    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0);
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve();
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait));
        }
        my_b.wait();
    }
};

void TestMultipleMasters(int p) {
    {
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. Worker threads need time to occupy their slots; otherwise a master could take the enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for bug 1981, part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for masters to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 master thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TestArenaEntryConsistency: stress task_arena::execute() entry from several threads at once.
// Each entry is classified (direct, delegated to a worker, delegated to another master, or
// nested in the same/an alien task group context), and the test verifies that exceptions
// propagate out of execute() and that the arena's captured FP settings are applied inside.
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i)  // init thread-specific instance
    :   FPModeContext(idx+i)
    ,   my_stage(s)
    ,   is_caught(false)
#if TBB_USE_EXCEPTIONS
    ,   is_expected( (idx&(1<<i)) != 0 )
#else
    , is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) std::this_thread::yield();
        // deduce the entry type and record it in the id; this helps to locate the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                              "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                            : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode);
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
    : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in current task group context
        tbb::parallel_for(4, 5, *this); // in different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        GetRoundingMode();
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    tbb::task_arena a(2, 1);
    std::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings to arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // at least 32 = 2^5 iterations to cover all combinations of entry types
        body.test(i);
}
//--------------------------------------------------
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    utils::SpinBarrier *my_barrier;
    utils::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL )
    : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        CHECK( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // Master thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "Masters are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 10 );
    }
};

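// Reminder on arena geometry used below: task_arena(p, r) has p slots in total, r of which
// are reserved for application ("master") threads, so workers can fill at most p - r slots.
// The loop below sweeps all valid (p, reserved) combinations.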
void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        { // Check concurrency with worker & reserved master threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i )
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple masters alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by masters.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}

void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
    InitializeAndTerminate(max_thread_num);
    for (int p = min_thread_num; p <= max_thread_num; ++p) {
        TestConcurrentArenas(p);
        TestMultipleMasters(p);
        TestArenaConcurrency(p);
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test uses the knowledge of task_arena internals

struct TaskArenaValidator {
    int my_slot_at_construction;
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}
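
// Recap of attach semantics validated above: task_arena(attach()) binds to the arena the
// calling thread currently occupies; if the thread is in no arena, the resulting task_arena
// is left uninitialized, which ValidateAttachedArena() distinguishes via is_active().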

struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};

thread_local int TestAttachBody::my_idx;

void TestAttach( int maxthread ) {
    // Externally concurrent, but no concurrency within a thread
    utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}

//--------------------------------------------------//

// Test that task_arena::enqueue requires and invokes only the const-qualified operator() of the functor.
// TODO: can it be reworked as SFINAE-based compile-time check?
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
}

//--------------------------------------------------//

#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"

// Test this_task_arena::isolate
namespace TestIsolatedExecuteNS {
    template <typename NestedPartitioner>
    class NestedParFor : utils::NoAssign {
    public:
        NestedParFor() {}
        void operator()() const {
            NestedPartitioner p;
            tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
        }
    };

    template <typename NestedPartitioner>
    class ParForBody : utils::NoAssign {
        bool myOuterIsolation;
        tbb::enumerable_thread_specific<int> &myEts;
        std::atomic<bool> &myIsStolen;
    public:
        ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
            : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            if ( myOuterIsolation )
                NestedParFor<NestedPartitioner>()();
            else
                tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
            --e;
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    class OuterParFor : utils::NoAssign {
        bool myOuterIsolation;
        std::atomic<bool> &myIsStolen;
    public:
        OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
        void operator()() const {
            tbb::enumerable_thread_specific<int> ets( 0 );
            OuterPartitioner p;
            tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    void TwoLoopsTest( bool outer_isolation ) {
        std::atomic<bool> is_stolen;
        is_stolen = false;
        const int max_repeats = 100;
        if ( outer_isolation ) {
            for ( int i = 0; i <= max_repeats; ++i ) {
                tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
                if ( is_stolen ) break;
            }
            // TODO: was ASSERT_WARNING
            if (!is_stolen) {
                REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
            }
        } else {
            for ( int i = 0; i <= max_repeats; ++i ) {
                OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
            }
            REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
        }
    }

    void TwoLoopsTest( bool outer_isolation ) {
        TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
    }

    void TwoLoopsTest() {
        TwoLoopsTest( true );
        TwoLoopsTest( false );
    }
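
    // What TwoLoopsTest checks: inside this_task_arena::isolate() a thread must not steal
    // tasks that originate outside the isolated region, while work outside any isolated
    // region stays freely stealable. The detector is the per-thread counter e in ParForBody:
    // it can only exceed zero when the thread re-enters the body by stealing an outer task.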
    //--------------------------------------------------//
    class HeavyMixTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
        tbb::enumerable_thread_specific<int>& myIsolatedLevel;
        int myNestedLevel;

        template <typename Partitioner, typename Body>
        static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
            if ( rnd.get() % 2 ) {
                if ( ctx )
                    tbb::parallel_for( 0, 2, body, p, *ctx );
                else
                    tbb::parallel_for( 0, 2, body, p );
            } else {
                tbb::parallel_invoke( body, body );
            }
        }

        template <typename Partitioner>
        class IsolatedBody : utils::NoAssign {
            const HeavyMixTestBody &myHeavyMixTestBody;
            Partitioner &myPartitioner;
        public:
            IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
                : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
            void operator()() const {
                RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                    HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                        myHeavyMixTestBody.myNestedLevel + 1 ),
                    myPartitioner );
            }
        };

        template <typename Partitioner>
        void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
            Partitioner p;
            switch ( rnd.get() % 2 ) {
                case 0: {
                    // No features
                    tbb::task_group_context ctx;
                    RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
                    break;
                }
                case 1: {
                    // Isolation
                    int previous_isolation = isolated_level;
                    isolated_level = myNestedLevel;
                    tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
                    isolated_level = previous_isolation;
                    break;
                }
            }
        }
    public:
        HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
            tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
            : myRandom( random ), myIsolatedLevel( isolated_level )
            , myNestedLevel( nested_level ) {}
        void operator()() const {
            int &isolated_level = myIsolatedLevel.local();
            REQUIRE_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" );
            if ( myNestedLevel == 20 )
                return;
            utils::FastRandom<>& rnd = myRandom.local();
            if ( rnd.get() % 2 == 1 ) {
                RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
            } else {
                RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
            }
        }
        void operator()(int) const {
            this->operator()();
        }
    };

    struct RandomInitializer {
        utils::FastRandom<> operator()() {
            return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
        }
    };

    void HeavyMixTest() {
        std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);

        RandomInitializer init_random;
        tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
        tbb::enumerable_thread_specific<int> isolated_level( 0 );
        for ( int i = 0; i < 5; ++i ) {
            HeavyMixTestBody b( random, isolated_level, 1 );
            b( 0 );
        }
    }

    //--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
    struct MyException {};
    struct IsolatedBodyThrowsException {
        void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
            // Workaround an unreachable code warning in task_arena_function.
            volatile bool workaround = true;
            if (workaround)
#endif
            {
                throw MyException();
            }
        }
    };
    struct ExceptionTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<int>& myEts;
        std::atomic<bool>& myIsStolen;
        ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
            : myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int i ) const {
            try {
                tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
                REQUIRE_MESSAGE( false, "The exception has been lost" );
            }
            catch ( MyException ) {}
            catch ( ... ) {
                REQUIRE_MESSAGE( false, "Unexpected exception" );
            }
            // Check that nested algorithms can steal outer-level tasks
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            // work imbalance increases chances for stealing
            tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
            --e;
        }
    };

#endif /* TBB_USE_EXCEPTIONS */
    void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
        tbb::enumerable_thread_specific<int> ets;
        std::atomic<bool> is_stolen;
        is_stolen = false;
        for ( ;; ) {
            tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
            if ( is_stolen ) break;
        }
        REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
    }

    struct NonConstBody {
        unsigned int state;
        void operator()() {
            state ^= ~0u;
        }
    };
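
    // operator() above flips every bit of state, so the expected value checked below is the
    // bitwise complement of the seed: ~0x6c97d5ed == 0x93682a12.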

    void TestNonConstBody() {
        NonConstBody body;
        body.state = 0x6c97d5ed;
        tbb::this_task_arena::isolate(body);
        REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
    }

    // TODO: Consider tbb::task_group instead of explicit task API.
    class TestEnqueueTask : public tbb::detail::d1::task {
        using wait_context = tbb::detail::d1::wait_context;

        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;

    public:
        wait_context& waiter;
        tbb::task_arena& arena;
        static const int N = 100;

        TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
            : executed(exe), completed(c), waiter(w), arena(a) {}

        tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
            for (int i = 0; i < N; ++i) {
                arena.enqueue([&]() {
                    executed.local() = true;
                    ++completed;
                    for (volatile int j = 0; j < 100; j++) std::this_thread::yield();
                    waiter.release(1);
                });
            }
            return nullptr;
        }
        tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
    };

    class TestEnqueueIsolateBody : utils::NoCopy {
        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;
        tbb::task_arena& arena;
    public:
        static const int N = 100;

        TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
            : executed(exe), completed(c), arena(a) {}
        void operator()() {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait_context waiter(N);

            TestEnqueueTask root(executed, completed, waiter, arena);
            tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
        }
    };
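
    // TestEnqueue below checks how isolation interacts with enqueued tasks: a thread waiting
    // inside isolate() must not pick up tasks that were enqueued outside the isolated region,
    // even though it may otherwise process enqueued work while waiting.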

    void TestEnqueue() {
        tbb::enumerable_thread_specific<bool> executed(false);
        std::atomic<int> completed;
        tbb::task_arena arena = tbb::task_arena(tbb::task_arena::attach());

        // Check that the main thread can process enqueued tasks.
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: none of the enqueued tasks were executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                std::this_thread::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) std::this_thread::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner
    // steals an outer-level task on a nested level. With only one thief, it would execute the
    // outer-level tasks first, leaving the owner no chance to steal one.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads would require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] {tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    // Regression test for a bug with missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

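// The waiters form a chain across bunches: thread idx blocks on waiters[idx] inside the arena
// and signals waiters[idx - bunch_size] from the previous bunch. Threads of the last bunch
// wait for nothing (their idx >= num_tasks), so releases propagate backwards until every
// thread unblocks.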
void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) std::this_thread::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) std::this_thread::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
            tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log noisy enough that the warning is effectively treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // the move ctor is not allowed, just like the copy ctor deleted in the NoCopy base
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() doesn't have any copies or moves of arguments inside the impl
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
    struct noDefaultTag {};
    class ReturnType : public StateTrackable<> {
        static const int SIZE = 42;
        std::vector<int> data;
    public:
        ReturnType(noDefaultTag) : StateTrackable<>(0) {}
        // Define copy constructor to test that it is never called
        ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
        ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

        void fill() {
            for (int i = 0; i < SIZE; ++i)
                data.push_back(i);
        }
        void check() {
            REQUIRE(data.size() == unsigned(SIZE));
            for (int i = 0; i < SIZE; ++i)
                REQUIRE(data[i] == i);
            StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
            REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
            REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
            std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
            std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
            REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
            // The number of copies/moves should not exceed 3: function return, store to an internal storage,
            // acquire internal storage.
            REQUIRE((copied == 0 && moved <=3));
        }
    };

    template <typename R>
    R function() {
        noDefaultTag tag;
        R r(tag);
        r.fill();
        return r;
    }

    template <>
    void function<void>() {}

    template <typename R>
    struct Functor {
        R operator()() const {
            return function<R>();
        }
    };

    tbb::task_arena& arena() {
        static tbb::task_arena a;
        return a;
    }

    template <typename F>
    void TestExecute(F &f) {
        StateTrackableCounters::reset();
        ReturnType r = arena().execute(f);
        r.check();
    }

    template <typename F>
    void TestExecute(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r = arena().execute(f);
        r.check();
    }
    template <typename F>
    void TestIsolate(F &f) {
        StateTrackableCounters::reset();
        ReturnType r = tbb::this_task_arena::isolate(f);
        r.check();
    }

    template <typename F>
    void TestIsolate(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r = tbb::this_task_arena::isolate(f);
        r.check();
    }

    void Test() {
        TestExecute(Functor<ReturnType>());
        Functor<ReturnType> f1;
        TestExecute(f1);
        TestExecute(function<ReturnType>);

        arena().execute(Functor<void>());
        Functor<void> f2;
        arena().execute(f2);
        arena().execute(function<void>);
        TestIsolate(Functor<ReturnType>());
        TestIsolate(f1);
        TestIsolate(function<ReturnType>);
        tbb::this_task_arena::isolate(Functor<void>());
        tbb::this_task_arena::isolate(f2);
        tbb::this_task_arena::isolate(function<void>);
    }
}

void TestReturnValue() {
    TestReturnValueNS::Test();
}

//--------------------------------------------------//

// MyObserver checks whether worker threads keep joining the same arena
struct MyObserver: public tbb::task_scheduler_observer {
    tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
    tbb::task_arena& my_arena;
    std::atomic<int>& my_failure_counter;
    std::atomic<int>& my_counter;

    MyObserver(tbb::task_arena& a,
        tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
        std::atomic<int>& failure_counter,
        std::atomic<int>& counter)
        : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
        my_failure_counter(failure_counter), my_counter(counter) {
        observe(true);
    }
    void on_scheduler_entry(bool worker) override {
        if (worker) {
            ++my_counter;
            tbb::task_arena*& cur_arena = my_tls.local();
            if (cur_arena != 0 && cur_arena != &my_arena) {
                ++my_failure_counter;
            }
            cur_arena = &my_arena;
        }
    }
};

struct MyLoopBody {
    utils::SpinBarrier& m_barrier;
    MyLoopBody(utils::SpinBarrier& b):m_barrier(b) { }
    void operator()(int) const {
        m_barrier.wait();
    }
};

struct TaskForArenaExecute {
    utils::SpinBarrier& m_barrier;
    TaskForArenaExecute(utils::SpinBarrier& b):m_barrier(b) { }
    void operator()() const {
         tbb::parallel_for(0, tbb::this_task_arena::max_concurrency(),
             MyLoopBody(m_barrier), tbb::simple_partitioner()
         );
    }
};

struct ExecuteParallelFor {
    int n_per_thread;
    int n_repetitions;
    std::vector<tbb::task_arena>& arenas;
    utils::SpinBarrier& arena_barrier;
    utils::SpinBarrier& master_barrier;
    ExecuteParallelFor(const int n_per_thread_, const int n_repetitions_,
        std::vector<tbb::task_arena>& arenas_,
        utils::SpinBarrier& arena_barrier_, utils::SpinBarrier& master_barrier_)
            : n_per_thread(n_per_thread_), n_repetitions(n_repetitions_), arenas(arenas_),
              arena_barrier(arena_barrier_), master_barrier(master_barrier_){ }
    void operator()(int i) const {
        for (int j = 0; j < n_repetitions; ++j) {
            arenas[i].execute(TaskForArenaExecute(arena_barrier));
            for(volatile int k = 0; k < n_per_thread; ++k){/* waiting until workers fall asleep */}
            master_barrier.wait();
        }
    }
};

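// How the migration test works: one master per arena repeatedly executes a barrier-synced
// parallel_for; each MyObserver records, per worker, the last arena joined and counts a
// failure whenever the worker shows up in a different arena. The median of the per-run
// failure_counter/counter ratios is then compared against a tolerance, since occasional
// migration is legitimate.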
// If n_threads == 0, the concurrency defaults to this_task_arena::max_concurrency()
1314 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
1315     if (n_threads == 0) {
1316         n_threads = tbb::this_task_arena::max_concurrency();
1317     }
1318     const int max_n_arenas = 8;
1319     int n_arenas = 2;
1320     if(n_threads >= 16)
1321         n_arenas = max_n_arenas;
1322     else if (n_threads >= 8)
1323         n_arenas = 4;
1324     n_threads = n_arenas * (n_threads / n_arenas);
1325     const int n_per_thread = 10000000;
1326     const int n_repetitions = 100;
1327     const int n_outer_repetitions = 20;
1328     std::multiset<float> failure_ratio; // for median calculating
1329     tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads - (n_arenas - 1));
1330     utils::SpinBarrier master_barrier(n_arenas);
1331     utils::SpinBarrier arena_barrier(n_threads);
1332     MyObserver* observer[max_n_arenas];
1333     std::vector<tbb::task_arena> arenas(n_arenas);
1334     std::atomic<int> failure_counter;
1335     std::atomic<int> counter;
1336     tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1337     for (int i = 0; i < n_arenas; ++i) {
1338         arenas[i].initialize(n_threads / n_arenas);
1339         observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter);
1340     }
1341     int ii = 0;
1342     for (; ii < n_outer_repetitions; ++ii) {
1343         failure_counter = 0;
1344         counter = 0;
1345         // Run the workload in all arenas simultaneously
1346         utils::NativeParallelFor(n_arenas, ExecuteParallelFor(n_per_thread, n_repetitions,
1347             arenas, arena_barrier, master_barrier));
1348         // TODO: get rid of the check below by tuning the ratio between n_threads and n_arenas
1349         failure_ratio.insert((counter != 0 ? float(failure_counter) / counter : 1.0f));
1350         tls.clear();
1351         // collect at least 3 elements in failure_ratio before calculating the median
1352         if (ii > 1) {
1353             std::multiset<float>::iterator it = failure_ratio.begin();
1354             std::advance(it, failure_ratio.size() / 2);
1355             if (*it < 0.02)
1356                 break;
1357         }
1358     }
1359     for (int i = 0; i < n_arenas; ++i) {
1360         delete observer[i];
1361     }
1362     // check that the median is not too big
1363     std::multiset<float>::iterator it = failure_ratio.begin();
1364     std::advance(it, failure_ratio.size() / 2);
1365     // TODO: decrease the constants 0.05 and 0.3 by tuning the ratio between n_threads and n_arenas
1366     if (*it > 0.05) {
1367         REPORT("Warning: too many cases where threads join different arenas.\n");
1368         REQUIRE_MESSAGE(*it <= 0.3, "Too many cases where threads join different arenas.\n");
1369     }
1370 }
1371 
1372 void TestArenaWorkersMigration() {
1373     TestArenaWorkersMigrationWithNumThreads(4);
1374     if (tbb::this_task_arena::max_concurrency() != 4) {
1375         TestArenaWorkersMigrationWithNumThreads();
1376     }
1377 }
1378 
1379 //--------------------------------------------------//
1380 void TestDefaultCreatedWorkersAmount() {
1381     int threads = tbb::this_task_arena::max_concurrency();
1382     utils::NativeParallelFor(1, [threads](int idx) {
1383         REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
1384         utils::SpinBarrier barrier(threads);
1385         ResetTLS();
1386         for (int trial = 0; trial < 10; ++trial) {
1387             tbb::parallel_for(0, threads, [threads, &barrier](int) {
1388                 REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified number of threads");
1389                 REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than the default specifies");
1390                 local_id.local() = 1;
1391                 // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
1392                 utils::Sleep(1);
1393                 barrier.wait();
1394                 }, tbb::simple_partitioner());
1395             REQUIRE_MESSAGE(local_id.size() == size_t(threads), "the number of created threads is not equal to the default");
1396         }
1397     });
1398 }
1399 
1400 void TestAbilityToCreateWorkers(int thread_num) {
1401     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
1402     // Checks only a subset of the possible reserved-for-master slot counts:
1403     // 0 and 1 reserved slots are the important cases, but it is also useful
1404     // to collect some statistics for other counts without spending the
1405     // whole test session time on checking every possible value
1406     TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
1407     TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
1408 }
1409 
1410 void TestDefaultWorkersLimit() {
1411     TestDefaultCreatedWorkersAmount();
1412 #if TBB_TEST_LOW_WORKLOAD
1413     TestAbilityToCreateWorkers(24);
1414 #else
1415     TestAbilityToCreateWorkers(256);
1416 #endif
1417 }
1418 
1419 #if TBB_USE_EXCEPTIONS
1420 
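// An exception thrown by the functor passed to task_arena::execute() is
// rethrown in the calling thread; every call below must therefore observe
// the thrown int.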
1421 void ExceptionInExecute() {
1422     std::size_t thread_number = utils::get_platform_max_threads();
1423     tbb::task_arena test_arena(int(thread_number / 2), unsigned(thread_number / 2));
1424 
1425     std::atomic<int> canceled_task{};
1426 
1427     auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
1428         for (std::size_t i = 0; i < 1000; ++i) {
1429             try {
1430                 test_arena.execute([] {
1431                     volatile bool suppress_unreachable_code_warning = true;
1432                     if (suppress_unreachable_code_warning) {
1433                         throw -1;
1434                     }
1435                 });
1436                 FAIL("An exception should have been thrown.");
1437             } catch (int) {
1438                 ++canceled_task;
1439             } catch (...) {
1440                 FAIL("Wrong type of exception.");
1441             }
1442         }
1443     };
1444 
1445     utils::NativeParallelFor(thread_number, parallel_func);
1446     CHECK(canceled_task == thread_number * 1000);
1447 }
1448 
1449 #endif // TBB_USE_EXCEPTIONS
1450 
1451 class simple_observer : public tbb::task_scheduler_observer {
1452     static std::atomic<int> idx_counter;
1453     int my_idx;
1454     int myMaxConcurrency;   // concurrency of the associated arena
1455     int myNumReservedSlots; // reserved slots in the associated arena
1456     void on_scheduler_entry( bool is_worker ) override {
1457         int current_index = tbb::this_task_arena::current_thread_index();
1458         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
1459         if (is_worker) {
1460             CHECK(current_index >= myNumReservedSlots);
1461         }
1462     }
1463     void on_scheduler_exit( bool /*is_worker*/ ) override
1464     {}
1465 public:
1466     simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
1467         : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
1468         , myMaxConcurrency(maxConcurrency)
1469         , myNumReservedSlots(numReservedSlots) {
1470         observe(true);
1471     }
1472 
1473     friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
1474         return lhs.my_idx < rhs.my_idx;
1475     }
1476 };
1477 
1478 std::atomic<int> simple_observer::idx_counter{};
1479 
1480 struct arena_handler {
1481     enum arena_status {
1482         alive,
1483         deleting,
1484         deleted
1485     };
1486 
1487     tbb::task_arena* arena;
1488 
1489     std::atomic<arena_status> status{alive};
1490     tbb::spin_rw_mutex arena_in_use{};
1491 
1492     tbb::concurrent_set<simple_observer> observers;
1493 
1494     arena_handler(tbb::task_arena* ptr) : arena(ptr)
1495     {}
1496 
1497     friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
1498         return lhs.arena < rhs.arena;
1499     }
1500 };
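// Lifetime protocol: threads that use an arena (execute, enqueue, attach an
// observer) take arena_in_use as readers and proceed only while status is
// `alive`; the deleting thread first flips status alive -> deleting with a
// CAS and then takes arena_in_use as a writer, so the tbb::task_arena object
// is never deleted while another thread is still inside it.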
1501 
1502 // TODO: Add observer operations
1503 void StressTestMixFunctionality() {
1504     enum operation_type {
1505         create_arena,
1506         delete_arena,
1507         attach_observer,
1508         detach_observer,
1509         arena_execute,
1510         enqueue_task,
1511         last_operation_marker
1512     };
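    // Each thread repeatedly picks one of the operations above at random and
    // applies it to a shared pool of arenas until max_operations is reached.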
1513 
1514     std::size_t operations_number = last_operation_marker;
1515     std::size_t thread_number = utils::get_platform_max_threads();
1516     utils::FastRandom<> operation_rnd(42);
1517     tbb::spin_mutex random_operation_guard;
1518 
1519     auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
1520         tbb::spin_mutex::scoped_lock lock(random_operation_guard);
1521         return static_cast<operation_type>(operation_rnd.get() % operations_number);
1522     };
1523 
1524     utils::FastRandom<> arena_rnd(42);
1525     tbb::spin_mutex random_arena_guard;
1526     auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
1527         tbb::spin_mutex::scoped_lock lock(random_arena_guard);
1528         return arena_rnd.get();
1529     };
1530 
1531     tbb::concurrent_set<arena_handler> arenas_pool;
1532 
1533     std::vector<std::thread> thread_pool;
1534 
1535     utils::SpinBarrier thread_barrier(thread_number);
1536     std::size_t max_operations = 10000;
1537     std::atomic<std::size_t> curr_operation{};
1538     auto thread_func = [&] () {
1539         arenas_pool.emplace(new tbb::task_arena());
1540         thread_barrier.wait();
1541         while (curr_operation++ < max_operations) {
1542             switch (get_random_operation()) {
1543                 case create_arena :
1544                 {
1545                     arenas_pool.emplace(new tbb::task_arena());
1546                     break;
1547                 }
1548                 case delete_arena :
1549                 {
1550                     auto curr_arena = arenas_pool.begin();
1551                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1552                         arena_handler::arena_status curr_status = arena_handler::alive;
1553                         if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
1554                             break;
1555                         }
1556                     }
1557 
1558                     if (curr_arena == arenas_pool.end()) break;
1559 
1560                     tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);
1561 
1562                     delete curr_arena->arena;
1563                     curr_arena->status.store(arena_handler::deleted);
1564 
1565                     break;
1566                 }
1567                 case attach_observer :
1568                 {
1569                     tbb::spin_rw_mutex::scoped_lock lock{};
1570                     auto curr_arena = arenas_pool.begin();
1571                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1572                         if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1573                             if (curr_arena->status == arena_handler::alive) {
1574                                 break;
1575                             } else {
1576                                 lock.release();
1577                             }
1578                         }
1579                     }
1580 
1581                     if (curr_arena == arenas_pool.end()) break;
1582 
1583                     curr_arena->observers.emplace(*curr_arena->arena, int(thread_number), 1);
1586 
1587                     break;
1588                 }
1589                 case detach_observer :
1590                 {
1591                     auto arena_number = get_random_arena() % arenas_pool.size();
1592                     auto curr_arena = arenas_pool.begin();
1593                     std::advance(curr_arena, arena_number);
1594 
1595                     for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
1596                         if (it->is_observing()) {
1597                             it->observe(false);
1598                             break;
1599                         }
1600                     }
1601 
1602                     break;
1603                 }
1604                 case arena_execute :
1605                 {
1606                     tbb::spin_rw_mutex::scoped_lock lock{};
1607                     auto curr_arena = arenas_pool.begin();
1608                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1609                         if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1610                             if (curr_arena->status == arena_handler::alive) {
1611                                 break;
1612                             } else {
1613                                 lock.release();
1614                             }
1615                         }
1616                     }
1617 
1618                     if (curr_arena == arenas_pool.end()) break;
1619 
1620                     curr_arena->arena->execute([] () {
1621                         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) {
1622                             std::atomic<int> sum{};
1623                             // Simulate some work by spinning on an atomic
1624                             for (; sum < 100; ++sum) ;
1625                         });
1626                     });
1627 
1628                     break;
1629                 }
1630                 case enqueue_task :
1631                 {
1632                     tbb::spin_rw_mutex::scoped_lock lock{};
1633                     auto curr_arena = arenas_pool.begin();
1634                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1635                         if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1636                             if (curr_arena->status == arena_handler::alive) {
1637                                 break;
1638                             } else {
1639                                 lock.release();
1640                             }
1641                         }
1642                     }
1643 
1644                     if (curr_arena == arenas_pool.end()) break;
1645 
1646                     curr_arena->arena->enqueue([] {
1647                         std::atomic<int> sum{};
1648                         // Simulate some work by spinning on an atomic
1649                         for (; sum < 100000; ++sum) ;
1650                     });
1651 
1652                     break;
1653                 }
1654                 case last_operation_marker :
1655                     break;
1656             }
1657         }
1658     };
1659 
1660     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1661         thread_pool.emplace_back(thread_func);
1662     }
1663 
1664     thread_func();
1665 
1666     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1667         if (thread_pool[i].joinable()) thread_pool[i].join();
1668     }
1669 
1670     for (auto& handler : arenas_pool) {
1671         if (handler.status != arena_handler::deleted) delete handler.arena;
1672     }
1673 }
1674 
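// A self-reproducing task: it checks that the executing thread has already
// worked in the arena (its ETS flag was set during warm-up) and re-enqueues
// itself until the shared task counter reaches 100000.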
1675 struct enqueue_test_helper {
1676     enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets , std::atomic<std::size_t>& task_counter)
1677         : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
1678     {}
1679 
1680     enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
1681     {}
1682 
1683     void operator() () const {
1684         CHECK(my_ets.local());
1685         if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
1686         std::this_thread::yield();
1687     }
1688 
1689     tbb::task_arena& my_arena;
1690     tbb::enumerable_thread_specific<bool>& my_ets;
1691     std::atomic<std::size_t>& my_task_counter;
1692 };
1693 
1694 //--------------------------------------------------//
1695 
1696 //! Test for task arena in concurrent cases
1697 //! \brief \ref requirement
1698 TEST_CASE("Test for concurrent functionality") {
1699     TestConcurrentFunctionality();
1700 }
1701 
1702 //! Test for arena entry consistency
1703 //! \brief \ref requirement \ref error_guessing
1704 TEST_CASE("Test for task arena entry consistency") {
1705     TestArenaEntryConsistency();
1706 }
1707 
1708 //! Test for task arena attach functionality
1709 //! \brief \ref requirement \ref interface
1710 TEST_CASE("Test for the attach functionality") {
1711     TestAttach(4);
1712 }
1713 
1714 //! Test for constant functor requirements
1715 //! \brief \ref requirement \ref interface
1716 TEST_CASE("Test for constant functor requirement") {
1717     TestConstantFunctorRequirement();
1718 }
1719 
1720 //! Test for move semantics support
1721 //! \brief \ref requirement \ref interface
1722 TEST_CASE("Move semantics support") {
1723     TestMoveSemantics();
1724 }
1725 
1726 //! Test for different return value types
1727 //! \brief \ref requirement \ref interface
1728 TEST_CASE("Return value test") {
1729     TestReturnValue();
1730 }
1731 
1732 //! Test for delegated task spawn in case of unsuccessful slot attach
1733 //! \brief \ref error_guessing
1734 TEST_CASE("Delegated spawn wait") {
1735     TestDelegatedSpawnWait();
1736 }
1737 
1738 //! Test task arena isolation functionality
1739 //! \brief \ref requirement \ref interface
1740 TEST_CASE("Isolated execute") {
1741     // Isolation test cases are valid only for more than 2 threads
1742     if (tbb::this_task_arena::max_concurrency() > 2) {
1743         TestIsolatedExecute();
1744     }
1745 }
1746 
1747 //! Test for TBB Workers creation limits
1748 //! \brief \ref requirement
1749 TEST_CASE("Default workers limit") {
1750     TestDefaultWorkersLimit();
1751 }
1752 
1753 //! Test for workers migration between arenas
1754 //! \brief \ref error_guessing \ref stress
1755 TEST_CASE("Arena workers migration") {
1756     TestArenaWorkersMigration();
1757 }
1758 
1759 //! Test for multiple waits, threads should not block each other
1760 //! \brief \ref requirement
1761 TEST_CASE("Multiple waits") {
1762     TestMultipleWaits();
1763 }
1764 
1765 //! Test for small stack size settings and arena initialization
1766 //! \brief \ref error_guessing
1767 TEST_CASE("Small stack size") {
1768     TestSmallStackSize();
1769 }
1770 
1771 #if TBB_USE_EXCEPTIONS
1772 //! \brief \ref requirement \ref stress
1773 TEST_CASE("Test for exceptions during execute.") {
1774     ExceptionInExecute();
1775 }
1776 
1777 //! \brief \ref error_guessing
1778 TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
1779     struct throwing_obj {
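        // The constructor always throws, so no fully constructed object ever
        // exists and the destructor must never run.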
1780         throwing_obj() {
1781             volatile bool flag = true;
1782             if (flag) throw std::exception{};
1783         }
1784         throwing_obj(const throwing_obj&) = default;
1785         ~throwing_obj() { FAIL("The destructor should not be called."); }
1786     };
1787 
1788     tbb::task_arena arena;
1789 
1790     REQUIRE_THROWS_AS( [&] {
1791         arena.execute([] {
1792             return throwing_obj{};
1793         });
1794     }(), std::exception );
1795 }
1796 #endif // TBB_USE_EXCEPTIONS
1797 
1798 //! \brief \ref stress
1799 TEST_CASE("Stress test with mixing functionality") {
1800     StressTestMixFunctionality();
1801 }
1802 
1803 //! \brief \ref stress
1804 TEST_CASE("Workers oversubscription") {
1805     std::size_t num_threads = utils::get_platform_max_threads();
1806     tbb::enumerable_thread_specific<bool> ets;
1807     tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
1808     tbb::task_arena arena(num_threads * 2);
1809 
1810     utils::SpinBarrier barrier(num_threads * 2);
1811 
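    // Warm-up: the barrier forces all 2*num_threads threads to show up at
    // once, and each of them marks itself in the ETS.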
1812     arena.execute([&] {
1813         tbb::parallel_for(std::size_t(0), num_threads * 2,
1814             [&] (const std::size_t&) {
1815                 ets.local() = true;
1816                 barrier.wait();
1817             }
1818         );
1819     });
1820 
1821     std::this_thread::yield();
1822 
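    // Re-populate the arena with self-reproducing enqueued tasks; the same
    // workers are expected to return, so their ETS flags must still be set
    // (checked inside enqueue_test_helper).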
1823     std::atomic<std::size_t> task_counter{0};
1824     for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
1825         arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
1826     }
1827 
1828     while (task_counter < 100000) std::this_thread::yield();
1829 
1830     arena.execute([&] {
1831         tbb::parallel_for(std::size_t(0), num_threads * 2,
1832             [&] (const std::size_t&) {
1833                 CHECK(ets.local());
1834                 barrier.wait();
1835             }
1836         );
1837     });
1838 }
1839