1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/concurrency_tracker.h" 19 #include "common/iterator.h" 20 #include "common/utils_concurrency_limit.h" 21 22 #include <limits.h> // for INT_MAX 23 #include <thread> 24 #include <vector> 25 26 #include "tbb/parallel_for.h" 27 #include "tbb/parallel_reduce.h" 28 #include "tbb/parallel_for_each.h" 29 #include "tbb/parallel_pipeline.h" 30 #include "tbb/blocked_range.h" 31 #include "tbb/task_group.h" 32 #include "tbb/global_control.h" 33 #include "tbb/concurrent_unordered_map.h" 34 #include "tbb/task.h" 35 36 //! \file test_eh_algorithms.cpp 37 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications 38 39 #define FLAT_RANGE 100000 40 #define FLAT_GRAIN 100 41 #define OUTER_RANGE 100 42 #define OUTER_GRAIN 10 43 #define INNER_RANGE (FLAT_RANGE / OUTER_RANGE) 44 #define INNER_GRAIN (FLAT_GRAIN / OUTER_GRAIN) 45 46 struct context_specific_counter { 47 tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{}; 48 49 void increment() { 50 tbb::task_group_context* ctx = tbb::task::current_context(); 51 REQUIRE(ctx != nullptr); 52 context_map[ctx]++; 53 } 54 55 void reset() { 56 context_map.clear(); 57 } 58 59 void validate(unsigned expected_count, const char* msg) { 60 for (auto it = context_map.begin(); it != context_map.end(); it++) { 61 REQUIRE_MESSAGE( it->second <= expected_count, msg); 62 } 63 } 64 }; 65 66 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder 67 std::atomic<intptr_t> g_OuterParCalls{}; // number of actual invocations of the outer construct executed. 68 context_specific_counter g_TGCCancelled{}; // Number of times a task sees its group cancelled at start 69 70 #include "common/exception_handling.h" 71 72 /******************************** 73 Variables in test 74 75 __ Test control variables 76 g_ExceptionInMaster -- only the external thread is allowed to throw. If false, the external cannot throw 77 g_SolitaryException -- only one throw may be executed. 78 79 -- controls for ThrowTestException for pipeline tests 80 g_NestedPipelines -- are inner pipelines being run? 81 g_PipelinesStarted -- how many pipelines have run their first filter at least once. 82 83 -- Information variables 84 85 g_Master -- Thread ID of the "external" thread 86 In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this. 87 88 -- Measurement variables 89 90 g_OuterParCalls -- how many outer parallel ranges or filters started 91 g_TGCCancelled -- how many inner parallel ranges or filters saw task::self().is_cancelled() 92 g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException) 93 g_MasterExecutedThrow -- number of times external thread actually executed a throw 94 g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw 95 g_ExceptionCaught -- one of PropagatedException or unknown exception was caught. (Other exceptions cause assertions.) 96 97 -- Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException) 98 g_CurExecuted -- total number of inner ranges or filters which executed 99 g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none. 100 g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none. 101 *********************************/ 102 103 inline void ResetGlobals ( bool throwException = true, bool flog = false ) { 104 ResetEhGlobals( throwException, flog ); 105 g_FedTasksCount = 0; 106 g_OuterParCalls = 0; 107 g_NestedPipelines = false; 108 g_TGCCancelled.reset(); 109 } 110 111 //////////////////////////////////////////////////////////////////////////////// 112 // Tests for tbb::parallel_for and tbb::parallel_reduce 113 //////////////////////////////////////////////////////////////////////////////// 114 115 typedef size_t count_type; 116 typedef tbb::blocked_range<count_type> range_type; 117 118 inline intptr_t CountSubranges(range_type r) { 119 if(!r.is_divisible()) return intptr_t(1); 120 range_type r2(r,tbb::split()); 121 return CountSubranges(r) + CountSubranges(r2); 122 } 123 124 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) { 125 return CountSubranges(range_type(0,length,grain)); 126 } 127 128 template<class Body> 129 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) { 130 ResetGlobals(); 131 g_ThrowException = false; 132 intptr_t outerCalls = NumSubranges(length, grain), 133 innerCalls = NumSubranges(inner_length, inner_grain), 134 maxExecuted = outerCalls * (innerCalls + 1); 135 tbb::parallel_for( range_type(0, length, grain), Body() ); 136 REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count"); 137 return maxExecuted; 138 } 139 140 class NoThrowParForBody { 141 public: 142 void operator()( const range_type& r ) const { 143 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 144 else g_NonMasterExecuted = true; 145 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 146 utils::doDummyWork(r.size()); 147 } 148 }; 149 150 #if TBB_USE_EXCEPTIONS 151 152 void Test0 () { 153 ResetGlobals(); 154 tbb::simple_partitioner p; 155 for( size_t i=0; i<10; ++i ) { 156 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() ); 157 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p ); 158 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() ); 159 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p ); 160 } 161 } // void Test0 () 162 163 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for. 164 template<typename ParForBody> 165 class SimpleParReduceBody { 166 ParForBody m_Body; 167 public: 168 void operator=(const SimpleParReduceBody&) = delete; 169 SimpleParReduceBody(const SimpleParReduceBody&) = default; 170 SimpleParReduceBody() = default; 171 172 void operator()( const range_type& r ) const { m_Body(r); } 173 SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {} 174 void join( SimpleParReduceBody& /*right*/ ) {} 175 }; // SimpleParReduceBody 176 177 //! Test parallel_for and parallel_reduce for a given partitioner. 178 /** The Body need only be suitable for a parallel_for. */ 179 template<typename ParForBody, typename Partitioner> 180 void TestParallelLoopAux() { 181 Partitioner partitioner; 182 for( int i=0; i<2; ++i ) { 183 ResetGlobals(); 184 TRY(); 185 if( i==0 ) 186 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner ); 187 else { 188 SimpleParReduceBody<ParForBody> rb; 189 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner ); 190 } 191 CATCH_AND_ASSERT(); 192 // two cases: g_SolitaryException and !g_SolitaryException 193 // 1) g_SolitaryException: only one thread actually threw. There is only one context, so the exception 194 // (when caught) will cause that context to be cancelled. After this event, there may be one or 195 // more threads which are "in-flight", up to g_NumThreads, but no more will be started. The threads, 196 // when they start, if they see they are cancelled, TGCCancelled is incremented. 197 // 2) !g_SolitaryException: more than one thread can throw. The number of threads that actually 198 // threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow. 199 // Only one context, so TGCCancelled should be <= g_NumThreads. 200 // 201 // the reasoning is similar for nested algorithms in a single context (Test2). 202 // 203 // If a thread throws in a context, more than one subsequent task body may see the 204 // cancelled state (if they are scheduled before the state is propagated.) this is 205 // infrequent, but it occurs. So what was to be an assertion must be a remark. 206 g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown"); 207 REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 208 if ( g_SolitaryException ) { 209 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 210 REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow), 211 "Not all throws were caught"); 212 REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred"); 213 } 214 else { 215 REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test"); 216 } 217 } 218 } // TestParallelLoopAux 219 220 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners. 221 /** The Body only needs to be suitable for tbb::parallel_for. */ 222 template<typename Body> 223 void TestParallelLoop() { 224 // The simple and auto partitioners should be const, but not the affinity partitioner. 225 TestParallelLoopAux<Body, const tbb::simple_partitioner >(); 226 TestParallelLoopAux<Body, const tbb::auto_partitioner >(); 227 #define __TBB_TEMPORARILY_DISABLED 1 228 #if !__TBB_TEMPORARILY_DISABLED 229 // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner 230 TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>(); 231 #endif 232 #undef __TBB_TEMPORARILY_DISABLED 233 } 234 235 class SimpleParForBody { 236 public: 237 void operator=(const SimpleParForBody&) = delete; 238 SimpleParForBody(const SimpleParForBody&) = default; 239 SimpleParForBody() = default; 240 241 void operator()( const range_type& r ) const { 242 utils::ConcurrencyTracker ct; 243 ++g_CurExecuted; 244 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 245 else g_NonMasterExecuted = true; 246 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 247 utils::doDummyWork(r.size()); 248 WaitUntilConcurrencyPeaks(); 249 ThrowTestException(1); 250 } 251 }; 252 253 void Test1() { 254 // non-nested parallel_for/reduce with throwing body, one context 255 TestParallelLoop<SimpleParForBody>(); 256 } // void Test1 () 257 258 class OuterParForBody { 259 public: 260 void operator=(const OuterParForBody&) = delete; 261 OuterParForBody(const OuterParForBody&) = default; 262 OuterParForBody() = default; 263 void operator()( const range_type& ) const { 264 utils::ConcurrencyTracker ct; 265 ++g_OuterParCalls; 266 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() ); 267 } 268 }; 269 270 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block. 271 /** Inner algorithms are spawned inside the new bound context by default. Since 272 exceptions thrown from the inner parallel_for are not handled by the caller 273 (outer parallel_for body) in this test, they will cancel all the sibling inner 274 algorithms. **/ 275 void Test2 () { 276 TestParallelLoop<OuterParForBody>(); 277 } // void Test2 () 278 279 class OuterParForBodyWithIsolatedCtx { 280 public: 281 void operator()( const range_type& ) const { 282 tbb::task_group_context ctx(tbb::task_group_context::isolated); 283 ++g_OuterParCalls; 284 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 285 } 286 }; 287 288 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block. 289 /** Even though exceptions thrown from the inner parallel_for are not handled 290 by the caller in this test, they will not affect sibling inner algorithms 291 already running because of the isolated contexts. However because the first 292 exception cancels the root parallel_for only the first g_NumThreads subranges 293 will be processed (which launch inner parallel_fors) **/ 294 void Test3 () { 295 ResetGlobals(); 296 typedef OuterParForBodyWithIsolatedCtx body_type; 297 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 298 // we expect one thread to throw without counting, the rest to run to completion 299 // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the 300 // case; the SimpleParFor subranges are started up as part of the outer ones, and when 301 // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started. 302 // so we have to count the number of outer Pfors actually started. 303 minExecuted = (g_NumThreads - 1) * innerCalls; 304 TRY(); 305 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() ); 306 CATCH_AND_ASSERT(); 307 minExecuted = (g_OuterParCalls - 1) * innerCalls; // see above 308 309 // The first formula above assumes all ranges of the outer parallel for are executed, and one 310 // cancels. In the event, we have a smaller number of ranges that start before the exception 311 // is caught. 312 // 313 // g_SolitaryException:One inner range throws. Outer parallel_For is cancelled, but sibling 314 // parallel_fors continue to completion (unless the threads that execute 315 // are not allowed to throw, in which case we will not see any exceptions). 316 // !g_SolitaryException:multiple inner ranges may throw. Any which throws will stop, and the 317 // corresponding range of the outer pfor will stop also. 318 // 319 // In either case, once the outer pfor gets the exception it will stop executing further ranges. 320 321 // if the only threads executing were not allowed to throw, then not seeing an exception is okay. 322 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 323 if ( g_SolitaryException ) { 324 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 325 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 326 REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception"); 327 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 328 } 329 else { 330 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 331 REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 332 } 333 } // void Test3 () 334 335 class OuterParForExceptionSafeBody { 336 public: 337 void operator()( const range_type& ) const { 338 tbb::task_group_context ctx(tbb::task_group_context::isolated); 339 ++g_OuterParCalls; 340 TRY(); 341 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 342 CATCH(); // this macro sets g_ExceptionCaught 343 } 344 }; 345 346 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block. 347 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 348 in this test, they do not affect neither other tasks of the the root parallel_for 349 nor sibling inner algorithms. **/ 350 void Test4 () { 351 ResetGlobals( true, true ); 352 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 353 outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN); 354 TRY(); 355 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() ); 356 CATCH(); 357 // g_SolitaryException : one inner pfor will throw, the rest will execute to completion. 358 // so the count should be (outerCalls -1) * innerCalls, if a throw happened. 359 // !g_SolitaryException : possible multiple inner pfor throws. Should be approximately 360 // (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few 361 intptr_t minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 362 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 363 if ( g_SolitaryException ) { 364 // only one task had exception thrown. That task had at least one execution (the one that threw). 365 // There may be an arbitrary number of ranges executed after the throw but before the exception 366 // is caught in the scheduler and cancellation is signaled. (seen 9, 11 and 62 (!) for 8 threads) 367 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered"); 368 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 369 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 370 // a small number of threads can execute in a throwing sub-pfor, if the task which is 371 // to do the solitary throw swaps out after registering its intent to throw but before it 372 // actually does so. As a result, the number of extra tasks cannot exceed the number of thread 373 // for each nested pfor invication) 374 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 375 } 376 else { 377 REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions"); 378 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported"); 379 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions"); 380 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 381 } 382 } // void Test4 () 383 384 //! Testing parallel_for and parallel_reduce exception handling 385 //! \brief \ref error_guessing 386 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") { 387 for (auto concurrency_level: utils::concurrency_range()) { 388 g_NumThreads = static_cast<int>(concurrency_level); 389 g_Master = std::this_thread::get_id(); 390 if (g_NumThreads > 1) { 391 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 392 // Execute in all the possible modes 393 for ( size_t j = 0; j < 4; ++j ) { 394 g_ExceptionInMaster = (j & 1) != 0; 395 g_SolitaryException = (j & 2) != 0; 396 397 Test0(); 398 } 399 } 400 } 401 } 402 403 //! Testing parallel_for and parallel_reduce exception handling 404 //! \brief \ref error_guessing 405 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") { 406 for (auto concurrency_level: utils::concurrency_range()) { 407 g_NumThreads = static_cast<int>(concurrency_level); 408 g_Master = std::this_thread::get_id(); 409 if (g_NumThreads > 1) { 410 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 411 // Execute in all the possible modes 412 for ( size_t j = 0; j < 4; ++j ) { 413 g_ExceptionInMaster = (j & 1) != 0; 414 g_SolitaryException = (j & 2) != 0; 415 416 Test1(); 417 } 418 } 419 } 420 } 421 422 //! Testing parallel_for and parallel_reduce exception handling 423 //! \brief \ref error_guessing 424 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") { 425 for (auto concurrency_level: utils::concurrency_range()) { 426 g_NumThreads = static_cast<int>(concurrency_level); 427 g_Master = std::this_thread::get_id(); 428 if (g_NumThreads > 1) { 429 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 430 // Execute in all the possible modes 431 for ( size_t j = 0; j < 4; ++j ) { 432 g_ExceptionInMaster = (j & 1) != 0; 433 g_SolitaryException = (j & 2) != 0; 434 435 Test2(); 436 } 437 } 438 } 439 } 440 441 //! Testing parallel_for and parallel_reduce exception handling 442 //! \brief \ref error_guessing 443 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") { 444 for (auto concurrency_level: utils::concurrency_range()) { 445 g_NumThreads = static_cast<int>(concurrency_level); 446 g_Master = std::this_thread::get_id(); 447 if (g_NumThreads > 1) { 448 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 449 // Execute in all the possible modes 450 for ( size_t j = 0; j < 4; ++j ) { 451 g_ExceptionInMaster = (j & 1) != 0; 452 g_SolitaryException = (j & 2) != 0; 453 454 Test3(); 455 } 456 } 457 } 458 } 459 460 //! Testing parallel_for and parallel_reduce exception handling 461 //! \brief \ref error_guessing 462 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") { 463 for (auto concurrency_level: utils::concurrency_range()) { 464 g_NumThreads = static_cast<int>(concurrency_level); 465 g_Master = std::this_thread::get_id(); 466 if (g_NumThreads > 1) { 467 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 468 // Execute in all the possible modes 469 for ( size_t j = 0; j < 4; ++j ) { 470 g_ExceptionInMaster = (j & 1) != 0; 471 g_SolitaryException = (j & 2) != 0; 472 473 Test4(); 474 } 475 } 476 } 477 } 478 479 #endif /* TBB_USE_EXCEPTIONS */ 480 481 class ParForBodyToCancel { 482 public: 483 void operator()( const range_type& ) const { 484 ++g_CurExecuted; 485 Cancellator::WaitUntilReady(); 486 } 487 }; 488 489 template<class B> 490 class ParForLauncher { 491 tbb::task_group_context &my_ctx; 492 public: 493 void operator()() const { 494 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx ); 495 } 496 ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 497 }; 498 499 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 500 void TestCancelation1 () { 501 ResetGlobals( false ); 502 RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 ); 503 } 504 505 class Cancellator2 { 506 tbb::task_group_context &m_GroupToCancel; 507 508 public: 509 void operator()() const { 510 utils::ConcurrencyTracker ct; 511 WaitUntilConcurrencyPeaks(); 512 m_GroupToCancel.cancel_group_execution(); 513 g_ExecutedAtLastCatch = g_CurExecuted.load(); 514 } 515 516 Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {} 517 }; 518 519 class ParForBodyToCancel2 { 520 public: 521 void operator()( const range_type& ) const { 522 ++g_CurExecuted; 523 utils::ConcurrencyTracker ct; 524 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 525 while( !tbb::is_current_task_group_canceling() ) 526 utils::yield(); 527 } 528 }; 529 530 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 531 /** This version also tests tbb::is_current_task_group_canceling() method. **/ 532 void TestCancelation2 () { 533 ResetGlobals(); 534 RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>(); 535 REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task"); 536 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 537 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation"); 538 } 539 540 //////////////////////////////////////////////////////////////////////////////// 541 // Regression test based on the contribution by the author of the following forum post: 542 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx 543 544 class Worker { 545 static const int max_nesting = 3; 546 static const int reduce_range = 1024; 547 static const int reduce_grain = 256; 548 public: 549 int DoWork (int level); 550 int Validate (int start_level) { 551 int expected = 1; // identity for multiplication 552 for(int i=start_level+1; i<max_nesting; ++i) 553 expected *= reduce_range; 554 return expected; 555 } 556 }; 557 558 class RecursiveParReduceBodyWithSharedWorker { 559 Worker * m_SharedWorker; 560 int m_NestingLevel; 561 int m_Result; 562 public: 563 RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split ) 564 : m_SharedWorker(src.m_SharedWorker) 565 , m_NestingLevel(src.m_NestingLevel) 566 , m_Result(0) 567 {} 568 RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer ) 569 : m_SharedWorker(w) 570 , m_NestingLevel(outer) 571 , m_Result(0) 572 {} 573 574 void operator() ( const tbb::blocked_range<size_t>& r ) { 575 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 576 else g_NonMasterExecuted = true; 577 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 578 for (size_t i = r.begin (); i != r.end (); ++i) { 579 m_Result += m_SharedWorker->DoWork (m_NestingLevel); 580 } 581 } 582 void join (const RecursiveParReduceBodyWithSharedWorker & x) { 583 m_Result += x.m_Result; 584 } 585 int result () { return m_Result; } 586 }; 587 588 int Worker::DoWork ( int level ) { 589 ++level; 590 if ( level < max_nesting ) { 591 RecursiveParReduceBodyWithSharedWorker rt (this, level); 592 tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt); 593 return rt.result(); 594 } 595 else 596 return 1; 597 } 598 599 //! Regression test for hanging that occurred with the first version of cancellation propagation 600 void TestCancelation3 () { 601 Worker w; 602 int result = w.DoWork (0); 603 int expected = w.Validate(0); 604 REQUIRE_MESSAGE ( result == expected, "Wrong calculation result"); 605 } 606 607 struct StatsCounters { 608 std::atomic<size_t> my_total_created; 609 std::atomic<size_t> my_total_deleted; 610 StatsCounters() { 611 my_total_created = 0; 612 my_total_deleted = 0; 613 } 614 }; 615 616 class ParReduceBody { 617 StatsCounters* my_stats; 618 size_t my_id; 619 bool my_exception; 620 tbb::task_group_context& tgc; 621 622 public: 623 ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) : 624 my_stats(&s_), my_exception(e_), tgc(context) { 625 my_id = my_stats->my_total_created++; 626 } 627 628 ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) { 629 my_stats = lhs.my_stats; 630 my_id = my_stats->my_total_created++; 631 } 632 633 ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) { 634 my_stats = lhs.my_stats; 635 my_id = my_stats->my_total_created++; 636 } 637 638 ~ParReduceBody(){ ++my_stats->my_total_deleted; } 639 640 void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const { 641 //Do nothing, except for one task (chosen arbitrarily) 642 if( my_id >= 12 ) { 643 if( my_exception ) 644 ThrowTestException(1); 645 else 646 tgc.cancel_group_execution(); 647 } 648 } 649 650 void join( ParReduceBody& /*rhs*/ ) {} 651 }; 652 653 void TestCancelation4() { 654 StatsCounters statsObj; 655 #if TBB_USE_EXCEPTIONS 656 try 657 #endif 658 { 659 tbb::task_group_context tgc1, tgc2; 660 ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true); 661 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 ); 662 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 ); 663 } 664 #if TBB_USE_EXCEPTIONS 665 catch(...) {} 666 #endif 667 REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed"); 668 } 669 670 //! Testing parallel_for and parallel_reduce cancellation 671 //! \brief \ref error_guessing 672 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") { 673 for (auto concurrency_level: utils::concurrency_range()) { 674 g_NumThreads = static_cast<int>(concurrency_level); 675 g_Master = std::this_thread::get_id(); 676 if (g_NumThreads > 1) { 677 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 678 // Execute in all the possible modes 679 for ( size_t j = 0; j < 4; ++j ) { 680 g_ExceptionInMaster = (j & 1) != 0; 681 g_SolitaryException = (j & 2) != 0; 682 683 TestCancelation1(); 684 } 685 } 686 } 687 } 688 689 //! Testing parallel_for and parallel_reduce cancellation 690 //! \brief \ref error_guessing 691 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") { 692 for (auto concurrency_level: utils::concurrency_range()) { 693 g_NumThreads = static_cast<int>(concurrency_level); 694 g_Master = std::this_thread::get_id(); 695 if (g_NumThreads > 1) { 696 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 697 // Execute in all the possible modes 698 for ( size_t j = 0; j < 4; ++j ) { 699 g_ExceptionInMaster = (j & 1) != 0; 700 g_SolitaryException = (j & 2) != 0; 701 702 TestCancelation2(); 703 } 704 } 705 } 706 } 707 708 //! Testing parallel_for and parallel_reduce cancellation 709 //! \brief \ref error_guessing 710 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") { 711 for (auto concurrency_level: utils::concurrency_range()) { 712 g_NumThreads = static_cast<int>(concurrency_level); 713 g_Master = std::this_thread::get_id(); 714 if (g_NumThreads > 1) { 715 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 716 // Execute in all the possible modes 717 for ( size_t j = 0; j < 4; ++j ) { 718 g_ExceptionInMaster = (j & 1) != 0; 719 g_SolitaryException = (j & 2) != 0; 720 721 TestCancelation3(); 722 } 723 } 724 } 725 } 726 727 //! Testing parallel_for and parallel_reduce cancellation 728 //! \brief \ref error_guessing 729 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") { 730 for (auto concurrency_level: utils::concurrency_range()) { 731 g_NumThreads = static_cast<int>(concurrency_level); 732 g_Master = std::this_thread::get_id(); 733 if (g_NumThreads > 1) { 734 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 735 // Execute in all the possible modes 736 for ( size_t j = 0; j < 4; ++j ) { 737 g_ExceptionInMaster = (j & 1) != 0; 738 g_SolitaryException = (j & 2) != 0; 739 740 TestCancelation4(); 741 } 742 } 743 } 744 } 745 746 //////////////////////////////////////////////////////////////////////////////// 747 // Tests for tbb::parallel_for_each 748 //////////////////////////////////////////////////////////////////////////////// 749 750 std::size_t get_iter_range_size() { 751 // Set the minimal iteration sequence size to 50 to improve test complexity on small machines 752 return std::max(50, g_NumThreads * 2); 753 } 754 755 template<typename Iterator> 756 struct adaptive_range { 757 std::vector<std::size_t> my_array; 758 759 adaptive_range(std::size_t size) : my_array(size + 1) {} 760 using iterator = Iterator; 761 762 iterator begin() const { 763 return iterator{&my_array.front()}; 764 } 765 iterator begin() { 766 return iterator{&my_array.front()}; 767 } 768 iterator end() const { 769 return iterator{&my_array.back()}; 770 } 771 iterator end() { 772 return iterator{&my_array.back()}; 773 } 774 }; 775 776 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) { 777 if (g_FedTasksCount < 50) { 778 ++g_FedTasksCount; 779 feeder.add(val); 780 } 781 } 782 783 #define RunWithSimpleBody(func, body) \ 784 func<utils::ForwardIterator<size_t>, body>(); \ 785 func<utils::ForwardIterator<size_t>, body##WithFeeder>() 786 787 #define RunWithTemplatedBody(func, body) \ 788 func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >(); \ 789 func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >() 790 791 #if TBB_USE_EXCEPTIONS 792 793 // Simple functor object with exception 794 class SimpleParForEachBody { 795 public: 796 void operator() ( size_t &value ) const { 797 ++g_CurExecuted; 798 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 799 else g_NonMasterExecuted = true; 800 if( tbb::is_current_task_group_canceling() ) { 801 g_TGCCancelled.increment(); 802 } 803 utils::ConcurrencyTracker ct; 804 value += 1000; 805 WaitUntilConcurrencyPeaks(); 806 ThrowTestException(1); 807 } 808 }; 809 810 // Simple functor object with exception and feeder 811 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody { 812 public: 813 void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const { 814 Feed(feeder, 0); 815 SimpleParForEachBody::operator()(value); 816 } 817 }; 818 819 // Tests exceptions without nesting 820 template <class Iterator, class simple_body> 821 void Test1_parallel_for_each () { 822 ResetGlobals(); 823 auto range = adaptive_range<Iterator>(get_iter_range_size()); 824 TRY(); 825 tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() ); 826 CATCH_AND_ASSERT(); 827 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 828 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 829 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 830 if ( !g_SolitaryException ) 831 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 832 833 } // void Test1_parallel_for_each () 834 835 template <class Iterator> 836 class OuterParForEachBody { 837 public: 838 void operator()( size_t& /*value*/ ) const { 839 ++g_OuterParCalls; 840 auto range = adaptive_range<Iterator>(get_iter_range_size()); 841 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody()); 842 } 843 }; 844 845 template <class Iterator> 846 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> { 847 public: 848 void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const { 849 Feed(feeder, 0); 850 OuterParForEachBody<Iterator>::operator()(value); 851 } 852 }; 853 854 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block. 855 /** Inner algorithms are spawned inside the new bound context by default. Since 856 exceptions thrown from the inner parallel_for_each are not handled by the caller 857 (outer parallel_for_each body) in this test, they will cancel all the sibling inner 858 algorithms. **/ 859 template <class Iterator, class outer_body> 860 void Test2_parallel_for_each () { 861 ResetGlobals(); 862 auto range = adaptive_range<Iterator>(get_iter_range_size()); 863 TRY(); 864 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() ); 865 CATCH_AND_ASSERT(); 866 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 867 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 868 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 869 if ( !g_SolitaryException ) 870 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 871 } // void Test2_parallel_for_each () 872 873 template <class Iterator> 874 class OuterParForEachBodyWithIsolatedCtx { 875 public: 876 void operator()( size_t& /*value*/ ) const { 877 tbb::task_group_context ctx(tbb::task_group_context::isolated); 878 ++g_OuterParCalls; 879 auto range = adaptive_range<Iterator>(get_iter_range_size()); 880 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx); 881 } 882 }; 883 884 template <class Iterator> 885 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> { 886 public: 887 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 888 Feed(feeder, 0); 889 OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value); 890 } 891 }; 892 893 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block. 894 /** Even though exceptions thrown from the inner parallel_for_each are not handled 895 by the caller in this test, they will not affect sibling inner algorithms 896 already running because of the isolated contexts. However because the first 897 exception cancels the root parallel_for_each, at most the first g_NumThreads subranges 898 will be processed (which launch inner parallel_for_eachs) **/ 899 template <class Iterator, class outer_body> 900 void Test3_parallel_for_each () { 901 ResetGlobals(); 902 auto range = adaptive_range<Iterator>(get_iter_range_size()); 903 intptr_t innerCalls = get_iter_range_size(), 904 // The assumption here is the same as in outer parallel fors. 905 minExecuted = (g_NumThreads - 1) * innerCalls; 906 g_Master = std::this_thread::get_id(); 907 TRY(); 908 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body()); 909 CATCH_AND_ASSERT(); 910 // figure actual number of expected executions given the number of outer PDos started. 911 minExecuted = (g_OuterParCalls - 1) * innerCalls; 912 // one extra thread may run a task that sees cancellation. Infrequent but possible 913 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 914 if ( g_SolitaryException ) { 915 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 916 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 917 } 918 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 919 if ( !g_SolitaryException ) 920 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 921 } // void Test3_parallel_for_each () 922 923 template <class Iterator> 924 class OuterParForEachWithEhBody { 925 public: 926 void operator()( size_t& /*value*/ ) const { 927 tbb::task_group_context ctx(tbb::task_group_context::isolated); 928 ++g_OuterParCalls; 929 auto range = adaptive_range<Iterator>(get_iter_range_size()); 930 TRY(); 931 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx); 932 CATCH(); 933 } 934 }; 935 936 template <class Iterator> 937 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> { 938 public: 939 void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete; 940 OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default; 941 OuterParForEachWithEhBodyWithFeeder() = default; 942 void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const { 943 Feed(feeder, 0); 944 OuterParForEachWithEhBody<Iterator>::operator()(value); 945 } 946 }; 947 948 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block. 949 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 950 in this test, they do not affect neither other tasks of the the root parallel_for 951 nor sibling inner algorithms. **/ 952 template <class Iterator, class outer_body_with_eh> 953 void Test4_parallel_for_each () { 954 ResetGlobals( true, true ); 955 auto range = adaptive_range<Iterator>(get_iter_range_size()); 956 g_Master = std::this_thread::get_id(); 957 TRY(); 958 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh()); 959 CATCH(); 960 REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body"); 961 intptr_t innerCalls = get_iter_range_size(), 962 outerCalls = get_iter_range_size() + g_FedTasksCount, 963 maxExecuted = outerCalls * innerCalls, 964 minExecuted = 0; 965 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 966 if ( g_SolitaryException ) { 967 minExecuted = maxExecuted - innerCalls; 968 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered"); 969 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 970 // This test has the same property as Test4 (parallel_for); the exception can be 971 // thrown, but some number of tasks from the outer Pdo can execute after the throw but 972 // before the cancellation is signaled (have seen 36). 973 DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?"); 974 } 975 else { 976 minExecuted = g_NumExceptionsCaught; 977 REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions"); 978 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported"); 979 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions"); 980 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 981 } 982 } // void Test4_parallel_for_each () 983 984 // This body throws an exception only if the task was added by feeder 985 class ParForEachBodyWithThrowingFeederTasks { 986 public: 987 //! This form of the function call operator can be used when the body needs to add more work during the processing 988 void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const { 989 ++g_CurExecuted; 990 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 991 else g_NonMasterExecuted = true; 992 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 993 Feed(feeder, 1); 994 if (value == 1) 995 ThrowTestException(1); 996 } 997 }; // class ParForEachBodyWithThrowingFeederTasks 998 999 // Test exception in task, which was added by feeder. 1000 template <class Iterator> 1001 void Test5_parallel_for_each () { 1002 ResetGlobals(); 1003 auto range = adaptive_range<Iterator>(get_iter_range_size()); 1004 g_Master = std::this_thread::get_id(); 1005 TRY(); 1006 tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks()); 1007 CATCH(); 1008 if (g_SolitaryException) { 1009 // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range 1010 // are handled by the external thread. In this case no throw occurs. 1011 REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel // we saw an exception 1012 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) // non-external trhead throws but none tried 1013 || (g_ExceptionInMaster && !g_MasterExecutedThrow)) // external thread throws but external thread didn't try 1014 , "At least one exception should occur"); 1015 } 1016 } // void Test5_parallel_for_each () 1017 1018 //! Testing parallel_for_each exception handling 1019 //! \brief \ref error_guessing 1020 TEST_CASE("parallel_for_each exception handling test #1") { 1021 for (auto concurrency_level: utils::concurrency_range()) { 1022 g_NumThreads = static_cast<int>(concurrency_level); 1023 g_Master = std::this_thread::get_id(); 1024 if (g_NumThreads > 1) { 1025 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1026 // Execute in all the possible modes 1027 for ( size_t j = 0; j < 4; ++j ) { 1028 g_ExceptionInMaster = (j & 1) != 0; 1029 g_SolitaryException = (j & 2) != 0; 1030 1031 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody); 1032 } 1033 } 1034 } 1035 } 1036 1037 //! Testing parallel_for_each exception handling 1038 //! \brief \ref error_guessing 1039 TEST_CASE("parallel_for_each exception handling test #2") { 1040 for (auto concurrency_level: utils::concurrency_range()) { 1041 g_NumThreads = static_cast<int>(concurrency_level); 1042 g_Master = std::this_thread::get_id(); 1043 if (g_NumThreads > 1) { 1044 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1045 // Execute in all the possible modes 1046 for ( size_t j = 0; j < 4; ++j ) { 1047 g_ExceptionInMaster = (j & 1) != 0; 1048 g_SolitaryException = (j & 2) != 0; 1049 1050 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody); 1051 } 1052 } 1053 } 1054 } 1055 1056 //! Testing parallel_for_each exception handling 1057 //! \brief \ref error_guessing 1058 TEST_CASE("parallel_for_each exception handling test #3") { 1059 for (auto concurrency_level: utils::concurrency_range()) { 1060 g_NumThreads = static_cast<int>(concurrency_level); 1061 g_Master = std::this_thread::get_id(); 1062 if (g_NumThreads > 1) { 1063 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1064 // Execute in all the possible modes 1065 for ( size_t j = 0; j < 4; ++j ) { 1066 g_ExceptionInMaster = (j & 1) != 0; 1067 g_SolitaryException = (j & 2) != 0; 1068 1069 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx); 1070 } 1071 } 1072 } 1073 } 1074 1075 //! Testing parallel_for_each exception handling 1076 //! \brief \ref error_guessing 1077 TEST_CASE("parallel_for_each exception handling test #4") { 1078 for (auto concurrency_level: utils::concurrency_range()) { 1079 g_NumThreads = static_cast<int>(concurrency_level); 1080 g_Master = std::this_thread::get_id(); 1081 if (g_NumThreads > 1) { 1082 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1083 // Execute in all the possible modes 1084 for ( size_t j = 0; j < 4; ++j ) { 1085 g_ExceptionInMaster = (j & 1) != 0; 1086 g_SolitaryException = (j & 2) != 0; 1087 1088 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody); 1089 } 1090 } 1091 } 1092 } 1093 1094 //! Testing parallel_for_each exception handling 1095 //! \brief \ref error_guessing 1096 TEST_CASE("parallel_for_each exception handling test #5") { 1097 for (auto concurrency_level: utils::concurrency_range()) { 1098 g_NumThreads = static_cast<int>(concurrency_level); 1099 g_Master = std::this_thread::get_id(); 1100 if (g_NumThreads > 1) { 1101 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1102 // Execute in all the possible modes 1103 for ( size_t j = 0; j < 4; ++j ) { 1104 g_ExceptionInMaster = (j & 1) != 0; 1105 g_SolitaryException = (j & 2) != 0; 1106 1107 Test5_parallel_for_each<utils::InputIterator<size_t> >(); 1108 Test5_parallel_for_each<utils::ForwardIterator<size_t> >(); 1109 Test5_parallel_for_each<utils::RandomIterator<size_t> >(); 1110 } 1111 } 1112 } 1113 } 1114 1115 #endif /* TBB_USE_EXCEPTIONS */ 1116 1117 class ParForEachBodyToCancel { 1118 public: 1119 void operator()( size_t& /*value*/ ) const { 1120 ++g_CurExecuted; 1121 Cancellator::WaitUntilReady(); 1122 } 1123 }; 1124 1125 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel { 1126 public: 1127 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1128 Feed(feeder, 0); 1129 ParForEachBodyToCancel::operator()(value); 1130 } 1131 }; 1132 1133 template<class B, class Iterator> 1134 class ParForEachWorker { 1135 tbb::task_group_context &my_ctx; 1136 public: 1137 void operator()() const { 1138 auto range = adaptive_range<Iterator>(get_iter_range_size()); 1139 tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx ); 1140 } 1141 1142 ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1143 }; 1144 1145 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1146 template <class Iterator, class body_to_cancel> 1147 void TestCancelation1_parallel_for_each () { 1148 ResetGlobals( false ); 1149 // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2 1150 intptr_t threshold = get_iter_range_size() / 4; 1151 REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation."); 1152 tbb::task_group tg; 1153 tbb::task_group_context ctx; 1154 Cancellator cancellator(ctx, threshold); 1155 ParForEachWorker<body_to_cancel, Iterator> worker(ctx); 1156 tg.run( cancellator ); 1157 utils::yield(); 1158 tg.run( worker ); 1159 TRY(); 1160 tg.wait(); 1161 CATCH_AND_FAIL(); 1162 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1163 } 1164 1165 class ParForEachBodyToCancel2 { 1166 public: 1167 void operator()( size_t& /*value*/ ) const { 1168 ++g_CurExecuted; 1169 utils::ConcurrencyTracker ct; 1170 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1171 while( !tbb::is_current_task_group_canceling() ) 1172 utils::yield(); 1173 } 1174 }; 1175 1176 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 { 1177 public: 1178 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1179 Feed(feeder, 0); 1180 ParForEachBodyToCancel2::operator()(value); 1181 } 1182 }; 1183 1184 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1185 /** This version also tests tbb::is_current_task_group_canceling() method. **/ 1186 template <class Iterator, class body_to_cancel> 1187 void TestCancelation2_parallel_for_each () { 1188 ResetGlobals(); 1189 RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>(); 1190 } 1191 1192 //! Testing parallel_for_each cancellation test 1193 //! \brief \ref error_guessing 1194 TEST_CASE("parallel_for_each cancellation test #1") { 1195 for (auto concurrency_level: utils::concurrency_range()) { 1196 g_NumThreads = static_cast<int>(concurrency_level); 1197 g_Master = std::this_thread::get_id(); 1198 if (g_NumThreads > 1) { 1199 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1200 // Execute in all the possible modes 1201 for ( size_t j = 0; j < 4; ++j ) { 1202 g_ExceptionInMaster = (j & 1) != 0; 1203 g_SolitaryException = (j & 2) != 0; 1204 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel); 1205 } 1206 } 1207 } 1208 } 1209 1210 //! Testing parallel_for_each cancellation test 1211 //! \brief \ref error_guessing 1212 TEST_CASE("parallel_for_each cancellation test #2") { 1213 for (auto concurrency_level: utils::concurrency_range()) { 1214 g_NumThreads = static_cast<int>(concurrency_level); 1215 g_Master = std::this_thread::get_id(); 1216 if (g_NumThreads > 1) { 1217 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1218 // Execute in all the possible modes 1219 for ( size_t j = 0; j < 4; ++j ) { 1220 g_ExceptionInMaster = (j & 1) != 0; 1221 g_SolitaryException = (j & 2) != 0; 1222 1223 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2); 1224 } 1225 } 1226 } 1227 } 1228 1229 //////////////////////////////////////////////////////////////////////////////// 1230 // Tests for tbb::parallel_pipeline 1231 //////////////////////////////////////////////////////////////////////////////// 1232 int g_NumTokens = 0; 1233 1234 // Simple input filter class, it assigns 1 to all array members 1235 // It stops when it receives item equal to -1 1236 class InputFilter { 1237 mutable std::atomic<size_t> m_Item{}; 1238 mutable std::vector<size_t> m_Buffer; 1239 public: 1240 InputFilter() : m_Buffer(get_iter_range_size()) { 1241 m_Item = 0; 1242 for (size_t i = 0; i < get_iter_range_size(); ++i ) 1243 m_Buffer[i] = 1; 1244 } 1245 InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()), m_Buffer(get_iter_range_size()) { 1246 for (size_t i = 0; i < get_iter_range_size(); ++i ) 1247 m_Buffer[i] = other.m_Buffer[i]; 1248 } 1249 1250 void* operator()(tbb::flow_control& control) const { 1251 size_t item = m_Item++; 1252 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1253 else g_NonMasterExecuted = true; 1254 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1255 if(item == 1) { 1256 ++g_PipelinesStarted; // count on emitting the first item. 1257 } 1258 if ( item >= get_iter_range_size() ) { 1259 control.stop(); 1260 return nullptr; 1261 } 1262 m_Buffer[item] = 1; 1263 return &m_Buffer[item]; 1264 } 1265 1266 size_t* buffer() { return m_Buffer.data(); } 1267 }; // class InputFilter 1268 1269 #if TBB_USE_EXCEPTIONS 1270 1271 // Simple filter with exception throwing. If parallel, will wait until 1272 // as many parallel filters start as there are threads. 1273 class SimpleFilter { 1274 bool m_canThrow; 1275 bool m_serial; 1276 public: 1277 SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {} 1278 void* operator()(void* item) const { 1279 ++g_CurExecuted; 1280 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1281 else g_NonMasterExecuted = true; 1282 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1283 if ( m_canThrow ) { 1284 if ( !m_serial ) { 1285 utils::ConcurrencyTracker ct; 1286 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) ); 1287 } 1288 ThrowTestException(1); 1289 } 1290 return item; 1291 } 1292 }; // class SimpleFilter 1293 1294 // This enumeration represents filters order in pipeline 1295 struct FilterSet { 1296 tbb::filter_mode mode1, mode2; 1297 bool throw1, throw2; 1298 1299 FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 ) 1300 : mode1(m1), mode2(m2), throw1(t1), throw2(t2) 1301 {} 1302 }; // struct FilterSet 1303 1304 FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true ); 1305 1306 template<typename InFilter, typename Filter> 1307 class CustomPipeline { 1308 InFilter inputFilter; 1309 Filter filter1; 1310 Filter filter2; 1311 FilterSet my_filters; 1312 public: 1313 CustomPipeline( const FilterSet& filters ) 1314 : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel), 1315 filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel), 1316 my_filters(filters) 1317 {} 1318 1319 void run () { 1320 tbb::parallel_pipeline( 1321 g_NumTokens, 1322 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1323 tbb::make_filter<void*, void*>(my_filters.mode1, filter1) & 1324 tbb::make_filter<void*, void>(my_filters.mode2, filter2) 1325 ); 1326 } 1327 void run ( tbb::task_group_context& ctx ) { 1328 tbb::parallel_pipeline( 1329 g_NumTokens, 1330 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1331 tbb::make_filter<void*, void*>(my_filters.mode1, filter1) & 1332 tbb::make_filter<void*, void>(my_filters.mode2, filter2), 1333 ctx 1334 ); 1335 } 1336 }; 1337 1338 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline; 1339 1340 // Tests exceptions without nesting 1341 void Test1_pipeline ( const FilterSet& filters ) { 1342 ResetGlobals(); 1343 SimplePipeline testPipeline(filters); 1344 TRY(); 1345 testPipeline.run(); 1346 if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) { 1347 // all the items were processed, though an exception was supposed to occur. 1348 if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) { 1349 // if !g_ExceptionInMaster, the external thread is not allowed to throw. 1350 // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw. 1351 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel), 1352 "Unusual count"); 1353 } 1354 // In case of all serial filters they might be all executed in the thread(s) 1355 // where exceptions are not allowed by the common test logic. So we just quit. 1356 return; 1357 } 1358 CATCH_AND_ASSERT(); 1359 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 1360 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1361 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 1362 if ( !g_SolitaryException ) 1363 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1364 1365 } // void Test1_pipeline () 1366 1367 // Filter with nesting 1368 class OuterFilter { 1369 public: 1370 OuterFilter ( bool, bool ) {} 1371 1372 void* operator()(void* item) const { 1373 ++g_OuterParCalls; 1374 SimplePipeline testPipeline(serial_parallel); 1375 testPipeline.run(); 1376 return item; 1377 } 1378 }; // class OuterFilter 1379 1380 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block. 1381 /** Inner algorithms are spawned inside the new bound context by default. Since 1382 exceptions thrown from the inner pipeline are not handled by the caller 1383 (outer pipeline body) in this test, they will cancel all the sibling inner 1384 algorithms. **/ 1385 void Test2_pipeline ( const FilterSet& filters ) { 1386 ResetGlobals(); 1387 g_NestedPipelines = true; 1388 CustomPipeline<InputFilter, OuterFilter> testPipeline(filters); 1389 TRY(); 1390 testPipeline.run(); 1391 CATCH_AND_ASSERT(); 1392 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow); 1393 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test"); 1394 if ( !g_SolitaryException ) { 1395 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1396 } 1397 } // void Test2_pipeline () 1398 1399 //! creates isolated inner pipeline and runs it. 1400 class OuterFilterWithIsolatedCtx { 1401 public: 1402 OuterFilterWithIsolatedCtx( bool , bool ) {} 1403 1404 void* operator()(void* item) const { 1405 ++g_OuterParCalls; 1406 tbb::task_group_context ctx(tbb::task_group_context::isolated); 1407 // create inner pipeline with serial input, parallel output filter, second filter throws 1408 SimplePipeline testPipeline(serial_parallel); 1409 testPipeline.run(ctx); 1410 return item; 1411 } 1412 }; // class OuterFilterWithIsolatedCtx 1413 1414 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block. 1415 /** Even though exceptions thrown from the inner pipeline are not handled 1416 by the caller in this test, they will not affect sibling inner algorithms 1417 already running because of the isolated contexts. However because the first 1418 exception cancels the root parallel_for_each only the first g_NumThreads subranges 1419 will be processed (which launch inner pipelines) **/ 1420 void Test3_pipeline ( const FilterSet& filters ) { 1421 for( int nTries = 1; nTries <= 4; ++nTries) { 1422 ResetGlobals(); 1423 g_NestedPipelines = true; 1424 g_Master = std::this_thread::get_id(); 1425 intptr_t innerCalls = get_iter_range_size(), 1426 minExecuted = (g_NumThreads - 1) * innerCalls; 1427 CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters); 1428 TRY(); 1429 testPipeline.run(); 1430 CATCH_AND_ASSERT(); 1431 1432 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) || 1433 (!g_ExceptionInMaster && !g_NonMasterExecuted); 1434 // only test assertions if the test threw an exception (or we don't care) 1435 bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0; 1436 if(testSucceeded) { 1437 if (g_SolitaryException) { 1438 1439 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline. 1440 // Each time the input filter of a pipeline delivers its first item, it increments 1441 // g_PipelinesStarted. When g_SolitaryException, the throw will not occur until 1442 // g_PipelinesStarted >= 3. (This is so at least a second pipeline in its own isolated 1443 // context will start; that is what we're testing.) 1444 // 1445 // There are two pipelines which will NOT run to completion when a solitary throw 1446 // happens in an isolated inner context: the outer pipeline and the pipeline which 1447 // throws. All the other pipelines which start should run to completion. But only 1448 // inner body invocations are counted. 1449 // 1450 // So g_CurExecuted should be about 1451 // 1452 // (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1 1453 // ^ executions for each completed pipeline 1454 // ^ completing pipelines (remembering two will not complete) 1455 // ^ one for the inner throwing pipeline 1456 1457 minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1; 1458 // each failing pipeline must execute at least two tasks 1459 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception"); 1460 // no more than g_NumThreads tasks will be executed in a cancelled context. Otherwise 1461 // tasks not executing at throw were scheduled. 1462 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed"); 1463 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception"); 1464 // if we're only throwing from the external thread, and that thread didn't 1465 // participate in the pipelines, then no throw occurred. 1466 } 1467 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test"); 1468 REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception"); 1469 return; 1470 } 1471 } 1472 } 1473 1474 class OuterFilterWithEhBody { 1475 public: 1476 OuterFilterWithEhBody( bool, bool ){} 1477 1478 void* operator()(void* item) const { 1479 tbb::task_group_context ctx(tbb::task_group_context::isolated); 1480 ++g_OuterParCalls; 1481 SimplePipeline testPipeline(serial_parallel); 1482 TRY(); 1483 testPipeline.run(ctx); 1484 CATCH(); 1485 return item; 1486 } 1487 }; // class OuterFilterWithEhBody 1488 1489 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block. 1490 /** Since exception(s) thrown from the inner pipeline are handled by the caller 1491 in this test, they do not affect other tasks of the the root pipeline 1492 nor sibling inner algorithms. **/ 1493 void Test4_pipeline ( const FilterSet& filters ) { 1494 #if __GNUC__ && !__INTEL_COMPILER 1495 if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) { 1496 MESSAGE("Known issue: one of exception handling tests is skipped."); 1497 return; 1498 } 1499 #endif 1500 ResetGlobals( true, true ); 1501 // each outer pipeline stage will start get_iter_range_size() inner pipelines. 1502 // each inner pipeline that doesn't throw will process get_iter_range_size() items. 1503 // for solitary exception there will be one pipeline that only processes one stage, one item. 1504 // innerCalls should be 2*get_iter_range_size() 1505 intptr_t innerCalls = 2*get_iter_range_size(), 1506 outerCalls = 2*get_iter_range_size(), 1507 maxExecuted = outerCalls * innerCalls; // the number of invocations of the inner pipelines 1508 CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters); 1509 TRY(); 1510 testPipeline.run(); 1511 CATCH_AND_ASSERT(); 1512 intptr_t minExecuted = 0; 1513 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) || 1514 (!g_ExceptionInMaster && !g_NonMasterExecuted); 1515 if ( g_SolitaryException ) { 1516 minExecuted = maxExecuted - innerCalls; // one throwing inner pipeline 1517 REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered"); 1518 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); // probably will assert. 1519 } 1520 else { 1521 // we assume throwing pipelines will not count 1522 minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 1523 REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions"); 1524 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported"); 1525 // too many already-scheduled tasks are started after the first exception is 1526 // thrown. And g_ExecutedAtLastCatch is updated every time an exception is caught. 1527 // So with multiple exceptions there are a variable number of tasks that have been 1528 // discarded because of the signals. 1529 // each throw is caught, so we will see many cancelled tasks. g_ExecutedAtLastCatch is 1530 // updated with each throw, so the value will be the number of tasks executed at the last 1531 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions"); 1532 } 1533 } // void Test4_pipeline () 1534 1535 //! Tests pipeline function passed with different combination of filters 1536 template<void testFunc(const FilterSet&)> 1537 void TestWithDifferentFiltersAndConcurrency() { 1538 #if __TBB_USE_ADDRESS_SANITIZER 1539 // parallel_pipeline allocates tls that sporadically observed as a memory leak with 1540 // detached threads. So, use task_scheduler_handle to join threads with finalize 1541 tbb::task_scheduler_handle handle{ tbb::attach{} }; 1542 #endif 1543 for (auto concurrency_level: utils::concurrency_range()) { 1544 g_NumThreads = static_cast<int>(concurrency_level); 1545 g_Master = std::this_thread::get_id(); 1546 if (g_NumThreads > 1) { 1547 1548 const tbb::filter_mode modes[] = { 1549 tbb::filter_mode::parallel, 1550 tbb::filter_mode::serial_in_order, 1551 tbb::filter_mode::serial_out_of_order 1552 }; 1553 1554 const int NumFilterTypes = sizeof(modes)/sizeof(modes[0]); 1555 1556 // Execute in all the possible modes 1557 for ( size_t j = 0; j < 4; ++j ) { 1558 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1559 g_ExceptionInMaster = (j & 1) != 0; 1560 g_SolitaryException = (j & 2) != 0; 1561 g_NumTokens = 2 * g_NumThreads; 1562 1563 for ( int i = 0; i < NumFilterTypes; ++i ) { 1564 for ( int n = 0; n < NumFilterTypes; ++n ) { 1565 for ( int k = 0; k < 2; ++k ) 1566 testFunc( FilterSet(modes[i], modes[n], k == 0, k != 0) ); 1567 } 1568 } 1569 } 1570 } 1571 } 1572 #if __TBB_USE_ADDRESS_SANITIZER 1573 tbb::finalize(handle); 1574 #endif 1575 } 1576 1577 //! Testing parallel_pipeline exception handling 1578 //! \brief \ref error_guessing 1579 TEST_CASE("parallel_pipeline exception handling test #1") { 1580 TestWithDifferentFiltersAndConcurrency<Test1_pipeline>(); 1581 } 1582 1583 //! Testing parallel_pipeline exception handling 1584 //! \brief \ref error_guessing 1585 TEST_CASE("parallel_pipeline exception handling test #2") { 1586 TestWithDifferentFiltersAndConcurrency<Test2_pipeline>(); 1587 } 1588 1589 //! Testing parallel_pipeline exception handling 1590 //! \brief \ref error_guessing 1591 TEST_CASE("parallel_pipeline exception handling test #3") { 1592 TestWithDifferentFiltersAndConcurrency<Test3_pipeline>(); 1593 } 1594 1595 //! Testing parallel_pipeline exception handling 1596 //! \brief \ref error_guessing 1597 TEST_CASE("parallel_pipeline exception handling test #4") { 1598 TestWithDifferentFiltersAndConcurrency<Test4_pipeline>(); 1599 } 1600 1601 #endif /* TBB_USE_EXCEPTIONS */ 1602 1603 class FilterToCancel { 1604 public: 1605 FilterToCancel() {} 1606 void* operator()(void* item) const { 1607 ++g_CurExecuted; 1608 Cancellator::WaitUntilReady(); 1609 return item; 1610 } 1611 }; // class FilterToCancel 1612 1613 template <class Filter_to_cancel> 1614 class PipelineLauncher { 1615 tbb::task_group_context &my_ctx; 1616 public: 1617 PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1618 1619 void operator()() const { 1620 // Run test when serial filter is the first non-input filter 1621 InputFilter inputFilter; 1622 Filter_to_cancel filterToCancel; 1623 tbb::parallel_pipeline( 1624 g_NumTokens, 1625 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1626 tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel), 1627 my_ctx 1628 ); 1629 } 1630 }; 1631 1632 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1633 void TestCancelation1_pipeline () { 1634 ResetGlobals(); 1635 g_ThrowException = false; 1636 // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2 1637 intptr_t threshold = get_iter_range_size() / 4; 1638 REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation."); 1639 RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold); 1640 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 1641 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1642 } 1643 1644 class FilterToCancel2 { 1645 public: 1646 FilterToCancel2() {} 1647 1648 void* operator()(void* item) const { 1649 ++g_CurExecuted; 1650 utils::ConcurrencyTracker ct; 1651 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1652 while( !tbb::is_current_task_group_canceling() ) 1653 utils::yield(); 1654 return item; 1655 } 1656 }; 1657 1658 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1659 /** This version also tests task::is_cancelled() method. **/ 1660 void TestCancelation2_pipeline () { 1661 ResetGlobals(); 1662 RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>(); 1663 // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the 1664 // former, and g_CurExecuted is monotonic increasing. so the comparison should be at least ==. 1665 // If another filter is started after cancel but before cancellation is propagated, then the 1666 // number will be larger. 1667 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation"); 1668 } 1669 1670 /** If min and max thread numbers specified on the command line are different, 1671 the test is run only for 2 sizes of the thread pool (MinThread and MaxThread) 1672 to be able to test the high and low contention modes while keeping the test reasonably fast **/ 1673 1674 //! Testing parallel_pipeline cancellation 1675 //! \brief \ref error_guessing 1676 TEST_CASE("parallel_pipeline cancellation test #1") { 1677 for (auto concurrency_level: utils::concurrency_range()) { 1678 g_NumThreads = static_cast<int>(concurrency_level); 1679 g_Master = std::this_thread::get_id(); 1680 if (g_NumThreads > 1) { 1681 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1682 // Execute in all the possible modes 1683 for ( size_t j = 0; j < 4; ++j ) { 1684 g_ExceptionInMaster = (j & 1) != 0; 1685 g_SolitaryException = (j & 2) != 0; 1686 g_NumTokens = 2 * g_NumThreads; 1687 1688 TestCancelation1_pipeline(); 1689 } 1690 } 1691 } 1692 } 1693 1694 //! Testing parallel_pipeline cancellation 1695 //! \brief \ref error_guessing 1696 TEST_CASE("parallel_pipeline cancellation test #2") { 1697 for (auto concurrency_level: utils::concurrency_range()) { 1698 g_NumThreads = static_cast<int>(concurrency_level); 1699 g_Master = std::this_thread::get_id(); 1700 if (g_NumThreads > 1) { 1701 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1702 // Execute in all the possible modes 1703 for ( size_t j = 0; j < 4; ++j ) { 1704 g_ExceptionInMaster = (j & 1) != 0; 1705 g_SolitaryException = (j & 2) != 0; 1706 g_NumTokens = 2 * g_NumThreads; 1707 1708 TestCancelation2_pipeline(); 1709 } 1710 } 1711 } 1712 } 1713