xref: /oneTBB/test/tbb/test_task_group.cpp (revision c4a799df)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/utils.h"
19 #include "oneapi/tbb/detail/_config.h"
20 #include "tbb/global_control.h"
21 
22 #include "tbb/task_group.h"
23 
24 #include "common/concurrency_tracker.h"
25 
26 #include <atomic>
27 #include <stdexcept>
28 
29 //! \file test_task_group.cpp
30 //! \brief Test for [scheduler.task_group scheduler.task_group_status] specification
31 
// Concurrency level of the test that is currently running; the TEST_CASEs in this
// file overwrite it for every thread count they iterate over.
unsigned g_MaxConcurrency{4};
// Counter type shared by the tests in this file.
using atomic_t = std::atomic<std::uintptr_t>;
// Inclusive bounds of the thread counts the TEST_CASEs iterate over.
unsigned MinThread{1};
unsigned MaxThread{4};
36 
37 //------------------------------------------------------------------------
38 // Tests for the thread safety of the task_group manipulations
39 //------------------------------------------------------------------------
40 
41 #include "common/spin_barrier.h"
42 
// Bit flags selecting how SharedGroupBodyImpl shares its task group between threads.
enum SharingMode {
    // The group is allocated by one thread and deleted by another one.
    VagabondGroup = 1 << 0,
    // Every participating thread waits on the group, not only the one that created it.
    ParallelWait = 1 << 1
};
47 
48 template<typename task_group_type>
49 class SharedGroupBodyImpl : utils::NoCopy, utils::NoAfterlife {
50     static const std::uintptr_t c_numTasks0 = 4096,
51                         c_numTasks1 = 1024;
52 
53     const std::uintptr_t m_numThreads;
54     const std::uintptr_t m_sharingMode;
55 
56     task_group_type *m_taskGroup;
57     atomic_t m_tasksSpawned,
58              m_threadsReady;
59     utils::SpinBarrier m_barrier;
60 
61     static atomic_t s_tasksExecuted;
62 
63     struct TaskFunctor {
64         SharedGroupBodyImpl *m_pOwner;
65         void operator () () const {
66             if ( m_pOwner->m_sharingMode & ParallelWait ) {
67                 while ( utils::ConcurrencyTracker::PeakParallelism() < m_pOwner->m_numThreads )
68                     utils::yield();
69             }
70             ++s_tasksExecuted;
71         }
72     };
73 
74     TaskFunctor m_taskFunctor;
75 
76     void Spawn ( std::uintptr_t numTasks ) {
77         for ( std::uintptr_t i = 0; i < numTasks; ++i ) {
78             ++m_tasksSpawned;
79             utils::ConcurrencyTracker ct;
80             m_taskGroup->run( m_taskFunctor );
81         }
82         ++m_threadsReady;
83     }
84 
85     void DeleteTaskGroup () {
86         delete m_taskGroup;
87         m_taskGroup = nullptr;
88     }
89 
90     void Wait () {
91         while ( m_threadsReady != m_numThreads )
92             utils::yield();
93         const std::uintptr_t numSpawned = c_numTasks0 + c_numTasks1 * (m_numThreads - 1);
94         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "Wrong number of spawned tasks. The test is broken" );
95         INFO("Max spawning parallelism is " << utils::ConcurrencyTracker::PeakParallelism() << "out of " << g_MaxConcurrency);
96         if ( m_sharingMode & ParallelWait ) {
97             m_barrier.wait( &utils::ConcurrencyTracker::Reset );
98             {
99                 utils::ConcurrencyTracker ct;
100                 m_taskGroup->wait();
101             }
102             if ( utils::ConcurrencyTracker::PeakParallelism() == 1 ) {
103                 const char* msg = "Warning: No parallel waiting detected in TestParallelWait";
104                 WARN( msg );
105             }
106             m_barrier.wait();
107         }
108         else
109             m_taskGroup->wait();
110         CHECK_MESSAGE( m_tasksSpawned == numSpawned, "No tasks should be spawned after wait starts. The test is broken" );
111         CHECK_MESSAGE( s_tasksExecuted == numSpawned, "Not all spawned tasks were executed" );
112     }
113 
114 public:
115     SharedGroupBodyImpl ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
116         : m_numThreads(numThreads)
117         , m_sharingMode(sharingMode)
118         , m_taskGroup(nullptr)
119         , m_barrier(numThreads)
120     {
121         CHECK_MESSAGE( m_numThreads > 1, "SharedGroupBody tests require concurrency" );
122         if ((m_sharingMode & VagabondGroup) && m_numThreads != 2) {
123             CHECK_MESSAGE(false, "In vagabond mode SharedGroupBody must be used with 2 threads only");
124         }
125         utils::ConcurrencyTracker::Reset();
126         s_tasksExecuted = 0;
127         m_tasksSpawned = 0;
128         m_threadsReady = 0;
129         m_taskFunctor.m_pOwner = this;
130     }
131 
132     void Run ( std::uintptr_t idx ) {
133         AssertLive();
134         if ( idx == 0 ) {
135             if (m_taskGroup || m_tasksSpawned) {
136                 CHECK_MESSAGE(false, "SharedGroupBody must be reset before reuse");
137             }
138             m_taskGroup = new task_group_type;
139             Spawn( c_numTasks0 );
140             Wait();
141             if ( m_sharingMode & VagabondGroup )
142                 m_barrier.wait();
143             else
144                 DeleteTaskGroup();
145         }
146         else {
147             while ( m_tasksSpawned == 0 )
148                 utils::yield();
149             CHECK_MESSAGE ( m_taskGroup, "Task group is not initialized");
150             Spawn (c_numTasks1);
151             if ( m_sharingMode & ParallelWait )
152                 Wait();
153             if ( m_sharingMode & VagabondGroup ) {
154                 CHECK_MESSAGE ( idx == 1, "In vagabond mode SharedGroupBody must be used with 2 threads only" );
155                 m_barrier.wait();
156                 DeleteTaskGroup();
157             }
158         }
159         AssertLive();
160     }
161 };
162 
163 template<typename task_group_type>
164 atomic_t SharedGroupBodyImpl<task_group_type>::s_tasksExecuted;
165 
166 template<typename task_group_type>
167 class  SharedGroupBody : utils::NoAssign, utils::NoAfterlife {
168     bool m_bOwner;
169     SharedGroupBodyImpl<task_group_type> *m_pImpl;
170 public:
171     SharedGroupBody ( std::uintptr_t numThreads, std::uintptr_t sharingMode = 0 )
172         : utils::NoAssign()
173         , utils::NoAfterlife()
174         , m_bOwner(true)
175         , m_pImpl( new SharedGroupBodyImpl<task_group_type>(numThreads, sharingMode) )
176     {}
177     SharedGroupBody ( const SharedGroupBody& src )
178         : utils::NoAssign()
179         , utils::NoAfterlife()
180         , m_bOwner(false)
181         , m_pImpl(src.m_pImpl)
182     {}
183     ~SharedGroupBody () {
184         if ( m_bOwner )
185             delete m_pImpl;
186     }
187     void operator() ( std::uintptr_t idx ) const {
188         // Wrap the functior into additional task group to enforce bounding.
189         task_group_type tg;
190         tg.run_and_wait([&] { m_pImpl->Run(idx); });
191     }
192 };
193 
194 template<typename task_group_type>
195 class RunAndWaitSyncronizationTestBody : utils::NoAssign {
196     utils::SpinBarrier& m_barrier;
197     std::atomic<bool>& m_completed;
198     task_group_type& m_tg;
199 public:
200     RunAndWaitSyncronizationTestBody(utils::SpinBarrier& barrier, std::atomic<bool>& completed, task_group_type& tg)
201         : m_barrier(barrier), m_completed(completed), m_tg(tg) {}
202 
203     void operator()() const {
204         m_barrier.wait();
205         utils::doDummyWork(100000);
206         m_completed = true;
207     }
208 
209     void operator()(int id) const {
210         if (id == 0) {
211             m_tg.run_and_wait(*this);
212         } else {
213             m_barrier.wait();
214             m_tg.wait();
215             CHECK_MESSAGE(m_completed, "A concurrent waiter has left the wait method earlier than work has finished");
216         }
217     }
218 };
219 
220 template<typename task_group_type>
221 void TestParallelSpawn () {
222     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency) );
223 }
224 
225 template<typename task_group_type>
226 void TestParallelWait () {
227     NativeParallelFor( g_MaxConcurrency, SharedGroupBody<task_group_type>(g_MaxConcurrency, ParallelWait) );
228 
229     utils::SpinBarrier barrier(g_MaxConcurrency);
230     std::atomic<bool> completed;
231     completed = false;
232     task_group_type tg;
233     RunAndWaitSyncronizationTestBody<task_group_type> b(barrier, completed, tg);
234     NativeParallelFor( g_MaxConcurrency, b );
235 }
236 
237 // Tests non-stack-bound task group (the group that is allocated by one thread and destroyed by the other)
238 template<typename task_group_type>
239 void TestVagabondGroup () {
240     NativeParallelFor( 2, SharedGroupBody<task_group_type>(2, VagabondGroup) );
241 }
242 
243 #include "common/memory_usage.h"
244 
245 template<typename task_group_type>
246 void TestThreadSafety() {
247     auto tests = [] {
248         for (int trail = 0; trail < 10; ++trail) {
249             TestParallelSpawn<task_group_type>();
250             TestParallelWait<task_group_type>();
251             TestVagabondGroup<task_group_type>();
252         }
253     };
254 
255     // Test and warm up allocator.
256     tests();
257 
258     // Ensure that cosumption is stabilized.
259     std::size_t initial = utils::GetMemoryUsage();
260     for (;;) {
261         tests();
262         std::size_t current = utils::GetMemoryUsage();
263         if (current <= initial) {
264             return;
265         }
266         initial = current;
267     }
268 }
269 //------------------------------------------------------------------------
270 // Common requisites of the Fibonacci tests
271 //------------------------------------------------------------------------
272 
// Index of the Fibonacci number computed by the tests and its expected value.
const std::uintptr_t N{20};
const std::uintptr_t F{6765};
275 
276 atomic_t g_Sum;
277 
278 #define FIB_TEST_PROLOGUE() \
279     const unsigned numRepeats = g_MaxConcurrency * 4;    \
280     utils::ConcurrencyTracker::Reset()
281 
282 #define FIB_TEST_EPILOGUE(sum) \
283     CHECK(utils::ConcurrencyTracker::PeakParallelism() <= g_MaxConcurrency); \
284     CHECK( sum == numRepeats * F );
285 
286 
287 // Fibonacci tasks specified as functors
288 template<class task_group_type>
289 class FibTaskBase : utils::NoAssign, utils::NoAfterlife {
290 protected:
291     std::uintptr_t* m_pRes;
292     mutable std::uintptr_t m_Num;
293     virtual void impl() const = 0;
294 public:
295     FibTaskBase( std::uintptr_t* y, std::uintptr_t n ) : m_pRes(y), m_Num(n) {}
296     void operator()() const {
297         utils::ConcurrencyTracker ct;
298         AssertLive();
299         if( m_Num < 2 ) {
300             *m_pRes = m_Num;
301         } else {
302             impl();
303         }
304     }
305     virtual ~FibTaskBase() {}
306 };
307 
308 template<class task_group_type>
309 class FibTaskAsymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
310 public:
311     FibTaskAsymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
312     virtual void impl() const override {
313         std::uintptr_t x = ~0u;
314         task_group_type tg;
315         tg.run( FibTaskAsymmetricTreeWithFunctor(&x, this->m_Num-1) );
316         this->m_Num -= 2; tg.run_and_wait( *this );
317         *(this->m_pRes) += x;
318     }
319 };
320 
321 template<class task_group_type>
322 class FibTaskSymmetricTreeWithFunctor : public FibTaskBase<task_group_type> {
323 public:
324     FibTaskSymmetricTreeWithFunctor( std::uintptr_t* y, std::uintptr_t n ) : FibTaskBase<task_group_type>(y, n) {}
325     virtual void impl() const override {
326         std::uintptr_t x = ~0u,
327                y = ~0u;
328         task_group_type tg;
329         tg.run( FibTaskSymmetricTreeWithFunctor(&x, this->m_Num-1) );
330         tg.run( FibTaskSymmetricTreeWithFunctor(&y, this->m_Num-2) );
331         tg.wait();
332         *(this->m_pRes) = x + y;
333     }
334 };
335 
// Helper functions

// Constructs a fib_task for index n, invokes it once and returns what it stored into
// the result slot. The slot starts out as ~0u converted to std::uintptr_t.
template<class fib_task>
std::uintptr_t RunFibTask(std::uintptr_t n) {
    std::uintptr_t result = ~0u;
    {
        fib_task task(&result, n);
        task();
    }
    return result;
}
343 
344 template<typename fib_task>
345 void RunFibTest() {
346     FIB_TEST_PROLOGUE();
347     std::uintptr_t sum = 0;
348     for( unsigned i = 0; i < numRepeats; ++i )
349         sum += RunFibTask<fib_task>(N);
350     FIB_TEST_EPILOGUE(sum);
351 }
352 
353 template<typename fib_task>
354 void FibFunctionNoArgs() {
355     g_Sum += RunFibTask<fib_task>(N);
356 }
357 
358 template<typename task_group_type>
359 void TestFibWithLambdas() {
360     FIB_TEST_PROLOGUE();
361     atomic_t sum;
362     sum = 0;
363     task_group_type tg;
364     for( unsigned i = 0; i < numRepeats; ++i )
365         tg.run( [&](){sum += RunFibTask<FibTaskSymmetricTreeWithFunctor<task_group_type> >(N);} );
366     tg.wait();
367     FIB_TEST_EPILOGUE(sum);
368 }
369 
370 template<typename task_group_type>
371 void TestFibWithFunctor() {
372     RunFibTest<FibTaskAsymmetricTreeWithFunctor<task_group_type> >();
373     RunFibTest< FibTaskSymmetricTreeWithFunctor<task_group_type> >();
374 }
375 
376 template<typename task_group_type>
377 void TestFibWithFunctionPtr() {
378     FIB_TEST_PROLOGUE();
379     g_Sum = 0;
380     task_group_type tg;
381     for( unsigned i = 0; i < numRepeats; ++i )
382         tg.run( &FibFunctionNoArgs<FibTaskSymmetricTreeWithFunctor<task_group_type> > );
383     tg.wait();
384     FIB_TEST_EPILOGUE(g_Sum);
385 }
386 
387 template<typename task_group_type>
388 void RunFibonacciTests() {
389     TestFibWithLambdas<task_group_type>();
390     TestFibWithFunctor<task_group_type>();
391     TestFibWithFunctionPtr<task_group_type>();
392 }
393 
// Exception type thrown by the tests. It keeps only the pointer to the description it
// was constructed with and returns that pointer from what().
class test_exception : public std::exception
{
    const char* m_strDescription;
public:
    test_exception ( const char* descr ) : m_strDescription(descr) {}

    // `noexcept` replaces the dynamic exception specification `throw()`, which is
    // deprecated and no longer valid in C++20.
    const char* what() const noexcept override { return m_strDescription; }
};

using TestException = test_exception;
404 
405 #include <string.h>
406 
// Number of ThrowingTask instances launched by every child group.
#define NUM_CHORES       512
// Number of child groups launched by each cancellation/exception test.
#define NUM_GROUPS       64
// In throwing mode, the task whose counter increment reaches this value throws.
#define SKIP_CHORES      (NUM_CHORES / 4)
// In rethrow mode, a child rethrows once more than this many exceptions were counted.
#define SKIP_GROUPS      (NUM_GROUPS / 4)
// Descriptions carried by the exception of a task and by the rethrown one.
#define EXCEPTION_DESCR1 "Test exception 1"
#define EXCEPTION_DESCR2 "Test exception 2"
413 
414 atomic_t g_ExceptionCount;
415 atomic_t g_TaskCount;
416 unsigned g_ExecutedAtCancellation;
417 bool g_Rethrow;
418 bool g_Throw;
419 
420 class ThrowingTask : utils::NoAssign, utils::NoAfterlife {
421     atomic_t &m_TaskCount;
422 public:
423     ThrowingTask( atomic_t& counter ) : m_TaskCount(counter) {}
424     void operator() () const {
425         utils::ConcurrencyTracker ct;
426         AssertLive();
427         if ( g_Throw ) {
428             if ( ++m_TaskCount == SKIP_CHORES )
429                 TBB_TEST_THROW(test_exception(EXCEPTION_DESCR1));
430             utils::yield();
431         }
432         else {
433             ++g_TaskCount;
434             while( !tbb::is_current_task_group_canceling() )
435                 utils::yield();
436         }
437     }
438 };
439 
440 inline void ResetGlobals ( bool bThrow, bool bRethrow ) {
441     g_Throw = bThrow;
442     g_Rethrow = bRethrow;
443     g_ExceptionCount = 0;
444     g_TaskCount = 0;
445     utils::ConcurrencyTracker::Reset();
446 }
447 
448 template<typename task_group_type>
449 void LaunchChildrenWithFunctor () {
450     atomic_t count;
451     count = 0;
452     task_group_type g;
453     for (unsigned i = 0; i < NUM_CHORES; ++i) {
454         if (i % 2 == 1) {
455             g.run(g.defer(ThrowingTask(count)));
456         } else
457         {
458             g.run(ThrowingTask(count));
459         }
460     }
461 #if TBB_USE_EXCEPTIONS
462     tbb::task_group_status status = tbb::not_complete;
463     bool exceptionCaught = false;
464     try {
465         status = g.wait();
466     } catch ( TestException& e ) {
467         CHECK_MESSAGE( e.what(), "Empty what() string" );
468         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR1) == 0, "Unknown exception" );
469         exceptionCaught = true;
470         ++g_ExceptionCount;
471     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
472     if (g_Throw && !exceptionCaught && status != tbb::canceled) {
473         CHECK_MESSAGE(false, "No exception in the child task group");
474     }
475     if ( g_Rethrow && g_ExceptionCount > SKIP_GROUPS ) {
476         throw test_exception(EXCEPTION_DESCR2);
477     }
478 #else
479     g.wait();
480 #endif
481 }
482 
483 // Tests for cancellation and exception handling behavior
484 template<typename task_group_type>
485 void TestManualCancellationWithFunctor () {
486     ResetGlobals( false, false );
487     task_group_type tg;
488     for (unsigned i = 0; i < NUM_GROUPS; ++i) {
489         // TBB version does not require taking function address
490         if (i % 2 == 0) {
491             auto h = tg.defer(&LaunchChildrenWithFunctor<task_group_type>);
492             tg.run(std::move(h));
493         } else
494         {
495             tg.run(&LaunchChildrenWithFunctor<task_group_type>);
496         }
497     }
498     CHECK_MESSAGE ( !tbb::is_current_task_group_canceling(), "Unexpected cancellation" );
499     while ( g_MaxConcurrency > 1 && g_TaskCount == 0 )
500         utils::yield();
501     tg.cancel();
502     g_ExecutedAtCancellation = int(g_TaskCount);
503     tbb::task_group_status status = tg.wait();
504     CHECK_MESSAGE( status == tbb::canceled, "Task group reported invalid status." );
505     CHECK_MESSAGE( g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken" );
506     CHECK_MESSAGE( g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?" );
507     CHECK_MESSAGE( g_TaskCount <= g_ExecutedAtCancellation + utils::ConcurrencyTracker::PeakParallelism(), "Too many tasks survived cancellation" );
508 }
509 
510 #if TBB_USE_EXCEPTIONS
511 template<typename task_group_type>
512 void TestExceptionHandling1 () {
513     ResetGlobals( true, false );
514     task_group_type tg;
515     for( unsigned i = 0; i < NUM_GROUPS; ++i )
516         // TBB version does not require taking function address
517         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
518     try {
519         tg.wait();
520     } catch ( ... ) {
521         CHECK_MESSAGE( false, "Unexpected exception" );
522     }
523     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS, "Too many exceptions from the child groups. The test is broken" );
524     CHECK_MESSAGE( g_ExceptionCount == NUM_GROUPS, "Not all child groups threw the exception" );
525 }
526 
527 template<typename task_group_type>
528 void TestExceptionHandling2 () {
529     ResetGlobals( true, true );
530     task_group_type tg;
531     bool exceptionCaught = false;
532     for( unsigned i = 0; i < NUM_GROUPS; ++i ) {
533         // TBB version does not require taking function address
534         tg.run( &LaunchChildrenWithFunctor<task_group_type> );
535     }
536     try {
537         tg.wait();
538     } catch ( TestException& e ) {
539         CHECK_MESSAGE( e.what(), "Empty what() string" );
540         CHECK_MESSAGE( strcmp(e.what(), EXCEPTION_DESCR2) == 0, "Unknown exception" );
541         exceptionCaught = true;
542     } catch( ... ) { CHECK_MESSAGE( false, "Unknown exception" ); }
543     CHECK_MESSAGE( exceptionCaught, "No exception thrown from the root task group" );
544     CHECK_MESSAGE( g_ExceptionCount >= SKIP_GROUPS, "Too few exceptions from the child groups. The test is broken" );
545     CHECK_MESSAGE( g_ExceptionCount <= NUM_GROUPS - SKIP_GROUPS, "Too many exceptions from the child groups. The test is broken" );
546     CHECK_MESSAGE( g_ExceptionCount < NUM_GROUPS - SKIP_GROUPS, "None of the child groups was cancelled" );
547 }
548 
549 template <typename task_group_type>
550 void TestExceptionHandling3() {
551     task_group_type tg;
552     try {
553         tg.run_and_wait([]() {
554             volatile bool suppress_unreachable_code_warning = true;
555             if (suppress_unreachable_code_warning) {
556                 throw 1;
557             }
558         });
559     } catch (int error) {
560         CHECK(error == 1);
561     } catch ( ... ) {
562         CHECK_MESSAGE( false, "Unexpected exception" );
563     }
564 }
565 
566 template<typename task_group_type>
567 class LaunchChildrenDriver {
568 public:
569     void Launch(task_group_type& tg) {
570         ResetGlobals(false, false);
571         for (unsigned i = 0; i < NUM_GROUPS; ++i) {
572             tg.run(LaunchChildrenWithFunctor<task_group_type>);
573         }
574         CHECK_MESSAGE(!tbb::is_current_task_group_canceling(), "Unexpected cancellation");
575         while (g_MaxConcurrency > 1 && g_TaskCount == 0)
576             utils::yield();
577     }
578 
579     void Finish() {
580         CHECK_MESSAGE(g_TaskCount <= NUM_GROUPS * NUM_CHORES, "Too many tasks reported. The test is broken");
581         CHECK_MESSAGE(g_TaskCount < NUM_GROUPS * NUM_CHORES, "No tasks were cancelled. Cancellation model changed?");
582         CHECK_MESSAGE(g_TaskCount <= g_ExecutedAtCancellation + g_MaxConcurrency, "Too many tasks survived cancellation");
583     }
584 }; // LaunchChildrenWithTaskHandleDriver
585 
586 template<typename task_group_type, bool Throw>
587 void TestMissingWait () {
588     bool exception_occurred = false,
589          unexpected_exception = false;
590     LaunchChildrenDriver<task_group_type> driver;
591     try {
592         task_group_type tg;
593         driver.Launch( tg );
594         volatile bool suppress_unreachable_code_warning = Throw;
595         if (suppress_unreachable_code_warning) {
596             throw int(); // Initiate stack unwinding
597         }
598     }
599     catch ( const tbb::missing_wait& e ) {
600         CHECK_MESSAGE( e.what(), "Error message is absent" );
601         exception_occurred = true;
602         unexpected_exception = Throw;
603     }
604     catch ( int ) {
605         exception_occurred = true;
606         unexpected_exception = !Throw;
607     }
608     catch ( ... ) {
609         exception_occurred = unexpected_exception = true;
610     }
611     CHECK( exception_occurred );
612     CHECK( !unexpected_exception );
613     driver.Finish();
614 }
615 #endif
616 
617 template<typename task_group_type>
618 void RunCancellationAndExceptionHandlingTests() {
619     TestManualCancellationWithFunctor<task_group_type>();
620 #if TBB_USE_EXCEPTIONS
621     TestExceptionHandling1<task_group_type>();
622     TestExceptionHandling2<task_group_type>();
623     TestExceptionHandling3<task_group_type>();
624     TestMissingWait<task_group_type, true>();
625     TestMissingWait<task_group_type, false>();
626 #endif
627 }
628 
// Function that intentionally does nothing.
void EmptyFunction () {
}
630 
631 struct TestFunctor {
632     void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
633     void operator()() const { /* library requires this overload only */ }
634 };
635 
636 template<typename task_group_type>
637 void TestConstantFunctorRequirement() {
638     task_group_type g;
639     TestFunctor tf;
640     g.run( tf ); g.wait();
641     g.run_and_wait( tf );
642 }
643 
644 //------------------------------------------------------------------------
645 namespace TestMoveSemanticsNS {
646     struct TestFunctor {
647         void operator()() const {};
648     };
649 
650     struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
651         MoveOnlyFunctor() : utils::MoveOnly() {};
652         MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
653     };
654 
655     struct MovePreferableFunctor : utils::Movable, TestFunctor {
656         MovePreferableFunctor() : utils::Movable() {};
657         MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable(std::move(other)) {};
658         MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
659     };
660 
661     struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
662         NoMoveNoCopyFunctor() : utils::NoCopy() {};
663         // mv ctor is not allowed as cp ctor from parent utils::NoCopy
664     private:
665         NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
666     };
667 
668      template<typename task_group_type>
669     void TestBareFunctors() {
670         task_group_type tg;
671         MovePreferableFunctor mpf;
672         // run_and_wait() doesn't have any copies or moves of arguments inside the impl
673         tg.run_and_wait( NoMoveNoCopyFunctor() );
674 
675         tg.run( MoveOnlyFunctor() );
676         tg.wait();
677 
678         tg.run( mpf );
679         tg.wait();
680         CHECK_MESSAGE(mpf.alive, "object was moved when was passed by lval");
681         mpf.Reset();
682 
683         tg.run( std::move(mpf) );
684         tg.wait();
685         CHECK_MESSAGE(!mpf.alive, "object was copied when was passed by rval");
686         mpf.Reset();
687     }
688 }
689 
690 template<typename task_group_type>
691 void TestMoveSemantics() {
692     TestMoveSemanticsNS::TestBareFunctors<task_group_type>();
693 }
694 //------------------------------------------------------------------------
695 
696 // TODO: TBB_REVAMP_TODO - enable when ETS is available
697 #if TBBTEST_USE_TBB && TBB_PREVIEW_ISOLATED_TASK_GROUP
698 namespace TestIsolationNS {
699     class DummyFunctor {
700     public:
701         DummyFunctor() {}
702         void operator()() const {
703             for ( volatile int j = 0; j < 10; ++j ) {}
704         }
705     };
706 
707     template<typename task_group_type>
708     class ParForBody {
709         task_group_type& m_tg;
710         std::atomic<bool>& m_preserved;
711         tbb::enumerable_thread_specific<int>& m_ets;
712     public:
713         ParForBody(
714             task_group_type& tg,
715             std::atomic<bool>& preserved,
716             tbb::enumerable_thread_specific<int>& ets
717         ) : m_tg(tg), m_preserved(preserved), m_ets(ets) {}
718 
719         void operator()(int) const {
720             if (++m_ets.local() > 1) m_preserved = false;
721 
722             for (int i = 0; i < 1000; ++i)
723                 m_tg.run(DummyFunctor());
724             m_tg.wait();
725             m_tg.run_and_wait(DummyFunctor());
726 
727             --m_ets.local();
728         }
729     };
730 
731     template<typename task_group_type>
732     void CheckIsolation(bool isolation_is_expected) {
733         task_group_type tg;
734         std::atomic<bool> isolation_is_preserved;
735         isolation_is_preserved = true;
736         tbb::enumerable_thread_specific<int> ets(0);
737 
738         tbb::parallel_for(0, 100, ParForBody<task_group_type>(tg, isolation_is_preserved, ets));
739 
740         ASSERT(
741             isolation_is_expected == isolation_is_preserved,
742             "Actual and expected isolation-related behaviours are different"
743         );
744     }
745 
746     // Should be called only when > 1 thread is used, because otherwise isolation is guaranteed to take place
747     void TestIsolation() {
748         CheckIsolation<tbb::task_group>(false);
749         CheckIsolation<tbb::isolated_task_group>(true);
750     }
751 }
752 #endif
753 
754 #if __TBB_USE_ADDRESS_SANITIZER
755 //! Test for thread safety for the task_group
756 //! \brief \ref error_guessing \ref resource_usage
757 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
758 #elif !EMSCRIPTEN
759 //! Emscripten requires preloading of the file used to determine memory usage, hence disabled.
760 //! Test for thread safety for the task_group
761 //! \brief \ref error_guessing \ref resource_usage
762 TEST_CASE("Thread safety test for the task group") {
763     if (tbb::this_task_arena::max_concurrency() < 2) {
764         // The test requires more than one thread to check thread safety
765         return;
766     }
767     for (unsigned p=MinThread; p <= MaxThread; ++p) {
768         if (p < 2) {
769             continue;
770         }
771         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
772         g_MaxConcurrency = p;
773         TestThreadSafety<tbb::task_group>();
774     }
775 }
776 #endif
777 
778 //! Fibonacci test for task group
779 //! \brief \ref interface \ref requirement
780 TEST_CASE("Fibonacci test for the task group") {
781     for (unsigned p=MinThread; p <= MaxThread; ++p) {
782         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
783         g_MaxConcurrency = p;
784         RunFibonacciTests<tbb::task_group>();
785     }
786 }
787 
788 //! Cancellation and exception test for the task group
789 //! \brief \ref interface \ref requirement
790 TEST_CASE("Cancellation and exception test for the task group") {
791     for (unsigned p = MinThread; p <= MaxThread; ++p) {
792         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
793         tbb::task_arena a(p);
794         g_MaxConcurrency = p;
795         a.execute([] {
796             RunCancellationAndExceptionHandlingTests<tbb::task_group>();
797         });
798     }
799 }
800 
801 //! Constant functor test for the task group
802 //! \brief \ref interface \ref negative
803 TEST_CASE("Constant functor test for the task group") {
804     for (unsigned p=MinThread; p <= MaxThread; ++p) {
805         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
806         g_MaxConcurrency = p;
807         TestConstantFunctorRequirement<tbb::task_group>();
808     }
809 }
810 
811 //! Move semantics test for the task group
812 //! \brief \ref interface \ref requirement
813 TEST_CASE("Move semantics test for the task group") {
814     for (unsigned p=MinThread; p <= MaxThread; ++p) {
815         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
816         g_MaxConcurrency = p;
817         TestMoveSemantics<tbb::task_group>();
818     }
819 }
820 
821 #if TBB_PREVIEW_ISOLATED_TASK_GROUP
822 
823 #if __TBB_USE_ADDRESS_SANITIZER
824 //! Test for thread safety for the isolated_task_group
825 //! \brief \ref error_guessing
826 TEST_CASE("Memory leaks test is not applicable under ASAN\n" * doctest::skip(true)) {}
827 #elif !EMSCRIPTEN
828 //! Test for thread safety for the isolated_task_group
829 //! \brief \ref error_guessing
830 TEST_CASE("Thread safety test for the isolated task group") {
831     if (tbb::this_task_arena::max_concurrency() < 2) {
832         // The test requires more than one thread to check thread safety
833         return;
834     }
835     for (unsigned p=MinThread; p <= MaxThread; ++p) {
836         if (p < 2) {
837             continue;
838         }
839         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
840         g_MaxConcurrency = p;
841         TestThreadSafety<tbb::isolated_task_group>();
842     }
843 }
844 #endif
845 
846 //! Cancellation and exception test for the isolated task group
847 //! \brief \ref interface \ref requirement
848 TEST_CASE("Fibonacci test for the isolated task group") {
849     for (unsigned p=MinThread; p <= MaxThread; ++p) {
850         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
851         g_MaxConcurrency = p;
852         RunFibonacciTests<tbb::isolated_task_group>();
853     }
854 }
855 
856 //! Cancellation and exception test for the isolated task group
857 //! \brief \ref interface \ref requirement
858 TEST_CASE("Cancellation and exception test for the isolated task group") {
859     for (unsigned p=MinThread; p <= MaxThread; ++p) {
860         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
861         g_MaxConcurrency = p;
862         RunCancellationAndExceptionHandlingTests<tbb::isolated_task_group>();
863     }
864 }
865 
866 //! Constant functor test for the isolated task group.
867 //! \brief \ref interface \ref negative
868 TEST_CASE("Constant functor test for the isolated task group") {
869     for (unsigned p=MinThread; p <= MaxThread; ++p) {
870         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
871         g_MaxConcurrency = p;
872         TestConstantFunctorRequirement<tbb::isolated_task_group>();
873     }
874 }
875 
876 //! Move semantics test for the isolated task group.
877 //! \brief \ref interface \ref requirement
878 TEST_CASE("Move semantics test for the isolated task group") {
879     for (unsigned p=MinThread; p <= MaxThread; ++p) {
880         tbb::global_control limit(tbb::global_control::max_allowed_parallelism, p);
881         g_MaxConcurrency = p;
882         TestMoveSemantics<tbb::isolated_task_group>();
883     }
884 }
885 
//TODO: add tests for void isolated_task_group::run(d2::task_handle&& h) and isolated_task_group::run_and_wait(d2::task_handle&& h)
887 #endif /* TBB_PREVIEW_ISOLATED_TASK_GROUP */
888 
889 void run_deep_stealing(tbb::task_group& tg1, tbb::task_group& tg2, int num_tasks, std::atomic<int>& tasks_executed) {
890     for (int i = 0; i < num_tasks; ++i) {
891         tg2.run([&tg1, &tasks_executed] {
892             volatile char consume_stack[1000]{};
893             ++tasks_executed;
894             tg1.wait();
895             utils::suppress_unused_warning(consume_stack);
896         });
897     }
898 }
899 
900 // TODO: move to the conformance test
901 //! Test for stack overflow avoidance mechanism.
902 //! \brief \ref requirement
903 TEST_CASE("Test for stack overflow avoidance mechanism") {
904     if (tbb::this_task_arena::max_concurrency() < 2) {
905         return;
906     }
907 
908     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, 2);
909     tbb::task_group tg1;
910     tbb::task_group tg2;
911     std::atomic<int> tasks_executed{};
912     tg1.run_and_wait([&tg1, &tg2, &tasks_executed] {
913         run_deep_stealing(tg1, tg2, 10000, tasks_executed);
914         while (tasks_executed < 100) {
915             // Some stealing is expected to happen.
916             utils::yield();
917         }
918         CHECK(tasks_executed < 10000);
919     });
920     tg2.wait();
921     CHECK(tasks_executed == 10000);
922 }
923 
//! Test for stack overflow avoidance mechanism when the waiting thread also enters another arena.
//! \brief \ref error_guessing
TEST_CASE("Test for stack overflow avoidance mechanism within arena") {
    // Nothing to check when fewer than two threads are available.
    if (tbb::this_task_arena::max_concurrency() < 2) {
        return;
    }

    tbb::task_arena a1(2, 1);
    a1.execute([] {
        tbb::task_group tg1;
        tbb::task_group tg2;
        std::atomic<int> tasks_executed{};

        // Determine nested task execution limit.
        // Phase 1: submit 10000 tasks that block on tg1 and remember in
        // second_thread_executed the value at which the counter stops growing.
        int second_thread_executed{};
        tg1.run_and_wait([&tg1, &tg2, &tasks_executed, &second_thread_executed] {
            run_deep_stealing(tg1, tg2, 10000, tasks_executed);
            // Re-sample the counter after each utils::Sleep(10) until at least 100 tasks
            // have started and two consecutive samples are equal.
            do {
                second_thread_executed = tasks_executed;
                utils::Sleep(10);
            } while (second_thread_executed < 100 || second_thread_executed != tasks_executed);
            CHECK(tasks_executed < 10000);
        });
        // After tg2 is drained all 10000 tasks must have been counted.
        tg2.wait();
        CHECK(tasks_executed == 10000);

        // Phase 2: repeat with the counter reset, this time involving a second arena.
        tasks_executed = 0;
        tbb::task_arena a2(2, 2);
        tg1.run_and_wait([&a2, &tg1, &tg2, &tasks_executed, second_thread_executed] {
            // Submit one task fewer than the limit measured in phase 1.
            run_deep_stealing(tg1, tg2, second_thread_executed - 1, tasks_executed);
            while (tasks_executed < second_thread_executed - 1) {
                // Wait until the second thread is near the limit.
                utils::yield();
            }
            // One more blocking task, this time executed inside arena a2.
            tg2.run([&a2, &tg1, &tasks_executed] {
                a2.execute([&tg1, &tasks_executed] {
                    volatile char consume_stack[1000]{};
                    ++tasks_executed;
                    tg1.wait();
                    utils::suppress_unused_warning(consume_stack);
                });
            });
            while (tasks_executed < second_thread_executed) {
                // Wait until the second thread joins the arena.
                utils::yield();
            }
            // Submit 10000 more blocking tasks from within a2; the total submitted in this
            // phase is therefore 10000 + second_thread_executed.
            a2.execute([&tg1, &tg2, &tasks_executed] {
                run_deep_stealing(tg1, tg2, 10000, tasks_executed);
            });
            // Wait until two consecutive samples of the counter, taken one utils::Sleep(10)
            // apart, are equal.
            int currently_executed{};
            do {
                currently_executed = tasks_executed;
                utils::Sleep(10);
            } while (currently_executed != tasks_executed);
            // The counter must have settled below the total number of submitted tasks.
            CHECK(tasks_executed < 10000 + second_thread_executed);
        });
        // Drain tg2 from within a2; afterwards every submitted task must have been counted.
        a2.execute([&tg2] {
            tg2.wait();
        });
        CHECK(tasks_executed == 10000 + second_thread_executed);
    });
}
986 
987 //! Test checks that we can submit work to task_group asynchronously with waiting.
988 //! \brief \ref regression
989 TEST_CASE("Async task group") {
990     int num_threads = tbb::this_task_arena::max_concurrency();
991     if (num_threads < 3) {
992         // The test requires at least 2 worker threads
993         return;
994     }
995     tbb::task_arena a(2*num_threads, num_threads);
996     utils::SpinBarrier barrier(num_threads + 2);
997     tbb::task_group tg[2];
998     std::atomic<bool> finished[2]{};
999     finished[0] = false; finished[1] = false;
1000     for (int i = 0; i < 2; ++i) {
1001         a.enqueue([i, &tg, &finished, &barrier] {
1002             barrier.wait();
1003             for (int j = 0; j < 10000; ++j) {
1004                 tg[i].run([] {});
1005                 utils::yield();
1006             }
1007             finished[i] = true;
1008         });
1009     }
1010     utils::NativeParallelFor(num_threads, [&](int idx) {
1011         barrier.wait();
1012         a.execute([idx, &tg, &finished] {
1013             std::size_t counter{};
1014             while (!finished[idx%2]) {
1015                 tg[idx%2].wait();
1016                 if (counter++ % 16 == 0) utils::yield();
1017             }
1018             tg[idx%2].wait();
1019         });
1020     });
1021 }
1022 
1023 struct SelfRunner {
1024     tbb::task_group& m_tg;
1025     std::atomic<unsigned>& count;
1026     void operator()() const {
1027         unsigned previous_count = count.fetch_sub(1);
1028         if (previous_count > 1)
1029             m_tg.run( *this );
1030     }
1031 };
1032 
1033 //! Submit work to single task_group instance from inside the work
1034 //! \brief \ref error_guessing
1035 TEST_CASE("Run self using same task_group instance") {
1036     const unsigned num = 10;
1037     std::atomic<unsigned> count{num};
1038     tbb::task_group tg;
1039     SelfRunner uf{tg, count};
1040     tg.run( uf );
1041     tg.wait();
1042     CHECK_MESSAGE(
1043         count == 0,
1044         "Not all tasks were spawned from inside the functor running within task_group."
1045     );
1046 }
1047 
1048 //TODO: move to some common place to share with conformance tests
1049 namespace accept_task_group_context {
1050 
1051 template <typename TaskGroup, typename CancelF, typename WaitF>
1052 void run_cancellation_use_case(CancelF&& cancel, WaitF&& wait) {
1053     std::atomic<bool> outer_cancelled{false};
1054     std::atomic<unsigned> count{13};
1055 
1056     tbb::task_group_context inner_ctx(tbb::task_group_context::isolated);
1057     TaskGroup inner_tg(inner_ctx);
1058 
1059     tbb::task_group outer_tg;
1060     auto outer_tg_task = [&] {
1061         inner_tg.run([&] {
1062             utils::SpinWaitUntilEq(outer_cancelled, true);
1063             inner_tg.run( SelfRunner{inner_tg, count} );
1064         });
1065 
1066         utils::try_call([&] {
1067             std::forward<CancelF>(cancel)(outer_tg);
1068         }).on_completion([&] {
1069             outer_cancelled = true;
1070         });
1071     };
1072 
1073     auto check = [&] {
1074         tbb::task_group_status outer_status = tbb::task_group_status::not_complete;
1075         outer_status = std::forward<WaitF>(wait)(outer_tg);
1076         CHECK_MESSAGE(
1077             outer_status == tbb::task_group_status::canceled,
1078             "Outer task group should have been cancelled."
1079         );
1080 
1081         tbb::task_group_status inner_status = inner_tg.wait();
1082         CHECK_MESSAGE(
1083             inner_status == tbb::task_group_status::complete,
1084             "Inner task group should have completed despite the cancellation of the outer one."
1085         );
1086 
1087         CHECK_MESSAGE(0 == count, "Some of the inner group tasks were not executed.");
1088     };
1089 
1090     outer_tg.run(outer_tg_task);
1091     check();
1092 }
1093 
1094 template <typename TaskGroup>
1095 void test() {
1096     run_cancellation_use_case<TaskGroup>(
1097         [](tbb::task_group& outer) { outer.cancel(); },
1098         [](tbb::task_group& outer) { return outer.wait(); }
1099     );
1100 
1101 #if TBB_USE_EXCEPTIONS
1102     run_cancellation_use_case<TaskGroup>(
1103         [](tbb::task_group& /*outer*/) {
1104             volatile bool suppress_unreachable_code_warning = true;
1105             if (suppress_unreachable_code_warning) {
1106                 throw int();
1107             }
1108         },
1109         [](tbb::task_group& outer) {
1110             try {
1111                 outer.wait();
1112                 return tbb::task_group_status::complete;
1113             } catch(const int&) {
1114                 return tbb::task_group_status::canceled;
1115             }
1116         }
1117     );
1118 #endif
1119 }
1120 
1121 } // namespace accept_task_group_context
1122 
//! Respect task_group_context passed from outside
//! \brief \ref interface \ref requirement
TEST_CASE("Respect task_group_context passed from outside") {
    // Only the preview isolated_task_group is exercised here; when
    // TBB_PREVIEW_ISOLATED_TASK_GROUP is not set the test case body is empty.
#if TBB_PREVIEW_ISOLATED_TASK_GROUP
    accept_task_group_context::test<tbb::isolated_task_group>();
#endif
}
1130 
1131 #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1132 //! The test for task_handle inside other task waiting with run
1133 //! \brief \ref requirement
1134 TEST_CASE("Task handle for scheduler bypass"){
1135     tbb::task_group tg;
1136     std::atomic<bool> run {false};
1137 
1138     tg.run([&]{
1139         return tg.defer([&]{
1140             run = true;
1141         });
1142     });
1143 
1144     tg.wait();
1145     CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
1146 }
1147 
1148 //! The test for task_handle inside other task waiting with run_and_wait
1149 //! \brief \ref requirement
1150 TEST_CASE("Task handle for scheduler bypass via run_and_wait"){
1151     tbb::task_group tg;
1152     std::atomic<bool> run {false};
1153 
1154     tg.run_and_wait([&]{
1155         return tg.defer([&]{
1156             run = true;
1157         });
1158     });
1159 
1160     CHECK_MESSAGE(run == true, "task handle returned by user lambda (bypassed) should be run");
1161 }
1162 #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
1163 
1164 #if TBB_USE_EXCEPTIONS
1165 //As these tests are against behavior marked by spec as undefined, they can not be put into conformance tests
1166 
//! The test for error in scheduling an empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
        * doctest::should_fail()    // Test needs to be revised as the implementation uses assertions instead of exceptions
        * doctest::skip()           // Skip the test for now, so as not to pollute the test log
){
    tbb::task_group tg;

    // Expects std::runtime_error with the given message when a default-constructed handle is run.
    CHECK_THROWS_WITH_AS(tg.run(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
1177 
//! The test for error in task_handle being scheduled into a task_group different from the one it was created from
//! \brief \ref requirement
TEST_CASE("task_handle cannot be scheduled into different task_group"
        * doctest::should_fail()    // Test needs to be revised as the implementation uses assertions instead of exceptions
        * doctest::skip()           // Skip the test for now, so as not to pollute the test log
){
    tbb::task_group tg;
    tbb::task_group tg1;

    // The handle is created by tg but submitted to tg1.
    CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
}
1189 
//! The test for error in task_handle being scheduled into another task_group that shares the same context
//! \brief \ref requirement
TEST_CASE("task_handle cannot be scheduled into other task_group of the same context"
        * doctest::should_fail()    // Implementation is not there yet, as it is not clear that this is the expected behavior
        * doctest::skip()           // Skip the test for now, so as not to pollute the test log
)
{
    // Both groups are constructed over the same task_group_context.
    tbb::task_group_context ctx;

    tbb::task_group tg(ctx);
    tbb::task_group tg1(ctx);

    // Submitting a handle to the group that created it must not throw ...
    CHECK_NOTHROW(tg.run(tg.defer([]{})));
    // ... while submitting it to the sibling group is expected to be rejected.
    CHECK_THROWS_WITH_AS(tg1.run(tg.defer([]{})), "Attempt to schedule task_handle into different task_group", std::runtime_error);
}
1205 
1206 #endif // TBB_USE_EXCEPTIONS
1207 
1208