xref: /oneTBB/test/tbb/test_eh_algorithms.cpp (revision c4568449)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/concurrency_tracker.h"
19 #include "common/iterator.h"
20 #include "common/utils_concurrency_limit.h"
21 
22 #include <limits.h> // for INT_MAX
23 #include <thread>
24 #include <vector>
25 
26 #include "tbb/parallel_for.h"
27 #include "tbb/parallel_reduce.h"
28 #include "tbb/parallel_for_each.h"
29 #include "tbb/parallel_pipeline.h"
30 #include "tbb/blocked_range.h"
31 #include "tbb/task_group.h"
32 #include "tbb/concurrent_unordered_map.h"
33 #include "tbb/task.h"
34 #include "tbb/global_control.h"
35 
36 //! \file test_eh_algorithms.cpp
37 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications
38 
39 #define FLAT_RANGE  100000
40 #define FLAT_GRAIN  100
41 #define OUTER_RANGE  100
42 #define OUTER_GRAIN  10
43 #define INNER_RANGE  (FLAT_RANGE / OUTER_RANGE)
44 #define INNER_GRAIN  (FLAT_GRAIN / OUTER_GRAIN)
45 
46 struct context_specific_counter {
47     tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{};
48 
49     void increment() {
50         tbb::task_group_context* ctx = tbb::task::current_context();
51         REQUIRE(ctx != nullptr);
52         context_map[ctx]++;
53     }
54 
55     void reset() {
56         context_map.clear();
57     }
58 
59     void validate(unsigned expected_count, const char* msg) {
60         for (auto it = context_map.begin(); it != context_map.end(); it++) {
61             REQUIRE_MESSAGE( it->second <= expected_count, msg);
62         }
63     }
64 };
65 
66 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder
67 std::atomic<intptr_t> g_OuterParCalls{};  // number of actual invocations of the outer construct executed.
68 context_specific_counter g_TGCCancelled{};  // Number of times a task sees its group cancelled at start
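// g_TGCCancelled is keyed by the task_group_context a task runs in, so validate() can check
// that no more than the expected number of tasks in any single context observed cancellation.
// A minimal usage sketch, mirroring the calls made throughout the tests below:
//
//     g_TGCCancelled.increment();                      // from a body that saw its group cancelled
//     g_TGCCancelled.validate(g_NumThreads, "msg");    // after the algorithm has finished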
69 
70 #include "common/exception_handling.h"
71 
72 /********************************
73       Variables in test
74 
75 __ Test control variables
76       g_ExceptionInMaster -- only the external thread is allowed to throw.  If false, the external thread cannot throw
77       g_SolitaryException -- only one throw may be executed.
78 
79 -- controls for ThrowTestException for pipeline tests
80       g_NestedPipelines -- are inner pipelines being run?
81       g_PipelinesStarted -- how many pipelines have run their first filter at least once.
82 
83 -- Information variables
84 
85    g_Master -- Thread ID of the "external" thread
86       In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this.
87 
88 -- Measurement variables
89 
90    g_OuterParCalls -- how many outer parallel ranges or filters started
91    g_TGCCancelled --  how many inner parallel ranges or filters saw task::self().is_cancelled()
92    g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
93    g_MasterExecutedThrow -- number of times external thread actually executed a throw
94    g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw
95    g_ExceptionCaught -- one of PropagatedException or unknown exception was caught.  (Other exceptions cause assertions.)
96 
97    --  Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
98       g_CurExecuted -- total number of inner ranges or filters which executed
99       g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
100       g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
101   *********************************/
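// For reference, every TEST_CASE below drives these variables through all four combinations
// of the two control flags before re-checking the measurement variables, along the lines of:
//
//     for (size_t j = 0; j < 4; ++j) {
//         g_ExceptionInMaster = (j & 1) != 0;   // may only the external thread throw?
//         g_SolitaryException = (j & 2) != 0;   // is at most one throw executed?
//         // ... run one of the TestN() routines, each of which calls ResetGlobals() first
//     }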
102 
103 inline void ResetGlobals (  bool throwException = true, bool flog = false ) {
104     ResetEhGlobals( throwException, flog );
105     g_FedTasksCount = 0;
106     g_OuterParCalls = 0;
107     g_NestedPipelines = false;
108     g_TGCCancelled.reset();
109 }
110 
111 ////////////////////////////////////////////////////////////////////////////////
112 // Tests for tbb::parallel_for and tbb::parallel_reduce
113 ////////////////////////////////////////////////////////////////////////////////
114 
115 typedef size_t count_type;
116 typedef tbb::blocked_range<count_type> range_type;
117 
118 inline intptr_t CountSubranges(range_type r) {
119     if(!r.is_divisible()) return intptr_t(1);
120     range_type r2(r,tbb::split());
121     return CountSubranges(r) + CountSubranges(r2);
122 }
123 
124 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
125     return CountSubranges(range_type(0,length,grain));
126 }
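// NumSubranges() mirrors the recursive splitting performed by a simple_partitioner, so it
// predicts the number of leaf subranges (and hence body invocations) for a given range.
// The flat tests below use bounds of the form:
//
//     intptr_t leaves = NumSubranges(FLAT_RANGE, FLAT_GRAIN);  // one body call per leaf subrange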
127 
128 template<class Body>
129 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
130     ResetGlobals();
131     g_ThrowException = false;
132     intptr_t outerCalls = NumSubranges(length, grain),
133              innerCalls = NumSubranges(inner_length, inner_grain),
134              maxExecuted = outerCalls * (innerCalls + 1);
135     tbb::parallel_for( range_type(0, length, grain), Body() );
136     REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
137     return maxExecuted;
138 }
139 
140 class NoThrowParForBody {
141 public:
142     void operator()( const range_type& r ) const {
143         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
144         else g_NonMasterExecuted = true;
145         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
146         utils::doDummyWork(r.size());
147     }
148 };
149 
150 #if TBB_USE_EXCEPTIONS
151 
152 void Test0 () {
153     ResetGlobals();
154     tbb::simple_partitioner p;
155     for( size_t i=0; i<10; ++i ) {
156         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
157         tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
158         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
159         tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
160     }
161 } // void Test0 ()
162 
163 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
164 template<typename ParForBody>
165 class SimpleParReduceBody {
166     ParForBody m_Body;
167 public:
168     void operator=(const SimpleParReduceBody&) = delete;
169     SimpleParReduceBody(const SimpleParReduceBody&) = default;
170     SimpleParReduceBody() = default;
171 
172     void operator()( const range_type& r ) const { m_Body(r); }
173     SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
174     void join( SimpleParReduceBody& /*right*/ ) {}
175 }; // SimpleParReduceBody
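// A minimal sketch of how the adapter is used (see TestParallelLoopAux below): the same
// ParForBody drives both algorithms, and join() is empty because no reduction value is needed.
//
//     SimpleParReduceBody<SimpleParForBody> rb;
//     tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );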
176 
177 //! Test parallel_for and parallel_reduce for a given partitioner.
178 /** The Body need only be suitable for a parallel_for. */
179 template<typename ParForBody, typename Partitioner>
180 void TestParallelLoopAux() {
181     Partitioner partitioner;
182     for( int i=0; i<2; ++i ) {
183         ResetGlobals();
184         TRY();
185             if( i==0 )
186                 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
187             else {
188                 SimpleParReduceBody<ParForBody> rb;
189                 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
190             }
191         CATCH_AND_ASSERT();
192         // two cases: g_SolitaryException and !g_SolitaryException
193         //   1) g_SolitaryException: only one thread actually threw.  There is only one context, so the exception
194         //      (when caught) will cause that context to be cancelled.  After this event, there may be one or
195         //      more threads which are "in-flight", up to g_NumThreads, but no more will be started.  The threads,
196         //      when they start, if they see they are cancelled, TGCCancelled is incremented.
197         //   2) !g_SolitaryException: more than one thread can throw.  The number of threads that actually
198         //      threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow.
199         //      Only one context, so TGCCancelled should be <= g_NumThreads.
200         //
201         // the reasoning is similar for nested algorithms in a single context (Test2).
202         //
203         // If a thread throws in a context, more than one subsequent task body may see the
204         // cancelled state (if they are scheduled before the state is propagated). This is
205         // infrequent, but it occurs.  So what was to be an assertion must be a remark.
206         g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown");
207         REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
208         if ( g_SolitaryException ) {
209             REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
210             REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
211                 "Not all throws were caught");
212             REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
213         }
214         else {
215             REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
216         }
217     }
218 }  // TestParallelLoopAux
219 
220 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
221 /** The Body only needs to be suitable for tbb::parallel_for. */
222 template<typename Body>
223 void TestParallelLoop() {
224     // The simple and auto partitioners should be const, but not the affinity partitioner.
225     TestParallelLoopAux<Body, const tbb::simple_partitioner  >();
226     TestParallelLoopAux<Body, const tbb::auto_partitioner    >();
227 #define __TBB_TEMPORARILY_DISABLED 1
228 #if !__TBB_TEMPORARILY_DISABLED
229     // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
230     TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
231 #endif
232 #undef __TBB_TEMPORARILY_DISABLED
233 }
234 
235 class SimpleParForBody {
236 public:
237     void operator=(const SimpleParForBody&) = delete;
238     SimpleParForBody(const SimpleParForBody&) = default;
239     SimpleParForBody() = default;
240 
241     void operator()( const range_type& r ) const {
242         utils::ConcurrencyTracker ct;
243         ++g_CurExecuted;
244         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
245         else g_NonMasterExecuted = true;
246         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
247         utils::doDummyWork(r.size());
248         WaitUntilConcurrencyPeaks();
249         ThrowTestException(1);
250     }
251 };
252 
253 void Test1() {
254     // non-nested parallel_for/reduce with throwing body, one context
255     TestParallelLoop<SimpleParForBody>();
256 } // void Test1 ()
257 
258 class OuterParForBody {
259 public:
260     void operator=(const OuterParForBody&) = delete;
261     OuterParForBody(const OuterParForBody&) = default;
262     OuterParForBody() = default;
263     void operator()( const range_type& ) const {
264         utils::ConcurrencyTracker ct;
265         ++g_OuterParCalls;
266         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
267     }
268 };
269 
270 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
271 /** Inner algorithms are spawned inside the new bound context by default. Since
272     exceptions thrown from the inner parallel_for are not handled by the caller
273     (outer parallel_for body) in this test, they will cancel all the sibling inner
274     algorithms. **/
275 void Test2 () {
276     TestParallelLoop<OuterParForBody>();
277 } // void Test2 ()
278 
279 class OuterParForBodyWithIsolatedCtx {
280 public:
281     void operator()( const range_type& ) const {
282         tbb::task_group_context ctx(tbb::task_group_context::isolated);
283         ++g_OuterParCalls;
284         tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
285     }
286 };
287 
288 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
289 /** Even though exceptions thrown from the inner parallel_for are not handled
290     by the caller in this test, they will not affect sibling inner algorithms
291     already running because of the isolated contexts. However, because the first
292     exception cancels the root parallel_for, only the first g_NumThreads subranges
293     will be processed (which launch inner parallel_fors) **/
294 void Test3 () {
295     ResetGlobals();
296     typedef OuterParForBodyWithIsolatedCtx body_type;
297     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
298             // we expect one thread to throw without counting, the rest to run to completion
299             // This formula assumes g_NumThreads outer pfor ranges will be started, but that is not the
300             // case; the SimpleParFor subranges are started up as part of the outer ones, and when
301             // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
302             // so we have to count the number of outer Pfors actually started.
303             minExecuted = (g_NumThreads - 1) * innerCalls;
304     TRY();
305         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
306     CATCH_AND_ASSERT();
307     minExecuted = (g_OuterParCalls - 1) * innerCalls;  // see above
308 
309     // The first formula above assumes all ranges of the outer parallel for are executed, and one
310     // cancels.  In the event, we have a smaller number of ranges that start before the exception
311     // is caught.
312     //
313     //  g_SolitaryException:One inner range throws.  Outer parallel_For is cancelled, but sibling
314     //                      parallel_fors continue to completion (unless the threads that execute
315     //                      are not allowed to throw, in which case we will not see any exceptions).
316     // !g_SolitaryException:multiple inner ranges may throw.  Any which throws will stop, and the
317     //                      corresponding range of the outer pfor will stop also.
318     //
319     // In either case, once the outer pfor gets the exception it will stop executing further ranges.
320 
321     // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
322     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
323     if ( g_SolitaryException ) {
324         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
325         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
326         REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception");
327         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
328     }
329     else {
330         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
331         REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
332     }
333 } // void Test3 ()
334 
335 class OuterParForExceptionSafeBody {
336 public:
337     void operator()( const range_type& ) const {
338         tbb::task_group_context ctx(tbb::task_group_context::isolated);
339         ++g_OuterParCalls;
340         TRY();
341             tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
342         CATCH();  // this macro sets g_ExceptionCaught
343     }
344 };
345 
346 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
347 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
348     in this test, they affect neither other tasks of the root parallel_for
349     nor sibling inner algorithms. **/
350 void Test4 () {
351     ResetGlobals( true, true );
352     intptr_t  innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
353               outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
354     TRY();
355         tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
356     CATCH();
357     // g_SolitaryException  : one inner pfor will throw, the rest will execute to completion.
358     //                        so the count should be (outerCalls -1) * innerCalls, if a throw happened.
359     // !g_SolitaryException : possible multiple inner pfor throws.  Should be approximately
360     //                        (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
361     intptr_t  minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
362     bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
363     if ( g_SolitaryException ) {
364         // only one task had exception thrown. That task had at least one execution (the one that threw).
365         // There may be an arbitrary number of ranges executed after the throw but before the exception
366         // is caught in the scheduler and cancellation is signaled.  (seen 9, 11 and 62 (!) for 8 threads)
367         REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered");
368         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
369         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
370         // a small number of threads can execute in a throwing sub-pfor, if the task which is
371         // to do the solitary throw swaps out after registering its intent to throw but before it
372         // actually does so. As a result, the number of extra tasks cannot exceed the number of threads
373         // for each nested pfor invocation.
374         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
375     }
376     else {
377         REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions");
378         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
379         REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions");
380         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
381     }
382 } // void Test4 ()
383 
384 //! Testing parallel_for and parallel_reduce exception handling
385 //! \brief \ref error_guessing
386 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") {
387     for (auto concurrency_level: utils::concurrency_range()) {
388         g_NumThreads = static_cast<int>(concurrency_level);
389         g_Master = std::this_thread::get_id();
390         if (g_NumThreads > 1) {
391             tbb::task_arena a(g_NumThreads);
392             a.execute([] {
393                 // Execute in all the possible modes
394                 for (size_t j = 0; j < 4; ++j) {
395                     g_ExceptionInMaster = (j & 1) != 0;
396                     g_SolitaryException = (j & 2) != 0;
397 
398                     Test0();
399                 }
400             });
401         }
402     }
403 }
404 
405 //! Testing parallel_for and parallel_reduce exception handling
406 //! \brief \ref error_guessing
407 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") {
408     for (auto concurrency_level: utils::concurrency_range()) {
409         g_NumThreads = static_cast<int>(concurrency_level);
410         g_Master = std::this_thread::get_id();
411         if (g_NumThreads > 1) {
412             tbb::task_arena a(g_NumThreads);
413             a.execute([] {
414                 // Execute in all the possible modes
415                 for (size_t j = 0; j < 4; ++j) {
416                     g_ExceptionInMaster = (j & 1) != 0;
417                     g_SolitaryException = (j & 2) != 0;
418 
419                     Test1();
420                 }
421             });
422         }
423     }
424 }
425 
426 //! Testing parallel_for and parallel_reduce exception handling
427 //! \brief \ref error_guessing
428 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") {
429     for (auto concurrency_level: utils::concurrency_range()) {
430         g_NumThreads = static_cast<int>(concurrency_level);
431         g_Master = std::this_thread::get_id();
432         if (g_NumThreads > 1) {
433             tbb::task_arena a(g_NumThreads);
434             a.execute([] {
435                 // Execute in all the possible modes
436                 for (size_t j = 0; j < 4; ++j) {
437                     g_ExceptionInMaster = (j & 1) != 0;
438                     g_SolitaryException = (j & 2) != 0;
439 
440                     Test2();
441                 }
442             });
443         }
444     }
445 }
446 
447 //! Testing parallel_for and parallel_reduce exception handling
448 //! \brief \ref error_guessing
449 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") {
450     for (auto concurrency_level: utils::concurrency_range()) {
451         g_NumThreads = static_cast<int>(concurrency_level);
452         g_Master = std::this_thread::get_id();
453         if (g_NumThreads > 1) {
454             tbb::task_arena a(g_NumThreads);
455             a.execute([] {
456                 // Execute in all the possible modes
457                 for (size_t j = 0; j < 4; ++j) {
458                     g_ExceptionInMaster = (j & 1) != 0;
459                     g_SolitaryException = (j & 2) != 0;
460 
461                     Test3();
462                 }
463             });
464         }
465     }
466 }
467 
468 //! Testing parallel_for and parallel_reduce exception handling
469 //! \brief \ref error_guessing
470 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") {
471     for (auto concurrency_level: utils::concurrency_range()) {
472         g_NumThreads = static_cast<int>(concurrency_level);
473         g_Master = std::this_thread::get_id();
474         if (g_NumThreads > 1) {
475             tbb::task_arena a(g_NumThreads);
476             a.execute([] {
477                 // Execute in all the possible modes
478                 for (size_t j = 0; j < 4; ++j) {
479                     g_ExceptionInMaster = (j & 1) != 0;
480                     g_SolitaryException = (j & 2) != 0;
481 
482                     Test4();
483                 }
484             });
485         }
486     }
487 }
488 
489 #endif /* TBB_USE_EXCEPTIONS */
490 
491 class ParForBodyToCancel {
492 public:
493     void operator()( const range_type& ) const {
494         ++g_CurExecuted;
495         Cancellator::WaitUntilReady();
496     }
497 };
498 
499 template<class B>
500 class ParForLauncher {
501     tbb::task_group_context &my_ctx;
502 public:
503     void operator()() const {
504         tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
505     }
506     ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
507 };
508 
509 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
510 void TestCancelation1 () {
511     ResetGlobals( false );
512     RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
513 }
514 
515 class Cancellator2  {
516     tbb::task_group_context &m_GroupToCancel;
517 
518 public:
519     void operator()() const {
520         utils::ConcurrencyTracker ct;
521         WaitUntilConcurrencyPeaks();
522         m_GroupToCancel.cancel_group_execution();
523         g_ExecutedAtLastCatch = g_CurExecuted.load();
524     }
525 
526     Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
527 };
528 
529 class ParForBodyToCancel2 {
530 public:
531     void operator()( const range_type& ) const {
532         ++g_CurExecuted;
533         utils::ConcurrencyTracker ct;
534         // The test will hang (and be timed out by the test system) if is_cancelled() is broken
535         while( !tbb::is_current_task_group_canceling() )
536             utils::yield();
537     }
538 };
539 
540 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
541 /** This version also tests tbb::is_current_task_group_canceling() method. **/
542 void TestCancelation2 () {
543     ResetGlobals();
544     RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>();
545     REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
546     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
547     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
548 }
549 
550 ////////////////////////////////////////////////////////////////////////////////
551 // Regression test based on the contribution by the author of the following forum post:
552 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
553 
554 class Worker {
555     static const int max_nesting = 3;
556     static const int reduce_range = 1024;
557     static const int reduce_grain = 256;
558 public:
559     int DoWork (int level);
560     int Validate (int start_level) {
561         int expected = 1; // identity for multiplication
562         for(int i=start_level+1; i<max_nesting; ++i)
563              expected *= reduce_range;
564         return expected;
565     }
566 };
567 
568 class RecursiveParReduceBodyWithSharedWorker {
569     Worker * m_SharedWorker;
570     int m_NestingLevel;
571     int m_Result;
572 public:
573     RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
574         : m_SharedWorker(src.m_SharedWorker)
575         , m_NestingLevel(src.m_NestingLevel)
576         , m_Result(0)
577     {}
578     RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
579         : m_SharedWorker(w)
580         , m_NestingLevel(outer)
581         , m_Result(0)
582     {}
583 
584     void operator() ( const tbb::blocked_range<size_t>& r ) {
585         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
586         else g_NonMasterExecuted = true;
587         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
588         for (size_t i = r.begin (); i != r.end (); ++i) {
589             m_Result += m_SharedWorker->DoWork (m_NestingLevel);
590         }
591     }
592     void join (const RecursiveParReduceBodyWithSharedWorker & x) {
593         m_Result += x.m_Result;
594     }
595     int result () { return m_Result; }
596 };
597 
598 int Worker::DoWork ( int level ) {
599     ++level;
600     if ( level < max_nesting ) {
601         RecursiveParReduceBodyWithSharedWorker rt (this, level);
602         tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
603         return rt.result();
604     }
605     else
606         return 1;
607 }
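// With max_nesting == 3 and reduce_range == 1024, DoWork(0) runs a parallel_reduce whose 1024
// iterations each run another parallel_reduce of 1024 iterations returning 1, so the expected
// result is 1024 * 1024 -- exactly what Worker::Validate(0) computes.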
608 
609 //! Regression test for hanging that occurred with the first version of cancellation propagation
610 void TestCancelation3 () {
611     Worker w;
612     int result   = w.DoWork (0);
613     int expected = w.Validate(0);
614     REQUIRE_MESSAGE ( result == expected, "Wrong calculation result");
615 }
616 
617 struct StatsCounters {
618     std::atomic<size_t> my_total_created;
619     std::atomic<size_t> my_total_deleted;
620     StatsCounters() {
621         my_total_created = 0;
622         my_total_deleted = 0;
623     }
624 };
625 
626 class ParReduceBody {
627     StatsCounters* my_stats;
628     size_t my_id;
629     bool my_exception;
630     tbb::task_group_context& tgc;
631 
632 public:
633     ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) :
634         my_stats(&s_), my_exception(e_), tgc(context) {
635         my_id = my_stats->my_total_created++;
636     }
637 
638     ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) {
639         my_stats = lhs.my_stats;
640         my_id = my_stats->my_total_created++;
641     }
642 
643     ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) {
644         my_stats = lhs.my_stats;
645         my_id = my_stats->my_total_created++;
646     }
647 
648     ~ParReduceBody(){ ++my_stats->my_total_deleted; }
649 
650     void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
651         //Do nothing, except for one task (chosen arbitrarily)
652         if( my_id >= 12 ) {
653             if( my_exception )
654                 ThrowTestException(1);
655             else
656                 tgc.cancel_group_execution();
657         }
658     }
659 
660     void join( ParReduceBody& /*rhs*/ ) {}
661 };
662 
663 void TestCancelation4() {
664     StatsCounters statsObj;
665 #if TBB_USE_EXCEPTIONS
666     try
667 #endif
668     {
669         tbb::task_group_context tgc1, tgc2;
670         ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true);
671         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
672         tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
673     }
674 #if TBB_USE_EXCEPTIONS
675     catch(...) {}
676 #endif
677     REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
678 }
679 
680 //! Testing parallel_for and parallel_reduce cancellation
681 //! \brief \ref error_guessing
682 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") {
683     for (auto concurrency_level: utils::concurrency_range()) {
684         g_NumThreads = static_cast<int>(concurrency_level);
685         g_Master = std::this_thread::get_id();
686         if (g_NumThreads > 1) {
687             tbb::task_arena a(g_NumThreads);
688             a.execute([] {
689                 // Execute in all the possible modes
690                 for (size_t j = 0; j < 4; ++j) {
691                     g_ExceptionInMaster = (j & 1) != 0;
692                     g_SolitaryException = (j & 2) != 0;
693 
694                     TestCancelation1();
695                 }
696             });
697         }
698     }
699 }
700 
701 //! Testing parallel_for and parallel_reduce cancellation
702 //! \brief \ref error_guessing
703 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") {
704     for (auto concurrency_level: utils::concurrency_range()) {
705         g_NumThreads = static_cast<int>(concurrency_level);
706         g_Master = std::this_thread::get_id();
707         if (g_NumThreads > 1) {
708             tbb::task_arena a(g_NumThreads);
709             a.execute([] {
710                 // Execute in all the possible modes
711                 for (size_t j = 0; j < 4; ++j) {
712                     g_ExceptionInMaster = (j & 1) != 0;
713                     g_SolitaryException = (j & 2) != 0;
714 
715                     TestCancelation2();
716                 }
717             });
718         }
719     }
720 }
721 
722 //! Testing parallel_for and parallel_reduce cancellation
723 //! \brief \ref error_guessing
724 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") {
725     for (auto concurrency_level: utils::concurrency_range()) {
726         g_NumThreads = static_cast<int>(concurrency_level);
727         g_Master = std::this_thread::get_id();
728         if (g_NumThreads > 1) {
729             tbb::task_arena a(g_NumThreads);
730             a.execute([] {
731                 // Execute in all the possible modes
732                 for (size_t j = 0; j < 4; ++j) {
733                     g_ExceptionInMaster = (j & 1) != 0;
734                     g_SolitaryException = (j & 2) != 0;
735 
736                     TestCancelation3();
737                 }
738             });
739         }
740     }
741 }
742 
743 //! Testing parallel_for and parallel_reduce cancellation
744 //! \brief \ref error_guessing
745 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
746     for (auto concurrency_level: utils::concurrency_range()) {
747         g_NumThreads = static_cast<int>(concurrency_level);
748         g_Master = std::this_thread::get_id();
749         if (g_NumThreads > 1) {
750             tbb::task_arena a(g_NumThreads);
751             a.execute([] {
752                 // Execute in all the possible modes
753                 for (size_t j = 0; j < 4; ++j) {
754                     g_ExceptionInMaster = (j & 1) != 0;
755                     g_SolitaryException = (j & 2) != 0;
756 
757                     TestCancelation4();
758                 }
759             });
760         }
761     }
762 }
763 
764 ////////////////////////////////////////////////////////////////////////////////
765 // Tests for tbb::parallel_for_each
766 ////////////////////////////////////////////////////////////////////////////////
767 
768 std::size_t get_iter_range_size() {
769     // Use an iteration sequence of at least 50 elements so the test has enough work even on small machines
770     return std::max(50, g_NumThreads * 2);
771 }
772 
773 template<typename Iterator>
774 struct adaptive_range {
775     std::vector<std::size_t> my_array;
776 
777     adaptive_range(std::size_t size) : my_array(size + 1) {}
778     using iterator = Iterator;
779 
780     iterator begin() const {
781         return iterator{&my_array.front()};
782     }
783     iterator begin() {
784         return iterator{&my_array.front()};
785     }
786     iterator end() const {
787         return iterator{&my_array.back()};
788     }
789     iterator end() {
790         return iterator{&my_array.back()};
791     }
792 };
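// adaptive_range simply owns a vector and exposes it through one of the iterator flavors from
// common/iterator.h. Typical use in the parallel_for_each tests below:
//
//     auto range = adaptive_range<utils::ForwardIterator<size_t>>(get_iter_range_size());
//     tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody());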
793 
794 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
795     if (g_FedTasksCount < 50) {
796         ++g_FedTasksCount;
797         feeder.add(val);
798     }
799 }
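// Feed() is invoked from the *WithFeeder bodies; it adds roughly 50 extra work items per test
// run, and the g_FedTasksCount it maintains feeds into the expected-count formulas
// (see Test4_parallel_for_each below).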
800 
801 #define RunWithSimpleBody(func, body)       \
802     func<utils::ForwardIterator<size_t>, body>();         \
803     func<utils::ForwardIterator<size_t>, body##WithFeeder>()
804 
805 #define RunWithTemplatedBody(func, body)     \
806     func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >();         \
807     func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()
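// For instance, RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody) expands to:
//
//     Test1_parallel_for_each<utils::ForwardIterator<size_t>, SimpleParForEachBody>();
//     Test1_parallel_for_each<utils::ForwardIterator<size_t>, SimpleParForEachBodyWithFeeder>();
//
// i.e. each test runs both with and without the feeder variant of its body.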
808 
809 #if TBB_USE_EXCEPTIONS
810 
811 // Simple functor object with exception
812 class SimpleParForEachBody {
813 public:
814     void operator() ( size_t &value ) const {
815         ++g_CurExecuted;
816         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
817         else g_NonMasterExecuted = true;
818         if( tbb::is_current_task_group_canceling() ) {
819             g_TGCCancelled.increment();
820         }
821         utils::ConcurrencyTracker ct;
822         value += 1000;
823         WaitUntilConcurrencyPeaks();
824         ThrowTestException(1);
825     }
826 };
827 
828 // Simple functor object with exception and feeder
829 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
830 public:
831     void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
832         Feed(feeder, 0);
833         SimpleParForEachBody::operator()(value);
834     }
835 };
836 
837 // Tests exceptions without nesting
838 template <class Iterator, class simple_body>
839 void Test1_parallel_for_each () {
840     ResetGlobals();
841     auto range = adaptive_range<Iterator>(get_iter_range_size());
842     TRY();
843         tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() );
844     CATCH_AND_ASSERT();
845     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
846     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
847     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
848     if ( !g_SolitaryException )
849         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
850 
851 } // void Test1_parallel_for_each ()
852 
853 template <class Iterator>
854 class OuterParForEachBody {
855 public:
856     void operator()( size_t& /*value*/ ) const {
857         ++g_OuterParCalls;
858         auto range = adaptive_range<Iterator>(get_iter_range_size());
859         tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody());
860     }
861 };
862 
863 template <class Iterator>
864 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
865 public:
866     void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
867         Feed(feeder, 0);
868         OuterParForEachBody<Iterator>::operator()(value);
869     }
870 };
871 
872 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
873 /** Inner algorithms are spawned inside the new bound context by default. Since
874     exceptions thrown from the inner parallel_for_each are not handled by the caller
875     (outer parallel_for_each body) in this test, they will cancel all the sibling inner
876     algorithms. **/
877 template <class Iterator, class outer_body>
878 void Test2_parallel_for_each () {
879     ResetGlobals();
880     auto range = adaptive_range<Iterator>(get_iter_range_size());
881     TRY();
882         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() );
883     CATCH_AND_ASSERT();
884     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
885     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
886     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
887     if ( !g_SolitaryException )
888         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
889 } // void Test2_parallel_for_each ()
890 
891 template <class Iterator>
892 class OuterParForEachBodyWithIsolatedCtx {
893 public:
894     void operator()( size_t& /*value*/ ) const {
895         tbb::task_group_context ctx(tbb::task_group_context::isolated);
896         ++g_OuterParCalls;
897         auto range = adaptive_range<Iterator>(get_iter_range_size());
898         tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
899     }
900 };
901 
902 template <class Iterator>
903 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
904 public:
905     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
906         Feed(feeder, 0);
907         OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
908     }
909 };
910 
911 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
912 /** Even though exceptions thrown from the inner parallel_for_each are not handled
913     by the caller in this test, they will not affect sibling inner algorithms
914     already running because of the isolated contexts. However, because the first
915     exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
916     will be processed (which launch inner parallel_for_eachs) **/
917 template <class Iterator, class outer_body>
918 void Test3_parallel_for_each () {
919     ResetGlobals();
920     auto range = adaptive_range<Iterator>(get_iter_range_size());
921     intptr_t innerCalls = get_iter_range_size(),
922              // The assumption here is the same as in outer parallel fors.
923              minExecuted = (g_NumThreads - 1) * innerCalls;
924     g_Master = std::this_thread::get_id();
925     TRY();
926         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body());
927     CATCH_AND_ASSERT();
928     // figure actual number of expected executions given the number of outer PDos started.
929     minExecuted = (g_OuterParCalls - 1) * innerCalls;
930     // one extra thread may run a task that sees cancellation.  Infrequent but possible
931     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
932     if ( g_SolitaryException ) {
933         REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
934         REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
935     }
936     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
937     if ( !g_SolitaryException )
938         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
939 } // void Test3_parallel_for_each ()
940 
941 template <class Iterator>
942 class OuterParForEachWithEhBody {
943 public:
944     void operator()( size_t& /*value*/ ) const {
945         tbb::task_group_context ctx(tbb::task_group_context::isolated);
946         ++g_OuterParCalls;
947         auto range = adaptive_range<Iterator>(get_iter_range_size());
948         TRY();
949             tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
950         CATCH();
951     }
952 };
953 
954 template <class Iterator>
955 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
956 public:
957     void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
958     OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
959     OuterParForEachWithEhBodyWithFeeder() = default;
960     void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
961         Feed(feeder, 0);
962         OuterParForEachWithEhBody<Iterator>::operator()(value);
963     }
964 };
965 
966 //! Uses parallel_for_each body invoking an inner parallel_for_each (with isolated context) inside a try-block.
967 /** Since exception(s) thrown from the inner parallel_for_each are handled by the caller
968     in this test, they affect neither other tasks of the root parallel_for_each
969     nor sibling inner algorithms. **/
970 template <class Iterator, class outer_body_with_eh>
971 void Test4_parallel_for_each () {
972     ResetGlobals( true, true );
973     auto range = adaptive_range<Iterator>(get_iter_range_size());
974     g_Master = std::this_thread::get_id();
975     TRY();
976         tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh());
977     CATCH();
978     REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
979     intptr_t innerCalls = get_iter_range_size(),
980              outerCalls = get_iter_range_size() + g_FedTasksCount,
981              maxExecuted = outerCalls * innerCalls,
982              minExecuted = 0;
983     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
984     if ( g_SolitaryException ) {
985         minExecuted = maxExecuted - innerCalls;
986         REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
987         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
988         // This test has the same property as Test4 (parallel_for); the exception can be
989         // thrown, but some number of tasks from the outer Pdo can execute after the throw but
990         // before the cancellation is signaled (have seen 36).
991         DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
992     }
993     else {
994         minExecuted = g_NumExceptionsCaught;
995         REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
996         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
997         REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
998         REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
999     }
1000 } // void Test4_parallel_for_each ()
1001 
1002 // This body throws an exception only if the task was added by feeder
1003 class ParForEachBodyWithThrowingFeederTasks {
1004 public:
1005     //! This form of the function call operator can be used when the body needs to add more work during the processing
1006     void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const {
1007         ++g_CurExecuted;
1008         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1009         else g_NonMasterExecuted = true;
1010         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1011         Feed(feeder, 1);
1012         if (value == 1)
1013             ThrowTestException(1);
1014     }
1015 }; // class ParForEachBodyWithThrowingFeederTasks
1016 
1017 // Test exception in task, which was added by feeder.
1018 template <class Iterator>
1019 void Test5_parallel_for_each () {
1020     ResetGlobals();
1021     auto range = adaptive_range<Iterator>(get_iter_range_size());
1022     g_Master = std::this_thread::get_id();
1023     TRY();
1024         tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks());
1025     CATCH();
1026     if (g_SolitaryException) {
1027         // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
1028         // are handled by the external thread.  In this case no throw occurs.
1029         REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel               // we saw an exception
1030                 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)  // non-external thread throws but none tried
1031                 || (g_ExceptionInMaster && !g_MasterExecutedThrow))     // external thread throws but external thread didn't try
1032                 , "At least one exception should occur");
1033     }
1034 } // void Test5_parallel_for_each ()
1035 
1036 //! Testing parallel_for_each exception handling
1037 //! \brief \ref error_guessing
1038 TEST_CASE("parallel_for_each exception handling test #1") {
1039     for (auto concurrency_level: utils::concurrency_range()) {
1040         g_NumThreads = static_cast<int>(concurrency_level);
1041         g_Master = std::this_thread::get_id();
1042         if (g_NumThreads > 1) {
1043             tbb::task_arena a(g_NumThreads);
1044             a.execute([] {
1045                 // Execute in all the possible modes
1046                 for (size_t j = 0; j < 4; ++j) {
1047                     g_ExceptionInMaster = (j & 1) != 0;
1048                     g_SolitaryException = (j & 2) != 0;
1049 
1050                     RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody);
1051                 }
1052             });
1053         }
1054     }
1055 }
1056 
1057 //! Testing parallel_for_each exception handling
1058 //! \brief \ref error_guessing
1059 TEST_CASE("parallel_for_each exception handling test #2") {
1060     for (auto concurrency_level: utils::concurrency_range()) {
1061         g_NumThreads = static_cast<int>(concurrency_level);
1062         g_Master = std::this_thread::get_id();
1063         if (g_NumThreads > 1) {
1064             tbb::task_arena a(g_NumThreads);
1065             a.execute([] {
1066                 // Execute in all the possible modes
1067                 for (size_t j = 0; j < 4; ++j) {
1068                     g_ExceptionInMaster = (j & 1) != 0;
1069                     g_SolitaryException = (j & 2) != 0;
1070 
1071                     RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody);
1072                 }
1073             });
1074         }
1075     }
1076 }
1077 
1078 //! Testing parallel_for_each exception handling
1079 //! \brief \ref error_guessing
1080 TEST_CASE("parallel_for_each exception handling test #3") {
1081     for (auto concurrency_level: utils::concurrency_range()) {
1082         g_NumThreads = static_cast<int>(concurrency_level);
1083         g_Master = std::this_thread::get_id();
1084         if (g_NumThreads > 1) {
1085             tbb::task_arena a(g_NumThreads);
1086             a.execute([] {
1087                 // Execute in all the possible modes
1088                 for (size_t j = 0; j < 4; ++j) {
1089                     g_ExceptionInMaster = (j & 1) != 0;
1090                     g_SolitaryException = (j & 2) != 0;
1091 
1092                     RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx);
1093                 }
1094             });
1095         }
1096     }
1097 }
1098 
1099 //! Testing parallel_for_each exception handling
1100 //! \brief \ref error_guessing
1101 TEST_CASE("parallel_for_each exception handling test #4") {
1102     for (auto concurrency_level: utils::concurrency_range()) {
1103         g_NumThreads = static_cast<int>(concurrency_level);
1104         g_Master = std::this_thread::get_id();
1105         if (g_NumThreads > 1) {
1106             tbb::task_arena a(g_NumThreads);
1107             a.execute([] {
1108                 // Execute in all the possible modes
1109                 for (size_t j = 0; j < 4; ++j) {
1110                     g_ExceptionInMaster = (j & 1) != 0;
1111                     g_SolitaryException = (j & 2) != 0;
1112 
1113                     RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody);
1114                 }
1115             });
1116         }
1117     }
1118 }
1119 
1120 //! Testing parallel_for_each exception handling
1121 //! \brief \ref error_guessing
1122 TEST_CASE("parallel_for_each exception handling test #5") {
1123     for (auto concurrency_level: utils::concurrency_range()) {
1124         g_NumThreads = static_cast<int>(concurrency_level);
1125         g_Master = std::this_thread::get_id();
1126         if (g_NumThreads > 1) {
1127             tbb::task_arena a(g_NumThreads);
1128             a.execute([] {
1129                 // Execute in all the possible modes
1130                 for (size_t j = 0; j < 4; ++j) {
1131                     g_ExceptionInMaster = (j & 1) != 0;
1132                     g_SolitaryException = (j & 2) != 0;
1133 
1134                     Test5_parallel_for_each<utils::InputIterator<size_t> >();
1135                     Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
1136                     Test5_parallel_for_each<utils::RandomIterator<size_t> >();
1137                 }
1138             });
1139         }
1140     }
1141 }
1142 
1143 #endif /* TBB_USE_EXCEPTIONS */
1144 
1145 class ParForEachBodyToCancel {
1146 public:
1147     void operator()( size_t& /*value*/ ) const {
1148         ++g_CurExecuted;
1149         Cancellator::WaitUntilReady();
1150     }
1151 };
1152 
1153 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
1154 public:
1155     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1156         Feed(feeder, 0);
1157         ParForEachBodyToCancel::operator()(value);
1158     }
1159 };
1160 
1161 template<class B, class Iterator>
1162 class ParForEachWorker {
1163     tbb::task_group_context &my_ctx;
1164 public:
1165     void operator()() const {
1166         auto range = adaptive_range<Iterator>(get_iter_range_size());
1167         tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx );
1168     }
1169 
1170     ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1171 };
1172 
1173 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1174 template <class Iterator, class body_to_cancel>
1175 void TestCancelation1_parallel_for_each () {
1176     ResetGlobals( false );
1177     // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
1178     intptr_t threshold = get_iter_range_size() / 4;
1179     REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
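    // Illustrative numbers (not required by the test): with get_iter_range_size() == 64 and
    // g_NumThreads == 8, threshold == 16, leaving 48 (> 8) tasks available for cancellation.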
1180     tbb::task_group tg;
1181     tbb::task_group_context  ctx;
1182     Cancellator cancellator(ctx, threshold);
1183     ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
1184     tg.run( cancellator );
1185     utils::yield();
1186     tg.run( worker );
1187     TRY();
1188         tg.wait();
1189     CATCH_AND_FAIL();
1190     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1191 }
1192 
1193 class ParForEachBodyToCancel2 {
1194 public:
1195     void operator()( size_t& /*value*/ ) const {
1196         ++g_CurExecuted;
1197         utils::ConcurrencyTracker ct;
1198         // The test will hang (and be timed out by the test system) if is_current_task_group_canceling() is broken
1199         while( !tbb::is_current_task_group_canceling() )
1200             utils::yield();
1201     }
1202 };
1203 
1204 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
1205 public:
1206     void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1207         Feed(feeder, 0);
1208         ParForEachBodyToCancel2::operator()(value);
1209     }
1210 };
1211 
1212 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1213 /** This version also tests tbb::is_current_task_group_canceling() method. **/
1214 template <class Iterator, class body_to_cancel>
1215 void TestCancelation2_parallel_for_each () {
1216     ResetGlobals();
1217     RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
1218 }
1219 
1220 //! Testing parallel_for_each cancellation test
1221 //! \brief \ref error_guessing
1222 TEST_CASE("parallel_for_each cancellation test #1") {
1223     for (auto concurrency_level: utils::concurrency_range()) {
1224         g_NumThreads = static_cast<int>(concurrency_level);
1225         g_Master = std::this_thread::get_id();
1226         if (g_NumThreads > 1) {
1227             tbb::task_arena a(g_NumThreads);
1228             a.execute([] {
1229                 // Execute in all the possible modes
1230                 for (size_t j = 0; j < 4; ++j) {
1231                     g_ExceptionInMaster = (j & 1) != 0;
1232                     g_SolitaryException = (j & 2) != 0;
1233                     RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
1234                 }
1235             });
1236         }
1237     }
1238 }
1239 
1240 //! Testing parallel_for_each cancellation test
1241 //! \brief \ref error_guessing
1242 TEST_CASE("parallel_for_each cancellation test #2") {
1243     for (auto concurrency_level: utils::concurrency_range()) {
1244         g_NumThreads = static_cast<int>(concurrency_level);
1245         g_Master = std::this_thread::get_id();
1246         if (g_NumThreads > 1) {
1247             tbb::task_arena a(g_NumThreads);
1248             a.execute([] {
1249                 // Execute in all the possible modes
1250                 for (size_t j = 0; j < 4; ++j) {
1251                     g_ExceptionInMaster = (j & 1) != 0;
1252                     g_SolitaryException = (j & 2) != 0;
1253 
1254                     RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
1255                 }
1256             });
1257         }
1258     }
1259 }
1260 
1261 ////////////////////////////////////////////////////////////////////////////////
1262 // Tests for tbb::parallel_pipeline
1263 ////////////////////////////////////////////////////////////////////////////////
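// g_NumTokens is the token limit passed to parallel_pipeline; the tests below reset it
// to 2 * g_NumThreads before each run.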
1264 int g_NumTokens = 0;
1265 
1266 // Simple input filter class; it assigns 1 to all buffer elements.
1267 // It stops once the item index reaches get_iter_range_size().
1268 class InputFilter {
1269     mutable std::atomic<size_t> m_Item{};
1270     mutable std::vector<size_t> m_Buffer;
1271 public:
1272     InputFilter() : m_Buffer(get_iter_range_size()) {
1273         m_Item = 0;
1274         for (size_t i = 0; i < get_iter_range_size(); ++i )
1275             m_Buffer[i] = 1;
1276     }
1277     InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()), m_Buffer(get_iter_range_size()) {
1278         for (size_t i = 0; i < get_iter_range_size(); ++i )
1279             m_Buffer[i] = other.m_Buffer[i];
1280     }
1281 
1282     void* operator()(tbb::flow_control& control) const {
1283         size_t item = m_Item++;
1284         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1285         else g_NonMasterExecuted = true;
1286         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1287         if(item == 1) {
1288             ++g_PipelinesStarted;   // count on emitting the first item.
1289         }
1290         if ( item >= get_iter_range_size() ) {
1291             control.stop();
1292             return nullptr;
1293         }
1294         m_Buffer[item] = 1;
1295         return &m_Buffer[item];
1296     }
1297 
1298     size_t* buffer() { return m_Buffer.data(); }
1299 }; // class InputFilter
1300 
1301 #if TBB_USE_EXCEPTIONS
1302 
1303 // Simple filter that can throw an exception.  If parallel, it waits until
1304 // as many parallel filter bodies are running as there are threads before throwing.
1305 class SimpleFilter {
1306     bool m_canThrow;
1307     bool m_serial;
1308 public:
1309     SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
1310     void* operator()(void* item) const {
1311         ++g_CurExecuted;
1312         if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1313         else g_NonMasterExecuted = true;
1314         if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1315         if ( m_canThrow ) {
1316             if ( !m_serial ) {
1317                 utils::ConcurrencyTracker ct;
1318                 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
1319             }
1320             ThrowTestException(1);
1321         }
1322         return item;
1323     }
1324 }; // class SimpleFilter
1325 
1326 // This structure describes the modes and throw flags of the two non-input filters in a pipeline
1327 struct FilterSet {
1328     tbb::filter_mode mode1, mode2;
1329     bool throw1, throw2;
1330 
1331     FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
1332         : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
1333     {}
1334 }; // struct FilterSet
1335 
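// serial_parallel is the filter set used for the nested (inner) pipelines below: a
// serial_in_order first filter that never throws followed by a parallel filter that throws.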
1336 FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );
1337 
1338 template<typename InFilter, typename Filter>
1339 class CustomPipeline {
1340     InFilter inputFilter;
1341     Filter filter1;
1342     Filter filter2;
1343     FilterSet my_filters;
1344 public:
1345     CustomPipeline( const FilterSet& filters )
1346         : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
1347           filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
1348           my_filters(filters)
1349     {}
1350 
1351     void run () {
1352         tbb::parallel_pipeline(
1353             g_NumTokens,
1354             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1355             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1356             tbb::make_filter<void*, void>(my_filters.mode2, filter2)
1357         );
1358     }
1359     void run ( tbb::task_group_context& ctx ) {
1360         tbb::parallel_pipeline(
1361             g_NumTokens,
1362             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1363             tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1364             tbb::make_filter<void*, void>(my_filters.mode2, filter2),
1365             ctx
1366         );
1367     }
1368 };
1369 
1370 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
1371 
1372 // Tests exceptions without nesting
1373 void Test1_pipeline ( const FilterSet& filters ) {
1374     ResetGlobals();
1375     SimplePipeline testPipeline(filters);
1376     TRY();
1377         testPipeline.run();
1378         if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) {
1379             // all the items were processed, though an exception was supposed to occur.
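            // (2 * get_iter_range_size() == the two non-input filters each invoked once per item.)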
1380             if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
1381                 // if !g_ExceptionInMaster, the external thread is not allowed to throw.
1382                 // if g_NonMasterExecutedThrow > 0 then a thread besides the external thread tried to throw.
1383                 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
1384                     "Unusual count");
1385             }
1386             // In the case of all-serial filters, they might all be executed in the thread(s)
1387             // where exceptions are not allowed by the common test logic. So we just quit.
1388             return;
1389         }
1390     CATCH_AND_ASSERT();
1391     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
1392     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1393     REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
1394     if ( !g_SolitaryException )
1395         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1396 
1397 } // void Test1_pipeline ()
1398 
1399 // Filter with nesting
1400 class OuterFilter {
1401 public:
1402     OuterFilter ( bool, bool ) {}
1403 
1404     void* operator()(void* item) const {
1405         ++g_OuterParCalls;
1406         SimplePipeline testPipeline(serial_parallel);
1407         testPipeline.run();
1408         return item;
1409     }
1410 }; // class OuterFilter
1411 
1412 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
1413 /** Inner algorithms are spawned inside the new bound context by default. Since
1414     exceptions thrown from the inner pipeline are not handled by the caller
1415     (outer pipeline body) in this test, they will cancel all the sibling inner
1416     algorithms. **/
1417 void Test2_pipeline ( const FilterSet& filters ) {
1418     ResetGlobals();
1419     g_NestedPipelines = true;
1420     CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
1421     TRY();
1422         testPipeline.run();
1423     CATCH_AND_ASSERT();
1424     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
1425     REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1426     if ( !g_SolitaryException ) {
1427         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1428     }
1429 } // void Test2_pipeline ()
1430 
1431 //! Creates an isolated inner pipeline and runs it.
1432 class OuterFilterWithIsolatedCtx {
1433 public:
1434     OuterFilterWithIsolatedCtx( bool , bool ) {}
1435 
1436     void* operator()(void* item) const {
1437         ++g_OuterParCalls;
1438         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1439         // create inner pipeline with serial input, parallel output filter, second filter throws
1440         SimplePipeline testPipeline(serial_parallel);
1441         testPipeline.run(ctx);
1442         return item;
1443     }
1444 }; // class OuterFilterWithIsolatedCtx
1445 
1446 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
1447 /** Even though exceptions thrown from the inner pipeline are not handled
1448     by the caller in this test, they will not affect sibling inner algorithms
1449     already running because of the isolated contexts. However, because the first
1450     exception cancels the root pipeline, only the items already in flight
1451     will be processed (each of which launches an inner pipeline). **/
1452 void Test3_pipeline ( const FilterSet& filters ) {
1453     for( int nTries = 1; nTries <= 4; ++nTries) {
1454         ResetGlobals();
1455         g_NestedPipelines = true;
1456         g_Master = std::this_thread::get_id();
1457         intptr_t innerCalls = get_iter_range_size(),
1458                  minExecuted = (g_NumThreads - 1) * innerCalls;
1459         CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
1460         TRY();
1461             testPipeline.run();
1462         CATCH_AND_ASSERT();
1463 
1464         bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1465             (!g_ExceptionInMaster && !g_NonMasterExecuted);
1466         // only test assertions if the test threw an exception (or we don't care)
1467         bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
1468         if(testSucceeded) {
1469             if (g_SolitaryException) {
1470 
1471                 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
1472                 // Each time the input filter of a pipeline delivers its first item, it increments
1473                 // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
1474                 // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
1475                 // context will start; that is what we're testing.)
1476                 //
1477                 // There are two pipelines which will NOT run to completion when a solitary throw
1478                 // happens in an isolated inner context: the outer pipeline and the pipeline which
1479                 // throws.  All the other pipelines which start should run to completion.  But only
1480                 // inner body invocations are counted.
1481                 //
1482                 // So g_CurExecuted should be about
1483                 //
1484                 //   (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1
1485                 //   ^ executions for each completed pipeline
1486                 //                   ^ completing pipelines (remembering two will not complete)
1487                 //                                              ^ one for the inner throwing pipeline
1488 
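                // Illustrative example: with get_iter_range_size() == 100 and g_PipelinesStarted == 4,
                // the bound is (2*100) * (4 - 2) + 1 == 401 inner-body executions.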
1489                 minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1;
1490                 // each failing pipeline must execute at least two tasks
1491                 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
1492                 // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
1493                 // tasks not executing at throw were scheduled.
1494                 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
1495                 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
1496                 // if we're only throwing from the external thread, and that thread didn't
1497                 // participate in the pipelines, then no throw occurred.
1498             }
1499             REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1500             REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
1501             return;
1502         }
1503     }
1504 }
1505 
1506 class OuterFilterWithEhBody  {
1507 public:
1508     OuterFilterWithEhBody( bool, bool ){}
1509 
1510     void* operator()(void* item) const {
1511         tbb::task_group_context ctx(tbb::task_group_context::isolated);
1512         ++g_OuterParCalls;
1513         SimplePipeline testPipeline(serial_parallel);
1514         TRY();
1515             testPipeline.run(ctx);
1516         CATCH();
1517         return item;
1518     }
1519 }; // class OuterFilterWithEhBody
1520 
1521 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
1522 /** Since exception(s) thrown from the inner pipeline are handled by the caller
1523     in this test, they do not affect other tasks of the root pipeline
1524     nor sibling inner algorithms. **/
1525 void Test4_pipeline ( const FilterSet& filters ) {
1526 #if __GNUC__ && !__INTEL_COMPILER
1527     if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
1528         MESSAGE("Known issue: one of exception handling tests is skipped.");
1529         return;
1530     }
1531 #endif
1532     ResetGlobals( true, true );
1533     // each outer pipeline stage will start get_iter_range_size() inner pipelines.
1534     // each inner pipeline that doesn't throw will process get_iter_range_size() items.
1535     // for solitary exception there will be one pipeline that only processes one stage, one item.
1536     // innerCalls should be 2*get_iter_range_size()
1537     intptr_t innerCalls = 2*get_iter_range_size(),
1538              outerCalls = 2*get_iter_range_size(),
1539              maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
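    // Illustrative example: with get_iter_range_size() == 100, innerCalls == outerCalls == 200,
    // so maxExecuted == 200 * 200 == 40000; a solitary exception then gives minExecuted == 39800.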
1540     CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
1541     TRY();
1542         testPipeline.run();
1543     CATCH_AND_ASSERT();
1544     intptr_t  minExecuted = 0;
1545     bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1546         (!g_ExceptionInMaster && !g_NonMasterExecuted);
1547     if ( g_SolitaryException ) {
1548         minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
1549         REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
1550         g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
1551     }
1552     else {
1553         // assume each throwing inner pipeline contributes no counted items
1554         minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
1555         REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
1556         REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
1557         // Some already-scheduled tasks may still start after the first exception is
1558         // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
1559         // So with multiple exceptions there are a variable number of tasks that have been
1560         // discarded because of the cancellation signals.
1561         // Each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
1562         // updated with each throw, so the value will be the number of tasks executed at the last catch.
1563         REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
1564     }
1565 } // void Test4_pipeline ()
1566 
1567 //! Tests pipeline function passed with different combination of filters
1568 template<void testFunc(const FilterSet&)>
1569 void TestWithDifferentFiltersAndConcurrency() {
1570 #if __TBB_USE_ADDRESS_SANITIZER
1571     // parallel_pipeline allocates TLS that is sporadically reported as a memory leak when
1572     // threads are detached. So, use task_scheduler_handle to join the threads via tbb::finalize.
1573     tbb::task_scheduler_handle handle{ tbb::attach{} };
1574 #endif
1575     for (auto concurrency_level: utils::concurrency_range()) {
1576         g_NumThreads = static_cast<int>(concurrency_level);
1577         g_Master = std::this_thread::get_id();
1578         if (g_NumThreads > 1) {
1579 
1580             const tbb::filter_mode modes[] = {
1581                 tbb::filter_mode::parallel,
1582                 tbb::filter_mode::serial_in_order,
1583                 tbb::filter_mode::serial_out_of_order
1584             };
1585 
1586             const int NumFilterTypes = sizeof(modes)/sizeof(modes[0]);
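            // 3 modes for each of the two non-input filters and 2 throw patterns give
            // 3 * 3 * 2 == 18 filter sets per pass of the mode loop (j) below.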
1587 
1588             // Execute in all the possible modes
1589             for ( size_t j = 0; j < 4; ++j ) {
1590                 tbb::task_arena a(g_NumThreads);
1591                 a.execute([&] {
1592                     g_ExceptionInMaster = (j & 1) != 0;
1593                     g_SolitaryException = (j & 2) != 0;
1594                     g_NumTokens = 2 * g_NumThreads;
1595 
1596                     for (int i = 0; i < NumFilterTypes; ++i) {
1597                         for (int n = 0; n < NumFilterTypes; ++n) {
1598                             for (int k = 0; k < 2; ++k)
1599                                 testFunc(FilterSet(modes[i], modes[n], k == 0, k != 0));
1600                         }
1601                     }
1602                 });
1603             }
1604         }
1605     }
1606 #if __TBB_USE_ADDRESS_SANITIZER
1607     tbb::finalize(handle);
1608 #endif
1609 }
1610 
1611 //! Testing parallel_pipeline exception handling
1612 //! \brief \ref error_guessing
1613 TEST_CASE("parallel_pipeline exception handling test #1") {
1614     TestWithDifferentFiltersAndConcurrency<Test1_pipeline>();
1615 }
1616 
1617 //! Testing parallel_pipeline exception handling
1618 //! \brief \ref error_guessing
1619 TEST_CASE("parallel_pipeline exception handling test #2") {
1620     TestWithDifferentFiltersAndConcurrency<Test2_pipeline>();
1621 }
1622 
1623 //! Testing parallel_pipeline exception handling
1624 //! \brief \ref error_guessing
1625 TEST_CASE("parallel_pipeline exception handling test #3") {
1626     TestWithDifferentFiltersAndConcurrency<Test3_pipeline>();
1627 }
1628 
1629 //! Testing parallel_pipeline exception handling
1630 //! \brief \ref error_guessing
1631 TEST_CASE("parallel_pipeline exception handling test #4") {
1632     TestWithDifferentFiltersAndConcurrency<Test4_pipeline>();
1633 }
1634 
1635 #endif /* TBB_USE_EXCEPTIONS */
1636 
1637 class FilterToCancel  {
1638 public:
1639     FilterToCancel() {}
1640     void* operator()(void* item) const {
1641         ++g_CurExecuted;
1642         Cancellator::WaitUntilReady();
1643         return item;
1644     }
1645 }; // class FilterToCancel
1646 
1647 template <class Filter_to_cancel>
1648 class PipelineLauncher {
1649     tbb::task_group_context &my_ctx;
1650 public:
1651     PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1652 
1653     void operator()() const {
1654         // Build a two-filter pipeline: a parallel input filter followed by a parallel filter to be cancelled
1655         InputFilter inputFilter;
1656         Filter_to_cancel filterToCancel;
1657         tbb::parallel_pipeline(
1658             g_NumTokens,
1659             tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1660             tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel),
1661             my_ctx
1662         );
1663     }
1664 };
1665 
1666 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1667 void TestCancelation1_pipeline () {
1668     ResetGlobals();
1669     g_ThrowException = false;
1670     // Threshold should leave more than max_threads tasks to test the cancellation. Set it to get_iter_range_size()/4 since get_iter_range_size() >= max_threads*2.
1671     intptr_t threshold = get_iter_range_size() / 4;
1672     REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
1673     RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold);
1674     g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
1675     REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1676 }
1677 
1678 class FilterToCancel2  {
1679 public:
1680     FilterToCancel2() {}
1681 
1682     void* operator()(void* item) const {
1683         ++g_CurExecuted;
1684         utils::ConcurrencyTracker ct;
1685         // The test will hang (and be timed out by the test system) if is_current_task_group_canceling() is broken
1686         while( !tbb::is_current_task_group_canceling() )
1687             utils::yield();
1688         return item;
1689     }
1690 };
1691 
1692 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1693 /** This version also tests the tbb::is_current_task_group_canceling() function. **/
1694 void TestCancelation2_pipeline () {
1695     ResetGlobals();
1696     RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>();
1697     // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the
1698     // former, and g_CurExecuted is monotonically increasing.  So the comparison should be at least ==.
1699     // If another filter is started after cancel but before cancellation is propagated, then the
1700     // number will be larger.
1701     REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
1702 }
1703 
1704 /** If min and max thread numbers specified on the command line are different,
1705     the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
1706     to be able to test the high and low contention modes while keeping the test reasonably fast **/
1707 
1708 //! Testing parallel_pipeline cancellation
1709 //! \brief \ref error_guessing
1710 TEST_CASE("parallel_pipeline cancellation test #1") {
1711     for (auto concurrency_level: utils::concurrency_range()) {
1712         g_NumThreads = static_cast<int>(concurrency_level);
1713         g_Master = std::this_thread::get_id();
1714         if (g_NumThreads > 1) {
1715             tbb::task_arena a(g_NumThreads);
1716             a.execute([] {
1717                 // Execute in all the possible modes
1718                 for (size_t j = 0; j < 4; ++j) {
1719                     g_ExceptionInMaster = (j & 1) != 0;
1720                     g_SolitaryException = (j & 2) != 0;
1721                     g_NumTokens = 2 * g_NumThreads;
1722 
1723                     TestCancelation1_pipeline();
1724                 }
1725             });
1726         }
1727     }
1728 }
1729 
1730 //! Testing parallel_pipeline cancellation
1731 //! \brief \ref error_guessing
1732 TEST_CASE("parallel_pipeline cancellation test #2") {
1733     for (auto concurrency_level: utils::concurrency_range()) {
1734         g_NumThreads = static_cast<int>(concurrency_level);
1735         g_Master = std::this_thread::get_id();
1736         if (g_NumThreads > 1) {
1737             tbb::task_arena a(g_NumThreads);
1738             a.execute([] {
1739                 // Execute in all the possible modes
1740                 for (size_t j = 0; j < 4; ++j) {
1741                     g_ExceptionInMaster = (j & 1) != 0;
1742                     g_SolitaryException = (j & 2) != 0;
1743                     g_NumTokens = 2 * g_NumThreads;
1744 
1745                     TestCancelation2_pipeline();
1746                 }
1747             });
1748         }
1749     }
1750 }
1751