xref: /oneTBB/test/tbb/test_task_group.cpp (revision 3e7d8e7f)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "oneapi/tbb/detail/_config.h"
20 #include "tbb/global_control.h"
21 
22 #include "tbb/task_group.h"
23 
24 #include "common/concurrency_tracker.h"
25 
26 #include <atomic>
27 #include <stdexcept>
28 
29 //! \file test_task_group.cpp
30 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification
31 
// Concurrency level the currently running test is configured for.
unsigned g_MaxConcurrency = 4u;
// Counter type shared by all the tests in this file.
using atomic_t = std::atomic<std::uintptr_t>;
// Inclusive range of thread counts iterated over by the test cases.
unsigned MinThread = 1u;
unsigned MaxThread = 4u;
36 
37 //------------------------------------------------------------------------
38 // Tests for the thread safety of the task_group manipulations
39 //------------------------------------------------------------------------
40 
41 #include "common/spin_barrier.h"
42 
// Bit flags selecting how the shared task group is used by the participating threads.
enum SharingMode {
    // The group is created by one thread and destroyed by another one.
    VagabondGroup = 1 << 0,
    // Every participating thread waits on the group.
    ParallelWait = 1 << 1
};
47 
48 template<typename task_group_type>
49 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife {
50     static const std::uintptr_t c_numTasks0 = 4096,
51                         c_numTasks1 = 1024;
52 
53     const std::uintptr_t m_numThreads;
54     const std::uintptr_t m_sharingMode;
55 
56     task_group_type *m_taskGroup;
57     atomic_t m_tasksSpawned,
58              m_threadsReady;
59     utils::SpinBarrier m_barrier;
60 
61     static atomic_t s_tasksExecuted;
62 
63     struct TaskFunctor {
64         SharedGroupBodyImpl *m_pOwner;
65         void operator () () const {
66             if ( m_pOwner->m_sharingMode & ParallelWait ) {
67                 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads )
68                     utils::yield();
69             }
70             ++s_tasksExecuted;
71         }
72     };
73 
74     TaskFunctor m_taskFunctor;
75 
76     void Spawn ( std::uintptr_t numTasks ) {
77         for ( std::uintptr_t i = 0; i < numTasks; ++i ) {
78             ++m_tasksSpawned;
79             utils::ConcurrencyTracker ct;
80             m_taskGroup->run( m_taskFunctor );
81         }
82         ++m_threadsReady;
83     }
84 
85     void DeleteTaskGroup () {
86         delete m_taskGroup;
87         m_taskGroup = nullptr;
88     }
89 
90     void Wait () {
91         while ( m_threadsReady != m_numThreads )
92             utils::yield();
93         const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1);
94         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" );
95         INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency);
96         if ( m_sharingMode & ParallelWait ) {
97             m_barrier.wait( &utils::ConcurrencyTracker::Reset );
98             {
99                 utils::ConcurrencyTracker ct;
100                 m_taskGroup->wait();
101             }
102             if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) {
103                 const char* msg = "Warning: No parallel waiting detected in TestParallelWait";
104                 WARN( msg );
105             }
106             m_barrier.wait();
107         }
108         else
109             m_taskGroup->wait();
110         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" );
111         CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" );
112     }
113 
114 public:
115     SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
116         : m_numThreads(numThreads)
117         , m_sharingMode(sharingMode)
118         , m_taskGroup(nullptr)
119         , m_barrier(numThreads)
120     {
121         CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" );
122         if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) {
123             CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only");
124         }
125         utils::ConcurrencyTracker::Reset();
126         s_tasksExecuted = 0;
127         m_tasksSpawned = 0;
128         m_threadsReady = 0;
129         m_taskFunctor.m_pOwner = this;
130     }
131 
132     void Run ( std::uintptr_t idx ) {
133         AssertLive();
134         if ( idx == 0 ) {
135             if (m_taskGroup || m_tasksSpawned) {
136                 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse");
137             }
138             m_taskGroup = new task_group_type;
139             Spawn( c_numTasks0 );
140             Wait();
141             if ( m_sharingMode & VagabondGroup )
142                 m_barrier.wait();
143             else
144                 DeleteTaskGroup();
145         }
146         else {
147             while ( m_tasksSpawned == 0 )
148                 utils::yield();
149             CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized");
150             Spawn (c_numTasks1);
151             if ( m_sharingMode & ParallelWait )
152                 Wait();
153             if ( m_sharingMode & VagabondGroup ) {
154                 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" );
155                 m_barrier.wait();
156                 DeleteTaskGroup();
157             }
158         }
159         AssertLive();
160     }
161 };
162 
163 template<typename task_group_type>
164 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted;
165 
166 template<typename task_group_type>
167 class  SharedGroupBody : utils::NoAssign, utils::NoAfterlife {
168     bool m_bOwner;
169     SharedGroupBodyImpl<task_group_type> *m_pImpl;
170 public:
171     SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
172         : utils::NoAssign()
173         , utils::NoAfterlife()
174         , m_bOwner(true)
175         , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) )
176     {}
177     SharedGroupBody ( const SharedGroupBody& src )
178         : utils::NoAssign()
179         , utils::NoAfterlife()
180         , m_bOwner(false)
181         , m_pImpl(src.m_pImpl)
182     {}
183     ~SharedGroupBody () {
184         if ( m_bOwner )
185             delete m_pImpl;
186     }
187     void operator() ( std::uintptr_t idx ) const {
188         // Wrap the functior into additional task group to enforce bounding.
189         task_group_type tg;
190         tg.run_and_wait([&] { m_pImpl->Run(idx); });
191     }
192 };
193 
194 template<typename task_group_type>
195 class RunAndWaitSyncronizationTestBody : utils::NoAssign {
196     utils::SpinBarrier& m_barrier;
197     std::atomic<bool>& m_completed;
198     task_group_type& m_tg;
199 public:
200     RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg)
201         : m_barrier(barrier), m_completed(completed), m_tg(tg) {}
202 
203     void operator()() const {
204         m_barrier.wait();
205         utils::doDummyWork(100000);
206         m_completed = true;
207     }
208 
209     void operator()(int id) const {
210         if (id == 0) {
211             m_tg.run_and_wait(*this);
212         } else {
213             m_barrier.wait();
214             m_tg.wait();
215             CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished");
216         }
217     }
218 };
219 
220 template<typename task_group_type>
221 void TestParallelSpawn () {
222     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) );
223 }
224 
225 template<typename task_group_type>
226 void TestParallelWait () {
227     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) );
228 
229     utils::SpinBarrier barrier(g_MaxConcurrency);
230     std::atomic<bool> completed;
231     completed = false;
232     task_group_type tg;
233     RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg);
234     NativeParallelFor( g_MaxConcurrency, b );
235 }
236 
237 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other)
238 template<typename task_group_type>
239 void TestVagabondGroup () {
240     NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) );
241 }
242 
243 #include "common/memory_usage.h"
244 
245 template<typename task_group_type>
246 void TestThreadSafety() {
247     auto tests = [] {
248         for (int trail = 0; trail < 10; ++trail) {
249             TestParallelSpawn<task_group_type>();
250             TestParallelWait<task_group_type>();
251             TestVagabondGroup<task_group_type>();
252         }
253     };
254 
255     // Test and warm up allocator.
256     tests();
257 
258     // Ensure that cosumption is stabilized.
259     std::size_t initial = utils::GetMemoryUsage();
260     for (;;) {
261         tests();
262         std::size_t current = utils::GetMemoryUsage();
263         if (current <= initial) {
264             return;
265         }
266         initial = current;
267     }
268 }
269 //------------------------------------------------------------------------
270 // Common requisites of the Fibonacci tests
271 //------------------------------------------------------------------------
272 
273 const std::uintptr_t N = 20;
274 const std::uintptr_t F = 6765;
275 
276 atomic_t g_Sum;
277 
// Opens a Fibonacci test: declares the local constant `numRepeats` (four repeats per unit of
// g_MaxConcurrency) and resets the concurrency tracker. The caller supplies the final semicolon.
#define FIB_TEST_PROLOGUE() \
    const unsigned numRepeats = g_MaxConcurrency * 4;    \
    utils::ConcurrencyTracker::Reset()

// Closes a Fibonacci test: checks that the observed peak parallelism did not exceed
// g_MaxConcurrency and that `sum` equals numRepeats * F. Relies on `numRepeats` declared by
// FIB_TEST_PROLOGUE. The argument is parenthesized so that any expression can be passed; the
// trailing semicolon is kept for compatibility with the existing call sites.
#define FIB_TEST_EPILOGUE(sum) \
    CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \
    CHECK( (sum) == numRepeats * F );
285 
286 
287 // Fibonacci tasks specified as functors
288 template<class task_group_type>
289 class FibTaskBase : utils::NoAssign, utils::NoAfterlife {
290 protected:
291     std::uintptr_t* m_pRes;
292     mutable std::uintptr_t m_Num;
293     virtual void impl() const = 0;
294 public:
295     FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {}
296     void operator()() const {
297         utils::ConcurrencyTracker ct;
298         AssertLive();
299         if( m_Num < 2 ) {
300             *m_pRes = m_Num;
301         } else {
302             impl();
303         }
304     }
305     virtual ~FibTaskBase() {}
306 };
307 
308 template<class task_group_type>
309 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
310 public:
311     FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
312     virtual void impl() const override {
313         std::uintptr_t x = ~0u;
314         task_group_type tg;
315         tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) );
316         this->m_Num -= 2; tg.run_and_wait( *this );
317         *(this->m_pRes) += x;
318     }
319 };
320 
321 template<class task_group_type>
322 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
323 public:
324     FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
325     virtual void impl() const override {
326         std::uintptr_t x = ~0u,
327                y = ~0u;
328         task_group_type tg;
329         tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) );
330         tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) );
331         tg.wait();
332         *(this->m_pRes) = x + y;
333     }
334 };
335 
336 // Helper functions
// Constructs a fib_task writing into a local result, invokes it once and returns the result.
template<class fib_task>
std::uintptr_t RunFibTask(std::uintptr_t n) {
    std::uintptr_t result = ~0u;
    fib_task task(&result, n);
    task();
    return result;
}
343 
344 template<typename fib_task>
345 void RunFibTest() {
346     FIB_TEST_PROLOGUE();
347     std::uintptr_t sum = 0;
348     for( unsigned i = 0; i < numRepeats; ++i )
349         sum += RunFibTask<fib_task>(N);
350     FIB_TEST_EPILOGUE(sum);
351 }
352 
353 template<typename fib_task>
354 void FibFunctionNoArgs() {
355     g_Sum += RunFibTask<fib_task>(N);
356 }
357 
358 template<typename task_group_type>
359 void TestFibWithLambdas() {
360     FIB_TEST_PROLOGUE();
361     atomic_t sum;
362     sum = 0;
363     task_group_type tg;
364     for( unsigned i = 0; i < numRepeats; ++i )
365         tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} );
366     tg.wait();
367     FIB_TEST_EPILOGUE(sum);
368 }
369 
370 template<typename task_group_type>
371 void TestFibWithFunctor() {
372     RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >();
373     RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >();
374 }
375 
376 template<typename task_group_type>
377 void TestFibWithFunctionPtr() {
378     FIB_TEST_PROLOGUE();
379     g_Sum = 0;
380     task_group_type tg;
381     for( unsigned i = 0; i < numRepeats; ++i )
382         tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > );
383     tg.wait();
384     FIB_TEST_EPILOGUE(g_Sum);
385 }
386 
387 template<typename task_group_type>
388 void RunFibonacciTests() {
389     TestFibWithLambdas<task_group_type>();
390     TestFibWithFunctor<task_group_type>();
391     TestFibWithFunctionPtr<task_group_type>();
392 }
393 
//! Exception type thrown by the tasks of the cancellation and exception-handling tests.
/** Keeps the description pointer passed to the constructor without copying the string, so the
    pointed-to text must outlive the exception object. */
class test_exception : public std::exception
{
    const char* m_strDescription;
public:
    test_exception ( const char* descr ) : m_strDescription(descr) {}

    // noexcept replaces the dynamic exception specification throw(), which is deprecated
    // since C++11 and was removed from the language in C++20.
    const char* what() const noexcept override { return m_strDescription; }
};

using TestException = test_exception;
404 
405 #include <string.h>
406 
// Number of tasks launched by every child group, and number of child groups.
#define NUM_CHORES 512
#define NUM_GROUPS 64
// Thresholds used by the exception tests: a quarter of the respective totals.
#define SKIP_CHORES (NUM_CHORES / 4)
#define SKIP_GROUPS (NUM_GROUPS / 4)
// Descriptions carried by the exceptions thrown from the tasks and from the child groups.
#define EXCEPTION_DESCR1 "Test exception 1"
#define EXCEPTION_DESCR2 "Test exception 2"
413 
414 atomic_t g_ExceptionCount;
415 atomic_t g_TaskCount;
416 unsigned g_ExecutedAtCancellation;
417 bool g_Rethrow;
418 bool g_Throw;
419 
420 class ThrowingTask : utils::NoAssign, utils::NoAfterlife {
421     atomic_t &m_TaskCount;
422 public:
423     ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {}
424     void operator() () const {
425         utils::ConcurrencyTracker ct;
426         AssertLive();
427         if ( g_Throw ) {
428             if ( ++m_TaskCount == SKIP_CHORES )
429                 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1));
430             utils::yield();
431         }
432         else {
433             ++g_TaskCount;
434             while( !tbb::is_current_task_group_canceling() )
435                 utils::yield();
436         }
437     }
438 };
439 
440 inline void ResetGlobals ( bool bThrow, bool bRethrow ) {
441     g_Throw = bThrow;
442     g_Rethrow = bRethrow;
443     g_ExceptionCount = 0;
444     g_TaskCount = 0;
445     utils::ConcurrencyTracker::Reset();
446 }
447 
448 template<typename task_group_type>
449 void LaunchChildrenWithFunctor () {
450     atomic_t count;
451     count = 0;
452     task_group_type g;
453     for (unsigned i = 0; i < NUM_CHORES; ++i) {
454         if (i % 2 == 1) {
455             g.run(g.defer(ThrowingTask(count)));
456         } else
457         {
458             g.run(ThrowingTask(count));
459         }
460     }
461 #if TBB_USE_EXCEPTIONS
462     tbb::task_group_status status = tbb::not_complete;
463     bool exceptionCaught = false;
464     try {
465         status = g.wait();
466     } catch ( TestException& e ) {
467         CHECK_MESSAGE( e.what(), "Empty what() string" );
468         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" );
469         exceptionCaught = true;
470         ++g_ExceptionCount;
471     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
472     if (g_Throw && !exceptionCaught && status != tbb::canceled) {
473         CHECK_MESSAGE(false, "No exception in the child task group");
474     }
475     if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) {
476         throw test_exception(EXCEPTION_DESCR2);
477     }
478 #else
479     g.wait();
480 #endif
481 }
482 
483 // Tests for cancellation and exception handling behavior
484 template<typename task_group_type>
485 void TestManualCancellationWithFunctor () {
486     ResetGlobals( false, false );
487     task_group_type tg;
488     for (unsigned i = 0; i < NUM_GROUPS; ++i) {
489         // TBB version does not require taking function address
490         if (i % 2 == 0) {
491             auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>);
492             tg.run(std::move(h));
493         } else
494         {
495             tg.run(&LaunchChildrenWithFunctor<task_group_type>);
496         }
497     }
498     CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" );
499     while ( g_MaxConcurrency > 1 && g_TaskCount == 0 )
500         utils::yield();
501     tg.cancel();
502     g_ExecutedAtCancellation = int(g_TaskCount);
503     tbb::task_group_status status = tg.wait();
504     CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." );
505     CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" );
506     CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" );
507     CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" );
508 }
509 
510 #if TBB_USE_EXCEPTIONS
511 template<typename task_group_type>
512 void TestExceptionHandling1 () {
513     ResetGlobals( true, false );
514     task_group_type tg;
515     for( unsigned i = 0; i < NUM_GROUPS; ++i )
516         // TBB version does not require taking function address
517         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
518     try {
519         tg.wait();
520     } catch ( ... ) {
521         CHECK_MESSAGE( false, "Unexpected exception" );
522     }
523     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" );
524     CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" );
525 }
526 
527 template<typename task_group_type>
528 void TestExceptionHandling2 () {
529     ResetGlobals( true, true );
530     task_group_type tg;
531     bool exceptionCaught = false;
532     for( unsigned i = 0; i < NUM_GROUPS; ++i ) {
533         // TBB version does not require taking function address
534         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
535     }
536     try {
537         tg.wait();
538     } catch ( TestException& e ) {
539         CHECK_MESSAGE( e.what(), "Empty what() string" );
540         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" );
541         exceptionCaught = true;
542     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
543     CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" );
544     CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" );
545     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" );
546     CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" );
547 }
548 
549 template <typename task_group_type>
550 void TestExceptionHandling3() {
551     task_group_type tg;
552     try {
553         tg.run_and_wait([]() {
554             volatile bool suppress_unreachable_code_warning = true;
555             if (suppress_unreachable_code_warning) {
556                 throw 1;
557             }
558         });
559     } catch (int error) {
560         CHECK(error == 1);
561     } catch ( ... ) {
562         CHECK_MESSAGE( false, "Unexpected exception" );
563     }
564 }
565 
566 template<typename task_group_type>
567 class LaunchChildrenDriver {
568 public:
569     void Launch(task_group_type& tg) {
570         ResetGlobals(false, false);
571         for (unsigned i = 0; i < NUM_GROUPS; ++i) {
572             tg.run(LaunchChildrenWithFunctor<task_group_type>);
573         }
574         CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation");
575         while (g_MaxConcurrency > 1 && g_TaskCount == 0)
576             utils::yield();
577     }
578 
579     void Finish() {
580         CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken");
581         CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?");
582         CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation");
583     }
584 }; // LaunchChildrenWithTaskHandleDriver
585 
586 template<typename task_group_type, bool Throw>
587 void TestMissingWait () {
588     bool exception_occurred = false,
589          unexpected_exception = false;
590     LaunchChildrenDriver<task_group_type> driver;
591     try {
592         task_group_type tg;
593         driver.Launch( tg );
594         volatile bool suppress_unreachable_code_warning = Throw;
595         if (suppress_unreachable_code_warning) {
596             throw int(); // Initiate stack unwinding
597         }
598     }
599     catch ( const tbb::missing_wait& e ) {
600         CHECK_MESSAGE( e.what(), "Error message is absent" );
601         exception_occurred = true;
602         unexpected_exception = Throw;
603     }
604     catch ( int ) {
605         exception_occurred = true;
606         unexpected_exception = !Throw;
607     }
608     catch ( ... ) {
609         exception_occurred = unexpected_exception = true;
610     }
611     CHECK( exception_occurred );
612     CHECK( !unexpected_exception );
613     driver.Finish();
614 }
615 #endif
616 
617 template<typename task_group_type>
618 void RunCancellationAndExceptionHandlingTests() {
619     TestManualCancellationWithFunctor<task_group_type>();
620 #if TBB_USE_EXCEPTIONS
621     TestExceptionHandling1<task_group_type>();
622     TestExceptionHandling2<task_group_type>();
623     TestExceptionHandling3<task_group_type>();
624     TestMissingWait<task_group_type, true>();
625     TestMissingWait<task_group_type, false>();
626 #endif
627 }
628 
// Function that intentionally does nothing.
void EmptyFunction () {
}
630 
631 struct TestFunctor {
632     void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
633     void operator()() const { /* library requires this overload only */ }
634 };
635 
636 template<typename task_group_type>
637 void TestConstantFunctorRequirement() {
638     task_group_type g;
639     TestFunctor tf;
640     g.run( tf ); g.wait();
641     g.run_and_wait( tf );
642 }
643 
644 //------------------------------------------------------------------------
645 namespace TestMoveSemanticsNS {
646     struct TestFunctor {
647         void operator()() const {};
648     };
649 
650     struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
651         MoveOnlyFunctor() : utils::MoveOnly() {};
652         MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
653     };
654 
655     struct MovePreferableFunctor : utils::Movable, TestFunctor {
656         MovePreferableFunctor() : utils::Movable() {};
657         MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {};
658         MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
659     };
660 
661     struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
662         NoMoveNoCopyFunctor() : utils::NoCopy() {};
663         // mv ctor is not allowed as cp ctor from parent utils::NoCopy
664     private:
665         NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
666     };
667 
668      template<typename task_group_type>
669     void TestBareFunctors() {
670         task_group_type tg;
671         MovePreferableFunctor mpf;
672         // run_and_wait() doesn't have any copies or moves of arguments inside the impl
673         tg.run_and_wait( NoMoveNoCopyFunctor() );
674 
675         tg.run( MoveOnlyFunctor() );
676         tg.wait();
677 
678         tg.run( mpf );
679         tg.wait();
680         CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval");
681         mpf.Reset();
682 
683         tg.run( std::move(mpf) );
684         tg.wait();
685         CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval");
686         mpf.Reset();
687     }
688 }
689 
690 template<typename task_group_type>
691 void TestMoveSemantics() {
692     TestMoveSemanticsNS::TestBareFunctors<task_group_type>();
693 }
694 //------------------------------------------------------------------------
695 
696 // TODO: TBB_REVAMP_TODO - enable when ETS is available
697 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP
698 namespace TestIsolationNS {
699     class DummyFunctor {
700     public:
701         DummyFunctor() {}
702         void operator()() const {
703             for ( volatile int j = 0; j < 10; ++j ) {}
704         }
705     };
706 
707     template<typename task_group_type>
708     class ParForBody {
709         task_group_type& m_tg;
710         std::atomic<bool>& m_preserved;
711         tbb::enumerable_thread_specific<int>& m_ets;
712     public:
713         ParForBody(
714             task_group_type& tg,
715             std::atomic<bool>& preserved,
716             tbb::enumerable_thread_specific<int>& ets
717         ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {}
718 
719         void operator()(int) const {
720             if (++m_ets.local() > 1) m_preserved = false;
721 
722             for (int i = 0; i < 1000; ++i)
723                 m_tg.run(DummyFunctor());
724             m_tg.wait();
725             m_tg.run_and_wait(DummyFunctor());
726 
727             --m_ets.local();
728         }
729     };
730 
731     template<typename task_group_type>
732     void CheckIsolation(bool isolation_is_expected) {
733         task_group_type tg;
734         std::atomic<bool> isolation_is_preserved;
735         isolation_is_preserved = true;
736         tbb::enumerable_thread_specific<int> ets(0);
737 
738         tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets));
739 
740         ASSERT(
741             isolation_is_expected == isolation_is_preserved,
742             "Actual and expected isolation-related behaviours are different"
743         );
744     }
745 
746     // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place
747     void TestIsolation() {
748         CheckIsolation<tbb::task_group>(false);
749         CheckIsolation<tbb::isolated_task_group>(true);
750     }
751 }
752 #endif
753 
754 #if __TBB_USE_ADDRESS_SANITIZER
755 //! Test for thread safety for the task_group
756 //! \brief \ref error_guessing \ref resource_usage
757 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
758 #else
759 //! Test for thread safety for the task_group
760 //! \brief \ref error_guessing \ref resource_usage
761 TEST_CASE("Thread safety test for the task group") {
762     if (tbb::this_task_arena::max_concurrency() < 2) {
763         // The test requires more than one thread to check thread safety
764         return;
765     }
766     for (unsigned p=MinThread; p <= MaxThread; ++p) {
767         if (p < 2) {
768             continue;
769         }
770         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
771         g_MaxConcurrency = p;
772         TestThreadSafety<tbb::task_group>();
773     }
774 }
775 #endif
776 
777 //! Fibonacci test for task group
778 //! \brief \ref interface \ref requirement
779 TEST_CASE("Fibonacci test for the task group") {
780     for (unsigned p=MinThread; p <= MaxThread; ++p) {
781         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
782         g_MaxConcurrency = p;
783         RunFibonacciTests<tbb::task_group>();
784     }
785 }
786 
787 //! Cancellation and exception test for the task group
788 //! \brief \ref interface \ref requirement
789 TEST_CASE("Cancellation and exception test for the task group") {
790     for (unsigned p = MinThread; p <= MaxThread; ++p) {
791         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
792         tbb::task_arena a(p);
793         g_MaxConcurrency = p;
794         a.execute([] {
795             RunCancellationAndExceptionHandlingTests<tbb::task_group>();
796         });
797     }
798 }
799 
800 //! Constant functor test for the task group
801 //! \brief \ref interface \ref negative
802 TEST_CASE("Constant functor test for the task group") {
803     for (unsigned p=MinThread; p <= MaxThread; ++p) {
804         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
805         g_MaxConcurrency = p;
806         TestConstantFunctorRequirement<tbb::task_group>();
807     }
808 }
809 
810 //! Move semantics test for the task group
811 //! \brief \ref interface \ref requirement
812 TEST_CASE("Move semantics test for the task group") {
813     for (unsigned p=MinThread; p <= MaxThread; ++p) {
814         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
815         g_MaxConcurrency = p;
816         TestMoveSemantics<tbb::task_group>();
817     }
818 }
819 
820 #if TBB_PREVIEW_ISOLATED_TASK_GROUP
821 
822 #if __TBB_USE_ADDRESS_SANITIZER
823 //! Test for thread safety for the isolated_task_group
824 //! \brief \ref error_guessing
825 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
826 #else
827 //! Test for thread safety for the isolated_task_group
828 //! \brief \ref error_guessing
829 TEST_CASE("Thread safety test for the isolated task group") {
830     if (tbb::this_task_arena::max_concurrency() < 2) {
831         // The test requires more than one thread to check thread safety
832         return;
833     }
834     for (unsigned p=MinThread; p <= MaxThread; ++p) {
835         if (p < 2) {
836             continue;
837         }
838         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
839         g_MaxConcurrency = p;
840         TestThreadSafety<tbb::isolated_task_group>();
841     }
842 }
843 #endif
844 
//! Fibonacci test for the isolated task group
846 //! \brief \ref interface \ref requirement
847 TEST_CASE("Fibonacci test for the isolated task group") {
848     for (unsigned p=MinThread; p <= MaxThread; ++p) {
849         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
850         g_MaxConcurrency = p;
851         RunFibonacciTests<tbb::isolated_task_group>();
852     }
853 }
854 
855 //! Cancellation and exception test for the isolated task group
856 //! \brief \ref interface \ref requirement
857 TEST_CASE("Cancellation and exception test for the isolated task group") {
858     for (unsigned p=MinThread; p <= MaxThread; ++p) {
859         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
860         g_MaxConcurrency = p;
861         RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>();
862     }
863 }
864 
865 //! Constant functor test for the isolated task group.
866 //! \brief \ref interface \ref negative
867 TEST_CASE("Constant functor test for the isolated task group") {
868     for (unsigned p=MinThread; p <= MaxThread; ++p) {
869         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
870         g_MaxConcurrency = p;
871         TestConstantFunctorRequirement<tbb::isolated_task_group>();
872     }
873 }
874 
875 //! Move semantics test for the isolated task group.
876 //! \brief \ref interface \ref requirement
877 TEST_CASE("Move semantics test for the isolated task group") {
878     for (unsigned p=MinThread; p <= MaxThread; ++p) {
879         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
880         g_MaxConcurrency = p;
881         TestMoveSemantics<tbb::isolated_task_group>();
882     }
883 }
884 
//TODO: add tests for isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::run_and_wait(d2::task_handle&& h)
886 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */
887 
888 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) {
889     for (int i = 0; i < num_tasks; ++i) {
890         tg2.run([&tg1, &tasks_executed] {
891             volatile char consume_stack[1000]{};
892             ++tasks_executed;
893             tg1.wait();
894             utils::suppress_unused_warning(consume_stack);
895         });
896     }
897 }
898 
899 // TODO: move to the conformance test
900 //! Test for stack overflow avoidance mechanism.
901 //! \brief \ref requirement
902 TEST_CASE("Test for stack overflow avoidance mechanism") {
903     if (tbb::this_task_arena::max_concurrency() < 2) {
904         return;
905     }
906 
907     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2);
908     tbb::task_group tg1;
909     tbb::task_group tg2;
910     std::atomic<int> tasks_executed{};
911     tg1.run_and_wait([&tg1, &tg2, &tasks_executed] {
912         run_deep_stealing(tg1, tg2, 10000, tasks_executed);
913         while (tasks_executed < 100) {
914             // Some stealing is expected to happen.
915             utils::yield();
916         }
917         CHECK(tasks_executed < 10000);
918     });
919     tg2.wait();
920     CHECK(tasks_executed == 10000);
921 }
922 
923 //! Test for stack overflow avoidance mechanism.
924 //! \brief \ref error_guessing
925 TEST_CASE("Test for stack overflow avoidance mechanism within arena") {
926     if (tbb::this_task_arena::max_concurrency() < 2) {
927         return;
928     }
929 
930     tbb::task_arena a1(2, 1);
931     a1.execute([] {
932         tbb::task_group tg1;
933         tbb::task_group tg2;
934         std::atomic<int> tasks_executed{};
935 
936         // Determine nested task execution limit.
937         int second_thread_executed{};
938         tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] {
939             run_deep_stealing(tg1, tg2, 10000, tasks_executed);
940             do {
941                 second_thread_executed = tasks_executed;
942                 utils::Sleep(10);
943             } while (second_thread_executed < 100 || second_thread_executed != tasks_executed);
944             CHECK(tasks_executed < 10000);
945         });
946         tg2.wait();
947         CHECK(tasks_executed == 10000);
948 
949         tasks_executed = 0;
950         tbb::task_arena a2(2, 2);
951         tg1.run_and_wait([&a2, &tg1, &tg2, &tasks_executed, second_thread_executed] {
952             run_deep_stealing(tg1, tg2, second_thread_executed - 1, tasks_executed);
953             while (tasks_executed < second_thread_executed - 1) {
954                 // Wait until the second thread near the limit.
955                 utils::yield();
956             }
957             tg2.run([&a2, &tg1, &tasks_executed] {
958                 a2.execute([&tg1, &tasks_executed] {
959                     volatile char consume_stack[1000]{};
960                     ++tasks_executed;
961                     tg1.wait();
962                     utils::suppress_unused_warning(consume_stack);
963                 });
964             });
965             while (tasks_executed < second_thread_executed) {
966                 // Wait until the second joins the arena.
967                 utils::yield();
968             }
969             a2.execute([&tg1, &tg2, &tasks_executed] {
970                 run_deep_stealing(tg1, tg2, 10000, tasks_executed);
971             });
972             int currently_executed{};
973             do {
974                 currently_executed = tasks_executed;
975                 utils::Sleep(10);
976             } while (currently_executed != tasks_executed);
977             CHECK(tasks_executed < 10000 + second_thread_executed);
978         });
979         a2.execute([&tg2] {
980             tg2.wait();
981         });
982         CHECK(tasks_executed == 10000 + second_thread_executed);
983     });
984 }
985 
986 //! Test checks that we can submit work to task_group asynchronously with waiting.
987 //! \brief \ref regression
988 TEST_CASE("Async task group") {
989     int num_threads = tbb::this_task_arena::max_concurrency();
990     if (num_threads < 3) {
991         // The test requires at least 2 worker threads
992         return;
993     }
994     tbb::task_arena a(2*num_threads, num_threads);
995     utils::SpinBarrier barrier(num_threads + 2);
996     tbb::task_group tg[2];
997     std::atomic<bool> finished[2]{};
998     finished[0] = false; finished[1] = false;
999     for (int i = 0; i < 2; ++i) {
1000         a.enqueue([i, &tg, &finished, &barrier] {
1001             barrier.wait();
1002             for (int j = 0; j < 10000; ++j) {
1003                 tg[i].run([] {});
1004                 utils::yield();
1005             }
1006             finished[i] = true;
1007         });
1008     }
1009     utils::NativeParallelFor(num_threads, [&](int idx) {
1010         barrier.wait();
1011         a.execute([idx, &tg, &finished] {
1012             std::size_t counter{};
1013             while (!finished[idx%2]) {
1014                 tg[idx%2].wait();
1015                 if (counter++ % 16 == 0) utils::yield();
1016             }
1017             tg[idx%2].wait();
1018         });
1019     });
1020 }
1021 
1022 struct SelfRunner {
1023     tbb::task_group& m_tg;
1024     std::atomic<unsigned>& count;
1025     void operator()() const {
1026         unsigned previous_count = count.fetch_sub(1);
1027         if (previous_count > 1)
1028             m_tg.run( *this );
1029     }
1030 };
1031 
1032 //! Submit work to single task_group instance from inside the work
1033 //! \brief \ref error_guessing
1034 TEST_CASE("Run self using same task_group instance") {
1035     const unsigned num = 10;
1036     std::atomic<unsigned> count{num};
1037     tbb::task_group tg;
1038     SelfRunner uf{tg, count};
1039     tg.run( uf );
1040     tg.wait();
1041     CHECK_MESSAGE(
1042         count == 0,
1043         "Not all tasks were spawned from inside the functor running within task_group."
1044     );
1045 }
1046 
1047 //TODO: move to some common place to share with conformance tests
1048 namespace accept_task_group_context {
1049 
1050 template <typename TaskGroup, typename CancelF, typename WaitF>
1051 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) {
1052     std::atomic<bool> outer_cancelled{false};
1053     std::atomic<unsigned> count{13};
1054 
1055     tbb::task_group_context inner_ctx(tbb::task_group_context::isolated);
1056     TaskGroup inner_tg(inner_ctx);
1057 
1058     tbb::task_group outer_tg;
1059     auto outer_tg_task = [&] {
1060         inner_tg.run([&] {
1061             utils::SpinWaitUntilEq(outer_cancelled, true);
1062             inner_tg.run( SelfRunner{inner_tg, count} );
1063         });
1064 
1065         utils::try_call([&] {
1066             std::forward<CancelF>(cancel)(outer_tg);
1067         }).on_completion([&] {
1068             outer_cancelled = true;
1069         });
1070     };
1071 
1072     auto check = [&] {
1073         tbb::task_group_status outer_status = tbb::task_group_status::not_complete;
1074         outer_status = std::forward<WaitF>(wait)(outer_tg);
1075         CHECK_MESSAGE(
1076             outer_status == tbb::task_group_status::canceled,
1077             "Outer task group should have been cancelled."
1078         );
1079 
1080         tbb::task_group_status inner_status = inner_tg.wait();
1081         CHECK_MESSAGE(
1082             inner_status == tbb::task_group_status::complete,
1083             "Inner task group should have completed despite the cancellation of the outer one."
1084         );
1085 
1086         CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed.");
1087     };
1088 
1089     outer_tg.run(outer_tg_task);
1090     check();
1091 }
1092 
1093 template <typename TaskGroup>
1094 void test() {
1095     run_cancellation_use_case<TaskGroup>(
1096         [](tbb::task_group& outer) { outer.cancel(); },
1097         [](tbb::task_group& outer) { return outer.wait(); }
1098     );
1099 
1100 #if TBB_USE_EXCEPTIONS
1101     run_cancellation_use_case<TaskGroup>(
1102         [](tbb::task_group& /*outer*/) {
1103             volatile bool suppress_unreachable_code_warning = true;
1104             if (suppress_unreachable_code_warning) {
1105                 throw int();
1106             }
1107         },
1108         [](tbb::task_group& outer) {
1109             try {
1110                 outer.wait();
1111                 return tbb::task_group_status::complete;
1112             } catch(const int&) {
1113                 return tbb::task_group_status::canceled;
1114             }
1115         }
1116     );
1117 #endif
1118 }
1119 
1120 } // namespace accept_task_group_context
1121 
1122 //! Respect task_group_context passed from outside
1123 //! \brief \ref interface \ref requirement
1124 TEST_CASE("Respect task_group_context passed from outside") {
1125 #if TBB_PREVIEW_ISOLATED_TASK_GROUP
1126     accept_task_group_context::test<tbb::isolated_task_group>();
1127 #endif
1128 }
1129 
1130 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1131 //! The test for task_handle inside other task waiting with run
1132 //! \brief \ref requirement
1133 TEST_CASE("Task handle for scheduler bypass"){
1134     tbb::task_group tg;
1135     std::atomic<bool> run {false};
1136 
1137     tg.run([&]{
1138         return tg.defer([&]{
1139             run = true;
1140         });
1141     });
1142 
1143     tg.wait();
1144     CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
1145 }
1146 
1147 //! The test for task_handle inside other task waiting with run_and_wait
1148 //! \brief \ref requirement
1149 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){
1150     tbb::task_group tg;
1151     std::atomic<bool> run {false};
1152 
1153     tg.run_and_wait([&]{
1154         return tg.defer([&]{
1155             run = true;
1156         });
1157     });
1158 
1159     CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
1160 }
1161 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1162 
1163 #if TBB_USE_EXCEPTIONS
// These tests exercise behavior that the specification marks as undefined, so they cannot be placed into the conformance tests
1165 
1166 //! The test for error in scheduling empty task_handle
1167 //! \brief \ref requirement
1168 TEST_CASE("Empty task_handle cannot be scheduled"
1169         * doctest::should_fail()    //Test needs to revised as implementation uses assertions instead of exceptions
1170         * doctest::skip()           //skip the test for now, to not pollute the test log
1171 ){
1172     tbb::task_group tg;
1173 
1174     CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
1175 }
1176 
1177 //! The test for error in task_handle being scheduled into task_group different from one it was created from
1178 //! \brief \ref requirement
1179 TEST_CASE("task_handle cannot be scheduled into different task_group"
1180         * doctest::should_fail()    //Test needs to revised as implementation uses assertions instead of exceptions
1181         * doctest::skip()           //skip the test for now, to not pollute the test log
1182 ){
1183     tbb::task_group tg;
1184     tbb::task_group tg1;
1185 
1186     CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
1187 }
1188 
1189 //! The test for error in task_handle being scheduled into task_group different from one it was created from
1190 //! \brief \ref requirement
1191 TEST_CASE("task_handle cannot be scheduled into other task_group of the same context"
1192         * doctest::should_fail()    //Implementation is no there yet, as it is not clear that is the expected behavior
1193         * doctest::skip()           //skip the test for now, to not pollute the test log
1194 )
1195 {
1196     tbb::task_group_context ctx;
1197 
1198     tbb::task_group tg(ctx);
1199     tbb::task_group tg1(ctx);
1200 
1201     CHECK_NOTHROW(tg.run(tg.defer([]{})));
1202     CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
1203 }
1204 
1205 #endif // TBB_USE_EXCEPTIONS
1206 
1207