/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/concurrency_tracker.h"
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/task_group.h"

#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <thread>
#include <atomic>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification
//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change them.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
            default: {
                tbb::task_arena arena( std::rand() % maxthread + 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize();
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
            case 0: {
                tbb::task_arena arena( 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                CHECK(arena.is_active());
                break;
            }
            case 1: {
                tbb::task_arena arena( tbb::task_arena::automatic );
                CHECK(!arena.is_active());
                arena.initialize();
                CHECK(arena.is_active());
                break;
            }
            case 2: {
                tbb::task_arena arena;
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 );
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
        }
    }
}

//--------------------------------------------------//

// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id values: -1 is reserved by current_thread_index(), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}

class ArenaObserver : public tbb::task_scheduler_observer {
    int myId;               // unique observer/arena id within a test
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};
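
// A minimal sketch (illustrative only, not used by the tests) of the
// task_scheduler_observer pattern that ArenaObserver above builds on:
// bind the observer to an arena, call observe(true), and override the
// entry/exit callbacks. Only the public API already used above is assumed.
struct MinimalObserverSketch : tbb::task_scheduler_observer {
    MinimalObserverSketch( tbb::task_arena &a ) : tbb::task_scheduler_observer(a) {
        observe(true); // start receiving callbacks for threads of this arena
    }
    void on_scheduler_entry( bool /*is_worker*/ ) override { /* a thread joined the arena */ }
    void on_scheduler_exit( bool /*is_worker*/ ) override { /* a thread left the arena */ }
};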

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        utils::doDummyWork(50000);
    }
};

struct AsynchronousWork {
    utils::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
    : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
        if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
        else my_barrier.signalNoWait();
    }
};

//-----------------------------------------------------------------------------------------//

// Test that task_arenas might be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of the app thread running this test.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that an arena observer can be activated before a local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
        LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
            observe(true);
        }
    };

    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");

    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");

    LocalObserver lo1;
    CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated");

    tbb::task_arena a3(1, 0);
    LocalObserver lo2(a3);
    CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated");

    utils::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);

    a1.enqueue(work); // submit asynchronous work
    barrier.wait();

    a2.enqueue(work); // another portion of work
    a2.execute(work);

    a3.execute([] {
        utils::doDummyWork(100);
    });

    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    // TODO REVAMP fix with global control
    ResetTLS();
    utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
}
//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait();
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            utils::doDummyWork(10000);
            myWait.release();
        }
    };

    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0);
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve();
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait));
        }
        my_b.wait();
    }
};

void TestMultipleMasters(int p) {
    {
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. Worker threads need time to occupy the slot to prevent an external thread from taking an enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for bug 1981, part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}

//--------------------------------------------------//
// TODO: explain what TestArenaEntryConsistency does
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i)  // init thread-specific instance
    :   FPModeContext(idx+i)
    ,   my_stage(s)
    ,   is_caught(false)
#if TBB_USE_EXCEPTIONS
    ,   is_expected( (idx&(1<<i)) != 0 )
#else
    ,   is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then started on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduce the entry type and append it to the id; this helps to find the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                              "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                            : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode);
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
    : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in the current task group context
        tbb::parallel_for(4, 5, *this); // in a different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        GetRoundingMode();
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    tbb::task_arena a(2, 1);
    std::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings into the arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // at least 32 = 2^5 combinations of entry types
        body.test(i);
}
//--------------------------------------------------//
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    utils::SpinBarrier *my_barrier;
    utils::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL )
    : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        CHECK( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}
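
// A minimal sketch (illustrative only, not used by the tests) of the slot
// arithmetic exercised above: task_arena(P, R) asks for total concurrency P
// with R slots reserved for application threads, so at most P-R workers join.
inline void ReservedSlotsSketch() {
    tbb::task_arena a( /*max_concurrency=*/4, /*reserved_for_masters=*/1 );
    a.execute( [] { /* the calling thread occupies a reserved slot */ } );
}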

struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
    utils::SpinBarrier& m_barrier;

    TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), m_barrier(barrier) {
        observe(true);
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait();
        }
    }
};

void TestMandatoryConcurrency() {
    tbb::task_arena a(1);
    a.execute([&a] {
        int n_threads = 4;
        utils::SpinBarrier exit_barrier(2);
        TestMandatoryConcurrencyObserver observer(a, exit_barrier);
        for (int j = 0; j < 5; ++j) {
            utils::ExactConcurrencyLevel::check(1);
            std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
            utils::SpinBarrier barrier(n_threads);
            utils::NativeParallelFor(n_threads, [&](int) {
                for (int i = 0; i < 5; ++i) {
                    barrier.wait();
                    a.enqueue([&] {
                        CHECK(tbb::this_task_arena::max_concurrency() == 2);
                        CHECK(a.max_concurrency() == 2);
                        ++curr_tasks;
                        CHECK(curr_tasks == 1);
                        utils::doDummyWork(1000);
                        CHECK(curr_tasks == 1);
                        --curr_tasks;
                        ++num_tasks;
                    });
                    barrier.wait();
                }
            });
            do {
                exit_barrier.wait();
            } while (num_tasks < n_threads * 5);
        }
        observer.observe(false);
    });
}

void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
    TestMandatoryConcurrency();
    InitializeAndTerminate(max_thread_num);
    for (int p = min_thread_num; p <= max_thread_num; ++p) {
        TestConcurrentArenas(p);
        TestMultipleMasters(p);
        TestArenaConcurrency(p);
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test uses the knowledge of task_arena internals
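
// A minimal sketch (illustrative only, not used by the tests) of the attach
// semantics exercised below: an arena constructed from task_arena::attach()
// references the arena the calling thread currently works in, or stays
// inactive if the thread is not in any arena yet.
inline void AttachUsageSketch() {
    tbb::task_arena attached{tbb::task_arena::attach()};
    if (attached.is_active()) {
        attached.execute( [] { /* runs in the same arena as the caller */ } );
    }
}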

struct TaskArenaValidator {
    int my_slot_at_construction;
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in the arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}

struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        tbb::task_arena arena{tbb::task_arena::attach()};
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2{tbb::task_arena::attach()};
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};

thread_local int TestAttachBody::my_idx;

void TestAttach( int maxthread ) {
    // Externally concurrent, but no concurrency within a thread
    utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}

//--------------------------------------------------//

// Test that task_arena::enqueue invokes only the const overload of the functor
// (i.e. a non-const operator() must not be called).
// TODO: can it be reworked as an SFINAE-based compile-time check?
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
}

//--------------------------------------------------//

#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"

// Test this_task_arena::isolate
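
// A minimal sketch (illustrative only, not used by the tests) of the isolation
// property tested below: inside an isolate() region the calling thread does not
// pick up tasks from outside the region while waiting for nested parallelism.
inline void IsolateUsageSketch() {
    tbb::parallel_for(0, 100, [](int) {
        tbb::this_task_arena::isolate([] {
            tbb::parallel_for(0, 10, [](int) {}); // outer-level tasks are not taken here
        });
    });
}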
namespace TestIsolatedExecuteNS {
    template <typename NestedPartitioner>
    class NestedParFor : utils::NoAssign {
    public:
        NestedParFor() {}
        void operator()() const {
            NestedPartitioner p;
            tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
        }
    };

    template <typename NestedPartitioner>
    class ParForBody : utils::NoAssign {
        bool myOuterIsolation;
        tbb::enumerable_thread_specific<int> &myEts;
        std::atomic<bool> &myIsStolen;
    public:
        ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
            : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            if ( myOuterIsolation )
                NestedParFor<NestedPartitioner>()();
            else
                tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
            --e;
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    class OuterParFor : utils::NoAssign {
        bool myOuterIsolation;
        std::atomic<bool> &myIsStolen;
    public:
        OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
        void operator()() const {
            tbb::enumerable_thread_specific<int> ets( 0 );
            OuterPartitioner p;
            tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    void TwoLoopsTest( bool outer_isolation ) {
        std::atomic<bool> is_stolen;
        is_stolen = false;
        const int max_repeats = 100;
        if ( outer_isolation ) {
            for ( int i = 0; i <= max_repeats; ++i ) {
                tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
                if ( is_stolen ) break;
            }
            // TODO: was ASSERT_WARNING
            if (!is_stolen) {
                REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
            }
        } else {
            for ( int i = 0; i <= max_repeats; ++i ) {
                OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
            }
            REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
        }
    }

    void TwoLoopsTest( bool outer_isolation ) {
        TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
    }

    void TwoLoopsTest() {
        TwoLoopsTest( true );
        TwoLoopsTest( false );
    }
    //--------------------------------------------------//
    class HeavyMixTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
        tbb::enumerable_thread_specific<int>& myIsolatedLevel;
        int myNestedLevel;

        template <typename Partitioner, typename Body>
        static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
            if ( rnd.get() % 2 ) {
                if ( ctx )
                    tbb::parallel_for( 0, 2, body, p, *ctx );
                else
                    tbb::parallel_for( 0, 2, body, p );
            } else {
                tbb::parallel_invoke( body, body );
            }
        }

        template <typename Partitioner>
        class IsolatedBody : utils::NoAssign {
            const HeavyMixTestBody &myHeavyMixTestBody;
            Partitioner &myPartitioner;
        public:
            IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
                : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
            void operator()() const {
                RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                    HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                        myHeavyMixTestBody.myNestedLevel + 1 ),
                    myPartitioner );
            }
        };

        template <typename Partitioner>
        void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
            Partitioner p;
            switch ( rnd.get() % 2 ) {
                case 0: {
                    // No features
                    tbb::task_group_context ctx;
                    RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
                    break;
                }
                case 1: {
                    // Isolation
                    int previous_isolation = isolated_level;
                    isolated_level = myNestedLevel;
                    tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
                    isolated_level = previous_isolation;
                    break;
                }
            }
        }
    public:
        HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
            tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
            : myRandom( random ), myIsolatedLevel( isolated_level )
            , myNestedLevel( nested_level ) {}
        void operator()() const {
            int &isolated_level = myIsolatedLevel.local();
            CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on an isolated level" );
            if ( myNestedLevel == 20 )
                return;
            utils::FastRandom<>& rnd = myRandom.local();
            if ( rnd.get() % 2 == 1 ) {
                RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
            } else {
                RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
            }
        }
        void operator()(int) const {
            this->operator()();
        }
    };

    struct RandomInitializer {
        utils::FastRandom<> operator()() {
            return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
        }
    };

    void HeavyMixTest() {
        std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);

        RandomInitializer init_random;
        tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
        tbb::enumerable_thread_specific<int> isolated_level( 0 );
        for ( int i = 0; i < 5; ++i ) {
            HeavyMixTestBody b( random, isolated_level, 1 );
            b( 0 );
        }
    }

    //--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
    struct MyException {};
    struct IsolatedBodyThrowsException {
        void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
            // Workaround an unreachable code warning in task_arena_function.
            volatile bool workaround = true;
            if (workaround)
#endif
            {
                throw MyException();
            }
        }
    };
    struct ExceptionTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<int>& myEts;
        std::atomic<bool>& myIsStolen;
        ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
            : myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int i ) const {
            try {
                tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
                REQUIRE_MESSAGE( false, "The exception has been lost" );
            }
            catch ( MyException ) {}
            catch ( ... ) {
                REQUIRE_MESSAGE( false, "Unexpected exception" );
            }
            // Check that nested algorithms can steal outer-level tasks
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            // work imbalance increases chances for stealing
            tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
            --e;
        }
    };

#endif /* TBB_USE_EXCEPTIONS */
    void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
        tbb::enumerable_thread_specific<int> ets;
        std::atomic<bool> is_stolen;
        is_stolen = false;
        for ( ;; ) {
            tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
            if ( is_stolen ) break;
        }
        REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
    }

    struct NonConstBody {
        unsigned int state;
        void operator()() {
            state ^= ~0u;
        }
    };

    void TestNonConstBody() {
        NonConstBody body;
        body.state = 0x6c97d5ed;
        tbb::this_task_arena::isolate(body);
        REQUIRE_MESSAGE(body.state == 0x93682a12, "Wrong state");
    }
    // TODO: Consider tbb::task_group instead of the explicit task API.
    class TestEnqueueTask : public tbb::detail::d1::task {
        using wait_context = tbb::detail::d1::wait_context;

        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;

    public:
        wait_context& waiter;
        tbb::task_arena& arena;
        static const int N = 100;

        TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
            : executed(exe), completed(c), waiter(w), arena(a) {}

        tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
            for (int i = 0; i < N; ++i) {
                arena.enqueue([&]() {
                    executed.local() = true;
                    ++completed;
                    for (int j = 0; j < 100; j++) utils::yield();
                    waiter.release(1);
                });
            }
            return nullptr;
        }
        tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
    };

    class TestEnqueueIsolateBody : utils::NoCopy {
        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;
        tbb::task_arena& arena;
    public:
        static const int N = 100;

        TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
            : executed(exe), completed(c), arena(a) {}
        void operator()() {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait_context waiter(N);

            TestEnqueueTask root(executed, completed, waiter, arena);
            tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
        }
    };

    void TestEnqueue() {
        tbb::enumerable_thread_specific<bool> executed(false);
        std::atomic<int> completed;
        tbb::task_arena arena{tbb::task_arena::attach()};

        // Check that the main thread can process enqueued tasks.
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: none of the enqueued tasks were executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation in which the owner
    // steals an outer-level task on a nested level. With only one thief, it would execute the outer-level
    // tasks first, and the owner would get no chance to steal an outer-level task.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] { tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with a missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}
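
// A minimal sketch (illustrative only, not used by the tests) of the pattern
// the regression test above exercises: delegating task_group work into another
// arena through execute(), then delegating the wait the same way.
inline void CrossArenaTaskGroupSketch( tbb::task_arena &a ) {
    tbb::task_group tg;
    a.execute( [&tg] { tg.run( [] { /* work runs inside arena a */ } ); } );
    a.execute( [&tg] { tg.wait(); } ); // the wait is delegated into the arena as well
}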

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not wait for other tasks.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
            tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many times
    // to make the log annoying enough that the warning is treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // the move constructor is disallowed, like the copy constructor inherited from NoCopy
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() makes no copies or moves of the argument inside the implementation
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
    struct noDefaultTag {};
    class ReturnType : public StateTrackable<> {
        static const int SIZE = 42;
        std::vector<int> data;
    public:
        ReturnType(noDefaultTag) : StateTrackable<>(0) {}
        // Define the copy constructor to test that it is never called
        ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
        ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

        void fill() {
            for (int i = 0; i < SIZE; ++i)
                data.push_back(i);
        }
        void check() {
            REQUIRE(data.size() == unsigned(SIZE));
            for (int i = 0; i < SIZE; ++i)
                REQUIRE(data[i] == i);
            StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
            REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
            REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
            std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
            std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
            REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
            // The number of copies/moves should not exceed 3 if copy elision takes place:
            // function return, store to an internal storage, acquire the internal storage.
            // With a compiler that performs no copy elision, this number may grow up to 7.
            REQUIRE((copied == 0 && moved <= 7));
            WARN_MESSAGE(moved <= 3,
                "Warning: The number of copies/moves should not exceed 3 if copy elision takes place. "
                "Pay attention to this warning only if copy elision is enabled."
            );
        }
    };

    template <typename R>
    R function() {
        noDefaultTag tag;
        R r(tag);
        r.fill();
        return r;
    }

    template <>
    void function<void>() {}

    template <typename R>
    struct Functor {
        R operator()() const {
            return function<R>();
        }
    };

    tbb::task_arena& arena() {
        static tbb::task_arena a;
        return a;
    }

    template <typename F>
    void TestExecute(F &f) {
        StateTrackableCounters::reset();
        ReturnType r{arena().execute(f)};
        r.check();
    }

    template <typename F>
    void TestExecute(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r{arena().execute(f)};
        r.check();
    }

    template <typename F>
    void TestIsolate(F &f) {
        StateTrackableCounters::reset();
        ReturnType r{tbb::this_task_arena::isolate(f)};
        r.check();
    }

    template <typename F>
    void TestIsolate(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r{tbb::this_task_arena::isolate(f)};
        r.check();
    }

    void Test() {
        TestExecute(Functor<ReturnType>());
        Functor<ReturnType> f1;
        TestExecute(f1);
        TestExecute(function<ReturnType>);

        arena().execute(Functor<void>());
        Functor<void> f2;
        arena().execute(f2);
        arena().execute(function<void>);
        TestIsolate(Functor<ReturnType>());
        TestIsolate(f1);
        TestIsolate(function<ReturnType>);
        tbb::this_task_arena::isolate(Functor<void>());
        tbb::this_task_arena::isolate(f2);
        tbb::this_task_arena::isolate(function<void>);
    }
}

void TestReturnValue() {
    TestReturnValueNS::Test();
}
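
// A minimal sketch (illustrative only, not used by the tests) of the property
// verified above: task_arena::execute() and this_task_arena::isolate() forward
// the functor's return value to the caller.
inline int ExecuteReturnValueSketch( tbb::task_arena &a ) {
    return a.execute( [] { return 42; } ); // the result is moved (or elided) out of the arena
}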
1322 
1323 //--------------------------------------------------//
1324 
1325 // MyObserver checks if threads join to the same arena
1326 struct MyObserver: public tbb::task_scheduler_observer {
1327     tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
1328     tbb::task_arena& my_arena;
1329     std::atomic<int>& my_failure_counter;
1330     std::atomic<int>& my_counter;
1331     utils::SpinBarrier& m_barrier;
1332 
1333     MyObserver(tbb::task_arena& a,
1334         tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
1335         std::atomic<int>& failure_counter,
1336         std::atomic<int>& counter,
1337         utils::SpinBarrier& barrier)
1338         : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1339         my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
1340         observe(true);
1341     }
1342     void on_scheduler_entry(bool worker) override {
1343         if (worker) {
1344             ++my_counter;
1345             tbb::task_arena*& cur_arena = my_tls.local();
1346             if (cur_arena != nullptr && cur_arena != &my_arena) {
1347                 ++my_failure_counter;
1348             }
1349             cur_arena = &my_arena;
1350             m_barrier.wait();
1351         }
1352     }
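    // The two waits below pair with the "exit1"/"exit2" barriers in the main
    // loop of TestArenaWorkersMigrationWithNumThreads, keeping workers parked
    // while the main thread enqueues the next round of work.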
1353     void on_scheduler_exit(bool worker) override {
1354         if (worker) {
1355             m_barrier.wait(); // before wakeup
1356             m_barrier.wait(); // after wakeup
1357         }
1358     }
1359 };
1360 
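// Splits the workers evenly across several arenas and repeatedly wakes all
// arenas up; MyObserver records a failure whenever a worker enters an arena
// different from the one it joined previously. The median failure ratio over
// the outer repetitions is then required to stay small.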
1361 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
1362     if (n_threads == 0) {
1363         n_threads = tbb::this_task_arena::max_concurrency();
1364     }
1365 
1366     const int max_n_arenas = 8;
1367     int n_arenas = 2;
1368     if(n_threads > 16) {
1369         n_arenas = max_n_arenas;
1370     } else if (n_threads > 8) {
1371         n_arenas = 4;
1372     }
1373 
1374     int n_workers = n_threads - 1;
1375     n_workers = n_arenas * (n_workers / n_arenas);
1376     if (n_workers == 0) {
1377         return;
1378     }
1379 
1380     n_threads = n_workers + 1;
1381     tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);
1382 
1383     const int n_repetitions = 20;
1384     const int n_outer_repetitions = 100;
1385     std::multiset<float> failure_ratio; // for median calculation
1386     utils::SpinBarrier barrier(n_threads);
1387     utils::SpinBarrier worker_barrier(n_workers);
1388     MyObserver* observer[max_n_arenas];
1389     std::vector<tbb::task_arena> arenas(n_arenas);
1390     std::atomic<int> failure_counter;
1391     std::atomic<int> counter;
1392     tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1393 
1394     for (int i = 0; i < n_arenas; ++i) {
1395         arenas[i].initialize(n_workers / n_arenas + 1); // +1 slot for the external (master) thread
1396         observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
1397     }
1398 
1399     int ii = 0;
1400     for (; ii < n_outer_repetitions; ++ii) {
1401         failure_counter = 0;
1402         counter = 0;
1403 
1404         // Main code
1405         auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
1406         wakeup();
1407         for (int j = 0; j < n_repetitions; ++j) {
1408             barrier.wait(); // entry
1409             barrier.wait(); // exit1
1410             wakeup();
1411             barrier.wait(); // exit2
1412         }
1413         barrier.wait(); // entry
1414         barrier.wait(); // exit1
1415         barrier.wait(); // exit2
1416 
1417         failure_ratio.insert(float(failure_counter) / counter);
1418         tls.clear();
1419     // collect at least 3 elements in failure_ratio before calculating the median
1420         if (ii > 1) {
1421             std::multiset<float>::iterator it = failure_ratio.begin();
1422             std::advance(it, failure_ratio.size() / 2);
1423             if (*it < 0.02)
1424                 break;
1425         }
1426     }
1427     for (int i = 0; i < n_arenas; ++i) {
1428         delete observer[i];
1429     }
1430     // check whether the median is too big
1431     std::multiset<float>::iterator it = failure_ratio.begin();
1432     std::advance(it, failure_ratio.size() / 2);
1433     // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
1434     if (*it > 0.05) {
1435         REPORT("Warning: too many cases where threads join different arenas.\n");
1436         REQUIRE_MESSAGE(*it <= 0.3, "Too many cases where threads join different arenas.\n");
1437     }
1438 }
1439 
1440 void TestArenaWorkersMigration() {
1441     TestArenaWorkersMigrationWithNumThreads(4);
1442     if (tbb::this_task_arena::max_concurrency() != 4) {
1443         TestArenaWorkersMigrationWithNumThreads();
1444     }
1445 }
1446 
1447 //--------------------------------------------------//
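// Checks that the default arena creates exactly the expected number of worker
// threads: no more than max_concurrency() threads may ever touch the TLS, and
// in the blocked mode all of them must show up.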
1448 void TestDefaultCreatedWorkersAmount() {
1449     int threads = tbb::this_task_arena::max_concurrency();
1450     utils::NativeParallelFor(1, [threads](int idx) {
1451         REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
1452         utils::SpinBarrier barrier(threads);
1453         ResetTLS();
1454         for (auto blocked : { false, true }) {
1455             for (int trial = 0; trial < (blocked ? 10 : 10000); ++trial) {
1456                 tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
1457                     CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified number of threads");
1458                     CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "number of created threads is greater than the default");
1459                     local_id.local() = 1;
1460                     if (blocked) {
1461                         // If there are more threads than expected, 'sleep' gives unexpected threads a chance to join.
1462                         utils::Sleep(1);
1463                         barrier.wait();
1464                     }
1465                 }, tbb::simple_partitioner());
1466                 REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "number of created threads exceeds the default");
1467                 if (blocked) {
1468                     REQUIRE_MESSAGE(local_id.size() == size_t(threads), "number of created threads is not equal to the default");
1469                 }
1470             }
1471         }
1472     });
1473 }
1474 
1475 void TestAbilityToCreateWorkers(int thread_num) {
1476     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
1477     // Check only a subset of the reserved-for-external-threads amounts:
1478     // 0 and 1 reserved threads are the important cases, but some statistics
1479     // with other amounts are also collected without spending the whole
1480     // test session time on checking every possible amount.
1481     TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
1482     TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
1483 }
1484 
1485 void TestDefaultWorkersLimit() {
1486     TestDefaultCreatedWorkersAmount();
1487 #if TBB_TEST_LOW_WORKLOAD
1488     TestAbilityToCreateWorkers(24);
1489 #else
1490     TestAbilityToCreateWorkers(256);
1491 #endif
1492 }
1493 
1494 #if TBB_USE_EXCEPTIONS
1495 
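// Every native thread repeatedly calls task_arena::execute with a functor that
// throws; each exception must propagate out of execute() to the calling thread.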
1496 void ExceptionInExecute() {
1497     std::size_t thread_number = utils::get_platform_max_threads();
1498     int arena_concurrency = static_cast<int>(thread_number) / 2;
1499     tbb::task_arena test_arena(arena_concurrency, arena_concurrency);
1500 
1501     std::atomic<int> canceled_task{};
1502 
1503     auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
1504         for (std::size_t i = 0; i < 1000; ++i) {
1505             try {
1506                 test_arena.execute([] {
1507                     volatile bool suppress_unreachable_code_warning = true;
1508                     if (suppress_unreachable_code_warning) {
1509                         throw -1;
1510                     }
1511                 });
1512                 FAIL("An exception should have been thrown.");
1513             } catch (int) {
1514                 ++canceled_task;
1515             } catch (...) {
1516                 FAIL("Wrong type of exception.");
1517             }
1518         }
1519     };
1520 
1521     utils::NativeParallelFor(thread_number, parallel_func);
1522     CHECK(canceled_task == thread_number * 1000);
1523 }
1524 
1525 #endif // TBB_USE_EXCEPTIONS
1526 
1527 class simple_observer : public tbb::task_scheduler_observer {
1528     static std::atomic<int> idx_counter;
1529     int my_idx;
1530     int myMaxConcurrency;   // concurrency of the associated arena
1531     int myNumReservedSlots; // reserved slots in the associated arena
1532     void on_scheduler_entry( bool is_worker ) override {
1533         int current_index = tbb::this_task_arena::current_thread_index();
1534         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
1535         if (is_worker) {
1536             CHECK(current_index >= myNumReservedSlots);
1537         }
1538     }
1539     void on_scheduler_exit( bool /*is_worker*/ ) override
1540     {}
1541 public:
1542     simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
1543         : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
1544         , myMaxConcurrency(maxConcurrency)
1545         , myNumReservedSlots(numReservedSlots) {
1546         observe(true);
1547     }
1548 
1549     friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
1550         return lhs.my_idx < rhs.my_idx;
1551     }
1552 };
1553 
1554 std::atomic<int> simple_observer::idx_counter{};
1555 
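// Couples a dynamically allocated arena with the synchronization used by the
// stress test below: status serializes deletion attempts, and arena_in_use is
// taken for reading by users (execute/enqueue/observer attach) and for writing
// by the deleter.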
1556 struct arena_handler {
1557     enum arena_status {
1558         alive,
1559         deleting,
1560         deleted
1561     };
1562 
1563     tbb::task_arena* arena;
1564 
1565     std::atomic<arena_status> status{alive};
1566     tbb::spin_rw_mutex arena_in_use{};
1567 
1568     tbb::concurrent_set<simple_observer> observers;
1569 
1570     arena_handler(tbb::task_arena* ptr) : arena(ptr)
1571     {}
1572 
1573     friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
1574         return lhs.arena < rhs.arena;
1575     }
1576 };
1577 
1578 // TODO: Add observer operations
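// Randomly interleaves arena creation/deletion, observer attach/detach,
// execute, and enqueue on all available threads to provoke races between
// these operations.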
1579 void StressTestMixFunctionality() {
1580     enum operation_type {
1581         create_arena,
1582         delete_arena,
1583         attach_observer,
1584         detach_observer,
1585         arena_execute,
1586         enqueue_task,
1587         last_operation_marker
1588     };
1589 
1590     std::size_t operations_number = last_operation_marker;
1591     std::size_t thread_number = utils::get_platform_max_threads();
1592     utils::FastRandom<> operation_rnd(42);
1593     tbb::spin_mutex random_operation_guard;
1594 
1595     auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
1596         tbb::spin_mutex::scoped_lock lock(random_operation_guard);
1597         return static_cast<operation_type>(operation_rnd.get() % operations_number);
1598     };
1599 
1600     utils::FastRandom<> arena_rnd(42);
1601     tbb::spin_mutex random_arena_guard;
1602     auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
1603         tbb::spin_mutex::scoped_lock lock(random_arena_guard);
1604         return arena_rnd.get();
1605     };
1606 
1607     tbb::concurrent_set<arena_handler> arenas_pool;
1608 
1609     std::vector<std::thread> thread_pool;
1610 
1611     utils::SpinBarrier thread_barrier(thread_number);
1612     std::size_t max_operations = 20000;
1613     std::atomic<std::size_t> curr_operation{};
1614 
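    // Looks for an arena that is still alive and can be read-locked; on success
    // the returned iterator is protected by the caller-held read lock, which
    // keeps the deleter (a writer) out until the lock is released.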
1615     auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) {
1616         for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) {
1617             if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1618                 if (curr_arena->status == arena_handler::alive) {
1619                     return curr_arena;
1620                 }
1621                 else {
1622                     lock.release();
1623                 }
1624             }
1625         }
1626         return arenas_pool.end();
1627     };
1628 
1629     auto thread_func = [&] () {
1630         arenas_pool.emplace(new tbb::task_arena());
1631         thread_barrier.wait();
1632         while (curr_operation++ < max_operations) {
1633             switch (get_random_operation()) {
1634                 case create_arena :
1635                 {
1636                     arenas_pool.emplace(new tbb::task_arena());
1637                     break;
1638                 }
1639                 case delete_arena :
1640                 {
1641                     auto curr_arena = arenas_pool.begin();
1642                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1643                         arena_handler::arena_status curr_status = arena_handler::alive;
1644                         if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
1645                             break;
1646                         }
1647                     }
1648 
1649                     if (curr_arena == arenas_pool.end()) break;
1650 
1651                     tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);
1652 
1653                     delete curr_arena->arena;
1654                     curr_arena->status.store(arena_handler::deleted);
1655 
1656                     break;
1657                 }
1658                 case attach_observer :
1659                 {
1660                     tbb::spin_rw_mutex::scoped_lock lock{};
1661 
1662                     auto curr_arena = find_arena(lock);
1663                     if (curr_arena != arenas_pool.end()) {
1664                         curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
1665                     }
1666                     break;
1667                 }
1668                 case detach_observer:
1669                 {
1670                     auto arena_number = get_random_arena() % arenas_pool.size();
1671                     auto curr_arena = arenas_pool.begin();
1672                     std::advance(curr_arena, arena_number);
1673 
1674                     for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
1675                         if (it->is_observing()) {
1676                             it->observe(false);
1677                             break;
1678                         }
1679                     }
1680 
1681                     break;
1682                 }
1683                 case arena_execute:
1684                 {
1685                     tbb::spin_rw_mutex::scoped_lock lock{};
1686                     auto curr_arena = find_arena(lock);
1687 
1688                     if (curr_arena != arenas_pool.end()) {
1689                         curr_arena->arena->execute([]() {
1690                             tbb::affinity_partitioner aff;
1691                             tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
1692                             tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
1693                         });
1694                     }
1695 
1696                     break;
1697                 }
1698                 case enqueue_task:
1699                 {
1700                     tbb::spin_rw_mutex::scoped_lock lock{};
1701                     auto curr_arena = find_arena(lock);
1702 
1703                     if (curr_arena != arenas_pool.end()) {
1704                         curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
1705                     }
1706 
1707                     break;
1708                 }
1709                 case last_operation_marker :
1710                 break;
1711             }
1712         }
1713     };
1714 
1715     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1716         thread_pool.emplace_back(thread_func);
1717     }
1718 
1719     thread_func();
1720 
1721     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1722         if (thread_pool[i].joinable()) thread_pool[i].join();
1723     }
1724 
1725     for (auto& handler : arenas_pool) {
1726         if (handler.status != arena_handler::deleted) delete handler.arena;
1727     }
1728 }
1729 
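// A functor that re-enqueues a copy of itself into the same arena until the
// shared counter reaches its limit; it also checks that the executing thread
// has already run the first phase of the oversubscription test (its ETS flag is set).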
1730 struct enqueue_test_helper {
1731     enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets , std::atomic<std::size_t>& task_counter)
1732         : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
1733     {}
1734 
1735     enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
1736     {}
1737 
1738     void operator() () const {
1739         CHECK(my_ets.local());
1740         if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
1741         utils::yield();
1742     }
1743 
1744     tbb::task_arena& my_arena;
1745     tbb::enumerable_thread_specific<bool>& my_ets;
1746     std::atomic<std::size_t>& my_task_counter;
1747 };
1748 
1749 //--------------------------------------------------//
1750 
1751 // This test requires TBB in an uninitialized state
1752 //! \brief \ref requirement
1753 TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
1754     REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was already initialized");
1755     tbb::enumerable_thread_specific<int> ets;
1756 
1757     tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
1758     arena.execute([&ets] {
1759         tbb::parallel_for(0, 10000000, [&ets](int){
1760             ets.local() = 1;
1761             utils::doDummyWork(100);
1762         });
1763     });
1764 
1765     CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
1766 }
1767 
1768 //! Test for task arena in concurrent cases
1769 //! \brief \ref requirement
1770 TEST_CASE("Test for concurrent functionality") {
1771     TestConcurrentFunctionality();
1772 }
1773 
1774 //! Test for arena entry consistency
1775 //! \brief \ref requirement \ref error_guessing
1776 TEST_CASE("Test for task arena entry consistency") {
1777     TestArenaEntryConsistency();
1778 }
1779 
1780 //! Test for task arena attach functionality
1781 //! \brief \ref requirement \ref interface
1782 TEST_CASE("Test for the attach functionality") {
1783     TestAttach(4);
1784 }
1785 
1786 //! Test for constant functor requirements
1787 //! \brief \ref requirement \ref interface
1788 TEST_CASE("Test for constant functor requirement") {
1789     TestConstantFunctorRequirement();
1790 }
1791 
1792 //! Test for move semantics support
1793 //! \brief \ref requirement \ref interface
1794 TEST_CASE("Move semantics support") {
1795     TestMoveSemantics();
1796 }
1797 
1798 //! Test for different return value types
1799 //! \brief \ref requirement \ref interface
1800 TEST_CASE("Return value test") {
1801     TestReturnValue();
1802 }
1803 
1804 //! Test for delegated task spawn in case of unsuccessful slot attach
1805 //! \brief \ref error_guessing
1806 TEST_CASE("Delegated spawn wait") {
1807     TestDelegatedSpawnWait();
1808 }
1809 
1810 //! Test task arena isolation functionality
1811 //! \brief \ref requirement \ref interface
1812 TEST_CASE("Isolated execute") {
1813     // Isolation test cases are valid only for more than 2 threads
1814     if (tbb::this_task_arena::max_concurrency() > 2) {
1815         TestIsolatedExecute();
1816     }
1817 }
1818 
1819 //! Test for TBB Workers creation limits
1820 //! \brief \ref requirement
1821 TEST_CASE("Default workers limit") {
1822     TestDefaultWorkersLimit();
1823 }
1824 
1825 //! Test for workers migration between arenas
1826 //! \brief \ref error_guessing \ref stress
1827 TEST_CASE("Arena workers migration") {
1828     TestArenaWorkersMigration();
1829 }
1830 
1831 //! Test for multiple waits, threads should not block each other
1832 //! \brief \ref requirement
1833 TEST_CASE("Multiple waits") {
1834     TestMultipleWaits();
1835 }
1836 
1837 //! Test for small stack size settings and arena initialization
1838 //! \brief \ref error_guessing
1839 TEST_CASE("Small stack size") {
1840     TestSmallStackSize();
1841 }
1842 
1843 #if TBB_USE_EXCEPTIONS
1844 //! \brief \ref requirement \ref stress
1845 TEST_CASE("Test for exceptions during execute") {
1846     ExceptionInExecute();
1847 }
1848 
1849 //! \brief \ref error_guessing
1850 TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
1851     struct throwing_obj {
1852         throwing_obj() {
1853             volatile bool flag = true;
1854             if (flag) throw std::exception{};
1855         }
1856         throwing_obj(const throwing_obj&) = default;
1857         ~throwing_obj() { FAIL("The destructor must never be called: construction always throws."); }
1858     };
1859 
1860     tbb::task_arena arena;
1861 
1862     REQUIRE_THROWS_AS( [&] {
1863         arena.execute([] {
1864             return throwing_obj{};
1865         });
1866     }(), std::exception );
1867 }
1868 #endif // TBB_USE_EXCEPTIONS
1869 //! \brief \ref stress
1870 TEST_CASE("Stress test with mixing functionality") {
1871     StressTestMixFunctionality();
1872 }
1873 //! \brief \ref stress
1874 TEST_CASE("Workers oversubscription") {
1875     std::size_t num_threads = utils::get_platform_max_threads();
1876     tbb::enumerable_thread_specific<bool> ets;
1877     tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
1878     tbb::task_arena arena(static_cast<int>(num_threads) * 2);
1879 
1880     utils::SpinBarrier barrier(num_threads * 2);
1881 
1882     arena.execute([&] {
1883         tbb::parallel_for(std::size_t(0), num_threads * 2,
1884             [&] (const std::size_t&) {
1885                 ets.local() = true;
1886                 barrier.wait();
1887             }
1888         );
1889     });
1890 
1891     utils::yield();
1892 
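    // Feed the arena a chain of self-re-enqueueing tasks; each of them must be
    // executed by a thread that participated in the first parallel_for above.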
1893     std::atomic<std::size_t> task_counter{0};
1894     for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
1895         arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
1896     }
1897 
1898     while (task_counter < 100000) utils::yield();
1899 
1900     arena.execute([&] {
1901         tbb::parallel_for(std::size_t(0), num_threads * 2,
1902             [&] (const std::size_t&) {
1903                 CHECK(ets.local());
1904                 barrier.wait();
1905             }
1906         );
1907     });
1908 }
1909 
1910 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1911 //! Basic test for arena::enqueue with task handle
1912 //! \brief \ref interface \ref requirement
1913 TEST_CASE("enqueue task_handle") {
1914     tbb::task_arena arena;
1915     tbb::task_group tg;
1916 
1917     std::atomic<bool> run{false};
1918 
1919     auto task_handle = tg.defer([&]{ run = true; });
1920 
1921     arena.enqueue(std::move(task_handle));
1922     tg.wait();
1923 
1924     CHECK(run == true);
1925 }
1926 
1927 //! Basic test for this_task_arena::enqueue with task handle
1928 //! \brief \ref interface \ref requirement
1929 TEST_CASE("this_task_arena::enqueue task_handle") {
1930     tbb::task_arena arena;
1931     tbb::task_group tg;
1932 
1933     std::atomic<bool> run{false};
1934 
1935     arena.execute([&]{
1936         auto task_handle = tg.defer([&]{ run = true; });
1937 
1938         tbb::this_task_arena::enqueue(std::move(task_handle));
1939     });
1940 
1941     tg.wait();
1942 
1943     CHECK(run == true);
1944 }
1945 
1946 //! Basic test for this_task_arena::enqueue with functor
1947 //! \brief \ref interface \ref requirement
1948 TEST_CASE("this_task_arena::enqueue function") {
1949     tbb::task_arena arena;
1950     tbb::task_group tg;
1951 
1952     std::atomic<bool> run{false};
1953     // Keep the task_group non-empty so that tg.wait() blocks until the handle is released
1954     auto task_handle = tg.defer([]{});
1955 
1956     arena.execute([&]{
1957         tbb::this_task_arena::enqueue([&]{
1958             run = true;
1959             // Release the task_group by resetting the deferred handle
1960             task_handle = tbb::task_handle{};
1961         });
1962     });
1963 
1964     tg.wait();
1965 
1966     CHECK(run == true);
1967 }
1968 
1969 #if TBB_USE_EXCEPTIONS
1970 //! Basic test for exceptions in task_arena::enqueue with task_handle
1971 //! \brief \ref interface \ref requirement
1972 TEST_CASE("task_arena::enqueue(task_handle) exception propagation"){
1973     tbb::task_group tg;
1974     tbb::task_arena arena;
1975 
1976     tbb::task_handle h = tg.defer([&]{
1977         volatile bool suppress_unreachable_code_warning = true;
1978         if (suppress_unreachable_code_warning) {
1979             throw std::runtime_error{ "" };
1980         }
1981     });
1982 
1983     arena.enqueue(std::move(h));
1984 
1985     CHECK_THROWS_AS(tg.wait(), std::runtime_error);
1986 }
1987 
1988 //! Basic test for exceptions in this_task_arena::enqueue with task_handle
1989 //! \brief \ref interface \ref requirement
1990 TEST_CASE("this_task_arena::enqueue(task_handle) exception propagation"){
1991     tbb::task_group tg;
1992 
1993     tbb::task_handle h = tg.defer([&]{
1994         volatile bool suppress_unreachable_code_warning = true;
1995         if (suppress_unreachable_code_warning) {
1996             throw std::runtime_error{ "" };
1997         }
1998     });
1999 
2000     tbb::this_task_arena::enqueue(std::move(h));
2001 
2002     CHECK_THROWS_AS(tg.wait(), std::runtime_error);
2003 }
2004 
2005 //! The test for error in scheduling empty task_handle
2006 //! \brief \ref requirement
2007 TEST_CASE("Empty task_handle cannot be scheduled"){
2008     tbb::task_arena ta;
2009 
2010     CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}),                    "Attempt to schedule empty task_handle", std::runtime_error);
2011     CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
2012 }
2013 #endif // TBB_USE_EXCEPTIONS
2014 
2015 //! Basic test for is_inside_task in task_group
2016 //! \brief \ref interface \ref requirement
2017 TEST_CASE("is_inside_task in task_group"){
2018     CHECK( false == tbb::is_inside_task());
2019 
2020     tbb::task_group tg;
2021     tg.run_and_wait([&]{
2022         CHECK( true == tbb::is_inside_task());
2023     });
2024 }
2025 
2026 //! Basic test for is_inside_task in arena::execute
2027 //! \brief \ref interface \ref requirement
2028 TEST_CASE("is_inside_task in arena::execute"){
2029     CHECK( false == tbb::is_inside_task());
2030 
2031     tbb::task_arena arena;
2032 
2033     arena.execute([&]{
2034         // The functor passed to execute() runs outside of any task
2035         CHECK( false == tbb::is_inside_task());
2036     });
2037 }
2038 
2039 //! The test for is_inside_task in arena::execute when called from within another task
2040 //! \brief \ref error_guessing
2041 TEST_CASE("is_inside_task in arena::execute from within a task") {
2042     CHECK(false == tbb::is_inside_task());
2043 
2044     tbb::task_arena arena;
2045     tbb::task_group tg;
2046     tg.run_and_wait([&] {
2047         arena.execute([&] {
2048             // The functor passed to execute() runs outside of any task
2049             CHECK(false == tbb::is_inside_task());
2050         });
2051     });
2052 }
2053 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
2054