1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/concurrency_tracker.h" 19 #include "common/iterator.h" 20 #include "common/utils_concurrency_limit.h" 21 22 #include <limits.h> // for INT_MAX 23 #include <thread> 24 25 #include "tbb/parallel_for.h" 26 #include "tbb/parallel_reduce.h" 27 #include "tbb/parallel_for_each.h" 28 #include "tbb/parallel_pipeline.h" 29 #include "tbb/blocked_range.h" 30 #include "tbb/task_group.h" 31 #include "tbb/global_control.h" 32 #include "tbb/concurrent_unordered_map.h" 33 #include "tbb/task.h" 34 35 //! \file test_eh_algorithms.cpp 36 //! 
\brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications 37 38 #define FLAT_RANGE 100000 39 #define FLAT_GRAIN 100 40 #define OUTER_RANGE 100 41 #define OUTER_GRAIN 10 42 #define INNER_RANGE (FLAT_RANGE / OUTER_RANGE) 43 #define INNER_GRAIN (FLAT_GRAIN / OUTER_GRAIN) 44 45 struct context_specific_counter { 46 tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{}; 47 48 void increment() { 49 tbb::task_group_context* ctx = tbb::task::current_context(); 50 REQUIRE(ctx != nullptr); 51 context_map[ctx]++; 52 } 53 54 void reset() { 55 context_map.clear(); 56 } 57 58 void validate(unsigned expected_count, const char* msg) { 59 for (auto it = context_map.begin(); it != context_map.end(); it++) { 60 REQUIRE_MESSAGE( it->second <= expected_count, msg); 61 } 62 } 63 }; 64 65 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder 66 std::atomic<intptr_t> g_OuterParCalls{}; // number of actual invocations of the outer construct executed. 67 context_specific_counter g_TGCCancelled{}; // Number of times a task sees its group cancelled at start 68 69 #include "common/exception_handling.h" 70 71 /******************************** 72 Variables in test 73 74 __ Test control variables 75 g_ExceptionInMaster -- only the external thread is allowed to throw. If false, the external cannot throw 76 g_SolitaryException -- only one throw may be executed. 77 78 -- controls for ThrowTestException for pipeline tests 79 g_NestedPipelines -- are inner pipelines being run? 80 g_PipelinesStarted -- how many pipelines have run their first filter at least once. 81 82 -- Information variables 83 84 g_Master -- Thread ID of the "external" thread 85 In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this. 
86 87 -- Measurement variables 88 89 g_OuterParCalls -- how many outer parallel ranges or filters started 90 g_TGCCancelled -- how many inner parallel ranges or filters saw task::self().is_cancelled() 91 g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException) 92 g_MasterExecutedThrow -- number of times external thread actually executed a throw 93 g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw 94 g_ExceptionCaught -- one of PropagatedException or unknown exception was caught. (Other exceptions cause assertions.) 95 96 -- Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException) 97 g_CurExecuted -- total number of inner ranges or filters which executed 98 g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none. 99 g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none. 100 *********************************/ 101 102 inline void ResetGlobals ( bool throwException = true, bool flog = false ) { 103 ResetEhGlobals( throwException, flog ); 104 g_FedTasksCount = 0; 105 g_OuterParCalls = 0; 106 g_NestedPipelines = false; 107 g_TGCCancelled.reset(); 108 } 109 110 //////////////////////////////////////////////////////////////////////////////// 111 // Tests for tbb::parallel_for and tbb::parallel_reduce 112 //////////////////////////////////////////////////////////////////////////////// 113 114 typedef size_t count_type; 115 typedef tbb::blocked_range<count_type> range_type; 116 117 inline intptr_t CountSubranges(range_type r) { 118 if(!r.is_divisible()) return intptr_t(1); 119 range_type r2(r,tbb::split()); 120 return CountSubranges(r) + CountSubranges(r2); 121 } 122 123 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) { 124 return CountSubranges(range_type(0,length,grain)); 125 } 126 127 template<class Body> 128 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, 
intptr_t inner_length, intptr_t inner_grain ) { 129 ResetGlobals(); 130 g_ThrowException = false; 131 intptr_t outerCalls = NumSubranges(length, grain), 132 innerCalls = NumSubranges(inner_length, inner_grain), 133 maxExecuted = outerCalls * (innerCalls + 1); 134 tbb::parallel_for( range_type(0, length, grain), Body() ); 135 REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count"); 136 return maxExecuted; 137 } 138 139 class NoThrowParForBody { 140 public: 141 void operator()( const range_type& r ) const { 142 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 143 else g_NonMasterExecuted = true; 144 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 145 utils::doDummyWork(r.size()); 146 } 147 }; 148 149 #if TBB_USE_EXCEPTIONS 150 151 void Test0 () { 152 ResetGlobals(); 153 tbb::simple_partitioner p; 154 for( size_t i=0; i<10; ++i ) { 155 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() ); 156 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p ); 157 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() ); 158 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p ); 159 } 160 } // void Test0 () 161 162 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for. 163 template<typename ParForBody> 164 class SimpleParReduceBody { 165 ParForBody m_Body; 166 public: 167 void operator=(const SimpleParReduceBody&) = delete; 168 SimpleParReduceBody(const SimpleParReduceBody&) = default; 169 SimpleParReduceBody() = default; 170 171 void operator()( const range_type& r ) const { m_Body(r); } 172 SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {} 173 void join( SimpleParReduceBody& /*right*/ ) {} 174 }; // SimpleParReduceBody 175 176 //! Test parallel_for and parallel_reduce for a given partitioner. 177 /** The Body need only be suitable for a parallel_for. 
*/ 178 template<typename ParForBody, typename Partitioner> 179 void TestParallelLoopAux() { 180 Partitioner partitioner; 181 for( int i=0; i<2; ++i ) { 182 ResetGlobals(); 183 TRY(); 184 if( i==0 ) 185 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner ); 186 else { 187 SimpleParReduceBody<ParForBody> rb; 188 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner ); 189 } 190 CATCH_AND_ASSERT(); 191 // two cases: g_SolitaryException and !g_SolitaryException 192 // 1) g_SolitaryException: only one thread actually threw. There is only one context, so the exception 193 // (when caught) will cause that context to be cancelled. After this event, there may be one or 194 // more threads which are "in-flight", up to g_NumThreads, but no more will be started. The threads, 195 // when they start, if they see they are cancelled, TGCCancelled is incremented. 196 // 2) !g_SolitaryException: more than one thread can throw. The number of threads that actually 197 // threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow. 198 // Only one context, so TGCCancelled should be <= g_NumThreads. 199 // 200 // the reasoning is similar for nested algorithms in a single context (Test2). 201 // 202 // If a thread throws in a context, more than one subsequent task body may see the 203 // cancelled state (if they are scheduled before the state is propagated.) this is 204 // infrequent, but it occurs. So what was to be an assertion must be a remark. 205 g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown"); 206 REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 207 if ( g_SolitaryException ) { 208 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 209 REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? 
g_MasterExecutedThrow : g_NonMasterExecutedThrow), 210 "Not all throws were caught"); 211 REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred"); 212 } 213 else { 214 REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test"); 215 } 216 } 217 } // TestParallelLoopAux 218 219 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners. 220 /** The Body only needs to be suitable for tbb::parallel_for. */ 221 template<typename Body> 222 void TestParallelLoop() { 223 // The simple and auto partitioners should be const, but not the affinity partitioner. 224 TestParallelLoopAux<Body, const tbb::simple_partitioner >(); 225 TestParallelLoopAux<Body, const tbb::auto_partitioner >(); 226 #define __TBB_TEMPORARILY_DISABLED 1 227 #if !__TBB_TEMPORARILY_DISABLED 228 // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner 229 TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>(); 230 #endif 231 #undef __TBB_TEMPORARILY_DISABLED 232 } 233 234 class SimpleParForBody { 235 public: 236 void operator=(const SimpleParForBody&) = delete; 237 SimpleParForBody(const SimpleParForBody&) = default; 238 SimpleParForBody() = default; 239 240 void operator()( const range_type& r ) const { 241 utils::ConcurrencyTracker ct; 242 ++g_CurExecuted; 243 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 244 else g_NonMasterExecuted = true; 245 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 246 utils::doDummyWork(r.size()); 247 WaitUntilConcurrencyPeaks(); 248 ThrowTestException(1); 249 } 250 }; 251 252 void Test1() { 253 // non-nested parallel_for/reduce with throwing body, one context 254 TestParallelLoop<SimpleParForBody>(); 255 } // void Test1 () 256 257 class OuterParForBody { 258 public: 259 void operator=(const OuterParForBody&) = delete; 260 OuterParForBody(const OuterParForBody&) = default; 261 
OuterParForBody() = default; 262 void operator()( const range_type& ) const { 263 utils::ConcurrencyTracker ct; 264 ++g_OuterParCalls; 265 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() ); 266 } 267 }; 268 269 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block. 270 /** Inner algorithms are spawned inside the new bound context by default. Since 271 exceptions thrown from the inner parallel_for are not handled by the caller 272 (outer parallel_for body) in this test, they will cancel all the sibling inner 273 algorithms. **/ 274 void Test2 () { 275 TestParallelLoop<OuterParForBody>(); 276 } // void Test2 () 277 278 class OuterParForBodyWithIsolatedCtx { 279 public: 280 void operator()( const range_type& ) const { 281 tbb::task_group_context ctx(tbb::task_group_context::isolated); 282 ++g_OuterParCalls; 283 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 284 } 285 }; 286 287 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block. 288 /** Even though exceptions thrown from the inner parallel_for are not handled 289 by the caller in this test, they will not affect sibling inner algorithms 290 already running because of the isolated contexts. 
However because the first 291 exception cancels the root parallel_for only the first g_NumThreads subranges 292 will be processed (which launch inner parallel_fors) **/ 293 void Test3 () { 294 ResetGlobals(); 295 typedef OuterParForBodyWithIsolatedCtx body_type; 296 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 297 // we expect one thread to throw without counting, the rest to run to completion 298 // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the 299 // case; the SimpleParFor subranges are started up as part of the outer ones, and when 300 // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started. 301 // so we have to count the number of outer Pfors actually started. 302 minExecuted = (g_NumThreads - 1) * innerCalls; 303 TRY(); 304 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() ); 305 CATCH_AND_ASSERT(); 306 minExecuted = (g_OuterParCalls - 1) * innerCalls; // see above 307 308 // The first formula above assumes all ranges of the outer parallel for are executed, and one 309 // cancels. In the event, we have a smaller number of ranges that start before the exception 310 // is caught. 311 // 312 // g_SolitaryException:One inner range throws. Outer parallel_For is cancelled, but sibling 313 // parallel_fors continue to completion (unless the threads that execute 314 // are not allowed to throw, in which case we will not see any exceptions). 315 // !g_SolitaryException:multiple inner ranges may throw. Any which throws will stop, and the 316 // corresponding range of the outer pfor will stop also. 317 // 318 // In either case, once the outer pfor gets the exception it will stop executing further ranges. 319 320 // if the only threads executing were not allowed to throw, then not seeing an exception is okay. 
321 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 322 if ( g_SolitaryException ) { 323 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 324 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 325 REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception"); 326 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 327 } 328 else { 329 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 330 REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 331 } 332 } // void Test3 () 333 334 class OuterParForExceptionSafeBody { 335 public: 336 void operator()( const range_type& ) const { 337 tbb::task_group_context ctx(tbb::task_group_context::isolated); 338 ++g_OuterParCalls; 339 TRY(); 340 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 341 CATCH(); // this macro sets g_ExceptionCaught 342 } 343 }; 344 345 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block. 346 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 347 in this test, they do not affect neither other tasks of the the root parallel_for 348 nor sibling inner algorithms. **/ 349 void Test4 () { 350 ResetGlobals( true, true ); 351 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 352 outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN); 353 TRY(); 354 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() ); 355 CATCH(); 356 // g_SolitaryException : one inner pfor will throw, the rest will execute to completion. 
357 // so the count should be (outerCalls -1) * innerCalls, if a throw happened. 358 // !g_SolitaryException : possible multiple inner pfor throws. Should be approximately 359 // (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few 360 intptr_t minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 361 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 362 if ( g_SolitaryException ) { 363 // only one task had exception thrown. That task had at least one execution (the one that threw). 364 // There may be an arbitrary number of ranges executed after the throw but before the exception 365 // is caught in the scheduler and cancellation is signaled. (seen 9, 11 and 62 (!) for 8 threads) 366 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered"); 367 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 368 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 369 // a small number of threads can execute in a throwing sub-pfor, if the task which is 370 // to do the solitary throw swaps out after registering its intent to throw but before it 371 // actually does so. 
As a result, the number of extra tasks cannot exceed the number of thread 372 // for each nested pfor invication) 373 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_NumThreads-1)*g_NumThreads/2, "Too many tasks survived exception"); 374 } 375 else { 376 REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions"); 377 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported"); 378 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions"); 379 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 380 } 381 } // void Test4 () 382 383 //! Testing parallel_for and parallel_reduce exception handling 384 //! \brief \ref error_guessing 385 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") { 386 for (auto concurrency_level: utils::concurrency_range()) { 387 g_NumThreads = static_cast<int>(concurrency_level); 388 g_Master = std::this_thread::get_id(); 389 if (g_NumThreads > 1) { 390 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 391 // Execute in all the possible modes 392 for ( size_t j = 0; j < 4; ++j ) { 393 g_ExceptionInMaster = (j & 1) != 0; 394 g_SolitaryException = (j & 2) != 0; 395 396 Test0(); 397 } 398 } 399 } 400 } 401 402 //! Testing parallel_for and parallel_reduce exception handling 403 //! 
\brief \ref error_guessing 404 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") { 405 for (auto concurrency_level: utils::concurrency_range()) { 406 g_NumThreads = static_cast<int>(concurrency_level); 407 g_Master = std::this_thread::get_id(); 408 if (g_NumThreads > 1) { 409 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 410 // Execute in all the possible modes 411 for ( size_t j = 0; j < 4; ++j ) { 412 g_ExceptionInMaster = (j & 1) != 0; 413 g_SolitaryException = (j & 2) != 0; 414 415 Test1(); 416 } 417 } 418 } 419 } 420 421 //! Testing parallel_for and parallel_reduce exception handling 422 //! \brief \ref error_guessing 423 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") { 424 for (auto concurrency_level: utils::concurrency_range()) { 425 g_NumThreads = static_cast<int>(concurrency_level); 426 g_Master = std::this_thread::get_id(); 427 if (g_NumThreads > 1) { 428 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 429 // Execute in all the possible modes 430 for ( size_t j = 0; j < 4; ++j ) { 431 g_ExceptionInMaster = (j & 1) != 0; 432 g_SolitaryException = (j & 2) != 0; 433 434 Test2(); 435 } 436 } 437 } 438 } 439 440 //! Testing parallel_for and parallel_reduce exception handling 441 //! \brief \ref error_guessing 442 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") { 443 for (auto concurrency_level: utils::concurrency_range()) { 444 g_NumThreads = static_cast<int>(concurrency_level); 445 g_Master = std::this_thread::get_id(); 446 if (g_NumThreads > 1) { 447 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 448 // Execute in all the possible modes 449 for ( size_t j = 0; j < 4; ++j ) { 450 g_ExceptionInMaster = (j & 1) != 0; 451 g_SolitaryException = (j & 2) != 0; 452 453 Test3(); 454 } 455 } 456 } 457 } 458 459 //! 
Testing parallel_for and parallel_reduce exception handling 460 //! \brief \ref error_guessing 461 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") { 462 for (auto concurrency_level: utils::concurrency_range()) { 463 g_NumThreads = static_cast<int>(concurrency_level); 464 g_Master = std::this_thread::get_id(); 465 if (g_NumThreads > 1) { 466 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 467 // Execute in all the possible modes 468 for ( size_t j = 0; j < 4; ++j ) { 469 g_ExceptionInMaster = (j & 1) != 0; 470 g_SolitaryException = (j & 2) != 0; 471 472 Test4(); 473 } 474 } 475 } 476 } 477 478 #endif /* TBB_USE_EXCEPTIONS */ 479 480 class ParForBodyToCancel { 481 public: 482 void operator()( const range_type& ) const { 483 ++g_CurExecuted; 484 Cancellator::WaitUntilReady(); 485 } 486 }; 487 488 template<class B> 489 class ParForLauncher { 490 tbb::task_group_context &my_ctx; 491 public: 492 void operator()() const { 493 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx ); 494 } 495 ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 496 }; 497 498 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 
499 void TestCancelation1 () { 500 ResetGlobals( false ); 501 RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 ); 502 } 503 504 class Cancellator2 { 505 tbb::task_group_context &m_GroupToCancel; 506 507 public: 508 void operator()() const { 509 utils::ConcurrencyTracker ct; 510 WaitUntilConcurrencyPeaks(); 511 m_GroupToCancel.cancel_group_execution(); 512 g_ExecutedAtLastCatch = g_CurExecuted.load(); 513 } 514 515 Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {} 516 }; 517 518 class ParForBodyToCancel2 { 519 public: 520 void operator()( const range_type& ) const { 521 ++g_CurExecuted; 522 utils::ConcurrencyTracker ct; 523 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 524 while( !tbb::is_current_task_group_canceling() ) 525 utils::yield(); 526 } 527 }; 528 529 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 530 /** This version also tests tbb::is_current_task_group_canceling() method. 
**/ 531 void TestCancelation2 () { 532 ResetGlobals(); 533 RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>(); 534 REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task"); 535 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 536 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation"); 537 } 538 539 //////////////////////////////////////////////////////////////////////////////// 540 // Regression test based on the contribution by the author of the following forum post: 541 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx 542 543 class Worker { 544 static const int max_nesting = 3; 545 static const int reduce_range = 1024; 546 static const int reduce_grain = 256; 547 public: 548 int DoWork (int level); 549 int Validate (int start_level) { 550 int expected = 1; // identity for multiplication 551 for(int i=start_level+1; i<max_nesting; ++i) 552 expected *= reduce_range; 553 return expected; 554 } 555 }; 556 557 class RecursiveParReduceBodyWithSharedWorker { 558 Worker * m_SharedWorker; 559 int m_NestingLevel; 560 int m_Result; 561 public: 562 RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split ) 563 : m_SharedWorker(src.m_SharedWorker) 564 , m_NestingLevel(src.m_NestingLevel) 565 , m_Result(0) 566 {} 567 RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer ) 568 : m_SharedWorker(w) 569 , m_NestingLevel(outer) 570 , m_Result(0) 571 {} 572 573 void operator() ( const tbb::blocked_range<size_t>& r ) { 574 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 575 else g_NonMasterExecuted = true; 576 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 577 for (size_t i = r.begin (); i != r.end (); ++i) { 578 m_Result += m_SharedWorker->DoWork (m_NestingLevel); 579 
} 580 } 581 void join (const RecursiveParReduceBodyWithSharedWorker & x) { 582 m_Result += x.m_Result; 583 } 584 int result () { return m_Result; } 585 }; 586 587 int Worker::DoWork ( int level ) { 588 ++level; 589 if ( level < max_nesting ) { 590 RecursiveParReduceBodyWithSharedWorker rt (this, level); 591 tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt); 592 return rt.result(); 593 } 594 else 595 return 1; 596 } 597 598 //! Regression test for hanging that occurred with the first version of cancellation propagation 599 void TestCancelation3 () { 600 Worker w; 601 int result = w.DoWork (0); 602 int expected = w.Validate(0); 603 REQUIRE_MESSAGE ( result == expected, "Wrong calculation result"); 604 } 605 606 struct StatsCounters { 607 std::atomic<size_t> my_total_created; 608 std::atomic<size_t> my_total_deleted; 609 StatsCounters() { 610 my_total_created = 0; 611 my_total_deleted = 0; 612 } 613 }; 614 615 class ParReduceBody { 616 StatsCounters* my_stats; 617 size_t my_id; 618 bool my_exception; 619 tbb::task_group_context& tgc; 620 621 public: 622 ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) : 623 my_stats(&s_), my_exception(e_), tgc(context) { 624 my_id = my_stats->my_total_created++; 625 } 626 627 ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) { 628 my_stats = lhs.my_stats; 629 my_id = my_stats->my_total_created++; 630 } 631 632 ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) { 633 my_stats = lhs.my_stats; 634 my_id = my_stats->my_total_created++; 635 } 636 637 ~ParReduceBody(){ ++my_stats->my_total_deleted; } 638 639 void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const { 640 //Do nothing, except for one task (chosen arbitrarily) 641 if( my_id >= 12 ) { 642 if( my_exception ) 643 ThrowTestException(1); 644 else 645 tgc.cancel_group_execution(); 646 } 647 } 648 649 void join( ParReduceBody& /*rhs*/ ) {} 650 }; 651 652 void TestCancelation4() { 653 
StatsCounters statsObj; 654 #if TBB_USE_EXCEPTIONS 655 try 656 #endif 657 { 658 tbb::task_group_context tgc1, tgc2; 659 ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true); 660 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 ); 661 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 ); 662 } 663 #if TBB_USE_EXCEPTIONS 664 catch(...) {} 665 #endif 666 REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed"); 667 } 668 669 //! Testing parallel_for and parallel_reduce cancellation 670 //! \brief \ref error_guessing 671 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") { 672 for (auto concurrency_level: utils::concurrency_range()) { 673 g_NumThreads = static_cast<int>(concurrency_level); 674 g_Master = std::this_thread::get_id(); 675 if (g_NumThreads > 1) { 676 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 677 // Execute in all the possible modes 678 for ( size_t j = 0; j < 4; ++j ) { 679 g_ExceptionInMaster = (j & 1) != 0; 680 g_SolitaryException = (j & 2) != 0; 681 682 TestCancelation1(); 683 } 684 } 685 } 686 } 687 688 //! Testing parallel_for and parallel_reduce cancellation 689 //! 
\brief \ref error_guessing 690 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") { 691 for (auto concurrency_level: utils::concurrency_range()) { 692 g_NumThreads = static_cast<int>(concurrency_level); 693 g_Master = std::this_thread::get_id(); 694 if (g_NumThreads > 1) { 695 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 696 // Execute in all the possible modes 697 for ( size_t j = 0; j < 4; ++j ) { 698 g_ExceptionInMaster = (j & 1) != 0; 699 g_SolitaryException = (j & 2) != 0; 700 701 TestCancelation2(); 702 } 703 } 704 } 705 } 706 707 //! Testing parallel_for and parallel_reduce cancellation 708 //! \brief \ref error_guessing 709 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") { 710 for (auto concurrency_level: utils::concurrency_range()) { 711 g_NumThreads = static_cast<int>(concurrency_level); 712 g_Master = std::this_thread::get_id(); 713 if (g_NumThreads > 1) { 714 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 715 // Execute in all the possible modes 716 for ( size_t j = 0; j < 4; ++j ) { 717 g_ExceptionInMaster = (j & 1) != 0; 718 g_SolitaryException = (j & 2) != 0; 719 720 TestCancelation3(); 721 } 722 } 723 } 724 } 725 726 //! Testing parallel_for and parallel_reduce cancellation 727 //! 
//! \brief \ref error_guessing
TEST_CASE("parallel_for and parallel_reduce cancellation test #4") {
    // Repeat the test for every concurrency level reported by utils::concurrency_range().
    for (auto concurrency_level: utils::concurrency_range()) {
        g_NumThreads = static_cast<int>(concurrency_level);
        g_Master = std::this_thread::get_id();
        // Only configurations with more than one thread are tested.
        if (g_NumThreads > 1) {
            tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads);
            // Execute in all the possible modes
            // (bit 0 of j selects g_ExceptionInMaster, bit 1 selects g_SolitaryException)
            for ( size_t j = 0; j < 4; ++j ) {
                g_ExceptionInMaster = (j & 1) != 0;
                g_SolitaryException = (j & 2) != 0;

                TestCancelation4();
            }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Tests for tbb::parallel_for_each
////////////////////////////////////////////////////////////////////////////////

// Sizes of the iteration spaces used by the parallel_for_each tests below.
#define ITER_RANGE 1000
// Upper bound checked by Feed() before it adds another item through a feeder.
#define ITEMS_TO_FEED 50
#define INNER_ITER_RANGE 100
#define OUTER_ITER_RANGE 50

// Declares, in the enclosing scope, an array `test_vector` of rangeSize + 1 elements,
// stores 0 .. rangeSize - 1 in the first rangeSize of them, and defines the iterators
// `begin` and `end` over those elements. The extra last element is never assigned here;
// only its address is used, to construct `end`.
#define PREPARE_RANGE(Iterator, rangeSize) \
    size_t test_vector[rangeSize + 1]; \
    for (int i =0; i < rangeSize; i++) \
        test_vector[i] = i; \
    Iterator begin(&test_vector[0]); \
    Iterator end(&test_vector[rangeSize])

//! Adds \c val through \c feeder while fewer than ITEMS_TO_FEED items have been fed.
/** Each added item is counted in g_FedTasksCount.
    NOTE(review): the comparison and the increment are two separate atomic operations,
    so concurrent callers can push the counter somewhat past ITEMS_TO_FEED - confirm
    that the tests tolerate this. **/
void Feed ( tbb::feeder<size_t> &feeder, size_t val ) {
    if (g_FedTasksCount < ITEMS_TO_FEED) {
        ++g_FedTasksCount;
        feeder.add(val);
    }
}

// Calls func twice over utils::ForwardIterator<size_t>: once with the plain body
// and once with its <body>WithFeeder counterpart.
#define RunWithSimpleBody(func, body) \
    func<utils::ForwardIterator<size_t>, body>(); \
    func<utils::ForwardIterator<size_t>, body##WithFeeder>()

// Same as RunWithSimpleBody, for bodies that are class templates parameterized by the iterator type.
#define RunWithTemplatedBody(func, body) \
    func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >(); \
    func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >()

#if TBB_USE_EXCEPTIONS

// Simple functor object with exception
class SimpleParForEachBody {
public:
    void operator() ( size_t &value ) const {
        ++g_CurExecuted;
        // Remember whether the external thread and/or another thread executed an iteration.
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        // Count, per task group context, the iterations that find their group already canceling.
        if( tbb::is_current_task_group_canceling() ) {
            g_TGCCancelled.increment();
        }
        utils::ConcurrencyTracker ct;
        value += 1000;
        // Both helpers come from the shared test headers; presumably the first delays the
        // throw until enough bodies run concurrently - verify in common/exception_handling.h.
        WaitUntilConcurrencyPeaks();
        ThrowTestException(1);
    }
};

// Simple functor object with exception and feeder
class SimpleParForEachBodyWithFeeder : SimpleParForEachBody {
public:
    void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
        // Try to feed one extra item, then behave exactly like the feeder-less body.
        Feed(feeder, 0);
        SimpleParForEachBody::operator()(value);
    }
};

// Tests exceptions without nesting
template <class Iterator, class simple_body>
void Test1_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    TRY();
        tbb::parallel_for_each<Iterator, simple_body>(begin, end, simple_body() );
    CATCH_AND_ASSERT();
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    // NOTE(review): this repeats the unconditional check made right after CATCH_AND_ASSERT().
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");

} // void Test1_parallel_for_each ()

// Outer body: every invocation runs a nested parallel_for_each over INNER_ITER_RANGE
// items with SimpleParForEachBody, without passing an explicit context and without a try-block.
template <class Iterator>
class OuterParForEachBody {
public:
    void operator()( size_t& /*value*/ ) const {
        ++g_OuterParCalls;
        PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
        tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody());
    }
};

// Feeder flavour of OuterParForEachBody: tries to feed one item, then runs the nested algorithm.
template <class Iterator>
class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> {
public:
    void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const {
        Feed(feeder, 0);
        OuterParForEachBody<Iterator>::operator()(value);
    }
};
//! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block.
/** Inner algorithms are spawned inside the new bound context by default. Since
    exceptions thrown from the inner parallel_for_each are not handled by the caller
    (outer parallel_for_each body) in this test, they will cancel all the sibling inner
    algorithms. **/
template <class Iterator, class outer_body>
void Test2_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    TRY();
        tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body() );
    CATCH_AND_ASSERT();
    // After the catch, at most g_NumThreads more body invocations may have been counted.
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    // NOTE(review): this repeats the unconditional check made right after CATCH_AND_ASSERT().
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
} // void Test2_parallel_for_each ()

// Outer body: every invocation creates its own isolated task_group_context and runs a
// nested parallel_for_each over INNER_ITER_RANGE items in it, without a try-block.
template <class Iterator>
class OuterParForEachBodyWithIsolatedCtx {
public:
    void operator()( size_t& /*value*/ ) const {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
        tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
    }
};

// Feeder flavour of OuterParForEachBodyWithIsolatedCtx: tries to feed one item first.
template <class Iterator>
class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> {
public:
    void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
        Feed(feeder, 0);
        OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value);
    }
};

//! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block.
/** Even though exceptions thrown from the inner parallel_for_each are not handled
    by the caller in this test, they will not affect sibling inner algorithms
    already running because of the isolated contexts. However because the first
    exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
    will be processed (which launch inner parallel_for_eachs) **/
template <class Iterator, class outer_body>
void Test3_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
    intptr_t innerCalls = INNER_ITER_RANGE,
        // The assumption here is the same as in outer parallel fors.
        // (this initial value is overwritten after the algorithm has run)
        minExecuted = (g_NumThreads - 1) * innerCalls;
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body());
    CATCH_AND_ASSERT();
    // figure actual number of expected executions given the number of outer PDos started.
    minExecuted = (g_OuterParCalls - 1) * innerCalls;
    // one extra thread may run a task that sees cancellation.  Infrequent but possible
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    if ( g_SolitaryException ) {
        REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
        REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
    }
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
} // void Test3_parallel_for_each ()

// Outer body: runs a nested parallel_for_each in its own isolated context and handles
// whatever the nested algorithm throws locally, between TRY() and CATCH().
template <class Iterator>
class OuterParForEachWithEhBody {
public:
    void operator()( size_t& /*value*/ ) const {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
        TRY();
            tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
        CATCH();
    }
};

// Feeder flavour of OuterParForEachWithEhBody: copy-constructible and default-constructible,
// but explicitly not assignable.
template <class Iterator>
class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
public:
    void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
    OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
    OuterParForEachWithEhBodyWithFeeder() = default;
    void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
        Feed(feeder, 0);
        OuterParForEachWithEhBody<Iterator>::operator()(value);
    }
};

//! Uses parallel_for_each body invoking an inner parallel_for_each (with an isolated context) inside a try-block.
/** Since exception(s) thrown from the inner parallel_for_each are handled by the caller
    in this test, they affect neither other tasks of the root parallel_for_each
    nor sibling inner algorithms. **/
template <class Iterator, class outer_body_with_eh>
void Test4_parallel_for_each () {
    ResetGlobals( true, true );
    PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, outer_body_with_eh>(begin, end, outer_body_with_eh());
    CATCH();
    REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
    // The outer body runs once per original item plus once per item added through the feeder.
    intptr_t innerCalls = INNER_ITER_RANGE,
             outerCalls = OUTER_ITER_RANGE + g_FedTasksCount,
             maxExecuted = outerCalls * innerCalls,
             minExecuted = 0;
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    if ( g_SolitaryException ) {
        // Only one inner algorithm may have been cut short by the single exception.
        minExecuted = maxExecuted - innerCalls;
        REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
        REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
        // This test has the same property as Test4 (parallel_for); the exception can be
        // thrown, but some number of tasks from the outer Pdo can execute after the throw but
        // before the cancellation is signaled (have seen 36).
        DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
    }
    else {
        minExecuted = g_NumExceptionsCaught;
        REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
        // NOTE(review): the condition is a lower bound, yet the message speaks of "too many" - confirm the intended wording.
        REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
        REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
        REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
    }
} // void Test4_parallel_for_each ()

// This body throws an exception only if the task was added by feeder
// (Feed() is called with the value 1 here, and only items equal to 1 reach ThrowTestException).
class ParForEachBodyWithThrowingFeederTasks {
public:
    //! This form of the function call operator can be used when the body needs to add more work during the processing
    void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const {
        ++g_CurExecuted;
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
        Feed(feeder, 1);
        // NOTE(review): PREPARE_RANGE also stores the value 1 in the initial range, so that
        // original item reaches this branch too - confirm this is intended.
        if (value == 1)
            ThrowTestException(1);
    }
}; // class ParForEachBodyWithThrowingFeederTasks

// Test exception in task, which was added by feeder.
template <class Iterator>
void Test5_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, ParForEachBodyWithThrowingFeederTasks>(begin, end, ParForEachBodyWithThrowingFeederTasks());
    CATCH();
    if (g_SolitaryException) {
        // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range
        // are handled by the external thread.  In this case no throw occurs.
        REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel                // we saw an exception
                || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)   // non-external thread throws but none tried
                || (g_ExceptionInMaster && !g_MasterExecutedThrow))      // external thread throws but external thread didn't try
                , "At least one exception should occur");
    }
} // void Test5_parallel_for_each ()

//! Testing parallel_for_each exception handling
\brief \ref error_guessing 1005 TEST_CASE("parallel_for_each exception handling test #1") { 1006 for (auto concurrency_level: utils::concurrency_range()) { 1007 g_NumThreads = static_cast<int>(concurrency_level); 1008 g_Master = std::this_thread::get_id(); 1009 if (g_NumThreads > 1) { 1010 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1011 // Execute in all the possible modes 1012 for ( size_t j = 0; j < 4; ++j ) { 1013 g_ExceptionInMaster = (j & 1) != 0; 1014 g_SolitaryException = (j & 2) != 0; 1015 1016 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody); 1017 } 1018 } 1019 } 1020 } 1021 1022 //! Testing parallel_for_each exception handling 1023 //! \brief \ref error_guessing 1024 TEST_CASE("parallel_for_each exception handling test #2") { 1025 for (auto concurrency_level: utils::concurrency_range()) { 1026 g_NumThreads = static_cast<int>(concurrency_level); 1027 g_Master = std::this_thread::get_id(); 1028 if (g_NumThreads > 1) { 1029 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1030 // Execute in all the possible modes 1031 for ( size_t j = 0; j < 4; ++j ) { 1032 g_ExceptionInMaster = (j & 1) != 0; 1033 g_SolitaryException = (j & 2) != 0; 1034 1035 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody); 1036 } 1037 } 1038 } 1039 } 1040 1041 //! Testing parallel_for_each exception handling 1042 //! 
\brief \ref error_guessing 1043 TEST_CASE("parallel_for_each exception handling test #3") { 1044 for (auto concurrency_level: utils::concurrency_range()) { 1045 g_NumThreads = static_cast<int>(concurrency_level); 1046 g_Master = std::this_thread::get_id(); 1047 if (g_NumThreads > 1) { 1048 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1049 // Execute in all the possible modes 1050 for ( size_t j = 0; j < 4; ++j ) { 1051 g_ExceptionInMaster = (j & 1) != 0; 1052 g_SolitaryException = (j & 2) != 0; 1053 1054 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx); 1055 } 1056 } 1057 } 1058 } 1059 1060 //! Testing parallel_for_each exception handling 1061 //! \brief \ref error_guessing 1062 TEST_CASE("parallel_for_each exception handling test #4") { 1063 for (auto concurrency_level: utils::concurrency_range()) { 1064 g_NumThreads = static_cast<int>(concurrency_level); 1065 g_Master = std::this_thread::get_id(); 1066 if (g_NumThreads > 1) { 1067 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1068 // Execute in all the possible modes 1069 for ( size_t j = 0; j < 4; ++j ) { 1070 g_ExceptionInMaster = (j & 1) != 0; 1071 g_SolitaryException = (j & 2) != 0; 1072 1073 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody); 1074 } 1075 } 1076 } 1077 } 1078 1079 //! Testing parallel_for_each exception handling 1080 //! 
\brief \ref error_guessing 1081 TEST_CASE("parallel_for_each exception handling test #5") { 1082 for (auto concurrency_level: utils::concurrency_range()) { 1083 g_NumThreads = static_cast<int>(concurrency_level); 1084 g_Master = std::this_thread::get_id(); 1085 if (g_NumThreads > 1) { 1086 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1087 // Execute in all the possible modes 1088 for ( size_t j = 0; j < 4; ++j ) { 1089 g_ExceptionInMaster = (j & 1) != 0; 1090 g_SolitaryException = (j & 2) != 0; 1091 1092 Test5_parallel_for_each<utils::InputIterator<size_t> >(); 1093 Test5_parallel_for_each<utils::ForwardIterator<size_t> >(); 1094 Test5_parallel_for_each<utils::RandomIterator<size_t> >(); 1095 } 1096 } 1097 } 1098 } 1099 1100 #endif /* TBB_USE_EXCEPTIONS */ 1101 1102 class ParForEachBodyToCancel { 1103 public: 1104 void operator()( size_t& /*value*/ ) const { 1105 ++g_CurExecuted; 1106 Cancellator::WaitUntilReady(); 1107 } 1108 }; 1109 1110 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel { 1111 public: 1112 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1113 Feed(feeder, 0); 1114 ParForEachBodyToCancel::operator()(value); 1115 } 1116 }; 1117 1118 template<class B, class Iterator> 1119 class ParForEachWorker { 1120 tbb::task_group_context &my_ctx; 1121 public: 1122 void operator()() const { 1123 PREPARE_RANGE(Iterator, INNER_ITER_RANGE); 1124 tbb::parallel_for_each<Iterator, B>( begin, end, B(), my_ctx ); 1125 } 1126 1127 ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1128 }; 1129 1130 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 
// Runs a Cancellator and a ParForEachWorker as two tasks of one task_group; both are
// constructed over the same task_group_context `ctx`.
template <class Iterator, class body_to_cancel>
void TestCancelation1_parallel_for_each () {
    ResetGlobals( false );
    // Value handed to the Cancellator; presumably the number of executed bodies after
    // which it cancels ctx - verify against the Cancellator definition.
    intptr_t threshold = 10;
    tbb::task_group tg;
    tbb::task_group_context ctx;
    Cancellator cancellator(ctx, threshold);
    ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
    tg.run( cancellator );
    utils::yield();
    tg.run( worker );
    // Any exception escaping the wait is a test failure.
    TRY();
        tg.wait();
    CATCH_AND_FAIL();
    REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
}

// Body that counts its invocation and then spins (yielding) until its task group is being cancelled.
class ParForEachBodyToCancel2 {
public:
    void operator()( size_t& /*value*/ ) const {
        ++g_CurExecuted;
        utils::ConcurrencyTracker ct;
        // The test will hang (and be timed out by the test system) if is_cancelled() is broken
        while( !tbb::is_current_task_group_canceling() )
            utils::yield();
    }
};

// Feeder flavour of ParForEachBodyToCancel2: tries to feed one item before delegating.
class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
public:
    void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
        Feed(feeder, 0);
        ParForEachBodyToCancel2::operator()(value);
    }
};

//! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
/** This version also tests tbb::is_current_task_group_canceling() method. **/
template <class Iterator, class body_to_cancel>
void TestCancelation2_parallel_for_each () {
    ResetGlobals();
    RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
}

//! Testing parallel_for_each cancellation test
\brief \ref error_guessing 1177 TEST_CASE("parallel_for_each cancellation test #1") { 1178 for (auto concurrency_level: utils::concurrency_range()) { 1179 g_NumThreads = static_cast<int>(concurrency_level); 1180 g_Master = std::this_thread::get_id(); 1181 if (g_NumThreads > 1) { 1182 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1183 // Execute in all the possible modes 1184 for ( size_t j = 0; j < 4; ++j ) { 1185 g_ExceptionInMaster = (j & 1) != 0; 1186 g_SolitaryException = (j & 2) != 0; 1187 1188 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel); 1189 } 1190 } 1191 } 1192 } 1193 1194 //! Testing parallel_for_each cancellation test 1195 //! \brief \ref error_guessing 1196 TEST_CASE("parallel_for_each cancellation test #2") { 1197 for (auto concurrency_level: utils::concurrency_range()) { 1198 g_NumThreads = static_cast<int>(concurrency_level); 1199 g_Master = std::this_thread::get_id(); 1200 if (g_NumThreads > 1) { 1201 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1202 // Execute in all the possible modes 1203 for ( size_t j = 0; j < 4; ++j ) { 1204 g_ExceptionInMaster = (j & 1) != 0; 1205 g_SolitaryException = (j & 2) != 0; 1206 1207 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2); 1208 } 1209 } 1210 } 1211 } 1212 1213 //////////////////////////////////////////////////////////////////////////////// 1214 // Tests for tbb::parallel_pipeline 1215 //////////////////////////////////////////////////////////////////////////////// 1216 #define NUM_ITEMS 100 1217 1218 const size_t c_DataEndTag = size_t(~0); 1219 1220 int g_NumTokens = 0; 1221 1222 // Simple input filter class, it assigns 1 to all array members 1223 // It stops when it receives item equal to -1 1224 class InputFilter { 1225 mutable std::atomic<size_t> m_Item{}; 1226 mutable size_t m_Buffer[NUM_ITEMS + 1]; 1227 public: 1228 InputFilter() { 1229 m_Item = 0; 1230 
for (size_t i = 0; i < NUM_ITEMS; ++i ) 1231 m_Buffer[i] = 1; 1232 m_Buffer[NUM_ITEMS] = c_DataEndTag; 1233 } 1234 InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()) { 1235 for (size_t i = 0; i < NUM_ITEMS; ++i ) 1236 m_Buffer[i] = other.m_Buffer[i]; 1237 } 1238 1239 void* operator()(tbb::flow_control& control) const { 1240 size_t item = m_Item++; 1241 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1242 else g_NonMasterExecuted = true; 1243 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1244 if(item == 1) { 1245 ++g_PipelinesStarted; // count on emitting the first item. 1246 } 1247 if ( item >= NUM_ITEMS ) { 1248 control.stop(); 1249 return nullptr; 1250 } 1251 m_Buffer[item] = 1; 1252 return &m_Buffer[item]; 1253 } 1254 1255 size_t* buffer() { return m_Buffer; } 1256 }; // class InputFilter 1257 1258 #if TBB_USE_EXCEPTIONS 1259 1260 // Simple filter with exception throwing. If parallel, will wait until 1261 // as many parallel filters start as there are threads. 
class SimpleFilter {
    bool m_canThrow;    // when true, operator() ends with ThrowTestException(1)
    bool m_serial;      // when true, the wait for peak concurrency is skipped
public:
    SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
    void* operator()(void* item) const {
        ++g_CurExecuted;
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
        if ( m_canThrow ) {
            if ( !m_serial ) {
                utils::ConcurrencyTracker ct;
                // No more bodies than tokens (or threads) can be in flight at once.
                WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
            }
            ThrowTestException(1);
        }
        // The item is passed through unchanged.
        return item;
    }
}; // class SimpleFilter

// This enumeration represents filters order in pipeline
// (the modes of the two non-input filters and, for each of them, whether it may throw)
struct FilterSet {
    tbb::filter_mode mode1, mode2;
    bool throw1, throw2;

    FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
        : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
    {}
}; // struct FilterSet

// Configuration used for the nested pipelines: serial first filter, parallel throwing second filter.
FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );

// Three-stage pipeline: a parallel input filter of type InFilter followed by two
// filters of type Filter whose modes are taken from a FilterSet.
template<typename InFilter, typename Filter>
class CustomPipeline {
    InFilter inputFilter;
    Filter filter1;
    Filter filter2;
    FilterSet my_filters;
public:
    // Each Filter is constructed from (may throw, is not parallel).
    CustomPipeline( const FilterSet& filters )
        : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
          filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
          my_filters(filters)
    {}

    // Runs the pipeline with g_NumTokens tokens, without an explicit context.
    void run () {
        tbb::parallel_pipeline(
            g_NumTokens,
            tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
            tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
            tbb::make_filter<void*, void>(my_filters.mode2, filter2)
        );
    }
    // Same as run(), but executes the pipeline in the supplied task group context.
    void run ( tbb::task_group_context& ctx ) {
        tbb::parallel_pipeline(
            g_NumTokens,
            tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
            tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
            tbb::make_filter<void*, void>(my_filters.mode2, filter2),
            ctx
        );
    }
};

typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;

// Tests exceptions without nesting
void Test1_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    SimplePipeline testPipeline(filters);
    TRY();
        testPipeline.run();
        // Reached only when run() returned without throwing.
        if ( g_CurExecuted == 2 * NUM_ITEMS ) {
            // all the items were processed, though an exception was supposed to occur.
            if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
                // if !g_ExceptionInMaster, the external thread is not allowed to throw.
                // if g_NonMasterExecutedThrow > 0 then a thread besides the external thread tried to throw.
                REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
                    "Unusual count");
            }
            // In case of all serial filters they might be all executed in the thread(s)
            // where exceptions are not allowed by the common test logic. So we just quit.
            return;
        }
    CATCH_AND_ASSERT();
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    // NOTE(review): this repeats the unconditional check made two statements earlier.
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");

} // void Test1_pipeline ()

// Filter with nesting
// (every invocation runs a complete inner SimplePipeline without an explicit context)
class OuterFilter {
public:
    // The arguments exist only to match the constructor signature CustomPipeline uses; they are ignored.
    OuterFilter ( bool, bool ) {}

    void* operator()(void* item) const {
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        testPipeline.run();
        return item;
    }
}; // class OuterFilter

//! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
/** Inner algorithms are spawned inside the new bound context by default. Since
    exceptions thrown from the inner pipeline are not handled by the caller
    (outer pipeline body) in this test, they will cancel all the sibling inner
    algorithms. **/
void Test2_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    g_NestedPipelines = true;
    CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
    TRY();
        testPipeline.run();
    CATCH_AND_ASSERT();
    // No exception is acceptable when the kind of thread that is allowed to throw never attempted a throw.
    bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
    REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException ) {
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    }
} // void Test2_pipeline ()

//! creates isolated inner pipeline and runs it.
class OuterFilterWithIsolatedCtx {
public:
    // The arguments exist only to match the constructor signature CustomPipeline uses; they are ignored.
    OuterFilterWithIsolatedCtx( bool , bool ) {}

    void* operator()(void* item) const {
        ++g_OuterParCalls;
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        // create inner pipeline with serial input, parallel output filter, second filter throws
        SimplePipeline testPipeline(serial_parallel);
        testPipeline.run(ctx);
        return item;
    }
}; // class OuterFilterWithIsolatedCtx

//! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
/** Even though exceptions thrown from the inner pipeline are not handled
    by the caller in this test, they will not affect sibling inner algorithms
    already running because of the isolated contexts. However because the first
    exception cancels the root pipeline only the first g_NumThreads items
    will be processed (which launch inner pipelines) **/
void Test3_pipeline ( const FilterSet& filters ) {
    // The scenario is attempted up to four times; the first attempt whose outcome can be
    // judged (see testSucceeded below) is verified and ends the function.
    for( int nTries = 1; nTries <= 4; ++nTries) {
        ResetGlobals();
        g_NestedPipelines = true;
        g_Master = std::this_thread::get_id();
        intptr_t innerCalls = NUM_ITEMS,
                 minExecuted = (g_NumThreads - 1) * innerCalls;
        CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
        TRY();
            testPipeline.run();
        CATCH_AND_ASSERT();

        bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
            (!g_ExceptionInMaster && !g_NonMasterExecuted);
        // only test assertions if the test threw an exception (or we don't care)
        bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
        if(testSucceeded) {
            if (g_SolitaryException) {

                // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
                // Each time the input filter of a pipeline delivers its first item, it increments
                // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
                // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
                // context will start; that is what we're testing.)
                //
                // There are two pipelines which will NOT run to completion when a solitary throw
                // happens in an isolated inner context: the outer pipeline and the pipeline which
                // throws.  All the other pipelines which start should run to completion.  But only
                // inner body invocations are counted.
                //
                // So g_CurExecuted should be about
                //
                //   (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1
                //   ^ executions for each completed pipeline
                //                   ^ completing pipelines (remembering two will not complete)
                //                                              ^ one for the inner throwing pipeline

                minExecuted = (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1;
                // each failing pipeline must execute at least two tasks
                REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
                // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
                // tasks not executing at throw were scheduled.
                g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
                REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
                // if we're only throwing from the external thread, and that thread didn't
                // participate in the pipelines, then no throw occurred.
            }
            REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
            REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
            return;
        }
    }
}

// Outer filter that runs an inner pipeline in its own isolated context and handles
// whatever the inner pipeline throws locally, between TRY() and CATCH().
class OuterFilterWithEhBody {
public:
    // The arguments exist only to match the constructor signature CustomPipeline uses; they are ignored.
    OuterFilterWithEhBody( bool, bool ){}

    void* operator()(void* item) const {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        TRY();
            testPipeline.run(ctx);
        CATCH();
        return item;
    }
}; // class OuterFilterWithEhBody

//! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
/** Since exception(s) thrown from the inner pipeline are handled by the caller
    in this test, they do not affect other tasks of the root pipeline
    nor sibling inner algorithms. **/
void Test4_pipeline ( const FilterSet& filters ) {
#if __GNUC__ && !__INTEL_COMPILER
    // The test is skipped when the compiler version string starts with "4.1.0".
    if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
        MESSAGE("Known issue: one of exception handling tests is skipped.");
        return;
    }
#endif
    ResetGlobals( true, true );
    // each outer pipeline stage will start NUM_ITEMS inner pipelines.
    // each inner pipeline that doesn't throw will process NUM_ITEMS items.
    // for solitary exception there will be one pipeline that only processes one stage, one item.
    // innerCalls should be 2*NUM_ITEMS
    intptr_t innerCalls = 2*NUM_ITEMS,
             outerCalls = 2*NUM_ITEMS,
             maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
    CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
    TRY();
        testPipeline.run();
    CATCH_AND_ASSERT();
    intptr_t minExecuted = 0;
    bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
        (!g_ExceptionInMaster && !g_NonMasterExecuted);
    if ( g_SolitaryException ) {
        minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
        REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
        g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
    }
    else {
        // we assume throwing pipelines will not count
        minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
        REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
        // NOTE(review): the condition is a lower bound, yet the message speaks of "too many" - confirm the intended wording.
        REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
        // too many already-scheduled tasks are started after the first exception is
        // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
        // So with multiple exceptions there are a variable number of tasks that have been
        // discarded because of the signals.
        // each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
        // updated with each throw, so the value will be the number of tasks executed at the last
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
    }
} // void Test4_pipeline ()
Tests pipeline function passed with different combination of filters 1525 template<void testFunc(const FilterSet&)> 1526 void TestWithDifferentFiltersAndConcurrency() { 1527 for (auto concurrency_level: utils::concurrency_range()) { 1528 g_NumThreads = static_cast<int>(concurrency_level); 1529 g_Master = std::this_thread::get_id(); 1530 if (g_NumThreads > 1) { 1531 // Execute in all the possible modes 1532 for ( size_t j = 0; j < 4; ++j ) { 1533 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1534 g_ExceptionInMaster = (j & 1) != 0; 1535 g_SolitaryException = (j & 2) != 0; 1536 g_NumTokens = 2 * g_NumThreads; 1537 const int NumFilterTypes = 3; 1538 const tbb::filter_mode modes[NumFilterTypes] = { 1539 tbb::filter_mode::parallel, 1540 tbb::filter_mode::serial_in_order, 1541 tbb::filter_mode::serial_out_of_order 1542 }; 1543 1544 for ( int i = 0; i < NumFilterTypes; ++i ) { 1545 for ( int n = 0; n < NumFilterTypes; ++n ) { 1546 for ( int k = 0; k < 2; ++k ) 1547 testFunc( FilterSet(modes[i], modes[j], k == 0, k != 0) ); 1548 } 1549 } 1550 } 1551 } 1552 } 1553 } 1554 1555 //! Testing parallel_pipeline exception handling 1556 //! \brief \ref error_guessing 1557 TEST_CASE("parallel_pipeline exception handling test #1") { 1558 TestWithDifferentFiltersAndConcurrency<Test1_pipeline>(); 1559 } 1560 1561 //! Testing parallel_pipeline exception handling 1562 //! \brief \ref error_guessing 1563 TEST_CASE("parallel_pipeline exception handling test #2") { 1564 TestWithDifferentFiltersAndConcurrency<Test2_pipeline>(); 1565 } 1566 1567 //! Testing parallel_pipeline exception handling 1568 //! \brief \ref error_guessing 1569 TEST_CASE("parallel_pipeline exception handling test #3") { 1570 TestWithDifferentFiltersAndConcurrency<Test3_pipeline>(); 1571 } 1572 1573 //! Testing parallel_pipeline exception handling 1574 //! 
\brief \ref error_guessing 1575 TEST_CASE("parallel_pipeline exception handling test #4") { 1576 TestWithDifferentFiltersAndConcurrency<Test4_pipeline>(); 1577 } 1578 1579 #endif /* TBB_USE_EXCEPTIONS */ 1580 1581 class FilterToCancel { 1582 public: 1583 FilterToCancel() {} 1584 void* operator()(void* item) const { 1585 ++g_CurExecuted; 1586 Cancellator::WaitUntilReady(); 1587 return item; 1588 } 1589 }; // class FilterToCancel 1590 1591 template <class Filter_to_cancel> 1592 class PipelineLauncher { 1593 tbb::task_group_context &my_ctx; 1594 public: 1595 PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1596 1597 void operator()() const { 1598 // Run test when serial filter is the first non-input filter 1599 InputFilter inputFilter; 1600 Filter_to_cancel filterToCancel; 1601 tbb::parallel_pipeline( 1602 g_NumTokens, 1603 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1604 tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel), 1605 my_ctx 1606 ); 1607 } 1608 }; 1609 1610 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1611 void TestCancelation1_pipeline () { 1612 ResetGlobals(); 1613 g_ThrowException = false; 1614 RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(10); 1615 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 1616 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1617 } 1618 1619 class FilterToCancel2 { 1620 public: 1621 FilterToCancel2() {} 1622 1623 void* operator()(void* item) const { 1624 ++g_CurExecuted; 1625 utils::ConcurrencyTracker ct; 1626 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1627 while( !tbb::is_current_task_group_canceling() ) 1628 utils::yield(); 1629 return item; 1630 } 1631 }; 1632 1633 //! 
Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1634 /** This version also tests task::is_cancelled() method. **/ 1635 void TestCancelation2_pipeline () { 1636 ResetGlobals(); 1637 RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>(); 1638 // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the 1639 // former, and g_CurExecuted is monotonic increasing. so the comparison should be at least ==. 1640 // If another filter is started after cancel but before cancellation is propagated, then the 1641 // number will be larger. 1642 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation"); 1643 } 1644 1645 /** If min and max thread numbers specified on the command line are different, 1646 the test is run only for 2 sizes of the thread pool (MinThread and MaxThread) 1647 to be able to test the high and low contention modes while keeping the test reasonably fast **/ 1648 1649 //! Testing parallel_pipeline cancellation 1650 //! \brief \ref error_guessing 1651 TEST_CASE("parallel_pipeline cancellation test #1") { 1652 for (auto concurrency_level: utils::concurrency_range()) { 1653 g_NumThreads = static_cast<int>(concurrency_level); 1654 g_Master = std::this_thread::get_id(); 1655 if (g_NumThreads > 1) { 1656 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1657 // Execute in all the possible modes 1658 for ( size_t j = 0; j < 4; ++j ) { 1659 g_ExceptionInMaster = (j & 1) != 0; 1660 g_SolitaryException = (j & 2) != 0; 1661 g_NumTokens = 2 * g_NumThreads; 1662 1663 TestCancelation1_pipeline(); 1664 } 1665 } 1666 } 1667 } 1668 1669 //! Testing parallel_pipeline cancellation 1670 //! 
\brief \ref error_guessing 1671 TEST_CASE("parallel_pipeline cancellation test #2") { 1672 for (auto concurrency_level: utils::concurrency_range()) { 1673 g_NumThreads = static_cast<int>(concurrency_level); 1674 g_Master = std::this_thread::get_id(); 1675 if (g_NumThreads > 1) { 1676 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1677 // Execute in all the possible modes 1678 for ( size_t j = 0; j < 4; ++j ) { 1679 g_ExceptionInMaster = (j & 1) != 0; 1680 g_SolitaryException = (j & 2) != 0; 1681 g_NumTokens = 2 * g_NumThreads; 1682 1683 TestCancelation2_pipeline(); 1684 } 1685 } 1686 } 1687 } 1688