1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/test.h"
18 #include "common/concurrency_tracker.h"
19 #include "common/iterator.h"
20 #include "common/utils_concurrency_limit.h"
21
22 #include <limits.h> // for INT_MAX
23 #include <thread>
24 #include <vector>
25
26 #include "tbb/parallel_for.h"
27 #include "tbb/parallel_reduce.h"
28 #include "tbb/parallel_for_each.h"
29 #include "tbb/parallel_pipeline.h"
30 #include "tbb/blocked_range.h"
31 #include "tbb/task_group.h"
32 #include "tbb/concurrent_unordered_map.h"
33 #include "tbb/task.h"
34 #include "tbb/global_control.h"
35
36 //! \file test_eh_algorithms.cpp
37 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications
38
39 #define FLAT_RANGE 100000
40 #define FLAT_GRAIN 100
41 #define OUTER_RANGE 100
42 #define OUTER_GRAIN 10
43 #define INNER_RANGE (FLAT_RANGE / OUTER_RANGE)
44 #define INNER_GRAIN (FLAT_GRAIN / OUTER_GRAIN)
45
46 struct context_specific_counter {
47 tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{};
48
incrementcontext_specific_counter49 void increment() {
50 tbb::task_group_context* ctx = tbb::task::current_context();
51 REQUIRE(ctx != nullptr);
52 context_map[ctx]++;
53 }
54
resetcontext_specific_counter55 void reset() {
56 context_map.clear();
57 }
58
validatecontext_specific_counter59 void validate(unsigned expected_count, const char* msg) {
60 for (auto it = context_map.begin(); it != context_map.end(); it++) {
61 REQUIRE_MESSAGE( it->second <= expected_count, msg);
62 }
63 }
64 };
65
66 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder
67 std::atomic<intptr_t> g_OuterParCalls{}; // number of actual invocations of the outer construct executed.
68 context_specific_counter g_TGCCancelled{}; // Number of times a task sees its group cancelled at start
69
70 #include "common/exception_handling.h"
71
72 /********************************
73 Variables in test
74
75 __ Test control variables
76 g_ExceptionInMaster -- only the external thread is allowed to throw. If false, the external cannot throw
77 g_SolitaryException -- only one throw may be executed.
78
79 -- controls for ThrowTestException for pipeline tests
80 g_NestedPipelines -- are inner pipelines being run?
81 g_PipelinesStarted -- how many pipelines have run their first filter at least once.
82
83 -- Information variables
84
85 g_Master -- Thread ID of the "external" thread
86 In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this.
87
88 -- Measurement variables
89
90 g_OuterParCalls -- how many outer parallel ranges or filters started
91 g_TGCCancelled -- how many inner parallel ranges or filters saw task::self().is_cancelled()
92 g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException)
93 g_MasterExecutedThrow -- number of times external thread actually executed a throw
94 g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw
95 g_ExceptionCaught -- one of PropagatedException or unknown exception was caught. (Other exceptions cause assertions.)
96
97 -- Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException)
98 g_CurExecuted -- total number of inner ranges or filters which executed
99 g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none.
100 g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none.
101 *********************************/
102
ResetGlobals(bool throwException=true,bool flog=false)103 inline void ResetGlobals ( bool throwException = true, bool flog = false ) {
104 ResetEhGlobals( throwException, flog );
105 g_FedTasksCount = 0;
106 g_OuterParCalls = 0;
107 g_NestedPipelines = false;
108 g_TGCCancelled.reset();
109 }
110
111 ////////////////////////////////////////////////////////////////////////////////
112 // Tests for tbb::parallel_for and tbb::parallel_reduce
113 ////////////////////////////////////////////////////////////////////////////////
114
115 typedef size_t count_type;
116 typedef tbb::blocked_range<count_type> range_type;
117
CountSubranges(range_type r)118 inline intptr_t CountSubranges(range_type r) {
119 if(!r.is_divisible()) return intptr_t(1);
120 range_type r2(r,tbb::split());
121 return CountSubranges(r) + CountSubranges(r2);
122 }
123
NumSubranges(intptr_t length,intptr_t grain)124 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) {
125 return CountSubranges(range_type(0,length,grain));
126 }
127
128 template<class Body>
TestNumSubrangesCalculation(intptr_t length,intptr_t grain,intptr_t inner_length,intptr_t inner_grain)129 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) {
130 ResetGlobals();
131 g_ThrowException = false;
132 intptr_t outerCalls = NumSubranges(length, grain),
133 innerCalls = NumSubranges(inner_length, inner_grain),
134 maxExecuted = outerCalls * (innerCalls + 1);
135 tbb::parallel_for( range_type(0, length, grain), Body() );
136 REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count");
137 return maxExecuted;
138 }
139
140 class NoThrowParForBody {
141 public:
operator ()(const range_type & r) const142 void operator()( const range_type& r ) const {
143 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
144 else g_NonMasterExecuted = true;
145 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
146 utils::doDummyWork(r.size());
147 }
148 };
149
150 #if TBB_USE_EXCEPTIONS
151
Test0()152 void Test0 () {
153 ResetGlobals();
154 tbb::simple_partitioner p;
155 for( size_t i=0; i<10; ++i ) {
156 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() );
157 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p );
158 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() );
159 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p );
160 }
161 } // void Test0 ()
162
163 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for.
164 template<typename ParForBody>
165 class SimpleParReduceBody {
166 ParForBody m_Body;
167 public:
168 void operator=(const SimpleParReduceBody&) = delete;
169 SimpleParReduceBody(const SimpleParReduceBody&) = default;
170 SimpleParReduceBody() = default;
171
operator ()(const range_type & r) const172 void operator()( const range_type& r ) const { m_Body(r); }
SimpleParReduceBody(SimpleParReduceBody & left,tbb::split)173 SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {}
join(SimpleParReduceBody &)174 void join( SimpleParReduceBody& /*right*/ ) {}
175 }; // SimpleParReduceBody
176
177 //! Test parallel_for and parallel_reduce for a given partitioner.
178 /** The Body need only be suitable for a parallel_for. */
179 template<typename ParForBody, typename Partitioner>
TestParallelLoopAux()180 void TestParallelLoopAux() {
181 Partitioner partitioner;
182 for( int i=0; i<2; ++i ) {
183 ResetGlobals();
184 TRY();
185 if( i==0 )
186 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner );
187 else {
188 SimpleParReduceBody<ParForBody> rb;
189 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner );
190 }
191 CATCH_AND_ASSERT();
192 // two cases: g_SolitaryException and !g_SolitaryException
193 // 1) g_SolitaryException: only one thread actually threw. There is only one context, so the exception
194 // (when caught) will cause that context to be cancelled. After this event, there may be one or
195 // more threads which are "in-flight", up to g_NumThreads, but no more will be started. The threads,
196 // when they start, if they see they are cancelled, TGCCancelled is incremented.
197 // 2) !g_SolitaryException: more than one thread can throw. The number of threads that actually
198 // threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow.
199 // Only one context, so TGCCancelled should be <= g_NumThreads.
200 //
201 // the reasoning is similar for nested algorithms in a single context (Test2).
202 //
203 // If a thread throws in a context, more than one subsequent task body may see the
204 // cancelled state (if they are scheduled before the state is propagated.) this is
205 // infrequent, but it occurs. So what was to be an assertion must be a remark.
206 g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown");
207 REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
208 if ( g_SolitaryException ) {
209 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
210 REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow),
211 "Not all throws were caught");
212 REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred");
213 }
214 else {
215 REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test");
216 }
217 }
218 } // TestParallelLoopAux
219
220 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners.
221 /** The Body only needs to be suitable for tbb::parallel_for. */
222 template<typename Body>
TestParallelLoop()223 void TestParallelLoop() {
224 // The simple and auto partitioners should be const, but not the affinity partitioner.
225 TestParallelLoopAux<Body, const tbb::simple_partitioner >();
226 TestParallelLoopAux<Body, const tbb::auto_partitioner >();
227 #define __TBB_TEMPORARILY_DISABLED 1
228 #if !__TBB_TEMPORARILY_DISABLED
229 // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner
230 TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>();
231 #endif
232 #undef __TBB_TEMPORARILY_DISABLED
233 }
234
235 class SimpleParForBody {
236 public:
237 void operator=(const SimpleParForBody&) = delete;
238 SimpleParForBody(const SimpleParForBody&) = default;
239 SimpleParForBody() = default;
240
operator ()(const range_type & r) const241 void operator()( const range_type& r ) const {
242 utils::ConcurrencyTracker ct;
243 ++g_CurExecuted;
244 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
245 else g_NonMasterExecuted = true;
246 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
247 utils::doDummyWork(r.size());
248 WaitUntilConcurrencyPeaks();
249 ThrowTestException(1);
250 }
251 };
252
Test1()253 void Test1() {
254 // non-nested parallel_for/reduce with throwing body, one context
255 TestParallelLoop<SimpleParForBody>();
256 } // void Test1 ()
257
258 class OuterParForBody {
259 public:
260 void operator=(const OuterParForBody&) = delete;
261 OuterParForBody(const OuterParForBody&) = default;
262 OuterParForBody() = default;
operator ()(const range_type &) const263 void operator()( const range_type& ) const {
264 utils::ConcurrencyTracker ct;
265 ++g_OuterParCalls;
266 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() );
267 }
268 };
269
270 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block.
271 /** Inner algorithms are spawned inside the new bound context by default. Since
272 exceptions thrown from the inner parallel_for are not handled by the caller
273 (outer parallel_for body) in this test, they will cancel all the sibling inner
274 algorithms. **/
Test2()275 void Test2 () {
276 TestParallelLoop<OuterParForBody>();
277 } // void Test2 ()
278
279 class OuterParForBodyWithIsolatedCtx {
280 public:
operator ()(const range_type &) const281 void operator()( const range_type& ) const {
282 tbb::task_group_context ctx(tbb::task_group_context::isolated);
283 ++g_OuterParCalls;
284 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
285 }
286 };
287
288 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block.
289 /** Even though exceptions thrown from the inner parallel_for are not handled
290 by the caller in this test, they will not affect sibling inner algorithms
291 already running because of the isolated contexts. However because the first
292 exception cancels the root parallel_for only the first g_NumThreads subranges
293 will be processed (which launch inner parallel_fors) **/
Test3()294 void Test3 () {
295 ResetGlobals();
296 typedef OuterParForBodyWithIsolatedCtx body_type;
297 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
298 // we expect one thread to throw without counting, the rest to run to completion
299 // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the
300 // case; the SimpleParFor subranges are started up as part of the outer ones, and when
301 // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started.
302 // so we have to count the number of outer Pfors actually started.
303 minExecuted = (g_NumThreads - 1) * innerCalls;
304 TRY();
305 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() );
306 CATCH_AND_ASSERT();
307 minExecuted = (g_OuterParCalls - 1) * innerCalls; // see above
308
309 // The first formula above assumes all ranges of the outer parallel for are executed, and one
310 // cancels. In the event, we have a smaller number of ranges that start before the exception
311 // is caught.
312 //
313 // g_SolitaryException:One inner range throws. Outer parallel_For is cancelled, but sibling
314 // parallel_fors continue to completion (unless the threads that execute
315 // are not allowed to throw, in which case we will not see any exceptions).
316 // !g_SolitaryException:multiple inner ranges may throw. Any which throws will stop, and the
317 // corresponding range of the outer pfor will stop also.
318 //
319 // In either case, once the outer pfor gets the exception it will stop executing further ranges.
320
321 // if the only threads executing were not allowed to throw, then not seeing an exception is okay.
322 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
323 if ( g_SolitaryException ) {
324 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
325 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
326 REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception");
327 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
328 }
329 else {
330 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
331 REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test");
332 }
333 } // void Test3 ()
334
335 class OuterParForExceptionSafeBody {
336 public:
operator ()(const range_type &) const337 void operator()( const range_type& ) const {
338 tbb::task_group_context ctx(tbb::task_group_context::isolated);
339 ++g_OuterParCalls;
340 TRY();
341 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx );
342 CATCH(); // this macro sets g_ExceptionCaught
343 }
344 };
345
346 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block.
347 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
348 in this test, they do not affect neither other tasks of the the root parallel_for
349 nor sibling inner algorithms. **/
Test4()350 void Test4 () {
351 ResetGlobals( true, true );
352 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN),
353 outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN);
354 TRY();
355 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() );
356 CATCH();
357 // g_SolitaryException : one inner pfor will throw, the rest will execute to completion.
358 // so the count should be (outerCalls -1) * innerCalls, if a throw happened.
359 // !g_SolitaryException : possible multiple inner pfor throws. Should be approximately
360 // (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few
361 intptr_t minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
362 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted);
363 if ( g_SolitaryException ) {
364 // only one task had exception thrown. That task had at least one execution (the one that threw).
365 // There may be an arbitrary number of ranges executed after the throw but before the exception
366 // is caught in the scheduler and cancellation is signaled. (seen 9, 11 and 62 (!) for 8 threads)
367 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered");
368 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
369 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
370 // a small number of threads can execute in a throwing sub-pfor, if the task which is
371 // to do the solitary throw swaps out after registering its intent to throw but before it
372 // actually does so. As a result, the number of extra tasks cannot exceed the number of thread
373 // for each nested pfor invication)
374 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
375 }
376 else {
377 REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions");
378 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported");
379 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions");
380 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
381 }
382 } // void Test4 ()
383
384 //! Testing parallel_for and parallel_reduce exception handling
385 //! \brief \ref error_guessing
386 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") {
387 for (auto concurrency_level: utils::concurrency_range()) {
388 g_NumThreads = static_cast<int>(concurrency_level);
389 g_Master = std::this_thread::get_id();
390 if (g_NumThreads > 1) {
391 tbb::task_arena a(g_NumThreads);
__anone85726900102null392 a.execute([] {
393 // Execute in all the possible modes
394 for (size_t j = 0; j < 4; ++j) {
395 g_ExceptionInMaster = (j & 1) != 0;
396 g_SolitaryException = (j & 2) != 0;
397
398 Test0();
399 }
400 });
401 }
402 }
403 }
404
405 //! Testing parallel_for and parallel_reduce exception handling
406 //! \brief \ref error_guessing
407 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") {
408 for (auto concurrency_level: utils::concurrency_range()) {
409 g_NumThreads = static_cast<int>(concurrency_level);
410 g_Master = std::this_thread::get_id();
411 if (g_NumThreads > 1) {
412 tbb::task_arena a(g_NumThreads);
__anone85726900202null413 a.execute([] {
414 // Execute in all the possible modes
415 for (size_t j = 0; j < 4; ++j) {
416 g_ExceptionInMaster = (j & 1) != 0;
417 g_SolitaryException = (j & 2) != 0;
418
419 Test1();
420 }
421 });
422 }
423 }
424 }
425
426 //! Testing parallel_for and parallel_reduce exception handling
427 //! \brief \ref error_guessing
428 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") {
429 for (auto concurrency_level: utils::concurrency_range()) {
430 g_NumThreads = static_cast<int>(concurrency_level);
431 g_Master = std::this_thread::get_id();
432 if (g_NumThreads > 1) {
433 tbb::task_arena a(g_NumThreads);
__anone85726900302null434 a.execute([] {
435 // Execute in all the possible modes
436 for (size_t j = 0; j < 4; ++j) {
437 g_ExceptionInMaster = (j & 1) != 0;
438 g_SolitaryException = (j & 2) != 0;
439
440 Test2();
441 }
442 });
443 }
444 }
445 }
446
447 //! Testing parallel_for and parallel_reduce exception handling
448 //! \brief \ref error_guessing
449 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") {
450 for (auto concurrency_level: utils::concurrency_range()) {
451 g_NumThreads = static_cast<int>(concurrency_level);
452 g_Master = std::this_thread::get_id();
453 if (g_NumThreads > 1) {
454 tbb::task_arena a(g_NumThreads);
__anone85726900402null455 a.execute([] {
456 // Execute in all the possible modes
457 for (size_t j = 0; j < 4; ++j) {
458 g_ExceptionInMaster = (j & 1) != 0;
459 g_SolitaryException = (j & 2) != 0;
460
461 Test3();
462 }
463 });
464 }
465 }
466 }
467
468 //! Testing parallel_for and parallel_reduce exception handling
469 //! \brief \ref error_guessing
470 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") {
471 for (auto concurrency_level: utils::concurrency_range()) {
472 g_NumThreads = static_cast<int>(concurrency_level);
473 g_Master = std::this_thread::get_id();
474 if (g_NumThreads > 1) {
475 tbb::task_arena a(g_NumThreads);
__anone85726900502null476 a.execute([] {
477 // Execute in all the possible modes
478 for (size_t j = 0; j < 4; ++j) {
479 g_ExceptionInMaster = (j & 1) != 0;
480 g_SolitaryException = (j & 2) != 0;
481
482 Test4();
483 }
484 });
485 }
486 }
487 }
488
489 #endif /* TBB_USE_EXCEPTIONS */
490
491 class ParForBodyToCancel {
492 public:
operator ()(const range_type &) const493 void operator()( const range_type& ) const {
494 ++g_CurExecuted;
495 Cancellator::WaitUntilReady();
496 }
497 };
498
499 template<class B>
500 class ParForLauncher {
501 tbb::task_group_context &my_ctx;
502 public:
operator ()() const503 void operator()() const {
504 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx );
505 }
ParForLauncher(tbb::task_group_context & ctx)506 ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
507 };
508
509 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
TestCancelation1()510 void TestCancelation1 () {
511 ResetGlobals( false );
512 RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 );
513 }
514
515 class Cancellator2 {
516 tbb::task_group_context &m_GroupToCancel;
517
518 public:
operator ()() const519 void operator()() const {
520 utils::ConcurrencyTracker ct;
521 WaitUntilConcurrencyPeaks();
522 m_GroupToCancel.cancel_group_execution();
523 g_ExecutedAtLastCatch = g_CurExecuted.load();
524 }
525
Cancellator2(tbb::task_group_context & ctx,intptr_t)526 Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {}
527 };
528
529 class ParForBodyToCancel2 {
530 public:
operator ()(const range_type &) const531 void operator()( const range_type& ) const {
532 ++g_CurExecuted;
533 utils::ConcurrencyTracker ct;
534 // The test will hang (and be timed out by the test system) if is_cancelled() is broken
535 while( !tbb::is_current_task_group_canceling() )
536 utils::yield();
537 }
538 };
539
540 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
541 /** This version also tests tbb::is_current_task_group_canceling() method. **/
TestCancelation2()542 void TestCancelation2 () {
543 ResetGlobals();
544 RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>();
545 REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task");
546 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
547 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation");
548 }
549
550 ////////////////////////////////////////////////////////////////////////////////
551 // Regression test based on the contribution by the author of the following forum post:
552 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx
553
554 class Worker {
555 static const int max_nesting = 3;
556 static const int reduce_range = 1024;
557 static const int reduce_grain = 256;
558 public:
559 int DoWork (int level);
Validate(int start_level)560 int Validate (int start_level) {
561 int expected = 1; // identity for multiplication
562 for(int i=start_level+1; i<max_nesting; ++i)
563 expected *= reduce_range;
564 return expected;
565 }
566 };
567
568 class RecursiveParReduceBodyWithSharedWorker {
569 Worker * m_SharedWorker;
570 int m_NestingLevel;
571 int m_Result;
572 public:
RecursiveParReduceBodyWithSharedWorker(RecursiveParReduceBodyWithSharedWorker & src,tbb::split)573 RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split )
574 : m_SharedWorker(src.m_SharedWorker)
575 , m_NestingLevel(src.m_NestingLevel)
576 , m_Result(0)
577 {}
RecursiveParReduceBodyWithSharedWorker(Worker * w,int outer)578 RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer )
579 : m_SharedWorker(w)
580 , m_NestingLevel(outer)
581 , m_Result(0)
582 {}
583
operator ()(const tbb::blocked_range<size_t> & r)584 void operator() ( const tbb::blocked_range<size_t>& r ) {
585 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
586 else g_NonMasterExecuted = true;
587 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
588 for (size_t i = r.begin (); i != r.end (); ++i) {
589 m_Result += m_SharedWorker->DoWork (m_NestingLevel);
590 }
591 }
join(const RecursiveParReduceBodyWithSharedWorker & x)592 void join (const RecursiveParReduceBodyWithSharedWorker & x) {
593 m_Result += x.m_Result;
594 }
result()595 int result () { return m_Result; }
596 };
597
DoWork(int level)598 int Worker::DoWork ( int level ) {
599 ++level;
600 if ( level < max_nesting ) {
601 RecursiveParReduceBodyWithSharedWorker rt (this, level);
602 tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt);
603 return rt.result();
604 }
605 else
606 return 1;
607 }
608
609 //! Regression test for hanging that occurred with the first version of cancellation propagation
TestCancelation3()610 void TestCancelation3 () {
611 Worker w;
612 int result = w.DoWork (0);
613 int expected = w.Validate(0);
614 REQUIRE_MESSAGE ( result == expected, "Wrong calculation result");
615 }
616
617 struct StatsCounters {
618 std::atomic<size_t> my_total_created;
619 std::atomic<size_t> my_total_deleted;
StatsCountersStatsCounters620 StatsCounters() {
621 my_total_created = 0;
622 my_total_deleted = 0;
623 }
624 };
625
626 class ParReduceBody {
627 StatsCounters* my_stats;
628 size_t my_id;
629 bool my_exception;
630 tbb::task_group_context& tgc;
631
632 public:
ParReduceBody(StatsCounters & s_,tbb::task_group_context & context,bool e_)633 ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) :
634 my_stats(&s_), my_exception(e_), tgc(context) {
635 my_id = my_stats->my_total_created++;
636 }
637
ParReduceBody(const ParReduceBody & lhs)638 ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) {
639 my_stats = lhs.my_stats;
640 my_id = my_stats->my_total_created++;
641 }
642
ParReduceBody(ParReduceBody & lhs,tbb::split)643 ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) {
644 my_stats = lhs.my_stats;
645 my_id = my_stats->my_total_created++;
646 }
647
~ParReduceBody()648 ~ParReduceBody(){ ++my_stats->my_total_deleted; }
649
operator ()(const tbb::blocked_range<std::size_t> &) const650 void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const {
651 //Do nothing, except for one task (chosen arbitrarily)
652 if( my_id >= 12 ) {
653 if( my_exception )
654 ThrowTestException(1);
655 else
656 tgc.cancel_group_execution();
657 }
658 }
659
join(ParReduceBody &)660 void join( ParReduceBody& /*rhs*/ ) {}
661 };
662
TestCancelation4()663 void TestCancelation4() {
664 StatsCounters statsObj;
665 #if TBB_USE_EXCEPTIONS
666 try
667 #endif
668 {
669 tbb::task_group_context tgc1, tgc2;
670 ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true);
671 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 );
672 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 );
673 }
674 #if TBB_USE_EXCEPTIONS
675 catch(...) {}
676 #endif
677 REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed");
678 }
679
680 //! Testing parallel_for and parallel_reduce cancellation
681 //! \brief \ref error_guessing
682 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") {
683 for (auto concurrency_level: utils::concurrency_range()) {
684 g_NumThreads = static_cast<int>(concurrency_level);
685 g_Master = std::this_thread::get_id();
686 if (g_NumThreads > 1) {
687 tbb::task_arena a(g_NumThreads);
__anone85726900602null688 a.execute([] {
689 // Execute in all the possible modes
690 for (size_t j = 0; j < 4; ++j) {
691 g_ExceptionInMaster = (j & 1) != 0;
692 g_SolitaryException = (j & 2) != 0;
693
694 TestCancelation1();
695 }
696 });
697 }
698 }
699 }
700
701 //! Testing parallel_for and parallel_reduce cancellation
702 //! \brief \ref error_guessing
703 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") {
704 for (auto concurrency_level: utils::concurrency_range()) {
705 g_NumThreads = static_cast<int>(concurrency_level);
706 g_Master = std::this_thread::get_id();
707 if (g_NumThreads > 1) {
708 tbb::task_arena a(g_NumThreads);
__anone85726900702null709 a.execute([] {
710 // Execute in all the possible modes
711 for (size_t j = 0; j < 4; ++j) {
712 g_ExceptionInMaster = (j & 1) != 0;
713 g_SolitaryException = (j & 2) != 0;
714
715 TestCancelation2();
716 }
717 });
718 }
719 }
720 }
721
722 //! Testing parallel_for and parallel_reduce cancellation
723 //! \brief \ref error_guessing
724 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") {
725 for (auto concurrency_level: utils::concurrency_range()) {
726 g_NumThreads = static_cast<int>(concurrency_level);
727 g_Master = std::this_thread::get_id();
728 if (g_NumThreads > 1) {
729 tbb::task_arena a(g_NumThreads);
__anone85726900802null730 a.execute([] {
731 // Execute in all the possible modes
732 for (size_t j = 0; j < 4; ++j) {
733 g_ExceptionInMaster = (j & 1) != 0;
734 g_SolitaryException = (j & 2) != 0;
735
736 TestCancelation3();
737 }
738 });
739 }
740 }
741 }
742
743 //! Testing parallel_for and parallel_reduce cancellation
744 //! \brief \ref error_guessing
745 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
746 for (auto concurrency_level: utils::concurrency_range()) {
747 g_NumThreads = static_cast<int>(concurrency_level);
748 g_Master = std::this_thread::get_id();
749 if (g_NumThreads > 1) {
750 tbb::task_arena a(g_NumThreads);
__anone85726900902null751 a.execute([] {
752 // Execute in all the possible modes
753 for (size_t j = 0; j < 4; ++j) {
754 g_ExceptionInMaster = (j & 1) != 0;
755 g_SolitaryException = (j & 2) != 0;
756
757 TestCancelation4();
758 }
759 });
760 }
761 }
762 }
763
764 ////////////////////////////////////////////////////////////////////////////////
765 // Tests for tbb::parallel_for_each
766 ////////////////////////////////////////////////////////////////////////////////
767
get_iter_range_size()768 std::size_t get_iter_range_size() {
769 // Set the minimal iteration sequence size to 50 to improve test complexity on small machines
770 return std::max(50, g_NumThreads * 2);
771 }
772
773 template<typename Iterator>
774 struct adaptive_range {
775 std::vector<std::size_t> my_array;
776
adaptive_rangeadaptive_range777 adaptive_range(std::size_t size) : my_array(size + 1) {}
778 using iterator = Iterator;
779
beginadaptive_range780 iterator begin() const {
781 return iterator{&my_array.front()};
782 }
beginadaptive_range783 iterator begin() {
784 return iterator{&my_array.front()};
785 }
endadaptive_range786 iterator end() const {
787 return iterator{&my_array.back()};
788 }
endadaptive_range789 iterator end() {
790 return iterator{&my_array.back()};
791 }
792 };
793
Feed(tbb::feeder<size_t> & feeder,size_t val)794 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
795 if (g_FedTasksCount < 50) {
796 ++g_FedTasksCount;
797 feeder.add(val);
798 }
799 }
800
801 #define RunWithSimpleBody(func, body) \
802 func<utils::ForwardIterator<size_t>, body>(); \
803 func<utils::ForwardIterator<size_t>, body##WithFeeder>()
804
805 #define RunWithTemplatedBody(func, body) \
806 func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >(); \
807 func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()
808
809 #if TBB_USE_EXCEPTIONS
810
811 // Simple functor object with exception
812 class SimpleParForEachBody {
813 public:
operator ()(size_t & value) const814 void operator() ( size_t &value ) const {
815 ++g_CurExecuted;
816 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
817 else g_NonMasterExecuted = true;
818 if( tbb::is_current_task_group_canceling() ) {
819 g_TGCCancelled.increment();
820 }
821 utils::ConcurrencyTracker ct;
822 value += 1000;
823 WaitUntilConcurrencyPeaks();
824 ThrowTestException(1);
825 }
826 };
827
828 // Simple functor object with exception and feeder
829 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
830 public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const831 void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
832 Feed(feeder, 0);
833 SimpleParForEachBody::operator()(value);
834 }
835 };
836
837 // Tests exceptions without nesting
838 template <class Iterator, class simple_body>
Test1_parallel_for_each()839 void Test1_parallel_for_each () {
840 ResetGlobals();
841 auto range = adaptive_range<Iterator>(get_iter_range_size());
842 TRY();
843 tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() );
844 CATCH_AND_ASSERT();
845 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
846 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
847 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
848 if ( !g_SolitaryException )
849 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
850
851 } // void Test1_parallel_for_each ()
852
853 template <class Iterator>
854 class OuterParForEachBody {
855 public:
operator ()(size_t &) const856 void operator()( size_t& /*value*/ ) const {
857 ++g_OuterParCalls;
858 auto range = adaptive_range<Iterator>(get_iter_range_size());
859 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody());
860 }
861 };
862
863 template <class Iterator>
864 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
865 public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const866 void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
867 Feed(feeder, 0);
868 OuterParForEachBody<Iterator>::operator()(value);
869 }
870 };
871
872 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
873 /** Inner algorithms are spawned inside the new bound context by default. Since
874 exceptions thrown from the inner parallel_for_each are not handled by the caller
875 (outer parallel_for_each body) in this test, they will cancel all the sibling inner
876 algorithms. **/
877 template <class Iterator, class outer_body>
Test2_parallel_for_each()878 void Test2_parallel_for_each () {
879 ResetGlobals();
880 auto range = adaptive_range<Iterator>(get_iter_range_size());
881 TRY();
882 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() );
883 CATCH_AND_ASSERT();
884 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
885 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
886 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
887 if ( !g_SolitaryException )
888 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
889 } // void Test2_parallel_for_each ()
890
891 template <class Iterator>
892 class OuterParForEachBodyWithIsolatedCtx {
893 public:
operator ()(size_t &) const894 void operator()( size_t& /*value*/ ) const {
895 tbb::task_group_context ctx(tbb::task_group_context::isolated);
896 ++g_OuterParCalls;
897 auto range = adaptive_range<Iterator>(get_iter_range_size());
898 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
899 }
900 };
901
902 template <class Iterator>
903 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
904 public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const905 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
906 Feed(feeder, 0);
907 OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
908 }
909 };
910
911 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
912 /** Even though exceptions thrown from the inner parallel_for_each are not handled
913 by the caller in this test, they will not affect sibling inner algorithms
914 already running because of the isolated contexts. However because the first
915 exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
916 will be processed (which launch inner parallel_for_eachs) **/
917 template <class Iterator, class outer_body>
Test3_parallel_for_each()918 void Test3_parallel_for_each () {
919 ResetGlobals();
920 auto range = adaptive_range<Iterator>(get_iter_range_size());
921 intptr_t innerCalls = get_iter_range_size(),
922 // The assumption here is the same as in outer parallel fors.
923 minExecuted = (g_NumThreads - 1) * innerCalls;
924 g_Master = std::this_thread::get_id();
925 TRY();
926 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body());
927 CATCH_AND_ASSERT();
928 // figure actual number of expected executions given the number of outer PDos started.
929 minExecuted = (g_OuterParCalls - 1) * innerCalls;
930 // one extra thread may run a task that sees cancellation. Infrequent but possible
931 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
932 if ( g_SolitaryException ) {
933 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
934 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
935 }
936 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
937 if ( !g_SolitaryException )
938 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
939 } // void Test3_parallel_for_each ()
940
941 template <class Iterator>
942 class OuterParForEachWithEhBody {
943 public:
operator ()(size_t &) const944 void operator()( size_t& /*value*/ ) const {
945 tbb::task_group_context ctx(tbb::task_group_context::isolated);
946 ++g_OuterParCalls;
947 auto range = adaptive_range<Iterator>(get_iter_range_size());
948 TRY();
949 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx);
950 CATCH();
951 }
952 };
953
954 template <class Iterator>
955 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
956 public:
957 void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
958 OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
959 OuterParForEachWithEhBodyWithFeeder() = default;
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const960 void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
961 Feed(feeder, 0);
962 OuterParForEachWithEhBody<Iterator>::operator()(value);
963 }
964 };
965
966 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block.
967 /** Since exception(s) thrown from the inner parallel_for are handled by the caller
968 in this test, they do not affect neither other tasks of the the root parallel_for
969 nor sibling inner algorithms. **/
970 template <class Iterator, class outer_body_with_eh>
Test4_parallel_for_each()971 void Test4_parallel_for_each () {
972 ResetGlobals( true, true );
973 auto range = adaptive_range<Iterator>(get_iter_range_size());
974 g_Master = std::this_thread::get_id();
975 TRY();
976 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh());
977 CATCH();
978 REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
979 intptr_t innerCalls = get_iter_range_size(),
980 outerCalls = get_iter_range_size() + g_FedTasksCount,
981 maxExecuted = outerCalls * innerCalls,
982 minExecuted = 0;
983 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
984 if ( g_SolitaryException ) {
985 minExecuted = maxExecuted - innerCalls;
986 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
987 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
988 // This test has the same property as Test4 (parallel_for); the exception can be
989 // thrown, but some number of tasks from the outer Pdo can execute after the throw but
990 // before the cancellation is signaled (have seen 36).
991 DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
992 }
993 else {
994 minExecuted = g_NumExceptionsCaught;
995 REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
996 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
997 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
998 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
999 }
1000 } // void Test4_parallel_for_each ()
1001
1002 // This body throws an exception only if the task was added by feeder
1003 class ParForEachBodyWithThrowingFeederTasks {
1004 public:
1005 //! This form of the function call operator can be used when the body needs to add more work during the processing
operator ()(const size_t & value,tbb::feeder<size_t> & feeder) const1006 void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const {
1007 ++g_CurExecuted;
1008 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1009 else g_NonMasterExecuted = true;
1010 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1011 Feed(feeder, 1);
1012 if (value == 1)
1013 ThrowTestException(1);
1014 }
1015 }; // class ParForEachBodyWithThrowingFeederTasks
1016
1017 // Test exception in task, which was added by feeder.
1018 template <class Iterator>
Test5_parallel_for_each()1019 void Test5_parallel_for_each () {
1020 ResetGlobals();
1021 auto range = adaptive_range<Iterator>(get_iter_range_size());
1022 g_Master = std::this_thread::get_id();
1023 TRY();
1024 tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks());
1025 CATCH();
1026 if (g_SolitaryException) {
1027 // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
1028 // are handled by the external thread. In this case no throw occurs.
1029 REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel // we saw an exception
1030 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) // non-external trhead throws but none tried
1031 || (g_ExceptionInMaster && !g_MasterExecutedThrow)) // external thread throws but external thread didn't try
1032 , "At least one exception should occur");
1033 }
1034 } // void Test5_parallel_for_each ()
1035
1036 //! Testing parallel_for_each exception handling
1037 //! \brief \ref error_guessing
1038 TEST_CASE("parallel_for_each exception handling test #1") {
1039 for (auto concurrency_level: utils::concurrency_range()) {
1040 g_NumThreads = static_cast<int>(concurrency_level);
1041 g_Master = std::this_thread::get_id();
1042 if (g_NumThreads > 1) {
1043 tbb::task_arena a(g_NumThreads);
__anone85726900a02null1044 a.execute([] {
1045 // Execute in all the possible modes
1046 for (size_t j = 0; j < 4; ++j) {
1047 g_ExceptionInMaster = (j & 1) != 0;
1048 g_SolitaryException = (j & 2) != 0;
1049
1050 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody);
1051 }
1052 });
1053 }
1054 }
1055 }
1056
1057 //! Testing parallel_for_each exception handling
1058 //! \brief \ref error_guessing
1059 TEST_CASE("parallel_for_each exception handling test #2") {
1060 for (auto concurrency_level: utils::concurrency_range()) {
1061 g_NumThreads = static_cast<int>(concurrency_level);
1062 g_Master = std::this_thread::get_id();
1063 if (g_NumThreads > 1) {
1064 tbb::task_arena a(g_NumThreads);
__anone85726900b02null1065 a.execute([] {
1066 // Execute in all the possible modes
1067 for (size_t j = 0; j < 4; ++j) {
1068 g_ExceptionInMaster = (j & 1) != 0;
1069 g_SolitaryException = (j & 2) != 0;
1070
1071 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody);
1072 }
1073 });
1074 }
1075 }
1076 }
1077
1078 //! Testing parallel_for_each exception handling
1079 //! \brief \ref error_guessing
1080 TEST_CASE("parallel_for_each exception handling test #3") {
1081 for (auto concurrency_level: utils::concurrency_range()) {
1082 g_NumThreads = static_cast<int>(concurrency_level);
1083 g_Master = std::this_thread::get_id();
1084 if (g_NumThreads > 1) {
1085 tbb::task_arena a(g_NumThreads);
__anone85726900c02null1086 a.execute([] {
1087 // Execute in all the possible modes
1088 for (size_t j = 0; j < 4; ++j) {
1089 g_ExceptionInMaster = (j & 1) != 0;
1090 g_SolitaryException = (j & 2) != 0;
1091
1092 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx);
1093 }
1094 });
1095 }
1096 }
1097 }
1098
1099 //! Testing parallel_for_each exception handling
1100 //! \brief \ref error_guessing
1101 TEST_CASE("parallel_for_each exception handling test #4") {
1102 for (auto concurrency_level: utils::concurrency_range()) {
1103 g_NumThreads = static_cast<int>(concurrency_level);
1104 g_Master = std::this_thread::get_id();
1105 if (g_NumThreads > 1) {
1106 tbb::task_arena a(g_NumThreads);
__anone85726900d02null1107 a.execute([] {
1108 // Execute in all the possible modes
1109 for (size_t j = 0; j < 4; ++j) {
1110 g_ExceptionInMaster = (j & 1) != 0;
1111 g_SolitaryException = (j & 2) != 0;
1112
1113 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody);
1114 }
1115 });
1116 }
1117 }
1118 }
1119
1120 //! Testing parallel_for_each exception handling
1121 //! \brief \ref error_guessing
1122 TEST_CASE("parallel_for_each exception handling test #5") {
1123 for (auto concurrency_level: utils::concurrency_range()) {
1124 g_NumThreads = static_cast<int>(concurrency_level);
1125 g_Master = std::this_thread::get_id();
1126 if (g_NumThreads > 1) {
1127 tbb::task_arena a(g_NumThreads);
__anone85726900e02null1128 a.execute([] {
1129 // Execute in all the possible modes
1130 for (size_t j = 0; j < 4; ++j) {
1131 g_ExceptionInMaster = (j & 1) != 0;
1132 g_SolitaryException = (j & 2) != 0;
1133
1134 Test5_parallel_for_each<utils::InputIterator<size_t> >();
1135 Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
1136 Test5_parallel_for_each<utils::RandomIterator<size_t> >();
1137 }
1138 });
1139 }
1140 }
1141 }
1142
1143 #endif /* TBB_USE_EXCEPTIONS */
1144
1145 class ParForEachBodyToCancel {
1146 public:
operator ()(size_t &) const1147 void operator()( size_t& /*value*/ ) const {
1148 ++g_CurExecuted;
1149 Cancellator::WaitUntilReady();
1150 }
1151 };
1152
1153 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
1154 public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const1155 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1156 Feed(feeder, 0);
1157 ParForEachBodyToCancel::operator()(value);
1158 }
1159 };
1160
1161 template<class B, class Iterator>
1162 class ParForEachWorker {
1163 tbb::task_group_context &my_ctx;
1164 public:
operator ()() const1165 void operator()() const {
1166 auto range = adaptive_range<Iterator>(get_iter_range_size());
1167 tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx );
1168 }
1169
ParForEachWorker(tbb::task_group_context & ctx)1170 ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1171 };
1172
1173 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1174 template <class Iterator, class body_to_cancel>
TestCancelation1_parallel_for_each()1175 void TestCancelation1_parallel_for_each () {
1176 ResetGlobals( false );
1177 // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
1178 intptr_t threshold = get_iter_range_size() / 4;
1179 REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
1180 tbb::task_group tg;
1181 tbb::task_group_context ctx;
1182 Cancellator cancellator(ctx, threshold);
1183 ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
1184 tg.run( cancellator );
1185 utils::yield();
1186 tg.run( worker );
1187 TRY();
1188 tg.wait();
1189 CATCH_AND_FAIL();
1190 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1191 }
1192
1193 class ParForEachBodyToCancel2 {
1194 public:
operator ()(size_t &) const1195 void operator()( size_t& /*value*/ ) const {
1196 ++g_CurExecuted;
1197 utils::ConcurrencyTracker ct;
1198 // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1199 while( !tbb::is_current_task_group_canceling() )
1200 utils::yield();
1201 }
1202 };
1203
1204 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
1205 public:
operator ()(size_t & value,tbb::feeder<size_t> & feeder) const1206 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
1207 Feed(feeder, 0);
1208 ParForEachBodyToCancel2::operator()(value);
1209 }
1210 };
1211
1212 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1213 /** This version also tests tbb::is_current_task_group_canceling() method. **/
1214 template <class Iterator, class body_to_cancel>
TestCancelation2_parallel_for_each()1215 void TestCancelation2_parallel_for_each () {
1216 ResetGlobals();
1217 RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
1218 }
1219
1220 //! Testing parallel_for_each cancellation test
1221 //! \brief \ref error_guessing
1222 TEST_CASE("parallel_for_each cancellation test #1") {
1223 for (auto concurrency_level: utils::concurrency_range()) {
1224 g_NumThreads = static_cast<int>(concurrency_level);
1225 g_Master = std::this_thread::get_id();
1226 if (g_NumThreads > 1) {
1227 tbb::task_arena a(g_NumThreads);
__anone85726900f02null1228 a.execute([] {
1229 // Execute in all the possible modes
1230 for (size_t j = 0; j < 4; ++j) {
1231 g_ExceptionInMaster = (j & 1) != 0;
1232 g_SolitaryException = (j & 2) != 0;
1233 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
1234 }
1235 });
1236 }
1237 }
1238 }
1239
1240 //! Testing parallel_for_each cancellation test
1241 //! \brief \ref error_guessing
1242 TEST_CASE("parallel_for_each cancellation test #2") {
1243 for (auto concurrency_level: utils::concurrency_range()) {
1244 g_NumThreads = static_cast<int>(concurrency_level);
1245 g_Master = std::this_thread::get_id();
1246 if (g_NumThreads > 1) {
1247 tbb::task_arena a(g_NumThreads);
__anone85726901002null1248 a.execute([] {
1249 // Execute in all the possible modes
1250 for (size_t j = 0; j < 4; ++j) {
1251 g_ExceptionInMaster = (j & 1) != 0;
1252 g_SolitaryException = (j & 2) != 0;
1253
1254 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
1255 }
1256 });
1257 }
1258 }
1259 }
1260
1261 ////////////////////////////////////////////////////////////////////////////////
1262 // Tests for tbb::parallel_pipeline
1263 ////////////////////////////////////////////////////////////////////////////////
1264 int g_NumTokens = 0;
1265
1266 // Simple input filter class, it assigns 1 to all array members
1267 // It stops when it receives item equal to -1
1268 class InputFilter {
1269 mutable std::atomic<size_t> m_Item{};
1270 mutable std::vector<size_t> m_Buffer;
1271 public:
InputFilter()1272 InputFilter() : m_Buffer(get_iter_range_size()) {
1273 m_Item = 0;
1274 for (size_t i = 0; i < get_iter_range_size(); ++i )
1275 m_Buffer[i] = 1;
1276 }
InputFilter(const InputFilter & other)1277 InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()), m_Buffer(get_iter_range_size()) {
1278 for (size_t i = 0; i < get_iter_range_size(); ++i )
1279 m_Buffer[i] = other.m_Buffer[i];
1280 }
1281
operator ()(tbb::flow_control & control) const1282 void* operator()(tbb::flow_control& control) const {
1283 size_t item = m_Item++;
1284 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1285 else g_NonMasterExecuted = true;
1286 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1287 if(item == 1) {
1288 ++g_PipelinesStarted; // count on emitting the first item.
1289 }
1290 if ( item >= get_iter_range_size() ) {
1291 control.stop();
1292 return nullptr;
1293 }
1294 m_Buffer[item] = 1;
1295 return &m_Buffer[item];
1296 }
1297
buffer()1298 size_t* buffer() { return m_Buffer.data(); }
1299 }; // class InputFilter
1300
1301 #if TBB_USE_EXCEPTIONS
1302
1303 // Simple filter with exception throwing. If parallel, will wait until
1304 // as many parallel filters start as there are threads.
1305 class SimpleFilter {
1306 bool m_canThrow;
1307 bool m_serial;
1308 public:
SimpleFilter(bool canThrow,bool serial)1309 SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
operator ()(void * item) const1310 void* operator()(void* item) const {
1311 ++g_CurExecuted;
1312 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
1313 else g_NonMasterExecuted = true;
1314 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
1315 if ( m_canThrow ) {
1316 if ( !m_serial ) {
1317 utils::ConcurrencyTracker ct;
1318 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
1319 }
1320 ThrowTestException(1);
1321 }
1322 return item;
1323 }
1324 }; // class SimpleFilter
1325
1326 // This enumeration represents filters order in pipeline
1327 struct FilterSet {
1328 tbb::filter_mode mode1, mode2;
1329 bool throw1, throw2;
1330
FilterSetFilterSet1331 FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
1332 : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
1333 {}
1334 }; // struct FilterSet
1335
1336 FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );
1337
1338 template<typename InFilter, typename Filter>
1339 class CustomPipeline {
1340 InFilter inputFilter;
1341 Filter filter1;
1342 Filter filter2;
1343 FilterSet my_filters;
1344 public:
CustomPipeline(const FilterSet & filters)1345 CustomPipeline( const FilterSet& filters )
1346 : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
1347 filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
1348 my_filters(filters)
1349 {}
1350
run()1351 void run () {
1352 tbb::parallel_pipeline(
1353 g_NumTokens,
1354 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1355 tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1356 tbb::make_filter<void*, void>(my_filters.mode2, filter2)
1357 );
1358 }
run(tbb::task_group_context & ctx)1359 void run ( tbb::task_group_context& ctx ) {
1360 tbb::parallel_pipeline(
1361 g_NumTokens,
1362 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1363 tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
1364 tbb::make_filter<void*, void>(my_filters.mode2, filter2),
1365 ctx
1366 );
1367 }
1368 };
1369
1370 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;
1371
1372 // Tests exceptions without nesting
Test1_pipeline(const FilterSet & filters)1373 void Test1_pipeline ( const FilterSet& filters ) {
1374 ResetGlobals();
1375 SimplePipeline testPipeline(filters);
1376 TRY();
1377 testPipeline.run();
1378 if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) {
1379 // all the items were processed, though an exception was supposed to occur.
1380 if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
1381 // if !g_ExceptionInMaster, the external thread is not allowed to throw.
1382 // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw.
1383 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
1384 "Unusual count");
1385 }
1386 // In case of all serial filters they might be all executed in the thread(s)
1387 // where exceptions are not allowed by the common test logic. So we just quit.
1388 return;
1389 }
1390 CATCH_AND_ASSERT();
1391 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
1392 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1393 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
1394 if ( !g_SolitaryException )
1395 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1396
1397 } // void Test1_pipeline ()
1398
1399 // Filter with nesting
1400 class OuterFilter {
1401 public:
OuterFilter(bool,bool)1402 OuterFilter ( bool, bool ) {}
1403
operator ()(void * item) const1404 void* operator()(void* item) const {
1405 ++g_OuterParCalls;
1406 SimplePipeline testPipeline(serial_parallel);
1407 testPipeline.run();
1408 return item;
1409 }
1410 }; // class OuterFilter
1411
1412 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
1413 /** Inner algorithms are spawned inside the new bound context by default. Since
1414 exceptions thrown from the inner pipeline are not handled by the caller
1415 (outer pipeline body) in this test, they will cancel all the sibling inner
1416 algorithms. **/
Test2_pipeline(const FilterSet & filters)1417 void Test2_pipeline ( const FilterSet& filters ) {
1418 ResetGlobals();
1419 g_NestedPipelines = true;
1420 CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
1421 TRY();
1422 testPipeline.run();
1423 CATCH_AND_ASSERT();
1424 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
1425 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1426 if ( !g_SolitaryException ) {
1427 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
1428 }
1429 } // void Test2_pipeline ()
1430
1431 //! creates isolated inner pipeline and runs it.
1432 class OuterFilterWithIsolatedCtx {
1433 public:
OuterFilterWithIsolatedCtx(bool,bool)1434 OuterFilterWithIsolatedCtx( bool , bool ) {}
1435
operator ()(void * item) const1436 void* operator()(void* item) const {
1437 ++g_OuterParCalls;
1438 tbb::task_group_context ctx(tbb::task_group_context::isolated);
1439 // create inner pipeline with serial input, parallel output filter, second filter throws
1440 SimplePipeline testPipeline(serial_parallel);
1441 testPipeline.run(ctx);
1442 return item;
1443 }
1444 }; // class OuterFilterWithIsolatedCtx
1445
1446 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
1447 /** Even though exceptions thrown from the inner pipeline are not handled
1448 by the caller in this test, they will not affect sibling inner algorithms
1449 already running because of the isolated contexts. However because the first
1450 exception cancels the root parallel_for_each only the first g_NumThreads subranges
1451 will be processed (which launch inner pipelines) **/
Test3_pipeline(const FilterSet & filters)1452 void Test3_pipeline ( const FilterSet& filters ) {
1453 for( int nTries = 1; nTries <= 4; ++nTries) {
1454 ResetGlobals();
1455 g_NestedPipelines = true;
1456 g_Master = std::this_thread::get_id();
1457 intptr_t innerCalls = get_iter_range_size(),
1458 minExecuted = (g_NumThreads - 1) * innerCalls;
1459 CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
1460 TRY();
1461 testPipeline.run();
1462 CATCH_AND_ASSERT();
1463
1464 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1465 (!g_ExceptionInMaster && !g_NonMasterExecuted);
1466 // only test assertions if the test threw an exception (or we don't care)
1467 bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
1468 if(testSucceeded) {
1469 if (g_SolitaryException) {
1470
1471 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
1472 // Each time the input filter of a pipeline delivers its first item, it increments
1473 // g_PipelinesStarted. When g_SolitaryException, the throw will not occur until
1474 // g_PipelinesStarted >= 3. (This is so at least a second pipeline in its own isolated
1475 // context will start; that is what we're testing.)
1476 //
1477 // There are two pipelines which will NOT run to completion when a solitary throw
1478 // happens in an isolated inner context: the outer pipeline and the pipeline which
1479 // throws. All the other pipelines which start should run to completion. But only
1480 // inner body invocations are counted.
1481 //
1482 // So g_CurExecuted should be about
1483 //
1484 // (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1
1485 // ^ executions for each completed pipeline
1486 // ^ completing pipelines (remembering two will not complete)
1487 // ^ one for the inner throwing pipeline
1488
1489 minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1;
1490 // each failing pipeline must execute at least two tasks
1491 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
1492 // no more than g_NumThreads tasks will be executed in a cancelled context. Otherwise
1493 // tasks not executing at throw were scheduled.
1494 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
1495 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
1496 // if we're only throwing from the external thread, and that thread didn't
1497 // participate in the pipelines, then no throw occurred.
1498 }
1499 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
1500 REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
1501 return;
1502 }
1503 }
1504 }
1505
1506 class OuterFilterWithEhBody {
1507 public:
OuterFilterWithEhBody(bool,bool)1508 OuterFilterWithEhBody( bool, bool ){}
1509
operator ()(void * item) const1510 void* operator()(void* item) const {
1511 tbb::task_group_context ctx(tbb::task_group_context::isolated);
1512 ++g_OuterParCalls;
1513 SimplePipeline testPipeline(serial_parallel);
1514 TRY();
1515 testPipeline.run(ctx);
1516 CATCH();
1517 return item;
1518 }
1519 }; // class OuterFilterWithEhBody
1520
1521 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
1522 /** Since exception(s) thrown from the inner pipeline are handled by the caller
1523 in this test, they do not affect other tasks of the the root pipeline
1524 nor sibling inner algorithms. **/
Test4_pipeline(const FilterSet & filters)1525 void Test4_pipeline ( const FilterSet& filters ) {
1526 #if __GNUC__ && !__INTEL_COMPILER
1527 if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
1528 MESSAGE("Known issue: one of exception handling tests is skipped.");
1529 return;
1530 }
1531 #endif
1532 ResetGlobals( true, true );
1533 // each outer pipeline stage will start get_iter_range_size() inner pipelines.
1534 // each inner pipeline that doesn't throw will process get_iter_range_size() items.
1535 // for solitary exception there will be one pipeline that only processes one stage, one item.
1536 // innerCalls should be 2*get_iter_range_size()
1537 intptr_t innerCalls = 2*get_iter_range_size(),
1538 outerCalls = 2*get_iter_range_size(),
1539 maxExecuted = outerCalls * innerCalls; // the number of invocations of the inner pipelines
1540 CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
1541 TRY();
1542 testPipeline.run();
1543 CATCH_AND_ASSERT();
1544 intptr_t minExecuted = 0;
1545 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
1546 (!g_ExceptionInMaster && !g_NonMasterExecuted);
1547 if ( g_SolitaryException ) {
1548 minExecuted = maxExecuted - innerCalls; // one throwing inner pipeline
1549 REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
1550 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); // probably will assert.
1551 }
1552 else {
1553 // we assume throwing pipelines will not count
1554 minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
1555 REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
1556 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
1557 // too many already-scheduled tasks are started after the first exception is
1558 // thrown. And g_ExecutedAtLastCatch is updated every time an exception is caught.
1559 // So with multiple exceptions there are a variable number of tasks that have been
1560 // discarded because of the signals.
1561 // each throw is caught, so we will see many cancelled tasks. g_ExecutedAtLastCatch is
1562 // updated with each throw, so the value will be the number of tasks executed at the last
1563 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
1564 }
1565 } // void Test4_pipeline ()
1566
1567 //! Tests pipeline function passed with different combination of filters
1568 template<void testFunc(const FilterSet&)>
TestWithDifferentFiltersAndConcurrency()1569 void TestWithDifferentFiltersAndConcurrency() {
1570 #if __TBB_USE_ADDRESS_SANITIZER
1571 // parallel_pipeline allocates tls that sporadically observed as a memory leak with
1572 // detached threads. So, use task_scheduler_handle to join threads with finalize
1573 tbb::task_scheduler_handle handle{ tbb::attach{} };
1574 #endif
1575 for (auto concurrency_level: utils::concurrency_range()) {
1576 g_NumThreads = static_cast<int>(concurrency_level);
1577 g_Master = std::this_thread::get_id();
1578 if (g_NumThreads > 1) {
1579
1580 const tbb::filter_mode modes[] = {
1581 tbb::filter_mode::parallel,
1582 tbb::filter_mode::serial_in_order,
1583 tbb::filter_mode::serial_out_of_order
1584 };
1585
1586 const int NumFilterTypes = sizeof(modes)/sizeof(modes[0]);
1587
1588 // Execute in all the possible modes
1589 for ( size_t j = 0; j < 4; ++j ) {
1590 tbb::task_arena a(g_NumThreads);
1591 a.execute([&] {
1592 g_ExceptionInMaster = (j & 1) != 0;
1593 g_SolitaryException = (j & 2) != 0;
1594 g_NumTokens = 2 * g_NumThreads;
1595
1596 for (int i = 0; i < NumFilterTypes; ++i) {
1597 for (int n = 0; n < NumFilterTypes; ++n) {
1598 for (int k = 0; k < 2; ++k)
1599 testFunc(FilterSet(modes[i], modes[n], k == 0, k != 0));
1600 }
1601 }
1602 });
1603 }
1604 }
1605 }
1606 #if __TBB_USE_ADDRESS_SANITIZER
1607 tbb::finalize(handle);
1608 #endif
1609 }
1610
1611 //! Testing parallel_pipeline exception handling
1612 //! \brief \ref error_guessing
1613 TEST_CASE("parallel_pipeline exception handling test #1") {
1614 TestWithDifferentFiltersAndConcurrency<Test1_pipeline>();
1615 }
1616
1617 //! Testing parallel_pipeline exception handling
1618 //! \brief \ref error_guessing
1619 TEST_CASE("parallel_pipeline exception handling test #2") {
1620 TestWithDifferentFiltersAndConcurrency<Test2_pipeline>();
1621 }
1622
1623 //! Testing parallel_pipeline exception handling
1624 //! \brief \ref error_guessing
1625 TEST_CASE("parallel_pipeline exception handling test #3") {
1626 TestWithDifferentFiltersAndConcurrency<Test3_pipeline>();
1627 }
1628
1629 //! Testing parallel_pipeline exception handling
1630 //! \brief \ref error_guessing
1631 TEST_CASE("parallel_pipeline exception handling test #4") {
1632 TestWithDifferentFiltersAndConcurrency<Test4_pipeline>();
1633 }
1634
1635 #endif /* TBB_USE_EXCEPTIONS */
1636
1637 class FilterToCancel {
1638 public:
FilterToCancel()1639 FilterToCancel() {}
operator ()(void * item) const1640 void* operator()(void* item) const {
1641 ++g_CurExecuted;
1642 Cancellator::WaitUntilReady();
1643 return item;
1644 }
1645 }; // class FilterToCancel
1646
1647 template <class Filter_to_cancel>
1648 class PipelineLauncher {
1649 tbb::task_group_context &my_ctx;
1650 public:
PipelineLauncher(tbb::task_group_context & ctx)1651 PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
1652
operator ()() const1653 void operator()() const {
1654 // Run test when serial filter is the first non-input filter
1655 InputFilter inputFilter;
1656 Filter_to_cancel filterToCancel;
1657 tbb::parallel_pipeline(
1658 g_NumTokens,
1659 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
1660 tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel),
1661 my_ctx
1662 );
1663 }
1664 };
1665
1666 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
TestCancelation1_pipeline()1667 void TestCancelation1_pipeline () {
1668 ResetGlobals();
1669 g_ThrowException = false;
1670 // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
1671 intptr_t threshold = get_iter_range_size() / 4;
1672 REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
1673 RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold);
1674 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
1675 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
1676 }
1677
1678 class FilterToCancel2 {
1679 public:
FilterToCancel2()1680 FilterToCancel2() {}
1681
operator ()(void * item) const1682 void* operator()(void* item) const {
1683 ++g_CurExecuted;
1684 utils::ConcurrencyTracker ct;
1685 // The test will hang (and be timed out by the test system) if is_cancelled() is broken
1686 while( !tbb::is_current_task_group_canceling() )
1687 utils::yield();
1688 return item;
1689 }
1690 };
1691
1692 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
1693 /** This version also tests task::is_cancelled() method. **/
TestCancelation2_pipeline()1694 void TestCancelation2_pipeline () {
1695 ResetGlobals();
1696 RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>();
1697 // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the
1698 // former, and g_CurExecuted is monotonic increasing. so the comparison should be at least ==.
1699 // If another filter is started after cancel but before cancellation is propagated, then the
1700 // number will be larger.
1701 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation");
1702 }
1703
1704 /** If min and max thread numbers specified on the command line are different,
1705 the test is run only for 2 sizes of the thread pool (MinThread and MaxThread)
1706 to be able to test the high and low contention modes while keeping the test reasonably fast **/
1707
1708 //! Testing parallel_pipeline cancellation
1709 //! \brief \ref error_guessing
1710 TEST_CASE("parallel_pipeline cancellation test #1") {
1711 for (auto concurrency_level: utils::concurrency_range()) {
1712 g_NumThreads = static_cast<int>(concurrency_level);
1713 g_Master = std::this_thread::get_id();
1714 if (g_NumThreads > 1) {
1715 tbb::task_arena a(g_NumThreads);
__anone85726901202null1716 a.execute([] {
1717 // Execute in all the possible modes
1718 for (size_t j = 0; j < 4; ++j) {
1719 g_ExceptionInMaster = (j & 1) != 0;
1720 g_SolitaryException = (j & 2) != 0;
1721 g_NumTokens = 2 * g_NumThreads;
1722
1723 TestCancelation1_pipeline();
1724 }
1725 });
1726 }
1727 }
1728 }
1729
1730 //! Testing parallel_pipeline cancellation
1731 //! \brief \ref error_guessing
1732 TEST_CASE("parallel_pipeline cancellation test #2") {
1733 for (auto concurrency_level: utils::concurrency_range()) {
1734 g_NumThreads = static_cast<int>(concurrency_level);
1735 g_Master = std::this_thread::get_id();
1736 if (g_NumThreads > 1) {
1737 tbb::task_arena a(g_NumThreads);
__anone85726901302null1738 a.execute([] {
1739 // Execute in all the possible modes
1740 for (size_t j = 0; j < 4; ++j) {
1741 g_ExceptionInMaster = (j & 1) != 0;
1742 g_SolitaryException = (j & 2) != 0;
1743 g_NumTokens = 2 * g_NumThreads;
1744
1745 TestCancelation2_pipeline();
1746 }
1747 });
1748 }
1749 }
1750 }
1751