/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/test.h"

#define __TBB_EXTRA_DEBUG 1
#include "common/concurrency_tracker.h"
#include "common/spin_barrier.h"
#include "common/utils.h"
#include "common/utils_report.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/task_arena.h"
#include "tbb/task_scheduler_observer.h"
#include "tbb/enumerable_thread_specific.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_set.h"
#include "tbb/spin_mutex.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/task_group.h"

#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <vector>
#include <thread>
#include <atomic>

//#include "harness_fp.h"

//! \file test_task_arena.cpp
//! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification

//--------------------------------------------------//
// Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
/* maxthread is treated as the biggest possible concurrency level. */
void InitializeAndTerminate( int maxthread ) {
    for( int i=0; i<200; ++i ) {
        switch( i&3 ) {
            // Arena is created inactive; initialization is always explicit. Lazy initialization is covered by other test functions.
            // Explicit initialization can either keep the original values or change them.
            // Arena termination can be explicit or implicit (in the destructor).
            // TODO: extend with concurrency level checks if such a method is added.
            default: {
                tbb::task_arena arena( std::rand() % maxthread + 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize();
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
            case 0: {
                tbb::task_arena arena( 1 );
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
                CHECK(arena.is_active());
                break;
            }
            case 1: {
                tbb::task_arena arena( tbb::task_arena::automatic );
                CHECK(!arena.is_active());
                arena.initialize();
                CHECK(arena.is_active());
                break;
            }
            case 2: {
                tbb::task_arena arena;
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
                arena.initialize( std::rand() % maxthread + 1 );
                CHECK(arena.is_active());
                arena.terminate();
                CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
                break;
            }
        }
    }
}
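
// An illustrative distillation of the lifecycle exercised above. The demo_*
// namespaces in this file are local sketches that are not registered with the
// test harness and are never invoked.
namespace demo_arena_lifecycle {
    inline void run() {
        tbb::task_arena arena(2);  // constructed inactive; no threads reserved yet
        arena.initialize();        // becomes active with the requested concurrency
        arena.execute([]{ /* runs inside the arena */ });
        arena.terminate();         // inactive again; the destructor would also do this
    }
}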

//--------------------------------------------------//

// Definitions used in more than one test
typedef tbb::blocked_range<int> Range;

// slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below
static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);

void ResetTLS() {
    local_id.clear();
    old_id.clear();
    slot_id.clear();
}

class ArenaObserver : public tbb::task_scheduler_observer {
    int myId;               // unique observer/arena id within a test
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2;
        local_id.local() = old_id.local();
        old_id.local() = 0;
    }
public:
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};
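
// The observer pattern the tests build on, reduced to a minimal sketch
// (illustrative only): once observe(true) is called, the callbacks fire on
// every thread that joins or leaves the associated arena.
namespace demo_observer {
    struct EntryCounter : tbb::task_scheduler_observer {
        std::atomic<int> entries{0};
        EntryCounter(tbb::task_arena& a) : tbb::task_scheduler_observer(a) { observe(true); }
        void on_scheduler_entry(bool /*is_worker*/) override { ++entries; }  // a thread joined
        void on_scheduler_exit(bool /*is_worker*/) override {}               // a thread left
    };
}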

struct IndexTrackingBody { // Must be used together with ArenaObserver
    void operator() ( const Range& ) const {
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        utils::doDummyWork(50000);
    }
};

struct AsynchronousWork {
    utils::SpinBarrier &my_barrier;
    bool my_is_blocking;
    AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
    : my_barrier(a_barrier), my_is_blocking(blocking) {}
    void operator()() const {
        CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
        tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
        if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
        else my_barrier.signalNoWait();
    }
};

//-----------------------------------------------------------------------------------------//

// Test that task_arenas can be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of the app thread running this test.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that an arena observer can be activated before a local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
    };
    tbb::task_arena a1;
    a1.initialize(1,0);
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");
    LocalObserver lo;
    CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated");
    tbb::task_arena a2(2,1);
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");
    utils::SpinBarrier barrier(2);
    AsynchronousWork work(barrier);
    a1.enqueue(work); // submit async work
    barrier.wait();
    a2.enqueue(work); // another work item
    a2.execute(work);
    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}

void TestConcurrentArenas(int p) {
    // TODO REVAMP fix with global control
    ResetTLS();
    utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
}
//--------------------------------------------------//
// Test multiple application threads working with a single arena at the same time.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait();
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};

class MultipleMastersPart2 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
public:
    MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
    }
};

class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            utils::doDummyWork(10000);
            myWait.release();
        }
    };

    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0);
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve();
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait));
        }
        my_b.wait();
    }
};

void TestMultipleMasters(int p) {
    {
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. Worker threads need time to occupy their slots, to prevent an external thread from taking an enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for bug 1981, part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting, at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}
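
// The two submission paths stressed above, side by side (illustrative sketch,
// not run by the harness): execute() blocks the caller until the functor
// completes, while enqueue() is fire-and-forget, so completion must be tracked
// by the caller -- here via an atomic flag instead of the wait_context
// machinery used by MultipleMastersPart3.
namespace demo_submission_paths {
    inline void run() {
        tbb::task_arena a;
        a.execute([]{ /* the caller waits for this to finish */ });
        std::atomic<bool> done{ false };
        a.enqueue([&done]{ done = true; }); // returns immediately
        while (!done) utils::yield();       // caller-side completion tracking
    }
}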

//--------------------------------------------------//
// TestArenaEntryConsistency checks that task_arena::execute() behaves consistently
// for every entry type (direct, delegated to a worker, delegated to another external
// thread, and nested in the same or an alien context), including exception
// propagation and preservation of FP settings.
#include <sstream>
#include <stdexcept>
#include "oneapi/tbb/detail/_exception.h"
#include "common/fp_control.h"

struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id;
    bool is_caught, is_expected;
    enum { arenaFPMode = 1 };

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i)  // init thread-specific instance
    :   FPModeContext(idx+i)
    ,   my_stage(s)
    ,   is_caught(false)
#if TBB_USE_EXCEPTIONS
    ,   is_expected( (idx&(1<<i)) != 0 )
#else
    ,   is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1);
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduce the entry type and append it to the id; this helps to locate the source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                              "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                            : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode);
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e);
        assertFPMode();
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};

class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx;

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
    : my_a(a), my_stage(c), my_idx(0) {}

    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in the current task group context
        tbb::parallel_for(4, 5, *this); // in a different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    void test_arena_entry(int i) const {
        GetRoundingMode();
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};

void TestArenaEntryConsistency() {
    tbb::task_arena a(2, 1);
    std::atomic<int> c;
    ForEachArenaEntryBody body(a, c);

    FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
    a.initialize(); // capture FP settings into the arena
    fp_scope.setNextFPMode();

    for (int i = 0; i < 100; i++) // not less than 32 = 2^5 combinations of entry types
        body.test(i);
}
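
// The FP-settings rule verified above, in pseudocode (SET_FP_MODE, modeA, and
// modeB are placeholders for this sketch, not real APIs): an arena captures the
// FP settings of the thread that initializes it and applies them to every task
// it executes.
//
//     SET_FP_MODE(modeA);
//     tbb::task_arena a;
//     a.initialize();                        // the arena captures modeA
//     SET_FP_MODE(modeB);
//     a.execute([]{ /* observes modeA */ }); // the caller's modeB is restored on return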
//--------------------------------------------------
// Test that the requested degree of concurrency for task_arena is achieved in various conditions
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency;
    int my_reserved_slots;
    utils::SpinBarrier *my_barrier;
    utils::SpinBarrier *my_worker_barrier;
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL )
    : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1;
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        CHECK( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        CHECK( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 );
    }
};

void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
            // which makes it impossible to create more than ~500 threads.
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}

struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
    utils::SpinBarrier& m_barrier;

    TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
        : tbb::task_scheduler_observer(a), m_barrier(barrier) {
        observe(true);
    }
    void on_scheduler_exit(bool worker) override {
        if (worker) {
            m_barrier.wait();
        }
    }
};

void TestMandatoryConcurrency() {
    tbb::task_arena a(1);
    a.execute([&a] {
        int n_threads = 4;
        utils::SpinBarrier exit_barrier(2);
        TestMandatoryConcurrencyObserver observer(a, exit_barrier);
        for (int j = 0; j < 5; ++j) {
            utils::ExactConcurrencyLevel::check(1);
            std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
            utils::SpinBarrier barrier(n_threads);
            utils::NativeParallelFor(n_threads, [&](int) {
                for (int i = 0; i < 5; ++i) {
                    barrier.wait();
                    a.enqueue([&] {
                        CHECK(tbb::this_task_arena::max_concurrency() == 2);
                        CHECK(a.max_concurrency() == 2);
                        ++curr_tasks;
                        CHECK(curr_tasks == 1);
                        utils::doDummyWork(1000);
                        CHECK(curr_tasks == 1);
                        --curr_tasks;
                        ++num_tasks;
                    });
                    barrier.wait();
                }
            });
            do {
                exit_barrier.wait();
            } while (num_tasks < n_threads * 5);
        }
        observer.observe(false);
    });
}
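
// What the checks above rely on, reduced to a sketch (illustrative only, not
// part of the harness): enqueueing into a single-slot arena triggers
// "mandatory concurrency" -- a worker is temporarily added so the enqueued
// task cannot be starved, which is why the task in TestMandatoryConcurrency
// observes max_concurrency() == 2 while still running alone.
namespace demo_mandatory_concurrency {
    inline void run() {
        tbb::task_arena a(1);                 // one slot, reserved for the owner
        std::atomic<bool> done{ false };
        a.enqueue([&done] { done = true; });  // wakes a temporary worker
        while (!done) utils::yield();         // the owner never enters the arena
    }
}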

void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
    TestMandatoryConcurrency();
    InitializeAndTerminate(max_thread_num);
    for (int p = min_thread_num; p <= max_thread_num; ++p) {
        TestConcurrentArenas(p);
        TestMultipleMasters(p);
        TestArenaConcurrency(p);
    }
}

//--------------------------------------------------//
// Test creation/initialization of a task_arena that references an existing arena (aka attach).
// This part of the test uses knowledge of task_arena internals

struct TaskArenaValidator {
    int my_slot_at_construction;
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                "Current thread index has changed since the validator construction" );
    }
};

void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in the arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}

struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};

thread_local int TestAttachBody::my_idx;

void TestAttach( int maxthread ) {
    // Externally concurrent, but no concurrency within a thread
    utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
    // Concurrent within the current arena; may also serve as a stress test
    tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
}
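
// The attach semantics exercised above, reduced to a sketch (illustrative
// only): attach references the arena the calling thread is currently in, if any.
namespace demo_attach {
    inline void run() {
        tbb::task_arena a{ tbb::task_arena::attach() };
        if (!a.is_active()) {
            // nothing to attach to; `a` can still be initialized as a new arena
            a.initialize();
        }
    }
}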

//--------------------------------------------------//

// Test that task_arena::enqueue does not tolerate a non-const functor.
// TODO: can it be reworked as an SFINAE-based compile-time check?
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};

void TestConstantFunctorRequirement() {
    tbb::task_arena a;
    TestFunctor tf;
    a.enqueue( tf );
}

//--------------------------------------------------//

#include "tbb/parallel_reduce.h"
#include "tbb/parallel_invoke.h"

// Test this_task_arena::isolate
namespace TestIsolatedExecuteNS {
    template <typename NestedPartitioner>
    class NestedParFor : utils::NoAssign {
    public:
        NestedParFor() {}
        void operator()() const {
            NestedPartitioner p;
            tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
        }
    };

    template <typename NestedPartitioner>
    class ParForBody : utils::NoAssign {
        bool myOuterIsolation;
        tbb::enumerable_thread_specific<int> &myEts;
        std::atomic<bool> &myIsStolen;
    public:
        ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
            : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            if ( myOuterIsolation )
                NestedParFor<NestedPartitioner>()();
            else
                tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
            --e;
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    class OuterParFor : utils::NoAssign {
        bool myOuterIsolation;
        std::atomic<bool> &myIsStolen;
    public:
        OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
        void operator()() const {
            tbb::enumerable_thread_specific<int> ets( 0 );
            OuterPartitioner p;
            tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
        }
    };

    template <typename OuterPartitioner, typename NestedPartitioner>
    void TwoLoopsTest( bool outer_isolation ) {
        std::atomic<bool> is_stolen;
        is_stolen = false;
        const int max_repeats = 100;
        if ( outer_isolation ) {
            for ( int i = 0; i <= max_repeats; ++i ) {
                tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
                if ( is_stolen ) break;
            }
            // TODO: was ASSERT_WARNING
            if (!is_stolen) {
                REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
            }
        } else {
            for ( int i = 0; i <= max_repeats; ++i ) {
                OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
            }
            REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
        }
    }

    void TwoLoopsTest( bool outer_isolation ) {
        TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
    }

    void TwoLoopsTest() {
        TwoLoopsTest( true );
        TwoLoopsTest( false );
    }
    //--------------------------------------------------//
    class HeavyMixTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
        tbb::enumerable_thread_specific<int>& myIsolatedLevel;
        int myNestedLevel;

        template <typename Partitioner, typename Body>
        static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
            if ( rnd.get() % 2 ) {
                if ( ctx )
                    tbb::parallel_for( 0, 2, body, p, *ctx );
                else
                    tbb::parallel_for( 0, 2, body, p );
            } else {
                tbb::parallel_invoke( body, body );
            }
        }

        template <typename Partitioner>
        class IsolatedBody : utils::NoAssign {
            const HeavyMixTestBody &myHeavyMixTestBody;
            Partitioner &myPartitioner;
        public:
            IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
                : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
            void operator()() const {
                RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                    HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                        myHeavyMixTestBody.myNestedLevel + 1 ),
                    myPartitioner );
            }
        };

        template <typename Partitioner>
        void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
            Partitioner p;
            switch ( rnd.get() % 2 ) {
                case 0: {
                    // No features
                    tbb::task_group_context ctx;
                    RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
                    break;
                }
                case 1: {
                    // Isolation
                    int previous_isolation = isolated_level;
                    isolated_level = myNestedLevel;
                    tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
                    isolated_level = previous_isolation;
                    break;
                }
            }
        }
    public:
        HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
            tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
            : myRandom( random ), myIsolatedLevel( isolated_level )
            , myNestedLevel( nested_level ) {}
        void operator()() const {
            int &isolated_level = myIsolatedLevel.local();
            CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on an isolated level" );
            if ( myNestedLevel == 20 )
                return;
            utils::FastRandom<>& rnd = myRandom.local();
            if ( rnd.get() % 2 == 1 ) {
                RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
            } else {
                RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
            }
        }
        void operator()(int) const {
            this->operator()();
        }
    };

    struct RandomInitializer {
        utils::FastRandom<> operator()() {
            return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
        }
    };

    void HeavyMixTest() {
        std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);

        RandomInitializer init_random;
        tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
        tbb::enumerable_thread_specific<int> isolated_level( 0 );
        for ( int i = 0; i < 5; ++i ) {
            HeavyMixTestBody b( random, isolated_level, 1 );
            b( 0 );
        }
    }

    //--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
    struct MyException {};
    struct IsolatedBodyThrowsException {
        void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
            // Workaround an unreachable code warning in task_arena_function.
            volatile bool workaround = true;
            if (workaround)
#endif
            {
                throw MyException();
            }
        }
    };
    struct ExceptionTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<int>& myEts;
        std::atomic<bool>& myIsStolen;
        ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
            : myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int i ) const {
            try {
                tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
                REQUIRE_MESSAGE( false, "The exception has been lost" );
            }
            catch ( MyException ) {}
            catch ( ... ) {
                REQUIRE_MESSAGE( false, "Unexpected exception" );
            }
            // Check that nested algorithms can steal outer-level tasks
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            // work imbalance increases chances for stealing
            tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
            --e;
        }
    };

#endif /* TBB_USE_EXCEPTIONS */
    void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
        tbb::enumerable_thread_specific<int> ets;
        std::atomic<bool> is_stolen;
        is_stolen = false;
        for ( ;; ) {
            tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
            if ( is_stolen ) break;
        }
        REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
    }

    struct NonConstBody {
        unsigned int state;
        void operator()() {
            state ^= ~0u;
        }
    };

    void TestNonConstBody() {
        NonConstBody body;
        body.state = 0x6c97d5ed;
        tbb::this_task_arena::isolate(body);
        REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
    }

    // TODO: Consider tbb::task_group instead of the explicit task API.
    class TestEnqueueTask : public tbb::detail::d1::task {
        using wait_context = tbb::detail::d1::wait_context;

        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;

    public:
        wait_context& waiter;
        tbb::task_arena& arena;
        static const int N = 100;

        TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
            : executed(exe), completed(c), waiter(w), arena(a) {}

        tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
            for (int i = 0; i < N; ++i) {
                arena.enqueue([&]() {
                    executed.local() = true;
                    ++completed;
                    for (int j = 0; j < 100; j++) utils::yield();
                    waiter.release(1);
                });
            }
            return nullptr;
        }
        tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
    };

    class TestEnqueueIsolateBody : utils::NoCopy {
        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;
        tbb::task_arena& arena;
    public:
        static const int N = 100;

        TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
            : executed(exe), completed(c), arena(a) {}
        void operator()() {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait_context waiter(N);

            TestEnqueueTask root(executed, completed, waiter, arena);
            tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
        }
    };

    void TestEnqueue() {
        tbb::enumerable_thread_specific<bool> executed(false);
        std::atomic<int> completed;
        tbb::task_arena arena = tbb::task_arena(tbb::task_arena::attach());

        // Check that the main thread can process enqueued tasks.
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: no enqueued task was executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks outside of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
}

void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce the situation
    // where the owner steals an outer-level task on a nested level. With only one
    // thief, it would execute outer-level tasks first, leaving the owner no chance
    // to steal an outer-level task.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too much work to reproduce stealing from the outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}
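
// The isolation contract demonstrated above, in its minimal form (illustrative
// sketch): while blocked inside an isolated region, a thread may only take
// tasks from that region, never unrelated outer-level tasks.
namespace demo_isolate {
    inline void run() {
        tbb::this_task_arena::isolate([]{
            tbb::parallel_for(0, 100, [](int) {
                // a thread waiting here will not steal work
                // submitted outside the isolate() call
            });
        });
    }
}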

//-----------------------------------------------------------------------------------------//

class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] { tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};

void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with a missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}

//-----------------------------------------------------------------------------------------//

class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;
    int my_bunch_size;
    int my_num_tasks;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};

void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not wait for other tasks.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}

void TestMultipleWaits() {
    // Limit the number of threads to prevent heavy oversubscription.
#if TBB_TEST_LOW_WORKLOAD
    const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
#else
    const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
#endif

    utils::FastRandom<> rnd(1234);
    for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
        for ( int i = 0; i<3; ++i ) {
            const int num_bunches = 3 + rnd.get()%3;
            const int bunch_size = max_threads + rnd.get()%max_threads;
            TestMultipleWaits( threads, num_bunches, bunch_size );
        }
    }
}
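
// The reserve/release protocol used with wait_context throughout this test,
// reduced to a minimal sketch (illustrative only; wait_context is an internal
// TBB API, as used above):
namespace demo_wait_context {
    inline void run(tbb::task_arena& a) {
        tbb::detail::d1::wait_context wc(0); // reference count starts at zero
        tbb::task_group_context ctx;
        wc.reserve();                        // +1 before publishing the work
        a.enqueue([&wc]{ wc.release(); });   // -1 when the work is done
        tbb::detail::d1::wait(wc, ctx);      // blocks until the count drops to zero
    }
}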

//--------------------------------------------------//

void TestSmallStackSize() {
    tbb::global_control gc(tbb::global_control::thread_stack_size,
            tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
    // The test produces a warning (not an error) if it fails, so it is run many
    // times to make the log annoying enough to be treated as an error.
    for (int i = 0; i < 100; ++i) {
        tbb::task_arena a;
        a.initialize();
    }
}

//--------------------------------------------------//

namespace TestMoveSemanticsNS {
    struct TestFunctor {
        void operator()() const {};
    };

    struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
        MoveOnlyFunctor() : utils::MoveOnly() {};
        MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
    };

    struct MovePreferableFunctor : utils::Movable, TestFunctor {
        MovePreferableFunctor() : utils::Movable() {};
        MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
        MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
    };

    struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
        NoMoveNoCopyFunctor() : utils::NoCopy() {};
        // the move constructor is not allowed, like the copy constructor inherited from NoCopy
    private:
        NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
    };

    void TestFunctors() {
        tbb::task_arena ta;
        MovePreferableFunctor mpf;
        // execute() neither copies nor moves its argument inside the implementation
        ta.execute( NoMoveNoCopyFunctor() );

        ta.enqueue( MoveOnlyFunctor() );
        ta.enqueue( mpf );
        REQUIRE_MESSAGE(mpf.alive, "object was moved when passed by lvalue");
        mpf.Reset();
        ta.enqueue( std::move(mpf) );
        REQUIRE_MESSAGE(!mpf.alive, "object was copied when passed by rvalue");
        mpf.Reset();
    }
}

void TestMoveSemantics() {
    TestMoveSemanticsNS::TestFunctors();
}

//--------------------------------------------------//

#include <vector>

#include "common/state_trackable.h"

namespace TestReturnValueNS {
    struct noDefaultTag {};
    class ReturnType : public StateTrackable<> {
        static const int SIZE = 42;
        std::vector<int> data;
    public:
        ReturnType(noDefaultTag) : StateTrackable<>(0) {}
        // Define the copy constructor to test that it is never called
        ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
        ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}

        void fill() {
            for (int i = 0; i < SIZE; ++i)
                data.push_back(i);
        }
        void check() {
            REQUIRE(data.size() == unsigned(SIZE));
            for (int i = 0; i < SIZE; ++i)
                REQUIRE(data[i] == i);
            StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
            REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
            REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
            std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
            std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
            REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
            // The number of copies/moves should not exceed 3: function return,
            // store to internal storage, acquire from internal storage.
            REQUIRE((copied == 0 && moved <= 3));
        }
    };

    template <typename R>
    R function() {
        noDefaultTag tag;
        R r(tag);
        r.fill();
        return r;
    }

    template <>
    void function<void>() {}

    template <typename R>
    struct Functor {
        R operator()() const {
            return function<R>();
        }
    };

    tbb::task_arena& arena() {
        static tbb::task_arena a;
        return a;
    }

    template <typename F>
    void TestExecute(F &f) {
        StateTrackableCounters::reset();
        ReturnType r = arena().execute(f);
        r.check();
    }

    template <typename F>
    void TestExecute(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r = arena().execute(f);
        r.check();
    }
    template <typename F>
    void TestIsolate(F &f) {
        StateTrackableCounters::reset();
        ReturnType r = tbb::this_task_arena::isolate(f);
        r.check();
    }

    template <typename F>
    void TestIsolate(const F &f) {
        StateTrackableCounters::reset();
        ReturnType r = tbb::this_task_arena::isolate(f);
        r.check();
    }

    void Test() {
        TestExecute(Functor<ReturnType>());
        Functor<ReturnType> f1;
        TestExecute(f1);
        TestExecute(function<ReturnType>);

        arena().execute(Functor<void>());
        Functor<void> f2;
        arena().execute(f2);
        arena().execute(function<void>);
        TestIsolate(Functor<ReturnType>());
        TestIsolate(f1);
        TestIsolate(function<ReturnType>);
        tbb::this_task_arena::isolate(Functor<void>());
        tbb::this_task_arena::isolate(f2);
        tbb::this_task_arena::isolate(function<void>);
    }
}

void TestReturnValue() {
    TestReturnValueNS::Test();
}
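
// The property verified above, in one line (illustrative sketch): execute()
// and isolate() forward the functor's return value to the caller, moving
// rather than copying it where possible.
namespace demo_return_value {
    inline std::vector<int> run(tbb::task_arena& a) {
        return a.execute([]{ return std::vector<int>(42, 0); });
    }
}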
1299 
1300 //--------------------------------------------------//
1301 
1302 // MyObserver checks if threads join to the same arena
1303 struct MyObserver: public tbb::task_scheduler_observer {
1304     tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
1305     tbb::task_arena& my_arena;
1306     std::atomic<int>& my_failure_counter;
1307     std::atomic<int>& my_counter;
1308     utils::SpinBarrier& m_barrier;
1309 
1310     MyObserver(tbb::task_arena& a,
1311         tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
1312         std::atomic<int>& failure_counter,
1313         std::atomic<int>& counter,
1314         utils::SpinBarrier& barrier)
1315         : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1316         my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
1317         observe(true);
1318     }
1319     void on_scheduler_entry(bool worker) override {
1320         if (worker) {
1321             ++my_counter;
1322             tbb::task_arena*& cur_arena = my_tls.local();
1323             if (cur_arena != 0 && cur_arena != &my_arena) {
1324                 ++my_failure_counter;
1325             }
1326             cur_arena = &my_arena;
1327             m_barrier.wait();
1328         }
1329     }
1330     void on_scheduler_exit(bool worker) override {
1331         if (worker) {
1332             m_barrier.wait(); // before wakeup
1333             m_barrier.wait(); // after wakeup
1334         }
1335     }
1336 };

void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
    if (n_threads == 0) {
        n_threads = tbb::this_task_arena::max_concurrency();
    }

    const int max_n_arenas = 8;
    int n_arenas = 2;
    if (n_threads > 16) {
        n_arenas = max_n_arenas;
    } else if (n_threads > 8) {
        n_arenas = 4;
    }

    int n_workers = n_threads - 1;
    n_workers = n_arenas * (n_workers / n_arenas); // round down to a multiple of n_arenas
    if (n_workers == 0) {
        return;
    }

    n_threads = n_workers + 1;
    tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);

    const int n_repetitions = 20;
    const int n_outer_repetitions = 100;
    std::multiset<float> failure_ratio; // for median calculation
    utils::SpinBarrier barrier(n_threads);
    utils::SpinBarrier worker_barrier(n_workers);
    MyObserver* observer[max_n_arenas];
    std::vector<tbb::task_arena> arenas(n_arenas);
    std::atomic<int> failure_counter;
    std::atomic<int> counter;
    tbb::enumerable_thread_specific<tbb::task_arena*> tls;

    for (int i = 0; i < n_arenas; ++i) {
        arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master
        observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
    }

    int ii = 0;
    for (; ii < n_outer_repetitions; ++ii) {
        failure_counter = 0;
        counter = 0;

        // Main code
        auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
        wakeup();
        for (int j = 0; j < n_repetitions; ++j) {
            barrier.wait(); // entry
            barrier.wait(); // exit1
            wakeup();
            barrier.wait(); // exit2
        }
        barrier.wait(); // entry
        barrier.wait(); // exit1
        barrier.wait(); // exit2

        failure_ratio.insert(float(failure_counter) / counter);
        tls.clear();
        // collect at least 3 elements in failure_ratio before calculating the median
        if (ii > 1) {
            std::multiset<float>::iterator it = failure_ratio.begin();
            std::advance(it, failure_ratio.size() / 2);
            if (*it < 0.02)
                break;
        }
    }
    for (int i = 0; i < n_arenas; ++i) {
        delete observer[i];
    }
    // check whether the median failure ratio is too big
    std::multiset<float>::iterator it = failure_ratio.begin();
    std::advance(it, failure_ratio.size() / 2);
    // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
    if (*it > 0.05) {
        REPORT("Warning: too many cases of threads joining different arenas.\n");
        REQUIRE_MESSAGE(*it <= 0.3, "Too many cases of threads joining different arenas.\n");
    }
}
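
// The migration check above is statistical: each outer repetition records the
// share of workers that re-entered a different arena, and the loop stops early
// once the median of the collected ratios falls below 2%. Only a persistently
// high median (the 5% warning and 30% failure thresholds) is reported.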

void TestArenaWorkersMigration() {
    TestArenaWorkersMigrationWithNumThreads(4);
    if (tbb::this_task_arena::max_concurrency() != 4) {
        TestArenaWorkersMigrationWithNumThreads();
    }
}

//--------------------------------------------------//
void TestDefaultCreatedWorkersAmount() {
    int threads = tbb::this_task_arena::max_concurrency();
    utils::NativeParallelFor(1, [threads](int idx) {
        REQUIRE_MESSAGE(idx == 0, "only one thread is expected to reset the TLS");
        utils::SpinBarrier barrier(threads);
        ResetTLS();
        for (int trial = 0; trial < 10; ++trial) {
            tbb::parallel_for(0, threads, [threads, &barrier](int) {
                REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified number of threads");
                REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than the default allows");
                local_id.local() = 1;
                // If there are more threads than expected, 'Sleep' gives the unexpected threads a chance to join.
                utils::Sleep(1);
                barrier.wait();
            }, tbb::simple_partitioner());
            REQUIRE_MESSAGE(local_id.size() == size_t(threads), "number of created threads is not equal to the default");
        }
    });
}

void TestAbilityToCreateWorkers(int thread_num) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
    // Check only a subset of the reserved-external-thread counts: 0 and 1
    // reserved threads are the important cases, but it is also useful to gather
    // statistics with other counts (roughly thread_num/e and thread_num/pi)
    // without spending the whole test session checking every possible value.
    TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
    TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
}

void TestDefaultWorkersLimit() {
    TestDefaultCreatedWorkersAmount();
#if TBB_TEST_LOW_WORKLOAD
    TestAbilityToCreateWorkers(24);
#else
    TestAbilityToCreateWorkers(256);
#endif
}
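
// Hedged sketch (not part of the test run) of the global_control mechanism the
// tests above rely on: while the object is alive, TBB does not use more
// threads than the stated limit, regardless of arena concurrency requests.
inline void GlobalControlSketch() {
    tbb::global_control limit(tbb::global_control::max_allowed_parallelism, 2);
    tbb::parallel_for(0, 100, [](int) { /* executed by at most 2 threads */ });
}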

#if TBB_USE_EXCEPTIONS

void ExceptionInExecute() {
    std::size_t thread_number = utils::get_platform_max_threads();
    // Keep the arena concurrency at least 1 even on single-threaded platforms.
    int arena_concurrency = static_cast<int>(thread_number < 2 ? 1 : thread_number / 2);
    tbb::task_arena test_arena(arena_concurrency, arena_concurrency);

    std::atomic<int> canceled_task{};

    auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
        for (std::size_t i = 0; i < 1000; ++i) {
            try {
                test_arena.execute([] {
                    volatile bool suppress_unreachable_code_warning = true;
                    if (suppress_unreachable_code_warning) {
                        throw -1;
                    }
                });
                FAIL("An exception should have been thrown.");
            } catch (int) {
                ++canceled_task;
            } catch (...) {
                FAIL("Wrong type of exception.");
            }
        }
    };

    utils::NativeParallelFor(thread_number, parallel_func);
    CHECK(std::size_t(canceled_task) == thread_number * 1000);
}
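
// The property exercised above: an exception thrown by the functor passed to
// task_arena::execute() is propagated to the calling thread, so it can be
// caught as if the functor had been invoked directly, e.g.
//     try { arena.execute([] { throw -1; }); } catch (int) { /* reached */ }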

#endif // TBB_USE_EXCEPTIONS

class simple_observer : public tbb::task_scheduler_observer {
    static std::atomic<int> idx_counter;
    int my_idx;
    int myMaxConcurrency;   // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            CHECK(current_index >= myNumReservedSlots);
        }
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override
    {}
public:
    simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
        : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        observe(true);
    }

    friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
        return lhs.my_idx < rhs.my_idx;
    }
};
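
// Slot layout assumed by the checks above: external threads occupy the
// reserved slots [0, myNumReservedSlots), and workers receive indices starting
// at myNumReservedSlots. The "myMaxConcurrency > 1 ? myMaxConcurrency : 2"
// bound reflects that an arena requested with concurrency 1 still exposes two
// slots internally.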

std::atomic<int> simple_observer::idx_counter{};

struct arena_handler {
    enum arena_status {
        alive,
        deleting,
        deleted
    };

    tbb::task_arena* arena;

    std::atomic<arena_status> status{alive};
    tbb::spin_rw_mutex arena_in_use{};

    tbb::concurrent_set<simple_observer> observers;

    arena_handler(tbb::task_arena* ptr) : arena(ptr)
    {}

    friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
        return lhs.arena < rhs.arena;
    }
};
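
// arena_handler lifecycle: status moves alive -> deleting -> deleted exactly
// once (the transition is claimed via compare_exchange_strong), and
// arena_in_use is taken as a reader by every thread that uses the arena and as
// a writer by the deleting thread, so deletion cannot begin while an
// execute/enqueue call or an observer attachment is still in flight.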

// TODO: Add observer operations
void StressTestMixFunctionality() {
    enum operation_type {
        create_arena,
        delete_arena,
        attach_observer,
        detach_observer,
        arena_execute,
        enqueue_task,
        last_operation_marker
    };

    std::size_t operations_number = last_operation_marker;
    std::size_t thread_number = utils::get_platform_max_threads();
    utils::FastRandom<> operation_rnd(42);
    tbb::spin_mutex random_operation_guard;

    auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
        tbb::spin_mutex::scoped_lock lock(random_operation_guard);
        return static_cast<operation_type>(operation_rnd.get() % operations_number);
    };

    utils::FastRandom<> arena_rnd(42);
    tbb::spin_mutex random_arena_guard;
    auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
        tbb::spin_mutex::scoped_lock lock(random_arena_guard);
        return arena_rnd.get();
    };

    tbb::concurrent_set<arena_handler> arenas_pool;

    std::vector<std::thread> thread_pool;

    utils::SpinBarrier thread_barrier(thread_number);
    std::size_t max_operations = 20000;
    std::atomic<std::size_t> curr_operation{};
    auto thread_func = [&] () {
        arenas_pool.emplace(new tbb::task_arena());
        thread_barrier.wait();
        while (curr_operation++ < max_operations) {
            switch (get_random_operation()) {
                case create_arena :
                {
                    arenas_pool.emplace(new tbb::task_arena());
                    break;
                }
                case delete_arena :
                {
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        arena_handler::arena_status curr_status = arena_handler::alive;
                        if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
                            break;
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);

                    delete curr_arena->arena;
                    curr_arena->status.store(arena_handler::deleted);

                    break;
                }
                case attach_observer :
                {
                    tbb::spin_rw_mutex::scoped_lock lock{};
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                            if (curr_arena->status == arena_handler::alive) {
                                break;
                            } else {
                                lock.release();
                            }
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);

                    break;
                }
                case detach_observer :
                {
                    auto arena_number = get_random_arena() % arenas_pool.size();
                    auto curr_arena = arenas_pool.begin();
                    std::advance(curr_arena, arena_number);

                    for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
                        if (it->is_observing()) {
                            it->observe(false);
                            break;
                        }
                    }

                    break;
                }
                case arena_execute :
                {
                    tbb::spin_rw_mutex::scoped_lock lock{};
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                            if (curr_arena->status == arena_handler::alive) {
                                break;
                            } else {
                                lock.release();
                            }
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    curr_arena->arena->execute([] () {
                        tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) {
                            std::atomic<int> sum{};
                            // Make some work
                            for (; sum < 10; ++sum) ;
                        });
                    });

                    break;
                }
                case enqueue_task :
                {
                    tbb::spin_rw_mutex::scoped_lock lock{};
                    auto curr_arena = arenas_pool.begin();
                    for (; curr_arena != arenas_pool.end(); ++curr_arena) {
                        if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
                            if (curr_arena->status == arena_handler::alive) {
                                break;
                            } else {
                                lock.release();
                            }
                        }
                    }

                    if (curr_arena == arenas_pool.end()) break;

                    curr_arena->arena->enqueue([] {
                        std::atomic<int> sum{};
                        // Make some work
                        for (; sum < 1000; ++sum) ;
                    });

                    break;
                }
                case last_operation_marker :
                    break;
            }
        }
    };

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        thread_pool.emplace_back(thread_func);
    }

    thread_func();

    for (std::size_t i = 0; i < thread_number - 1; ++i) {
        if (thread_pool[i].joinable()) thread_pool[i].join();
    }

    for (auto& handler : arenas_pool) {
        if (handler.status != arena_handler::deleted) delete handler.arena;
    }
}
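
// The recurring pattern in the cases above: scan arenas_pool for a handler
// whose arena_in_use read lock can be acquired and whose status is still
// alive; if none is found, skip the operation. This keeps every randomized
// operation safe against a concurrent delete_arena.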

struct enqueue_test_helper {
    enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets, std::atomic<std::size_t>& task_counter)
        : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
    {}

    enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
    {}

    void operator() () const {
        CHECK(my_ets.local());
        if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
        utils::yield();
    }

    tbb::task_arena& my_arena;
    tbb::enumerable_thread_specific<bool>& my_ets;
    std::atomic<std::size_t>& my_task_counter;
};
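
// enqueue_test_helper forms a self-replicating task chain: each executed copy
// checks that its thread ran the warm-up parallel_for (my_ets.local() is set),
// then enqueues one successor until my_task_counter reaches 100000.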

//--------------------------------------------------//

//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    TestConcurrentFunctionality();
}

//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    TestArenaEntryConsistency();
}

//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    TestAttach(4);
}

//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    TestConstantFunctorRequirement();
}

//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    TestMoveSemantics();
}

//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    TestReturnValue();
}

//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    TestDelegatedSpawnWait();
}

//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}

//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    TestDefaultWorkersLimit();
}

//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    TestArenaWorkersMigration();
}

//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    TestMultipleWaits();
}

//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    TestSmallStackSize();
}

#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    ExceptionInExecute();
}

//! \brief \ref error_guessing
TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
    struct throwing_obj {
        throwing_obj() {
            // The constructor always throws, so no complete object can exist
            // and the destructor must never run.
            volatile bool flag = true;
            if (flag) throw std::exception{};
        }
        throwing_obj(const throwing_obj&) = default;
        ~throwing_obj() { FAIL("A destructor was called."); }
    };

    tbb::task_arena arena;

    REQUIRE_THROWS_AS( [&] {
        arena.execute([] {
            return throwing_obj{};
        });
    }(), std::exception );
}
#endif // TBB_USE_EXCEPTIONS

//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    StressTestMixFunctionality();
}

//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(num_threads * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    while (task_counter < 100000) utils::yield();

    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}
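
// Note on the test above: both the global_control limit and the arena are
// sized at twice the platform concurrency, and the SpinBarrier admits exactly
// num_threads * 2 participants, so the warm-up parallel_for can only complete
// if the oversubscribed number of threads was actually created.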

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! Basic test for arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    auto task_handle = tg.defer([&]{ run = true; });

    arena.enqueue(std::move(task_handle));
    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with task handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue task_handle") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};

    arena.execute([&]{
        auto task_handle = tg.defer([&]{ run = true; });

        tbb::this_task_arena::enqueue(std::move(task_handle));
    });

    tg.wait();

    CHECK(run == true);
}

//! Basic test for this_task_arena::enqueue with functor
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue function") {
    tbb::task_arena arena;
    tbb::task_group tg;

    std::atomic<bool> run{false};
    // Block the task_group so that tg.wait() below has something to wait for
    auto task_handle = tg.defer([]{});

    arena.execute([&]{
        tbb::this_task_arena::enqueue([&]{
            run = true;
            // Release the task_group by destroying the deferred task
            task_handle = tbb::task_handle{};
        });
    });

    tg.wait();

    CHECK(run == true);
}
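
// The trick above: tg.wait() cannot return while the deferred (never scheduled)
// task_handle is alive, so the enqueued functor releases the task_group by
// move-assigning an empty handle, which destroys the deferred task.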

#if TBB_USE_EXCEPTIONS
//! Basic test for exceptions in task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;
    tbb::task_arena arena;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    arena.enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! Basic test for exceptions in this_task_arena::enqueue with task_handle
//! \brief \ref interface \ref requirement
TEST_CASE("this_task_arena::enqueue(task_handle) exception propagation"){
    tbb::task_group tg;

    tbb::task_handle h = tg.defer([&]{
        volatile bool suppress_unreachable_code_warning = true;
        if (suppress_unreachable_code_warning) {
            throw std::runtime_error{ "" };
        }
    });

    tbb::this_task_arena::enqueue(std::move(h));

    CHECK_THROWS_AS(tg.wait(), std::runtime_error);
}

//! The test for error in scheduling empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"){
    tbb::task_arena ta;

    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}),                     "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif // TBB_USE_EXCEPTIONS

//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        CHECK( true == tbb::is_inside_task());
    });
}

//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}

//! The test for is_inside_task in arena::execute when inside another task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute inside another task") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
#endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS