xref: /oneTBB/test/tbb/test_eh_algorithms.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/concurrency_tracker.h"
19 #include "common/iterator.h"
20 #include "common/utils_concurrency_limit.h"
21 
22 #include <limits.h> // for INT_MAX
23 #include <thread>
24 
25 #include "tbb/parallel_for.h"
26 #include "tbb/parallel_reduce.h"
27 #include "tbb/parallel_for_each.h"
28 #include "tbb/parallel_pipeline.h"
29 #include "tbb/blocked_range.h"
30 #include "tbb/task_group.h"
31 #include "tbb/global_control.h"
32 #include "tbb/concurrent_unordered_map.h"
33 #include "tbb/task.h"
34 
35 //! \file test_eh_algorithms.cpp
36 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications
37 
38 #define FLAT_RANGE  100000
39 #define FLAT_GRAIN  100
40 #define OUTER_RANGE  100
41 #define OUTER_GRAIN  10
42 #define INNER_RANGE  (FLAT_RANGE / OUTER_RANGE)
43 #define INNER_GRAIN  (FLAT_GRAIN / OUTER_GRAIN)
44 
45 struct context_specific_counter {
46     tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{};
47 
48     void increment() {
49         tbb::task_group_context* ctx = tbb::task::current_context();
50         REQUIRE(ctx != nullptr);
51         context_map[ctx]++;
52     }
53 
54     void reset() {
55         context_map.clear();
56     }
57 
58     void validate(unsigned expected_count, const char* msg) {
59         for (auto it = context_map.begin(); it != context_map.end(); it++) {
60             REQUIRE_MESSAGE( it->second <= expected_count, msg);
61         }
62     }
63 };
64 
65 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder
66 std::atomic<intptr_t> g_OuterParCalls{};  // number of actual invocations of the outer construct executed.
67 context_specific_counter g_TGCCancelled{};  // Number of times a task sees its group cancelled at start
68 
69 #include "common/exception_handling.h"
70 
71 /********************************
72       Variables in test
73 
74 __ Test control variables
75       g_ExceptionInMaster -- only the external thread is allowed to throw.  If false, the external cannot throw
76       g_SolitaryException -- only one throw may be executed.
77 
78 -- controls for ThrowTestException for pipeline tests
79       g_NestedPipelines -- are inner pipelines being run?
80       g_PipelinesStarted -- how many pipelines have run their first filter at least once.
81 
82 -- Information variables
83 
84    g_Master -- Thread ID of the "external" thread
85       In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this.
86 
87 -- Measurement variables
88 
89    g_OuterParCalls -- how many outer parallel ranges or filters started
90    g_TGCCancelled --  how many inner parallel ranges or filters saw task::self().is_cancelled()
91    g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
92    g_MasterExecutedThrow -- number of times external thread actually executed a throw
93    g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw
94    g_ExceptionCaught -- one of PropagatedException or unknown exception was caught.  (Other exceptions cause assertions.)
95 
96    --  Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
97       g_CurExecuted -- total number of inner ranges or filters which executed
98       g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
99       g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
100   *********************************/
101 
102 inline void ResetGlobals (  bool throwException = true, bool flog = false ) {
103     ResetEhGlobals( throwException, flog );
104     g_FedTasksCount = 0;
105     g_OuterParCalls = 0;
106     g_NestedPipelines = false;
107     g_TGCCancelled.reset();
108 }
109 
110 ////////////////////////////////////////////////////////////////////////////////
111 // Tests for tbb::parallel_for and tbb::parallel_reduce
112 ////////////////////////////////////////////////////////////////////////////////
113 
114 typedef size_t count_type;
115 typedef tbb::blocked_range<count_type> range_type;
116 
117 inline intptr_t CountSubranges(range_type r) {
118     if(!r.is_divisible()) return intptr_t(1);
119     range_type r2(r,tbb::split());
120     return CountSubranges(r) + CountSubranges(r2);
121 }
122 
123 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
124     return CountSubranges(range_type(0,length,grain));
125 }
126 
127 template<class Body>
128 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
129     ResetGlobals();
130     g_ThrowException = false;
131     intptr_t outerCalls = NumSubranges(length, grain),
132              innerCalls = NumSubranges(inner_length, inner_grain),
133              maxExecuted = outerCalls * (innerCalls + 1);
134     tbb::parallel_for( range_type(0, length, grain), Body() );
135     REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
136     return maxExecuted;
137 }
138 
139 class NoThrowParForBody {
140 public:
141     void operator()( const range_type& r ) const {
142         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
143         else g_NonMasterExecuted = true;
144         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
145         utils::doDummyWork(r.size());
146     }
147 };
148 
149 #if TBB_USE_EXCEPTIONS
150 
151 void Test0 () {
152     ResetGlobals();
153     tbb::simple_partitioner p;
154     for( size_t i=0; i<10; ++i ) {
155         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
156         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
157         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
158         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
159     }
160 } // void Test0 ()
161 
162 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
163 template<typename ParForBody>
164 class SimpleParReduceBody {
165     ParForBody m_Body;
166 public:
167     void operator=(const SimpleParReduceBody&) = delete;
168     SimpleParReduceBody(const SimpleParReduceBody&) = default;
169     SimpleParReduceBody() = default;
170 
171     void operator()( const range_type& r ) const { m_Body(r); }
172     SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
173     void join( SimpleParReduceBody& /*right*/ ) {}
174 }; // SimpleParReduceBody
175 
176 //! Test parallel_for and parallel_reduce for a given partitioner.
177 /** The Body need only be suitable for a parallel_for. */
178 template<typename ParForBody, typename Partitioner>
179 void TestParallelLoopAux() {
180     Partitioner partitioner;
181     for( int i=0; i<2; ++i ) {
182         ResetGlobals();
183         TRY();
184             if( i==0 )
185                 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
186             else {
187                 SimpleParReduceBody<ParForBody> rb;
188                 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
189             }
190         CATCH_AND_ASSERT();
191         // two cases: g_SolitaryException and !g_SolitaryException
192         //   1) g_SolitaryException: only one thread actually threw.  There is only one context, so the exception
193         //      (when caught) will cause that context to be cancelled.  After this event, there may be one or
194         //      more threads which are "in-flight", up to g_NumThreads, but no more will be started.  The threads,
195         //      when they start, if they see they are cancelled, TGCCancelled is incremented.
196         //   2) !g_SolitaryException: more than one thread can throw.  The number of threads that actually
197         //      threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow.
198         //      Only one context, so TGCCancelled should be <= g_NumThreads.
199         //
200         // the reasoning is similar for nested algorithms in a single context (Test2).
201         //
202         // If a thread throws in a context, more than one subsequent task body may see the
203         // cancelled state (if they are scheduled before the state is propagated.) this is
204         // infrequent, but it occurs.  So what was to be an assertion must be a remark.
205         g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown");
206         REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
207         if ( g_SolitaryException ) {
208             REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
209             REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
210                 "Not all throws were caught");
211             REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
212         }
213         else {
214             REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
215         }
216     }
217 }  // TestParallelLoopAux
218 
219 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
220 /** The Body only needs to be suitable for tbb::parallel_for. */
221 template<typename Body>
222 void TestParallelLoop() {
223     // The simple and auto partitioners should be const, but not the affinity partitioner.
224     TestParallelLoopAux<Body, const tbb::simple_partitioner  >();
225     TestParallelLoopAux<Body, const tbb::auto_partitioner    >();
226 #define __TBB_TEMPORARILY_DISABLED 1
227 #if !__TBB_TEMPORARILY_DISABLED
228     // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
229     TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
230 #endif
231 #undef __TBB_TEMPORARILY_DISABLED
232 }
233 
234 class SimpleParForBody {
235 public:
236     void operator=(const SimpleParForBody&) = delete;
237     SimpleParForBody(const SimpleParForBody&) = default;
238     SimpleParForBody() = default;
239 
240     void operator()( const range_type& r ) const {
241         utils::ConcurrencyTracker ct;
242         ++g_CurExecuted;
243         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
244         else g_NonMasterExecuted = true;
245         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
246         utils::doDummyWork(r.size());
247         WaitUntilConcurrencyPeaks();
248         ThrowTestException(1);
249     }
250 };
251 
252 void Test1() {
253     // non-nested parallel_for/reduce with throwing body, one context
254     TestParallelLoop<SimpleParForBody>();
255 } // void Test1 ()
256 
257 class OuterParForBody {
258 public:
259     void operator=(const OuterParForBody&) = delete;
260     OuterParForBody(const OuterParForBody&) = default;
261     OuterParForBody() = default;
262     void operator()( const range_type& ) const {
263         utils::ConcurrencyTracker ct;
264         ++g_OuterParCalls;
265         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
266     }
267 };
268 
269 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
270 /** Inner algorithms are spawned inside the new bound context by default. Since
271     exceptions thrown from the inner parallel_for are not handled by the caller
272     (outer parallel_for body) in this test, they will cancel all the sibling inner
273     algorithms. **/
274 void Test2 () {
275     TestParallelLoop<OuterParForBody>();
276 } // void Test2 ()
277 
278 class OuterParForBodyWithIsolatedCtx {
279 public:
280     void operator()( const range_type& ) const {
281         tbb::task_group_context ctx(tbb::task_group_context::isolated);
282         ++g_OuterParCalls;
283         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
284     }
285 };
286 
287 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
288 /** Even though exceptions thrown from the inner parallel_for are not handled
289     by the caller in this test, they will not affect sibling inner algorithms
290     already running because of the isolated contexts. However because the first
291     exception cancels the root parallel_for only the first g_NumThreads subranges
292     will be processed (which launch inner parallel_fors) **/
293 void Test3 () {
294     ResetGlobals();
295     typedef OuterParForBodyWithIsolatedCtx body_type;
296     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
297             // we expect one thread to throw without counting, the rest to run to completion
298             // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the
299             // case; the SimpleParFor subranges are started up as part of the outer ones, and when
300             // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
301             // so we have to count the number of outer Pfors actually started.
302             minExecuted = (g_NumThreads - 1) * innerCalls;
303     TRY();
304         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
305     CATCH_AND_ASSERT();
306     minExecuted = (g_OuterParCalls - 1) * innerCalls;  // see above
307 
308     // The first formula above assumes all ranges of the outer parallel for are executed, and one
309     // cancels.  In the event, we have a smaller number of ranges that start before the exception
310     // is caught.
311     //
312     //  g_SolitaryException:One inner range throws.  Outer parallel_For is cancelled, but sibling
313     //                      parallel_fors continue to completion (unless the threads that execute
314     //                      are not allowed to throw, in which case we will not see any exceptions).
315     // !g_SolitaryException:multiple inner ranges may throw.  Any which throws will stop, and the
316     //                      corresponding range of the outer pfor will stop also.
317     //
318     // In either case, once the outer pfor gets the exception it will stop executing further ranges.
319 
320     // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
321     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
322     if ( g_SolitaryException ) {
323         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
324         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
325         REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception");
326         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
327     }
328     else {
329         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
330         REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
331     }
332 } // void Test3 ()
333 
334 class OuterParForExceptionSafeBody {
335 public:
336     void operator()( const range_type& ) const {
337         tbb::task_group_context ctx(tbb::task_group_context::isolated);
338         ++g_OuterParCalls;
339         TRY();
340             tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
341         CATCH();  // this macro sets g_ExceptionCaught
342     }
343 };
344 
345 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
346 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
347     in this test, they do not affect neither other tasks of the the root parallel_for
348     nor sibling inner algorithms. **/
349 void Test4 () {
350     ResetGlobals( true, true );
351     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
352               outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
353     TRY();
354         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
355     CATCH();
356     // g_SolitaryException  : one inner pfor will throw, the rest will execute to completion.
357     //                        so the count should be (outerCalls -1) * innerCalls, if a throw happened.
358     // !g_SolitaryException : possible multiple inner pfor throws.  Should be approximately
359     //                        (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
360     intptr_t  minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
361     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
362     if ( g_SolitaryException ) {
363         // only one task had exception thrown. That task had at least one execution (the one that threw).
364         // There may be an arbitrary number of ranges executed after the throw but before the exception
365         // is caught in the scheduler and cancellation is signaled.  (seen 9, 11 and 62 (!) for 8 threads)
366         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered");
367         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
368         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
369         // a small number of threads can execute in a throwing sub-pfor, if the task which is
370         // to do the solitary throw swaps out after registering its intent to throw but before it
371         // actually does so. As a result, the number of extra tasks cannot exceed the number of thread
372         // for each nested pfor invication)
373         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_NumThreads-1)*g_NumThreads/2, "Too many tasks survived exception");
374     }
375     else {
376         REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions");
377         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
378         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions");
379         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
380     }
381 } // void Test4 ()
382 
383 //! Testing parallel_for and parallel_reduce exception handling
384 //! \brief \ref error_guessing
385 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") {
386     for (auto concurrency_level: utils::concurrency_range()) {
387         g_NumThreads = static_cast<int>(concurrency_level);
388         g_Master = std::this_thread::get_id();
389         if (g_NumThreads > 1) {
390             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
391             // Execute in all the possible modes
392             for ( size_t j = 0; j < 4; ++j ) {
393                 g_ExceptionInMaster = (j & 1) != 0;
394                 g_SolitaryException = (j & 2) != 0;
395 
396                 Test0();
397             }
398         }
399     }
400 }
401 
402 //! Testing parallel_for and parallel_reduce exception handling
403 //! \brief \ref error_guessing
404 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") {
405     for (auto concurrency_level: utils::concurrency_range()) {
406         g_NumThreads = static_cast<int>(concurrency_level);
407         g_Master = std::this_thread::get_id();
408         if (g_NumThreads > 1) {
409             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
410             // Execute in all the possible modes
411             for ( size_t j = 0; j < 4; ++j ) {
412                 g_ExceptionInMaster = (j & 1) != 0;
413                 g_SolitaryException = (j & 2) != 0;
414 
415                 Test1();
416             }
417         }
418     }
419 }
420 
421 //! Testing parallel_for and parallel_reduce exception handling
422 //! \brief \ref error_guessing
423 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") {
424     for (auto concurrency_level: utils::concurrency_range()) {
425         g_NumThreads = static_cast<int>(concurrency_level);
426         g_Master = std::this_thread::get_id();
427         if (g_NumThreads > 1) {
428             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
429             // Execute in all the possible modes
430             for ( size_t j = 0; j < 4; ++j ) {
431                 g_ExceptionInMaster = (j & 1) != 0;
432                 g_SolitaryException = (j & 2) != 0;
433 
434                 Test2();
435             }
436         }
437     }
438 }
439 
440 //! Testing parallel_for and parallel_reduce exception handling
441 //! \brief \ref error_guessing
442 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") {
443     for (auto concurrency_level: utils::concurrency_range()) {
444         g_NumThreads = static_cast<int>(concurrency_level);
445         g_Master = std::this_thread::get_id();
446         if (g_NumThreads > 1) {
447             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
448             // Execute in all the possible modes
449             for ( size_t j = 0; j < 4; ++j ) {
450                 g_ExceptionInMaster = (j & 1) != 0;
451                 g_SolitaryException = (j & 2) != 0;
452 
453                 Test3();
454             }
455         }
456     }
457 }
458 
459 //! Testing parallel_for and parallel_reduce exception handling
460 //! \brief \ref error_guessing
461 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") {
462     for (auto concurrency_level: utils::concurrency_range()) {
463         g_NumThreads = static_cast<int>(concurrency_level);
464         g_Master = std::this_thread::get_id();
465         if (g_NumThreads > 1) {
466             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
467             // Execute in all the possible modes
468             for ( size_t j = 0; j < 4; ++j ) {
469                 g_ExceptionInMaster = (j & 1) != 0;
470                 g_SolitaryException = (j & 2) != 0;
471 
472                 Test4();
473             }
474         }
475     }
476 }
477 
478 #endif /* TBB_USE_EXCEPTIONS */
479 
480 class ParForBodyToCancel {
481 public:
482     void operator()( const range_type& ) const {
483         ++g_CurExecuted;
484         Cancellator::WaitUntilReady();
485     }
486 };
487 
488 template<class B>
489 class ParForLauncher {
490     tbb::task_group_context &my_ctx;
491 public:
492     void operator()() const {
493         tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
494     }
495     ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
496 };
497 
498 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
499 void TestCancelation1 () {
500     ResetGlobals( false );
501     RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
502 }
503 
504 class Cancellator2  {
505     tbb::task_group_context &m_GroupToCancel;
506 
507 public:
508     void operator()() const {
509         utils::ConcurrencyTracker ct;
510         WaitUntilConcurrencyPeaks();
511         m_GroupToCancel.cancel_group_execution();
512         g_ExecutedAtLastCatch = g_CurExecuted.load();
513     }
514 
515     Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
516 };
517 
518 class ParForBodyToCancel2 {
519 public:
520     void operator()( const range_type& ) const {
521         ++g_CurExecuted;
522         utils::ConcurrencyTracker ct;
523         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
524         while( !tbb::is_current_task_group_canceling() )
525             utils::yield();
526     }
527 };
528 
529 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
530 /** This version also tests tbb::is_current_task_group_canceling() method. **/
531 void TestCancelation2 () {
532     ResetGlobals();
533     RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>();
534     REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
535     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
536     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
537 }
538 
539 ////////////////////////////////////////////////////////////////////////////////
540 // Regression test based on the contribution by the author of the following forum post:
541 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
542 
543 class Worker {
544     static const int max_nesting = 3;
545     static const int reduce_range = 1024;
546     static const int reduce_grain = 256;
547 public:
548     int DoWork (int level);
549     int Validate (int start_level) {
550         int expected = 1; // identity for multiplication
551         for(int i=start_level+1; i<max_nesting; ++i)
552              expected *= reduce_range;
553         return expected;
554     }
555 };
556 
557 class RecursiveParReduceBodyWithSharedWorker {
558     Worker * m_SharedWorker;
559     int m_NestingLevel;
560     int m_Result;
561 public:
562     RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
563         : m_SharedWorker(src.m_SharedWorker)
564         , m_NestingLevel(src.m_NestingLevel)
565         , m_Result(0)
566     {}
567     RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
568         : m_SharedWorker(w)
569         , m_NestingLevel(outer)
570         , m_Result(0)
571     {}
572 
573     void operator() ( const tbb::blocked_range<size_t>& r ) {
574         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
575         else g_NonMasterExecuted = true;
576         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
577         for (size_t i = r.begin (); i != r.end (); ++i) {
578             m_Result += m_SharedWorker->DoWork (m_NestingLevel);
579         }
580     }
581     void join (const RecursiveParReduceBodyWithSharedWorker & x) {
582         m_Result += x.m_Result;
583     }
584     int result () { return m_Result; }
585 };
586 
587 int Worker::DoWork ( int level ) {
588     ++level;
589     if ( level < max_nesting ) {
590         RecursiveParReduceBodyWithSharedWorker rt (this, level);
591         tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
592         return rt.result();
593     }
594     else
595         return 1;
596 }
597 
598 //! Regression test for hanging that occurred with the first version of cancellation propagation
599 void TestCancelation3 () {
600     Worker w;
601     int result   = w.DoWork (0);
602     int expected = w.Validate(0);
603     REQUIRE_MESSAGE ( result == expected, "Wrong calculation result");
604 }
605 
606 struct StatsCounters {
607     std::atomic<size_t> my_total_created;
608     std::atomic<size_t> my_total_deleted;
609     StatsCounters() {
610         my_total_created = 0;
611         my_total_deleted = 0;
612     }
613 };
614 
615 class ParReduceBody {
616     StatsCounters* my_stats;
617     size_t my_id;
618     bool my_exception;
619     tbb::task_group_context& tgc;
620 
621 public:
622     ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) :
623         my_stats(&s_), my_exception(e_), tgc(context) {
624         my_id = my_stats->my_total_created++;
625     }
626 
627     ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) {
628         my_stats = lhs.my_stats;
629         my_id = my_stats->my_total_created++;
630     }
631 
632     ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) {
633         my_stats = lhs.my_stats;
634         my_id = my_stats->my_total_created++;
635     }
636 
637     ~ParReduceBody(){ ++my_stats->my_total_deleted; }
638 
639     void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
640         //Do nothing, except for one task (chosen arbitrarily)
641         if( my_id >= 12 ) {
642             if( my_exception )
643                 ThrowTestException(1);
644             else
645                 tgc.cancel_group_execution();
646         }
647     }
648 
649     void join( ParReduceBody& /*rhs*/ ) {}
650 };
651 
652 void TestCancelation4() {
653     StatsCounters statsObj;
654 #if TBB_USE_EXCEPTIONS
655     try
656 #endif
657     {
658         tbb::task_group_context tgc1, tgc2;
659         ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true);
660         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
661         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
662     }
663 #if TBB_USE_EXCEPTIONS
664     catch(...) {}
665 #endif
666     REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
667 }
668 
669 //! Testing parallel_for and parallel_reduce cancellation
670 //! \brief \ref error_guessing
671 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") {
672     for (auto concurrency_level: utils::concurrency_range()) {
673         g_NumThreads = static_cast<int>(concurrency_level);
674         g_Master = std::this_thread::get_id();
675         if (g_NumThreads > 1) {
676             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
677             // Execute in all the possible modes
678             for ( size_t j = 0; j < 4; ++j ) {
679                 g_ExceptionInMaster = (j & 1) != 0;
680                 g_SolitaryException = (j & 2) != 0;
681 
682                 TestCancelation1();
683             }
684         }
685     }
686 }
687 
688 //! Testing parallel_for and parallel_reduce cancellation
689 //! \brief \ref error_guessing
690 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") {
691     for (auto concurrency_level: utils::concurrency_range()) {
692         g_NumThreads = static_cast<int>(concurrency_level);
693         g_Master = std::this_thread::get_id();
694         if (g_NumThreads > 1) {
695             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
696             // Execute in all the possible modes
697             for ( size_t j = 0; j < 4; ++j ) {
698                 g_ExceptionInMaster = (j & 1) != 0;
699                 g_SolitaryException = (j & 2) != 0;
700 
701                 TestCancelation2();
702             }
703         }
704     }
705 }
706 
707 //! Testing parallel_for and parallel_reduce cancellation
708 //! \brief \ref error_guessing
709 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") {
710     for (auto concurrency_level: utils::concurrency_range()) {
711         g_NumThreads = static_cast<int>(concurrency_level);
712         g_Master = std::this_thread::get_id();
713         if (g_NumThreads > 1) {
714             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
715             // Execute in all the possible modes
716             for ( size_t j = 0; j < 4; ++j ) {
717                 g_ExceptionInMaster = (j & 1) != 0;
718                 g_SolitaryException = (j & 2) != 0;
719 
720                 TestCancelation3();
721             }
722         }
723     }
724 }
725 
726 //! Testing parallel_for and parallel_reduce cancellation
727 //! \brief \ref error_guessing
728 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
729     for (auto concurrency_level: utils::concurrency_range()) {
730         g_NumThreads = static_cast<int>(concurrency_level);
731         g_Master = std::this_thread::get_id();
732         if (g_NumThreads > 1) {
733             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
734             // Execute in all the possible modes
735             for ( size_t j = 0; j < 4; ++j ) {
736                 g_ExceptionInMaster = (j & 1) != 0;
737                 g_SolitaryException = (j & 2) != 0;
738 
739                 TestCancelation4();
740             }
741         }
742     }
743 }
744 
745 ////////////////////////////////////////////////////////////////////////////////
746 // Tests for tbb::parallel_for_each
747 ////////////////////////////////////////////////////////////////////////////////
748 
749 #define ITER_RANGE          1000
750 #define ITEMS_TO_FEED       50
751 #define INNER_ITER_RANGE   100
752 #define OUTER_ITER_RANGE  50
753 
754 #define PREPARE_RANGE(Iterator, rangeSize)  \
755     size_t test_vector[rangeSize + 1]; \
756     for (int i =0; i < rangeSize; i++) \
757         test_vector[i] = i; \
758     Iterator begin(&test_vector[0]); \
759     Iterator end(&test_vector[rangeSize])
760 
761 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
762     if (g_FedTasksCount < ITEMS_TO_FEED) {
763         ++g_FedTasksCount;
764         feeder.add(val);
765     }
766 }
767 
768 #define RunWithSimpleBody(func, body)       \
769     func<utils::ForwardIterator<size_t>, body>();         \
770     func<utils::ForwardIterator<size_t>, body##WithFeeder>()
771 
772 #define RunWithTemplatedBody(func, body)     \
773     func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >();         \
774     func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()
775 
776 #if TBB_USE_EXCEPTIONS
777 
778 // Simple functor object with exception
779 class SimpleParForEachBody {
780 public:
781     void operator() ( size_t &value ) const {
782         ++g_CurExecuted;
783         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
784         else g_NonMasterExecuted = true;
785         if( tbb::is_current_task_group_canceling() ) {
786             g_TGCCancelled.increment();
787         }
788         utils::ConcurrencyTracker ct;
789         value += 1000;
790         WaitUntilConcurrencyPeaks();
791         ThrowTestException(1);
792     }
793 };
794 
795 // Simple functor object with exception and feeder
796 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
797 public:
798     void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
799         Feed(feeder, 0);
800         SimpleParForEachBody::operator()(value);
801     }
802 };
803 
804 // Tests exceptions without nesting
805 template <class Iterator, class simple_body>
806 void Test1_parallel_for_each () {
807     ResetGlobals();
808     PREPARE_RANGE(Iterator, ITER_RANGE);
809     TRY();
810         tbb::parallel_for_each<Iterator, simple_body>(begin, end, simple_body() );
811     CATCH_AND_ASSERT();
812     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
813     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
814     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
815     if ( !g_SolitaryException )
816         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
817 
818 } // void Test1_parallel_for_each ()
819 
820 template <class Iterator>
821 class OuterParForEachBody {
822 public:
823     void operator()( size_t& /*value*/ ) const {
824         ++g_OuterParCalls;
825         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
826         tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody());
827     }
828 };
829 
830 template <class Iterator>
831 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
832 public:
833     void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
834         Feed(feeder, 0);
835         OuterParForEachBody<Iterator>::operator()(value);
836     }
837 };
838 
839 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
840 /** Inner algorithms are spawned inside the new bound context by default. Since
841     exceptions thrown from the inner parallel_for_each are not handled by the caller
842     (outer parallel_for_each body) in this test, they will cancel all the sibling inner
843     algorithms. **/
844 template <class Iterator, class outer_body>
845 void Test2_parallel_for_each () {
846     ResetGlobals();
847     PREPARE_RANGE(Iterator, ITER_RANGE);
848     TRY();
849         tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body() );
850     CATCH_AND_ASSERT();
851     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
852     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
853     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
854     if ( !g_SolitaryException )
855         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
856 } // void Test2_parallel_for_each ()
857 
858 template <class Iterator>
859 class OuterParForEachBodyWithIsolatedCtx {
860 public:
861     void operator()( size_t& /*value*/ ) const {
862         tbb::task_group_context ctx(tbb::task_group_context::isolated);
863         ++g_OuterParCalls;
864         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
865         tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
866     }
867 };
868 
869 template <class Iterator>
870 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
871 public:
872     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
873         Feed(feeder, 0);
874         OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
875     }
876 };
877 
878 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
879 /** Even though exceptions thrown from the inner parallel_for_each are not handled
880     by the caller in this test, they will not affect sibling inner algorithms
881     already running because of the isolated contexts. However because the first
882     exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
883     will be processed (which launch inner parallel_for_eachs) **/
884 template <class Iterator, class outer_body>
885 void Test3_parallel_for_each () {
886     ResetGlobals();
887     PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
888     intptr_t innerCalls = INNER_ITER_RANGE,
889              // The assumption here is the same as in outer parallel fors.
890              minExecuted = (g_NumThreads - 1) * innerCalls;
891     g_Master = std::this_thread::get_id();
892     TRY();
893         tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body());
894     CATCH_AND_ASSERT();
895     // figure actual number of expected executions given the number of outer PDos started.
896     minExecuted = (g_OuterParCalls - 1) * innerCalls;
897     // one extra thread may run a task that sees cancellation.  Infrequent but possible
898     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
899     if ( g_SolitaryException ) {
900         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
901         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
902     }
903     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
904     if ( !g_SolitaryException )
905         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
906 } // void Test3_parallel_for_each ()
907 
908 template <class Iterator>
909 class OuterParForEachWithEhBody {
910 public:
911     void operator()( size_t& /*value*/ ) const {
912         tbb::task_group_context ctx(tbb::task_group_context::isolated);
913         ++g_OuterParCalls;
914         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
915         TRY();
916             tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
917         CATCH();
918     }
919 };
920 
921 template <class Iterator>
922 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
923 public:
924     void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
925     OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
926     OuterParForEachWithEhBodyWithFeeder() = default;
927     void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
928         Feed(feeder, 0);
929         OuterParForEachWithEhBody<Iterator>::operator()(value);
930     }
931 };
932 
933 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block.
934 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
935     in this test, they do not affect neither other tasks of the the root parallel_for
936     nor sibling inner algorithms. **/
937 template <class Iterator, class outer_body_with_eh>
938 void Test4_parallel_for_each () {
939     ResetGlobals( true, true );
940     PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
941     g_Master = std::this_thread::get_id();
942     TRY();
943         tbb::parallel_for_each<Iterator, outer_body_with_eh>(begin, end, outer_body_with_eh());
944     CATCH();
945     REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
946     intptr_t innerCalls = INNER_ITER_RANGE,
947              outerCalls = OUTER_ITER_RANGE + g_FedTasksCount,
948              maxExecuted = outerCalls * innerCalls,
949              minExecuted = 0;
950     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
951     if ( g_SolitaryException ) {
952         minExecuted = maxExecuted - innerCalls;
953         REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
954         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
955         // This test has the same property as Test4 (parallel_for); the exception can be
956         // thrown, but some number of tasks from the outer Pdo can execute after the throw but
957         // before the cancellation is signaled (have seen 36).
958         DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
959     }
960     else {
961         minExecuted = g_NumExceptionsCaught;
962         REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
963         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
964         REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
965         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
966     }
967 } // void Test4_parallel_for_each ()
968 
969 // This body throws an exception only if the task was added by feeder
970 class ParForEachBodyWithThrowingFeederTasks {
971 public:
972     //! This form of the function call operator can be used when the body needs to add more work during the processing
973     void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const {
974         ++g_CurExecuted;
975         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
976         else g_NonMasterExecuted = true;
977         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
978         Feed(feeder, 1);
979         if (value == 1)
980             ThrowTestException(1);
981     }
982 }; // class ParForEachBodyWithThrowingFeederTasks
983 
984 // Test exception in task, which was added by feeder.
985 template <class Iterator>
986 void Test5_parallel_for_each () {
987     ResetGlobals();
988     PREPARE_RANGE(Iterator, ITER_RANGE);
989     g_Master = std::this_thread::get_id();
990     TRY();
991         tbb::parallel_for_each<Iterator, ParForEachBodyWithThrowingFeederTasks>(begin, end, ParForEachBodyWithThrowingFeederTasks());
992     CATCH();
993     if (g_SolitaryException) {
994         // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
995         // are handled by the external thread.  In this case no throw occurs.
996         REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel               // we saw an exception
997                 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)  // non-external trhead throws but none tried
998                 || (g_ExceptionInMaster && !g_MasterExecutedThrow))     // external thread throws but external thread didn't try
999                 , "At least one exception should occur");
1000     }
1001 } // void Test5_parallel_for_each ()
1002 
1003 //! Testing parallel_for_each exception handling
1004 //! \brief \ref error_guessing
1005 TEST_CASE("parallel_for_each exception handling test #1") {
1006     for (auto concurrency_level: utils::concurrency_range()) {
1007         g_NumThreads = static_cast<int>(concurrency_level);
1008         g_Master = std::this_thread::get_id();
1009         if (g_NumThreads > 1) {
1010             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1011             // Execute in all the possible modes
1012             for ( size_t j = 0; j < 4; ++j ) {
1013                 g_ExceptionInMaster = (j & 1) != 0;
1014                 g_SolitaryException = (j & 2) != 0;
1015 
1016                 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody);
1017             }
1018         }
1019     }
1020 }
1021 
1022 //! Testing parallel_for_each exception handling
1023 //! \brief \ref error_guessing
1024 TEST_CASE("parallel_for_each exception handling test #2") {
1025     for (auto concurrency_level: utils::concurrency_range()) {
1026         g_NumThreads = static_cast<int>(concurrency_level);
1027         g_Master = std::this_thread::get_id();
1028         if (g_NumThreads > 1) {
1029             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1030             // Execute in all the possible modes
1031             for ( size_t j = 0; j < 4; ++j ) {
1032                 g_ExceptionInMaster = (j & 1) != 0;
1033                 g_SolitaryException = (j & 2) != 0;
1034 
1035                 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody);
1036             }
1037         }
1038     }
1039 }
1040 
1041 //! Testing parallel_for_each exception handling
1042 //! \brief \ref error_guessing
1043 TEST_CASE("parallel_for_each exception handling test #3") {
1044     for (auto concurrency_level: utils::concurrency_range()) {
1045         g_NumThreads = static_cast<int>(concurrency_level);
1046         g_Master = std::this_thread::get_id();
1047         if (g_NumThreads > 1) {
1048             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1049             // Execute in all the possible modes
1050             for ( size_t j = 0; j < 4; ++j ) {
1051                 g_ExceptionInMaster = (j & 1) != 0;
1052                 g_SolitaryException = (j & 2) != 0;
1053 
1054                 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx);
1055             }
1056         }
1057     }
1058 }
1059 
1060 //! Testing parallel_for_each exception handling
1061 //! \brief \ref error_guessing
1062 TEST_CASE("parallel_for_each exception handling test #4") {
1063     for (auto concurrency_level: utils::concurrency_range()) {
1064         g_NumThreads = static_cast<int>(concurrency_level);
1065         g_Master = std::this_thread::get_id();
1066         if (g_NumThreads > 1) {
1067             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1068             // Execute in all the possible modes
1069             for ( size_t j = 0; j < 4; ++j ) {
1070                 g_ExceptionInMaster = (j & 1) != 0;
1071                 g_SolitaryException = (j & 2) != 0;
1072 
1073                 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody);
1074             }
1075         }
1076     }
1077 }
1078 
1079 //! Testing parallel_for_each exception handling
1080 //! \brief \ref error_guessing
1081 TEST_CASE("parallel_for_each exception handling test #5") {
1082     for (auto concurrency_level: utils::concurrency_range()) {
1083         g_NumThreads = static_cast<int>(concurrency_level);
1084         g_Master = std::this_thread::get_id();
1085         if (g_NumThreads > 1) {
1086             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1087             // Execute in all the possible modes
1088             for ( size_t j = 0; j < 4; ++j ) {
1089                 g_ExceptionInMaster = (j & 1) != 0;
1090                 g_SolitaryException = (j & 2) != 0;
1091 
1092                 Test5_parallel_for_each<utils::InputIterator<size_t> >();
1093                 Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
1094                 Test5_parallel_for_each<utils::RandomIterator<size_t> >();
1095             }
1096         }
1097     }
1098 }
1099 
1100 #endif /* TBB_USE_EXCEPTIONS */
1101 
1102 class ParForEachBodyToCancel {
1103 public:
1104     void operator()( size_t& /*value*/ ) const {
1105         ++g_CurExecuted;
1106         Cancellator::WaitUntilReady();
1107     }
1108 };
1109 
1110 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
1111 public:
1112     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1113         Feed(feeder, 0);
1114         ParForEachBodyToCancel::operator()(value);
1115     }
1116 };
1117 
1118 template<class B, class Iterator>
1119 class ParForEachWorker {
1120     tbb::task_group_context &my_ctx;
1121 public:
1122     void operator()() const {
1123         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
1124         tbb::parallel_for_each<Iterator, B>( begin, end, B(), my_ctx );
1125     }
1126 
1127     ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1128 };
1129 
1130 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1131 template <class Iterator, class body_to_cancel>
1132 void TestCancelation1_parallel_for_each () {
1133     ResetGlobals( false );
1134     intptr_t  threshold = 10;
1135     tbb::task_group tg;
1136     tbb::task_group_context  ctx;
1137     Cancellator cancellator(ctx, threshold);
1138     ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
1139     tg.run( cancellator );
1140     utils::yield();
1141     tg.run( worker );
1142     TRY();
1143         tg.wait();
1144     CATCH_AND_FAIL();
1145     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1146 }
1147 
1148 class ParForEachBodyToCancel2 {
1149 public:
1150     void operator()( size_t& /*value*/ ) const {
1151         ++g_CurExecuted;
1152         utils::ConcurrencyTracker ct;
1153         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1154         while( !tbb::is_current_task_group_canceling() )
1155             utils::yield();
1156     }
1157 };
1158 
1159 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
1160 public:
1161     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1162         Feed(feeder, 0);
1163         ParForEachBodyToCancel2::operator()(value);
1164     }
1165 };
1166 
1167 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1168 /** This version also tests tbb::is_current_task_group_canceling() method. **/
1169 template <class Iterator, class body_to_cancel>
1170 void TestCancelation2_parallel_for_each () {
1171     ResetGlobals();
1172     RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
1173 }
1174 
1175 //! Testing parallel_for_each cancellation test
1176 //! \brief \ref error_guessing
1177 TEST_CASE("parallel_for_each cancellation test #1") {
1178     for (auto concurrency_level: utils::concurrency_range()) {
1179         g_NumThreads = static_cast<int>(concurrency_level);
1180         g_Master = std::this_thread::get_id();
1181         if (g_NumThreads > 1) {
1182             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1183             // Execute in all the possible modes
1184             for ( size_t j = 0; j < 4; ++j ) {
1185                 g_ExceptionInMaster = (j & 1) != 0;
1186                 g_SolitaryException = (j & 2) != 0;
1187 
1188                 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
1189             }
1190         }
1191     }
1192 }
1193 
1194 //! Testing parallel_for_each cancellation test
1195 //! \brief \ref error_guessing
1196 TEST_CASE("parallel_for_each cancellation test #2") {
1197     for (auto concurrency_level: utils::concurrency_range()) {
1198         g_NumThreads = static_cast<int>(concurrency_level);
1199         g_Master = std::this_thread::get_id();
1200         if (g_NumThreads > 1) {
1201             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1202             // Execute in all the possible modes
1203             for ( size_t j = 0; j < 4; ++j ) {
1204                 g_ExceptionInMaster = (j & 1) != 0;
1205                 g_SolitaryException = (j & 2) != 0;
1206 
1207                 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
1208             }
1209         }
1210     }
1211 }
1212 
1213 ////////////////////////////////////////////////////////////////////////////////
1214 // Tests for tbb::parallel_pipeline
1215 ////////////////////////////////////////////////////////////////////////////////
1216 #define NUM_ITEMS   100
1217 
1218 const size_t c_DataEndTag = size_t(~0);
1219 
1220 int g_NumTokens = 0;
1221 
1222 // Simple input filter class, it assigns 1 to all array members
1223 // It stops when it receives item equal to -1
1224 class InputFilter {
1225     mutable std::atomic<size_t> m_Item{};
1226     mutable size_t m_Buffer[NUM_ITEMS + 1];
1227 public:
1228     InputFilter() {
1229         m_Item = 0;
1230         for (size_t i = 0; i < NUM_ITEMS; ++i )
1231             m_Buffer[i] = 1;
1232         m_Buffer[NUM_ITEMS] = c_DataEndTag;
1233     }
1234     InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()) {
1235         for (size_t i = 0; i < NUM_ITEMS; ++i )
1236             m_Buffer[i] = other.m_Buffer[i];
1237     }
1238 
1239     void* operator()(tbb::flow_control& control) const {
1240         size_t item = m_Item++;
1241         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1242         else g_NonMasterExecuted = true;
1243         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1244         if(item == 1) {
1245             ++g_PipelinesStarted;   // count on emitting the first item.
1246         }
1247         if ( item >= NUM_ITEMS ) {
1248             control.stop();
1249             return nullptr;
1250         }
1251         m_Buffer[item] = 1;
1252         return &m_Buffer[item];
1253     }
1254 
1255     size_t* buffer() { return m_Buffer; }
1256 }; // class InputFilter
1257 
1258 #if TBB_USE_EXCEPTIONS
1259 
1260 // Simple filter with exception throwing.  If parallel, will wait until
1261 // as many parallel filters start as there are threads.
1262 class SimpleFilter {
1263     bool m_canThrow;
1264     bool m_serial;
1265 public:
1266     SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
1267     void* operator()(void* item) const {
1268         ++g_CurExecuted;
1269         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1270         else g_NonMasterExecuted = true;
1271         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1272         if ( m_canThrow ) {
1273             if ( !m_serial ) {
1274                 utils::ConcurrencyTracker ct;
1275                 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
1276             }
1277             ThrowTestException(1);
1278         }
1279         return item;
1280     }
1281 }; // class SimpleFilter
1282 
1283 // This enumeration represents filters order in pipeline
1284 struct FilterSet {
1285     tbb::filter_mode mode1, mode2;
1286     bool throw1, throw2;
1287 
1288     FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
1289         : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
1290     {}
1291 }; // struct FilterSet
1292 
1293 FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );
1294 
1295 template<typename InFilter, typename Filter>
1296 class CustomPipeline {
1297     InFilter inputFilter;
1298     Filter filter1;
1299     Filter filter2;
1300     FilterSet my_filters;
1301 public:
1302     CustomPipeline( const FilterSet& filters )
1303         : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
1304           filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
1305           my_filters(filters)
1306     {}
1307 
1308     void run () {
1309         tbb::parallel_pipeline(
1310             g_NumTokens,
1311             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1312             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1313             tbb::make_filter<void*, void>(my_filters.mode2, filter2)
1314         );
1315     }
1316     void run ( tbb::task_group_context& ctx ) {
1317         tbb::parallel_pipeline(
1318             g_NumTokens,
1319             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1320             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1321             tbb::make_filter<void*, void>(my_filters.mode2, filter2),
1322             ctx
1323         );
1324     }
1325 };
1326 
1327 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
1328 
1329 // Tests exceptions without nesting
1330 void Test1_pipeline ( const FilterSet& filters ) {
1331     ResetGlobals();
1332     SimplePipeline testPipeline(filters);
1333     TRY();
1334         testPipeline.run();
1335         if ( g_CurExecuted == 2 * NUM_ITEMS ) {
1336             // all the items were processed, though an exception was supposed to occur.
1337             if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
1338                 // if !g_ExceptionInMaster, the external thread is not allowed to throw.
1339                 // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw.
1340                 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
1341                     "Unusual count");
1342             }
1343             // In case of all serial filters they might be all executed in the thread(s)
1344             // where exceptions are not allowed by the common test logic. So we just quit.
1345             return;
1346         }
1347     CATCH_AND_ASSERT();
1348     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
1349     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1350     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
1351     if ( !g_SolitaryException )
1352         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1353 
1354 } // void Test1_pipeline ()
1355 
1356 // Filter with nesting
1357 class OuterFilter {
1358 public:
1359     OuterFilter ( bool, bool ) {}
1360 
1361     void* operator()(void* item) const {
1362         ++g_OuterParCalls;
1363         SimplePipeline testPipeline(serial_parallel);
1364         testPipeline.run();
1365         return item;
1366     }
1367 }; // class OuterFilter
1368 
1369 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
1370 /** Inner algorithms are spawned inside the new bound context by default. Since
1371     exceptions thrown from the inner pipeline are not handled by the caller
1372     (outer pipeline body) in this test, they will cancel all the sibling inner
1373     algorithms. **/
1374 void Test2_pipeline ( const FilterSet& filters ) {
1375     ResetGlobals();
1376     g_NestedPipelines = true;
1377     CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
1378     TRY();
1379         testPipeline.run();
1380     CATCH_AND_ASSERT();
1381     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
1382     REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1383     if ( !g_SolitaryException ) {
1384         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1385     }
1386 } // void Test2_pipeline ()
1387 
1388 //! creates isolated inner pipeline and runs it.
1389 class OuterFilterWithIsolatedCtx {
1390 public:
1391     OuterFilterWithIsolatedCtx( bool , bool ) {}
1392 
1393     void* operator()(void* item) const {
1394         ++g_OuterParCalls;
1395         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1396         // create inner pipeline with serial input, parallel output filter, second filter throws
1397         SimplePipeline testPipeline(serial_parallel);
1398         testPipeline.run(ctx);
1399         return item;
1400     }
1401 }; // class OuterFilterWithIsolatedCtx
1402 
1403 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
1404 /** Even though exceptions thrown from the inner pipeline are not handled
1405     by the caller in this test, they will not affect sibling inner algorithms
1406     already running because of the isolated contexts. However because the first
1407     exception cancels the root parallel_for_each only the first g_NumThreads subranges
1408     will be processed (which launch inner pipelines) **/
1409 void Test3_pipeline ( const FilterSet& filters ) {
1410     for( int nTries = 1; nTries <= 4; ++nTries) {
1411         ResetGlobals();
1412         g_NestedPipelines = true;
1413         g_Master = std::this_thread::get_id();
1414         intptr_t innerCalls = NUM_ITEMS,
1415                  minExecuted = (g_NumThreads - 1) * innerCalls;
1416         CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
1417         TRY();
1418             testPipeline.run();
1419         CATCH_AND_ASSERT();
1420 
1421         bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1422             (!g_ExceptionInMaster && !g_NonMasterExecuted);
1423         // only test assertions if the test threw an exception (or we don't care)
1424         bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
1425         if(testSucceeded) {
1426             if (g_SolitaryException) {
1427 
1428                 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
1429                 // Each time the input filter of a pipeline delivers its first item, it increments
1430                 // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
1431                 // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
1432                 // context will start; that is what we're testing.)
1433                 //
1434                 // There are two pipelines which will NOT run to completion when a solitary throw
1435                 // happens in an isolated inner context: the outer pipeline and the pipeline which
1436                 // throws.  All the other pipelines which start should run to completion.  But only
1437                 // inner body invocations are counted.
1438                 //
1439                 // So g_CurExecuted should be about
1440                 //
1441                 //   (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1
1442                 //   ^ executions for each completed pipeline
1443                 //                   ^ completing pipelines (remembering two will not complete)
1444                 //                                              ^ one for the inner throwing pipeline
1445 
1446                 minExecuted = (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1;
1447                 // each failing pipeline must execute at least two tasks
1448                 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
1449                 // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
1450                 // tasks not executing at throw were scheduled.
1451                 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
1452                 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
1453                 // if we're only throwing from the external thread, and that thread didn't
1454                 // participate in the pipelines, then no throw occurred.
1455             }
1456             REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1457             REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
1458             return;
1459         }
1460     }
1461 }
1462 
1463 class OuterFilterWithEhBody  {
1464 public:
1465     OuterFilterWithEhBody( bool, bool ){}
1466 
1467     void* operator()(void* item) const {
1468         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1469         ++g_OuterParCalls;
1470         SimplePipeline testPipeline(serial_parallel);
1471         TRY();
1472             testPipeline.run(ctx);
1473         CATCH();
1474         return item;
1475     }
1476 }; // class OuterFilterWithEhBody
1477 
1478 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
1479 /** Since exception(s) thrown from the inner pipeline are handled by the caller
1480     in this test, they do not affect other tasks of the the root pipeline
1481     nor sibling inner algorithms. **/
1482 void Test4_pipeline ( const FilterSet& filters ) {
1483 #if __GNUC__ && !__INTEL_COMPILER
1484     if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
1485         MESSAGE("Known issue: one of exception handling tests is skipped.");
1486         return;
1487     }
1488 #endif
1489     ResetGlobals( true, true );
1490     // each outer pipeline stage will start NUM_ITEMS inner pipelines.
1491     // each inner pipeline that doesn't throw will process NUM_ITEMS items.
1492     // for solitary exception there will be one pipeline that only processes one stage, one item.
1493     // innerCalls should be 2*NUM_ITEMS
1494     intptr_t innerCalls = 2*NUM_ITEMS,
1495              outerCalls = 2*NUM_ITEMS,
1496              maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
1497     CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
1498     TRY();
1499         testPipeline.run();
1500     CATCH_AND_ASSERT();
1501     intptr_t  minExecuted = 0;
1502     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1503         (!g_ExceptionInMaster && !g_NonMasterExecuted);
1504     if ( g_SolitaryException ) {
1505         minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
1506         REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
1507         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
1508     }
1509     else {
1510         // we assume throwing pipelines will not count
1511         minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
1512         REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
1513         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
1514         // too many already-scheduled tasks are started after the first exception is
1515         // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
1516         // So with multiple exceptions there are a variable number of tasks that have been
1517         // discarded because of the signals.
1518         // each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
1519         // updated with each throw, so the value will be the number of tasks executed at the last
1520         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
1521     }
1522 } // void Test4_pipeline ()
1523 
1524 //! Tests pipeline function passed with different combination of filters
1525 template<void testFunc(const FilterSet&)>
1526 void TestWithDifferentFiltersAndConcurrency() {
1527     for (auto concurrency_level: utils::concurrency_range()) {
1528         g_NumThreads = static_cast<int>(concurrency_level);
1529         g_Master = std::this_thread::get_id();
1530         if (g_NumThreads > 1) {
1531             // Execute in all the possible modes
1532             for ( size_t j = 0; j < 4; ++j ) {
1533                 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1534                 g_ExceptionInMaster = (j & 1) != 0;
1535                 g_SolitaryException = (j & 2) != 0;
1536                 g_NumTokens = 2 * g_NumThreads;
1537                 const int NumFilterTypes = 3;
1538                 const tbb::filter_mode modes[NumFilterTypes] = {
1539                     tbb::filter_mode::parallel,
1540                     tbb::filter_mode::serial_in_order,
1541                     tbb::filter_mode::serial_out_of_order
1542                 };
1543 
1544                 for ( int i = 0; i < NumFilterTypes; ++i ) {
1545                     for ( int n = 0; n < NumFilterTypes; ++n ) {
1546                         for ( int k = 0; k < 2; ++k )
1547                             testFunc( FilterSet(modes[i], modes[j], k == 0, k != 0) );
1548                     }
1549                 }
1550             }
1551         }
1552     }
1553 }
1554 
1555 //! Testing parallel_pipeline exception handling
1556 //! \brief \ref error_guessing
1557 TEST_CASE("parallel_pipeline exception handling test #1") {
1558     TestWithDifferentFiltersAndConcurrency<Test1_pipeline>();
1559 }
1560 
1561 //! Testing parallel_pipeline exception handling
1562 //! \brief \ref error_guessing
1563 TEST_CASE("parallel_pipeline exception handling test #2") {
1564     TestWithDifferentFiltersAndConcurrency<Test2_pipeline>();
1565 }
1566 
1567 //! Testing parallel_pipeline exception handling
1568 //! \brief \ref error_guessing
1569 TEST_CASE("parallel_pipeline exception handling test #3") {
1570     TestWithDifferentFiltersAndConcurrency<Test3_pipeline>();
1571 }
1572 
1573 //! Testing parallel_pipeline exception handling
1574 //! \brief \ref error_guessing
1575 TEST_CASE("parallel_pipeline exception handling test #4") {
1576     TestWithDifferentFiltersAndConcurrency<Test4_pipeline>();
1577 }
1578 
1579 #endif /* TBB_USE_EXCEPTIONS */
1580 
1581 class FilterToCancel  {
1582 public:
1583     FilterToCancel() {}
1584     void* operator()(void* item) const {
1585         ++g_CurExecuted;
1586         Cancellator::WaitUntilReady();
1587         return item;
1588     }
1589 }; // class FilterToCancel
1590 
1591 template <class Filter_to_cancel>
1592 class PipelineLauncher {
1593     tbb::task_group_context &my_ctx;
1594 public:
1595     PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1596 
1597     void operator()() const {
1598         // Run test when serial filter is the first non-input filter
1599         InputFilter inputFilter;
1600         Filter_to_cancel filterToCancel;
1601         tbb::parallel_pipeline(
1602             g_NumTokens,
1603             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1604             tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel),
1605             my_ctx
1606         );
1607     }
1608 };
1609 
1610 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1611 void TestCancelation1_pipeline () {
1612     ResetGlobals();
1613     g_ThrowException = false;
1614     RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(10);
1615     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
1616     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1617 }
1618 
1619 class FilterToCancel2  {
1620 public:
1621     FilterToCancel2() {}
1622 
1623     void* operator()(void* item) const {
1624         ++g_CurExecuted;
1625         utils::ConcurrencyTracker ct;
1626         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1627         while( !tbb::is_current_task_group_canceling() )
1628             utils::yield();
1629         return item;
1630     }
1631 };
1632 
1633 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1634 /** This version also tests task::is_cancelled() method. **/
1635 void TestCancelation2_pipeline () {
1636     ResetGlobals();
1637     RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>();
1638     // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the
1639     // former, and g_CurExecuted is monotonic increasing.  so the comparison should be at least ==.
1640     // If another filter is started after cancel but before cancellation is propagated, then the
1641     // number will be larger.
1642     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
1643 }
1644 
1645 /** If min and max thread numbers specified on the command line are different,
1646     the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
1647     to be able to test the high and low contention modes while keeping the test reasonably fast **/
1648 
1649 //! Testing parallel_pipeline cancellation
1650 //! \brief \ref error_guessing
1651 TEST_CASE("parallel_pipeline cancellation test #1") {
1652     for (auto concurrency_level: utils::concurrency_range()) {
1653         g_NumThreads = static_cast<int>(concurrency_level);
1654         g_Master = std::this_thread::get_id();
1655         if (g_NumThreads > 1) {
1656             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1657             // Execute in all the possible modes
1658             for ( size_t j = 0; j < 4; ++j ) {
1659                 g_ExceptionInMaster = (j & 1) != 0;
1660                 g_SolitaryException = (j & 2) != 0;
1661                 g_NumTokens = 2 * g_NumThreads;
1662 
1663                 TestCancelation1_pipeline();
1664             }
1665         }
1666     }
1667 }
1668 
1669 //! Testing parallel_pipeline cancellation
1670 //! \brief \ref error_guessing
1671 TEST_CASE("parallel_pipeline cancellation test #2") {
1672     for (auto concurrency_level: utils::concurrency_range()) {
1673         g_NumThreads = static_cast<int>(concurrency_level);
1674         g_Master = std::this_thread::get_id();
1675         if (g_NumThreads > 1) {
1676             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1677             // Execute in all the possible modes
1678             for ( size_t j = 0; j < 4; ++j ) {
1679                 g_ExceptionInMaster = (j & 1) != 0;
1680                 g_SolitaryException = (j & 2) != 0;
1681                 g_NumTokens = 2 * g_NumThreads;
1682 
1683                 TestCancelation2_pipeline();
1684             }
1685         }
1686     }
1687 }
1688