xref: /oneTBB/test/tbb/test_task_group.cpp (revision 74b7fc74)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __TBB_CPF_BUILD
18 #define TBB_PREVIEW_ISOLATED_TASK_GROUP 1
19 #define TBB_PREVIEW_TASK_GROUP_EXTENSIONS 1
20 #endif
21 
22 #include "common/test.h"
23 #include "common/utils.h"
24 #include "oneapi/tbb/detail/_config.h"
25 #include "tbb/global_control.h"
26 
27 #include "tbb/task_group.h"
28 
29 #include "common/concurrency_tracker.h"
30 
31 #include <atomic>
32 #include <stdexcept>
33 
34 //! \file test_task_group.cpp
35 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification
36 
// Concurrency level of the test that is currently running; every TEST_CASE below
// assigns it before invoking the test body.
unsigned g_MaxConcurrency = 4;
// Atomic counter type shared by the tests in this file.
using atomic_t = std::atomic<std::uintptr_t>;
// Inclusive range of thread counts iterated over by the TEST_CASEs.
unsigned MinThread = 1;
unsigned MaxThread = 4;
41 
42 //------------------------------------------------------------------------
43 // Tests for the thread safety of the task_group manipulations
44 //------------------------------------------------------------------------
45 
46 #include "common/spin_barrier.h"
47 
// Bit flags selecting how SharedGroupBodyImpl shares its task group between threads.
enum SharingMode {
    // The group is created by one thread and destroyed by another one.
    VagabondGroup = 1 << 0,
    // All participating threads wait on the group simultaneously.
    ParallelWait  = 1 << 1
};
52 
53 template<typename task_group_type>
54 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife {
55     static const std::uintptr_t c_numTasks0 = 4096,
56                         c_numTasks1 = 1024;
57 
58     const std::uintptr_t m_numThreads;
59     const std::uintptr_t m_sharingMode;
60 
61     task_group_type *m_taskGroup;
62     atomic_t m_tasksSpawned,
63              m_threadsReady;
64     utils::SpinBarrier m_barrier;
65 
66     static atomic_t s_tasksExecuted;
67 
68     struct TaskFunctor {
69         SharedGroupBodyImpl *m_pOwner;
70         void operator () () const {
71             if ( m_pOwner->m_sharingMode & ParallelWait ) {
72                 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads )
73                     utils::yield();
74             }
75             ++s_tasksExecuted;
76         }
77     };
78 
79     TaskFunctor m_taskFunctor;
80 
81     void Spawn ( std::uintptr_t numTasks ) {
82         for ( std::uintptr_t i = 0; i < numTasks; ++i ) {
83             ++m_tasksSpawned;
84             utils::ConcurrencyTracker ct;
85             m_taskGroup->run( m_taskFunctor );
86         }
87         ++m_threadsReady;
88     }
89 
90     void DeleteTaskGroup () {
91         delete m_taskGroup;
92         m_taskGroup = NULL;
93     }
94 
95     void Wait () {
96         while ( m_threadsReady != m_numThreads )
97             utils::yield();
98         const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1);
99         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" );
100         INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency);
101         if ( m_sharingMode & ParallelWait ) {
102             m_barrier.wait( &utils::ConcurrencyTracker::Reset );
103             {
104                 utils::ConcurrencyTracker ct;
105                 m_taskGroup->wait();
106             }
107             if ( utils::ConcurrencyTracker::PeakParallelism() == 1 )
108                 WARN( "Warning: No parallel waiting detected in TestParallelWait" );
109             m_barrier.wait();
110         }
111         else
112             m_taskGroup->wait();
113         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" );
114         CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" );
115     }
116 
117 public:
118     SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
119         : m_numThreads(numThreads)
120         , m_sharingMode(sharingMode)
121         , m_taskGroup(NULL)
122         , m_barrier(numThreads)
123     {
124         CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" );
125         if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) {
126             CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only");
127         }
128         utils::ConcurrencyTracker::Reset();
129         s_tasksExecuted = 0;
130         m_tasksSpawned = 0;
131         m_threadsReady = 0;
132         m_taskFunctor.m_pOwner = this;
133     }
134 
135     void Run ( std::uintptr_t idx ) {
136         AssertLive();
137         if ( idx == 0 ) {
138             if (m_taskGroup || m_tasksSpawned) {
139                 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse");
140             }
141             m_taskGroup = new task_group_type;
142             Spawn( c_numTasks0 );
143             Wait();
144             if ( m_sharingMode & VagabondGroup )
145                 m_barrier.wait();
146             else
147                 DeleteTaskGroup();
148         }
149         else {
150             while ( m_tasksSpawned == 0 )
151                 utils::yield();
152             CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized");
153             Spawn (c_numTasks1);
154             if ( m_sharingMode & ParallelWait )
155                 Wait();
156             if ( m_sharingMode & VagabondGroup ) {
157                 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" );
158                 m_barrier.wait();
159                 DeleteTaskGroup();
160             }
161         }
162         AssertLive();
163     }
164 };
165 
166 template<typename task_group_type>
167 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted;
168 
169 template<typename task_group_type>
170 class  SharedGroupBody : utils::NoAssign, utils::NoAfterlife {
171     bool m_bOwner;
172     SharedGroupBodyImpl<task_group_type> *m_pImpl;
173 public:
174     SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
175         : utils::NoAssign()
176         , utils::NoAfterlife()
177         , m_bOwner(true)
178         , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) )
179     {}
180     SharedGroupBody ( const SharedGroupBody& src )
181         : utils::NoAssign()
182         , utils::NoAfterlife()
183         , m_bOwner(false)
184         , m_pImpl(src.m_pImpl)
185     {}
186     ~SharedGroupBody () {
187         if ( m_bOwner )
188             delete m_pImpl;
189     }
190     void operator() ( std::uintptr_t idx ) const {
191         // Wrap the functior into additional task group to enforce bounding.
192         task_group_type tg;
193         tg.run_and_wait([&] { m_pImpl->Run(idx); });
194     }
195 };
196 
197 template<typename task_group_type>
198 class RunAndWaitSyncronizationTestBody : utils::NoAssign {
199     utils::SpinBarrier& m_barrier;
200     std::atomic<bool>& m_completed;
201     task_group_type& m_tg;
202 public:
203     RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg)
204         : m_barrier(barrier), m_completed(completed), m_tg(tg) {}
205 
206     void operator()() const {
207         m_barrier.wait();
208         utils::doDummyWork(100000);
209         m_completed = true;
210     }
211 
212     void operator()(int id) const {
213         if (id == 0) {
214             m_tg.run_and_wait(*this);
215         } else {
216             m_barrier.wait();
217             m_tg.wait();
218             CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished");
219         }
220     }
221 };
222 
223 template<typename task_group_type>
224 void TestParallelSpawn () {
225     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) );
226 }
227 
228 template<typename task_group_type>
229 void TestParallelWait () {
230     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) );
231 
232     utils::SpinBarrier barrier(g_MaxConcurrency);
233     std::atomic<bool> completed;
234     completed = false;
235     task_group_type tg;
236     RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg);
237     NativeParallelFor( g_MaxConcurrency, b );
238 }
239 
240 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other)
241 template<typename task_group_type>
242 void TestVagabondGroup () {
243     NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) );
244 }
245 
246 #include "common/memory_usage.h"
247 
248 template<typename task_group_type>
249 void TestThreadSafety() {
250     auto tests = [] {
251         for (int trail = 0; trail < 10; ++trail) {
252             TestParallelSpawn<task_group_type>();
253             TestParallelWait<task_group_type>();
254             TestVagabondGroup<task_group_type>();
255         }
256     };
257 
258     // Test and warm up allocator.
259     tests();
260 
261     // Ensure that cosumption is stabilized.
262     std::size_t initial = utils::GetMemoryUsage();
263     for (;;) {
264         tests();
265         std::size_t current = utils::GetMemoryUsage();
266         if (current <= initial) {
267             return;
268         }
269         initial = current;
270     }
271 }
272 //------------------------------------------------------------------------
273 // Common requisites of the Fibonacci tests
274 //------------------------------------------------------------------------
275 
// Index of the Fibonacci number computed by every task tree.
const std::uintptr_t N = 20;
// Expected result for N: the 20th Fibonacci number.
const std::uintptr_t F = 6765;

// Accumulator used by FibFunctionNoArgs, which cannot capture a local sum.
atomic_t g_Sum;
280 
// Declares numRepeats (four repetitions per allowed thread) in the enclosing scope
// and resets the concurrency tracker. The caller supplies the trailing semicolon.
#define FIB_TEST_PROLOGUE() \
    const unsigned numRepeats = g_MaxConcurrency * 4;    \
    utils::ConcurrencyTracker::Reset()

// Checks that the tracked peak parallelism did not exceed g_MaxConcurrency and that
// `sum` equals numRepeats * F. Relies on numRepeats declared by FIB_TEST_PROLOGUE.
// Note: the expansion already ends with a semicolon.
#define FIB_TEST_EPILOGUE(sum) \
    CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \
    CHECK( sum == numRepeats * F );
288 
289 
290 // Fibonacci tasks specified as functors
291 template<class task_group_type>
292 class FibTaskBase : utils::NoAssign, utils::NoAfterlife {
293 protected:
294     std::uintptr_t* m_pRes;
295     mutable std::uintptr_t m_Num;
296     virtual void impl() const = 0;
297 public:
298     FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {}
299     void operator()() const {
300         utils::ConcurrencyTracker ct;
301         AssertLive();
302         if( m_Num < 2 ) {
303             *m_pRes = m_Num;
304         } else {
305             impl();
306         }
307     }
308     virtual ~FibTaskBase() {}
309 };
310 
311 template<class task_group_type>
312 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
313 public:
314     FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
315     virtual void impl() const override {
316         std::uintptr_t x = ~0u;
317         task_group_type tg;
318         tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) );
319         this->m_Num -= 2; tg.run_and_wait( *this );
320         *(this->m_pRes) += x;
321     }
322 };
323 
324 template<class task_group_type>
325 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
326 public:
327     FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
328     virtual void impl() const override {
329         std::uintptr_t x = ~0u,
330                y = ~0u;
331         task_group_type tg;
332         tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) );
333         tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) );
334         tg.wait();
335         *(this->m_pRes) = x + y;
336     }
337 };
338 
// Helper functions

// Constructs a fib_task for index n, invokes it once and returns the value it stored.
template<class fib_task>
std::uintptr_t RunFibTask(std::uintptr_t n) {
    std::uintptr_t result = ~0u;
    fib_task task(&result, n);
    task();
    return result;
}
346 
347 template<typename fib_task>
348 void RunFibTest() {
349     FIB_TEST_PROLOGUE();
350     std::uintptr_t sum = 0;
351     for( unsigned i = 0; i < numRepeats; ++i )
352         sum += RunFibTask<fib_task>(N);
353     FIB_TEST_EPILOGUE(sum);
354 }
355 
356 template<typename fib_task>
357 void FibFunctionNoArgs() {
358     g_Sum += RunFibTask<fib_task>(N);
359 }
360 
361 template<typename task_group_type>
362 void TestFibWithLambdas() {
363     FIB_TEST_PROLOGUE();
364     atomic_t sum;
365     sum = 0;
366     task_group_type tg;
367     for( unsigned i = 0; i < numRepeats; ++i )
368         tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} );
369     tg.wait();
370     FIB_TEST_EPILOGUE(sum);
371 }
372 
373 template<typename task_group_type>
374 void TestFibWithFunctor() {
375     RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >();
376     RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >();
377 }
378 
379 template<typename task_group_type>
380 void TestFibWithFunctionPtr() {
381     FIB_TEST_PROLOGUE();
382     g_Sum = 0;
383     task_group_type tg;
384     for( unsigned i = 0; i < numRepeats; ++i )
385         tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > );
386     tg.wait();
387     FIB_TEST_EPILOGUE(g_Sum);
388 }
389 
390 template<typename task_group_type>
391 void RunFibonacciTests() {
392     TestFibWithLambdas<task_group_type>();
393     TestFibWithFunctor<task_group_type>();
394     TestFibWithFunctionPtr<task_group_type>();
395 }
396 
// Exception type thrown by the tests. Stores only the pointer to the description, so
// the string passed to the constructor must outlive the exception object.
class test_exception : public std::exception
{
    const char* m_strDescription;
public:
    test_exception ( const char* descr ) : m_strDescription(descr) {}

    // Returns the description given at construction.
    // noexcept replaces the dynamic exception specification throw(), which is
    // deprecated in C++17 and removed in C++20.
    const char* what() const noexcept override { return m_strDescription; }
};

using TestException = test_exception;
407 
408 #include <string.h>
409 
// Number of ThrowingTask instances launched by each child group.
#define NUM_CHORES      512
// Number of child groups launched by the root task group of a test.
#define NUM_GROUPS      64
// Value of a child group's task counter at which ThrowingTask throws.
#define SKIP_CHORES     (NUM_CHORES/4)
// Once more than this many exceptions were counted, LaunchChildrenWithFunctor rethrows (if g_Rethrow is set).
#define SKIP_GROUPS     (NUM_GROUPS/4)
// Description of the exception thrown by ThrowingTask.
#define EXCEPTION_DESCR1 "Test exception 1"
// Description of the exception thrown by LaunchChildrenWithFunctor.
#define EXCEPTION_DESCR2 "Test exception 2"

// Number of exceptions caught by the child groups.
atomic_t g_ExceptionCount;
// Number of ThrowingTask bodies started in the non-throwing mode.
atomic_t g_TaskCount;
// Snapshot of g_TaskCount taken right after the root group was cancelled.
unsigned g_ExecutedAtCancellation;
// Whether LaunchChildrenWithFunctor may throw EXCEPTION_DESCR2.
bool g_Rethrow;
// Whether ThrowingTask throws (true) or spins until its group is cancelled (false).
bool g_Throw;
422 
423 class ThrowingTask : utils::NoAssign, utils::NoAfterlife {
424     atomic_t &m_TaskCount;
425 public:
426     ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {}
427     void operator() () const {
428         utils::ConcurrencyTracker ct;
429         AssertLive();
430         if ( g_Throw ) {
431             if ( ++m_TaskCount == SKIP_CHORES )
432                 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1));
433             utils::yield();
434         }
435         else {
436             ++g_TaskCount;
437             while( !tbb::is_current_task_group_canceling() )
438                 utils::yield();
439         }
440     }
441 };
442 
443 inline void ResetGlobals ( bool bThrow, bool bRethrow ) {
444     g_Throw = bThrow;
445     g_Rethrow = bRethrow;
446     g_ExceptionCount = 0;
447     g_TaskCount = 0;
448     utils::ConcurrencyTracker::Reset();
449 }
450 
451 template<typename task_group_type>
452 void LaunchChildrenWithFunctor () {
453     atomic_t count;
454     count = 0;
455     task_group_type g;
456     for (unsigned i = 0; i < NUM_CHORES; ++i) {
457         if (i % 2 == 1) {
458             g.run(g.defer(ThrowingTask(count)));
459         } else
460         {
461             g.run(ThrowingTask(count));
462         }
463     }
464 #if TBB_USE_EXCEPTIONS
465     tbb::task_group_status status = tbb::not_complete;
466     bool exceptionCaught = false;
467     try {
468         status = g.wait();
469     } catch ( TestException& e ) {
470         CHECK_MESSAGE( e.what(), "Empty what() string" );
471         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" );
472         exceptionCaught = true;
473         ++g_ExceptionCount;
474     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
475     if (g_Throw && !exceptionCaught && status != tbb::canceled) {
476         CHECK_MESSAGE(false, "No exception in the child task group");
477     }
478     if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) {
479         throw test_exception(EXCEPTION_DESCR2);
480     }
481 #else
482     g.wait();
483 #endif
484 }
485 
486 // Tests for cancellation and exception handling behavior
487 template<typename task_group_type>
488 void TestManualCancellationWithFunctor () {
489     ResetGlobals( false, false );
490     task_group_type tg;
491     for (unsigned i = 0; i < NUM_GROUPS; ++i) {
492         // TBB version does not require taking function address
493         if (i % 2 == 0) {
494             auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>);
495             tg.run(std::move(h));
496         } else
497         {
498             tg.run(&LaunchChildrenWithFunctor<task_group_type>);
499         }
500     }
501     CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" );
502     while ( g_MaxConcurrency > 1 && g_TaskCount == 0 )
503         utils::yield();
504     tg.cancel();
505     g_ExecutedAtCancellation = int(g_TaskCount);
506     tbb::task_group_status status = tg.wait();
507     CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." );
508     CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" );
509     CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" );
510     CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" );
511 }
512 
513 #if TBB_USE_EXCEPTIONS
514 template<typename task_group_type>
515 void TestExceptionHandling1 () {
516     ResetGlobals( true, false );
517     task_group_type tg;
518     for( unsigned i = 0; i < NUM_GROUPS; ++i )
519         // TBB version does not require taking function address
520         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
521     try {
522         tg.wait();
523     } catch ( ... ) {
524         CHECK_MESSAGE( false, "Unexpected exception" );
525     }
526     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" );
527     CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" );
528 }
529 
530 template<typename task_group_type>
531 void TestExceptionHandling2 () {
532     ResetGlobals( true, true );
533     task_group_type tg;
534     bool exceptionCaught = false;
535     for( unsigned i = 0; i < NUM_GROUPS; ++i ) {
536         // TBB version does not require taking function address
537         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
538     }
539     try {
540         tg.wait();
541     } catch ( TestException& e ) {
542         CHECK_MESSAGE( e.what(), "Empty what() string" );
543         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" );
544         exceptionCaught = true;
545     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
546     CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" );
547     CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" );
548     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" );
549     CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" );
550 }
551 
552 template <typename task_group_type>
553 void TestExceptionHandling3() {
554     task_group_type tg;
555     try {
556         tg.run_and_wait([]() {
557             volatile bool suppress_unreachable_code_warning = true;
558             if (suppress_unreachable_code_warning) {
559                 throw 1;
560             }
561         });
562     } catch (int error) {
563         CHECK(error == 1);
564     } catch ( ... ) {
565         CHECK_MESSAGE( false, "Unexpected exception" );
566     }
567 }
568 
569 template<typename task_group_type>
570 class LaunchChildrenDriver {
571 public:
572     void Launch(task_group_type& tg) {
573         ResetGlobals(false, false);
574         for (unsigned i = 0; i < NUM_GROUPS; ++i) {
575             tg.run(LaunchChildrenWithFunctor<task_group_type>);
576         }
577         CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation");
578         while (g_MaxConcurrency > 1 && g_TaskCount == 0)
579             utils::yield();
580     }
581 
582     void Finish() {
583         CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken");
584         CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?");
585         CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation");
586     }
587 }; // LaunchChildrenWithTaskHandleDriver
588 
589 template<typename task_group_type, bool Throw>
590 void TestMissingWait () {
591     bool exception_occurred = false,
592          unexpected_exception = false;
593     LaunchChildrenDriver<task_group_type> driver;
594     try {
595         task_group_type tg;
596         driver.Launch( tg );
597         volatile bool suppress_unreachable_code_warning = Throw;
598         if (suppress_unreachable_code_warning) {
599             throw int(); // Initiate stack unwinding
600         }
601     }
602     catch ( const tbb::missing_wait& e ) {
603         CHECK_MESSAGE( e.what(), "Error message is absent" );
604         exception_occurred = true;
605         unexpected_exception = Throw;
606     }
607     catch ( int ) {
608         exception_occurred = true;
609         unexpected_exception = !Throw;
610     }
611     catch ( ... ) {
612         exception_occurred = unexpected_exception = true;
613     }
614     CHECK( exception_occurred );
615     CHECK( !unexpected_exception );
616     driver.Finish();
617 }
618 #endif
619 
620 template<typename task_group_type>
621 void RunCancellationAndExceptionHandlingTests() {
622     TestManualCancellationWithFunctor<task_group_type>();
623 #if TBB_USE_EXCEPTIONS
624     TestExceptionHandling1<task_group_type>();
625     TestExceptionHandling2<task_group_type>();
626     TestExceptionHandling3<task_group_type>();
627     TestMissingWait<task_group_type, true>();
628     TestMissingWait<task_group_type, false>();
629 #endif
630 }
631 
// Function with an empty body. NOTE(review): not referenced in the visible part of this file.
void EmptyFunction () {}
633 
634 struct TestFunctor {
635     void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
636     void operator()() const { /* library requires this overload only */ }
637 };
638 
639 template<typename task_group_type>
640 void TestConstantFunctorRequirement() {
641     task_group_type g;
642     TestFunctor tf;
643     g.run( tf ); g.wait();
644     g.run_and_wait( tf );
645 }
646 
647 //------------------------------------------------------------------------
648 namespace TestMoveSemanticsNS {
649     struct TestFunctor {
650         void operator()() const {};
651     };
652 
653     struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
654         MoveOnlyFunctor() : utils::MoveOnly() {};
655         MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
656     };
657 
658     struct MovePreferableFunctor : utils::Movable, TestFunctor {
659         MovePreferableFunctor() : utils::Movable() {};
660         MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {};
661         MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
662     };
663 
664     struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
665         NoMoveNoCopyFunctor() : utils::NoCopy() {};
666         // mv ctor is not allowed as cp ctor from parent utils::NoCopy
667     private:
668         NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
669     };
670 
671      template<typename task_group_type>
672     void TestBareFunctors() {
673         task_group_type tg;
674         MovePreferableFunctor mpf;
675         // run_and_wait() doesn't have any copies or moves of arguments inside the impl
676         tg.run_and_wait( NoMoveNoCopyFunctor() );
677 
678         tg.run( MoveOnlyFunctor() );
679         tg.wait();
680 
681         tg.run( mpf );
682         tg.wait();
683         CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval");
684         mpf.Reset();
685 
686         tg.run( std::move(mpf) );
687         tg.wait();
688         CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval");
689         mpf.Reset();
690     }
691 }
692 
693 template<typename task_group_type>
694 void TestMoveSemantics() {
695     TestMoveSemanticsNS::TestBareFunctors<task_group_type>();
696 }
697 //------------------------------------------------------------------------
698 
699 // TODO: TBB_REVAMP_TODO - enable when ETS is available
700 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP
701 namespace TestIsolationNS {
702     class DummyFunctor {
703     public:
704         DummyFunctor() {}
705         void operator()() const {
706             for ( volatile int j = 0; j < 10; ++j ) {}
707         }
708     };
709 
710     template<typename task_group_type>
711     class ParForBody {
712         task_group_type& m_tg;
713         std::atomic<bool>& m_preserved;
714         tbb::enumerable_thread_specific<int>& m_ets;
715     public:
716         ParForBody(
717             task_group_type& tg,
718             std::atomic<bool>& preserved,
719             tbb::enumerable_thread_specific<int>& ets
720         ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {}
721 
722         void operator()(int) const {
723             if (++m_ets.local() > 1) m_preserved = false;
724 
725             for (int i = 0; i < 1000; ++i)
726                 m_tg.run(DummyFunctor());
727             m_tg.wait();
728             m_tg.run_and_wait(DummyFunctor());
729 
730             --m_ets.local();
731         }
732     };
733 
734     template<typename task_group_type>
735     void CheckIsolation(bool isolation_is_expected) {
736         task_group_type tg;
737         std::atomic<bool> isolation_is_preserved;
738         isolation_is_preserved = true;
739         tbb::enumerable_thread_specific<int> ets(0);
740 
741         tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets));
742 
743         ASSERT(
744             isolation_is_expected == isolation_is_preserved,
745             "Actual and expected isolation-related behaviours are different"
746         );
747     }
748 
749     // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place
750     void TestIsolation() {
751         CheckIsolation<tbb::task_group>(false);
752         CheckIsolation<tbb::isolated_task_group>(true);
753     }
754 }
755 #endif
756 
757 #if __TBB_USE_ADDRESS_SANITIZER
//! Test for thread safety for the task_group
//! (placeholder compiled when __TBB_USE_ADDRESS_SANITIZER is set; it has an empty body and is always skipped)
//! \brief \ref error_guessing \ref resource_usage
TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
761 #else
762 //! Test for thread safety for the task_group
763 //! \brief \ref error_guessing \ref resource_usage
764 TEST_CASE("Thread safety test for the task group") {
765     if (tbb::this_task_arena::max_concurrency() < 2) {
766         // The test requires more than one thread to check thread safety
767         return;
768     }
769     for (unsigned p=MinThread; p <= MaxThread; ++p) {
770         if (p < 2) {
771             continue;
772         }
773         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
774         g_MaxConcurrency = p;
775         TestThreadSafety<tbb::task_group>();
776     }
777 }
778 #endif
779 
780 //! Fibonacci test for task group
781 //! \brief \ref interface \ref requirement
782 TEST_CASE("Fibonacci test for the task group") {
783     for (unsigned p=MinThread; p <= MaxThread; ++p) {
784         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
785         g_MaxConcurrency = p;
786         RunFibonacciTests<tbb::task_group>();
787     }
788 }
789 
790 //! Cancellation and exception test for the task group
791 //! \brief \ref interface \ref requirement
792 TEST_CASE("Cancellation and exception test for the task group") {
793     for (unsigned p = MinThread; p <= MaxThread; ++p) {
794         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
795         tbb::task_arena a(p);
796         g_MaxConcurrency = p;
797         a.execute([] {
798             RunCancellationAndExceptionHandlingTests<tbb::task_group>();
799         });
800     }
801 }
802 
803 //! Constant functor test for the task group
804 //! \brief \ref interface \ref negative
805 TEST_CASE("Constant functor test for the task group") {
806     for (unsigned p=MinThread; p <= MaxThread; ++p) {
807         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
808         g_MaxConcurrency = p;
809         TestConstantFunctorRequirement<tbb::task_group>();
810     }
811 }
812 
813 //! Move semantics test for the task group
814 //! \brief \ref interface \ref requirement
815 TEST_CASE("Move semantics test for the task group") {
816     for (unsigned p=MinThread; p <= MaxThread; ++p) {
817         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
818         g_MaxConcurrency = p;
819         TestMoveSemantics<tbb::task_group>();
820     }
821 }
822 
823 #if TBB_PREVIEW_ISOLATED_TASK_GROUP
824 
825 #if __TBB_USE_ADDRESS_SANITIZER
//! Test for thread safety for the isolated_task_group
//! (placeholder compiled when __TBB_USE_ADDRESS_SANITIZER is set; it has an empty body and is always skipped)
//! \brief \ref error_guessing
TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
829 #else
830 //! Test for thread safety for the isolated_task_group
831 //! \brief \ref error_guessing
832 TEST_CASE("Thread safety test for the isolated task group") {
833     if (tbb::this_task_arena::max_concurrency() < 2) {
834         // The test requires more than one thread to check thread safety
835         return;
836     }
837     for (unsigned p=MinThread; p <= MaxThread; ++p) {
838         if (p < 2) {
839             continue;
840         }
841         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
842         g_MaxConcurrency = p;
843         TestThreadSafety<tbb::isolated_task_group>();
844     }
845 }
846 #endif
847 
848 //! Cancellation and exception test for the isolated task group
849 //! \brief \ref interface \ref requirement
850 TEST_CASE("Fibonacci test for the isolated task group") {
851     for (unsigned p=MinThread; p <= MaxThread; ++p) {
852         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
853         g_MaxConcurrency = p;
854         RunFibonacciTests<tbb::isolated_task_group>();
855     }
856 }
857 
858 //! Cancellation and exception test for the isolated task group
859 //! \brief \ref interface \ref requirement
860 TEST_CASE("Cancellation and exception test for the isolated task group") {
861     for (unsigned p=MinThread; p <= MaxThread; ++p) {
862         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
863         g_MaxConcurrency = p;
864         RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>();
865     }
866 }
867 
868 //! Constant functor test for the isolated task group.
869 //! \brief \ref interface \ref negative
870 TEST_CASE("Constant functor test for the isolated task group") {
871     for (unsigned p=MinThread; p <= MaxThread; ++p) {
872         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
873         g_MaxConcurrency = p;
874         TestConstantFunctorRequirement<tbb::isolated_task_group>();
875     }
876 }
877 
878 //! Move semantics test for the isolated task group.
879 //! \brief \ref interface \ref requirement
880 TEST_CASE("Move semantics test for the isolated task group") {
881     for (unsigned p=MinThread; p <= MaxThread; ++p) {
882         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
883         g_MaxConcurrency = p;
884         TestMoveSemantics<tbb::isolated_task_group>();
885     }
886 }
887 
//TODO: add tests for isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::run_and_wait(d2::task_handle&& h)
889 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */
890 
891 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) {
892     for (int i = 0; i < num_tasks; ++i) {
893         tg2.run([&tg1, &tasks_executed] {
894             volatile char consume_stack[1000]{};
895             ++tasks_executed;
896             tg1.wait();
897             utils::suppress_unused_warning(consume_stack);
898         });
899     }
900 }
901 
902 // TODO: move to the conformance test
903 //! Test for stack overflow avoidance mechanism.
904 //! \brief \ref requirement
905 TEST_CASE("Test for stack overflow avoidance mechanism") {
906     if (tbb::this_task_arena::max_concurrency() < 2) {
907         return;
908     }
909 
910     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2);
911     tbb::task_group tg1;
912     tbb::task_group tg2;
913     std::atomic<int> tasks_executed{};
914     tg1.run_and_wait([&tg1, &tg2, &tasks_executed] {
915         run_deep_stealing(tg1, tg2, 10000, tasks_executed);
916         while (tasks_executed < 100) {
917             // Some stealing is expected to happen.
918             utils::yield();
919         }
920         CHECK(tasks_executed < 10000);
921     });
922     tg2.wait();
923     CHECK(tasks_executed == 10000);
924 }
925 
926 //! Test for stack overflow avoidance mechanism.
927 //! \brief \ref error_guessing
928 TEST_CASE("Test for stack overflow avoidance mechanism within arena") {
929     if (tbb::this_task_arena::max_concurrency() < 2) {
930         return;
931     }
932 
933     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2);
934     tbb::task_group tg1;
935     tbb::task_group tg2;
936     std::atomic<int> tasks_executed{};
937 
938     // Determine nested task execution limit.
939     int second_thread_executed{};
940     tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] {
941         run_deep_stealing(tg1, tg2, 10000, tasks_executed);
942         do {
943             second_thread_executed = tasks_executed;
944             utils::Sleep(10);
945         } while (second_thread_executed < 100 || second_thread_executed != tasks_executed);
946         CHECK(tasks_executed < 10000);
947     });
948     tg2.wait();
949     CHECK(tasks_executed == 10000);
950 
951     tasks_executed = 0;
952     tbb::task_arena a(2, 2);
953     tg1.run_and_wait([&a, &tg1, &tg2, &tasks_executed, second_thread_executed] {
954         run_deep_stealing(tg1, tg2, second_thread_executed-1, tasks_executed);
955         while (tasks_executed < second_thread_executed-1) {
956             // Wait until the second thread near the limit.
957             utils::yield();
958         }
959         tg2.run([&a, &tg1, &tasks_executed] {
960             a.execute([&tg1, &tasks_executed] {
961                 volatile char consume_stack[1000]{};
962                 ++tasks_executed;
963                 tg1.wait();
964                 utils::suppress_unused_warning(consume_stack);
965             });
966         });
967         while (tasks_executed < second_thread_executed) {
968             // Wait until the second joins the arena.
969             utils::yield();
970         }
971         a.execute([&tg1, &tg2, &tasks_executed] {
972             run_deep_stealing(tg1, tg2, 10000, tasks_executed);
973         });
974         int currently_executed{};
975         do {
976             currently_executed = tasks_executed;
977             utils::Sleep(10);
978         } while (currently_executed != tasks_executed);
979         CHECK(tasks_executed < 10000 + second_thread_executed);
980     });
981     a.execute([&tg2] {
982         tg2.wait();
983     });
984     CHECK(tasks_executed == 10000 + second_thread_executed);
985 }
986 
987 //! Test checks that we can submit work to task_group asynchronously with waiting.
988 //! \brief \ref regression
989 TEST_CASE("Async task group") {
990     int num_threads = tbb::this_task_arena::max_concurrency();
991     if (num_threads < 3) {
992         // The test requires at least 2 worker threads
993         return;
994     }
995     tbb::task_arena a(2*num_threads, num_threads);
996     utils::SpinBarrier barrier(num_threads + 2);
997     tbb::task_group tg[2];
998     std::atomic<bool> finished[2]{};
999     finished[0] = false; finished[1] = false;
1000     for (int i = 0; i < 2; ++i) {
1001         a.enqueue([i, &tg, &finished, &barrier] {
1002             barrier.wait();
1003             for (int j = 0; j < 10000; ++j) {
1004                 tg[i].run([] {});
1005                 utils::yield();
1006             }
1007             finished[i] = true;
1008         });
1009     }
1010     utils::NativeParallelFor(num_threads, [&](int idx) {
1011         barrier.wait();
1012         a.execute([idx, &tg, &finished] {
1013             std::size_t counter{};
1014             while (!finished[idx%2]) {
1015                 tg[idx%2].wait();
1016                 if (counter++ % 16 == 0) utils::yield();
1017             }
1018             tg[idx%2].wait();
1019         });
1020     });
1021 }
1022 
1023 struct SelfRunner {
1024     tbb::task_group& m_tg;
1025     std::atomic<unsigned>& count;
1026     void operator()() const {
1027         unsigned previous_count = count.fetch_sub(1);
1028         if (previous_count > 1)
1029             m_tg.run( *this );
1030     }
1031 };
1032 
1033 //! Submit work to single task_group instance from inside the work
1034 //! \brief \ref error_guessing
1035 TEST_CASE("Run self using same task_group instance") {
1036     const unsigned num = 10;
1037     std::atomic<unsigned> count{num};
1038     tbb::task_group tg;
1039     SelfRunner uf{tg, count};
1040     tg.run( uf );
1041     tg.wait();
1042     CHECK_MESSAGE(
1043         count == 0,
1044         "Not all tasks were spawned from inside the functor running within task_group."
1045     );
1046 }
1047 
1048 //TODO: move to some common place to share with conformance tests
1049 namespace accept_task_group_context {
1050 
1051 template <typename TaskGroup, typename CancelF, typename WaitF>
1052 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) {
1053     std::atomic<bool> outer_cancelled{false};
1054     std::atomic<unsigned> count{13};
1055 
1056     tbb::task_group_context inner_ctx(tbb::task_group_context::isolated);
1057     TaskGroup inner_tg(inner_ctx);
1058 
1059     tbb::task_group outer_tg;
1060     auto outer_tg_task = [&] {
1061         inner_tg.run([&] {
1062             utils::SpinWaitUntilEq(outer_cancelled, true);
1063             inner_tg.run( SelfRunner{inner_tg, count} );
1064         });
1065 
1066         utils::try_call([&] {
1067             std::forward<CancelF>(cancel)(outer_tg);
1068         }).on_completion([&] {
1069             outer_cancelled = true;
1070         });
1071     };
1072 
1073     auto check = [&] {
1074         tbb::task_group_status outer_status = tbb::task_group_status::not_complete;
1075         outer_status = std::forward<WaitF>(wait)(outer_tg);
1076         CHECK_MESSAGE(
1077             outer_status == tbb::task_group_status::canceled,
1078             "Outer task group should have been cancelled."
1079         );
1080 
1081         tbb::task_group_status inner_status = inner_tg.wait();
1082         CHECK_MESSAGE(
1083             inner_status == tbb::task_group_status::complete,
1084             "Inner task group should have completed despite the cancellation of the outer one."
1085         );
1086 
1087         CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed.");
1088     };
1089 
1090     outer_tg.run(outer_tg_task);
1091     check();
1092 }
1093 
1094 template <typename TaskGroup>
1095 void test() {
1096     run_cancellation_use_case<TaskGroup>(
1097         [](tbb::task_group& outer) { outer.cancel(); },
1098         [](tbb::task_group& outer) { return outer.wait(); }
1099     );
1100 
1101 #if TBB_USE_EXCEPTIONS
1102     run_cancellation_use_case<TaskGroup>(
1103         [](tbb::task_group& /*outer*/) {
1104             volatile bool suppress_unreachable_code_warning = true;
1105             if (suppress_unreachable_code_warning) {
1106                 throw int();
1107             }
1108         },
1109         [](tbb::task_group& outer) {
1110             try {
1111                 outer.wait();
1112                 return tbb::task_group_status::complete;
1113             } catch(const int&) {
1114                 return tbb::task_group_status::canceled;
1115             }
1116         }
1117     );
1118 #endif
1119 }
1120 
1121 } // namespace accept_task_group_context
1122 
1123 //! Respect task_group_context passed from outside
1124 //! \brief \ref interface \ref requirement
1125 TEST_CASE("Respect task_group_context passed from outside") {
1126 #if TBB_PREVIEW_ISOLATED_TASK_GROUP
1127     accept_task_group_context::test<tbb::isolated_task_group>();
1128 #endif
1129 }
1130 
1131 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1132 //! The test for task_handle inside other task waiting with run
1133 //! \brief \ref requirement
1134 TEST_CASE("Task handle for scheduler bypass"){
1135     tbb::task_group tg;
1136     std::atomic<bool> run {false};
1137 
1138     tg.run([&]{
1139         return tg.defer([&]{
1140             run = true;
1141         });
1142     });
1143 
1144     tg.wait();
1145     CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
1146 }
1147 
1148 //! The test for task_handle inside other task waiting with run_and_wait
1149 //! \brief \ref requirement
1150 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){
1151     tbb::task_group tg;
1152     std::atomic<bool> run {false};
1153 
1154     tg.run_and_wait([&]{
1155         return tg.defer([&]{
1156             run = true;
1157         });
1158     });
1159 
1160     CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
1161 }
1162 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1163 
1164 #if TBB_USE_EXCEPTIONS
// These tests exercise behavior that the specification marks as undefined, so they cannot be placed in the conformance tests.
1166 
1167 //! The test for error in scheduling empty task_handle
1168 //! \brief \ref requirement
1169 TEST_CASE("Empty task_handle cannot be scheduled"
1170         * doctest::should_fail()    //Test needs to revised as implementation uses assertions instead of exceptions
1171         * doctest::skip()           //skip the test for now, to not pollute the test log
1172 ){
1173     tbb::task_group tg;
1174 
1175     CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
1176 }
1177 
1178 //! The test for error in task_handle being scheduled into task_group different from one it was created from
1179 //! \brief \ref requirement
1180 TEST_CASE("task_handle cannot be scheduled into different task_group"
1181         * doctest::should_fail()    //Test needs to revised as implementation uses assertions instead of exceptions
1182         * doctest::skip()           //skip the test for now, to not pollute the test log
1183 ){
1184     tbb::task_group tg;
1185     tbb::task_group tg1;
1186 
1187     CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
1188 }
1189 
1190 //! The test for error in task_handle being scheduled into task_group different from one it was created from
1191 //! \brief \ref requirement
1192 TEST_CASE("task_handle cannot be scheduled into other task_group of the same context"
1193         * doctest::should_fail()    //Implementation is no there yet, as it is not clear that is the expected behavior
1194         * doctest::skip()           //skip the test for now, to not pollute the test log
1195 )
1196 {
1197     tbb::task_group_context ctx;
1198 
1199     tbb::task_group tg(ctx);
1200     tbb::task_group tg1(ctx);
1201 
1202     CHECK_NOTHROW(tg.run(tg.defer([]{})));
1203     CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
1204 }
1205 
1206 #endif // TBB_USE_EXCEPTIONS
1207 
1208