xref: /oneTBB/test/tbb/test_eh_algorithms.cpp (revision 67c11716)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/concurrency_tracker.h"
19 #include "common/iterator.h"
20 #include "common/utils_concurrency_limit.h"
21 
22 #include <limits.h> // for INT_MAX
23 #include <thread>
24 #include <vector>
25 
26 #include "tbb/parallel_for.h"
27 #include "tbb/parallel_reduce.h"
28 #include "tbb/parallel_for_each.h"
29 #include "tbb/parallel_pipeline.h"
30 #include "tbb/blocked_range.h"
31 #include "tbb/task_group.h"
32 #include "tbb/global_control.h"
33 #include "tbb/concurrent_unordered_map.h"
34 #include "tbb/task.h"
35 
36 //! \file test_eh_algorithms.cpp
37 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications
38 
39 #define FLAT_RANGE  100000
40 #define FLAT_GRAIN  100
41 #define OUTER_RANGE  100
42 #define OUTER_GRAIN  10
43 #define INNER_RANGE  (FLAT_RANGE / OUTER_RANGE)
44 #define INNER_GRAIN  (FLAT_GRAIN / OUTER_GRAIN)
45 
46 struct context_specific_counter {
47     tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{};
48 
49     void increment() {
50         tbb::task_group_context* ctx = tbb::task::current_context();
51         REQUIRE(ctx != nullptr);
52         context_map[ctx]++;
53     }
54 
55     void reset() {
56         context_map.clear();
57     }
58 
59     void validate(unsigned expected_count, const char* msg) {
60         for (auto it = context_map.begin(); it != context_map.end(); it++) {
61             REQUIRE_MESSAGE( it->second <= expected_count, msg);
62         }
63     }
64 };
65 
66 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder
67 std::atomic<intptr_t> g_OuterParCalls{};  // number of actual invocations of the outer construct executed.
68 context_specific_counter g_TGCCancelled{};  // Number of times a task sees its group cancelled at start
69 
70 #include "common/exception_handling.h"
71 
72 /********************************
73       Variables in test
74 
75 __ Test control variables
76       g_ExceptionInMaster -- only the external thread is allowed to throw.  If false, the external cannot throw
77       g_SolitaryException -- only one throw may be executed.
78 
79 -- controls for ThrowTestException for pipeline tests
80       g_NestedPipelines -- are inner pipelines being run?
81       g_PipelinesStarted -- how many pipelines have run their first filter at least once.
82 
83 -- Information variables
84 
85    g_Master -- Thread ID of the "external" thread
86       In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this.
87 
88 -- Measurement variables
89 
90    g_OuterParCalls -- how many outer parallel ranges or filters started
91    g_TGCCancelled --  how many inner parallel ranges or filters saw task::self().is_cancelled()
92    g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
93    g_MasterExecutedThrow -- number of times external thread actually executed a throw
94    g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw
95    g_ExceptionCaught -- one of PropagatedException or unknown exception was caught.  (Other exceptions cause assertions.)
96 
97    --  Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
98       g_CurExecuted -- total number of inner ranges or filters which executed
99       g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
100       g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
101   *********************************/
102 
103 inline void ResetGlobals (  bool throwException = true, bool flog = false ) {
104     ResetEhGlobals( throwException, flog );
105     g_FedTasksCount = 0;
106     g_OuterParCalls = 0;
107     g_NestedPipelines = false;
108     g_TGCCancelled.reset();
109 }
110 
111 ////////////////////////////////////////////////////////////////////////////////
112 // Tests for tbb::parallel_for and tbb::parallel_reduce
113 ////////////////////////////////////////////////////////////////////////////////
114 
115 typedef size_t count_type;
116 typedef tbb::blocked_range<count_type> range_type;
117 
118 inline intptr_t CountSubranges(range_type r) {
119     if(!r.is_divisible()) return intptr_t(1);
120     range_type r2(r,tbb::split());
121     return CountSubranges(r) + CountSubranges(r2);
122 }
123 
124 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
125     return CountSubranges(range_type(0,length,grain));
126 }
127 
128 template<class Body>
129 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
130     ResetGlobals();
131     g_ThrowException = false;
132     intptr_t outerCalls = NumSubranges(length, grain),
133              innerCalls = NumSubranges(inner_length, inner_grain),
134              maxExecuted = outerCalls * (innerCalls + 1);
135     tbb::parallel_for( range_type(0, length, grain), Body() );
136     REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
137     return maxExecuted;
138 }
139 
140 class NoThrowParForBody {
141 public:
142     void operator()( const range_type& r ) const {
143         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
144         else g_NonMasterExecuted = true;
145         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
146         utils::doDummyWork(r.size());
147     }
148 };
149 
150 #if TBB_USE_EXCEPTIONS
151 
152 void Test0 () {
153     ResetGlobals();
154     tbb::simple_partitioner p;
155     for( size_t i=0; i<10; ++i ) {
156         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
157         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
158         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
159         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
160     }
161 } // void Test0 ()
162 
163 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
164 template<typename ParForBody>
165 class SimpleParReduceBody {
166     ParForBody m_Body;
167 public:
168     void operator=(const SimpleParReduceBody&) = delete;
169     SimpleParReduceBody(const SimpleParReduceBody&) = default;
170     SimpleParReduceBody() = default;
171 
172     void operator()( const range_type& r ) const { m_Body(r); }
173     SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
174     void join( SimpleParReduceBody& /*right*/ ) {}
175 }; // SimpleParReduceBody
176 
177 //! Test parallel_for and parallel_reduce for a given partitioner.
178 /** The Body need only be suitable for a parallel_for. */
179 template<typename ParForBody, typename Partitioner>
180 void TestParallelLoopAux() {
181     Partitioner partitioner;
182     for( int i=0; i<2; ++i ) {
183         ResetGlobals();
184         TRY();
185             if( i==0 )
186                 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
187             else {
188                 SimpleParReduceBody<ParForBody> rb;
189                 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
190             }
191         CATCH_AND_ASSERT();
192         // two cases: g_SolitaryException and !g_SolitaryException
193         //   1) g_SolitaryException: only one thread actually threw.  There is only one context, so the exception
194         //      (when caught) will cause that context to be cancelled.  After this event, there may be one or
195         //      more threads which are "in-flight", up to g_NumThreads, but no more will be started.  The threads,
196         //      when they start, if they see they are cancelled, TGCCancelled is incremented.
197         //   2) !g_SolitaryException: more than one thread can throw.  The number of threads that actually
198         //      threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow.
199         //      Only one context, so TGCCancelled should be <= g_NumThreads.
200         //
201         // the reasoning is similar for nested algorithms in a single context (Test2).
202         //
203         // If a thread throws in a context, more than one subsequent task body may see the
204         // cancelled state (if they are scheduled before the state is propagated.) this is
205         // infrequent, but it occurs.  So what was to be an assertion must be a remark.
206         g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown");
207         REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
208         if ( g_SolitaryException ) {
209             REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
210             REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
211                 "Not all throws were caught");
212             REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
213         }
214         else {
215             REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
216         }
217     }
218 }  // TestParallelLoopAux
219 
220 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
221 /** The Body only needs to be suitable for tbb::parallel_for. */
222 template<typename Body>
223 void TestParallelLoop() {
224     // The simple and auto partitioners should be const, but not the affinity partitioner.
225     TestParallelLoopAux<Body, const tbb::simple_partitioner  >();
226     TestParallelLoopAux<Body, const tbb::auto_partitioner    >();
227 #define __TBB_TEMPORARILY_DISABLED 1
228 #if !__TBB_TEMPORARILY_DISABLED
229     // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
230     TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
231 #endif
232 #undef __TBB_TEMPORARILY_DISABLED
233 }
234 
235 class SimpleParForBody {
236 public:
237     void operator=(const SimpleParForBody&) = delete;
238     SimpleParForBody(const SimpleParForBody&) = default;
239     SimpleParForBody() = default;
240 
241     void operator()( const range_type& r ) const {
242         utils::ConcurrencyTracker ct;
243         ++g_CurExecuted;
244         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
245         else g_NonMasterExecuted = true;
246         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
247         utils::doDummyWork(r.size());
248         WaitUntilConcurrencyPeaks();
249         ThrowTestException(1);
250     }
251 };
252 
253 void Test1() {
254     // non-nested parallel_for/reduce with throwing body, one context
255     TestParallelLoop<SimpleParForBody>();
256 } // void Test1 ()
257 
258 class OuterParForBody {
259 public:
260     void operator=(const OuterParForBody&) = delete;
261     OuterParForBody(const OuterParForBody&) = default;
262     OuterParForBody() = default;
263     void operator()( const range_type& ) const {
264         utils::ConcurrencyTracker ct;
265         ++g_OuterParCalls;
266         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
267     }
268 };
269 
270 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
271 /** Inner algorithms are spawned inside the new bound context by default. Since
272     exceptions thrown from the inner parallel_for are not handled by the caller
273     (outer parallel_for body) in this test, they will cancel all the sibling inner
274     algorithms. **/
275 void Test2 () {
276     TestParallelLoop<OuterParForBody>();
277 } // void Test2 ()
278 
279 class OuterParForBodyWithIsolatedCtx {
280 public:
281     void operator()( const range_type& ) const {
282         tbb::task_group_context ctx(tbb::task_group_context::isolated);
283         ++g_OuterParCalls;
284         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
285     }
286 };
287 
288 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
289 /** Even though exceptions thrown from the inner parallel_for are not handled
290     by the caller in this test, they will not affect sibling inner algorithms
291     already running because of the isolated contexts. However because the first
292     exception cancels the root parallel_for only the first g_NumThreads subranges
293     will be processed (which launch inner parallel_fors) **/
294 void Test3 () {
295     ResetGlobals();
296     typedef OuterParForBodyWithIsolatedCtx body_type;
297     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
298             // we expect one thread to throw without counting, the rest to run to completion
299             // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the
300             // case; the SimpleParFor subranges are started up as part of the outer ones, and when
301             // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
302             // so we have to count the number of outer Pfors actually started.
303             minExecuted = (g_NumThreads - 1) * innerCalls;
304     TRY();
305         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
306     CATCH_AND_ASSERT();
307     minExecuted = (g_OuterParCalls - 1) * innerCalls;  // see above
308 
309     // The first formula above assumes all ranges of the outer parallel for are executed, and one
310     // cancels.  In the event, we have a smaller number of ranges that start before the exception
311     // is caught.
312     //
313     //  g_SolitaryException:One inner range throws.  Outer parallel_For is cancelled, but sibling
314     //                      parallel_fors continue to completion (unless the threads that execute
315     //                      are not allowed to throw, in which case we will not see any exceptions).
316     // !g_SolitaryException:multiple inner ranges may throw.  Any which throws will stop, and the
317     //                      corresponding range of the outer pfor will stop also.
318     //
319     // In either case, once the outer pfor gets the exception it will stop executing further ranges.
320 
321     // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
322     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
323     if ( g_SolitaryException ) {
324         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
325         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
326         REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception");
327         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
328     }
329     else {
330         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
331         REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
332     }
333 } // void Test3 ()
334 
335 class OuterParForExceptionSafeBody {
336 public:
337     void operator()( const range_type& ) const {
338         tbb::task_group_context ctx(tbb::task_group_context::isolated);
339         ++g_OuterParCalls;
340         TRY();
341             tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
342         CATCH();  // this macro sets g_ExceptionCaught
343     }
344 };
345 
346 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
347 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
348     in this test, they do not affect neither other tasks of the the root parallel_for
349     nor sibling inner algorithms. **/
350 void Test4 () {
351     ResetGlobals( true, true );
352     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
353               outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
354     TRY();
355         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
356     CATCH();
357     // g_SolitaryException  : one inner pfor will throw, the rest will execute to completion.
358     //                        so the count should be (outerCalls -1) * innerCalls, if a throw happened.
359     // !g_SolitaryException : possible multiple inner pfor throws.  Should be approximately
360     //                        (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
361     intptr_t  minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
362     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
363     if ( g_SolitaryException ) {
364         // only one task had exception thrown. That task had at least one execution (the one that threw).
365         // There may be an arbitrary number of ranges executed after the throw but before the exception
366         // is caught in the scheduler and cancellation is signaled.  (seen 9, 11 and 62 (!) for 8 threads)
367         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered");
368         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
369         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
370         // a small number of threads can execute in a throwing sub-pfor, if the task which is
371         // to do the solitary throw swaps out after registering its intent to throw but before it
372         // actually does so. As a result, the number of extra tasks cannot exceed the number of thread
373         // for each nested pfor invication)
374         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_NumThreads-1)*g_NumThreads/2, "Too many tasks survived exception");
375     }
376     else {
377         REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions");
378         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
379         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions");
380         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
381     }
382 } // void Test4 ()
383 
384 //! Testing parallel_for and parallel_reduce exception handling
385 //! \brief \ref error_guessing
386 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") {
387     for (auto concurrency_level: utils::concurrency_range()) {
388         g_NumThreads = static_cast<int>(concurrency_level);
389         g_Master = std::this_thread::get_id();
390         if (g_NumThreads > 1) {
391             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
392             // Execute in all the possible modes
393             for ( size_t j = 0; j < 4; ++j ) {
394                 g_ExceptionInMaster = (j & 1) != 0;
395                 g_SolitaryException = (j & 2) != 0;
396 
397                 Test0();
398             }
399         }
400     }
401 }
402 
403 //! Testing parallel_for and parallel_reduce exception handling
404 //! \brief \ref error_guessing
405 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") {
406     for (auto concurrency_level: utils::concurrency_range()) {
407         g_NumThreads = static_cast<int>(concurrency_level);
408         g_Master = std::this_thread::get_id();
409         if (g_NumThreads > 1) {
410             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
411             // Execute in all the possible modes
412             for ( size_t j = 0; j < 4; ++j ) {
413                 g_ExceptionInMaster = (j & 1) != 0;
414                 g_SolitaryException = (j & 2) != 0;
415 
416                 Test1();
417             }
418         }
419     }
420 }
421 
422 //! Testing parallel_for and parallel_reduce exception handling
423 //! \brief \ref error_guessing
424 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") {
425     for (auto concurrency_level: utils::concurrency_range()) {
426         g_NumThreads = static_cast<int>(concurrency_level);
427         g_Master = std::this_thread::get_id();
428         if (g_NumThreads > 1) {
429             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
430             // Execute in all the possible modes
431             for ( size_t j = 0; j < 4; ++j ) {
432                 g_ExceptionInMaster = (j & 1) != 0;
433                 g_SolitaryException = (j & 2) != 0;
434 
435                 Test2();
436             }
437         }
438     }
439 }
440 
441 //! Testing parallel_for and parallel_reduce exception handling
442 //! \brief \ref error_guessing
443 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") {
444     for (auto concurrency_level: utils::concurrency_range()) {
445         g_NumThreads = static_cast<int>(concurrency_level);
446         g_Master = std::this_thread::get_id();
447         if (g_NumThreads > 1) {
448             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
449             // Execute in all the possible modes
450             for ( size_t j = 0; j < 4; ++j ) {
451                 g_ExceptionInMaster = (j & 1) != 0;
452                 g_SolitaryException = (j & 2) != 0;
453 
454                 Test3();
455             }
456         }
457     }
458 }
459 
460 //! Testing parallel_for and parallel_reduce exception handling
461 //! \brief \ref error_guessing
462 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") {
463     for (auto concurrency_level: utils::concurrency_range()) {
464         g_NumThreads = static_cast<int>(concurrency_level);
465         g_Master = std::this_thread::get_id();
466         if (g_NumThreads > 1) {
467             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
468             // Execute in all the possible modes
469             for ( size_t j = 0; j < 4; ++j ) {
470                 g_ExceptionInMaster = (j & 1) != 0;
471                 g_SolitaryException = (j & 2) != 0;
472 
473                 Test4();
474             }
475         }
476     }
477 }
478 
479 #endif /* TBB_USE_EXCEPTIONS */
480 
481 class ParForBodyToCancel {
482 public:
483     void operator()( const range_type& ) const {
484         ++g_CurExecuted;
485         Cancellator::WaitUntilReady();
486     }
487 };
488 
489 template<class B>
490 class ParForLauncher {
491     tbb::task_group_context &my_ctx;
492 public:
493     void operator()() const {
494         tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
495     }
496     ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
497 };
498 
499 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
500 void TestCancelation1 () {
501     ResetGlobals( false );
502     RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
503 }
504 
505 class Cancellator2  {
506     tbb::task_group_context &m_GroupToCancel;
507 
508 public:
509     void operator()() const {
510         utils::ConcurrencyTracker ct;
511         WaitUntilConcurrencyPeaks();
512         m_GroupToCancel.cancel_group_execution();
513         g_ExecutedAtLastCatch = g_CurExecuted.load();
514     }
515 
516     Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
517 };
518 
519 class ParForBodyToCancel2 {
520 public:
521     void operator()( const range_type& ) const {
522         ++g_CurExecuted;
523         utils::ConcurrencyTracker ct;
524         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
525         while( !tbb::is_current_task_group_canceling() )
526             utils::yield();
527     }
528 };
529 
530 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
531 /** This version also tests tbb::is_current_task_group_canceling() method. **/
532 void TestCancelation2 () {
533     ResetGlobals();
534     RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>();
535     REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
536     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
537     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
538 }
539 
540 ////////////////////////////////////////////////////////////////////////////////
541 // Regression test based on the contribution by the author of the following forum post:
542 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
543 
544 class Worker {
545     static const int max_nesting = 3;
546     static const int reduce_range = 1024;
547     static const int reduce_grain = 256;
548 public:
549     int DoWork (int level);
550     int Validate (int start_level) {
551         int expected = 1; // identity for multiplication
552         for(int i=start_level+1; i<max_nesting; ++i)
553              expected *= reduce_range;
554         return expected;
555     }
556 };
557 
558 class RecursiveParReduceBodyWithSharedWorker {
559     Worker * m_SharedWorker;
560     int m_NestingLevel;
561     int m_Result;
562 public:
563     RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
564         : m_SharedWorker(src.m_SharedWorker)
565         , m_NestingLevel(src.m_NestingLevel)
566         , m_Result(0)
567     {}
568     RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
569         : m_SharedWorker(w)
570         , m_NestingLevel(outer)
571         , m_Result(0)
572     {}
573 
574     void operator() ( const tbb::blocked_range<size_t>& r ) {
575         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
576         else g_NonMasterExecuted = true;
577         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
578         for (size_t i = r.begin (); i != r.end (); ++i) {
579             m_Result += m_SharedWorker->DoWork (m_NestingLevel);
580         }
581     }
582     void join (const RecursiveParReduceBodyWithSharedWorker & x) {
583         m_Result += x.m_Result;
584     }
585     int result () { return m_Result; }
586 };
587 
588 int Worker::DoWork ( int level ) {
589     ++level;
590     if ( level < max_nesting ) {
591         RecursiveParReduceBodyWithSharedWorker rt (this, level);
592         tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
593         return rt.result();
594     }
595     else
596         return 1;
597 }
598 
599 //! Regression test for hanging that occurred with the first version of cancellation propagation
600 void TestCancelation3 () {
601     Worker w;
602     int result   = w.DoWork (0);
603     int expected = w.Validate(0);
604     REQUIRE_MESSAGE ( result == expected, "Wrong calculation result");
605 }
606 
607 struct StatsCounters {
608     std::atomic<size_t> my_total_created;
609     std::atomic<size_t> my_total_deleted;
610     StatsCounters() {
611         my_total_created = 0;
612         my_total_deleted = 0;
613     }
614 };
615 
616 class ParReduceBody {
617     StatsCounters* my_stats;
618     size_t my_id;
619     bool my_exception;
620     tbb::task_group_context& tgc;
621 
622 public:
623     ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) :
624         my_stats(&s_), my_exception(e_), tgc(context) {
625         my_id = my_stats->my_total_created++;
626     }
627 
628     ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) {
629         my_stats = lhs.my_stats;
630         my_id = my_stats->my_total_created++;
631     }
632 
633     ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) {
634         my_stats = lhs.my_stats;
635         my_id = my_stats->my_total_created++;
636     }
637 
638     ~ParReduceBody(){ ++my_stats->my_total_deleted; }
639 
640     void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
641         //Do nothing, except for one task (chosen arbitrarily)
642         if( my_id >= 12 ) {
643             if( my_exception )
644                 ThrowTestException(1);
645             else
646                 tgc.cancel_group_execution();
647         }
648     }
649 
650     void join( ParReduceBody& /*rhs*/ ) {}
651 };
652 
653 void TestCancelation4() {
654     StatsCounters statsObj;
655 #if TBB_USE_EXCEPTIONS
656     try
657 #endif
658     {
659         tbb::task_group_context tgc1, tgc2;
660         ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true);
661         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
662         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
663     }
664 #if TBB_USE_EXCEPTIONS
665     catch(...) {}
666 #endif
667     REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
668 }
669 
670 //! Testing parallel_for and parallel_reduce cancellation
671 //! \brief \ref error_guessing
672 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") {
673     for (auto concurrency_level: utils::concurrency_range()) {
674         g_NumThreads = static_cast<int>(concurrency_level);
675         g_Master = std::this_thread::get_id();
676         if (g_NumThreads > 1) {
677             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
678             // Execute in all the possible modes
679             for ( size_t j = 0; j < 4; ++j ) {
680                 g_ExceptionInMaster = (j & 1) != 0;
681                 g_SolitaryException = (j & 2) != 0;
682 
683                 TestCancelation1();
684             }
685         }
686     }
687 }
688 
689 //! Testing parallel_for and parallel_reduce cancellation
690 //! \brief \ref error_guessing
691 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") {
692     for (auto concurrency_level: utils::concurrency_range()) {
693         g_NumThreads = static_cast<int>(concurrency_level);
694         g_Master = std::this_thread::get_id();
695         if (g_NumThreads > 1) {
696             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
697             // Execute in all the possible modes
698             for ( size_t j = 0; j < 4; ++j ) {
699                 g_ExceptionInMaster = (j & 1) != 0;
700                 g_SolitaryException = (j & 2) != 0;
701 
702                 TestCancelation2();
703             }
704         }
705     }
706 }
707 
708 //! Testing parallel_for and parallel_reduce cancellation
709 //! \brief \ref error_guessing
710 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") {
711     for (auto concurrency_level: utils::concurrency_range()) {
712         g_NumThreads = static_cast<int>(concurrency_level);
713         g_Master = std::this_thread::get_id();
714         if (g_NumThreads > 1) {
715             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
716             // Execute in all the possible modes
717             for ( size_t j = 0; j < 4; ++j ) {
718                 g_ExceptionInMaster = (j & 1) != 0;
719                 g_SolitaryException = (j & 2) != 0;
720 
721                 TestCancelation3();
722             }
723         }
724     }
725 }
726 
727 //! Testing parallel_for and parallel_reduce cancellation
728 //! \brief \ref error_guessing
729 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
730     for (auto concurrency_level: utils::concurrency_range()) {
731         g_NumThreads = static_cast<int>(concurrency_level);
732         g_Master = std::this_thread::get_id();
733         if (g_NumThreads > 1) {
734             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
735             // Execute in all the possible modes
736             for ( size_t j = 0; j < 4; ++j ) {
737                 g_ExceptionInMaster = (j & 1) != 0;
738                 g_SolitaryException = (j & 2) != 0;
739 
740                 TestCancelation4();
741             }
742         }
743     }
744 }
745 
746 ////////////////////////////////////////////////////////////////////////////////
747 // Tests for tbb::parallel_for_each
748 ////////////////////////////////////////////////////////////////////////////////
749 
750 std::size_t get_iter_range_size() {
751     // Set the minimal iteration sequence size to 50 to improve test complexity on small machines
752     return std::max(50, g_NumThreads * 2);
753 }
754 
755 template<typename Iterator>
756 struct adaptive_range {
757     std::vector<std::size_t> my_array;
758 
759     adaptive_range(std::size_t size) : my_array(size + 1) {}
760     using iterator = Iterator;
761 
762     iterator begin() const {
763         return iterator{&my_array.front()};
764     }
765     iterator begin() {
766         return iterator{&my_array.front()};
767     }
768     iterator end() const {
769         return iterator{&my_array.back()};
770     }
771     iterator end() {
772         return iterator{&my_array.back()};
773     }
774 };
775 
776 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
777     if (g_FedTasksCount < 50) {
778         ++g_FedTasksCount;
779         feeder.add(val);
780     }
781 }
782 
783 #define RunWithSimpleBody(func, body)       \
784     func<utils::ForwardIterator<size_t>, body>();         \
785     func<utils::ForwardIterator<size_t>, body##WithFeeder>()
786 
787 #define RunWithTemplatedBody(func, body)     \
788     func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >();         \
789     func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()
790 
791 #if TBB_USE_EXCEPTIONS
792 
793 // Simple functor object with exception
794 class SimpleParForEachBody {
795 public:
796     void operator() ( size_t &value ) const {
797         ++g_CurExecuted;
798         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
799         else g_NonMasterExecuted = true;
800         if( tbb::is_current_task_group_canceling() ) {
801             g_TGCCancelled.increment();
802         }
803         utils::ConcurrencyTracker ct;
804         value += 1000;
805         WaitUntilConcurrencyPeaks();
806         ThrowTestException(1);
807     }
808 };
809 
810 // Simple functor object with exception and feeder
811 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
812 public:
813     void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
814         Feed(feeder, 0);
815         SimpleParForEachBody::operator()(value);
816     }
817 };
818 
819 // Tests exceptions without nesting
820 template <class Iterator, class simple_body>
821 void Test1_parallel_for_each () {
822     ResetGlobals();
823     auto range = adaptive_range<Iterator>(get_iter_range_size());
824     TRY();
825         tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() );
826     CATCH_AND_ASSERT();
827     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
828     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
829     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
830     if ( !g_SolitaryException )
831         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
832 
833 } // void Test1_parallel_for_each ()
834 
835 template <class Iterator>
836 class OuterParForEachBody {
837 public:
838     void operator()( size_t& /*value*/ ) const {
839         ++g_OuterParCalls;
840         auto range = adaptive_range<Iterator>(get_iter_range_size());
841         tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody());
842     }
843 };
844 
845 template <class Iterator>
846 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
847 public:
848     void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
849         Feed(feeder, 0);
850         OuterParForEachBody<Iterator>::operator()(value);
851     }
852 };
853 
854 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
855 /** Inner algorithms are spawned inside the new bound context by default. Since
856     exceptions thrown from the inner parallel_for_each are not handled by the caller
857     (outer parallel_for_each body) in this test, they will cancel all the sibling inner
858     algorithms. **/
859 template <class Iterator, class outer_body>
860 void Test2_parallel_for_each () {
861     ResetGlobals();
862     auto range = adaptive_range<Iterator>(get_iter_range_size());
863     TRY();
864         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() );
865     CATCH_AND_ASSERT();
866     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
867     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
868     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
869     if ( !g_SolitaryException )
870         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
871 } // void Test2_parallel_for_each ()
872 
873 template <class Iterator>
874 class OuterParForEachBodyWithIsolatedCtx {
875 public:
876     void operator()( size_t& /*value*/ ) const {
877         tbb::task_group_context ctx(tbb::task_group_context::isolated);
878         ++g_OuterParCalls;
879         auto range = adaptive_range<Iterator>(get_iter_range_size());
880         tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
881     }
882 };
883 
884 template <class Iterator>
885 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
886 public:
887     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
888         Feed(feeder, 0);
889         OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
890     }
891 };
892 
893 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
894 /** Even though exceptions thrown from the inner parallel_for_each are not handled
895     by the caller in this test, they will not affect sibling inner algorithms
896     already running because of the isolated contexts. However because the first
897     exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
898     will be processed (which launch inner parallel_for_eachs) **/
899 template <class Iterator, class outer_body>
900 void Test3_parallel_for_each () {
901     ResetGlobals();
902     auto range = adaptive_range<Iterator>(get_iter_range_size());
903     intptr_t innerCalls = get_iter_range_size(),
904              // The assumption here is the same as in outer parallel fors.
905              minExecuted = (g_NumThreads - 1) * innerCalls;
906     g_Master = std::this_thread::get_id();
907     TRY();
908         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body());
909     CATCH_AND_ASSERT();
910     // figure actual number of expected executions given the number of outer PDos started.
911     minExecuted = (g_OuterParCalls - 1) * innerCalls;
912     // one extra thread may run a task that sees cancellation.  Infrequent but possible
913     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
914     if ( g_SolitaryException ) {
915         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
916         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
917     }
918     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
919     if ( !g_SolitaryException )
920         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
921 } // void Test3_parallel_for_each ()
922 
923 template <class Iterator>
924 class OuterParForEachWithEhBody {
925 public:
926     void operator()( size_t& /*value*/ ) const {
927         tbb::task_group_context ctx(tbb::task_group_context::isolated);
928         ++g_OuterParCalls;
929         auto range = adaptive_range<Iterator>(get_iter_range_size());
930         TRY();
931             tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
932         CATCH();
933     }
934 };
935 
936 template <class Iterator>
937 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
938 public:
939     void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
940     OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
941     OuterParForEachWithEhBodyWithFeeder() = default;
942     void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
943         Feed(feeder, 0);
944         OuterParForEachWithEhBody<Iterator>::operator()(value);
945     }
946 };
947 
948 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block.
949 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
950     in this test, they do not affect neither other tasks of the the root parallel_for
951     nor sibling inner algorithms. **/
952 template <class Iterator, class outer_body_with_eh>
953 void Test4_parallel_for_each () {
954     ResetGlobals( true, true );
955     auto range = adaptive_range<Iterator>(get_iter_range_size());
956     g_Master = std::this_thread::get_id();
957     TRY();
958         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh());
959     CATCH();
960     REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
961     intptr_t innerCalls = get_iter_range_size(),
962              outerCalls = get_iter_range_size() + g_FedTasksCount,
963              maxExecuted = outerCalls * innerCalls,
964              minExecuted = 0;
965     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
966     if ( g_SolitaryException ) {
967         minExecuted = maxExecuted - innerCalls;
968         REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
969         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
970         // This test has the same property as Test4 (parallel_for); the exception can be
971         // thrown, but some number of tasks from the outer Pdo can execute after the throw but
972         // before the cancellation is signaled (have seen 36).
973         DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
974     }
975     else {
976         minExecuted = g_NumExceptionsCaught;
977         REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
978         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
979         REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
980         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
981     }
982 } // void Test4_parallel_for_each ()
983 
984 // This body throws an exception only if the task was added by feeder
985 class ParForEachBodyWithThrowingFeederTasks {
986 public:
987     //! This form of the function call operator can be used when the body needs to add more work during the processing
988     void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const {
989         ++g_CurExecuted;
990         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
991         else g_NonMasterExecuted = true;
992         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
993         Feed(feeder, 1);
994         if (value == 1)
995             ThrowTestException(1);
996     }
997 }; // class ParForEachBodyWithThrowingFeederTasks
998 
999 // Test exception in task, which was added by feeder.
1000 template <class Iterator>
1001 void Test5_parallel_for_each () {
1002     ResetGlobals();
1003     auto range = adaptive_range<Iterator>(get_iter_range_size());
1004     g_Master = std::this_thread::get_id();
1005     TRY();
1006         tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks());
1007     CATCH();
1008     if (g_SolitaryException) {
1009         // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
1010         // are handled by the external thread.  In this case no throw occurs.
1011         REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel               // we saw an exception
1012                 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)  // non-external trhead throws but none tried
1013                 || (g_ExceptionInMaster && !g_MasterExecutedThrow))     // external thread throws but external thread didn't try
1014                 , "At least one exception should occur");
1015     }
1016 } // void Test5_parallel_for_each ()
1017 
1018 //! Testing parallel_for_each exception handling
1019 //! \brief \ref error_guessing
1020 TEST_CASE("parallel_for_each exception handling test #1") {
1021     for (auto concurrency_level: utils::concurrency_range()) {
1022         g_NumThreads = static_cast<int>(concurrency_level);
1023         g_Master = std::this_thread::get_id();
1024         if (g_NumThreads > 1) {
1025             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1026             // Execute in all the possible modes
1027             for ( size_t j = 0; j < 4; ++j ) {
1028                 g_ExceptionInMaster = (j & 1) != 0;
1029                 g_SolitaryException = (j & 2) != 0;
1030 
1031                 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody);
1032             }
1033         }
1034     }
1035 }
1036 
1037 //! Testing parallel_for_each exception handling
1038 //! \brief \ref error_guessing
1039 TEST_CASE("parallel_for_each exception handling test #2") {
1040     for (auto concurrency_level: utils::concurrency_range()) {
1041         g_NumThreads = static_cast<int>(concurrency_level);
1042         g_Master = std::this_thread::get_id();
1043         if (g_NumThreads > 1) {
1044             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1045             // Execute in all the possible modes
1046             for ( size_t j = 0; j < 4; ++j ) {
1047                 g_ExceptionInMaster = (j & 1) != 0;
1048                 g_SolitaryException = (j & 2) != 0;
1049 
1050                 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody);
1051             }
1052         }
1053     }
1054 }
1055 
1056 //! Testing parallel_for_each exception handling
1057 //! \brief \ref error_guessing
1058 TEST_CASE("parallel_for_each exception handling test #3") {
1059     for (auto concurrency_level: utils::concurrency_range()) {
1060         g_NumThreads = static_cast<int>(concurrency_level);
1061         g_Master = std::this_thread::get_id();
1062         if (g_NumThreads > 1) {
1063             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1064             // Execute in all the possible modes
1065             for ( size_t j = 0; j < 4; ++j ) {
1066                 g_ExceptionInMaster = (j & 1) != 0;
1067                 g_SolitaryException = (j & 2) != 0;
1068 
1069                 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx);
1070             }
1071         }
1072     }
1073 }
1074 
1075 //! Testing parallel_for_each exception handling
1076 //! \brief \ref error_guessing
1077 TEST_CASE("parallel_for_each exception handling test #4") {
1078     for (auto concurrency_level: utils::concurrency_range()) {
1079         g_NumThreads = static_cast<int>(concurrency_level);
1080         g_Master = std::this_thread::get_id();
1081         if (g_NumThreads > 1) {
1082             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1083             // Execute in all the possible modes
1084             for ( size_t j = 0; j < 4; ++j ) {
1085                 g_ExceptionInMaster = (j & 1) != 0;
1086                 g_SolitaryException = (j & 2) != 0;
1087 
1088                 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody);
1089             }
1090         }
1091     }
1092 }
1093 
1094 //! Testing parallel_for_each exception handling
1095 //! \brief \ref error_guessing
1096 TEST_CASE("parallel_for_each exception handling test #5") {
1097     for (auto concurrency_level: utils::concurrency_range()) {
1098         g_NumThreads = static_cast<int>(concurrency_level);
1099         g_Master = std::this_thread::get_id();
1100         if (g_NumThreads > 1) {
1101             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1102             // Execute in all the possible modes
1103             for ( size_t j = 0; j < 4; ++j ) {
1104                 g_ExceptionInMaster = (j & 1) != 0;
1105                 g_SolitaryException = (j & 2) != 0;
1106 
1107                 Test5_parallel_for_each<utils::InputIterator<size_t> >();
1108                 Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
1109                 Test5_parallel_for_each<utils::RandomIterator<size_t> >();
1110             }
1111         }
1112     }
1113 }
1114 
1115 #endif /* TBB_USE_EXCEPTIONS */
1116 
1117 class ParForEachBodyToCancel {
1118 public:
1119     void operator()( size_t& /*value*/ ) const {
1120         ++g_CurExecuted;
1121         Cancellator::WaitUntilReady();
1122     }
1123 };
1124 
1125 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
1126 public:
1127     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1128         Feed(feeder, 0);
1129         ParForEachBodyToCancel::operator()(value);
1130     }
1131 };
1132 
1133 template<class B, class Iterator>
1134 class ParForEachWorker {
1135     tbb::task_group_context &my_ctx;
1136 public:
1137     void operator()() const {
1138         auto range = adaptive_range<Iterator>(get_iter_range_size());
1139         tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx );
1140     }
1141 
1142     ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1143 };
1144 
1145 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1146 template <class Iterator, class body_to_cancel>
1147 void TestCancelation1_parallel_for_each () {
1148     ResetGlobals( false );
1149     // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
1150     intptr_t threshold = get_iter_range_size() / 4;
1151     REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
1152     tbb::task_group tg;
1153     tbb::task_group_context  ctx;
1154     Cancellator cancellator(ctx, threshold);
1155     ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
1156     tg.run( cancellator );
1157     utils::yield();
1158     tg.run( worker );
1159     TRY();
1160         tg.wait();
1161     CATCH_AND_FAIL();
1162     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1163 }
1164 
1165 class ParForEachBodyToCancel2 {
1166 public:
1167     void operator()( size_t& /*value*/ ) const {
1168         ++g_CurExecuted;
1169         utils::ConcurrencyTracker ct;
1170         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1171         while( !tbb::is_current_task_group_canceling() )
1172             utils::yield();
1173     }
1174 };
1175 
1176 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
1177 public:
1178     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1179         Feed(feeder, 0);
1180         ParForEachBodyToCancel2::operator()(value);
1181     }
1182 };
1183 
1184 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1185 /** This version also tests tbb::is_current_task_group_canceling() method. **/
1186 template <class Iterator, class body_to_cancel>
1187 void TestCancelation2_parallel_for_each () {
1188     ResetGlobals();
1189     RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
1190 }
1191 
1192 //! Testing parallel_for_each cancellation test
1193 //! \brief \ref error_guessing
1194 TEST_CASE("parallel_for_each cancellation test #1") {
1195     for (auto concurrency_level: utils::concurrency_range()) {
1196         g_NumThreads = static_cast<int>(concurrency_level);
1197         g_Master = std::this_thread::get_id();
1198         if (g_NumThreads > 1) {
1199             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1200             // Execute in all the possible modes
1201             for ( size_t j = 0; j < 4; ++j ) {
1202                 g_ExceptionInMaster = (j & 1) != 0;
1203                 g_SolitaryException = (j & 2) != 0;
1204                 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
1205             }
1206         }
1207     }
1208 }
1209 
1210 //! Testing parallel_for_each cancellation test
1211 //! \brief \ref error_guessing
1212 TEST_CASE("parallel_for_each cancellation test #2") {
1213     for (auto concurrency_level: utils::concurrency_range()) {
1214         g_NumThreads = static_cast<int>(concurrency_level);
1215         g_Master = std::this_thread::get_id();
1216         if (g_NumThreads > 1) {
1217             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1218             // Execute in all the possible modes
1219             for ( size_t j = 0; j < 4; ++j ) {
1220                 g_ExceptionInMaster = (j & 1) != 0;
1221                 g_SolitaryException = (j & 2) != 0;
1222 
1223                 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
1224             }
1225         }
1226     }
1227 }
1228 
1229 ////////////////////////////////////////////////////////////////////////////////
1230 // Tests for tbb::parallel_pipeline
1231 ////////////////////////////////////////////////////////////////////////////////
1232 int g_NumTokens = 0;
1233 
1234 // Simple input filter class, it assigns 1 to all array members
1235 // It stops when it receives item equal to -1
1236 class InputFilter {
1237     mutable std::atomic<size_t> m_Item{};
1238     mutable std::vector<size_t> m_Buffer;
1239 public:
1240     InputFilter() : m_Buffer(get_iter_range_size()) {
1241         m_Item = 0;
1242         for (size_t i = 0; i < get_iter_range_size(); ++i )
1243             m_Buffer[i] = 1;
1244     }
1245     InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()), m_Buffer(get_iter_range_size()) {
1246         for (size_t i = 0; i < get_iter_range_size(); ++i )
1247             m_Buffer[i] = other.m_Buffer[i];
1248     }
1249 
1250     void* operator()(tbb::flow_control& control) const {
1251         size_t item = m_Item++;
1252         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1253         else g_NonMasterExecuted = true;
1254         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1255         if(item == 1) {
1256             ++g_PipelinesStarted;   // count on emitting the first item.
1257         }
1258         if ( item >= get_iter_range_size() ) {
1259             control.stop();
1260             return nullptr;
1261         }
1262         m_Buffer[item] = 1;
1263         return &m_Buffer[item];
1264     }
1265 
1266     size_t* buffer() { return m_Buffer.data(); }
1267 }; // class InputFilter
1268 
1269 #if TBB_USE_EXCEPTIONS
1270 
1271 // Simple filter with exception throwing.  If parallel, will wait until
1272 // as many parallel filters start as there are threads.
1273 class SimpleFilter {
1274     bool m_canThrow;
1275     bool m_serial;
1276 public:
1277     SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
1278     void* operator()(void* item) const {
1279         ++g_CurExecuted;
1280         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1281         else g_NonMasterExecuted = true;
1282         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1283         if ( m_canThrow ) {
1284             if ( !m_serial ) {
1285                 utils::ConcurrencyTracker ct;
1286                 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
1287             }
1288             ThrowTestException(1);
1289         }
1290         return item;
1291     }
1292 }; // class SimpleFilter
1293 
1294 // This enumeration represents filters order in pipeline
1295 struct FilterSet {
1296     tbb::filter_mode mode1, mode2;
1297     bool throw1, throw2;
1298 
1299     FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
1300         : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
1301     {}
1302 }; // struct FilterSet
1303 
1304 FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );
1305 
1306 template<typename InFilter, typename Filter>
1307 class CustomPipeline {
1308     InFilter inputFilter;
1309     Filter filter1;
1310     Filter filter2;
1311     FilterSet my_filters;
1312 public:
1313     CustomPipeline( const FilterSet& filters )
1314         : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
1315           filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
1316           my_filters(filters)
1317     {}
1318 
1319     void run () {
1320         tbb::parallel_pipeline(
1321             g_NumTokens,
1322             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1323             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1324             tbb::make_filter<void*, void>(my_filters.mode2, filter2)
1325         );
1326     }
1327     void run ( tbb::task_group_context& ctx ) {
1328         tbb::parallel_pipeline(
1329             g_NumTokens,
1330             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1331             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1332             tbb::make_filter<void*, void>(my_filters.mode2, filter2),
1333             ctx
1334         );
1335     }
1336 };
1337 
1338 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
1339 
1340 // Tests exceptions without nesting
1341 void Test1_pipeline ( const FilterSet& filters ) {
1342     ResetGlobals();
1343     SimplePipeline testPipeline(filters);
1344     TRY();
1345         testPipeline.run();
1346         if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) {
1347             // all the items were processed, though an exception was supposed to occur.
1348             if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
1349                 // if !g_ExceptionInMaster, the external thread is not allowed to throw.
1350                 // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw.
1351                 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
1352                     "Unusual count");
1353             }
1354             // In case of all serial filters they might be all executed in the thread(s)
1355             // where exceptions are not allowed by the common test logic. So we just quit.
1356             return;
1357         }
1358     CATCH_AND_ASSERT();
1359     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
1360     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1361     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
1362     if ( !g_SolitaryException )
1363         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1364 
1365 } // void Test1_pipeline ()
1366 
1367 // Filter with nesting
1368 class OuterFilter {
1369 public:
1370     OuterFilter ( bool, bool ) {}
1371 
1372     void* operator()(void* item) const {
1373         ++g_OuterParCalls;
1374         SimplePipeline testPipeline(serial_parallel);
1375         testPipeline.run();
1376         return item;
1377     }
1378 }; // class OuterFilter
1379 
1380 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
1381 /** Inner algorithms are spawned inside the new bound context by default. Since
1382     exceptions thrown from the inner pipeline are not handled by the caller
1383     (outer pipeline body) in this test, they will cancel all the sibling inner
1384     algorithms. **/
1385 void Test2_pipeline ( const FilterSet& filters ) {
1386     ResetGlobals();
1387     g_NestedPipelines = true;
1388     CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
1389     TRY();
1390         testPipeline.run();
1391     CATCH_AND_ASSERT();
1392     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
1393     REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1394     if ( !g_SolitaryException ) {
1395         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1396     }
1397 } // void Test2_pipeline ()
1398 
1399 //! creates isolated inner pipeline and runs it.
1400 class OuterFilterWithIsolatedCtx {
1401 public:
1402     OuterFilterWithIsolatedCtx( bool , bool ) {}
1403 
1404     void* operator()(void* item) const {
1405         ++g_OuterParCalls;
1406         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1407         // create inner pipeline with serial input, parallel output filter, second filter throws
1408         SimplePipeline testPipeline(serial_parallel);
1409         testPipeline.run(ctx);
1410         return item;
1411     }
1412 }; // class OuterFilterWithIsolatedCtx
1413 
1414 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
1415 /** Even though exceptions thrown from the inner pipeline are not handled
1416     by the caller in this test, they will not affect sibling inner algorithms
1417     already running because of the isolated contexts. However because the first
1418     exception cancels the root parallel_for_each only the first g_NumThreads subranges
1419     will be processed (which launch inner pipelines) **/
1420 void Test3_pipeline ( const FilterSet& filters ) {
1421     for( int nTries = 1; nTries <= 4; ++nTries) {
1422         ResetGlobals();
1423         g_NestedPipelines = true;
1424         g_Master = std::this_thread::get_id();
1425         intptr_t innerCalls = get_iter_range_size(),
1426                  minExecuted = (g_NumThreads - 1) * innerCalls;
1427         CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
1428         TRY();
1429             testPipeline.run();
1430         CATCH_AND_ASSERT();
1431 
1432         bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1433             (!g_ExceptionInMaster && !g_NonMasterExecuted);
1434         // only test assertions if the test threw an exception (or we don't care)
1435         bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
1436         if(testSucceeded) {
1437             if (g_SolitaryException) {
1438 
1439                 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
1440                 // Each time the input filter of a pipeline delivers its first item, it increments
1441                 // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
1442                 // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
1443                 // context will start; that is what we're testing.)
1444                 //
1445                 // There are two pipelines which will NOT run to completion when a solitary throw
1446                 // happens in an isolated inner context: the outer pipeline and the pipeline which
1447                 // throws.  All the other pipelines which start should run to completion.  But only
1448                 // inner body invocations are counted.
1449                 //
1450                 // So g_CurExecuted should be about
1451                 //
1452                 //   (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1
1453                 //   ^ executions for each completed pipeline
1454                 //                   ^ completing pipelines (remembering two will not complete)
1455                 //                                              ^ one for the inner throwing pipeline
1456 
1457                 minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1;
1458                 // each failing pipeline must execute at least two tasks
1459                 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
1460                 // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
1461                 // tasks not executing at throw were scheduled.
1462                 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
1463                 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
1464                 // if we're only throwing from the external thread, and that thread didn't
1465                 // participate in the pipelines, then no throw occurred.
1466             }
1467             REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1468             REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
1469             return;
1470         }
1471     }
1472 }
1473 
1474 class OuterFilterWithEhBody  {
1475 public:
1476     OuterFilterWithEhBody( bool, bool ){}
1477 
1478     void* operator()(void* item) const {
1479         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1480         ++g_OuterParCalls;
1481         SimplePipeline testPipeline(serial_parallel);
1482         TRY();
1483             testPipeline.run(ctx);
1484         CATCH();
1485         return item;
1486     }
1487 }; // class OuterFilterWithEhBody
1488 
1489 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
1490 /** Since exception(s) thrown from the inner pipeline are handled by the caller
1491     in this test, they do not affect other tasks of the the root pipeline
1492     nor sibling inner algorithms. **/
1493 void Test4_pipeline ( const FilterSet& filters ) {
1494 #if __GNUC__ && !__INTEL_COMPILER
1495     if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
1496         MESSAGE("Known issue: one of exception handling tests is skipped.");
1497         return;
1498     }
1499 #endif
1500     ResetGlobals( true, true );
1501     // each outer pipeline stage will start get_iter_range_size() inner pipelines.
1502     // each inner pipeline that doesn't throw will process get_iter_range_size() items.
1503     // for solitary exception there will be one pipeline that only processes one stage, one item.
1504     // innerCalls should be 2*get_iter_range_size()
1505     intptr_t innerCalls = 2*get_iter_range_size(),
1506              outerCalls = 2*get_iter_range_size(),
1507              maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
1508     CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
1509     TRY();
1510         testPipeline.run();
1511     CATCH_AND_ASSERT();
1512     intptr_t  minExecuted = 0;
1513     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1514         (!g_ExceptionInMaster && !g_NonMasterExecuted);
1515     if ( g_SolitaryException ) {
1516         minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
1517         REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
1518         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
1519     }
1520     else {
1521         // we assume throwing pipelines will not count
1522         minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
1523         REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
1524         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
1525         // too many already-scheduled tasks are started after the first exception is
1526         // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
1527         // So with multiple exceptions there are a variable number of tasks that have been
1528         // discarded because of the signals.
1529         // each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
1530         // updated with each throw, so the value will be the number of tasks executed at the last
1531         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
1532     }
1533 } // void Test4_pipeline ()
1534 
1535 //! Tests pipeline function passed with different combination of filters
1536 template<void testFunc(const FilterSet&)>
1537 void TestWithDifferentFiltersAndConcurrency() {
1538     for (auto concurrency_level: utils::concurrency_range()) {
1539         g_NumThreads = static_cast<int>(concurrency_level);
1540         g_Master = std::this_thread::get_id();
1541         if (g_NumThreads > 1) {
1542 
1543             const tbb::filter_mode modes[] = {
1544                 tbb::filter_mode::parallel,
1545                 tbb::filter_mode::serial_in_order,
1546                 tbb::filter_mode::serial_out_of_order
1547             };
1548 
1549             const int NumFilterTypes = sizeof(modes)/sizeof(modes[0]);
1550 
1551             // Execute in all the possible modes
1552             for ( size_t j = 0; j < 4; ++j ) {
1553                 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1554                 g_ExceptionInMaster = (j & 1) != 0;
1555                 g_SolitaryException = (j & 2) != 0;
1556                 g_NumTokens = 2 * g_NumThreads;
1557 
1558                 for ( int i = 0; i < NumFilterTypes; ++i ) {
1559                     for ( int n = 0; n < NumFilterTypes; ++n ) {
1560                         for ( int k = 0; k < 2; ++k )
1561                             testFunc( FilterSet(modes[i], modes[n], k == 0, k != 0) );
1562                     }
1563                 }
1564             }
1565         }
1566     }
1567 }
1568 
1569 //! Testing parallel_pipeline exception handling
1570 //! \brief \ref error_guessing
1571 TEST_CASE("parallel_pipeline exception handling test #1") {
1572     TestWithDifferentFiltersAndConcurrency<Test1_pipeline>();
1573 }
1574 
1575 //! Testing parallel_pipeline exception handling
1576 //! \brief \ref error_guessing
1577 TEST_CASE("parallel_pipeline exception handling test #2") {
1578     TestWithDifferentFiltersAndConcurrency<Test2_pipeline>();
1579 }
1580 
1581 //! Testing parallel_pipeline exception handling
1582 //! \brief \ref error_guessing
1583 TEST_CASE("parallel_pipeline exception handling test #3") {
1584     TestWithDifferentFiltersAndConcurrency<Test3_pipeline>();
1585 }
1586 
1587 //! Testing parallel_pipeline exception handling
1588 //! \brief \ref error_guessing
1589 TEST_CASE("parallel_pipeline exception handling test #4") {
1590     TestWithDifferentFiltersAndConcurrency<Test4_pipeline>();
1591 }
1592 
1593 #endif /* TBB_USE_EXCEPTIONS */
1594 
1595 class FilterToCancel  {
1596 public:
1597     FilterToCancel() {}
1598     void* operator()(void* item) const {
1599         ++g_CurExecuted;
1600         Cancellator::WaitUntilReady();
1601         return item;
1602     }
1603 }; // class FilterToCancel
1604 
1605 template <class Filter_to_cancel>
1606 class PipelineLauncher {
1607     tbb::task_group_context &my_ctx;
1608 public:
1609     PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1610 
1611     void operator()() const {
1612         // Run test when serial filter is the first non-input filter
1613         InputFilter inputFilter;
1614         Filter_to_cancel filterToCancel;
1615         tbb::parallel_pipeline(
1616             g_NumTokens,
1617             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1618             tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel),
1619             my_ctx
1620         );
1621     }
1622 };
1623 
1624 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1625 void TestCancelation1_pipeline () {
1626     ResetGlobals();
1627     g_ThrowException = false;
1628     // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
1629     intptr_t threshold = get_iter_range_size() / 4;
1630     REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
1631     RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold);
1632     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
1633     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1634 }
1635 
1636 class FilterToCancel2  {
1637 public:
1638     FilterToCancel2() {}
1639 
1640     void* operator()(void* item) const {
1641         ++g_CurExecuted;
1642         utils::ConcurrencyTracker ct;
1643         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1644         while( !tbb::is_current_task_group_canceling() )
1645             utils::yield();
1646         return item;
1647     }
1648 };
1649 
1650 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1651 /** This version also tests task::is_cancelled() method. **/
1652 void TestCancelation2_pipeline () {
1653     ResetGlobals();
1654     RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>();
1655     // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the
1656     // former, and g_CurExecuted is monotonic increasing.  so the comparison should be at least ==.
1657     // If another filter is started after cancel but before cancellation is propagated, then the
1658     // number will be larger.
1659     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
1660 }
1661 
1662 /** If min and max thread numbers specified on the command line are different,
1663     the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
1664     to be able to test the high and low contention modes while keeping the test reasonably fast **/
1665 
1666 //! Testing parallel_pipeline cancellation
1667 //! \brief \ref error_guessing
1668 TEST_CASE("parallel_pipeline cancellation test #1") {
1669     for (auto concurrency_level: utils::concurrency_range()) {
1670         g_NumThreads = static_cast<int>(concurrency_level);
1671         g_Master = std::this_thread::get_id();
1672         if (g_NumThreads > 1) {
1673             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1674             // Execute in all the possible modes
1675             for ( size_t j = 0; j < 4; ++j ) {
1676                 g_ExceptionInMaster = (j & 1) != 0;
1677                 g_SolitaryException = (j & 2) != 0;
1678                 g_NumTokens = 2 * g_NumThreads;
1679 
1680                 TestCancelation1_pipeline();
1681             }
1682         }
1683     }
1684 }
1685 
1686 //! Testing parallel_pipeline cancellation
1687 //! \brief \ref error_guessing
1688 TEST_CASE("parallel_pipeline cancellation test #2") {
1689     for (auto concurrency_level: utils::concurrency_range()) {
1690         g_NumThreads = static_cast<int>(concurrency_level);
1691         g_Master = std::this_thread::get_id();
1692         if (g_NumThreads > 1) {
1693             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1694             // Execute in all the possible modes
1695             for ( size_t j = 0; j < 4; ++j ) {
1696                 g_ExceptionInMaster = (j & 1) != 0;
1697                 g_SolitaryException = (j & 2) != 0;
1698                 g_NumTokens = 2 * g_NumThreads;
1699 
1700                 TestCancelation2_pipeline();
1701             }
1702         }
1703     }
1704 }
1705