xref: /oneTBB/test/tbb/test_eh_algorithms.cpp (revision c4568449)
151c0b2f7Stbbdev /*
2*c4568449SPavel Kumbrasev     Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include "common/test.h"
1851c0b2f7Stbbdev #include "common/concurrency_tracker.h"
1951c0b2f7Stbbdev #include "common/iterator.h"
2051c0b2f7Stbbdev #include "common/utils_concurrency_limit.h"
2151c0b2f7Stbbdev 
2251c0b2f7Stbbdev #include <limits.h> // for INT_MAX
2351c0b2f7Stbbdev #include <thread>
248b6f831cStbbdev #include <vector>
2551c0b2f7Stbbdev 
2651c0b2f7Stbbdev #include "tbb/parallel_for.h"
2751c0b2f7Stbbdev #include "tbb/parallel_reduce.h"
2851c0b2f7Stbbdev #include "tbb/parallel_for_each.h"
2951c0b2f7Stbbdev #include "tbb/parallel_pipeline.h"
3051c0b2f7Stbbdev #include "tbb/blocked_range.h"
3151c0b2f7Stbbdev #include "tbb/task_group.h"
3251c0b2f7Stbbdev #include "tbb/concurrent_unordered_map.h"
3351c0b2f7Stbbdev #include "tbb/task.h"
34*c4568449SPavel Kumbrasev #include "tbb/global_control.h"
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev //! \file test_eh_algorithms.cpp
3751c0b2f7Stbbdev //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications
3851c0b2f7Stbbdev 
3951c0b2f7Stbbdev #define FLAT_RANGE  100000
4051c0b2f7Stbbdev #define FLAT_GRAIN  100
4151c0b2f7Stbbdev #define OUTER_RANGE  100
4251c0b2f7Stbbdev #define OUTER_GRAIN  10
4351c0b2f7Stbbdev #define INNER_RANGE  (FLAT_RANGE / OUTER_RANGE)
4451c0b2f7Stbbdev #define INNER_GRAIN  (FLAT_GRAIN / OUTER_GRAIN)
4551c0b2f7Stbbdev 
4651c0b2f7Stbbdev struct context_specific_counter {
4751c0b2f7Stbbdev     tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{};
4851c0b2f7Stbbdev 
incrementcontext_specific_counter4951c0b2f7Stbbdev     void increment() {
5051c0b2f7Stbbdev         tbb::task_group_context* ctx = tbb::task::current_context();
5151c0b2f7Stbbdev         REQUIRE(ctx != nullptr);
5251c0b2f7Stbbdev         context_map[ctx]++;
5351c0b2f7Stbbdev     }
5451c0b2f7Stbbdev 
resetcontext_specific_counter5551c0b2f7Stbbdev     void reset() {
5651c0b2f7Stbbdev         context_map.clear();
5751c0b2f7Stbbdev     }
5851c0b2f7Stbbdev 
validatecontext_specific_counter5951c0b2f7Stbbdev     void validate(unsigned expected_count, const char* msg) {
6051c0b2f7Stbbdev         for (auto it = context_map.begin(); it != context_map.end(); it++) {
6151c0b2f7Stbbdev             REQUIRE_MESSAGE( it->second <= expected_count, msg);
6251c0b2f7Stbbdev         }
6351c0b2f7Stbbdev     }
6451c0b2f7Stbbdev };
6551c0b2f7Stbbdev 
6651c0b2f7Stbbdev std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder
6751c0b2f7Stbbdev std::atomic<intptr_t> g_OuterParCalls{};  // number of actual invocations of the outer construct executed.
6851c0b2f7Stbbdev context_specific_counter g_TGCCancelled{};  // Number of times a task sees its group cancelled at start
6951c0b2f7Stbbdev 
7051c0b2f7Stbbdev #include "common/exception_handling.h"
7151c0b2f7Stbbdev 
7251c0b2f7Stbbdev /********************************
7351c0b2f7Stbbdev       Variables in test
7451c0b2f7Stbbdev 
7551c0b2f7Stbbdev __ Test control variables
76b15aabb3Stbbdev       g_ExceptionInMaster -- only the external thread is allowed to throw.  If false, the external cannot throw
7751c0b2f7Stbbdev       g_SolitaryException -- only one throw may be executed.
7851c0b2f7Stbbdev 
7951c0b2f7Stbbdev -- controls for ThrowTestException for pipeline tests
8051c0b2f7Stbbdev       g_NestedPipelines -- are inner pipelines being run?
8151c0b2f7Stbbdev       g_PipelinesStarted -- how many pipelines have run their first filter at least once.
8251c0b2f7Stbbdev 
8351c0b2f7Stbbdev -- Information variables
8451c0b2f7Stbbdev 
85b15aabb3Stbbdev    g_Master -- Thread ID of the "external" thread
86b15aabb3Stbbdev       In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this.
8751c0b2f7Stbbdev 
8851c0b2f7Stbbdev -- Measurement variables
8951c0b2f7Stbbdev 
9051c0b2f7Stbbdev    g_OuterParCalls -- how many outer parallel ranges or filters started
9151c0b2f7Stbbdev    g_TGCCancelled --  how many inner parallel ranges or filters saw task::self().is_cancelled()
9251c0b2f7Stbbdev    g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
93b15aabb3Stbbdev    g_MasterExecutedThrow -- number of times external thread actually executed a throw
94b15aabb3Stbbdev    g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw
9551c0b2f7Stbbdev    g_ExceptionCaught -- one of PropagatedException or unknown exception was caught.  (Other exceptions cause assertions.)
9651c0b2f7Stbbdev 
9751c0b2f7Stbbdev    --  Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
9851c0b2f7Stbbdev       g_CurExecuted -- total number of inner ranges or filters which executed
9951c0b2f7Stbbdev       g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
10051c0b2f7Stbbdev       g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
10151c0b2f7Stbbdev   *********************************/
10251c0b2f7Stbbdev 
ResetGlobals(bool throwException=true,bool flog=false)10351c0b2f7Stbbdev inline void ResetGlobals (  bool throwException = true, bool flog = false ) {
10451c0b2f7Stbbdev     ResetEhGlobals( throwException, flog );
10551c0b2f7Stbbdev     g_FedTasksCount = 0;
10651c0b2f7Stbbdev     g_OuterParCalls = 0;
10751c0b2f7Stbbdev     g_NestedPipelines = false;
10851c0b2f7Stbbdev     g_TGCCancelled.reset();
10951c0b2f7Stbbdev }
11051c0b2f7Stbbdev 
11151c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
11251c0b2f7Stbbdev // Tests for tbb::parallel_for and tbb::parallel_reduce
11351c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
11451c0b2f7Stbbdev 
11551c0b2f7Stbbdev typedef size_t count_type;
11651c0b2f7Stbbdev typedef tbb::blocked_range<count_type> range_type;
11751c0b2f7Stbbdev 
CountSubranges(range_type r)11851c0b2f7Stbbdev inline intptr_t CountSubranges(range_type r) {
11951c0b2f7Stbbdev     if(!r.is_divisible()) return intptr_t(1);
12051c0b2f7Stbbdev     range_type r2(r,tbb::split());
12151c0b2f7Stbbdev     return CountSubranges(r) + CountSubranges(r2);
12251c0b2f7Stbbdev }
12351c0b2f7Stbbdev 
NumSubranges(intptr_t length,intptr_t grain)12451c0b2f7Stbbdev inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
12551c0b2f7Stbbdev     return CountSubranges(range_type(0,length,grain));
12651c0b2f7Stbbdev }
12751c0b2f7Stbbdev 
12851c0b2f7Stbbdev template<class Body>
TestNumSubrangesCalculation(intptr_t length,intptr_t grain,intptr_t inner_length,intptr_t inner_grain)12951c0b2f7Stbbdev intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
13051c0b2f7Stbbdev     ResetGlobals();
13151c0b2f7Stbbdev     g_ThrowException = false;
13251c0b2f7Stbbdev     intptr_t outerCalls = NumSubranges(length, grain),
13351c0b2f7Stbbdev              innerCalls = NumSubranges(inner_length, inner_grain),
13451c0b2f7Stbbdev              maxExecuted = outerCalls * (innerCalls + 1);
13551c0b2f7Stbbdev     tbb::parallel_for( range_type(0, length, grain), Body() );
13651c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
13751c0b2f7Stbbdev     return maxExecuted;
13851c0b2f7Stbbdev }
13951c0b2f7Stbbdev 
14051c0b2f7Stbbdev class NoThrowParForBody {
14151c0b2f7Stbbdev public:
operator ()(const range_type & r) const14251c0b2f7Stbbdev     void operator()( const range_type& r ) const {
14351c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
14451c0b2f7Stbbdev         else g_NonMasterExecuted = true;
14551c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
146b15aabb3Stbbdev         utils::doDummyWork(r.size());
14751c0b2f7Stbbdev     }
14851c0b2f7Stbbdev };
14951c0b2f7Stbbdev 
15051c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
15151c0b2f7Stbbdev 
Test0()15251c0b2f7Stbbdev void Test0 () {
15351c0b2f7Stbbdev     ResetGlobals();
15451c0b2f7Stbbdev     tbb::simple_partitioner p;
15551c0b2f7Stbbdev     for( size_t i=0; i<10; ++i ) {
15651c0b2f7Stbbdev         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
15751c0b2f7Stbbdev         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
15851c0b2f7Stbbdev         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
15951c0b2f7Stbbdev         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
16051c0b2f7Stbbdev     }
16151c0b2f7Stbbdev } // void Test0 ()
16251c0b2f7Stbbdev 
16351c0b2f7Stbbdev //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
16451c0b2f7Stbbdev template<typename ParForBody>
16551c0b2f7Stbbdev class SimpleParReduceBody {
16651c0b2f7Stbbdev     ParForBody m_Body;
16751c0b2f7Stbbdev public:
16851c0b2f7Stbbdev     void operator=(const SimpleParReduceBody&) = delete;
16951c0b2f7Stbbdev     SimpleParReduceBody(const SimpleParReduceBody&) = default;
17051c0b2f7Stbbdev     SimpleParReduceBody() = default;
17151c0b2f7Stbbdev 
operator ()(const range_type & r) const17251c0b2f7Stbbdev     void operator()( const range_type& r ) const { m_Body(r); }
SimpleParReduceBody(SimpleParReduceBody & left,tbb::split)17351c0b2f7Stbbdev     SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
join(SimpleParReduceBody &)17451c0b2f7Stbbdev     void join( SimpleParReduceBody& /*right*/ ) {}
17551c0b2f7Stbbdev }; // SimpleParReduceBody
17651c0b2f7Stbbdev 
17751c0b2f7Stbbdev //! Test parallel_for and parallel_reduce for a given partitioner.
17851c0b2f7Stbbdev /** The Body need only be suitable for a parallel_for. */
17951c0b2f7Stbbdev template<typename ParForBody, typename Partitioner>
TestParallelLoopAux()18051c0b2f7Stbbdev void TestParallelLoopAux() {
18151c0b2f7Stbbdev     Partitioner partitioner;
18251c0b2f7Stbbdev     for( int i=0; i<2; ++i ) {
18351c0b2f7Stbbdev         ResetGlobals();
18451c0b2f7Stbbdev         TRY();
18551c0b2f7Stbbdev             if( i==0 )
18651c0b2f7Stbbdev                 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
18751c0b2f7Stbbdev             else {
18851c0b2f7Stbbdev                 SimpleParReduceBody<ParForBody> rb;
18951c0b2f7Stbbdev                 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
19051c0b2f7Stbbdev             }
19151c0b2f7Stbbdev         CATCH_AND_ASSERT();
19251c0b2f7Stbbdev         // two cases: g_SolitaryException and !g_SolitaryException
19351c0b2f7Stbbdev         //   1) g_SolitaryException: only one thread actually threw.  There is only one context, so the exception
19451c0b2f7Stbbdev         //      (when caught) will cause that context to be cancelled.  After this event, there may be one or
19551c0b2f7Stbbdev         //      more threads which are "in-flight", up to g_NumThreads, but no more will be started.  The threads,
19651c0b2f7Stbbdev         //      when they start, if they see they are cancelled, TGCCancelled is incremented.
19751c0b2f7Stbbdev         //   2) !g_SolitaryException: more than one thread can throw.  The number of threads that actually
198b15aabb3Stbbdev         //      threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow.
19951c0b2f7Stbbdev         //      Only one context, so TGCCancelled should be <= g_NumThreads.
20051c0b2f7Stbbdev         //
20151c0b2f7Stbbdev         // the reasoning is similar for nested algorithms in a single context (Test2).
20251c0b2f7Stbbdev         //
20351c0b2f7Stbbdev         // If a thread throws in a context, more than one subsequent task body may see the
20451c0b2f7Stbbdev         // cancelled state (if they are scheduled before the state is propagated.) this is
20551c0b2f7Stbbdev         // infrequent, but it occurs.  So what was to be an assertion must be a remark.
20651c0b2f7Stbbdev         g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown");
20751c0b2f7Stbbdev         REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
20851c0b2f7Stbbdev         if ( g_SolitaryException ) {
20951c0b2f7Stbbdev             REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
21051c0b2f7Stbbdev             REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
21151c0b2f7Stbbdev                 "Not all throws were caught");
21251c0b2f7Stbbdev             REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
21351c0b2f7Stbbdev         }
21451c0b2f7Stbbdev         else {
21551c0b2f7Stbbdev             REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
21651c0b2f7Stbbdev         }
21751c0b2f7Stbbdev     }
21851c0b2f7Stbbdev }  // TestParallelLoopAux
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
22151c0b2f7Stbbdev /** The Body only needs to be suitable for tbb::parallel_for. */
22251c0b2f7Stbbdev template<typename Body>
TestParallelLoop()22351c0b2f7Stbbdev void TestParallelLoop() {
22451c0b2f7Stbbdev     // The simple and auto partitioners should be const, but not the affinity partitioner.
22551c0b2f7Stbbdev     TestParallelLoopAux<Body, const tbb::simple_partitioner  >();
22651c0b2f7Stbbdev     TestParallelLoopAux<Body, const tbb::auto_partitioner    >();
22751c0b2f7Stbbdev #define __TBB_TEMPORARILY_DISABLED 1
22851c0b2f7Stbbdev #if !__TBB_TEMPORARILY_DISABLED
22951c0b2f7Stbbdev     // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
23051c0b2f7Stbbdev     TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
23151c0b2f7Stbbdev #endif
23251c0b2f7Stbbdev #undef __TBB_TEMPORARILY_DISABLED
23351c0b2f7Stbbdev }
23451c0b2f7Stbbdev 
23551c0b2f7Stbbdev class SimpleParForBody {
23651c0b2f7Stbbdev public:
23751c0b2f7Stbbdev     void operator=(const SimpleParForBody&) = delete;
23851c0b2f7Stbbdev     SimpleParForBody(const SimpleParForBody&) = default;
23951c0b2f7Stbbdev     SimpleParForBody() = default;
24051c0b2f7Stbbdev 
operator ()(const range_type & r) const24151c0b2f7Stbbdev     void operator()( const range_type& r ) const {
24251c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
24351c0b2f7Stbbdev         ++g_CurExecuted;
24451c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
24551c0b2f7Stbbdev         else g_NonMasterExecuted = true;
24651c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
247b15aabb3Stbbdev         utils::doDummyWork(r.size());
24851c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
24951c0b2f7Stbbdev         ThrowTestException(1);
25051c0b2f7Stbbdev     }
25151c0b2f7Stbbdev };
25251c0b2f7Stbbdev 
Test1()25351c0b2f7Stbbdev void Test1() {
25451c0b2f7Stbbdev     // non-nested parallel_for/reduce with throwing body, one context
25551c0b2f7Stbbdev     TestParallelLoop<SimpleParForBody>();
25651c0b2f7Stbbdev } // void Test1 ()
25751c0b2f7Stbbdev 
25851c0b2f7Stbbdev class OuterParForBody {
25951c0b2f7Stbbdev public:
26051c0b2f7Stbbdev     void operator=(const OuterParForBody&) = delete;
26151c0b2f7Stbbdev     OuterParForBody(const OuterParForBody&) = default;
26251c0b2f7Stbbdev     OuterParForBody() = default;
operator ()(const range_type &) const26351c0b2f7Stbbdev     void operator()( const range_type& ) const {
26451c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
26551c0b2f7Stbbdev         ++g_OuterParCalls;
26651c0b2f7Stbbdev         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
26751c0b2f7Stbbdev     }
26851c0b2f7Stbbdev };
26951c0b2f7Stbbdev 
27051c0b2f7Stbbdev //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
27151c0b2f7Stbbdev /** Inner algorithms are spawned inside the new bound context by default. Since
27251c0b2f7Stbbdev     exceptions thrown from the inner parallel_for are not handled by the caller
27351c0b2f7Stbbdev     (outer parallel_for body) in this test, they will cancel all the sibling inner
27451c0b2f7Stbbdev     algorithms. **/
Test2()27551c0b2f7Stbbdev void Test2 () {
27651c0b2f7Stbbdev     TestParallelLoop<OuterParForBody>();
27751c0b2f7Stbbdev } // void Test2 ()
27851c0b2f7Stbbdev 
27951c0b2f7Stbbdev class OuterParForBodyWithIsolatedCtx {
28051c0b2f7Stbbdev public:
operator ()(const range_type &) const28151c0b2f7Stbbdev     void operator()( const range_type& ) const {
28251c0b2f7Stbbdev         tbb::task_group_context ctx(tbb::task_group_context::isolated);
28351c0b2f7Stbbdev         ++g_OuterParCalls;
28451c0b2f7Stbbdev         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
28551c0b2f7Stbbdev     }
28651c0b2f7Stbbdev };
28751c0b2f7Stbbdev 
28851c0b2f7Stbbdev //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
28951c0b2f7Stbbdev /** Even though exceptions thrown from the inner parallel_for are not handled
29051c0b2f7Stbbdev     by the caller in this test, they will not affect sibling inner algorithms
29151c0b2f7Stbbdev     already running because of the isolated contexts. However because the first
29251c0b2f7Stbbdev     exception cancels the root parallel_for only the first g_NumThreads subranges
29351c0b2f7Stbbdev     will be processed (which launch inner parallel_fors) **/
Test3()29451c0b2f7Stbbdev void Test3 () {
29551c0b2f7Stbbdev     ResetGlobals();
29651c0b2f7Stbbdev     typedef OuterParForBodyWithIsolatedCtx body_type;
29751c0b2f7Stbbdev     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
29851c0b2f7Stbbdev             // we expect one thread to throw without counting, the rest to run to completion
29951c0b2f7Stbbdev             // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the
30051c0b2f7Stbbdev             // case; the SimpleParFor subranges are started up as part of the outer ones, and when
30151c0b2f7Stbbdev             // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
30251c0b2f7Stbbdev             // so we have to count the number of outer Pfors actually started.
30351c0b2f7Stbbdev             minExecuted = (g_NumThreads - 1) * innerCalls;
30451c0b2f7Stbbdev     TRY();
30551c0b2f7Stbbdev         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
30651c0b2f7Stbbdev     CATCH_AND_ASSERT();
30751c0b2f7Stbbdev     minExecuted = (g_OuterParCalls - 1) * innerCalls;  // see above
30851c0b2f7Stbbdev 
30951c0b2f7Stbbdev     // The first formula above assumes all ranges of the outer parallel for are executed, and one
31051c0b2f7Stbbdev     // cancels.  In the event, we have a smaller number of ranges that start before the exception
31151c0b2f7Stbbdev     // is caught.
31251c0b2f7Stbbdev     //
31351c0b2f7Stbbdev     //  g_SolitaryException:One inner range throws.  Outer parallel_For is cancelled, but sibling
31451c0b2f7Stbbdev     //                      parallel_fors continue to completion (unless the threads that execute
31551c0b2f7Stbbdev     //                      are not allowed to throw, in which case we will not see any exceptions).
31651c0b2f7Stbbdev     // !g_SolitaryException:multiple inner ranges may throw.  Any which throws will stop, and the
31751c0b2f7Stbbdev     //                      corresponding range of the outer pfor will stop also.
31851c0b2f7Stbbdev     //
31951c0b2f7Stbbdev     // In either case, once the outer pfor gets the exception it will stop executing further ranges.
32051c0b2f7Stbbdev 
32151c0b2f7Stbbdev     // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
32251c0b2f7Stbbdev     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
32351c0b2f7Stbbdev     if ( g_SolitaryException ) {
32451c0b2f7Stbbdev         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
32551c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
32651c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception");
32751c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
32851c0b2f7Stbbdev     }
32951c0b2f7Stbbdev     else {
33051c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
33151c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
33251c0b2f7Stbbdev     }
33351c0b2f7Stbbdev } // void Test3 ()
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev class OuterParForExceptionSafeBody {
33651c0b2f7Stbbdev public:
operator ()(const range_type &) const33751c0b2f7Stbbdev     void operator()( const range_type& ) const {
33851c0b2f7Stbbdev         tbb::task_group_context ctx(tbb::task_group_context::isolated);
33951c0b2f7Stbbdev         ++g_OuterParCalls;
34051c0b2f7Stbbdev         TRY();
34151c0b2f7Stbbdev             tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
34251c0b2f7Stbbdev         CATCH();  // this macro sets g_ExceptionCaught
34351c0b2f7Stbbdev     }
34451c0b2f7Stbbdev };
34551c0b2f7Stbbdev 
34651c0b2f7Stbbdev //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
34751c0b2f7Stbbdev /** Since exception(s) thrown from the inner parallel_for are handled by the caller
34851c0b2f7Stbbdev     in this test, they do not affect neither other tasks of the the root parallel_for
34951c0b2f7Stbbdev     nor sibling inner algorithms. **/
Test4()35051c0b2f7Stbbdev void Test4 () {
35151c0b2f7Stbbdev     ResetGlobals( true, true );
35251c0b2f7Stbbdev     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
35351c0b2f7Stbbdev               outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
35451c0b2f7Stbbdev     TRY();
35551c0b2f7Stbbdev         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
35651c0b2f7Stbbdev     CATCH();
35751c0b2f7Stbbdev     // g_SolitaryException  : one inner pfor will throw, the rest will execute to completion.
35851c0b2f7Stbbdev     //                        so the count should be (outerCalls -1) * innerCalls, if a throw happened.
35951c0b2f7Stbbdev     // !g_SolitaryException : possible multiple inner pfor throws.  Should be approximately
36051c0b2f7Stbbdev     //                        (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
36151c0b2f7Stbbdev     intptr_t  minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
36251c0b2f7Stbbdev     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
36351c0b2f7Stbbdev     if ( g_SolitaryException ) {
36451c0b2f7Stbbdev         // only one task had exception thrown. That task had at least one execution (the one that threw).
36551c0b2f7Stbbdev         // There may be an arbitrary number of ranges executed after the throw but before the exception
36651c0b2f7Stbbdev         // is caught in the scheduler and cancellation is signaled.  (seen 9, 11 and 62 (!) for 8 threads)
36751c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered");
36851c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
36951c0b2f7Stbbdev         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
37051c0b2f7Stbbdev         // a small number of threads can execute in a throwing sub-pfor, if the task which is
37151c0b2f7Stbbdev         // to do the solitary throw swaps out after registering its intent to throw but before it
37251c0b2f7Stbbdev         // actually does so. As a result, the number of extra tasks cannot exceed the number of thread
37351c0b2f7Stbbdev         // for each nested pfor invication)
374ab1409e1SAlex         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
37551c0b2f7Stbbdev     }
37651c0b2f7Stbbdev     else {
37751c0b2f7Stbbdev         REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions");
37851c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
37951c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions");
38051c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
38151c0b2f7Stbbdev     }
38251c0b2f7Stbbdev } // void Test4 ()
38351c0b2f7Stbbdev 
38451c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce exception handling
38551c0b2f7Stbbdev //! \brief \ref error_guessing
38651c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce exception handling test #0") {
38751c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
38851c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
38951c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
39051c0b2f7Stbbdev         if (g_NumThreads > 1) {
391*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900102null392*c4568449SPavel Kumbrasev             a.execute([] {
39351c0b2f7Stbbdev                 // Execute in all the possible modes
39451c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
39551c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
39651c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
39751c0b2f7Stbbdev 
39851c0b2f7Stbbdev                     Test0();
39951c0b2f7Stbbdev                 }
400*c4568449SPavel Kumbrasev             });
40151c0b2f7Stbbdev         }
40251c0b2f7Stbbdev     }
40351c0b2f7Stbbdev }
40451c0b2f7Stbbdev 
40551c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce exception handling
40651c0b2f7Stbbdev //! \brief \ref error_guessing
40751c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce exception handling test #1") {
40851c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
40951c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
41051c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
41151c0b2f7Stbbdev         if (g_NumThreads > 1) {
412*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900202null413*c4568449SPavel Kumbrasev             a.execute([] {
41451c0b2f7Stbbdev                 // Execute in all the possible modes
41551c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
41651c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
41751c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
41851c0b2f7Stbbdev 
41951c0b2f7Stbbdev                     Test1();
42051c0b2f7Stbbdev                 }
421*c4568449SPavel Kumbrasev             });
42251c0b2f7Stbbdev         }
42351c0b2f7Stbbdev     }
42451c0b2f7Stbbdev }
42551c0b2f7Stbbdev 
42651c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce exception handling
42751c0b2f7Stbbdev //! \brief \ref error_guessing
42851c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce exception handling test #2") {
42951c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
43051c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
43151c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
43251c0b2f7Stbbdev         if (g_NumThreads > 1) {
433*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900302null434*c4568449SPavel Kumbrasev             a.execute([] {
43551c0b2f7Stbbdev                 // Execute in all the possible modes
43651c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
43751c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
43851c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
43951c0b2f7Stbbdev 
44051c0b2f7Stbbdev                     Test2();
44151c0b2f7Stbbdev                 }
442*c4568449SPavel Kumbrasev             });
44351c0b2f7Stbbdev         }
44451c0b2f7Stbbdev     }
44551c0b2f7Stbbdev }
44651c0b2f7Stbbdev 
44751c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce exception handling
44851c0b2f7Stbbdev //! \brief \ref error_guessing
44951c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce exception handling test #3") {
45051c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
45151c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
45251c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
45351c0b2f7Stbbdev         if (g_NumThreads > 1) {
454*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900402null455*c4568449SPavel Kumbrasev             a.execute([] {
45651c0b2f7Stbbdev                 // Execute in all the possible modes
45751c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
45851c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
45951c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
46051c0b2f7Stbbdev 
46151c0b2f7Stbbdev                     Test3();
46251c0b2f7Stbbdev                 }
463*c4568449SPavel Kumbrasev             });
46451c0b2f7Stbbdev         }
46551c0b2f7Stbbdev     }
46651c0b2f7Stbbdev }
46751c0b2f7Stbbdev 
46851c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce exception handling
46951c0b2f7Stbbdev //! \brief \ref error_guessing
47051c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce exception handling test #4") {
47151c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
47251c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
47351c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
47451c0b2f7Stbbdev         if (g_NumThreads > 1) {
475*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900502null476*c4568449SPavel Kumbrasev             a.execute([] {
47751c0b2f7Stbbdev                 // Execute in all the possible modes
47851c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
47951c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
48051c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
48151c0b2f7Stbbdev 
48251c0b2f7Stbbdev                     Test4();
48351c0b2f7Stbbdev                 }
484*c4568449SPavel Kumbrasev             });
48551c0b2f7Stbbdev         }
48651c0b2f7Stbbdev     }
48751c0b2f7Stbbdev }
48851c0b2f7Stbbdev 
48951c0b2f7Stbbdev #endif /* TBB_USE_EXCEPTIONS */
49051c0b2f7Stbbdev 
49151c0b2f7Stbbdev class ParForBodyToCancel {
49251c0b2f7Stbbdev public:
operator ()(const range_type &) const49351c0b2f7Stbbdev     void operator()( const range_type& ) const {
49451c0b2f7Stbbdev         ++g_CurExecuted;
49551c0b2f7Stbbdev         Cancellator::WaitUntilReady();
49651c0b2f7Stbbdev     }
49751c0b2f7Stbbdev };
49851c0b2f7Stbbdev 
49951c0b2f7Stbbdev template<class B>
50051c0b2f7Stbbdev class ParForLauncher {
50151c0b2f7Stbbdev     tbb::task_group_context &my_ctx;
50251c0b2f7Stbbdev public:
operator ()() const50351c0b2f7Stbbdev     void operator()() const {
50451c0b2f7Stbbdev         tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
50551c0b2f7Stbbdev     }
ParForLauncher(tbb::task_group_context & ctx)50651c0b2f7Stbbdev     ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
50751c0b2f7Stbbdev };
50851c0b2f7Stbbdev 
50951c0b2f7Stbbdev //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
TestCancelation1()51051c0b2f7Stbbdev void TestCancelation1 () {
51151c0b2f7Stbbdev     ResetGlobals( false );
51251c0b2f7Stbbdev     RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
51351c0b2f7Stbbdev }
51451c0b2f7Stbbdev 
51551c0b2f7Stbbdev class Cancellator2  {
51651c0b2f7Stbbdev     tbb::task_group_context &m_GroupToCancel;
51751c0b2f7Stbbdev 
51851c0b2f7Stbbdev public:
operator ()() const51951c0b2f7Stbbdev     void operator()() const {
52051c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
52151c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
52251c0b2f7Stbbdev         m_GroupToCancel.cancel_group_execution();
52351c0b2f7Stbbdev         g_ExecutedAtLastCatch = g_CurExecuted.load();
52451c0b2f7Stbbdev     }
52551c0b2f7Stbbdev 
Cancellator2(tbb::task_group_context & ctx,intptr_t)52651c0b2f7Stbbdev     Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
52751c0b2f7Stbbdev };
52851c0b2f7Stbbdev 
52951c0b2f7Stbbdev class ParForBodyToCancel2 {
53051c0b2f7Stbbdev public:
operator ()(const range_type &) const53151c0b2f7Stbbdev     void operator()( const range_type& ) const {
53251c0b2f7Stbbdev         ++g_CurExecuted;
53351c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
53451c0b2f7Stbbdev         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
53551c0b2f7Stbbdev         while( !tbb::is_current_task_group_canceling() )
536b15aabb3Stbbdev             utils::yield();
53751c0b2f7Stbbdev     }
53851c0b2f7Stbbdev };
53951c0b2f7Stbbdev 
54051c0b2f7Stbbdev //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
54151c0b2f7Stbbdev /** This version also tests tbb::is_current_task_group_canceling() method. **/
TestCancelation2()54251c0b2f7Stbbdev void TestCancelation2 () {
54351c0b2f7Stbbdev     ResetGlobals();
54451c0b2f7Stbbdev     RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>();
54551c0b2f7Stbbdev     REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
54651c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
54751c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
54851c0b2f7Stbbdev }
54951c0b2f7Stbbdev 
55051c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
55151c0b2f7Stbbdev // Regression test based on the contribution by the author of the following forum post:
55251c0b2f7Stbbdev // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
55351c0b2f7Stbbdev 
55451c0b2f7Stbbdev class Worker {
55551c0b2f7Stbbdev     static const int max_nesting = 3;
55651c0b2f7Stbbdev     static const int reduce_range = 1024;
55751c0b2f7Stbbdev     static const int reduce_grain = 256;
55851c0b2f7Stbbdev public:
55951c0b2f7Stbbdev     int DoWork (int level);
Validate(int start_level)56051c0b2f7Stbbdev     int Validate (int start_level) {
56151c0b2f7Stbbdev         int expected = 1; // identity for multiplication
56251c0b2f7Stbbdev         for(int i=start_level+1; i<max_nesting; ++i)
56351c0b2f7Stbbdev              expected *= reduce_range;
56451c0b2f7Stbbdev         return expected;
56551c0b2f7Stbbdev     }
56651c0b2f7Stbbdev };
56751c0b2f7Stbbdev 
56851c0b2f7Stbbdev class RecursiveParReduceBodyWithSharedWorker {
56951c0b2f7Stbbdev     Worker * m_SharedWorker;
57051c0b2f7Stbbdev     int m_NestingLevel;
57151c0b2f7Stbbdev     int m_Result;
57251c0b2f7Stbbdev public:
RecursiveParReduceBodyWithSharedWorker(RecursiveParReduceBodyWithSharedWorker & src,tbb::split)57351c0b2f7Stbbdev     RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
57451c0b2f7Stbbdev         : m_SharedWorker(src.m_SharedWorker)
57551c0b2f7Stbbdev         , m_NestingLevel(src.m_NestingLevel)
57651c0b2f7Stbbdev         , m_Result(0)
57751c0b2f7Stbbdev     {}
RecursiveParReduceBodyWithSharedWorker(Worker * w,int outer)57851c0b2f7Stbbdev     RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
57951c0b2f7Stbbdev         : m_SharedWorker(w)
58051c0b2f7Stbbdev         , m_NestingLevel(outer)
58151c0b2f7Stbbdev         , m_Result(0)
58251c0b2f7Stbbdev     {}
58351c0b2f7Stbbdev 
operator ()(const tbb::blocked_range<size_t> & r)58451c0b2f7Stbbdev     void operator() ( const tbb::blocked_range<size_t>& r ) {
58551c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
58651c0b2f7Stbbdev         else g_NonMasterExecuted = true;
58751c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
58851c0b2f7Stbbdev         for (size_t i = r.begin (); i != r.end (); ++i) {
58951c0b2f7Stbbdev             m_Result += m_SharedWorker->DoWork (m_NestingLevel);
59051c0b2f7Stbbdev         }
59151c0b2f7Stbbdev     }
join(const RecursiveParReduceBodyWithSharedWorker & x)59251c0b2f7Stbbdev     void join (const RecursiveParReduceBodyWithSharedWorker & x) {
59351c0b2f7Stbbdev         m_Result += x.m_Result;
59451c0b2f7Stbbdev     }
result()59551c0b2f7Stbbdev     int result () { return m_Result; }
59651c0b2f7Stbbdev };
59751c0b2f7Stbbdev 
DoWork(int level)59851c0b2f7Stbbdev int Worker::DoWork ( int level ) {
59951c0b2f7Stbbdev     ++level;
60051c0b2f7Stbbdev     if ( level < max_nesting ) {
60151c0b2f7Stbbdev         RecursiveParReduceBodyWithSharedWorker rt (this, level);
60251c0b2f7Stbbdev         tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
60351c0b2f7Stbbdev         return rt.result();
60451c0b2f7Stbbdev     }
60551c0b2f7Stbbdev     else
60651c0b2f7Stbbdev         return 1;
60751c0b2f7Stbbdev }
60851c0b2f7Stbbdev 
60951c0b2f7Stbbdev //! Regression test for hanging that occurred with the first version of cancellation propagation
TestCancelation3()61051c0b2f7Stbbdev void TestCancelation3 () {
61151c0b2f7Stbbdev     Worker w;
61251c0b2f7Stbbdev     int result   = w.DoWork (0);
61351c0b2f7Stbbdev     int expected = w.Validate(0);
61451c0b2f7Stbbdev     REQUIRE_MESSAGE ( result == expected, "Wrong calculation result");
61551c0b2f7Stbbdev }
61651c0b2f7Stbbdev 
61751c0b2f7Stbbdev struct StatsCounters {
61851c0b2f7Stbbdev     std::atomic<size_t> my_total_created;
61951c0b2f7Stbbdev     std::atomic<size_t> my_total_deleted;
StatsCountersStatsCounters62051c0b2f7Stbbdev     StatsCounters() {
62151c0b2f7Stbbdev         my_total_created = 0;
62251c0b2f7Stbbdev         my_total_deleted = 0;
62351c0b2f7Stbbdev     }
62451c0b2f7Stbbdev };
62551c0b2f7Stbbdev 
62651c0b2f7Stbbdev class ParReduceBody {
62751c0b2f7Stbbdev     StatsCounters* my_stats;
62851c0b2f7Stbbdev     size_t my_id;
62951c0b2f7Stbbdev     bool my_exception;
63051c0b2f7Stbbdev     tbb::task_group_context& tgc;
63151c0b2f7Stbbdev 
63251c0b2f7Stbbdev public:
ParReduceBody(StatsCounters & s_,tbb::task_group_context & context,bool e_)63351c0b2f7Stbbdev     ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) :
63451c0b2f7Stbbdev         my_stats(&s_), my_exception(e_), tgc(context) {
63551c0b2f7Stbbdev         my_id = my_stats->my_total_created++;
63651c0b2f7Stbbdev     }
63751c0b2f7Stbbdev 
ParReduceBody(const ParReduceBody & lhs)63851c0b2f7Stbbdev     ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) {
63951c0b2f7Stbbdev         my_stats = lhs.my_stats;
64051c0b2f7Stbbdev         my_id = my_stats->my_total_created++;
64151c0b2f7Stbbdev     }
64251c0b2f7Stbbdev 
ParReduceBody(ParReduceBody & lhs,tbb::split)64351c0b2f7Stbbdev     ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) {
64451c0b2f7Stbbdev         my_stats = lhs.my_stats;
64551c0b2f7Stbbdev         my_id = my_stats->my_total_created++;
64651c0b2f7Stbbdev     }
64751c0b2f7Stbbdev 
~ParReduceBody()64851c0b2f7Stbbdev     ~ParReduceBody(){ ++my_stats->my_total_deleted; }
64951c0b2f7Stbbdev 
operator ()(const tbb::blocked_range<std::size_t> &) const65051c0b2f7Stbbdev     void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
65151c0b2f7Stbbdev         //Do nothing, except for one task (chosen arbitrarily)
65251c0b2f7Stbbdev         if( my_id >= 12 ) {
65351c0b2f7Stbbdev             if( my_exception )
65451c0b2f7Stbbdev                 ThrowTestException(1);
65551c0b2f7Stbbdev             else
65651c0b2f7Stbbdev                 tgc.cancel_group_execution();
65751c0b2f7Stbbdev         }
65851c0b2f7Stbbdev     }
65951c0b2f7Stbbdev 
join(ParReduceBody &)66051c0b2f7Stbbdev     void join( ParReduceBody& /*rhs*/ ) {}
66151c0b2f7Stbbdev };
66251c0b2f7Stbbdev 
TestCancelation4()66351c0b2f7Stbbdev void TestCancelation4() {
66451c0b2f7Stbbdev     StatsCounters statsObj;
66551c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
66651c0b2f7Stbbdev     try
66751c0b2f7Stbbdev #endif
66851c0b2f7Stbbdev     {
66951c0b2f7Stbbdev         tbb::task_group_context tgc1, tgc2;
67051c0b2f7Stbbdev         ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true);
67151c0b2f7Stbbdev         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
67251c0b2f7Stbbdev         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
67351c0b2f7Stbbdev     }
67451c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
67551c0b2f7Stbbdev     catch(...) {}
67651c0b2f7Stbbdev #endif
67751c0b2f7Stbbdev     REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
67851c0b2f7Stbbdev }
67951c0b2f7Stbbdev 
68051c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce cancellation
68151c0b2f7Stbbdev //! \brief \ref error_guessing
68251c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce cancellation test #1") {
68351c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
68451c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
68551c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
68651c0b2f7Stbbdev         if (g_NumThreads > 1) {
687*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900602null688*c4568449SPavel Kumbrasev             a.execute([] {
68951c0b2f7Stbbdev                 // Execute in all the possible modes
69051c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
69151c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
69251c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
69351c0b2f7Stbbdev 
69451c0b2f7Stbbdev                     TestCancelation1();
69551c0b2f7Stbbdev                 }
696*c4568449SPavel Kumbrasev             });
69751c0b2f7Stbbdev         }
69851c0b2f7Stbbdev     }
69951c0b2f7Stbbdev }
70051c0b2f7Stbbdev 
70151c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce cancellation
70251c0b2f7Stbbdev //! \brief \ref error_guessing
70351c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce cancellation test #2") {
70451c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
70551c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
70651c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
70751c0b2f7Stbbdev         if (g_NumThreads > 1) {
708*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900702null709*c4568449SPavel Kumbrasev             a.execute([] {
71051c0b2f7Stbbdev                 // Execute in all the possible modes
71151c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
71251c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
71351c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
71451c0b2f7Stbbdev 
71551c0b2f7Stbbdev                     TestCancelation2();
71651c0b2f7Stbbdev                 }
717*c4568449SPavel Kumbrasev             });
71851c0b2f7Stbbdev         }
71951c0b2f7Stbbdev     }
72051c0b2f7Stbbdev }
72151c0b2f7Stbbdev 
72251c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce cancellation
72351c0b2f7Stbbdev //! \brief \ref error_guessing
72451c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce cancellation test #3") {
72551c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
72651c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
72751c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
72851c0b2f7Stbbdev         if (g_NumThreads > 1) {
729*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900802null730*c4568449SPavel Kumbrasev             a.execute([] {
73151c0b2f7Stbbdev                 // Execute in all the possible modes
73251c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
73351c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
73451c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
73551c0b2f7Stbbdev 
73651c0b2f7Stbbdev                     TestCancelation3();
73751c0b2f7Stbbdev                 }
738*c4568449SPavel Kumbrasev             });
73951c0b2f7Stbbdev         }
74051c0b2f7Stbbdev     }
74151c0b2f7Stbbdev }
74251c0b2f7Stbbdev 
74351c0b2f7Stbbdev //! Testing parallel_for and parallel_reduce cancellation
74451c0b2f7Stbbdev //! \brief \ref error_guessing
74551c0b2f7Stbbdev TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
74651c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
74751c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
74851c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
74951c0b2f7Stbbdev         if (g_NumThreads > 1) {
750*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900902null751*c4568449SPavel Kumbrasev             a.execute([] {
75251c0b2f7Stbbdev                 // Execute in all the possible modes
75351c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
75451c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
75551c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
75651c0b2f7Stbbdev 
75751c0b2f7Stbbdev                     TestCancelation4();
75851c0b2f7Stbbdev                 }
759*c4568449SPavel Kumbrasev             });
76051c0b2f7Stbbdev         }
76151c0b2f7Stbbdev     }
76251c0b2f7Stbbdev }
76351c0b2f7Stbbdev 
76451c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
76551c0b2f7Stbbdev // Tests for tbb::parallel_for_each
76651c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
76751c0b2f7Stbbdev 
get_iter_range_size()7688b6f831cStbbdev std::size_t get_iter_range_size() {
7698b6f831cStbbdev     // Set the minimal iteration sequence size to 50 to improve test complexity on small machines
7708b6f831cStbbdev     return std::max(50, g_NumThreads * 2);
7718b6f831cStbbdev }
77251c0b2f7Stbbdev 
7738b6f831cStbbdev template<typename Iterator>
7748b6f831cStbbdev struct adaptive_range {
7758b6f831cStbbdev     std::vector<std::size_t> my_array;
7768b6f831cStbbdev 
adaptive_rangeadaptive_range7778b6f831cStbbdev     adaptive_range(std::size_t size) : my_array(size + 1) {}
7788b6f831cStbbdev     using iterator = Iterator;
7798b6f831cStbbdev 
beginadaptive_range7808b6f831cStbbdev     iterator begin() const {
7818b6f831cStbbdev         return iterator{&my_array.front()};
7828b6f831cStbbdev     }
beginadaptive_range7838b6f831cStbbdev     iterator begin() {
7848b6f831cStbbdev         return iterator{&my_array.front()};
7858b6f831cStbbdev     }
endadaptive_range7868b6f831cStbbdev     iterator end() const {
7878b6f831cStbbdev         return iterator{&my_array.back()};
7888b6f831cStbbdev     }
endadaptive_range7898b6f831cStbbdev     iterator end() {
7908b6f831cStbbdev         return iterator{&my_array.back()};
7918b6f831cStbbdev     }
7928b6f831cStbbdev };
79351c0b2f7Stbbdev 
Feed(tbb::feeder<size_t> & feeder,size_t val)79451c0b2f7Stbbdev void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
7958b6f831cStbbdev     if (g_FedTasksCount < 50) {
79651c0b2f7Stbbdev         ++g_FedTasksCount;
79751c0b2f7Stbbdev         feeder.add(val);
79851c0b2f7Stbbdev     }
79951c0b2f7Stbbdev }
80051c0b2f7Stbbdev 
80151c0b2f7Stbbdev #define RunWithSimpleBody(func, body)       \
80251c0b2f7Stbbdev     func<utils::ForwardIterator<size_t>, body>();         \
80351c0b2f7Stbbdev     func<utils::ForwardIterator<size_t>, body##WithFeeder>()
80451c0b2f7Stbbdev 
80551c0b2f7Stbbdev #define RunWithTemplatedBody(func, body)     \
80651c0b2f7Stbbdev     func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >();         \
80751c0b2f7Stbbdev     func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()
80851c0b2f7Stbbdev 
80951c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
81051c0b2f7Stbbdev 
81151c0b2f7Stbbdev // Simple functor object with exception
81251c0b2f7Stbbdev class SimpleParForEachBody {
81351c0b2f7Stbbdev public:
operator ()(size_t & value) const81451c0b2f7Stbbdev     void operator() ( size_t &value ) const {
81551c0b2f7Stbbdev         ++g_CurExecuted;
81651c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
81751c0b2f7Stbbdev         else g_NonMasterExecuted = true;
81851c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) {
81951c0b2f7Stbbdev             g_TGCCancelled.increment();
82051c0b2f7Stbbdev         }
82151c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
82251c0b2f7Stbbdev         value += 1000;
82351c0b2f7Stbbdev         WaitUntilConcurrencyPeaks();
82451c0b2f7Stbbdev         ThrowTestException(1);
82551c0b2f7Stbbdev     }
82651c0b2f7Stbbdev };
82751c0b2f7Stbbdev 
82851c0b2f7Stbbdev // Simple functor object with exception and feeder
82951c0b2f7Stbbdev class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
83051c0b2f7Stbbdev public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const83151c0b2f7Stbbdev     void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
83251c0b2f7Stbbdev         Feed(feeder, 0);
83351c0b2f7Stbbdev         SimpleParForEachBody::operator()(value);
83451c0b2f7Stbbdev     }
83551c0b2f7Stbbdev };
83651c0b2f7Stbbdev 
83751c0b2f7Stbbdev // Tests exceptions without nesting
83851c0b2f7Stbbdev template <class Iterator, class simple_body>
Test1_parallel_for_each()83951c0b2f7Stbbdev void Test1_parallel_for_each () {
84051c0b2f7Stbbdev     ResetGlobals();
8418b6f831cStbbdev     auto range = adaptive_range<Iterator>(get_iter_range_size());
84251c0b2f7Stbbdev     TRY();
8438b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() );
84451c0b2f7Stbbdev     CATCH_AND_ASSERT();
84551c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
84651c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
84751c0b2f7Stbbdev     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
84851c0b2f7Stbbdev     if ( !g_SolitaryException )
84951c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
85051c0b2f7Stbbdev 
85151c0b2f7Stbbdev } // void Test1_parallel_for_each ()
85251c0b2f7Stbbdev 
85351c0b2f7Stbbdev template <class Iterator>
85451c0b2f7Stbbdev class OuterParForEachBody {
85551c0b2f7Stbbdev public:
operator ()(size_t &) const85651c0b2f7Stbbdev     void operator()( size_t& /*value*/ ) const {
85751c0b2f7Stbbdev         ++g_OuterParCalls;
8588b6f831cStbbdev         auto range = adaptive_range<Iterator>(get_iter_range_size());
8598b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody());
86051c0b2f7Stbbdev     }
86151c0b2f7Stbbdev };
86251c0b2f7Stbbdev 
86351c0b2f7Stbbdev template <class Iterator>
86451c0b2f7Stbbdev class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
86551c0b2f7Stbbdev public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const86651c0b2f7Stbbdev     void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
86751c0b2f7Stbbdev         Feed(feeder, 0);
86851c0b2f7Stbbdev         OuterParForEachBody<Iterator>::operator()(value);
86951c0b2f7Stbbdev     }
87051c0b2f7Stbbdev };
87151c0b2f7Stbbdev 
87251c0b2f7Stbbdev //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
87351c0b2f7Stbbdev /** Inner algorithms are spawned inside the new bound context by default. Since
87451c0b2f7Stbbdev     exceptions thrown from the inner parallel_for_each are not handled by the caller
87551c0b2f7Stbbdev     (outer parallel_for_each body) in this test, they will cancel all the sibling inner
87651c0b2f7Stbbdev     algorithms. **/
87751c0b2f7Stbbdev template <class Iterator, class outer_body>
Test2_parallel_for_each()87851c0b2f7Stbbdev void Test2_parallel_for_each () {
87951c0b2f7Stbbdev     ResetGlobals();
8808b6f831cStbbdev     auto range = adaptive_range<Iterator>(get_iter_range_size());
88151c0b2f7Stbbdev     TRY();
8828b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() );
88351c0b2f7Stbbdev     CATCH_AND_ASSERT();
88451c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
88551c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
88651c0b2f7Stbbdev     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
88751c0b2f7Stbbdev     if ( !g_SolitaryException )
88851c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
88951c0b2f7Stbbdev } // void Test2_parallel_for_each ()
89051c0b2f7Stbbdev 
89151c0b2f7Stbbdev template <class Iterator>
89251c0b2f7Stbbdev class OuterParForEachBodyWithIsolatedCtx {
89351c0b2f7Stbbdev public:
operator ()(size_t &) const89451c0b2f7Stbbdev     void operator()( size_t& /*value*/ ) const {
89551c0b2f7Stbbdev         tbb::task_group_context ctx(tbb::task_group_context::isolated);
89651c0b2f7Stbbdev         ++g_OuterParCalls;
8978b6f831cStbbdev         auto range = adaptive_range<Iterator>(get_iter_range_size());
8988b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
89951c0b2f7Stbbdev     }
90051c0b2f7Stbbdev };
90151c0b2f7Stbbdev 
90251c0b2f7Stbbdev template <class Iterator>
90351c0b2f7Stbbdev class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
90451c0b2f7Stbbdev public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const90551c0b2f7Stbbdev     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
90651c0b2f7Stbbdev         Feed(feeder, 0);
90751c0b2f7Stbbdev         OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
90851c0b2f7Stbbdev     }
90951c0b2f7Stbbdev };
91051c0b2f7Stbbdev 
91151c0b2f7Stbbdev //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
91251c0b2f7Stbbdev /** Even though exceptions thrown from the inner parallel_for_each are not handled
91351c0b2f7Stbbdev     by the caller in this test, they will not affect sibling inner algorithms
91451c0b2f7Stbbdev     already running because of the isolated contexts. However because the first
91551c0b2f7Stbbdev     exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
91651c0b2f7Stbbdev     will be processed (which launch inner parallel_for_eachs) **/
91751c0b2f7Stbbdev template <class Iterator, class outer_body>
Test3_parallel_for_each()91851c0b2f7Stbbdev void Test3_parallel_for_each () {
91951c0b2f7Stbbdev     ResetGlobals();
9208b6f831cStbbdev     auto range = adaptive_range<Iterator>(get_iter_range_size());
9218b6f831cStbbdev     intptr_t innerCalls = get_iter_range_size(),
92251c0b2f7Stbbdev              // The assumption here is the same as in outer parallel fors.
92351c0b2f7Stbbdev              minExecuted = (g_NumThreads - 1) * innerCalls;
92451c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
92551c0b2f7Stbbdev     TRY();
9268b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body());
92751c0b2f7Stbbdev     CATCH_AND_ASSERT();
92851c0b2f7Stbbdev     // figure actual number of expected executions given the number of outer PDos started.
92951c0b2f7Stbbdev     minExecuted = (g_OuterParCalls - 1) * innerCalls;
93051c0b2f7Stbbdev     // one extra thread may run a task that sees cancellation.  Infrequent but possible
93151c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
93251c0b2f7Stbbdev     if ( g_SolitaryException ) {
93351c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
93451c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
93551c0b2f7Stbbdev     }
93651c0b2f7Stbbdev     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
93751c0b2f7Stbbdev     if ( !g_SolitaryException )
93851c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
93951c0b2f7Stbbdev } // void Test3_parallel_for_each ()
94051c0b2f7Stbbdev 
94151c0b2f7Stbbdev template <class Iterator>
94251c0b2f7Stbbdev class OuterParForEachWithEhBody {
94351c0b2f7Stbbdev public:
operator ()(size_t &) const94451c0b2f7Stbbdev     void operator()( size_t& /*value*/ ) const {
94551c0b2f7Stbbdev         tbb::task_group_context ctx(tbb::task_group_context::isolated);
94651c0b2f7Stbbdev         ++g_OuterParCalls;
9478b6f831cStbbdev         auto range = adaptive_range<Iterator>(get_iter_range_size());
94851c0b2f7Stbbdev         TRY();
9498b6f831cStbbdev             tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
95051c0b2f7Stbbdev         CATCH();
95151c0b2f7Stbbdev     }
95251c0b2f7Stbbdev };
95351c0b2f7Stbbdev 
95451c0b2f7Stbbdev template <class Iterator>
95551c0b2f7Stbbdev class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
95651c0b2f7Stbbdev public:
95751c0b2f7Stbbdev     void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
95851c0b2f7Stbbdev     OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
95951c0b2f7Stbbdev     OuterParForEachWithEhBodyWithFeeder() = default;
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const96051c0b2f7Stbbdev     void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
96151c0b2f7Stbbdev         Feed(feeder, 0);
96251c0b2f7Stbbdev         OuterParForEachWithEhBody<Iterator>::operator()(value);
96351c0b2f7Stbbdev     }
96451c0b2f7Stbbdev };
96551c0b2f7Stbbdev 
96651c0b2f7Stbbdev //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block.
96751c0b2f7Stbbdev /** Since exception(s) thrown from the inner parallel_for are handled by the caller
96851c0b2f7Stbbdev     in this test, they do not affect neither other tasks of the the root parallel_for
96951c0b2f7Stbbdev     nor sibling inner algorithms. **/
97051c0b2f7Stbbdev template <class Iterator, class outer_body_with_eh>
Test4_parallel_for_each()97151c0b2f7Stbbdev void Test4_parallel_for_each () {
97251c0b2f7Stbbdev     ResetGlobals( true, true );
9738b6f831cStbbdev     auto range = adaptive_range<Iterator>(get_iter_range_size());
97451c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
97551c0b2f7Stbbdev     TRY();
9768b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh());
97751c0b2f7Stbbdev     CATCH();
97851c0b2f7Stbbdev     REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
9798b6f831cStbbdev     intptr_t innerCalls = get_iter_range_size(),
9808b6f831cStbbdev              outerCalls = get_iter_range_size() + g_FedTasksCount,
98151c0b2f7Stbbdev              maxExecuted = outerCalls * innerCalls,
98251c0b2f7Stbbdev              minExecuted = 0;
98351c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
98451c0b2f7Stbbdev     if ( g_SolitaryException ) {
98551c0b2f7Stbbdev         minExecuted = maxExecuted - innerCalls;
98651c0b2f7Stbbdev         REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
98751c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
98851c0b2f7Stbbdev         // This test has the same property as Test4 (parallel_for); the exception can be
98951c0b2f7Stbbdev         // thrown, but some number of tasks from the outer Pdo can execute after the throw but
99051c0b2f7Stbbdev         // before the cancellation is signaled (have seen 36).
99151c0b2f7Stbbdev         DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
99251c0b2f7Stbbdev     }
99351c0b2f7Stbbdev     else {
99451c0b2f7Stbbdev         minExecuted = g_NumExceptionsCaught;
99551c0b2f7Stbbdev         REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
99651c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
99751c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
99851c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
99951c0b2f7Stbbdev     }
100051c0b2f7Stbbdev } // void Test4_parallel_for_each ()
100151c0b2f7Stbbdev 
100251c0b2f7Stbbdev // This body throws an exception only if the task was added by feeder
100351c0b2f7Stbbdev class ParForEachBodyWithThrowingFeederTasks {
100451c0b2f7Stbbdev public:
100551c0b2f7Stbbdev     //! This form of the function call operator can be used when the body needs to add more work during the processing
operator ()(const size_t & value,tbb::feeder<size_t> & feeder) const1006b15aabb3Stbbdev     void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const {
100751c0b2f7Stbbdev         ++g_CurExecuted;
100851c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
100951c0b2f7Stbbdev         else g_NonMasterExecuted = true;
101051c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
101151c0b2f7Stbbdev         Feed(feeder, 1);
101251c0b2f7Stbbdev         if (value == 1)
101351c0b2f7Stbbdev             ThrowTestException(1);
101451c0b2f7Stbbdev     }
101551c0b2f7Stbbdev }; // class ParForEachBodyWithThrowingFeederTasks
101651c0b2f7Stbbdev 
101751c0b2f7Stbbdev // Test exception in task, which was added by feeder.
101851c0b2f7Stbbdev template <class Iterator>
Test5_parallel_for_each()101951c0b2f7Stbbdev void Test5_parallel_for_each () {
102051c0b2f7Stbbdev     ResetGlobals();
10218b6f831cStbbdev     auto range = adaptive_range<Iterator>(get_iter_range_size());
102251c0b2f7Stbbdev     g_Master = std::this_thread::get_id();
102351c0b2f7Stbbdev     TRY();
10248b6f831cStbbdev         tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks());
102551c0b2f7Stbbdev     CATCH();
102651c0b2f7Stbbdev     if (g_SolitaryException) {
102751c0b2f7Stbbdev         // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
1028b15aabb3Stbbdev         // are handled by the external thread.  In this case no throw occurs.
102951c0b2f7Stbbdev         REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel               // we saw an exception
1030b15aabb3Stbbdev                 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)  // non-external trhead throws but none tried
1031b15aabb3Stbbdev                 || (g_ExceptionInMaster && !g_MasterExecutedThrow))     // external thread throws but external thread didn't try
103251c0b2f7Stbbdev                 , "At least one exception should occur");
103351c0b2f7Stbbdev     }
103451c0b2f7Stbbdev } // void Test5_parallel_for_each ()
103551c0b2f7Stbbdev 
103651c0b2f7Stbbdev //! Testing parallel_for_each exception handling
103751c0b2f7Stbbdev //! \brief \ref error_guessing
103851c0b2f7Stbbdev TEST_CASE("parallel_for_each exception handling test #1") {
103951c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
104051c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
104151c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
104251c0b2f7Stbbdev         if (g_NumThreads > 1) {
1043*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900a02null1044*c4568449SPavel Kumbrasev             a.execute([] {
104551c0b2f7Stbbdev                 // Execute in all the possible modes
104651c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
104751c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
104851c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
104951c0b2f7Stbbdev 
105051c0b2f7Stbbdev                     RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody);
105151c0b2f7Stbbdev                 }
1052*c4568449SPavel Kumbrasev             });
105351c0b2f7Stbbdev         }
105451c0b2f7Stbbdev     }
105551c0b2f7Stbbdev }
105651c0b2f7Stbbdev 
105751c0b2f7Stbbdev //! Testing parallel_for_each exception handling
105851c0b2f7Stbbdev //! \brief \ref error_guessing
105951c0b2f7Stbbdev TEST_CASE("parallel_for_each exception handling test #2") {
106051c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
106151c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
106251c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
106351c0b2f7Stbbdev         if (g_NumThreads > 1) {
1064*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900b02null1065*c4568449SPavel Kumbrasev             a.execute([] {
106651c0b2f7Stbbdev                 // Execute in all the possible modes
106751c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
106851c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
106951c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
107051c0b2f7Stbbdev 
107151c0b2f7Stbbdev                     RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody);
107251c0b2f7Stbbdev                 }
1073*c4568449SPavel Kumbrasev             });
107451c0b2f7Stbbdev         }
107551c0b2f7Stbbdev     }
107651c0b2f7Stbbdev }
107751c0b2f7Stbbdev 
107851c0b2f7Stbbdev //! Testing parallel_for_each exception handling
107951c0b2f7Stbbdev //! \brief \ref error_guessing
108051c0b2f7Stbbdev TEST_CASE("parallel_for_each exception handling test #3") {
108151c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
108251c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
108351c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
108451c0b2f7Stbbdev         if (g_NumThreads > 1) {
1085*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900c02null1086*c4568449SPavel Kumbrasev             a.execute([] {
108751c0b2f7Stbbdev                 // Execute in all the possible modes
108851c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
108951c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
109051c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
109151c0b2f7Stbbdev 
109251c0b2f7Stbbdev                     RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx);
109351c0b2f7Stbbdev                 }
1094*c4568449SPavel Kumbrasev             });
109551c0b2f7Stbbdev         }
109651c0b2f7Stbbdev     }
109751c0b2f7Stbbdev }
109851c0b2f7Stbbdev 
109951c0b2f7Stbbdev //! Testing parallel_for_each exception handling
110051c0b2f7Stbbdev //! \brief \ref error_guessing
110151c0b2f7Stbbdev TEST_CASE("parallel_for_each exception handling test #4") {
110251c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
110351c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
110451c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
110551c0b2f7Stbbdev         if (g_NumThreads > 1) {
1106*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900d02null1107*c4568449SPavel Kumbrasev             a.execute([] {
110851c0b2f7Stbbdev                 // Execute in all the possible modes
110951c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
111051c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
111151c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
111251c0b2f7Stbbdev 
111351c0b2f7Stbbdev                     RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody);
111451c0b2f7Stbbdev                 }
1115*c4568449SPavel Kumbrasev             });
111651c0b2f7Stbbdev         }
111751c0b2f7Stbbdev     }
111851c0b2f7Stbbdev }
111951c0b2f7Stbbdev 
112051c0b2f7Stbbdev //! Testing parallel_for_each exception handling
112151c0b2f7Stbbdev //! \brief \ref error_guessing
112251c0b2f7Stbbdev TEST_CASE("parallel_for_each exception handling test #5") {
112351c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
112451c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
112551c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
112651c0b2f7Stbbdev         if (g_NumThreads > 1) {
1127*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900e02null1128*c4568449SPavel Kumbrasev             a.execute([] {
112951c0b2f7Stbbdev                 // Execute in all the possible modes
113051c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
113151c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
113251c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
113351c0b2f7Stbbdev 
113451c0b2f7Stbbdev                     Test5_parallel_for_each<utils::InputIterator<size_t> >();
113551c0b2f7Stbbdev                     Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
113651c0b2f7Stbbdev                     Test5_parallel_for_each<utils::RandomIterator<size_t> >();
113751c0b2f7Stbbdev                 }
1138*c4568449SPavel Kumbrasev             });
113951c0b2f7Stbbdev         }
114051c0b2f7Stbbdev     }
114151c0b2f7Stbbdev }
114251c0b2f7Stbbdev 
114351c0b2f7Stbbdev #endif /* TBB_USE_EXCEPTIONS */
114451c0b2f7Stbbdev 
114551c0b2f7Stbbdev class ParForEachBodyToCancel {
114651c0b2f7Stbbdev public:
operator ()(size_t &) const114751c0b2f7Stbbdev     void operator()( size_t& /*value*/ ) const {
114851c0b2f7Stbbdev         ++g_CurExecuted;
114951c0b2f7Stbbdev         Cancellator::WaitUntilReady();
115051c0b2f7Stbbdev     }
115151c0b2f7Stbbdev };
115251c0b2f7Stbbdev 
115351c0b2f7Stbbdev class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
115451c0b2f7Stbbdev public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const115551c0b2f7Stbbdev     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
115651c0b2f7Stbbdev         Feed(feeder, 0);
115751c0b2f7Stbbdev         ParForEachBodyToCancel::operator()(value);
115851c0b2f7Stbbdev     }
115951c0b2f7Stbbdev };
116051c0b2f7Stbbdev 
116151c0b2f7Stbbdev template<class B, class Iterator>
116251c0b2f7Stbbdev class ParForEachWorker {
116351c0b2f7Stbbdev     tbb::task_group_context &my_ctx;
116451c0b2f7Stbbdev public:
operator ()() const116551c0b2f7Stbbdev     void operator()() const {
11668b6f831cStbbdev         auto range = adaptive_range<Iterator>(get_iter_range_size());
11678b6f831cStbbdev         tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx );
116851c0b2f7Stbbdev     }
116951c0b2f7Stbbdev 
ParForEachWorker(tbb::task_group_context & ctx)117051c0b2f7Stbbdev     ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
117151c0b2f7Stbbdev };
117251c0b2f7Stbbdev 
117351c0b2f7Stbbdev //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
117451c0b2f7Stbbdev template <class Iterator, class body_to_cancel>
TestCancelation1_parallel_for_each()117551c0b2f7Stbbdev void TestCancelation1_parallel_for_each () {
117651c0b2f7Stbbdev     ResetGlobals( false );
11773e9e8c9cSIlya Mishin     // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
11788b6f831cStbbdev     intptr_t threshold = get_iter_range_size() / 4;
11793e9e8c9cSIlya Mishin     REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
118051c0b2f7Stbbdev     tbb::task_group tg;
118151c0b2f7Stbbdev     tbb::task_group_context  ctx;
118251c0b2f7Stbbdev     Cancellator cancellator(ctx, threshold);
118351c0b2f7Stbbdev     ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
118451c0b2f7Stbbdev     tg.run( cancellator );
1185b15aabb3Stbbdev     utils::yield();
118651c0b2f7Stbbdev     tg.run( worker );
118751c0b2f7Stbbdev     TRY();
118851c0b2f7Stbbdev         tg.wait();
118951c0b2f7Stbbdev     CATCH_AND_FAIL();
119051c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
119151c0b2f7Stbbdev }
119251c0b2f7Stbbdev 
119351c0b2f7Stbbdev class ParForEachBodyToCancel2 {
119451c0b2f7Stbbdev public:
operator ()(size_t &) const119551c0b2f7Stbbdev     void operator()( size_t& /*value*/ ) const {
119651c0b2f7Stbbdev         ++g_CurExecuted;
119751c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
119851c0b2f7Stbbdev         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
119951c0b2f7Stbbdev         while( !tbb::is_current_task_group_canceling() )
1200b15aabb3Stbbdev             utils::yield();
120151c0b2f7Stbbdev     }
120251c0b2f7Stbbdev };
120351c0b2f7Stbbdev 
120451c0b2f7Stbbdev class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
120551c0b2f7Stbbdev public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const120651c0b2f7Stbbdev     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
120751c0b2f7Stbbdev         Feed(feeder, 0);
120851c0b2f7Stbbdev         ParForEachBodyToCancel2::operator()(value);
120951c0b2f7Stbbdev     }
121051c0b2f7Stbbdev };
121151c0b2f7Stbbdev 
121251c0b2f7Stbbdev //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
121351c0b2f7Stbbdev /** This version also tests tbb::is_current_task_group_canceling() method. **/
121451c0b2f7Stbbdev template <class Iterator, class body_to_cancel>
TestCancelation2_parallel_for_each()121551c0b2f7Stbbdev void TestCancelation2_parallel_for_each () {
121651c0b2f7Stbbdev     ResetGlobals();
121751c0b2f7Stbbdev     RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
121851c0b2f7Stbbdev }
121951c0b2f7Stbbdev 
122051c0b2f7Stbbdev //! Testing parallel_for_each cancellation test
122151c0b2f7Stbbdev //! \brief \ref error_guessing
122251c0b2f7Stbbdev TEST_CASE("parallel_for_each cancellation test #1") {
122351c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
122451c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
122551c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
122651c0b2f7Stbbdev         if (g_NumThreads > 1) {
1227*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726900f02null1228*c4568449SPavel Kumbrasev             a.execute([] {
122951c0b2f7Stbbdev                 // Execute in all the possible modes
123051c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
123151c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
123251c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
123351c0b2f7Stbbdev                     RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
123451c0b2f7Stbbdev                 }
1235*c4568449SPavel Kumbrasev             });
123651c0b2f7Stbbdev         }
123751c0b2f7Stbbdev     }
123851c0b2f7Stbbdev }
123951c0b2f7Stbbdev 
124051c0b2f7Stbbdev //! Testing parallel_for_each cancellation test
124151c0b2f7Stbbdev //! \brief \ref error_guessing
124251c0b2f7Stbbdev TEST_CASE("parallel_for_each cancellation test #2") {
124351c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
124451c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
124551c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
124651c0b2f7Stbbdev         if (g_NumThreads > 1) {
1247*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726901002null1248*c4568449SPavel Kumbrasev             a.execute([] {
124951c0b2f7Stbbdev                 // Execute in all the possible modes
125051c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
125151c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
125251c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
125351c0b2f7Stbbdev 
125451c0b2f7Stbbdev                     RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
125551c0b2f7Stbbdev                 }
1256*c4568449SPavel Kumbrasev             });
125751c0b2f7Stbbdev         }
125851c0b2f7Stbbdev     }
125951c0b2f7Stbbdev }
126051c0b2f7Stbbdev 
126151c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
126251c0b2f7Stbbdev // Tests for tbb::parallel_pipeline
126351c0b2f7Stbbdev ////////////////////////////////////////////////////////////////////////////////
126451c0b2f7Stbbdev int g_NumTokens = 0;
126551c0b2f7Stbbdev 
126651c0b2f7Stbbdev // Simple input filter class, it assigns 1 to all array members
126751c0b2f7Stbbdev // It stops when it receives item equal to -1
126851c0b2f7Stbbdev class InputFilter {
126951c0b2f7Stbbdev     mutable std::atomic<size_t> m_Item{};
12708b6f831cStbbdev     mutable std::vector<size_t> m_Buffer;
127151c0b2f7Stbbdev public:
InputFilter()12723e9e8c9cSIlya Mishin     InputFilter() : m_Buffer(get_iter_range_size()) {
127351c0b2f7Stbbdev         m_Item = 0;
12748b6f831cStbbdev         for (size_t i = 0; i < get_iter_range_size(); ++i )
127551c0b2f7Stbbdev             m_Buffer[i] = 1;
127651c0b2f7Stbbdev     }
InputFilter(const InputFilter & other)12773e9e8c9cSIlya Mishin     InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()), m_Buffer(get_iter_range_size()) {
12788b6f831cStbbdev         for (size_t i = 0; i < get_iter_range_size(); ++i )
127951c0b2f7Stbbdev             m_Buffer[i] = other.m_Buffer[i];
128051c0b2f7Stbbdev     }
128151c0b2f7Stbbdev 
operator ()(tbb::flow_control & control) const128251c0b2f7Stbbdev     void* operator()(tbb::flow_control& control) const {
128351c0b2f7Stbbdev         size_t item = m_Item++;
128451c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
128551c0b2f7Stbbdev         else g_NonMasterExecuted = true;
128651c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
128751c0b2f7Stbbdev         if(item == 1) {
128851c0b2f7Stbbdev             ++g_PipelinesStarted;   // count on emitting the first item.
128951c0b2f7Stbbdev         }
12908b6f831cStbbdev         if ( item >= get_iter_range_size() ) {
129151c0b2f7Stbbdev             control.stop();
129251c0b2f7Stbbdev             return nullptr;
129351c0b2f7Stbbdev         }
129451c0b2f7Stbbdev         m_Buffer[item] = 1;
129551c0b2f7Stbbdev         return &m_Buffer[item];
129651c0b2f7Stbbdev     }
129751c0b2f7Stbbdev 
buffer()12988b6f831cStbbdev     size_t* buffer() { return m_Buffer.data(); }
129951c0b2f7Stbbdev }; // class InputFilter
130051c0b2f7Stbbdev 
130151c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
130251c0b2f7Stbbdev 
130351c0b2f7Stbbdev // Simple filter with exception throwing.  If parallel, will wait until
130451c0b2f7Stbbdev // as many parallel filters start as there are threads.
130551c0b2f7Stbbdev class SimpleFilter {
130651c0b2f7Stbbdev     bool m_canThrow;
130751c0b2f7Stbbdev     bool m_serial;
130851c0b2f7Stbbdev public:
SimpleFilter(bool canThrow,bool serial)130951c0b2f7Stbbdev     SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
operator ()(void * item) const131051c0b2f7Stbbdev     void* operator()(void* item) const {
131151c0b2f7Stbbdev         ++g_CurExecuted;
131251c0b2f7Stbbdev         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
131351c0b2f7Stbbdev         else g_NonMasterExecuted = true;
131451c0b2f7Stbbdev         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
131551c0b2f7Stbbdev         if ( m_canThrow ) {
131651c0b2f7Stbbdev             if ( !m_serial ) {
131751c0b2f7Stbbdev                 utils::ConcurrencyTracker ct;
131851c0b2f7Stbbdev                 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
131951c0b2f7Stbbdev             }
132051c0b2f7Stbbdev             ThrowTestException(1);
132151c0b2f7Stbbdev         }
132251c0b2f7Stbbdev         return item;
132351c0b2f7Stbbdev     }
132451c0b2f7Stbbdev }; // class SimpleFilter
132551c0b2f7Stbbdev 
132651c0b2f7Stbbdev // This enumeration represents filters order in pipeline
132751c0b2f7Stbbdev struct FilterSet {
132851c0b2f7Stbbdev     tbb::filter_mode mode1, mode2;
132951c0b2f7Stbbdev     bool throw1, throw2;
133051c0b2f7Stbbdev 
FilterSetFilterSet133151c0b2f7Stbbdev     FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
133251c0b2f7Stbbdev         : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
133351c0b2f7Stbbdev     {}
133451c0b2f7Stbbdev }; // struct FilterSet
133551c0b2f7Stbbdev 
133651c0b2f7Stbbdev FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );
133751c0b2f7Stbbdev 
133851c0b2f7Stbbdev template<typename InFilter, typename Filter>
133951c0b2f7Stbbdev class CustomPipeline {
134051c0b2f7Stbbdev     InFilter inputFilter;
134151c0b2f7Stbbdev     Filter filter1;
134251c0b2f7Stbbdev     Filter filter2;
134351c0b2f7Stbbdev     FilterSet my_filters;
134451c0b2f7Stbbdev public:
CustomPipeline(const FilterSet & filters)134551c0b2f7Stbbdev     CustomPipeline( const FilterSet& filters )
134651c0b2f7Stbbdev         : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
134751c0b2f7Stbbdev           filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
134851c0b2f7Stbbdev           my_filters(filters)
134951c0b2f7Stbbdev     {}
135051c0b2f7Stbbdev 
run()135151c0b2f7Stbbdev     void run () {
135251c0b2f7Stbbdev         tbb::parallel_pipeline(
135351c0b2f7Stbbdev             g_NumTokens,
135451c0b2f7Stbbdev             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
135551c0b2f7Stbbdev             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
135651c0b2f7Stbbdev             tbb::make_filter<void*, void>(my_filters.mode2, filter2)
135751c0b2f7Stbbdev         );
135851c0b2f7Stbbdev     }
run(tbb::task_group_context & ctx)135951c0b2f7Stbbdev     void run ( tbb::task_group_context& ctx ) {
136051c0b2f7Stbbdev         tbb::parallel_pipeline(
136151c0b2f7Stbbdev             g_NumTokens,
136251c0b2f7Stbbdev             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
136351c0b2f7Stbbdev             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
136451c0b2f7Stbbdev             tbb::make_filter<void*, void>(my_filters.mode2, filter2),
136551c0b2f7Stbbdev             ctx
136651c0b2f7Stbbdev         );
136751c0b2f7Stbbdev     }
136851c0b2f7Stbbdev };
136951c0b2f7Stbbdev 
137051c0b2f7Stbbdev typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
137151c0b2f7Stbbdev 
137251c0b2f7Stbbdev // Tests exceptions without nesting
Test1_pipeline(const FilterSet & filters)137351c0b2f7Stbbdev void Test1_pipeline ( const FilterSet& filters ) {
137451c0b2f7Stbbdev     ResetGlobals();
137551c0b2f7Stbbdev     SimplePipeline testPipeline(filters);
137651c0b2f7Stbbdev     TRY();
137751c0b2f7Stbbdev         testPipeline.run();
13788b6f831cStbbdev         if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) {
137951c0b2f7Stbbdev             // all the items were processed, though an exception was supposed to occur.
138051c0b2f7Stbbdev             if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
1381b15aabb3Stbbdev                 // if !g_ExceptionInMaster, the external thread is not allowed to throw.
1382b15aabb3Stbbdev                 // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw.
138351c0b2f7Stbbdev                 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
138451c0b2f7Stbbdev                     "Unusual count");
138551c0b2f7Stbbdev             }
138651c0b2f7Stbbdev             // In case of all serial filters they might be all executed in the thread(s)
138751c0b2f7Stbbdev             // where exceptions are not allowed by the common test logic. So we just quit.
138851c0b2f7Stbbdev             return;
138951c0b2f7Stbbdev         }
139051c0b2f7Stbbdev     CATCH_AND_ASSERT();
139151c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
139251c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
139351c0b2f7Stbbdev     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
139451c0b2f7Stbbdev     if ( !g_SolitaryException )
139551c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
139651c0b2f7Stbbdev 
139751c0b2f7Stbbdev } // void Test1_pipeline ()
139851c0b2f7Stbbdev 
139951c0b2f7Stbbdev // Filter with nesting
140051c0b2f7Stbbdev class OuterFilter {
140151c0b2f7Stbbdev public:
OuterFilter(bool,bool)140251c0b2f7Stbbdev     OuterFilter ( bool, bool ) {}
140351c0b2f7Stbbdev 
operator ()(void * item) const140451c0b2f7Stbbdev     void* operator()(void* item) const {
140551c0b2f7Stbbdev         ++g_OuterParCalls;
140651c0b2f7Stbbdev         SimplePipeline testPipeline(serial_parallel);
140751c0b2f7Stbbdev         testPipeline.run();
140851c0b2f7Stbbdev         return item;
140951c0b2f7Stbbdev     }
141051c0b2f7Stbbdev }; // class OuterFilter
141151c0b2f7Stbbdev 
141251c0b2f7Stbbdev //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
141351c0b2f7Stbbdev /** Inner algorithms are spawned inside the new bound context by default. Since
141451c0b2f7Stbbdev     exceptions thrown from the inner pipeline are not handled by the caller
141551c0b2f7Stbbdev     (outer pipeline body) in this test, they will cancel all the sibling inner
141651c0b2f7Stbbdev     algorithms. **/
Test2_pipeline(const FilterSet & filters)141751c0b2f7Stbbdev void Test2_pipeline ( const FilterSet& filters ) {
141851c0b2f7Stbbdev     ResetGlobals();
141951c0b2f7Stbbdev     g_NestedPipelines = true;
142051c0b2f7Stbbdev     CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
142151c0b2f7Stbbdev     TRY();
142251c0b2f7Stbbdev         testPipeline.run();
142351c0b2f7Stbbdev     CATCH_AND_ASSERT();
142451c0b2f7Stbbdev     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
142551c0b2f7Stbbdev     REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
142651c0b2f7Stbbdev     if ( !g_SolitaryException ) {
142751c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
142851c0b2f7Stbbdev     }
142951c0b2f7Stbbdev } // void Test2_pipeline ()
143051c0b2f7Stbbdev 
143151c0b2f7Stbbdev //! creates isolated inner pipeline and runs it.
143251c0b2f7Stbbdev class OuterFilterWithIsolatedCtx {
143351c0b2f7Stbbdev public:
OuterFilterWithIsolatedCtx(bool,bool)143451c0b2f7Stbbdev     OuterFilterWithIsolatedCtx( bool , bool ) {}
143551c0b2f7Stbbdev 
operator ()(void * item) const143651c0b2f7Stbbdev     void* operator()(void* item) const {
143751c0b2f7Stbbdev         ++g_OuterParCalls;
143851c0b2f7Stbbdev         tbb::task_group_context ctx(tbb::task_group_context::isolated);
143951c0b2f7Stbbdev         // create inner pipeline with serial input, parallel output filter, second filter throws
144051c0b2f7Stbbdev         SimplePipeline testPipeline(serial_parallel);
144151c0b2f7Stbbdev         testPipeline.run(ctx);
144251c0b2f7Stbbdev         return item;
144351c0b2f7Stbbdev     }
144451c0b2f7Stbbdev }; // class OuterFilterWithIsolatedCtx
144551c0b2f7Stbbdev 
144651c0b2f7Stbbdev //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
144751c0b2f7Stbbdev /** Even though exceptions thrown from the inner pipeline are not handled
144851c0b2f7Stbbdev     by the caller in this test, they will not affect sibling inner algorithms
144951c0b2f7Stbbdev     already running because of the isolated contexts. However because the first
145051c0b2f7Stbbdev     exception cancels the root parallel_for_each only the first g_NumThreads subranges
145151c0b2f7Stbbdev     will be processed (which launch inner pipelines) **/
Test3_pipeline(const FilterSet & filters)145251c0b2f7Stbbdev void Test3_pipeline ( const FilterSet& filters ) {
145351c0b2f7Stbbdev     for( int nTries = 1; nTries <= 4; ++nTries) {
145451c0b2f7Stbbdev         ResetGlobals();
145551c0b2f7Stbbdev         g_NestedPipelines = true;
145651c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
14578b6f831cStbbdev         intptr_t innerCalls = get_iter_range_size(),
145851c0b2f7Stbbdev                  minExecuted = (g_NumThreads - 1) * innerCalls;
145951c0b2f7Stbbdev         CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
146051c0b2f7Stbbdev         TRY();
146151c0b2f7Stbbdev             testPipeline.run();
146251c0b2f7Stbbdev         CATCH_AND_ASSERT();
146351c0b2f7Stbbdev 
146451c0b2f7Stbbdev         bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
146551c0b2f7Stbbdev             (!g_ExceptionInMaster && !g_NonMasterExecuted);
146651c0b2f7Stbbdev         // only test assertions if the test threw an exception (or we don't care)
146751c0b2f7Stbbdev         bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
146851c0b2f7Stbbdev         if(testSucceeded) {
146951c0b2f7Stbbdev             if (g_SolitaryException) {
147051c0b2f7Stbbdev 
147151c0b2f7Stbbdev                 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
147251c0b2f7Stbbdev                 // Each time the input filter of a pipeline delivers its first item, it increments
147351c0b2f7Stbbdev                 // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
147451c0b2f7Stbbdev                 // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
147551c0b2f7Stbbdev                 // context will start; that is what we're testing.)
147651c0b2f7Stbbdev                 //
147751c0b2f7Stbbdev                 // There are two pipelines which will NOT run to completion when a solitary throw
147851c0b2f7Stbbdev                 // happens in an isolated inner context: the outer pipeline and the pipeline which
147951c0b2f7Stbbdev                 // throws.  All the other pipelines which start should run to completion.  But only
148051c0b2f7Stbbdev                 // inner body invocations are counted.
148151c0b2f7Stbbdev                 //
148251c0b2f7Stbbdev                 // So g_CurExecuted should be about
148351c0b2f7Stbbdev                 //
14848b6f831cStbbdev                 //   (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1
148551c0b2f7Stbbdev                 //   ^ executions for each completed pipeline
148651c0b2f7Stbbdev                 //                   ^ completing pipelines (remembering two will not complete)
148751c0b2f7Stbbdev                 //                                              ^ one for the inner throwing pipeline
148851c0b2f7Stbbdev 
14898b6f831cStbbdev                 minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1;
149051c0b2f7Stbbdev                 // each failing pipeline must execute at least two tasks
149151c0b2f7Stbbdev                 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
149251c0b2f7Stbbdev                 // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
149351c0b2f7Stbbdev                 // tasks not executing at throw were scheduled.
149451c0b2f7Stbbdev                 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
149551c0b2f7Stbbdev                 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
1496b15aabb3Stbbdev                 // if we're only throwing from the external thread, and that thread didn't
149751c0b2f7Stbbdev                 // participate in the pipelines, then no throw occurred.
149851c0b2f7Stbbdev             }
149951c0b2f7Stbbdev             REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
150051c0b2f7Stbbdev             REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
150151c0b2f7Stbbdev             return;
150251c0b2f7Stbbdev         }
150351c0b2f7Stbbdev     }
150451c0b2f7Stbbdev }
150551c0b2f7Stbbdev 
150651c0b2f7Stbbdev class OuterFilterWithEhBody  {
150751c0b2f7Stbbdev public:
OuterFilterWithEhBody(bool,bool)150851c0b2f7Stbbdev     OuterFilterWithEhBody( bool, bool ){}
150951c0b2f7Stbbdev 
operator ()(void * item) const151051c0b2f7Stbbdev     void* operator()(void* item) const {
151151c0b2f7Stbbdev         tbb::task_group_context ctx(tbb::task_group_context::isolated);
151251c0b2f7Stbbdev         ++g_OuterParCalls;
151351c0b2f7Stbbdev         SimplePipeline testPipeline(serial_parallel);
151451c0b2f7Stbbdev         TRY();
151551c0b2f7Stbbdev             testPipeline.run(ctx);
151651c0b2f7Stbbdev         CATCH();
151751c0b2f7Stbbdev         return item;
151851c0b2f7Stbbdev     }
151951c0b2f7Stbbdev }; // class OuterFilterWithEhBody
152051c0b2f7Stbbdev 
152151c0b2f7Stbbdev //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
152251c0b2f7Stbbdev /** Since exception(s) thrown from the inner pipeline are handled by the caller
152351c0b2f7Stbbdev     in this test, they do not affect other tasks of the the root pipeline
152451c0b2f7Stbbdev     nor sibling inner algorithms. **/
Test4_pipeline(const FilterSet & filters)152551c0b2f7Stbbdev void Test4_pipeline ( const FilterSet& filters ) {
152651c0b2f7Stbbdev #if __GNUC__ && !__INTEL_COMPILER
152751c0b2f7Stbbdev     if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
152851c0b2f7Stbbdev         MESSAGE("Known issue: one of exception handling tests is skipped.");
152951c0b2f7Stbbdev         return;
153051c0b2f7Stbbdev     }
153151c0b2f7Stbbdev #endif
153251c0b2f7Stbbdev     ResetGlobals( true, true );
15338b6f831cStbbdev     // each outer pipeline stage will start get_iter_range_size() inner pipelines.
15348b6f831cStbbdev     // each inner pipeline that doesn't throw will process get_iter_range_size() items.
153551c0b2f7Stbbdev     // for solitary exception there will be one pipeline that only processes one stage, one item.
15368b6f831cStbbdev     // innerCalls should be 2*get_iter_range_size()
15378b6f831cStbbdev     intptr_t innerCalls = 2*get_iter_range_size(),
15388b6f831cStbbdev              outerCalls = 2*get_iter_range_size(),
153951c0b2f7Stbbdev              maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
154051c0b2f7Stbbdev     CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
154151c0b2f7Stbbdev     TRY();
154251c0b2f7Stbbdev         testPipeline.run();
154351c0b2f7Stbbdev     CATCH_AND_ASSERT();
154451c0b2f7Stbbdev     intptr_t  minExecuted = 0;
154551c0b2f7Stbbdev     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
154651c0b2f7Stbbdev         (!g_ExceptionInMaster && !g_NonMasterExecuted);
154751c0b2f7Stbbdev     if ( g_SolitaryException ) {
154851c0b2f7Stbbdev         minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
154951c0b2f7Stbbdev         REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
155051c0b2f7Stbbdev         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
155151c0b2f7Stbbdev     }
155251c0b2f7Stbbdev     else {
155351c0b2f7Stbbdev         // we assume throwing pipelines will not count
155451c0b2f7Stbbdev         minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
155551c0b2f7Stbbdev         REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
155651c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
155751c0b2f7Stbbdev         // too many already-scheduled tasks are started after the first exception is
155851c0b2f7Stbbdev         // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
155951c0b2f7Stbbdev         // So with multiple exceptions there are a variable number of tasks that have been
156051c0b2f7Stbbdev         // discarded because of the signals.
156151c0b2f7Stbbdev         // each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
156251c0b2f7Stbbdev         // updated with each throw, so the value will be the number of tasks executed at the last
156351c0b2f7Stbbdev         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
156451c0b2f7Stbbdev     }
156551c0b2f7Stbbdev } // void Test4_pipeline ()
156651c0b2f7Stbbdev 
156751c0b2f7Stbbdev //! Tests pipeline function passed with different combination of filters
156851c0b2f7Stbbdev template<void testFunc(const FilterSet&)>
TestWithDifferentFiltersAndConcurrency()156951c0b2f7Stbbdev void TestWithDifferentFiltersAndConcurrency() {
1570f5ed9a6bSAlex #if __TBB_USE_ADDRESS_SANITIZER
1571f5ed9a6bSAlex     // parallel_pipeline allocates tls that sporadically observed as a memory leak with
1572f5ed9a6bSAlex     // detached threads. So, use task_scheduler_handle to join threads with finalize
1573f5ed9a6bSAlex     tbb::task_scheduler_handle handle{ tbb::attach{} };
1574f5ed9a6bSAlex #endif
157551c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
157651c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
157751c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
157851c0b2f7Stbbdev         if (g_NumThreads > 1) {
1579444fd2bdSAnton Potapov 
1580444fd2bdSAnton Potapov             const tbb::filter_mode modes[] = {
158151c0b2f7Stbbdev                 tbb::filter_mode::parallel,
158251c0b2f7Stbbdev                 tbb::filter_mode::serial_in_order,
158351c0b2f7Stbbdev                 tbb::filter_mode::serial_out_of_order
158451c0b2f7Stbbdev             };
158551c0b2f7Stbbdev 
1586444fd2bdSAnton Potapov             const int NumFilterTypes = sizeof(modes)/sizeof(modes[0]);
1587444fd2bdSAnton Potapov 
1588444fd2bdSAnton Potapov             // Execute in all the possible modes
15890063fb3cSAnton Potapov             for ( size_t j = 0; j < 4; ++j ) {
1590*c4568449SPavel Kumbrasev                 tbb::task_arena a(g_NumThreads);
1591*c4568449SPavel Kumbrasev                 a.execute([&] {
1592444fd2bdSAnton Potapov                     g_ExceptionInMaster = (j & 1) != 0;
1593444fd2bdSAnton Potapov                     g_SolitaryException = (j & 2) != 0;
1594444fd2bdSAnton Potapov                     g_NumTokens = 2 * g_NumThreads;
1595444fd2bdSAnton Potapov 
159651c0b2f7Stbbdev                     for (int i = 0; i < NumFilterTypes; ++i) {
159751c0b2f7Stbbdev                         for (int n = 0; n < NumFilterTypes; ++n) {
159851c0b2f7Stbbdev                             for (int k = 0; k < 2; ++k)
15990063fb3cSAnton Potapov                                 testFunc(FilterSet(modes[i], modes[n], k == 0, k != 0));
160051c0b2f7Stbbdev                         }
160151c0b2f7Stbbdev                     }
1602*c4568449SPavel Kumbrasev                 });
160351c0b2f7Stbbdev             }
160451c0b2f7Stbbdev         }
160551c0b2f7Stbbdev     }
1606f5ed9a6bSAlex #if __TBB_USE_ADDRESS_SANITIZER
1607f5ed9a6bSAlex     tbb::finalize(handle);
1608f5ed9a6bSAlex #endif
160951c0b2f7Stbbdev }
161051c0b2f7Stbbdev 
161151c0b2f7Stbbdev //! Testing parallel_pipeline exception handling
161251c0b2f7Stbbdev //! \brief \ref error_guessing
161351c0b2f7Stbbdev TEST_CASE("parallel_pipeline exception handling test #1") {
161451c0b2f7Stbbdev     TestWithDifferentFiltersAndConcurrency<Test1_pipeline>();
161551c0b2f7Stbbdev }
161651c0b2f7Stbbdev 
161751c0b2f7Stbbdev //! Testing parallel_pipeline exception handling
161851c0b2f7Stbbdev //! \brief \ref error_guessing
161951c0b2f7Stbbdev TEST_CASE("parallel_pipeline exception handling test #2") {
162051c0b2f7Stbbdev     TestWithDifferentFiltersAndConcurrency<Test2_pipeline>();
162151c0b2f7Stbbdev }
162251c0b2f7Stbbdev 
162351c0b2f7Stbbdev //! Testing parallel_pipeline exception handling
162451c0b2f7Stbbdev //! \brief \ref error_guessing
162551c0b2f7Stbbdev TEST_CASE("parallel_pipeline exception handling test #3") {
162651c0b2f7Stbbdev     TestWithDifferentFiltersAndConcurrency<Test3_pipeline>();
162751c0b2f7Stbbdev }
162851c0b2f7Stbbdev 
162951c0b2f7Stbbdev //! Testing parallel_pipeline exception handling
163051c0b2f7Stbbdev //! \brief \ref error_guessing
163151c0b2f7Stbbdev TEST_CASE("parallel_pipeline exception handling test #4") {
163251c0b2f7Stbbdev     TestWithDifferentFiltersAndConcurrency<Test4_pipeline>();
163351c0b2f7Stbbdev }
163451c0b2f7Stbbdev 
163551c0b2f7Stbbdev #endif /* TBB_USE_EXCEPTIONS */
163651c0b2f7Stbbdev 
163751c0b2f7Stbbdev class FilterToCancel  {
163851c0b2f7Stbbdev public:
FilterToCancel()163951c0b2f7Stbbdev     FilterToCancel() {}
operator ()(void * item) const164051c0b2f7Stbbdev     void* operator()(void* item) const {
164151c0b2f7Stbbdev         ++g_CurExecuted;
164251c0b2f7Stbbdev         Cancellator::WaitUntilReady();
164351c0b2f7Stbbdev         return item;
164451c0b2f7Stbbdev     }
164551c0b2f7Stbbdev }; // class FilterToCancel
164651c0b2f7Stbbdev 
164751c0b2f7Stbbdev template <class Filter_to_cancel>
164851c0b2f7Stbbdev class PipelineLauncher {
164951c0b2f7Stbbdev     tbb::task_group_context &my_ctx;
165051c0b2f7Stbbdev public:
PipelineLauncher(tbb::task_group_context & ctx)165151c0b2f7Stbbdev     PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
165251c0b2f7Stbbdev 
operator ()() const165351c0b2f7Stbbdev     void operator()() const {
165451c0b2f7Stbbdev         // Run test when serial filter is the first non-input filter
165551c0b2f7Stbbdev         InputFilter inputFilter;
165651c0b2f7Stbbdev         Filter_to_cancel filterToCancel;
165751c0b2f7Stbbdev         tbb::parallel_pipeline(
165851c0b2f7Stbbdev             g_NumTokens,
165951c0b2f7Stbbdev             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
166051c0b2f7Stbbdev             tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel),
166151c0b2f7Stbbdev             my_ctx
166251c0b2f7Stbbdev         );
166351c0b2f7Stbbdev     }
166451c0b2f7Stbbdev };
166551c0b2f7Stbbdev 
166651c0b2f7Stbbdev //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
TestCancelation1_pipeline()166751c0b2f7Stbbdev void TestCancelation1_pipeline () {
166851c0b2f7Stbbdev     ResetGlobals();
166951c0b2f7Stbbdev     g_ThrowException = false;
16703e9e8c9cSIlya Mishin     // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
16718b6f831cStbbdev     intptr_t threshold = get_iter_range_size() / 4;
16723e9e8c9cSIlya Mishin     REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
16738b6f831cStbbdev     RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold);
167451c0b2f7Stbbdev     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
167551c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
167651c0b2f7Stbbdev }
167751c0b2f7Stbbdev 
167851c0b2f7Stbbdev class FilterToCancel2  {
167951c0b2f7Stbbdev public:
FilterToCancel2()168051c0b2f7Stbbdev     FilterToCancel2() {}
168151c0b2f7Stbbdev 
operator ()(void * item) const168251c0b2f7Stbbdev     void* operator()(void* item) const {
168351c0b2f7Stbbdev         ++g_CurExecuted;
168451c0b2f7Stbbdev         utils::ConcurrencyTracker ct;
168551c0b2f7Stbbdev         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
168651c0b2f7Stbbdev         while( !tbb::is_current_task_group_canceling() )
1687b15aabb3Stbbdev             utils::yield();
168851c0b2f7Stbbdev         return item;
168951c0b2f7Stbbdev     }
169051c0b2f7Stbbdev };
169151c0b2f7Stbbdev 
169251c0b2f7Stbbdev //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
169351c0b2f7Stbbdev /** This version also tests task::is_cancelled() method. **/
TestCancelation2_pipeline()169451c0b2f7Stbbdev void TestCancelation2_pipeline () {
169551c0b2f7Stbbdev     ResetGlobals();
169651c0b2f7Stbbdev     RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>();
169751c0b2f7Stbbdev     // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the
169851c0b2f7Stbbdev     // former, and g_CurExecuted is monotonic increasing.  so the comparison should be at least ==.
169951c0b2f7Stbbdev     // If another filter is started after cancel but before cancellation is propagated, then the
170051c0b2f7Stbbdev     // number will be larger.
170151c0b2f7Stbbdev     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
170251c0b2f7Stbbdev }
170351c0b2f7Stbbdev 
170451c0b2f7Stbbdev /** If min and max thread numbers specified on the command line are different,
170551c0b2f7Stbbdev     the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
170651c0b2f7Stbbdev     to be able to test the high and low contention modes while keeping the test reasonably fast **/
170751c0b2f7Stbbdev 
170851c0b2f7Stbbdev //! Testing parallel_pipeline cancellation
170951c0b2f7Stbbdev //! \brief \ref error_guessing
171051c0b2f7Stbbdev TEST_CASE("parallel_pipeline cancellation test #1") {
171151c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
171251c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
171351c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
171451c0b2f7Stbbdev         if (g_NumThreads > 1) {
1715*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726901202null1716*c4568449SPavel Kumbrasev             a.execute([] {
171751c0b2f7Stbbdev                 // Execute in all the possible modes
171851c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
171951c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
172051c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
172151c0b2f7Stbbdev                     g_NumTokens = 2 * g_NumThreads;
172251c0b2f7Stbbdev 
172351c0b2f7Stbbdev                     TestCancelation1_pipeline();
172451c0b2f7Stbbdev                 }
1725*c4568449SPavel Kumbrasev             });
172651c0b2f7Stbbdev         }
172751c0b2f7Stbbdev     }
172851c0b2f7Stbbdev }
172951c0b2f7Stbbdev 
173051c0b2f7Stbbdev //! Testing parallel_pipeline cancellation
173151c0b2f7Stbbdev //! \brief \ref error_guessing
173251c0b2f7Stbbdev TEST_CASE("parallel_pipeline cancellation test #2") {
173351c0b2f7Stbbdev     for (auto concurrency_level: utils::concurrency_range()) {
173451c0b2f7Stbbdev         g_NumThreads = static_cast<int>(concurrency_level);
173551c0b2f7Stbbdev         g_Master = std::this_thread::get_id();
173651c0b2f7Stbbdev         if (g_NumThreads > 1) {
1737*c4568449SPavel Kumbrasev             tbb::task_arena a(g_NumThreads);
__anone85726901302null1738*c4568449SPavel Kumbrasev             a.execute([] {
173951c0b2f7Stbbdev                 // Execute in all the possible modes
174051c0b2f7Stbbdev                 for (size_t j = 0; j < 4; ++j) {
174151c0b2f7Stbbdev                     g_ExceptionInMaster = (j & 1) != 0;
174251c0b2f7Stbbdev                     g_SolitaryException = (j & 2) != 0;
174351c0b2f7Stbbdev                     g_NumTokens = 2 * g_NumThreads;
174451c0b2f7Stbbdev 
174551c0b2f7Stbbdev                     TestCancelation2_pipeline();
174651c0b2f7Stbbdev                 }
1747*c4568449SPavel Kumbrasev             });
174851c0b2f7Stbbdev         }
174951c0b2f7Stbbdev     }
175051c0b2f7Stbbdev }
1751