1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 
19 #define __TBB_EXTRA_DEBUG 1
20 #include "common/concurrency_tracker.h"
21 #include "common/spin_barrier.h"
22 #include "common/utils.h"
23 #include "common/utils_report.h"
24 #include "common/utils_concurrency_limit.h"
25 
26 #include "tbb/task_arena.h"
27 #include "tbb/task_scheduler_observer.h"
28 #include "tbb/enumerable_thread_specific.h"
29 #include "tbb/parallel_for.h"
30 #include "tbb/global_control.h"
31 #include "tbb/concurrent_set.h"
32 #include "tbb/spin_mutex.h"
33 #include "tbb/spin_rw_mutex.h"
34 
35 #include <stdexcept>
36 #include <cstdlib>
37 #include <cstdio>
38 #include <vector>
39 #include <thread>
40 #include <atomic>
41 
42 //#include "harness_fp.h"
43 
44 //! \file test_task_arena.cpp
45 //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification
46 
47 //--------------------------------------------------//
48 // Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
49 /* maxthread is treated as the biggest possible concurrency level. */
50 void InitializeAndTerminate( int maxthread ) {
51     for( int i=0; i<200; ++i ) {
52         switch( i&3 ) {
53             // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
54             // Explicit initialization can either keep the original values or change those.
55             // Arena termination can be explicit or implicit (in the destructor).
56             // TODO: extend with concurrency level checks if such a method is added.
57             default: {
58                 tbb::task_arena arena( std::rand() % maxthread + 1 );
59                 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
60                 arena.initialize();
61                 CHECK(arena.is_active());
62                 arena.terminate();
63                 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
64                 break;
65             }
66             case 0: {
67                 tbb::task_arena arena( 1 );
68                 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
69                 arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
70                 CHECK(arena.is_active());
71                 break;
72             }
73             case 1: {
74                 tbb::task_arena arena( tbb::task_arena::automatic );
75                 CHECK(!arena.is_active());
76                 arena.initialize();
77                 CHECK(arena.is_active());
78                 break;
79             }
80             case 2: {
81                 tbb::task_arena arena;
82                 CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
83                 arena.initialize( std::rand() % maxthread + 1 );
84                 CHECK(arena.is_active());
85                 arena.terminate();
86                 CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
87                 break;
88             }
89         }
90     }
91 }
92 
93 //--------------------------------------------------//
94 
95 // Definitions used in more than one test
96 typedef tbb::blocked_range<int> Range;
97 
98 // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below
99 static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);
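// local_id holds the id of the arena (observer) the thread is currently working in;
// old_id remembers the previous value so that nested arena entries can be unwound in on_scheduler_exit().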
100 
101 void ResetTLS() {
102     local_id.clear();
103     old_id.clear();
104     slot_id.clear();
105 }
106 
107 class ArenaObserver : public tbb::task_scheduler_observer {
108     int myId;               // unique observer/arena id within a test
109     int myMaxConcurrency;   // concurrency of the associated arena
110     int myNumReservedSlots; // reserved slots in the associated arena
111     void on_scheduler_entry( bool is_worker ) override {
112         int current_index = tbb::this_task_arena::current_thread_index();
113         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
114         if (is_worker) {
115             CHECK(current_index >= myNumReservedSlots);
116         }
117         CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
118         old_id.local() = local_id.local();
119         CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
120         local_id.local() = myId;
121         slot_id.local() = current_index;
122     }
123     void on_scheduler_exit( bool /*is_worker*/ ) override {
124         CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
125         CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
126         slot_id.local() = -2;
127         local_id.local() = old_id.local();
128         old_id.local() = 0;
129     }
130 public:
131     ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
132         : tbb::task_scheduler_observer(a)
133         , myId(id)
134         , myMaxConcurrency(maxConcurrency)
135         , myNumReservedSlots(numReservedSlots) {
136         CHECK(myId);
137         observe(true);
138     }
139     ~ArenaObserver () {
140         CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
141     }
142 };
143 
144 struct IndexTrackingBody { // Must be used together with ArenaObserver
145     void operator() ( const Range& ) const {
146         CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
147         utils::doDummyWork(50000);
148     }
149 };
150 
151 struct AsynchronousWork {
152     utils::SpinBarrier &my_barrier;
153     bool my_is_blocking;
154     AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
155     : my_barrier(a_barrier), my_is_blocking(blocking) {}
156     void operator()() const {
157         CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
158         tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
159         if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
160         else my_barrier.signalNoWait();
161     }
162 };
163 
164 //-----------------------------------------------------------------------------------------//
165 
166 // Test that task_arenas might be created and used from multiple application threads.
167 // Also tests arena observers. The parameter idx is the index of an app thread running this test.
168 void TestConcurrentArenasFunc(int idx) {
169     // A regression test for observer activation order:
170     // check that arena observer can be activated before local observer
171     struct LocalObserver : public tbb::task_scheduler_observer {
172         LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
173     };
174     tbb::task_arena a1;
175     a1.initialize(1,0);
176     ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
177     CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");
178     LocalObserver lo;
179     CHECK_MESSAGE(lo.is_observing(), "Local observer has not been activated");
180     tbb::task_arena a2(2,1);
181     ArenaObserver o2(a2, 2, 1, idx*2+2);
182     CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");
183     utils::SpinBarrier barrier(2);
184     AsynchronousWork work(barrier);
185     a1.enqueue(work); // put async work
186     barrier.wait();
187     a2.enqueue(work); // another work
188     a2.execute(work);
189     a1.debug_wait_until_empty();
190     a2.debug_wait_until_empty();
191 }
192 
193 void TestConcurrentArenas(int p) {
194     // TODO REVAMP fix with global control
195     ResetTLS();
196     utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
197 }
198 //--------------------------------------------------//
199 // Test multiple application threads working with a single arena at the same time.
200 class MultipleMastersPart1 : utils::NoAssign {
201     tbb::task_arena &my_a;
202     utils::SpinBarrier &my_b1, &my_b2;
203 public:
204     MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
205         : my_a(a), my_b1(b1), my_b2(b2) {}
206     void operator()(int) const {
207         my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
208         my_b1.wait();
209         // A regression test for bugs 1954 & 1971
210         my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
211     }
212 };
213 
214 class MultipleMastersPart2 : utils::NoAssign {
215     tbb::task_arena &my_a;
216     utils::SpinBarrier &my_b;
217 public:
218     MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
219     void operator()(int) const {
220         my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
221     }
222 };
223 
224 class MultipleMastersPart3 : utils::NoAssign {
225     tbb::task_arena &my_a;
226     utils::SpinBarrier &my_b;
227     using wait_context = tbb::detail::d1::wait_context;
228 
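    // The reserve()/release()/wait() protocol used below: the external thread reserves the wait_context
    // before enqueueing a Runner, the Runner releases it when done, and the Waiter blocks inside the arena
    // until the reference count drops back to zero.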
229     struct Runner : NoAssign {
230         wait_context& myWait;
231         Runner(wait_context& w) : myWait(w) {}
232         void operator()() const {
233             utils::doDummyWork(10000);
234             myWait.release();
235         }
236     };
237 
238     struct Waiter : NoAssign {
239         wait_context& myWait;
240         Waiter(wait_context& w) : myWait(w) {}
241         void operator()() const {
242             tbb::task_group_context ctx;
243             tbb::detail::d1::wait(myWait, ctx);
244         }
245     };
246 
247 public:
248     MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
249         : my_a(a), my_b(b) {}
250     void operator()(int) const {
251         wait_context wait(0);
252         my_b.wait(); // increases chances for task_arena initialization contention
253         for( int i=0; i<100; ++i) {
254             wait.reserve();
255             my_a.enqueue(Runner(wait));
256             my_a.execute(Waiter(wait));
257         }
258         my_b.wait();
259     }
260 };
261 
262 void TestMultipleMasters(int p) {
263     {
264         ResetTLS();
265         tbb::task_arena a(1,0);
266         a.initialize();
267         ArenaObserver o(a, 1, 0, 1);
268         utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
269         NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
270         barrier2.wait();
271         a.debug_wait_until_empty();
272     } {
273         ResetTLS();
274         tbb::task_arena a(2,1);
275         ArenaObserver o(a, 2, 1, 2);
276         utils::SpinBarrier barrier(p+2);
277         a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
278         // TODO: buggy test. Worker threads need time to occupy the slot to prevent an external thread from taking an enqueued task.
279         utils::Sleep(10);
280         NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
281         barrier.wait();
282         a.debug_wait_until_empty();
283     } {
284         // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
285         tbb::task_arena a(p,1);
286         utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
287         // "Oversubscribe" the arena by 1 external thread
288         NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
289         a.debug_wait_until_empty();
290     }
291 }
292 
293 //--------------------------------------------------//
294 // TODO: explain what TestArenaEntryConsistency does
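// TestArenaEntryConsistency repeatedly enters a task_arena(2, 1) via execute() from several application
// threads and from nested contexts. It checks that the entry works whether it is run directly, delegated
// to a worker, or delegated back to an external thread; that exceptions thrown inside execute() propagate
// to the caller; and that the FP settings captured at arena initialization are seen inside and restored outside.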
295 #include <sstream>
296 #include <stdexcept>
297 #include "oneapi/tbb/detail/_exception.h"
298 #include "common/fp_control.h"
299 
300 struct TestArenaEntryBody : FPModeContext {
301     std::atomic<int> &my_stage; // each execute increases it
302     std::stringstream my_id;
303     bool is_caught, is_expected;
304     enum { arenaFPMode = 1 };
305 
306     TestArenaEntryBody(std::atomic<int> &s, int idx, int i)  // init thread-specific instance
307     :   FPModeContext(idx+i)
308     ,   my_stage(s)
309     ,   is_caught(false)
310 #if TBB_USE_EXCEPTIONS
311     ,   is_expected( (idx&(1<<i)) != 0 )
312 #else
313     , is_expected(false)
314 #endif
315     {
316         my_id << idx << ':' << i << '@';
317     }
318     void operator()() { // inside task_arena::execute()
319         // synchronize with other stages
320         int stage = my_stage++;
321         int slot = tbb::this_task_arena::current_thread_index();
322         CHECK(slot >= 0);
323         CHECK(slot <= 1);
324         // wait until the third stage is delegated and then starts on slot 0
325         while(my_stage < 2+slot) utils::yield();
326         // deduce the entry type and record it in my_id; this helps to find the source of a problem
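        // Entry types recorded below: for the three cross-arena entries (stages 0-2), an entry running on a
        // worker slot is "delegated_to_worker"; on slot 0 the first two are "direct" and the third is
        // "delegated_to_master". Stage 3 is the nested execute() in the same context, stage 4 comes from a
        // nested parallel_for in a different context.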
327         my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
328                               "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
329                             : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
330         AssertFPMode(arenaFPMode);
331         if (is_expected) {
332             TBB_TEST_THROW(std::logic_error(my_id.str()));
333         }
334         // no code can be put here since exceptions can be thrown
335     }
336     void on_exception(const char *e) { // outside arena, in catch block
337         is_caught = true;
338         CHECK(my_id.str() == e);
339         assertFPMode();
340     }
341     void after_execute() { // outside arena and catch block
342         CHECK(is_caught == is_expected);
343         assertFPMode();
344     }
345 };
346 
347 class ForEachArenaEntryBody : utils::NoAssign {
348     tbb::task_arena &my_a; // expected task_arena(2,1)
349     std::atomic<int> &my_stage; // each execute increases it
350     int my_idx;
351 
352 public:
353     ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
354     : my_a(a), my_stage(c), my_idx(0) {}
355 
356     void test(int idx) {
357         my_idx = idx;
358         my_stage = 0;
359         NativeParallelFor(3, *this); // test cross-arena calls
360         CHECK(my_stage == 3);
361         my_a.execute(*this); // test nested calls
362         CHECK(my_stage == 5);
363     }
364 
365     // task_arena functor for nested tests
366     void operator()() const {
367         test_arena_entry(3); // in current task group context
368         tbb::parallel_for(4, 5, *this); // in different context
369     }
370 
371     // NativeParallelFor & parallel_for functor
372     void operator()(int i) const {
373         test_arena_entry(i);
374     }
375 
376 private:
377     void test_arena_entry(int i) const {
378         GetRoundingMode();
379         TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
380         GetRoundingMode();
381 #if TBB_USE_EXCEPTIONS
382         try {
383             my_a.execute(scoped_functor);
384         } catch(std::logic_error &e) {
385             scoped_functor.on_exception(e.what());
386         } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
387 #else
388         my_a.execute(scoped_functor);
389 #endif
390         scoped_functor.after_execute();
391     }
392 };
393 
394 void TestArenaEntryConsistency() {
395     tbb::task_arena a(2, 1);
396     std::atomic<int> c;
397     ForEachArenaEntryBody body(a, c);
398 
399     FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
400     a.initialize(); // capture FP settings to arena
401     fp_scope.setNextFPMode();
402 
403     for (int i = 0; i < 100; i++) // at least 32 = 2^5 combinations of throwing/non-throwing entry types
404         body.test(i);
405 }
406 //--------------------------------------------------
407 // Test that the requested degree of concurrency for task_arena is achieved in various conditions
408 class TestArenaConcurrencyBody : utils::NoAssign {
409     tbb::task_arena &my_a;
410     int my_max_concurrency;
411     int my_reserved_slots;
412     utils::SpinBarrier *my_barrier;
413     utils::SpinBarrier *my_worker_barrier;
414 public:
415     TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = NULL, utils::SpinBarrier *wb = NULL )
416     : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
417     // NativeParallelFor's functor
418     void operator()( int ) const {
419         CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
420         local_id.local() = 1;
421         my_a.execute( *this );
422     }
423     // Arena's functor
424     void operator()() const {
425         int idx = tbb::this_task_arena::current_thread_index();
426         CHECK( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
427         CHECK( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
428         int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
429         CHECK( max_arena_concurrency == my_max_concurrency );
430         if ( my_worker_barrier ) {
431             if ( local_id.local() == 1 ) {
432                 // External thread in a reserved slot
433                 CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
434             } else {
435                 // Worker thread
436                 CHECK( idx >= my_reserved_slots );
437                 my_worker_barrier->wait();
438             }
439         } else if ( my_barrier )
440             CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
441         if ( my_barrier ) my_barrier->wait();
442         else utils::Sleep( 1 );
443     }
444 };
445 
446 void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
447     for (; reserved <= p; reserved += step) {
448         tbb::task_arena a( p, reserved );
449         { // Check concurrency with worker & reserved external threads.
450             ResetTLS();
451             utils::SpinBarrier b( p );
452             utils::SpinBarrier wb( p-reserved );
453             TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
454             for ( int i = reserved; i < p; ++i )
455                 a.enqueue( test );
456             if ( reserved==1 )
457                 test( 0 ); // calls execute()
458             else
459                 utils::NativeParallelFor( reserved, test );
460             a.debug_wait_until_empty();
461         } { // Check if multiple external threads alone can achieve maximum concurrency.
462             ResetTLS();
463             utils::SpinBarrier b( p );
464             utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
465             a.debug_wait_until_empty();
466         } { // Check oversubscription by external threads.
467 #if !_WIN32 || !_WIN64
468             // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
469             // which makes it impossible to create more than ~500 threads.
470             if ( !(sizeof(std::size_t) == 4 && p > 200) )
471 #endif
472 #if TBB_TEST_LOW_WORKLOAD
473             if ( p <= 16 )
474 #endif
475             {
476                 ResetTLS();
477                 utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
478                 a.debug_wait_until_empty();
479             }
480         }
481     }
482 }
483 
484 struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
485     utils::SpinBarrier& m_barrier;
486 
487     TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
488         : tbb::task_scheduler_observer(a), m_barrier(barrier) {
489         observe(true);
490     }
491     void on_scheduler_exit(bool worker) override {
492         if (worker) {
493             m_barrier.wait();
494         }
495     }
496 };
497 
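// TestMandatoryConcurrency: tasks enqueued into a task_arena created with concurrency 1 are executed by a
// worker thread (so-called mandatory concurrency), and inside those tasks the arena reports
// max_concurrency() == 2; the tasks are still serialized, as the curr_tasks counter below checks.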
498 void TestMandatoryConcurrency() {
499     tbb::task_arena a(1);
500     a.execute([&a] {
501         int n_threads = 4;
502         utils::SpinBarrier exit_barrier(2);
503         TestMandatoryConcurrencyObserver observer(a, exit_barrier);
504         for (int j = 0; j < 5; ++j) {
505             utils::ExactConcurrencyLevel::check(1);
506             std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
507             utils::SpinBarrier barrier(n_threads);
508             utils::NativeParallelFor(n_threads, [&](int) {
509                 for (int i = 0; i < 5; ++i) {
510                     barrier.wait();
511                     a.enqueue([&] {
512                         CHECK(tbb::this_task_arena::max_concurrency() == 2);
513                         CHECK(a.max_concurrency() == 2);
514                         ++curr_tasks;
515                         CHECK(curr_tasks == 1);
516                         utils::doDummyWork(1000);
517                         CHECK(curr_tasks == 1);
518                         --curr_tasks;
519                         ++num_tasks;
520                     });
521                     barrier.wait();
522                 }
523             });
524             do {
525                 exit_barrier.wait();
526             } while (num_tasks < n_threads * 5);
527         }
528         observer.observe(false);
529     });
530 }
531 
532 void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
533     TestMandatoryConcurrency();
534     InitializeAndTerminate(max_thread_num);
535     for (int p = min_thread_num; p <= max_thread_num; ++p) {
536         TestConcurrentArenas(p);
537         TestMultipleMasters(p);
538         TestArenaConcurrency(p);
539     }
540 }
541 
542 //--------------------------------------------------//
543 // Test creation/initialization of a task_arena that references an existing arena (aka attach).
544 // This part of the test uses the knowledge of task_arena internals
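// task_arena(attach) binds the new object to the arena the calling thread is currently executing in;
// if the thread is not inside any arena, the constructed task_arena is left uninitialized.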
545 
546 struct TaskArenaValidator {
547     int my_slot_at_construction;
548     const tbb::task_arena& my_arena;
549     TaskArenaValidator( const tbb::task_arena& other )
550         : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
551         , my_arena(other)
552     {}
553     // Inspect the internal state
554     int concurrency() { return my_arena.debug_max_concurrency(); }
555     int reserved_for_masters() { return my_arena.debug_reserved_slots(); }
556 
557     // This method should be called in task_arena::execute() for a captured arena
558     // by the same thread that created the validator.
559     void operator()() {
560         CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
561                 "Current thread index has changed since the validator construction" );
562     }
563 };
564 
565 void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
566                             int expect_concurrency, int expect_masters ) {
567     CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
568     if( arena.is_active() ) {
569         TaskArenaValidator validator( arena );
570         CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
571         CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
572         if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
573             CHECK(tbb::this_task_arena::current_thread_index() >= 0);
574             // for threads already in arena, check that the thread index remains the same
575             arena.execute( validator );
576         } else { // not_initialized
577             // Test the deprecated method
578             CHECK(tbb::this_task_arena::current_thread_index() == -1);
579         }
580 
581         // Ideally, there should be a check for having the same internal arena object,
582         // but that object is not easily accessible for implicit arenas.
583     }
584 }
585 
586 struct TestAttachBody : utils::NoAssign {
587     static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
588     const int maxthread;
589     TestAttachBody( int max_thr ) : maxthread(max_thr) {}
590 
591     // The functor body for NativeParallelFor
592     void operator()( int idx ) const {
593         my_idx = idx;
594 
595         int default_threads = tbb::this_task_arena::max_concurrency();
596 
597         tbb::task_arena arena = tbb::task_arena( tbb::task_arena::attach() );
598         ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to
599 
600         arena.terminate();
601         ValidateAttachedArena( arena, false, -1, -1 );
602 
603         // attach to an auto-initialized arena
604         tbb::parallel_for(0, 1, [](int) {});
605 
606         tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
607         ValidateAttachedArena( arena2, true, default_threads, 1 );
608 
609         // attach to another task_arena
610         arena.initialize( maxthread, std::min(maxthread,idx) );
611         arena.execute( *this );
612     }
613 
614     // The functor body for task_arena::execute above
615     void operator()() const {
616         tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
617         ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
618     }
619 
620     // The functor body for tbb::parallel_for
621     void operator()( const Range& r ) const {
622         for( int i = r.begin(); i<r.end(); ++i ) {
623             tbb::task_arena arena2 = tbb::task_arena( tbb::task_arena::attach() );
624             ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
625         }
626     }
627 };
628 
629 thread_local int TestAttachBody::my_idx;
630 
631 void TestAttach( int maxthread ) {
632     // Externally concurrent, but no concurrency within a thread
633     utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
634     // Concurrent within the current arena; may also serve as a stress test
635     tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
636 }
637 
638 //--------------------------------------------------//
639 
640 // Test that task_arena::enqueue invokes only the const overload of the functor's call operator.
641 // TODO: can it be reworked as SFINAE-based compile-time check?
642 struct TestFunctor {
643     void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
644     void operator()() const { /* library requires this overload only */ }
645 };
646 
647 void TestConstantFunctorRequirement() {
648     tbb::task_arena a;
649     TestFunctor tf;
650     a.enqueue( tf );
651 }
652 
653 //--------------------------------------------------//
654 
655 #include "tbb/parallel_reduce.h"
656 #include "tbb/parallel_invoke.h"
657 
658 // Test this_task_arena::isolate
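// this_task_arena::isolate() runs a functor so that the calling thread, while blocked inside the isolated
// region, does not pick up tasks that originate outside of it; the tests below check both that guarantee
// and that isolation does not restrict other, non-isolated threads.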
659 namespace TestIsolatedExecuteNS {
660     template <typename NestedPartitioner>
661     class NestedParFor : utils::NoAssign {
662     public:
663         NestedParFor() {}
664         void operator()() const {
665             NestedPartitioner p;
666             tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
667         }
668     };
669 
670     template <typename NestedPartitioner>
671     class ParForBody : utils::NoAssign {
672         bool myOuterIsolation;
673         tbb::enumerable_thread_specific<int> &myEts;
674         std::atomic<bool> &myIsStolen;
675     public:
676         ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
677             : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
678         void operator()( int ) const {
679             int &e = myEts.local();
680             if ( e++ > 0 ) myIsStolen = true;
681             if ( myOuterIsolation )
682                 NestedParFor<NestedPartitioner>()();
683             else
684                 tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
685             --e;
686         }
687     };
688 
689     template <typename OuterPartitioner, typename NestedPartitioner>
690     class OuterParFor : utils::NoAssign {
691         bool myOuterIsolation;
692         std::atomic<bool> &myIsStolen;
693     public:
694         OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
695         void operator()() const {
696             tbb::enumerable_thread_specific<int> ets( 0 );
697             OuterPartitioner p;
698             tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
699         }
700     };
701 
702     template <typename OuterPartitioner, typename NestedPartitioner>
703     void TwoLoopsTest( bool outer_isolation ) {
704         std::atomic<bool> is_stolen;
705         is_stolen = false;
706         const int max_repeats = 100;
707         if ( outer_isolation ) {
708             for ( int i = 0; i <= max_repeats; ++i ) {
709                 tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
710                 if ( is_stolen ) break;
711             }
712             // TODO: was ASSERT_WARNING
713             if (!is_stolen) {
714                 REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
715             }
716         } else {
717             for ( int i = 0; i <= max_repeats; ++i ) {
718                 OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
719             }
720             REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
721         }
722     }
723 
724     void TwoLoopsTest( bool outer_isolation ) {
725         TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
726         TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
727         TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
728         TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
729     }
730 
731     void TwoLoopsTest() {
732         TwoLoopsTest( true );
733         TwoLoopsTest( false );
734     }
735     //--------------------------------------------------//
736     class HeavyMixTestBody : utils::NoAssign {
737         tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;
738         tbb::enumerable_thread_specific<int>& myIsolatedLevel;
739         int myNestedLevel;
740 
741         template <typename Partitioner, typename Body>
742         static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = NULL ) {
743             if ( rnd.get() % 2 ) {
744                 if  (ctx )
745                     tbb::parallel_for( 0, 2, body, p, *ctx );
746                 else
747                     tbb::parallel_for( 0, 2, body, p );
748             } else {
749                 tbb::parallel_invoke( body, body );
750             }
751         }
752 
753         template <typename Partitioner>
754         class IsolatedBody : utils::NoAssign {
755             const HeavyMixTestBody &myHeavyMixTestBody;
756             Partitioner &myPartitioner;
757         public:
758             IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
759                 : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
760             void operator()() const {
761                 RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
762                     HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
763                         myHeavyMixTestBody.myNestedLevel + 1 ),
764                     myPartitioner );
765             }
766         };
767 
768         template <typename Partitioner>
769         void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
770             Partitioner p;
771             switch ( rnd.get() % 2 ) {
772                 case 0: {
773                     // No features
774                     tbb::task_group_context ctx;
775                     RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
776                     break;
777                 }
778                 case 1: {
779                     // Isolation
780                     int previous_isolation = isolated_level;
781                     isolated_level = myNestedLevel;
782                     tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
783                     isolated_level = previous_isolation;
784                     break;
785                 }
786             }
787         }
788     public:
789         HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
790             tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
791             : myRandom( random ), myIsolatedLevel( isolated_level )
792             , myNestedLevel( nested_level ) {}
793         void operator()() const {
794             int &isolated_level = myIsolatedLevel.local();
795             CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" );
796             if ( myNestedLevel == 20 )
797                 return;
798             utils::FastRandom<>& rnd = myRandom.local();
799             if ( rnd.get() % 2 == 1 ) {
800                 RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
801             } else {
802                 RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
803             }
804         }
805         void operator()(int) const {
806             this->operator()();
807         }
808     };
809 
810     struct RandomInitializer {
811         utils::FastRandom<> operator()() {
812             return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
813         }
814     };
815 
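    // HeavyMixTest builds a random nesting (up to depth 20) of parallel_for/parallel_invoke calls, randomly
    // wrapping some levels in this_task_arena::isolate(). The per-thread isolated_level records the deepest
    // isolation currently active, and HeavyMixTestBody asserts that no task from a shallower nesting level
    // is picked up while the thread is isolated deeper.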
816     void HeavyMixTest() {
817         std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
818         tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
819 
820         RandomInitializer init_random;
821         tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
822         tbb::enumerable_thread_specific<int> isolated_level( 0 );
823         for ( int i = 0; i < 5; ++i ) {
824             HeavyMixTestBody b( random, isolated_level, 1 );
825             b( 0 );
826         }
827     }
828 
829     //--------------------------------------------------//
830 #if TBB_USE_EXCEPTIONS
831     struct MyException {};
832     struct IsolatedBodyThrowsException {
833         void operator()() const {
834 #if _MSC_VER && !__INTEL_COMPILER
835             // Workaround an unreachable code warning in task_arena_function.
836             volatile bool workaround = true;
837             if (workaround)
838 #endif
839             {
840                 throw MyException();
841             }
842         }
843     };
844     struct ExceptionTestBody : utils::NoAssign {
845         tbb::enumerable_thread_specific<int>& myEts;
846         std::atomic<bool>& myIsStolen;
847         ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
848             : myEts( ets ), myIsStolen( is_stolen ) {}
849         void operator()( int i ) const {
850             try {
851                 tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
852                 REQUIRE_MESSAGE( false, "The exception has been lost" );
853             }
854             catch ( MyException ) {}
855             catch ( ... ) {
856                 REQUIRE_MESSAGE( false, "Unexpected exception" );
857             }
858             // Check that nested algorithms can steal outer-level tasks
859             int &e = myEts.local();
860             if ( e++ > 0 ) myIsStolen = true;
861             // work imbalance increases chances for stealing
862             tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
863             --e;
864         }
865     };
866 
867 #endif /* TBB_USE_EXCEPTIONS */
868     void ExceptionTest() {
869 #if TBB_USE_EXCEPTIONS
870         tbb::enumerable_thread_specific<int> ets;
871         std::atomic<bool> is_stolen;
872         is_stolen = false;
873         for ( ;; ) {
874             tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
875             if ( is_stolen ) break;
876         }
877         REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
878 #endif /* TBB_USE_EXCEPTIONS */
879     }
880 
881     struct NonConstBody {
882         unsigned int state;
883         void operator()() {
884             state ^= ~0u;
885         }
886     };
887 
888     void TestNonConstBody() {
889         NonConstBody body;
890         body.state = 0x6c97d5ed;
891         tbb::this_task_arena::isolate(body);
892         REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
893     }
894 
895     // TODO: Consider tbb::task_group instead of explicit task API.
896     class TestEnqueueTask : public tbb::detail::d1::task {
897         using wait_context = tbb::detail::d1::wait_context;
898 
899         tbb::enumerable_thread_specific<bool>& executed;
900         std::atomic<int>& completed;
901 
902     public:
903         wait_context& waiter;
904         tbb::task_arena& arena;
905         static const int N = 100;
906 
907         TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
908             : executed(exe), completed(c), waiter(w), arena(a) {}
909 
910         tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
911             for (int i = 0; i < N; ++i) {
912                 arena.enqueue([&]() {
913                     executed.local() = true;
914                     ++completed;
915                     for (int j = 0; j < 100; j++) utils::yield();
916                     waiter.release(1);
917                 });
918             }
919             return nullptr;
920         }
921         tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
922     };
923 
924     class TestEnqueueIsolateBody : utils::NoCopy {
925         tbb::enumerable_thread_specific<bool>& executed;
926         std::atomic<int>& completed;
927         tbb::task_arena& arena;
928     public:
929         static const int N = 100;
930 
931         TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
932             : executed(exe), completed(c), arena(a) {}
933         void operator()() {
934             tbb::task_group_context ctx;
935             tbb::detail::d1::wait_context waiter(N);
936 
937             TestEnqueueTask root(executed, completed, waiter, arena);
938             tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
939         }
940     };
941 
942     void TestEnqueue() {
943         tbb::enumerable_thread_specific<bool> executed(false);
944         std::atomic<int> completed;
945         tbb::task_arena arena = tbb::task_arena(tbb::task_arena::attach());
946 
947         // Check that the main thread can process enqueued tasks.
948         completed = 0;
949         TestEnqueueIsolateBody b1(executed, completed, arena);
950         b1();
951 
952         if (!executed.local()) {
953             REPORT("Warning: None of the enqueued tasks was executed by the main thread.\n");
954         }
955 
956         executed.local() = false;
957         completed = 0;
958         const int N = 100;
959         // Create enqueued tasks out of isolation.
960 
961         tbb::task_group_context ctx;
962         tbb::detail::d1::wait_context waiter(N);
963         for (int i = 0; i < N; ++i) {
964             arena.enqueue([&]() {
965                 executed.local() = true;
966                 ++completed;
967                 utils::yield();
968                 waiter.release(1);
969             });
970         }
971         TestEnqueueIsolateBody b2(executed, completed, arena);
972         tbb::this_task_arena::isolate(b2);
973         REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");
974 
975         tbb::detail::d1::wait(waiter, ctx);
976         // while (completed < TestEnqueueTask::N + N) utils::yield();
977     }
978 }
979 
980 void TestIsolatedExecute() {
981     // At least 3 threads (owner + 2 thieves) are required to reproduce a situation where the owner steals an
982     // outer-level task on a nested level. If there is only one thief, it will execute the outer-level tasks first,
983     // and the owner will have no chance to steal an outer-level task.
984     int platform_max_thread = tbb::this_task_arena::max_concurrency();
985     int num_threads = utils::min( platform_max_thread, 3 );
986     {
987         // Too many threads require too much work to reproduce stealing from the outer level.
988         tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
989         TestIsolatedExecuteNS::TwoLoopsTest();
990         TestIsolatedExecuteNS::HeavyMixTest();
991         TestIsolatedExecuteNS::ExceptionTest();
992     }
993     tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
994     TestIsolatedExecuteNS::HeavyMixTest();
995     TestIsolatedExecuteNS::TestNonConstBody();
996     TestIsolatedExecuteNS::TestEnqueue();
997 }
998 
999 //-----------------------------------------------------------------------------------------//
1000 
1001 class TestDelegatedSpawnWaitBody : utils::NoAssign {
1002     tbb::task_arena &my_a;
1003     utils::SpinBarrier &my_b1, &my_b2;
1004 public:
1005     TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
1006         : my_a(a), my_b1(b1), my_b2(b2) {}
1007     // NativeParallelFor's functor
1008     void operator()(int idx) const {
1009         if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
1010             for (int i = 0; i < 2; ++i) {
1011                 my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
1012             }
1013             tbb::task_group tg;
1014             my_b1.wait(); // sync with the workers
1015             for( int i=0; i<100000; ++i) {
1016                 my_a.execute([&tg] { tg.run([] {}); });
1017             }
1018             my_a.execute([&tg] {tg.wait(); });
1019         }
1020 
1021         my_b2.wait(); // sync both threads
1022     }
1023 };
1024 
1025 void TestDelegatedSpawnWait() {
1026     // Regression test for a bug with missed wakeup notification from a delegated task
1027     tbb::task_arena a(2,0);
1028     a.initialize();
1029     utils::SpinBarrier barrier1(3), barrier2(2);
1030     utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
1031     a.debug_wait_until_empty();
1032 }
1033 
1034 //-----------------------------------------------------------------------------------------//
1035 
1036 class TestMultipleWaitsArenaWait : utils::NoAssign {
1037     using wait_context = tbb::detail::d1::wait_context;
1038 public:
1039     TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
1040         : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
1041     void operator()() const {
1042         ++my_processed;
1043         // Wait for all tasks
1044         if ( my_idx < my_num_tasks ) {
1045             tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
1046         }
1047         // Signal waiting tasks
1048         if ( my_idx >= my_bunch_size ) {
1049             my_waiters[my_idx-my_bunch_size]->release();
1050         }
1051     }
1052 private:
1053     int my_idx;
1054     int my_bunch_size;
1055     int my_num_tasks;
1056     std::vector<wait_context*>& my_waiters;
1057     std::atomic<int>& my_processed;
1058     tbb::task_group_context& my_context;
1059 };
1060 
1061 class TestMultipleWaitsThreadBody : utils::NoAssign {
1062     using wait_context = tbb::detail::d1::wait_context;
1063 public:
1064     TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
1065         : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
1066     void operator()( int idx ) const {
1067         my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
1068         --my_processed;
1069     }
1070 private:
1071     int my_bunch_size;
1072     int my_num_tasks;
1073     tbb::task_arena& my_arena;
1074     std::vector<wait_context*>& my_waiters;
1075     std::atomic<int>& my_processed;
1076     tbb::task_group_context& my_context;
1077 };
1078 
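// TestMultipleWaits launches threads in bunches; every thread enters the same arena through execute() and
// blocks on its own wait_context, which is released only by a thread from the next bunch. This exercises
// many external threads waiting concurrently inside one arena.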
1079 void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
1080     tbb::task_arena a( num_threads );
1081     const int num_tasks = (num_bunches-1)*bunch_size;
1082 
1083     tbb::task_group_context tgc;
1084     std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
1085     for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);
1086 
1087     std::atomic<int> processed(0);
1088     for ( int repeats = 0; repeats<10; ++repeats ) {
1089         int idx = 0;
1090         for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
1091             // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
1092             while ( processed < bunch*bunch_size ) utils::yield();
1093             // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
1094             for ( int i = 0; i<bunch_size; ++i ) {
1095                 waiters[idx]->reserve();
1096                 std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
1097             }
1098         }
1099         // No sync because the threads of the last bunch do not call wait_for_all.
1100         // Run the last bunch of threads.
1101         for ( int i = 0; i<bunch_size; ++i )
1102             std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
1103         while ( processed ) utils::yield();
1104     }
1105     for (auto w : waiters) delete w;
1106 }
1107 
1108 void TestMultipleWaits() {
1109     // Limit the number of threads to prevent heavy oversubscription.
1110 #if TBB_TEST_LOW_WORKLOAD
1111     const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
1112 #else
1113     const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
1114 #endif
1115 
1116     utils::FastRandom<> rnd(1234);
1117     for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
1118         for ( int i = 0; i<3; ++i ) {
1119             const int num_bunches = 3 + rnd.get()%3;
1120             const int bunch_size = max_threads + rnd.get()%max_threads;
1121             TestMultipleWaits( threads, num_bunches, bunch_size );
1122         }
1123     }
1124 }
1125 
1126 //--------------------------------------------------//
1127 
1128 void TestSmallStackSize() {
1129     tbb::global_control gc(tbb::global_control::thread_stack_size,
1130             tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
1131     // The test produces a warning (not an error) if it fails, so it is run many times
1132     // to make the log annoying (to force treating it as an error).
1133     for (int i = 0; i < 100; ++i) {
1134         tbb::task_arena a;
1135         a.initialize();
1136     }
1137 }
1138 
1139 //--------------------------------------------------//
1140 
1141 namespace TestMoveSemanticsNS {
1142     struct TestFunctor {
1143         void operator()() const {};
1144     };
1145 
1146     struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
1147         MoveOnlyFunctor() : utils::MoveOnly() {};
1148         MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
1149     };
1150 
1151     struct MovePreferableFunctor : utils::Movable, TestFunctor {
1152         MovePreferableFunctor() : utils::Movable() {};
1153         MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
1154         MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
1155     };
1156 
1157     struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
1158         NoMoveNoCopyFunctor() : utils::NoCopy() {};
1159         // the move constructor is disallowed, just like the copy constructor inherited from NoCopy
1160     private:
1161         NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
1162     };
1163 
1164     void TestFunctors() {
1165         tbb::task_arena ta;
1166         MovePreferableFunctor mpf;
1167         // execute() does not copy or move its argument inside the implementation
1168         ta.execute( NoMoveNoCopyFunctor() );
1169 
1170         ta.enqueue( MoveOnlyFunctor() );
1171         ta.enqueue( mpf );
1172         REQUIRE_MESSAGE(mpf.alive, "object was moved when it was passed by lvalue");
1173         mpf.Reset();
1174         ta.enqueue( std::move(mpf) );
1175         REQUIRE_MESSAGE(!mpf.alive, "object was copied when it was passed by rvalue");
1176         mpf.Reset();
1177     }
1178 }
1179 
1180 void TestMoveSemantics() {
1181     TestMoveSemanticsNS::TestFunctors();
1182 }
1183 
1184 //--------------------------------------------------//
1185 
1186 #include <vector>
1187 
1188 #include "common/state_trackable.h"
1189 
1190 namespace TestReturnValueNS {
1191     struct noDefaultTag {};
1192     class ReturnType : public StateTrackable<> {
1193         static const int SIZE = 42;
1194         std::vector<int> data;
1195     public:
1196         ReturnType(noDefaultTag) : StateTrackable<>(0) {}
1197         // Define copy constructor to test that it is never called
1198         ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
1199         ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}
1200 
1201         void fill() {
1202             for (int i = 0; i < SIZE; ++i)
1203                 data.push_back(i);
1204         }
1205         void check() {
1206             REQUIRE(data.size() == unsigned(SIZE));
1207             for (int i = 0; i < SIZE; ++i)
1208                 REQUIRE(data[i] == i);
1209             StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
1210             REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
1211             REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
1212             std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
1213             std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
1214             REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
1215             // The number of copies/moves should not exceed 3: returning from the function, storing to internal storage,
1216             // and acquiring from internal storage.
1217             REQUIRE((copied == 0 && moved <=3));
1218         }
1219     };
1220 
1221     template <typename R>
1222     R function() {
1223         noDefaultTag tag;
1224         R r(tag);
1225         r.fill();
1226         return r;
1227     }
1228 
1229     template <>
1230     void function<void>() {}
1231 
1232     template <typename R>
1233     struct Functor {
1234         R operator()() const {
1235             return function<R>();
1236         }
1237     };
1238 
1239     tbb::task_arena& arena() {
1240         static tbb::task_arena a;
1241         return a;
1242     }
1243 
1244     template <typename F>
1245     void TestExecute(F &f) {
1246         StateTrackableCounters::reset();
1247         ReturnType r = arena().execute(f);
1248         r.check();
1249     }
1250 
1251     template <typename F>
1252     void TestExecute(const F &f) {
1253         StateTrackableCounters::reset();
1254         ReturnType r = arena().execute(f);
1255         r.check();
1256     }
1257     template <typename F>
1258     void TestIsolate(F &f) {
1259         StateTrackableCounters::reset();
1260         ReturnType r = tbb::this_task_arena::isolate(f);
1261         r.check();
1262     }
1263 
1264     template <typename F>
1265     void TestIsolate(const F &f) {
1266         StateTrackableCounters::reset();
1267         ReturnType r = tbb::this_task_arena::isolate(f);
1268         r.check();
1269     }
1270 
1271     void Test() {
1272         TestExecute(Functor<ReturnType>());
1273         Functor<ReturnType> f1;
1274         TestExecute(f1);
1275         TestExecute(function<ReturnType>);
1276 
1277         arena().execute(Functor<void>());
1278         Functor<void> f2;
1279         arena().execute(f2);
1280         arena().execute(function<void>);
1281         TestIsolate(Functor<ReturnType>());
1282         TestIsolate(f1);
1283         TestIsolate(function<ReturnType>);
1284         tbb::this_task_arena::isolate(Functor<void>());
1285         tbb::this_task_arena::isolate(f2);
1286         tbb::this_task_arena::isolate(function<void>);
1287     }
1288 }
1289 
1290 void TestReturnValue() {
1291     TestReturnValueNS::Test();
1292 }
1293 
1294 //--------------------------------------------------//
1295 
1296 // MyObserver checks whether worker threads keep joining the same arena
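// Each observer records the arena its workers enter in the per-thread tls slot; if a worker shows up in a
// different arena than it used before, the failure counter is incremented. The barriers keep the workers'
// entry/exit in lockstep with the wakeup loop in TestArenaWorkersMigrationWithNumThreads().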
1297 struct MyObserver: public tbb::task_scheduler_observer {
1298     tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
1299     tbb::task_arena& my_arena;
1300     std::atomic<int>& my_failure_counter;
1301     std::atomic<int>& my_counter;
1302     utils::SpinBarrier& m_barrier;
1303 
1304     MyObserver(tbb::task_arena& a,
1305         tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
1306         std::atomic<int>& failure_counter,
1307         std::atomic<int>& counter,
1308         utils::SpinBarrier& barrier)
1309         : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1310         my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
1311         observe(true);
1312     }
1313     void on_scheduler_entry(bool worker) override {
1314         if (worker) {
1315             ++my_counter;
1316             tbb::task_arena*& cur_arena = my_tls.local();
1317             if (cur_arena != 0 && cur_arena != &my_arena) {
1318                 ++my_failure_counter;
1319             }
1320             cur_arena = &my_arena;
1321             m_barrier.wait();
1322         }
1323     }
1324     void on_scheduler_exit(bool worker) override {
1325         if (worker) {
1326             m_barrier.wait(); // before wakeup
1327             m_barrier.wait(); // after wakeup
1328         }
1329     }
1330 };
1331 
1332 void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
1333     if (n_threads == 0) {
1334         n_threads = tbb::this_task_arena::max_concurrency();
1335     }
1336 
1337     const int max_n_arenas = 8;
1338     int n_arenas = 2;
1339     if(n_threads > 16) {
1340         n_arenas = max_n_arenas;
1341     } else if (n_threads > 8) {
1342         n_arenas = 4;
1343     }
1344 
1345     int n_workers = n_threads - 1;
1346     n_workers = n_arenas * (n_workers / n_arenas);
1347     if (n_workers == 0) {
1348         return;
1349     }
1350 
1351     n_threads = n_workers + 1;
1352     tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);
1353 
1354     const int n_repetitions = 20;
1355     const int n_outer_repetitions = 100;
1356     std::multiset<float> failure_ratio; // for median calculating
1357     utils::SpinBarrier barrier(n_threads);
1358     utils::SpinBarrier worker_barrier(n_workers);
1359     MyObserver* observer[max_n_arenas];
1360     std::vector<tbb::task_arena> arenas(n_arenas);
1361     std::atomic<int> failure_counter;
1362     std::atomic<int> counter;
1363     tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1364 
1365     for (int i = 0; i < n_arenas; ++i) {
1366         arenas[i].initialize(n_workers / n_arenas + 1); // +1 slot for the external (master) thread
1367         observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
1368     }
1369 
1370     int ii = 0;
1371     for (; ii < n_outer_repetitions; ++ii) {
1372         failure_counter = 0;
1373         counter = 0;
1374 
1375         // Main code
1376         auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
1377         wakeup();
1378         for (int j = 0; j < n_repetitions; ++j) {
1379             barrier.wait(); // entry
1380             barrier.wait(); // exit1
1381             wakeup();
1382             barrier.wait(); // exit2
1383         }
1384         barrier.wait(); // entry
1385         barrier.wait(); // exit1
1386         barrier.wait(); // exit2
1387 
1388         failure_ratio.insert(float(failure_counter) / counter);
1389         tls.clear();
1390         // collect at least 3 samples in failure_ratio before calculating the median
1391         if (ii > 1) {
1392             std::multiset<float>::iterator it = failure_ratio.begin();
1393             std::advance(it, failure_ratio.size() / 2);
1394             if (*it < 0.02)
1395                 break;
1396         }
1397     }
1398     for (int i = 0; i < n_arenas; ++i) {
1399         delete observer[i];
1400     }
1401     // check that the median failure ratio is not too big
1402     std::multiset<float>::iterator it = failure_ratio.begin();
1403     std::advance(it, failure_ratio.size() / 2);
1404     // TODO: decrease the constants 0.05 and 0.3 by tuning the ratio between n_threads and n_arenas
1405     if (*it > 0.05) {
1406         REPORT("Warning: too many cases when threads join different arenas.\n");
1407         REQUIRE_MESSAGE(*it <= 0.3, "Too many cases when threads join different arenas.\n");
1408     }
1409 }
1410 
1411 void TestArenaWorkersMigration() {
1412     TestArenaWorkersMigrationWithNumThreads(4);
1413     if (tbb::this_task_arena::max_concurrency() != 4) {
1414         TestArenaWorkersMigrationWithNumThreads();
1415     }
1416 }
1417 
1418 //--------------------------------------------------//
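// Checks that by default no more threads take part in work than max_concurrency allows:
// every thread executing the parallel_for marks itself in the local_id TLS, so after each
// trial the number of distinct participating threads must be exactly max_concurrency.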
1419 void TestDefaultCreatedWorkersAmount() {
1420     int threads = tbb::this_task_arena::max_concurrency();
1421     utils::NativeParallelFor(1, [threads](int idx) {
1422         REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
1423         utils::SpinBarrier barrier(threads);
1424         ResetTLS();
1425         for (int trial = 0; trial < 10; ++trial) {
1426             tbb::parallel_for(0, threads, [threads, &barrier](int) {
1427                 REQUIRE_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal to the specified number of threads");
1428                 REQUIRE_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "more threads were created than specified by default");
1429                 local_id.local() = 1;
1430                 // If there are more threads than expected, 'sleep' gives the unexpected threads a chance to join.
1431                 utils::Sleep(1);
1432                 barrier.wait();
1433                 }, tbb::simple_partitioner());
1434             REQUIRE_MESSAGE(local_id.size() == size_t(threads), "number of created threads is not equal to the default");
1435         }
1436     });
1437 }
1438 
1439 void TestAbilityToCreateWorkers(int thread_num) {
1440     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
1441     // Checks only a subset of the possible reserved-for-external-thread counts:
1442     // 0 and 1 reserved slots are the important cases, but it is also useful
1443     // to collect some statistics for other values without spending the whole
1444     // test session time on checking every possible count
1445     TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
1446     TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
1447 }
1448 
1449 void TestDefaultWorkersLimit() {
1450     TestDefaultCreatedWorkersAmount();
1451 #if TBB_TEST_LOW_WORKLOAD
1452     TestAbilityToCreateWorkers(24);
1453 #else
1454     TestAbilityToCreateWorkers(256);
1455 #endif
1456 }
1457 
1458 #if TBB_USE_EXCEPTIONS
1459 
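// Several native threads share one arena and repeatedly call execute() with a functor that
// always throws; each caller must get the exception propagated back to it as an int.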
1460 void ExceptionInExecute() {
1461     std::size_t thread_number = utils::get_platform_max_threads();
1462     tbb::task_arena test_arena(thread_number / 2, thread_number / 2);
1463 
1464     std::atomic<int> canceled_task{};
1465 
1466     auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
1467         for (std::size_t i = 0; i < 1000; ++i) {
1468             try {
1469                 test_arena.execute([] {
1470                     volatile bool suppress_unreachable_code_warning = true;
1471                     if (suppress_unreachable_code_warning) {
1472                         throw -1;
1473                     }
1474                 });
1475                 FAIL("An exception should have been thrown.");
1476             } catch (int) {
1477                 ++canceled_task;
1478             } catch (...) {
1479                 FAIL("Wrong type of exception.");
1480             }
1481         }
1482     };
1483 
1484     utils::NativeParallelFor(thread_number, parallel_func);
1485     CHECK(canceled_task == thread_number * 1000);
1486 }
1487 
1488 #endif // TBB_USE_EXCEPTIONS
1489 
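// Observer that validates slot assignment: a thread's reported index must stay below the
// arena concurrency (or 2 for a single-slot arena), and worker threads must never occupy
// slots reserved for external threads.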
1490 class simple_observer : public tbb::task_scheduler_observer {
1491     static std::atomic<int> idx_counter;
1492     int my_idx;
1493     int myMaxConcurrency;   // concurrency of the associated arena
1494     int myNumReservedSlots; // reserved slots in the associated arena
1495     void on_scheduler_entry( bool is_worker ) override {
1496         int current_index = tbb::this_task_arena::current_thread_index();
1497         CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
1498         if (is_worker) {
1499             CHECK(current_index >= myNumReservedSlots);
1500         }
1501     }
1502     void on_scheduler_exit( bool /*is_worker*/ ) override
1503     {}
1504 public:
1505     simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
1506         : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
1507         , myMaxConcurrency(maxConcurrency)
1508         , myNumReservedSlots(numReservedSlots) {
1509         observe(true);
1510     }
1511 
1512     friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
1513         return lhs.my_idx < rhs.my_idx;
1514     }
1515 };
1516 
1517 std::atomic<int> simple_observer::idx_counter{};
1518 
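// Pairs a dynamically allocated arena with a status flag and a reader-writer mutex: threads
// that use the arena (execute, enqueue, observer attach) take the mutex as readers and only
// proceed while the status is 'alive'; the deleting thread first flips the status from
// 'alive' to 'deleting', then takes the mutex as a writer, destroys the arena and marks the
// handler 'deleted'.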
1519 struct arena_handler {
1520     enum arena_status {
1521         alive,
1522         deleting,
1523         deleted
1524     };
1525 
1526     tbb::task_arena* arena;
1527 
1528     std::atomic<arena_status> status{alive};
1529     tbb::spin_rw_mutex arena_in_use{};
1530 
1531     tbb::concurrent_set<simple_observer> observers;
1532 
1533     arena_handler(tbb::task_arena* ptr) : arena(ptr)
1534     {}
1535 
1536     friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
1537         return lhs.arena < rhs.arena;
1538     }
1539 };
1540 
1541 // TODO: Add observer operations
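// Every thread first adds one arena to the shared pool, then performs randomly chosen
// operations (create/delete an arena, attach/detach an observer, execute or enqueue work)
// until the global operation counter is exhausted, stressing concurrent use of task_arena
// together with task_scheduler_observer.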
1542 void StressTestMixFunctionality() {
1543     enum operation_type {
1544         create_arena,
1545         delete_arena,
1546         attach_observer,
1547         detach_observer,
1548         arena_execute,
1549         enqueue_task,
1550         last_operation_marker
1551     };
1552 
1553     std::size_t operations_number = last_operation_marker;
1554     std::size_t thread_number = utils::get_platform_max_threads();
1555     utils::FastRandom<> operation_rnd(42);
1556     tbb::spin_mutex random_operation_guard;
1557 
1558     auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
1559         tbb::spin_mutex::scoped_lock lock(random_operation_guard);
1560         return static_cast<operation_type>(operation_rnd.get() % operations_number);
1561     };
1562 
1563     utils::FastRandom<> arena_rnd(42);
1564     tbb::spin_mutex random_arena_guard;
1565     auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
1566         tbb::spin_mutex::scoped_lock lock(random_arena_guard);
1567         return arena_rnd.get();
1568     };
1569 
1570     tbb::concurrent_set<arena_handler> arenas_pool;
1571 
1572     std::vector<std::thread> thread_pool;
1573 
1574     utils::SpinBarrier thread_barrier(thread_number);
1575     std::size_t max_operations = 20000;
1576     std::atomic<std::size_t> curr_operation{};
1577     auto thread_func = [&] () {
1578         arenas_pool.emplace(new tbb::task_arena());
1579         thread_barrier.wait();
1580         while (curr_operation++ < max_operations) {
1581             switch (get_random_operation()) {
1582                 case create_arena :
1583                 {
1584                     arenas_pool.emplace(new tbb::task_arena());
1585                     break;
1586                 }
1587                 case delete_arena :
1588                 {
1589                     auto curr_arena = arenas_pool.begin();
1590                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1591                         arena_handler::arena_status curr_status = arena_handler::alive;
1592                         if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
1593                             break;
1594                         }
1595                     }
1596 
1597                     if (curr_arena == arenas_pool.end()) break;
1598 
1599                     tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);
1600 
1601                     delete curr_arena->arena;
1602                     curr_arena->status.store(arena_handler::deleted);
1603 
1604                     break;
1605                 }
1606                 case attach_observer :
1607                 {
1608                     tbb::spin_rw_mutex::scoped_lock lock{};
1609                     auto curr_arena = arenas_pool.begin();
1610                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1611                         if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1612                             if (curr_arena->status == arena_handler::alive) {
1613                                 break;
1614                             } else {
1615                                 lock.release();
1616                             }
1617                         }
1618                     }
1619 
1620                     if (curr_arena == arenas_pool.end()) break;
1621 
1622                     {
1623                         curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
1624                     }
1625 
1626                     break;
1627                 }
1628                 case detach_observer :
1629                 {
1630                     auto arena_number = get_random_arena() % arenas_pool.size();
1631                     auto curr_arena = arenas_pool.begin();
1632                     std::advance(curr_arena, arena_number);
1633 
1634                     for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
1635                         if (it->is_observing()) {
1636                             it->observe(false);
1637                             break;
1638                         }
1639                     }
1640 
1641                     break;
1642                 }
1643                 case arena_execute :
1644                 {
1645                     tbb::spin_rw_mutex::scoped_lock lock{};
1646                     auto curr_arena = arenas_pool.begin();
1647                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1648                         if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1649                             if (curr_arena->status == arena_handler::alive) {
1650                                 break;
1651                             } else {
1652                                 lock.release();
1653                             }
1654                         }
1655                     }
1656 
1657                     if (curr_arena == arenas_pool.end()) break;
1658 
1659                     curr_arena->arena->execute([] () {
1660                         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 10000), [] (tbb::blocked_range<std::size_t>&) {
1661                             std::atomic<int> sum{};
1662                             // Do some dummy work
1663                             for (; sum < 10; ++sum) ;
1664                         });
1665                     });
1666 
1667                     break;
1668                 }
1669                 case enqueue_task :
1670                 {
1671                     tbb::spin_rw_mutex::scoped_lock lock{};
1672                     auto curr_arena = arenas_pool.begin();
1673                     for (; curr_arena != arenas_pool.end(); ++curr_arena) {
1674                         if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
1675                             if (curr_arena->status == arena_handler::alive) {
1676                                 break;
1677                             } else {
1678                                 lock.release();
1679                             }
1680                         }
1681                     }
1682 
1683                     if (curr_arena == arenas_pool.end()) break;
1684 
1685                     curr_arena->arena->enqueue([] {
1686                         std::atomic<int> sum{};
1687                         // Do some dummy work
1688                         for (; sum < 1000; ++sum) ;
1689                     });
1690 
1691                     break;
1692                 }
1693                 case last_operation_marker :
1694                 break;
1695             }
1696         }
1697     };
1698 
1699     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1700         thread_pool.emplace_back(thread_func);
1701     }
1702 
1703     thread_func();
1704 
1705     for (std::size_t i = 0; i < thread_number - 1; ++i) {
1706         if (thread_pool[i].joinable()) thread_pool[i].join();
1707     }
1708 
1709     for (auto& handler : arenas_pool) {
1710         if (handler.status != arena_handler::deleted) delete handler.arena;
1711     }
1712 }
1713 
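// Functor that keeps re-enqueueing copies of itself into the arena until the shared task
// counter reaches 100000; every thread executing it must already be marked in the ETS,
// i.e. only threads that previously joined the arena may run the enqueued tasks.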
1714 struct enqueue_test_helper {
1715     enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets , std::atomic<std::size_t>& task_counter)
1716         : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
1717     {}
1718 
1719     enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
1720     {}
1721 
1722     void operator() () const {
1723         CHECK(my_ets.local());
1724         if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
1725         utils::yield();
1726     }
1727 
1728     tbb::task_arena& my_arena;
1729     tbb::enumerable_thread_specific<bool>& my_ets;
1730     std::atomic<std::size_t>& my_task_counter;
1731 };
1732 
1733 //--------------------------------------------------//
1734 
1735 //! Test for task arena in concurrent cases
1736 //! \brief \ref requirement
1737 TEST_CASE("Test for concurrent functionality") {
1738     TestConcurrentFunctionality();
1739 }
1740 
1741 //! Test for arena entry consistency
1742 //! \brief \ref requirement \ref error_guessing
1743 TEST_CASE("Test for task arena entry consistency") {
1744     TestArenaEntryConsistency();
1745 }
1746 
1747 //! Test for task arena attach functionality
1748 //! \brief \ref requirement \ref interface
1749 TEST_CASE("Test for the attach functionality") {
1750     TestAttach(4);
1751 }
1752 
1753 //! Test for constant functor requirements
1754 //! \brief \ref requirement \ref interface
1755 TEST_CASE("Test for constant functor requirement") {
1756     TestConstantFunctorRequirement();
1757 }
1758 
1759 //! Test for move semantics support
1760 //! \brief \ref requirement \ref interface
1761 TEST_CASE("Move semantics support") {
1762     TestMoveSemantics();
1763 }
1764 
1765 //! Test for different return value types
1766 //! \brief \ref requirement \ref interface
1767 TEST_CASE("Return value test") {
1768     TestReturnValue();
1769 }
1770 
1771 //! Test for delegated task spawn in case of unsuccessful slot attach
1772 //! \brief \ref error_guessing
1773 TEST_CASE("Delegated spawn wait") {
1774     TestDelegatedSpawnWait();
1775 }
1776 
1777 //! Test task arena isolation functionality
1778 //! \brief \ref requirement \ref interface
1779 TEST_CASE("Isolated execute") {
1780     // Isolation test cases are valid only when more than 2 threads are available
1781     if (tbb::this_task_arena::max_concurrency() > 2) {
1782         TestIsolatedExecute();
1783     }
1784 }
1785 
1786 //! Test for TBB Workers creation limits
1787 //! \brief \ref requirement
1788 TEST_CASE("Default workers limit") {
1789     TestDefaultWorkersLimit();
1790 }
1791 
1792 //! Test for workers migration between arenas
1793 //! \brief \ref error_guessing \ref stress
1794 TEST_CASE("Arena workers migration") {
1795     TestArenaWorkersMigration();
1796 }
1797 
1798 //! Test for multiple waits, threads should not block each other
1799 //! \brief \ref requirement
1800 TEST_CASE("Multiple waits") {
1801     TestMultipleWaits();
1802 }
1803 
1804 //! Test for small stack size settings and arena initialization
1805 //! \brief \ref error_guessing
1806 TEST_CASE("Small stack size") {
1807     TestSmallStackSize();
1808 }
1809 
1810 #if TBB_USE_EXCEPTIONS
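//! Test exception propagation from tbb::task_arena::execute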
1811 //! \brief \ref requirement \ref stress
1812 TEST_CASE("Test for exceptions during execute.") {
1813     ExceptionInExecute();
1814 }
1815 
1816 //! \brief \ref error_guessing
1817 TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
1818     struct throwing_obj {
1819         throwing_obj() {
1820             volatile bool flag = true;
1821             if (flag) throw std::exception{};
1822         }
1823         throwing_obj(const throwing_obj&) = default;
1824         ~throwing_obj() { FAIL("The destructor should never be called."); }
1825     };
1826 
1827     tbb::task_arena arena;
1828 
1829     REQUIRE_THROWS_AS( [&] {
1830         arena.execute([] {
1831             return throwing_obj{};
1832         });
1833     }(), std::exception );
1834 }
1835 #endif // TBB_USE_EXCEPTIONS
1836 
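//! Stress test that mixes concurrent arena creation and destruction, observer attach/detach, execute and enqueue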
1837 //! \brief \ref stress
1838 TEST_CASE("Stress test with mixing functionality") {
1839     StressTestMixFunctionality();
1840 }
1841 
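//! Test that enqueued tasks are executed only by threads that have joined the oversubscribed arena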
1842 //! \brief \ref stress
1843 TEST_CASE("Workers oversubscription") {
1844     std::size_t num_threads = utils::get_platform_max_threads();
1845     tbb::enumerable_thread_specific<bool> ets;
1846     tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
1847     tbb::task_arena arena(num_threads * 2);
1848 
1849     utils::SpinBarrier barrier(num_threads * 2);
1850 
1851     arena.execute([&] {
1852         tbb::parallel_for(std::size_t(0), num_threads * 2,
1853             [&] (const std::size_t&) {
1854                 ets.local() = true;
1855                 barrier.wait();
1856             }
1857         );
1858     });
1859 
1860     utils::yield();
1861 
1862     std::atomic<std::size_t> task_counter{0};
1863     for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
1864         arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
1865     }
1866 
1867     while (task_counter < 100000) utils::yield();
1868 
1869     arena.execute([&] {
1870         tbb::parallel_for(std::size_t(0), num_threads * 2,
1871             [&] (const std::size_t&) {
1872                 CHECK(ets.local());
1873                 barrier.wait();
1874             }
1875         );
1876     });
1877 }
1878