xref: /oneTBB/test/tbb/test_eh_algorithms.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/concurrency_tracker.h"
19 #include "common/iterator.h"
20 #include "common/utils_concurrency_limit.h"
21 
22 #include <limits.h> // for INT_MAX
23 #include <thread>
24 
25 #include "tbb/parallel_for.h"
26 #include "tbb/parallel_reduce.h"
27 #include "tbb/parallel_for_each.h"
28 #include "tbb/parallel_pipeline.h"
29 #include "tbb/blocked_range.h"
30 #include "tbb/task_group.h"
31 #include "tbb/global_control.h"
32 #include "tbb/concurrent_unordered_map.h"
33 #include "tbb/task.h"
34 
35 //! \file test_eh_algorithms.cpp
36 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications
37 
38 #define FLAT_RANGE  100000
39 #define FLAT_GRAIN  100
40 #define OUTER_RANGE  100
41 #define OUTER_GRAIN  10
42 #define INNER_RANGE  (FLAT_RANGE / OUTER_RANGE)
43 #define INNER_GRAIN  (FLAT_GRAIN / OUTER_GRAIN)
44 
45 struct context_specific_counter {
46     tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{};
47 
48     void increment() {
49         tbb::task_group_context* ctx = tbb::task::current_context();
50         REQUIRE(ctx != nullptr);
51         context_map[ctx]++;
52     }
53 
54     void reset() {
55         context_map.clear();
56     }
57 
58     void validate(unsigned expected_count, const char* msg) {
59         for (auto it = context_map.begin(); it != context_map.end(); it++) {
60             REQUIRE_MESSAGE( it->second <= expected_count, msg);
61         }
62     }
63 };
64 
65 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder
66 std::atomic<intptr_t> g_OuterParCalls{};  // number of actual invocations of the outer construct executed.
67 context_specific_counter g_TGCCancelled{};  // Number of times a task sees its group cancelled at start
68 
69 #include "common/exception_handling.h"
70 
71 /********************************
72       Variables in test
73 
74 -- Test control variables
75       g_ExceptionInMaster -- only the master thread is allowed to throw.  If false, the master cannot throw
76       g_SolitaryException -- only one throw may be executed.
77 
78 -- controls for ThrowTestException for pipeline tests
79       g_NestedPipelines -- are inner pipelines being run?
80       g_PipelinesStarted -- how many pipelines have run their first filter at least once.
81 
82 -- Information variables
83 
84    g_Master -- Thread ID of the "master" thread
85       In pipelines sometimes the master thread does not participate, so the tests have to be resilient to this.
86 
87 -- Measurement variables
88 
89    g_OuterParCalls -- how many outer parallel ranges or filters started
90    g_TGCCancelled -- per-context count of inner parallel ranges or filters that saw tbb::is_current_task_group_canceling() return true
91    g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
92    g_MasterExecutedThrow -- number of times master thread actually executed a throw
93    g_NonMasterExecutedThrow -- number of times non-master thread actually executed a throw
94    g_ExceptionCaught -- one of PropagatedException or unknown exception was caught.  (Other exceptions cause assertions.)
95 
96    --  Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
97       g_CurExecuted -- total number of inner ranges or filters which executed
98       g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
99       g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
100   *********************************/
101 
102 inline void ResetGlobals (  bool throwException = true, bool flog = false ) {
103     ResetEhGlobals( throwException, flog );
104     g_FedTasksCount = 0;
105     g_OuterParCalls = 0;
106     g_NestedPipelines = false;
107     g_TGCCancelled.reset();
108 }
109 
110 ////////////////////////////////////////////////////////////////////////////////
111 // Tests for tbb::parallel_for and tbb::parallel_reduce
112 ////////////////////////////////////////////////////////////////////////////////
113 
114 typedef size_t count_type;
115 typedef tbb::blocked_range<count_type> range_type;
116 
117 inline intptr_t CountSubranges(range_type r) {
118     if(!r.is_divisible()) return intptr_t(1);
119     range_type r2(r,tbb::split());
120     return CountSubranges(r) + CountSubranges(r2);
121 }
122 
123 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
124     return CountSubranges(range_type(0,length,grain));
125 }
126 
127 template<class Body>
128 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
129     ResetGlobals();
130     g_ThrowException = false;
131     intptr_t outerCalls = NumSubranges(length, grain),
132              innerCalls = NumSubranges(inner_length, inner_grain),
133              maxExecuted = outerCalls * (innerCalls + 1);
134     tbb::parallel_for( range_type(0, length, grain), Body() );
135     REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
136     return maxExecuted;
137 }
138 
139 class NoThrowParForBody {
140 public:
141     void operator()( const range_type& r ) const {
142         volatile count_type x = 0;
143         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
144         else g_NonMasterExecuted = true;
145         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
146         count_type end = r.end();
147         for( count_type i=r.begin(); i<end; ++i )
148             x += i;
149     }
150 };
151 
152 #if TBB_USE_EXCEPTIONS
153 
154 void Test0 () {
155     ResetGlobals();
156     tbb::simple_partitioner p;
157     for( size_t i=0; i<10; ++i ) {
158         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
159         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
160         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
161         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
162     }
163 } // void Test0 ()
164 
165 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
166 template<typename ParForBody>
167 class SimpleParReduceBody {
168     ParForBody m_Body;
169 public:
170     void operator=(const SimpleParReduceBody&) = delete;
171     SimpleParReduceBody(const SimpleParReduceBody&) = default;
172     SimpleParReduceBody() = default;
173 
174     void operator()( const range_type& r ) const { m_Body(r); }
175     SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
176     void join( SimpleParReduceBody& /*right*/ ) {}
177 }; // SimpleParReduceBody
178 
179 //! Test parallel_for and parallel_reduce for a given partitioner.
180 /** The Body need only be suitable for a parallel_for. */
181 template<typename ParForBody, typename Partitioner>
182 void TestParallelLoopAux() {
183     Partitioner partitioner;
184     for( int i=0; i<2; ++i ) {
185         ResetGlobals();
186         TRY();
187             if( i==0 )
188                 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
189             else {
190                 SimpleParReduceBody<ParForBody> rb;
191                 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
192             }
193         CATCH_AND_ASSERT();
194         // two cases: g_SolitaryException and !g_SolitaryException
195         //   1) g_SolitaryException: only one thread actually threw.  There is only one context, so the exception
196         //      (when caught) will cause that context to be cancelled.  After this event, there may be one or
197         //      more threads which are "in-flight", up to g_NumThreads, but no more will be started.  The threads,
198         //      when they start, if they see they are cancelled, TGCCancelled is incremented.
199         //   2) !g_SolitaryException: more than one thread can throw.  The number of threads that actually
200         //      threw is g_MasterExecutedThrow if only the master is allowed, else g_NonMasterExecutedThrow.
201         //      Only one context, so TGCCancelled should be <= g_NumThreads.
202         //
203         // the reasoning is similar for nested algorithms in a single context (Test2).
204         //
205         // If a thread throws in a context, more than one subsequent task body may see the
206         // cancelled state (if they are scheduled before the state is propagated.) this is
207         // infrequent, but it occurs.  So what was to be an assertion must be a remark.
208         g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown");
209         REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
210         if ( g_SolitaryException ) {
211             REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
212             REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
213                 "Not all throws were caught");
214             REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
215         }
216         else {
217             REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
218         }
219     }
220 }  // TestParallelLoopAux
221 
222 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
223 /** The Body only needs to be suitable for tbb::parallel_for. */
224 template<typename Body>
225 void TestParallelLoop() {
226     // The simple and auto partitioners should be const, but not the affinity partitioner.
227     TestParallelLoopAux<Body, const tbb::simple_partitioner  >();
228     TestParallelLoopAux<Body, const tbb::auto_partitioner    >();
229 #define __TBB_TEMPORARILY_DISABLED 1
230 #if !__TBB_TEMPORARILY_DISABLED
231     // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
232     TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
233 #endif
234 #undef __TBB_TEMPORARILY_DISABLED
235 }
236 
237 class SimpleParForBody {
238 public:
239     void operator=(const SimpleParForBody&) = delete;
240     SimpleParForBody(const SimpleParForBody&) = default;
241     SimpleParForBody() = default;
242 
243     void operator()( const range_type& r ) const {
244         utils::ConcurrencyTracker ct;
245         volatile long x = 0;
246         ++g_CurExecuted;
247         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
248         else g_NonMasterExecuted = true;
249         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
250         for( count_type i = r.begin(); i != r.end(); ++i )
251             x += 0;
252         WaitUntilConcurrencyPeaks();
253         ThrowTestException(1);
254     }
255 };
256 
257 void Test1() {
258     // non-nested parallel_for/reduce with throwing body, one context
259     TestParallelLoop<SimpleParForBody>();
260 } // void Test1 ()
261 
262 class OuterParForBody {
263 public:
264     void operator=(const OuterParForBody&) = delete;
265     OuterParForBody(const OuterParForBody&) = default;
266     OuterParForBody() = default;
267     void operator()( const range_type& ) const {
268         utils::ConcurrencyTracker ct;
269         ++g_OuterParCalls;
270         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
271     }
272 };
273 
274 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
275 /** Inner algorithms are spawned inside the new bound context by default. Since
276     exceptions thrown from the inner parallel_for are not handled by the caller
277     (outer parallel_for body) in this test, they will cancel all the sibling inner
278     algorithms. **/
279 void Test2 () {
280     TestParallelLoop<OuterParForBody>();
281 } // void Test2 ()
282 
283 class OuterParForBodyWithIsolatedCtx {
284 public:
285     void operator()( const range_type& ) const {
286         tbb::task_group_context ctx(tbb::task_group_context::isolated);
287         ++g_OuterParCalls;
288         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
289     }
290 };
291 
292 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
293 /** Even though exceptions thrown from the inner parallel_for are not handled
294     by the caller in this test, they will not affect sibling inner algorithms
295     already running because of the isolated contexts. However because the first
296     exception cancels the root parallel_for only the first g_NumThreads subranges
297     will be processed (which launch inner parallel_fors) **/
298 void Test3 () {
299     ResetGlobals();
300     typedef OuterParForBodyWithIsolatedCtx body_type;
301     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
302             // we expect one thread to throw without counting, the rest to run to completion
303             // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the
304             // case; the SimpleParFor subranges are started up as part of the outer ones, and when
305             // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
306             // so we have to count the number of outer Pfors actually started.
307             minExecuted = (g_NumThreads - 1) * innerCalls;
308     TRY();
309         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
310     CATCH_AND_ASSERT();
311     minExecuted = (g_OuterParCalls - 1) * innerCalls;  // see above
312 
313     // The first formula above assumes all ranges of the outer parallel for are executed, and one
314     // cancels.  In the event, we have a smaller number of ranges that start before the exception
315     // is caught.
316     //
317     //  g_SolitaryException:One inner range throws.  Outer parallel_For is cancelled, but sibling
318     //                      parallel_fors continue to completion (unless the threads that execute
319     //                      are not allowed to throw, in which case we will not see any exceptions).
320     // !g_SolitaryException:multiple inner ranges may throw.  Any which throws will stop, and the
321     //                      corresponding range of the outer pfor will stop also.
322     //
323     // In either case, once the outer pfor gets the exception it will stop executing further ranges.
324 
325     // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
326     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
327     if ( g_SolitaryException ) {
328         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
329         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
330         REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception");
331         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
332     }
333     else {
334         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
335         REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
336     }
337 } // void Test3 ()
338 
339 class OuterParForExceptionSafeBody {
340 public:
341     void operator()( const range_type& ) const {
342         tbb::task_group_context ctx(tbb::task_group_context::isolated);
343         ++g_OuterParCalls;
344         TRY();
345             tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
346         CATCH();  // this macro sets g_ExceptionCaught
347     }
348 };
349 
350 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
351 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
352     in this test, they affect neither the other tasks of the root parallel_for
353     nor sibling inner algorithms. **/
354 void Test4 () {
355     ResetGlobals( true, true );
356     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
357               outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
358     TRY();
359         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
360     CATCH();
361     // g_SolitaryException  : one inner pfor will throw, the rest will execute to completion.
362     //                        so the count should be (outerCalls -1) * innerCalls, if a throw happened.
363     // !g_SolitaryException : possible multiple inner pfor throws.  Should be approximately
364     //                        (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
365     intptr_t  minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
366     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
367     if ( g_SolitaryException ) {
368         // only one task had exception thrown. That task had at least one execution (the one that threw).
369         // There may be an arbitrary number of ranges executed after the throw but before the exception
370         // is caught in the scheduler and cancellation is signaled.  (seen 9, 11 and 62 (!) for 8 threads)
371         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered");
372         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
373         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
374         // a small number of threads can execute in a throwing sub-pfor, if the task which is
375         // to do the solitary throw swaps out after registering its intent to throw but before it
376         // actually does so. As a result, the number of extra tasks cannot exceed the number of thread
377         // for each nested pfor invication)
378         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_NumThreads-1)*g_NumThreads/2, "Too many tasks survived exception");
379     }
380     else {
381         REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions");
382         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
383         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions");
384         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
385     }
386 } // void Test4 ()
387 
388 //! Testing parallel_for and parallel_reduce exception handling
389 //! \brief \ref error_guessing
390 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") {
391     for (auto concurrency_level: utils::concurrency_range()) {
392         g_NumThreads = static_cast<int>(concurrency_level);
393         g_Master = std::this_thread::get_id();
394         if (g_NumThreads > 1) {
395             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
396             // Execute in all the possible modes
397             for ( size_t j = 0; j < 4; ++j ) {
398                 g_ExceptionInMaster = (j & 1) != 0;
399                 g_SolitaryException = (j & 2) != 0;
400 
401                 Test0();
402             }
403         }
404     }
405 }
406 
407 //! Testing parallel_for and parallel_reduce exception handling
408 //! \brief \ref error_guessing
409 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") {
410     for (auto concurrency_level: utils::concurrency_range()) {
411         g_NumThreads = static_cast<int>(concurrency_level);
412         g_Master = std::this_thread::get_id();
413         if (g_NumThreads > 1) {
414             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
415             // Execute in all the possible modes
416             for ( size_t j = 0; j < 4; ++j ) {
417                 g_ExceptionInMaster = (j & 1) != 0;
418                 g_SolitaryException = (j & 2) != 0;
419 
420                 Test1();
421             }
422         }
423     }
424 }
425 
426 //! Testing parallel_for and parallel_reduce exception handling
427 //! \brief \ref error_guessing
428 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") {
429     for (auto concurrency_level: utils::concurrency_range()) {
430         g_NumThreads = static_cast<int>(concurrency_level);
431         g_Master = std::this_thread::get_id();
432         if (g_NumThreads > 1) {
433             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
434             // Execute in all the possible modes
435             for ( size_t j = 0; j < 4; ++j ) {
436                 g_ExceptionInMaster = (j & 1) != 0;
437                 g_SolitaryException = (j & 2) != 0;
438 
439                 Test2();
440             }
441         }
442     }
443 }
444 
445 //! Testing parallel_for and parallel_reduce exception handling
446 //! \brief \ref error_guessing
447 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") {
448     for (auto concurrency_level: utils::concurrency_range()) {
449         g_NumThreads = static_cast<int>(concurrency_level);
450         g_Master = std::this_thread::get_id();
451         if (g_NumThreads > 1) {
452             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
453             // Execute in all the possible modes
454             for ( size_t j = 0; j < 4; ++j ) {
455                 g_ExceptionInMaster = (j & 1) != 0;
456                 g_SolitaryException = (j & 2) != 0;
457 
458                 Test3();
459             }
460         }
461     }
462 }
463 
464 //! Testing parallel_for and parallel_reduce exception handling
465 //! \brief \ref error_guessing
466 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") {
467     for (auto concurrency_level: utils::concurrency_range()) {
468         g_NumThreads = static_cast<int>(concurrency_level);
469         g_Master = std::this_thread::get_id();
470         if (g_NumThreads > 1) {
471             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
472             // Execute in all the possible modes
473             for ( size_t j = 0; j < 4; ++j ) {
474                 g_ExceptionInMaster = (j & 1) != 0;
475                 g_SolitaryException = (j & 2) != 0;
476 
477                 Test4();
478             }
479         }
480     }
481 }
482 
483 #endif /* TBB_USE_EXCEPTIONS */
484 
485 class ParForBodyToCancel {
486 public:
487     void operator()( const range_type& ) const {
488         ++g_CurExecuted;
489         Cancellator::WaitUntilReady();
490     }
491 };
492 
493 template<class B>
494 class ParForLauncher {
495     tbb::task_group_context &my_ctx;
496 public:
497     void operator()() const {
498         tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
499     }
500     ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
501 };
502 
503 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
504 void TestCancelation1 () {
505     ResetGlobals( false );
506     RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
507 }
508 
509 class Cancellator2  {
510     tbb::task_group_context &m_GroupToCancel;
511 
512 public:
513     void operator()() const {
514         utils::ConcurrencyTracker ct;
515         WaitUntilConcurrencyPeaks();
516         m_GroupToCancel.cancel_group_execution();
517         g_ExecutedAtLastCatch = g_CurExecuted.load();
518     }
519 
520     Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
521 };
522 
523 class ParForBodyToCancel2 {
524 public:
525     void operator()( const range_type& ) const {
526         ++g_CurExecuted;
527         utils::ConcurrencyTracker ct;
528         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
529         while( !tbb::is_current_task_group_canceling() )
530             std::this_thread::yield();
531     }
532 };
533 
534 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
535 /** This version also tests tbb::is_current_task_group_canceling() method. **/
536 void TestCancelation2 () {
537     ResetGlobals();
538     RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>();
539     REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
540     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
541     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
542 }
543 
544 ////////////////////////////////////////////////////////////////////////////////
545 // Regression test based on the contribution by the author of the following forum post:
546 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
547 
//! Helper for the nested parallel_reduce regression test.
class Worker {
    static const int max_nesting = 3;
    static const int reduce_range = 1024;
    static const int reduce_grain = 256;
public:
    //! Defined out of class.
    int DoWork (int level);

    //! Returns reduce_range raised to the number of levels strictly between start_level and max_nesting.
    int Validate (int start_level) {
        int product = 1; // identity for multiplication
        int level = start_level + 1;
        while (level < max_nesting) {
            product *= reduce_range;
            ++level;
        }
        return product;
    }
};
561 
562 class RecursiveParReduceBodyWithSharedWorker {
563     Worker * m_SharedWorker;
564     int m_NestingLevel;
565     int m_Result;
566 public:
567     RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
568         : m_SharedWorker(src.m_SharedWorker)
569         , m_NestingLevel(src.m_NestingLevel)
570         , m_Result(0)
571     {}
572     RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
573         : m_SharedWorker(w)
574         , m_NestingLevel(outer)
575         , m_Result(0)
576     {}
577 
578     void operator() ( const tbb::blocked_range<size_t>& r ) {
579         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
580         else g_NonMasterExecuted = true;
581         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
582         for (size_t i = r.begin (); i != r.end (); ++i) {
583             m_Result += m_SharedWorker->DoWork (m_NestingLevel);
584         }
585     }
586     void join (const RecursiveParReduceBodyWithSharedWorker & x) {
587         m_Result += x.m_Result;
588     }
589     int result () { return m_Result; }
590 };
591 
592 int Worker::DoWork ( int level ) {
593     ++level;
594     if ( level < max_nesting ) {
595         RecursiveParReduceBodyWithSharedWorker rt (this, level);
596         tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
597         return rt.result();
598     }
599     else
600         return 1;
601 }
602 
603 //! Regression test for hanging that occurred with the first version of cancellation propagation
604 void TestCancelation3 () {
605     Worker w;
606     int result   = w.DoWork (0);
607     int expected = w.Validate(0);
608     REQUIRE_MESSAGE ( result == expected, "Wrong calculation result");
609 }
610 
//! Pair of atomic counters for created and deleted objects; both start at zero.
struct StatsCounters {
    std::atomic<size_t> my_total_created;
    std::atomic<size_t> my_total_deleted;
    StatsCounters() : my_total_created(0), my_total_deleted(0) {}
};
619 
620 class ParReduceBody {
621     StatsCounters* my_stats;
622     size_t my_id;
623     bool my_exception;
624     tbb::task_group_context& tgc;
625 
626 public:
627     ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) :
628         my_stats(&s_), my_exception(e_), tgc(context) {
629         my_id = my_stats->my_total_created++;
630     }
631 
632     ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) {
633         my_stats = lhs.my_stats;
634         my_id = my_stats->my_total_created++;
635     }
636 
637     ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) {
638         my_stats = lhs.my_stats;
639         my_id = my_stats->my_total_created++;
640     }
641 
642     ~ParReduceBody(){ ++my_stats->my_total_deleted; }
643 
644     void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
645         //Do nothing, except for one task (chosen arbitrarily)
646         if( my_id >= 12 ) {
647             if( my_exception )
648                 ThrowTestException(1);
649             else
650                 tgc.cancel_group_execution();
651         }
652     }
653 
654     void join( ParReduceBody& /*rhs*/ ) {}
655 };
656 
657 void TestCancelation4() {
658     StatsCounters statsObj;
659 #if TBB_USE_EXCEPTIONS
660     try
661 #endif
662     {
663         tbb::task_group_context tgc1, tgc2;
664         ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true);
665         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
666         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
667     }
668 #if TBB_USE_EXCEPTIONS
669     catch(...) {}
670 #endif
671     REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
672 }
673 
674 //! Testing parallel_for and parallel_reduce cancellation
675 //! \brief \ref error_guessing
676 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") {
677     for (auto concurrency_level: utils::concurrency_range()) {
678         g_NumThreads = static_cast<int>(concurrency_level);
679         g_Master = std::this_thread::get_id();
680         if (g_NumThreads > 1) {
681             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
682             // Execute in all the possible modes
683             for ( size_t j = 0; j < 4; ++j ) {
684                 g_ExceptionInMaster = (j & 1) != 0;
685                 g_SolitaryException = (j & 2) != 0;
686 
687                 TestCancelation1();
688             }
689         }
690     }
691 }
692 
693 //! Testing parallel_for and parallel_reduce cancellation
694 //! \brief \ref error_guessing
695 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") {
696     for (auto concurrency_level: utils::concurrency_range()) {
697         g_NumThreads = static_cast<int>(concurrency_level);
698         g_Master = std::this_thread::get_id();
699         if (g_NumThreads > 1) {
700             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
701             // Execute in all the possible modes
702             for ( size_t j = 0; j < 4; ++j ) {
703                 g_ExceptionInMaster = (j & 1) != 0;
704                 g_SolitaryException = (j & 2) != 0;
705 
706                 TestCancelation2();
707             }
708         }
709     }
710 }
711 
712 //! Testing parallel_for and parallel_reduce cancellation
713 //! \brief \ref error_guessing
714 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") {
715     for (auto concurrency_level: utils::concurrency_range()) {
716         g_NumThreads = static_cast<int>(concurrency_level);
717         g_Master = std::this_thread::get_id();
718         if (g_NumThreads > 1) {
719             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
720             // Execute in all the possible modes
721             for ( size_t j = 0; j < 4; ++j ) {
722                 g_ExceptionInMaster = (j & 1) != 0;
723                 g_SolitaryException = (j & 2) != 0;
724 
725                 TestCancelation3();
726             }
727         }
728     }
729 }
730 
731 //! Testing parallel_for and parallel_reduce cancellation
732 //! \brief \ref error_guessing
733 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
734     for (auto concurrency_level: utils::concurrency_range()) {
735         g_NumThreads = static_cast<int>(concurrency_level);
736         g_Master = std::this_thread::get_id();
737         if (g_NumThreads > 1) {
738             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
739             // Execute in all the possible modes
740             for ( size_t j = 0; j < 4; ++j ) {
741                 g_ExceptionInMaster = (j & 1) != 0;
742                 g_SolitaryException = (j & 2) != 0;
743 
744                 TestCancelation4();
745             }
746         }
747     }
748 }
749 
750 ////////////////////////////////////////////////////////////////////////////////
751 // Tests for tbb::parallel_for_each
752 ////////////////////////////////////////////////////////////////////////////////
753 
// Sizes used by the parallel_for_each tests in this section.
// ITER_RANGE: number of items in the top-level range of Test1, Test2 and Test5.
#define ITER_RANGE          1000
// ITEMS_TO_FEED: once g_FedTasksCount reaches this value, Feed() stops adding items.
#define ITEMS_TO_FEED       50
// INNER_ITER_RANGE: number of items processed by each nested (inner) parallel_for_each.
#define INNER_ITER_RANGE   100
// OUTER_ITER_RANGE: number of items in the top-level range of Test3 and Test4.
#define OUTER_ITER_RANGE  50
758 
// Declares, in the enclosing scope, a size_t array `test_vector` filled with the values
// 0 .. rangeSize-1 (plus one trailing slot that is never written), together with the
// iterators `begin` and `end` of type Iterator delimiting exactly those rangeSize values.
// rangeSize is parenthesized on every use so that expression arguments expand as intended,
// and the index has the same unsigned type as the elements it is stored into.
#define PREPARE_RANGE(Iterator, rangeSize)  \
    size_t test_vector[(rangeSize) + 1]; \
    for (size_t i = 0; i < static_cast<size_t>(rangeSize); i++) \
        test_vector[i] = i; \
    Iterator begin(&test_vector[0]); \
    Iterator end(&test_vector[(rangeSize)])
765 
766 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
767     if (g_FedTasksCount < ITEMS_TO_FEED) {
768         ++g_FedTasksCount;
769         feeder.add(val);
770     }
771 }
772 
// Instantiates and calls the test template `func` twice with utils::ForwardIterator<size_t>:
// once with the plain body class `body` and once with its feeder variant, whose name is
// formed by appending WithFeeder to `body`.
#define RunWithSimpleBody(func, body)       \
    func<utils::ForwardIterator<size_t>, body>();         \
    func<utils::ForwardIterator<size_t>, body##WithFeeder>()
776 
// Same as RunWithSimpleBody, but for body class templates: both `body` and
// `body`WithFeeder are instantiated with utils::ForwardIterator<size_t> before being
// passed to the test template `func`.
#define RunWithTemplatedBody(func, body)     \
    func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >();         \
    func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()
780 
781 #if TBB_USE_EXCEPTIONS
782 
783 // Simple functor object with exception
784 class SimpleParForEachBody {
785 public:
786     void operator() ( size_t &value ) const {
787         ++g_CurExecuted;
788         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
789         else g_NonMasterExecuted = true;
790         if( tbb::is_current_task_group_canceling() ) {
791             g_TGCCancelled.increment();
792         }
793         utils::ConcurrencyTracker ct;
794         value += 1000;
795         WaitUntilConcurrencyPeaks();
796         ThrowTestException(1);
797     }
798 };
799 
800 // Simple functor object with exception and feeder
801 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
802 public:
803     void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
804         Feed(feeder, 0);
805         SimpleParForEachBody::operator()(value);
806     }
807 };
808 
// Tests exceptions without nesting
/** Runs one parallel_for_each over ITER_RANGE items with a default-constructed
    simple_body, then checks the global counters: how many items ran after the last
    catch, how many ran under cancellation, and that exactly one exception was caught. **/
template <class Iterator, class simple_body>
void Test1_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    TRY();
        tbb::parallel_for_each<Iterator, simple_body>(begin, end, simple_body() );
    CATCH_AND_ASSERT();
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    // NOTE(review): this repeats the unconditional check made right after CATCH_AND_ASSERT().
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");

} // void Test1_parallel_for_each ()
824 
825 template <class Iterator>
826 class OuterParForEachBody {
827 public:
828     void operator()( size_t& /*value*/ ) const {
829         ++g_OuterParCalls;
830         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
831         tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody());
832     }
833 };
834 
835 template <class Iterator>
836 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
837 public:
838     void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
839         Feed(feeder, 0);
840         OuterParForEachBody<Iterator>::operator()(value);
841     }
842 };
843 
//! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
/** Inner algorithms are spawned inside the new bound context by default. Since
    exceptions thrown from the inner parallel_for_each are not handled by the caller
    (outer parallel_for_each body) in this test, they will cancel all the sibling inner
    algorithms. **/
template <class Iterator, class outer_body>
void Test2_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    TRY();
        tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body() );
    CATCH_AND_ASSERT();
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    // NOTE(review): this repeats the unconditional check made right after CATCH_AND_ASSERT().
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
} // void Test2_parallel_for_each ()
862 
863 template <class Iterator>
864 class OuterParForEachBodyWithIsolatedCtx {
865 public:
866     void operator()( size_t& /*value*/ ) const {
867         tbb::task_group_context ctx(tbb::task_group_context::isolated);
868         ++g_OuterParCalls;
869         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
870         tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
871     }
872 };
873 
874 template <class Iterator>
875 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
876 public:
877     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
878         Feed(feeder, 0);
879         OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
880     }
881 };
882 
883 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
884 /** Even though exceptions thrown from the inner parallel_for_each are not handled
885     by the caller in this test, they will not affect sibling inner algorithms
886     already running because of the isolated contexts. However because the first
887     exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
888     will be processed (which launch inner parallel_for_eachs) **/
889 template <class Iterator, class outer_body>
890 void Test3_parallel_for_each () {
891     ResetGlobals();
892     PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
893     intptr_t innerCalls = INNER_ITER_RANGE,
894              // The assumption here is the same as in outer parallel fors.
895              minExecuted = (g_NumThreads - 1) * innerCalls;
896     g_Master = std::this_thread::get_id();
897     TRY();
898         tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body());
899     CATCH_AND_ASSERT();
900     // figure actual number of expected executions given the number of outer PDos started.
901     minExecuted = (g_OuterParCalls - 1) * innerCalls;
902     // one extra thread may run a task that sees cancellation.  Infrequent but possible
903     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
904     if ( g_SolitaryException ) {
905         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
906         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
907     }
908     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
909     if ( !g_SolitaryException )
910         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
911 } // void Test3_parallel_for_each ()
912 
913 template <class Iterator>
914 class OuterParForEachWithEhBody {
915 public:
916     void operator()( size_t& /*value*/ ) const {
917         tbb::task_group_context ctx(tbb::task_group_context::isolated);
918         ++g_OuterParCalls;
919         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
920         TRY();
921             tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
922         CATCH();
923     }
924 };
925 
926 template <class Iterator>
927 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
928 public:
929     void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
930     OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
931     OuterParForEachWithEhBodyWithFeeder() = default;
932     void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
933         Feed(feeder, 0);
934         OuterParForEachWithEhBody<Iterator>::operator()(value);
935     }
936 };
937 
938 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block.
939 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
940     in this test, they do not affect neither other tasks of the the root parallel_for
941     nor sibling inner algorithms. **/
942 template <class Iterator, class outer_body_with_eh>
943 void Test4_parallel_for_each () {
944     ResetGlobals( true, true );
945     PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
946     g_Master = std::this_thread::get_id();
947     TRY();
948         tbb::parallel_for_each<Iterator, outer_body_with_eh>(begin, end, outer_body_with_eh());
949     CATCH();
950     REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
951     intptr_t innerCalls = INNER_ITER_RANGE,
952              outerCalls = OUTER_ITER_RANGE + g_FedTasksCount,
953              maxExecuted = outerCalls * innerCalls,
954              minExecuted = 0;
955     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
956     if ( g_SolitaryException ) {
957         minExecuted = maxExecuted - innerCalls;
958         REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
959         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
960         // This test has the same property as Test4 (parallel_for); the exception can be
961         // thrown, but some number of tasks from the outer Pdo can execute after the throw but
962         // before the cancellation is signaled (have seen 36).
963         DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
964     }
965     else {
966         minExecuted = g_NumExceptionsCaught;
967         REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
968         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
969         REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
970         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
971     }
972 } // void Test4_parallel_for_each ()
973 
974 // This body throws an exception only if the task was added by feeder
975 class ParForEachBodyWithThrowingFeederTasks {
976 public:
977     //! This form of the function call operator can be used when the body needs to add more work during the processing
978     void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
979         ++g_CurExecuted;
980         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
981         else g_NonMasterExecuted = true;
982         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
983         Feed(feeder, 1);
984         if (value == 1)
985             ThrowTestException(1);
986     }
987 }; // class ParForEachBodyWithThrowingFeederTasks
988 
// Test exception in task, which was added by feeder.
/** Runs ParForEachBodyWithThrowingFeederTasks over ITER_RANGE items. Only the
    solitary-exception mode is checked afterwards: either an exception was caught at
    this level, or the thread kind that is allowed to throw never attempted to. **/
template <class Iterator>
void Test5_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, ParForEachBodyWithThrowingFeederTasks>(begin, end, ParForEachBodyWithThrowingFeederTasks());
    CATCH();
    if (g_SolitaryException) {
        // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
        // are handled by the master thread.  In this case no throw occurs.
        REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel               // we saw an exception
                || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)  // non-master throws but none tried
                || (g_ExceptionInMaster && !g_MasterExecutedThrow))     // master throws but master didn't try
                , "At least one exception should occur");
    }
} // void Test5_parallel_for_each ()
1007 
1008 //! Testing parallel_for_each exception handling
1009 //! \brief \ref error_guessing
1010 TEST_CASE("parallel_for_each exception handling test #1") {
1011     for (auto concurrency_level: utils::concurrency_range()) {
1012         g_NumThreads = static_cast<int>(concurrency_level);
1013         g_Master = std::this_thread::get_id();
1014         if (g_NumThreads > 1) {
1015             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1016             // Execute in all the possible modes
1017             for ( size_t j = 0; j < 4; ++j ) {
1018                 g_ExceptionInMaster = (j & 1) != 0;
1019                 g_SolitaryException = (j & 2) != 0;
1020 
1021                 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody);
1022             }
1023         }
1024     }
1025 }
1026 
1027 //! Testing parallel_for_each exception handling
1028 //! \brief \ref error_guessing
1029 TEST_CASE("parallel_for_each exception handling test #2") {
1030     for (auto concurrency_level: utils::concurrency_range()) {
1031         g_NumThreads = static_cast<int>(concurrency_level);
1032         g_Master = std::this_thread::get_id();
1033         if (g_NumThreads > 1) {
1034             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1035             // Execute in all the possible modes
1036             for ( size_t j = 0; j < 4; ++j ) {
1037                 g_ExceptionInMaster = (j & 1) != 0;
1038                 g_SolitaryException = (j & 2) != 0;
1039 
1040                 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody);
1041             }
1042         }
1043     }
1044 }
1045 
1046 //! Testing parallel_for_each exception handling
1047 //! \brief \ref error_guessing
1048 TEST_CASE("parallel_for_each exception handling test #3") {
1049     for (auto concurrency_level: utils::concurrency_range()) {
1050         g_NumThreads = static_cast<int>(concurrency_level);
1051         g_Master = std::this_thread::get_id();
1052         if (g_NumThreads > 1) {
1053             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1054             // Execute in all the possible modes
1055             for ( size_t j = 0; j < 4; ++j ) {
1056                 g_ExceptionInMaster = (j & 1) != 0;
1057                 g_SolitaryException = (j & 2) != 0;
1058 
1059                 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx);
1060             }
1061         }
1062     }
1063 }
1064 
1065 //! Testing parallel_for_each exception handling
1066 //! \brief \ref error_guessing
1067 TEST_CASE("parallel_for_each exception handling test #4") {
1068     for (auto concurrency_level: utils::concurrency_range()) {
1069         g_NumThreads = static_cast<int>(concurrency_level);
1070         g_Master = std::this_thread::get_id();
1071         if (g_NumThreads > 1) {
1072             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1073             // Execute in all the possible modes
1074             for ( size_t j = 0; j < 4; ++j ) {
1075                 g_ExceptionInMaster = (j & 1) != 0;
1076                 g_SolitaryException = (j & 2) != 0;
1077 
1078                 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody);
1079             }
1080         }
1081     }
1082 }
1083 
1084 //! Testing parallel_for_each exception handling
1085 //! \brief \ref error_guessing
1086 TEST_CASE("parallel_for_each exception handling test #5") {
1087     for (auto concurrency_level: utils::concurrency_range()) {
1088         g_NumThreads = static_cast<int>(concurrency_level);
1089         g_Master = std::this_thread::get_id();
1090         if (g_NumThreads > 1) {
1091             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1092             // Execute in all the possible modes
1093             for ( size_t j = 0; j < 4; ++j ) {
1094                 g_ExceptionInMaster = (j & 1) != 0;
1095                 g_SolitaryException = (j & 2) != 0;
1096 
1097                 Test5_parallel_for_each<utils::InputIterator<size_t> >();
1098                 Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
1099                 Test5_parallel_for_each<utils::RandomIterator<size_t> >();
1100             }
1101         }
1102     }
1103 }
1104 
1105 #endif /* TBB_USE_EXCEPTIONS */
1106 
1107 class ParForEachBodyToCancel {
1108 public:
1109     void operator()( size_t& /*value*/ ) const {
1110         ++g_CurExecuted;
1111         Cancellator::WaitUntilReady();
1112     }
1113 };
1114 
1115 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
1116 public:
1117     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1118         Feed(feeder, 0);
1119         ParForEachBodyToCancel::operator()(value);
1120     }
1121 };
1122 
1123 template<class B, class Iterator>
1124 class ParForEachWorker {
1125     tbb::task_group_context &my_ctx;
1126 public:
1127     void operator()() const {
1128         PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
1129         tbb::parallel_for_each<Iterator, B>( begin, end, B(), my_ctx );
1130     }
1131 
1132     ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1133 };
1134 
//! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
/** A Cancellator and a ParForEachWorker sharing the same context are both submitted
    to one task_group; after waiting, the number of items executed past the last
    catch is checked against g_NumThreads. **/
template <class Iterator, class body_to_cancel>
void TestCancelation1_parallel_for_each () {
    ResetGlobals( false );
    // Value handed to the Cancellator constructor together with the shared context.
    intptr_t  threshold = 10;
    tbb::task_group tg;
    tbb::task_group_context  ctx;
    Cancellator cancellator(ctx, threshold);
    ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
    // The cancellator is submitted first; the worker follows after a yield.
    tg.run( cancellator );
    std::this_thread::yield();
    tg.run( worker );
    TRY();
        tg.wait();
    CATCH_AND_FAIL();
    REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
}
1152 
1153 class ParForEachBodyToCancel2 {
1154 public:
1155     void operator()( size_t& /*value*/ ) const {
1156         ++g_CurExecuted;
1157         utils::ConcurrencyTracker ct;
1158         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1159         while( !tbb::is_current_task_group_canceling() )
1160             std::this_thread::yield();
1161     }
1162 };
1163 
1164 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
1165 public:
1166     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1167         Feed(feeder, 0);
1168         ParForEachBodyToCancel2::operator()(value);
1169     }
1170 };
1171 
1172 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1173 /** This version also tests tbb::is_current_task_group_canceling() method. **/
1174 template <class Iterator, class body_to_cancel>
1175 void TestCancelation2_parallel_for_each () {
1176     ResetGlobals();
1177     RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
1178 }
1179 
1180 //! Testing parallel_for_each cancellation test
1181 //! \brief \ref error_guessing
1182 TEST_CASE("parallel_for_each cancellation test #1") {
1183     for (auto concurrency_level: utils::concurrency_range()) {
1184         g_NumThreads = static_cast<int>(concurrency_level);
1185         g_Master = std::this_thread::get_id();
1186         if (g_NumThreads > 1) {
1187             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1188             // Execute in all the possible modes
1189             for ( size_t j = 0; j < 4; ++j ) {
1190                 g_ExceptionInMaster = (j & 1) != 0;
1191                 g_SolitaryException = (j & 2) != 0;
1192 
1193                 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
1194             }
1195         }
1196     }
1197 }
1198 
1199 //! Testing parallel_for_each cancellation test
1200 //! \brief \ref error_guessing
1201 TEST_CASE("parallel_for_each cancellation test #2") {
1202     for (auto concurrency_level: utils::concurrency_range()) {
1203         g_NumThreads = static_cast<int>(concurrency_level);
1204         g_Master = std::this_thread::get_id();
1205         if (g_NumThreads > 1) {
1206             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1207             // Execute in all the possible modes
1208             for ( size_t j = 0; j < 4; ++j ) {
1209                 g_ExceptionInMaster = (j & 1) != 0;
1210                 g_SolitaryException = (j & 2) != 0;
1211 
1212                 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
1213             }
1214         }
1215     }
1216 }
1217 
1218 ////////////////////////////////////////////////////////////////////////////////
1219 // Tests for tbb::parallel_pipeline
1220 ////////////////////////////////////////////////////////////////////////////////
// Number of items InputFilter emits before it stops a pipeline.
#define NUM_ITEMS   100

// Marker value (all bits set) that InputFilter stores in the last slot of its buffer.
const size_t c_DataEndTag = size_t(~0);

// Token limit passed to tbb::parallel_pipeline by CustomPipeline::run().
int g_NumTokens = 0;
1226 
1227 // Simple input filter class, it assigns 1 to all array members
1228 // It stops when it receives item equal to -1
1229 class InputFilter {
1230     mutable std::atomic<size_t> m_Item{};
1231     mutable size_t m_Buffer[NUM_ITEMS + 1];
1232 public:
1233     InputFilter() {
1234         m_Item = 0;
1235         for (size_t i = 0; i < NUM_ITEMS; ++i )
1236             m_Buffer[i] = 1;
1237         m_Buffer[NUM_ITEMS] = c_DataEndTag;
1238     }
1239     InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()) {
1240         for (size_t i = 0; i < NUM_ITEMS; ++i )
1241             m_Buffer[i] = other.m_Buffer[i];
1242     }
1243 
1244     void* operator()(tbb::flow_control& control) const {
1245         size_t item = m_Item++;
1246         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1247         else g_NonMasterExecuted = true;
1248         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1249         if(item == 1) {
1250             ++g_PipelinesStarted;   // count on emitting the first item.
1251         }
1252         if ( item >= NUM_ITEMS ) {
1253             control.stop();
1254             return nullptr;
1255         }
1256         m_Buffer[item] = 1;
1257         return &m_Buffer[item];
1258     }
1259 
1260     size_t* buffer() { return m_Buffer; }
1261 }; // class InputFilter
1262 
1263 #if TBB_USE_EXCEPTIONS
1264 
1265 // Simple filter with exception throwing.  If parallel, will wait until
1266 // as many parallel filters start as there are threads.
1267 class SimpleFilter {
1268     bool m_canThrow;
1269     bool m_serial;
1270 public:
1271     SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
1272     void* operator()(void* item) const {
1273         ++g_CurExecuted;
1274         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1275         else g_NonMasterExecuted = true;
1276         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1277         if ( m_canThrow ) {
1278             if ( !m_serial ) {
1279                 utils::ConcurrencyTracker ct;
1280                 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
1281             }
1282             ThrowTestException(1);
1283         }
1284         return item;
1285     }
1286 }; // class SimpleFilter
1287 
1288 // This enumeration represents filters order in pipeline
1289 struct FilterSet {
1290     tbb::filter_mode mode1, mode2;
1291     bool throw1, throw2;
1292 
1293     FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
1294         : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
1295     {}
1296 }; // struct FilterSet
1297 
// Filter configuration with a serial_in_order, non-throwing first filter and a
// parallel, throwing second filter.
FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );
1299 
1300 template<typename InFilter, typename Filter>
1301 class CustomPipeline {
1302     InFilter inputFilter;
1303     Filter filter1;
1304     Filter filter2;
1305     FilterSet my_filters;
1306 public:
1307     CustomPipeline( const FilterSet& filters )
1308         : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
1309           filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
1310           my_filters(filters)
1311     {}
1312 
1313     void run () {
1314         tbb::parallel_pipeline(
1315             g_NumTokens,
1316             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1317             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1318             tbb::make_filter<void*, void>(my_filters.mode2, filter2)
1319         );
1320     }
1321     void run ( tbb::task_group_context& ctx ) {
1322         tbb::parallel_pipeline(
1323             g_NumTokens,
1324             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1325             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1326             tbb::make_filter<void*, void>(my_filters.mode2, filter2),
1327             ctx
1328         );
1329     }
1330 };
1331 
1332 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
1333 
// Tests exceptions without nesting
/** Runs one SimplePipeline configured by `filters`. If both trailing filters processed
    every item (2 * NUM_ITEMS executions) the function returns early; otherwise the
    usual post-exception counters are checked. **/
void Test1_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    SimplePipeline testPipeline(filters);
    TRY();
        testPipeline.run();
        if ( g_CurExecuted == 2 * NUM_ITEMS ) {
            // all the items were processed, though an exception was supposed to occur.
            if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
                // if !g_ExceptionInMaster, the master thread is not allowed to throw.
                // if g_NonMasterExecutedThrow > 0 then a thread besides the master tried to throw.
                REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
                    "Unusual count");
            }
            // In case of all serial filters they might be all executed in the thread(s)
            // where exceptions are not allowed by the common test logic. So we just quit.
            return;
        }
    CATCH_AND_ASSERT();
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    // NOTE(review): this repeats the unconditional check made two statements above.
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");

} // void Test1_pipeline ()
1360 
1361 // Filter with nesting
1362 class OuterFilter {
1363 public:
1364     OuterFilter ( bool, bool ) {}
1365 
1366     void* operator()(void* item) const {
1367         ++g_OuterParCalls;
1368         SimplePipeline testPipeline(serial_parallel);
1369         testPipeline.run();
1370         return item;
1371     }
1372 }; // class OuterFilter
1373 
//! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
/** Inner algorithms are spawned inside the new bound context by default. Since
    exceptions thrown from the inner pipeline are not handled by the caller
    (outer pipeline body) in this test, they will cancel all the sibling inner
    algorithms. **/
void Test2_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    g_NestedPipelines = true;
    CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
    TRY();
        testPipeline.run();
    CATCH_AND_ASSERT();
    // No exception is acceptable when the thread kind that is allowed to throw never attempted to.
    bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
    REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException ) {
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    }
} // void Test2_pipeline ()
1392 
1393 //! creates isolated inner pipeline and runs it.
1394 class OuterFilterWithIsolatedCtx {
1395 public:
1396     OuterFilterWithIsolatedCtx( bool , bool ) {}
1397 
1398     void* operator()(void* item) const {
1399         ++g_OuterParCalls;
1400         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1401         // create inner pipeline with serial input, parallel output filter, second filter throws
1402         SimplePipeline testPipeline(serial_parallel);
1403         testPipeline.run(ctx);
1404         return item;
1405     }
1406 }; // class OuterFilterWithIsolatedCtx
1407 
1408 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
1409 /** Even though exceptions thrown from the inner pipeline are not handled
1410     by the caller in this test, they will not affect sibling inner algorithms
1411     already running because of the isolated contexts. However because the first
1412     exception cancels the root parallel_for_each only the first g_NumThreads subranges
1413     will be processed (which launch inner pipelines) **/
1414 void Test3_pipeline ( const FilterSet& filters ) {
1415     for( int nTries = 1; nTries <= 4; ++nTries) {
1416         ResetGlobals();
1417         g_NestedPipelines = true;
1418         g_Master = std::this_thread::get_id();
1419         intptr_t innerCalls = NUM_ITEMS,
1420                  minExecuted = (g_NumThreads - 1) * innerCalls;
1421         CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
1422         TRY();
1423             testPipeline.run();
1424         CATCH_AND_ASSERT();
1425 
1426         bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1427             (!g_ExceptionInMaster && !g_NonMasterExecuted);
1428         // only test assertions if the test threw an exception (or we don't care)
1429         bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
1430         if(testSucceeded) {
1431             if (g_SolitaryException) {
1432 
1433                 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
1434                 // Each time the input filter of a pipeline delivers its first item, it increments
1435                 // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
1436                 // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
1437                 // context will start; that is what we're testing.)
1438                 //
1439                 // There are two pipelines which will NOT run to completion when a solitary throw
1440                 // happens in an isolated inner context: the outer pipeline and the pipeline which
1441                 // throws.  All the other pipelines which start should run to completion.  But only
1442                 // inner body invocations are counted.
1443                 //
1444                 // So g_CurExecuted should be about
1445                 //
1446                 //   (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1
1447                 //   ^ executions for each completed pipeline
1448                 //                   ^ completing pipelines (remembering two will not complete)
1449                 //                                              ^ one for the inner throwing pipeline
1450 
1451                 minExecuted = (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1;
1452                 // each failing pipeline must execute at least two tasks
1453                 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
1454                 // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
1455                 // tasks not executing at throw were scheduled.
1456                 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
1457                 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
1458                 // if we're only throwing from the master thread, and that thread didn't
1459                 // participate in the pipelines, then no throw occurred.
1460             }
1461             REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1462             REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
1463             return;
1464         }
1465     }
1466 }
1467 
1468 class OuterFilterWithEhBody  {
1469 public:
1470     OuterFilterWithEhBody( bool, bool ){}
1471 
1472     void* operator()(void* item) const {
1473         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1474         ++g_OuterParCalls;
1475         SimplePipeline testPipeline(serial_parallel);
1476         TRY();
1477             testPipeline.run(ctx);
1478         CATCH();
1479         return item;
1480     }
1481 }; // class OuterFilterWithEhBody
1482 
1483 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
1484 /** Since exception(s) thrown from the inner pipeline are handled by the caller
1485     in this test, they do not affect other tasks of the the root pipeline
1486     nor sibling inner algorithms. **/
1487 void Test4_pipeline ( const FilterSet& filters ) {
1488 #if __GNUC__ && !__INTEL_COMPILER
1489     if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
1490         MESSAGE("Known issue: one of exception handling tests is skipped.");
1491         return;
1492     }
1493 #endif
1494     ResetGlobals( true, true );
1495     // each outer pipeline stage will start NUM_ITEMS inner pipelines.
1496     // each inner pipeline that doesn't throw will process NUM_ITEMS items.
1497     // for solitary exception there will be one pipeline that only processes one stage, one item.
1498     // innerCalls should be 2*NUM_ITEMS
1499     intptr_t innerCalls = 2*NUM_ITEMS,
1500              outerCalls = 2*NUM_ITEMS,
1501              maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
1502     CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
1503     TRY();
1504         testPipeline.run();
1505     CATCH_AND_ASSERT();
1506     intptr_t  minExecuted = 0;
1507     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1508         (!g_ExceptionInMaster && !g_NonMasterExecuted);
1509     if ( g_SolitaryException ) {
1510         minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
1511         REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
1512         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
1513     }
1514     else {
1515         // we assume throwing pipelines will not count
1516         minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
1517         REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
1518         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
1519         // too many already-scheduled tasks are started after the first exception is
1520         // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
1521         // So with multiple exceptions there are a variable number of tasks that have been
1522         // discarded because of the signals.
1523         // each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
1524         // updated with each throw, so the value will be the number of tasks executed at the last
1525         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
1526     }
1527 } // void Test4_pipeline ()
1528 
1529 //! Tests pipeline function passed with different combination of filters
1530 template<void testFunc(const FilterSet&)>
1531 void TestWithDifferentFiltersAndConcurrency() {
1532     for (auto concurrency_level: utils::concurrency_range()) {
1533         g_NumThreads = static_cast<int>(concurrency_level);
1534         g_Master = std::this_thread::get_id();
1535         if (g_NumThreads > 1) {
1536             // Execute in all the possible modes
1537             for ( size_t j = 0; j < 4; ++j ) {
1538                 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1539                 g_ExceptionInMaster = (j & 1) != 0;
1540                 g_SolitaryException = (j & 2) != 0;
1541                 g_NumTokens = 2 * g_NumThreads;
1542                 const int NumFilterTypes = 3;
1543                 const tbb::filter_mode modes[NumFilterTypes] = {
1544                     tbb::filter_mode::parallel,
1545                     tbb::filter_mode::serial_in_order,
1546                     tbb::filter_mode::serial_out_of_order
1547                 };
1548 
1549                 for ( int i = 0; i < NumFilterTypes; ++i ) {
1550                     for ( int n = 0; n < NumFilterTypes; ++n ) {
1551                         for ( int k = 0; k < 2; ++k )
1552                             testFunc( FilterSet(modes[i], modes[j], k == 0, k != 0) );
1553                     }
1554                 }
1555             }
1556         }
1557     }
1558 }
1559 
1560 //! Testing parallel_pipeline exception handling
1561 //! \brief \ref error_guessing
1562 TEST_CASE("parallel_pipeline exception handling test #1") {
1563     TestWithDifferentFiltersAndConcurrency<Test1_pipeline>();
1564 }
1565 
1566 //! Testing parallel_pipeline exception handling
1567 //! \brief \ref error_guessing
1568 TEST_CASE("parallel_pipeline exception handling test #2") {
1569     TestWithDifferentFiltersAndConcurrency<Test2_pipeline>();
1570 }
1571 
1572 //! Testing parallel_pipeline exception handling
1573 //! \brief \ref error_guessing
1574 TEST_CASE("parallel_pipeline exception handling test #3") {
1575     TestWithDifferentFiltersAndConcurrency<Test3_pipeline>();
1576 }
1577 
1578 //! Testing parallel_pipeline exception handling
1579 //! \brief \ref error_guessing
1580 TEST_CASE("parallel_pipeline exception handling test #4") {
1581     TestWithDifferentFiltersAndConcurrency<Test4_pipeline>();
1582 }
1583 
1584 #endif /* TBB_USE_EXCEPTIONS */
1585 
1586 class FilterToCancel  {
1587 public:
1588     FilterToCancel() {}
1589     void* operator()(void* item) const {
1590         ++g_CurExecuted;
1591         Cancellator::WaitUntilReady();
1592         return item;
1593     }
1594 }; // class FilterToCancel
1595 
1596 template <class Filter_to_cancel>
1597 class PipelineLauncher {
1598     tbb::task_group_context &my_ctx;
1599 public:
1600     PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1601 
1602     void operator()() const {
1603         // Run test when serial filter is the first non-input filter
1604         InputFilter inputFilter;
1605         Filter_to_cancel filterToCancel;
1606         tbb::parallel_pipeline(
1607             g_NumTokens,
1608             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1609             tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel),
1610             my_ctx
1611         );
1612     }
1613 };
1614 
1615 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1616 void TestCancelation1_pipeline () {
1617     ResetGlobals();
1618     g_ThrowException = false;
1619     RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(10);
1620     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
1621     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1622 }
1623 
1624 class FilterToCancel2  {
1625 public:
1626     FilterToCancel2() {}
1627 
1628     void* operator()(void* item) const {
1629         ++g_CurExecuted;
1630         utils::ConcurrencyTracker ct;
1631         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1632         while( !tbb::is_current_task_group_canceling() )
1633             std::this_thread::yield();
1634         return item;
1635     }
1636 };
1637 
1638 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1639 /** This version also tests task::is_cancelled() method. **/
1640 void TestCancelation2_pipeline () {
1641     ResetGlobals();
1642     RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>();
1643     // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the
1644     // former, and g_CurExecuted is monotonic increasing.  so the comparison should be at least ==.
1645     // If another filter is started after cancel but before cancellation is propagated, then the
1646     // number will be larger.
1647     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
1648 }
1649 
1650 /** If min and max thread numbers specified on the command line are different,
1651     the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
1652     to be able to test the high and low contention modes while keeping the test reasonably fast **/
1653 
1654 //! Testing parallel_pipeline cancellation
1655 //! \brief \ref error_guessing
1656 TEST_CASE("parallel_pipeline cancellation test #1") {
1657     for (auto concurrency_level: utils::concurrency_range()) {
1658         g_NumThreads = static_cast<int>(concurrency_level);
1659         g_Master = std::this_thread::get_id();
1660         if (g_NumThreads > 1) {
1661             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1662             // Execute in all the possible modes
1663             for ( size_t j = 0; j < 4; ++j ) {
1664                 g_ExceptionInMaster = (j & 1) != 0;
1665                 g_SolitaryException = (j & 2) != 0;
1666                 g_NumTokens = 2 * g_NumThreads;
1667 
1668                 TestCancelation1_pipeline();
1669             }
1670         }
1671     }
1672 }
1673 
1674 //! Testing parallel_pipeline cancellation
1675 //! \brief \ref error_guessing
1676 TEST_CASE("parallel_pipeline cancellation test #2") {
1677     for (auto concurrency_level: utils::concurrency_range()) {
1678         g_NumThreads = static_cast<int>(concurrency_level);
1679         g_Master = std::this_thread::get_id();
1680         if (g_NumThreads > 1) {
1681             tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
1682             // Execute in all the possible modes
1683             for ( size_t j = 0; j < 4; ++j ) {
1684                 g_ExceptionInMaster = (j & 1) != 0;
1685                 g_SolitaryException = (j & 2) != 0;
1686                 g_NumTokens = 2 * g_NumThreads;
1687 
1688                 TestCancelation2_pipeline();
1689             }
1690         }
1691     }
1692 }
1693