xref: /oneTBB/test/tbb/test_task_group.cpp (revision 4eda0f93)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "oneapi/tbb/detail/_config.h"
20 #include "tbb/global_control.h"
21 
22 #include "tbb/task_group.h"
23 
24 #include "common/concurrency_tracker.h"
25 
26 #include <atomic>
27 #include <stdexcept>
28 
29 //! \file test_task_group.cpp
30 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification
31 
// Number of threads used by the test currently running; the test cases below assign the
// loop value `p` to it before invoking a test body.
unsigned g_MaxConcurrency = 4;
// Atomic counter type shared by the tests in this file.
using atomic_t = std::atomic<std::uintptr_t>;
// Inclusive bounds of the thread-count loop in the test cases.
unsigned MinThread = 1;
unsigned MaxThread = 4;
36 
37 //------------------------------------------------------------------------
38 // Tests for the thread safety of the task_group manipulations
39 //------------------------------------------------------------------------
40 
41 #include "common/spin_barrier.h"
42 
// Bit flags selecting how SharedGroupBodyImpl shares its task group between threads.
// They are tested with bitwise AND, so each value must remain a distinct bit.
enum SharingMode {
    VagabondGroup = 1 << 0,  // the group is created by one thread and destroyed by another
    ParallelWait  = 1 << 1   // every participating thread waits on the group
};
47 
48 template<typename task_group_type>
49 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife {
50     static const std::uintptr_t c_numTasks0 = 4096,
51                         c_numTasks1 = 1024;
52 
53     const std::uintptr_t m_numThreads;
54     const std::uintptr_t m_sharingMode;
55 
56     task_group_type *m_taskGroup;
57     atomic_t m_tasksSpawned,
58              m_threadsReady;
59     utils::SpinBarrier m_barrier;
60 
61     static atomic_t s_tasksExecuted;
62 
63     struct TaskFunctor {
64         SharedGroupBodyImpl *m_pOwner;
65         void operator () () const {
66             if ( m_pOwner->m_sharingMode & ParallelWait ) {
67                 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads )
68                     utils::yield();
69             }
70             ++s_tasksExecuted;
71         }
72     };
73 
74     TaskFunctor m_taskFunctor;
75 
76     void Spawn ( std::uintptr_t numTasks ) {
77         for ( std::uintptr_t i = 0; i < numTasks; ++i ) {
78             ++m_tasksSpawned;
79             utils::ConcurrencyTracker ct;
80             m_taskGroup->run( m_taskFunctor );
81         }
82         ++m_threadsReady;
83     }
84 
85     void DeleteTaskGroup () {
86         delete m_taskGroup;
87         m_taskGroup = NULL;
88     }
89 
90     void Wait () {
91         while ( m_threadsReady != m_numThreads )
92             utils::yield();
93         const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1);
94         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" );
95         INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency);
96         if ( m_sharingMode & ParallelWait ) {
97             m_barrier.wait( &utils::ConcurrencyTracker::Reset );
98             {
99                 utils::ConcurrencyTracker ct;
100                 m_taskGroup->wait();
101             }
102             if ( utils::ConcurrencyTracker::PeakParallelism() == 1 )
103                 WARN( "Warning: No parallel waiting detected in TestParallelWait" );
104             m_barrier.wait();
105         }
106         else
107             m_taskGroup->wait();
108         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" );
109         CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" );
110     }
111 
112 public:
113     SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
114         : m_numThreads(numThreads)
115         , m_sharingMode(sharingMode)
116         , m_taskGroup(NULL)
117         , m_barrier(numThreads)
118     {
119         CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" );
120         if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) {
121             CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only");
122         }
123         utils::ConcurrencyTracker::Reset();
124         s_tasksExecuted = 0;
125         m_tasksSpawned = 0;
126         m_threadsReady = 0;
127         m_taskFunctor.m_pOwner = this;
128     }
129 
130     void Run ( std::uintptr_t idx ) {
131         AssertLive();
132         if ( idx == 0 ) {
133             if (m_taskGroup || m_tasksSpawned) {
134                 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse");
135             }
136             m_taskGroup = new task_group_type;
137             Spawn( c_numTasks0 );
138             Wait();
139             if ( m_sharingMode & VagabondGroup )
140                 m_barrier.wait();
141             else
142                 DeleteTaskGroup();
143         }
144         else {
145             while ( m_tasksSpawned == 0 )
146                 utils::yield();
147             CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized");
148             Spawn (c_numTasks1);
149             if ( m_sharingMode & ParallelWait )
150                 Wait();
151             if ( m_sharingMode & VagabondGroup ) {
152                 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" );
153                 m_barrier.wait();
154                 DeleteTaskGroup();
155             }
156         }
157         AssertLive();
158     }
159 };
160 
161 template<typename task_group_type>
162 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted;
163 
164 template<typename task_group_type>
165 class  SharedGroupBody : utils::NoAssign, utils::NoAfterlife {
166     bool m_bOwner;
167     SharedGroupBodyImpl<task_group_type> *m_pImpl;
168 public:
169     SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
170         : utils::NoAssign()
171         , utils::NoAfterlife()
172         , m_bOwner(true)
173         , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) )
174     {}
175     SharedGroupBody ( const SharedGroupBody& src )
176         : utils::NoAssign()
177         , utils::NoAfterlife()
178         , m_bOwner(false)
179         , m_pImpl(src.m_pImpl)
180     {}
181     ~SharedGroupBody () {
182         if ( m_bOwner )
183             delete m_pImpl;
184     }
185     void operator() ( std::uintptr_t idx ) const {
186         // Wrap the functior into additional task group to enforce bounding.
187         task_group_type tg;
188         tg.run_and_wait([&] { m_pImpl->Run(idx); });
189     }
190 };
191 
192 template<typename task_group_type>
193 class RunAndWaitSyncronizationTestBody : utils::NoAssign {
194     utils::SpinBarrier& m_barrier;
195     std::atomic<bool>& m_completed;
196     task_group_type& m_tg;
197 public:
198     RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg)
199         : m_barrier(barrier), m_completed(completed), m_tg(tg) {}
200 
201     void operator()() const {
202         m_barrier.wait();
203         utils::doDummyWork(100000);
204         m_completed = true;
205     }
206 
207     void operator()(int id) const {
208         if (id == 0) {
209             m_tg.run_and_wait(*this);
210         } else {
211             m_barrier.wait();
212             m_tg.wait();
213             CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished");
214         }
215     }
216 };
217 
218 template<typename task_group_type>
219 void TestParallelSpawn () {
220     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) );
221 }
222 
223 template<typename task_group_type>
224 void TestParallelWait () {
225     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) );
226 
227     utils::SpinBarrier barrier(g_MaxConcurrency);
228     std::atomic<bool> completed;
229     completed = false;
230     task_group_type tg;
231     RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg);
232     NativeParallelFor( g_MaxConcurrency, b );
233 }
234 
235 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other)
236 template<typename task_group_type>
237 void TestVagabondGroup () {
238     NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) );
239 }
240 
241 #include "common/memory_usage.h"
242 
243 template<typename task_group_type>
244 void TestThreadSafety() {
245     auto tests = [] {
246         for (int trail = 0; trail < 10; ++trail) {
247             TestParallelSpawn<task_group_type>();
248             TestParallelWait<task_group_type>();
249             TestVagabondGroup<task_group_type>();
250         }
251     };
252 
253     // Test and warm up allocator.
254     tests();
255 
256     // Ensure that cosumption is stabilized.
257     std::size_t initial = utils::GetMemoryUsage();
258     for (;;) {
259         tests();
260         std::size_t current = utils::GetMemoryUsage();
261         if (current <= initial) {
262             return;
263         }
264         initial = current;
265     }
266 }
267 //------------------------------------------------------------------------
268 // Common requisites of the Fibonacci tests
269 //------------------------------------------------------------------------
270 
// Index of the Fibonacci number computed by every test task, and its expected value
// (the 20th Fibonacci number is 6765).
const std::uintptr_t N = 20;
const std::uintptr_t F = 6765;

// Global accumulator used by FibFunctionNoArgs, which takes no arguments.
atomic_t g_Sum;

// Declares `numRepeats` in the enclosing scope and resets the concurrency tracker.
#define FIB_TEST_PROLOGUE() \
    const unsigned numRepeats = g_MaxConcurrency * 4;    \
    utils::ConcurrencyTracker::Reset()

// Checks that the observed peak parallelism did not exceed g_MaxConcurrency and that
// `sum` equals numRepeats results of F. Relies on `numRepeats` from FIB_TEST_PROLOGUE().
#define FIB_TEST_EPILOGUE(sum) \
    CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \
    CHECK( sum == numRepeats * F );
283 
284 
285 // Fibonacci tasks specified as functors
286 template<class task_group_type>
287 class FibTaskBase : utils::NoAssign, utils::NoAfterlife {
288 protected:
289     std::uintptr_t* m_pRes;
290     mutable std::uintptr_t m_Num;
291     virtual void impl() const = 0;
292 public:
293     FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {}
294     void operator()() const {
295         utils::ConcurrencyTracker ct;
296         AssertLive();
297         if( m_Num < 2 ) {
298             *m_pRes = m_Num;
299         } else {
300             impl();
301         }
302     }
303     virtual ~FibTaskBase() {}
304 };
305 
306 template<class task_group_type>
307 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
308 public:
309     FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
310     virtual void impl() const override {
311         std::uintptr_t x = ~0u;
312         task_group_type tg;
313         tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) );
314         this->m_Num -= 2; tg.run_and_wait( *this );
315         *(this->m_pRes) += x;
316     }
317 };
318 
319 template<class task_group_type>
320 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
321 public:
322     FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
323     virtual void impl() const override {
324         std::uintptr_t x = ~0u,
325                y = ~0u;
326         task_group_type tg;
327         tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) );
328         tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) );
329         tg.wait();
330         *(this->m_pRes) = x + y;
331     }
332 };
333 
// Helper functions
//! Constructs a fib_task for index n, invokes it once and returns the value it stored.
template<class fib_task>
std::uintptr_t RunFibTask(std::uintptr_t n) {
    std::uintptr_t result = ~0u;
    fib_task task(&result, n);
    task();
    return result;
}
341 
342 template<typename fib_task>
343 void RunFibTest() {
344     FIB_TEST_PROLOGUE();
345     std::uintptr_t sum = 0;
346     for( unsigned i = 0; i < numRepeats; ++i )
347         sum += RunFibTask<fib_task>(N);
348     FIB_TEST_EPILOGUE(sum);
349 }
350 
351 template<typename fib_task>
352 void FibFunctionNoArgs() {
353     g_Sum += RunFibTask<fib_task>(N);
354 }
355 
356 template<typename task_group_type>
357 void TestFibWithLambdas() {
358     FIB_TEST_PROLOGUE();
359     atomic_t sum;
360     sum = 0;
361     task_group_type tg;
362     for( unsigned i = 0; i < numRepeats; ++i )
363         tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} );
364     tg.wait();
365     FIB_TEST_EPILOGUE(sum);
366 }
367 
368 template<typename task_group_type>
369 void TestFibWithFunctor() {
370     RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >();
371     RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >();
372 }
373 
374 template<typename task_group_type>
375 void TestFibWithFunctionPtr() {
376     FIB_TEST_PROLOGUE();
377     g_Sum = 0;
378     task_group_type tg;
379     for( unsigned i = 0; i < numRepeats; ++i )
380         tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > );
381     tg.wait();
382     FIB_TEST_EPILOGUE(g_Sum);
383 }
384 
//! Runs all Fibonacci tests (lambda, functor and function-pointer variants) for task_group_type.
template<typename task_group_type>
void RunFibonacciTests() {
    TestFibWithLambdas<task_group_type>();
    TestFibWithFunctor<task_group_type>();
    TestFibWithFunctionPtr<task_group_type>();
}
391 
//! Exception type thrown by the tests; carries a caller-supplied description.
/** Only the pointer is stored, so the description must outlive the exception object
    (the tests in this file pass string literals). */
class test_exception : public std::exception
{
    const char* m_strDescription;
public:
    test_exception ( const char* descr ) : m_strDescription(descr) {}

    //! Returns the description passed to the constructor.
    // noexcept replaces the deprecated dynamic exception specification throw().
    const char* what() const noexcept override { return m_strDescription; }
};

using TestException = test_exception;
402 
403 #include <string.h>
404 
// Number of tasks launched by each child group (see LaunchChildrenWithFunctor).
#define NUM_CHORES      512
// Number of child groups launched by each cancellation/exception test.
#define NUM_GROUPS      64
// A ThrowingTask throws when the per-group task counter reaches this value.
#define SKIP_CHORES     (NUM_CHORES/4)
// Once more than this many child exceptions were caught, child groups start rethrowing.
#define SKIP_GROUPS     (NUM_GROUPS/4)
// Messages of the exceptions thrown by a task and by a child group respectively.
#define EXCEPTION_DESCR1 "Test exception 1"
#define EXCEPTION_DESCR2 "Test exception 2"

// Number of test exceptions caught by the child groups.
atomic_t g_ExceptionCount;
// Number of ThrowingTask bodies started in non-throwing mode.
atomic_t g_TaskCount;
// Snapshot of g_TaskCount taken right after cancel() in TestManualCancellationWithFunctor.
unsigned g_ExecutedAtCancellation;
// Test mode switches, set by ResetGlobals().
bool g_Rethrow;
bool g_Throw;
417 
418 class ThrowingTask : utils::NoAssign, utils::NoAfterlife {
419     atomic_t &m_TaskCount;
420 public:
421     ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {}
422     void operator() () const {
423         utils::ConcurrencyTracker ct;
424         AssertLive();
425         if ( g_Throw ) {
426             if ( ++m_TaskCount == SKIP_CHORES )
427                 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1));
428             utils::yield();
429         }
430         else {
431             ++g_TaskCount;
432             while( !tbb::is_current_task_group_canceling() )
433                 utils::yield();
434         }
435     }
436 };
437 
438 inline void ResetGlobals ( bool bThrow, bool bRethrow ) {
439     g_Throw = bThrow;
440     g_Rethrow = bRethrow;
441     g_ExceptionCount = 0;
442     g_TaskCount = 0;
443     utils::ConcurrencyTracker::Reset();
444 }
445 
446 template<typename task_group_type>
447 void LaunchChildrenWithFunctor () {
448     atomic_t count;
449     count = 0;
450     task_group_type g;
451     for (unsigned i = 0; i < NUM_CHORES; ++i) {
452         if (i % 2 == 1) {
453             g.run(g.defer(ThrowingTask(count)));
454         } else
455         {
456             g.run(ThrowingTask(count));
457         }
458     }
459 #if TBB_USE_EXCEPTIONS
460     tbb::task_group_status status = tbb::not_complete;
461     bool exceptionCaught = false;
462     try {
463         status = g.wait();
464     } catch ( TestException& e ) {
465         CHECK_MESSAGE( e.what(), "Empty what() string" );
466         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" );
467         exceptionCaught = true;
468         ++g_ExceptionCount;
469     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
470     if (g_Throw && !exceptionCaught && status != tbb::canceled) {
471         CHECK_MESSAGE(false, "No exception in the child task group");
472     }
473     if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) {
474         throw test_exception(EXCEPTION_DESCR2);
475     }
476 #else
477     g.wait();
478 #endif
479 }
480 
481 // Tests for cancellation and exception handling behavior
482 template<typename task_group_type>
483 void TestManualCancellationWithFunctor () {
484     ResetGlobals( false, false );
485     task_group_type tg;
486     for (unsigned i = 0; i < NUM_GROUPS; ++i) {
487         // TBB version does not require taking function address
488         if (i % 2 == 0) {
489             auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>);
490             tg.run(std::move(h));
491         } else
492         {
493             tg.run(&LaunchChildrenWithFunctor<task_group_type>);
494         }
495     }
496     CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" );
497     while ( g_MaxConcurrency > 1 && g_TaskCount == 0 )
498         utils::yield();
499     tg.cancel();
500     g_ExecutedAtCancellation = int(g_TaskCount);
501     tbb::task_group_status status = tg.wait();
502     CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." );
503     CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" );
504     CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" );
505     CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" );
506 }
507 
508 #if TBB_USE_EXCEPTIONS
509 template<typename task_group_type>
510 void TestExceptionHandling1 () {
511     ResetGlobals( true, false );
512     task_group_type tg;
513     for( unsigned i = 0; i < NUM_GROUPS; ++i )
514         // TBB version does not require taking function address
515         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
516     try {
517         tg.wait();
518     } catch ( ... ) {
519         CHECK_MESSAGE( false, "Unexpected exception" );
520     }
521     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" );
522     CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" );
523 }
524 
525 template<typename task_group_type>
526 void TestExceptionHandling2 () {
527     ResetGlobals( true, true );
528     task_group_type tg;
529     bool exceptionCaught = false;
530     for( unsigned i = 0; i < NUM_GROUPS; ++i ) {
531         // TBB version does not require taking function address
532         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
533     }
534     try {
535         tg.wait();
536     } catch ( TestException& e ) {
537         CHECK_MESSAGE( e.what(), "Empty what() string" );
538         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" );
539         exceptionCaught = true;
540     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
541     CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" );
542     CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" );
543     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" );
544     CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" );
545 }
546 
547 template <typename task_group_type>
548 void TestExceptionHandling3() {
549     task_group_type tg;
550     try {
551         tg.run_and_wait([]() {
552             volatile bool suppress_unreachable_code_warning = true;
553             if (suppress_unreachable_code_warning) {
554                 throw 1;
555             }
556         });
557     } catch (int error) {
558         CHECK(error == 1);
559     } catch ( ... ) {
560         CHECK_MESSAGE( false, "Unexpected exception" );
561     }
562 }
563 
564 template<typename task_group_type>
565 class LaunchChildrenDriver {
566 public:
567     void Launch(task_group_type& tg) {
568         ResetGlobals(false, false);
569         for (unsigned i = 0; i < NUM_GROUPS; ++i) {
570             tg.run(LaunchChildrenWithFunctor<task_group_type>);
571         }
572         CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation");
573         while (g_MaxConcurrency > 1 && g_TaskCount == 0)
574             utils::yield();
575     }
576 
577     void Finish() {
578         CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken");
579         CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?");
580         CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation");
581     }
582 }; // LaunchChildrenWithTaskHandleDriver
583 
584 template<typename task_group_type, bool Throw>
585 void TestMissingWait () {
586     bool exception_occurred = false,
587          unexpected_exception = false;
588     LaunchChildrenDriver<task_group_type> driver;
589     try {
590         task_group_type tg;
591         driver.Launch( tg );
592         volatile bool suppress_unreachable_code_warning = Throw;
593         if (suppress_unreachable_code_warning) {
594             throw int(); // Initiate stack unwinding
595         }
596     }
597     catch ( const tbb::missing_wait& e ) {
598         CHECK_MESSAGE( e.what(), "Error message is absent" );
599         exception_occurred = true;
600         unexpected_exception = Throw;
601     }
602     catch ( int ) {
603         exception_occurred = true;
604         unexpected_exception = !Throw;
605     }
606     catch ( ... ) {
607         exception_occurred = unexpected_exception = true;
608     }
609     CHECK( exception_occurred );
610     CHECK( !unexpected_exception );
611     driver.Finish();
612 }
613 #endif
614 
//! Runs every cancellation and exception handling test for task_group_type.
/** The exception-based tests are compiled only when TBB_USE_EXCEPTIONS is enabled. */
template<typename task_group_type>
void RunCancellationAndExceptionHandlingTests() {
    TestManualCancellationWithFunctor<task_group_type>();
#if TBB_USE_EXCEPTIONS
    TestExceptionHandling1<task_group_type>();
    TestExceptionHandling2<task_group_type>();
    TestExceptionHandling3<task_group_type>();
    // Missing wait with and without a concurrent stack unwinding.
    TestMissingWait<task_group_type, true>();
    TestMissingWait<task_group_type, false>();
#endif
}
626 
//! Free function that does nothing (not referenced in the visible part of this file).
void EmptyFunction () {}
628 
//! Functor with both a const and a non-const call operator.
/** The non-const overload fails the test whenever it is invoked. */
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};
633 
634 template<typename task_group_type>
635 void TestConstantFunctorRequirement() {
636     task_group_type g;
637     TestFunctor tf;
638     g.run( tf ); g.wait();
639     g.run_and_wait( tf );
640 }
641 
642 //------------------------------------------------------------------------
643 namespace TestMoveSemanticsNS {
644     struct TestFunctor {
645         void operator()() const {};
646     };
647 
648     struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
649         MoveOnlyFunctor() : utils::MoveOnly() {};
650         MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
651     };
652 
653     struct MovePreferableFunctor : utils::Movable, TestFunctor {
654         MovePreferableFunctor() : utils::Movable() {};
655         MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {};
656         MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
657     };
658 
659     struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
660         NoMoveNoCopyFunctor() : utils::NoCopy() {};
661         // mv ctor is not allowed as cp ctor from parent utils::NoCopy
662     private:
663         NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
664     };
665 
666      template<typename task_group_type>
667     void TestBareFunctors() {
668         task_group_type tg;
669         MovePreferableFunctor mpf;
670         // run_and_wait() doesn't have any copies or moves of arguments inside the impl
671         tg.run_and_wait( NoMoveNoCopyFunctor() );
672 
673         tg.run( MoveOnlyFunctor() );
674         tg.wait();
675 
676         tg.run( mpf );
677         tg.wait();
678         CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval");
679         mpf.Reset();
680 
681         tg.run( std::move(mpf) );
682         tg.wait();
683         CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval");
684         mpf.Reset();
685     }
686 }
687 
//! Entry point of the move semantics tests for task_group_type.
template<typename task_group_type>
void TestMoveSemantics() {
    TestMoveSemanticsNS::TestBareFunctors<task_group_type>();
}
692 //------------------------------------------------------------------------
693 
694 // TODO: TBB_REVAMP_TODO - enable when ETS is available
695 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP
696 namespace TestIsolationNS {
697     class DummyFunctor {
698     public:
699         DummyFunctor() {}
700         void operator()() const {
701             for ( volatile int j = 0; j < 10; ++j ) {}
702         }
703     };
704 
705     template<typename task_group_type>
706     class ParForBody {
707         task_group_type& m_tg;
708         std::atomic<bool>& m_preserved;
709         tbb::enumerable_thread_specific<int>& m_ets;
710     public:
711         ParForBody(
712             task_group_type& tg,
713             std::atomic<bool>& preserved,
714             tbb::enumerable_thread_specific<int>& ets
715         ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {}
716 
717         void operator()(int) const {
718             if (++m_ets.local() > 1) m_preserved = false;
719 
720             for (int i = 0; i < 1000; ++i)
721                 m_tg.run(DummyFunctor());
722             m_tg.wait();
723             m_tg.run_and_wait(DummyFunctor());
724 
725             --m_ets.local();
726         }
727     };
728 
729     template<typename task_group_type>
730     void CheckIsolation(bool isolation_is_expected) {
731         task_group_type tg;
732         std::atomic<bool> isolation_is_preserved;
733         isolation_is_preserved = true;
734         tbb::enumerable_thread_specific<int> ets(0);
735 
736         tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets));
737 
738         ASSERT(
739             isolation_is_expected == isolation_is_preserved,
740             "Actual and expected isolation-related behaviours are different"
741         );
742     }
743 
744     // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place
745     void TestIsolation() {
746         CheckIsolation<tbb::task_group>(false);
747         CheckIsolation<tbb::isolated_task_group>(true);
748     }
749 }
750 #endif
751 
752 #if __TBB_USE_ADDRESS_SANITIZER
753 //! Test for thread safety for the task_group
754 //! \brief \ref error_guessing \ref resource_usage
755 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
756 #else
757 //! Test for thread safety for the task_group
758 //! \brief \ref error_guessing \ref resource_usage
759 TEST_CASE("Thread safety test for the task group") {
760     if (tbb::this_task_arena::max_concurrency() < 2) {
761         // The test requires more than one thread to check thread safety
762         return;
763     }
764     for (unsigned p=MinThread; p <= MaxThread; ++p) {
765         if (p < 2) {
766             continue;
767         }
768         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
769         g_MaxConcurrency = p;
770         TestThreadSafety<tbb::task_group>();
771     }
772 }
773 #endif
774 
775 //! Fibonacci test for task group
776 //! \brief \ref interface \ref requirement
777 TEST_CASE("Fibonacci test for the task group") {
778     for (unsigned p=MinThread; p <= MaxThread; ++p) {
779         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
780         g_MaxConcurrency = p;
781         RunFibonacciTests<tbb::task_group>();
782     }
783 }
784 
785 //! Cancellation and exception test for the task group
786 //! \brief \ref interface \ref requirement
787 TEST_CASE("Cancellation and exception test for the task group") {
788     for (unsigned p = MinThread; p <= MaxThread; ++p) {
789         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
790         tbb::task_arena a(p);
791         g_MaxConcurrency = p;
792         a.execute([] {
793             RunCancellationAndExceptionHandlingTests<tbb::task_group>();
794         });
795     }
796 }
797 
798 //! Constant functor test for the task group
799 //! \brief \ref interface \ref negative
800 TEST_CASE("Constant functor test for the task group") {
801     for (unsigned p=MinThread; p <= MaxThread; ++p) {
802         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
803         g_MaxConcurrency = p;
804         TestConstantFunctorRequirement<tbb::task_group>();
805     }
806 }
807 
808 //! Move semantics test for the task group
809 //! \brief \ref interface \ref requirement
810 TEST_CASE("Move semantics test for the task group") {
811     for (unsigned p=MinThread; p <= MaxThread; ++p) {
812         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
813         g_MaxConcurrency = p;
814         TestMoveSemantics<tbb::task_group>();
815     }
816 }
817 
818 #if TBB_PREVIEW_ISOLATED_TASK_GROUP
819 
820 #if __TBB_USE_ADDRESS_SANITIZER
821 //! Test for thread safety for the isolated_task_group
822 //! \brief \ref error_guessing
823 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
824 #else
825 //! Test for thread safety for the isolated_task_group
826 //! \brief \ref error_guessing
827 TEST_CASE("Thread safety test for the isolated task group") {
828     if (tbb::this_task_arena::max_concurrency() < 2) {
829         // The test requires more than one thread to check thread safety
830         return;
831     }
832     for (unsigned p=MinThread; p <= MaxThread; ++p) {
833         if (p < 2) {
834             continue;
835         }
836         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
837         g_MaxConcurrency = p;
838         TestThreadSafety<tbb::isolated_task_group>();
839     }
840 }
841 #endif
842 
//! Fibonacci test for the isolated task group
844 //! \brief \ref interface \ref requirement
845 TEST_CASE("Fibonacci test for the isolated task group") {
846     for (unsigned p=MinThread; p <= MaxThread; ++p) {
847         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
848         g_MaxConcurrency = p;
849         RunFibonacciTests<tbb::isolated_task_group>();
850     }
851 }
852 
853 //! Cancellation and exception test for the isolated task group
854 //! \brief \ref interface \ref requirement
855 TEST_CASE("Cancellation and exception test for the isolated task group") {
856     for (unsigned p=MinThread; p <= MaxThread; ++p) {
857         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
858         g_MaxConcurrency = p;
859         RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>();
860     }
861 }
862 
863 //! Constant functor test for the isolated task group.
864 //! \brief \ref interface \ref negative
865 TEST_CASE("Constant functor test for the isolated task group") {
866     for (unsigned p=MinThread; p <= MaxThread; ++p) {
867         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
868         g_MaxConcurrency = p;
869         TestConstantFunctorRequirement<tbb::isolated_task_group>();
870     }
871 }
872 
873 //! Move semantics test for the isolated task group.
874 //! \brief \ref interface \ref requirement
875 TEST_CASE("Move semantics test for the isolated task group") {
876     for (unsigned p=MinThread; p <= MaxThread; ++p) {
877         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
878         g_MaxConcurrency = p;
879         TestMoveSemantics<tbb::isolated_task_group>();
880     }
881 }
882 
//TODO: add tests for isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::run_and_wait(d2::task_handle&& h)
884 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */
885 
886 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) {
887     for (int i = 0; i < num_tasks; ++i) {
888         tg2.run([&tg1, &tasks_executed] {
889             volatile char consume_stack[1000]{};
890             ++tasks_executed;
891             tg1.wait();
892             utils::suppress_unused_warning(consume_stack);
893         });
894     }
895 }
896 
// TODO: move to the conformance test
//! Test for stack overflow avoidance mechanism.
//! \brief \ref requirement
TEST_CASE("Test for stack overflow avoidance mechanism") {
    // The scenario expects a second thread to pick up tasks, so it is skipped
    // when fewer than two threads are available.
    if (tbb::this_task_arena::max_concurrency() < 2) {
        return;
    }

    // Cap the parallelism at two threads for the rest of the test.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2);
    tbb::task_group tg1;
    tbb::task_group tg2;
    // Number of tg2 tasks that have started executing so far.
    std::atomic<int> tasks_executed{};
    tg1.run_and_wait([&tg1, &tg2, &tasks_executed] {
        // Submit 10000 tasks to tg2; each one increments tasks_executed and then calls tg1.wait().
        run_deep_stealing(tg1, tg2, 10000, tasks_executed);
        while (tasks_executed < 100) {
            // Some stealing is expected to happen.
            utils::yield();
        }
        // While this body is still running, not every submitted task may have started.
        // NOTE(review): presumably this holds because the scheduler bounds how deeply the
        // tg1.wait() calls may nest on one thread - confirm against the scheduler implementation.
        CHECK(tasks_executed < 10000);
    });
    // After the body above has completed, all submitted tg2 tasks must eventually run.
    tg2.wait();
    CHECK(tasks_executed == 10000);
}
920 
//! Test for stack overflow avoidance mechanism when the nested wait happens inside a task_arena.
//! \brief \ref error_guessing
TEST_CASE("Test for stack overflow avoidance mechanism within arena") {
    // The scenario needs a second thread, so it is skipped when fewer than two threads are available.
    if (tbb::this_task_arena::max_concurrency() < 2) {
        return;
    }

    // Cap the parallelism at two threads for the rest of the test.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2);
    tbb::task_group tg1;
    tbb::task_group tg2;
    // Number of nested-wait tasks that have started executing so far.
    std::atomic<int> tasks_executed{};

    // Determine nested task execution limit.
    int second_thread_executed{};
    tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] {
        run_deep_stealing(tg1, tg2, 10000, tasks_executed);
        // Sample the counter every 10 ms until at least 100 tasks have started and the
        // counter did not change between two consecutive samples.
        do {
            second_thread_executed = tasks_executed;
            utils::Sleep(10);
        } while (second_thread_executed < 100 || second_thread_executed != tasks_executed);
        CHECK(tasks_executed < 10000);
    });
    // Let the remaining tg2 tasks of the measurement phase finish.
    tg2.wait();
    CHECK(tasks_executed == 10000);

    // Second phase: repeat the scenario, but make the last nested wait happen inside arena `a`.
    tasks_executed = 0;
    // NOTE(review): per the task_arena constructor the arguments are presumably
    // (max_concurrency, reserved_for_masters) - confirm.
    tbb::task_arena a(2, 2);
    // second_thread_executed is captured by value: it holds the limit measured above.
    tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] {
        // Submit one task fewer than the measured limit.
        run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed);
        while (tasks_executed < second_thread_executed-1) {
            // Wait until the second thread is near the limit.
            utils::yield();
        }
        // One more task that performs the same "count, then wait on tg1" step, but from inside arena `a`.
        tg2.run([&a, &tg1, &tasks_executed] {
            a.execute([&tg1, &tasks_executed] {
                volatile char consume_stack[1000]{};
                ++tasks_executed;
                tg1.wait();
                utils::suppress_unused_warning(consume_stack);
            });
        });
        while (tasks_executed < second_thread_executed) {
            // Wait until the second thread joins the arena.
            utils::yield();
        }
        // Submit 10000 more nested-wait tasks, this time from inside arena `a`.
        a.execute([&tg1, &tg2, &tasks_executed] {
            run_deep_stealing(tg1, tg2, 10000, tasks_executed);
        });
        // Sample the counter every 10 ms until it stops changing between two samples.
        int currently_executed{};
        do {
            currently_executed = tasks_executed;
            utils::Sleep(10);
        } while (currently_executed != tasks_executed);
        // Not all submitted tasks may have started while this body is still running.
        CHECK(tasks_executed < 10000 + second_thread_executed);
    });
    // Drain the remaining tg2 tasks from inside arena `a`.
    a.execute([&tg2] {
        tg2.wait();
    });
    CHECK(tasks_executed == 10000 + second_thread_executed);
}
981 
//! Test checks that we can submit work to task_group asynchronously with waiting.
//! \brief \ref regression
TEST_CASE("Async task group") {
    int num_threads = tbb::this_task_arena::max_concurrency();
    if (num_threads < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    tbb::task_arena a(2*num_threads, num_threads);
    // Participants: the two enqueued producer tasks plus the num_threads waiter bodies below;
    // each of them calls barrier.wait() exactly once.
    utils::SpinBarrier barrier(num_threads + 2);
    // One task group and one completion flag per producer.
    tbb::task_group tg[2];
    std::atomic<bool> finished[2]{};
    finished[0] = false; finished[1] = false;
    for (int i = 0; i < 2; ++i) {
        // Producer i: after the barrier, submits 10000 empty tasks to tg[i], yielding after
        // each submission, and then raises finished[i].
        a.enqueue([i, &tg, &finished, &barrier] {
            barrier.wait();
            for (int j = 0; j < 10000; ++j) {
                tg[i].run([] {});
                utils::yield();
            }
            finished[i] = true;
        });
    }
    // Waiters: body idx keeps waiting on tg[idx%2] inside arena `a` while the matching
    // producer is still submitting.
    // NOTE(review): NativeParallelFor presumably runs the body on num_threads native threads - confirm.
    utils::NativeParallelFor(num_threads, [&](int idx) {
        barrier.wait();
        a.execute([idx, &tg, &finished] {
            std::size_t counter{};
            while (!finished[idx%2]) {
                tg[idx%2].wait();
                // Yield on every 16th iteration.
                if (counter++ % 16 == 0) utils::yield();
            }
            // Final wait after the producer has raised its flag.
            tg[idx%2].wait();
        });
    });
}
1017 
1018 struct SelfRunner {
1019     tbb::task_group& m_tg;
1020     std::atomic<unsigned>& count;
1021     void operator()() const {
1022         unsigned previous_count = count.fetch_sub(1);
1023         if (previous_count > 1)
1024             m_tg.run( *this );
1025     }
1026 };
1027 
1028 //! Submit work to single task_group instance from inside the work
1029 //! \brief \ref error_guessing
1030 TEST_CASE("Run self using same task_group instance") {
1031     const unsigned num = 10;
1032     std::atomic<unsigned> count{num};
1033     tbb::task_group tg;
1034     SelfRunner uf{tg, count};
1035     tg.run( uf );
1036     tg.wait();
1037     CHECK_MESSAGE(
1038         count == 0,
1039         "Not all tasks were spawned from inside the functor running within task_group."
1040     );
1041 }
1042 
1043 //TODO: move to some common place to share with conformance tests
1044 namespace accept_task_group_context {
1045 
// Checks that a task group built on an explicitly passed, isolated task_group_context completes
// its work even though the outer task_group that launched that work is cancelled.
//   TaskGroup - task group type under test; must be constructible from a task_group_context.
//   cancel    - callable taking the outer tbb::task_group&; triggers the cancellation.
//   wait      - callable taking the outer tbb::task_group&; waits for it and returns its status.
template <typename TaskGroup, typename CancelF, typename WaitF>
void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) {
    std::atomic<bool> outer_cancelled{false};
    // Number of SelfRunner invocations expected in the inner group.
    std::atomic<unsigned> count{13};

    // The inner group runs in its own context created with the `isolated` kind.
    tbb::task_group_context inner_ctx(tbb::task_group_context::isolated);
    TaskGroup inner_tg(inner_ctx);

    tbb::task_group outer_tg;
    auto outer_tg_task = [&] {
        // The inner task does not start its SelfRunner chain before the outer
        // cancellation step has finished.
        inner_tg.run([&] {
            utils::SpinWaitUntilEq(outer_cancelled, true);
            inner_tg.run( SelfRunner{inner_tg, count} );
        });

        // NOTE(review): on_completion presumably runs its callback whether or not `cancel`
        // throws, so the inner task is always released - confirm in utils::try_call.
        utils::try_call([&] {
            std::forward<CancelF>(cancel)(outer_tg);
        }).on_completion([&] {
            outer_cancelled = true;
        });
    };

    auto check = [&] {
        // The outer group must report cancellation...
        tbb::task_group_status outer_status = tbb::task_group_status::not_complete;
        outer_status = std::forward<WaitF>(wait)(outer_tg);
        CHECK_MESSAGE(
            outer_status == tbb::task_group_status::canceled,
            "Outer task group should have been cancelled."
        );

        // ...while the inner group must still run to completion.
        tbb::task_group_status inner_status = inner_tg.wait();
        CHECK_MESSAGE(
            inner_status == tbb::task_group_status::complete,
            "Inner task group should have completed despite the cancellation of the outer one."
        );

        CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed.");
    };

    outer_tg.run(outer_tg_task);
    check();
}
1088 
1089 template <typename TaskGroup>
1090 void test() {
1091     run_cancellation_use_case<TaskGroup>(
1092         [](tbb::task_group& outer) { outer.cancel(); },
1093         [](tbb::task_group& outer) { return outer.wait(); }
1094     );
1095 
1096 #if TBB_USE_EXCEPTIONS
1097     run_cancellation_use_case<TaskGroup>(
1098         [](tbb::task_group& /*outer*/) {
1099             volatile bool suppress_unreachable_code_warning = true;
1100             if (suppress_unreachable_code_warning) {
1101                 throw int();
1102             }
1103         },
1104         [](tbb::task_group& outer) {
1105             try {
1106                 outer.wait();
1107                 return tbb::task_group_status::complete;
1108             } catch(const int&) {
1109                 return tbb::task_group_status::canceled;
1110             }
1111         }
1112     );
1113 #endif
1114 }
1115 
1116 } // namespace accept_task_group_context
1117 
//! Respect task_group_context passed from outside
//! \brief \ref interface \ref requirement
TEST_CASE("Respect task_group_context passed from outside") {
    // Only isolated_task_group is exercised here; without TBB_PREVIEW_ISOLATED_TASK_GROUP
    // the body of this test case is empty.
#if TBB_PREVIEW_ISOLATED_TASK_GROUP
    accept_task_group_context::test<tbb::isolated_task_group>();
#endif
}
1125 
1126 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
//! The test for task_handle inside other task waiting with run
//! \brief \ref requirement
TEST_CASE("Task handle for scheduler bypass"){
    tbb::task_group tg;
    // Set by the deferred task once it has executed.
    std::atomic<bool> run {false};

    // The submitted body does not run the deferred task itself: it returns the task_handle
    // created by tg.defer().
    tg.run([&]{
        return tg.defer([&]{
            run = true;
        });
    });

    // The returned handle is expected to have been executed by the time wait() returns.
    tg.wait();
    CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
}
1142 
//! The test for task_handle inside other task waiting with run_and_wait
//! \brief \ref requirement
TEST_CASE("Task handle for scheduler bypass via run_and_wait"){
    tbb::task_group tg;
    // Set by the deferred task once it has executed.
    std::atomic<bool> run {false};

    // The body passed to run_and_wait() returns the task_handle created by tg.defer();
    // that handle is expected to have been executed by the time run_and_wait() returns.
    tg.run_and_wait([&]{
        return tg.defer([&]{
            run = true;
        });
    });

    CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
}
1157 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1158 
1159 #if TBB_USE_EXCEPTIONS
1160 //As these tests are against behavior marked by spec as undefined, they can not be put into conformance tests
1161 
//! The test for error in scheduling empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
        * doctest::should_fail()    //Test needs to be revised as implementation uses assertions instead of exceptions
        * doctest::skip()           //skip the test for now, to not pollute the test log
){
    tbb::task_group tg;

    // Expects a std::runtime_error with this exact message when a default-constructed handle is run.
    CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
1172 
//! The test for error in task_handle being scheduled into task_group different from one it was created from
//! \brief \ref requirement
TEST_CASE("task_handle cannot be scheduled into different task_group"
        * doctest::should_fail()    //Test needs to be revised as implementation uses assertions instead of exceptions
        * doctest::skip()           //skip the test for now, to not pollute the test log
){
    tbb::task_group tg;
    tbb::task_group tg1;

    // The handle is created by tg but submitted to tg1; a std::runtime_error with this exact message is expected.
    CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
}
1184 
//! The test for error in task_handle being scheduled into a different task_group that shares the same task_group_context
//! \brief \ref requirement
TEST_CASE("task_handle cannot be scheduled into other task_group of the same context"
        * doctest::should_fail()    //Implementation is not there yet, as it is not clear that this is the expected behavior
        * doctest::skip()           //skip the test for now, to not pollute the test log
)
{
    tbb::task_group_context ctx;

    // Both groups are constructed on the same context.
    tbb::task_group tg(ctx);
    tbb::task_group tg1(ctx);

    // Running a handle in the group that created it must not throw...
    CHECK_NOTHROW(tg.run(tg.defer([]{})));
    // ...while running it in the sibling group is expected to throw std::runtime_error.
    CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
}
1200 
1201 #endif // TBB_USE_EXCEPTIONS
1202 
1203