1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/concurrency_tracker.h" 19 #include "common/iterator.h" 20 #include "common/utils_concurrency_limit.h" 21 22 #include <limits.h> // for INT_MAX 23 #include <thread> 24 #include <vector> 25 26 #include "tbb/parallel_for.h" 27 #include "tbb/parallel_reduce.h" 28 #include "tbb/parallel_for_each.h" 29 #include "tbb/parallel_pipeline.h" 30 #include "tbb/blocked_range.h" 31 #include "tbb/task_group.h" 32 #include "tbb/concurrent_unordered_map.h" 33 #include "tbb/task.h" 34 #include "tbb/global_control.h" 35 36 //! \file test_eh_algorithms.cpp 37 //! 
\brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications 38 39 #define FLAT_RANGE 100000 40 #define FLAT_GRAIN 100 41 #define OUTER_RANGE 100 42 #define OUTER_GRAIN 10 43 #define INNER_RANGE (FLAT_RANGE / OUTER_RANGE) 44 #define INNER_GRAIN (FLAT_GRAIN / OUTER_GRAIN) 45 46 struct context_specific_counter { 47 tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{}; 48 49 void increment() { 50 tbb::task_group_context* ctx = tbb::task::current_context(); 51 REQUIRE(ctx != nullptr); 52 context_map[ctx]++; 53 } 54 55 void reset() { 56 context_map.clear(); 57 } 58 59 void validate(unsigned expected_count, const char* msg) { 60 for (auto it = context_map.begin(); it != context_map.end(); it++) { 61 REQUIRE_MESSAGE( it->second <= expected_count, msg); 62 } 63 } 64 }; 65 66 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder 67 std::atomic<intptr_t> g_OuterParCalls{}; // number of actual invocations of the outer construct executed. 68 context_specific_counter g_TGCCancelled{}; // Number of times a task sees its group cancelled at start 69 70 #include "common/exception_handling.h" 71 72 /******************************** 73 Variables in test 74 75 __ Test control variables 76 g_ExceptionInMaster -- only the external thread is allowed to throw. If false, the external cannot throw 77 g_SolitaryException -- only one throw may be executed. 78 79 -- controls for ThrowTestException for pipeline tests 80 g_NestedPipelines -- are inner pipelines being run? 81 g_PipelinesStarted -- how many pipelines have run their first filter at least once. 82 83 -- Information variables 84 85 g_Master -- Thread ID of the "external" thread 86 In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this. 
87 88 -- Measurement variables 89 90 g_OuterParCalls -- how many outer parallel ranges or filters started 91 g_TGCCancelled -- how many inner parallel ranges or filters saw task::self().is_cancelled() 92 g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException) 93 g_MasterExecutedThrow -- number of times external thread actually executed a throw 94 g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw 95 g_ExceptionCaught -- one of PropagatedException or unknown exception was caught. (Other exceptions cause assertions.) 96 97 -- Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException) 98 g_CurExecuted -- total number of inner ranges or filters which executed 99 g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none. 100 g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none. 101 *********************************/ 102 103 inline void ResetGlobals ( bool throwException = true, bool flog = false ) { 104 ResetEhGlobals( throwException, flog ); 105 g_FedTasksCount = 0; 106 g_OuterParCalls = 0; 107 g_NestedPipelines = false; 108 g_TGCCancelled.reset(); 109 } 110 111 //////////////////////////////////////////////////////////////////////////////// 112 // Tests for tbb::parallel_for and tbb::parallel_reduce 113 //////////////////////////////////////////////////////////////////////////////// 114 115 typedef size_t count_type; 116 typedef tbb::blocked_range<count_type> range_type; 117 118 inline intptr_t CountSubranges(range_type r) { 119 if(!r.is_divisible()) return intptr_t(1); 120 range_type r2(r,tbb::split()); 121 return CountSubranges(r) + CountSubranges(r2); 122 } 123 124 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) { 125 return CountSubranges(range_type(0,length,grain)); 126 } 127 128 template<class Body> 129 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, 
intptr_t inner_length, intptr_t inner_grain ) { 130 ResetGlobals(); 131 g_ThrowException = false; 132 intptr_t outerCalls = NumSubranges(length, grain), 133 innerCalls = NumSubranges(inner_length, inner_grain), 134 maxExecuted = outerCalls * (innerCalls + 1); 135 tbb::parallel_for( range_type(0, length, grain), Body() ); 136 REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count"); 137 return maxExecuted; 138 } 139 140 class NoThrowParForBody { 141 public: 142 void operator()( const range_type& r ) const { 143 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 144 else g_NonMasterExecuted = true; 145 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 146 utils::doDummyWork(r.size()); 147 } 148 }; 149 150 #if TBB_USE_EXCEPTIONS 151 152 void Test0 () { 153 ResetGlobals(); 154 tbb::simple_partitioner p; 155 for( size_t i=0; i<10; ++i ) { 156 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() ); 157 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p ); 158 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() ); 159 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p ); 160 } 161 } // void Test0 () 162 163 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for. 164 template<typename ParForBody> 165 class SimpleParReduceBody { 166 ParForBody m_Body; 167 public: 168 void operator=(const SimpleParReduceBody&) = delete; 169 SimpleParReduceBody(const SimpleParReduceBody&) = default; 170 SimpleParReduceBody() = default; 171 172 void operator()( const range_type& r ) const { m_Body(r); } 173 SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {} 174 void join( SimpleParReduceBody& /*right*/ ) {} 175 }; // SimpleParReduceBody 176 177 //! Test parallel_for and parallel_reduce for a given partitioner. 178 /** The Body need only be suitable for a parallel_for. 
*/ 179 template<typename ParForBody, typename Partitioner> 180 void TestParallelLoopAux() { 181 Partitioner partitioner; 182 for( int i=0; i<2; ++i ) { 183 ResetGlobals(); 184 TRY(); 185 if( i==0 ) 186 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner ); 187 else { 188 SimpleParReduceBody<ParForBody> rb; 189 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner ); 190 } 191 CATCH_AND_ASSERT(); 192 // two cases: g_SolitaryException and !g_SolitaryException 193 // 1) g_SolitaryException: only one thread actually threw. There is only one context, so the exception 194 // (when caught) will cause that context to be cancelled. After this event, there may be one or 195 // more threads which are "in-flight", up to g_NumThreads, but no more will be started. The threads, 196 // when they start, if they see they are cancelled, TGCCancelled is incremented. 197 // 2) !g_SolitaryException: more than one thread can throw. The number of threads that actually 198 // threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow. 199 // Only one context, so TGCCancelled should be <= g_NumThreads. 200 // 201 // the reasoning is similar for nested algorithms in a single context (Test2). 202 // 203 // If a thread throws in a context, more than one subsequent task body may see the 204 // cancelled state (if they are scheduled before the state is propagated.) this is 205 // infrequent, but it occurs. So what was to be an assertion must be a remark. 206 g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown"); 207 REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 208 if ( g_SolitaryException ) { 209 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 210 REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? 
g_MasterExecutedThrow : g_NonMasterExecutedThrow), 211 "Not all throws were caught"); 212 REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred"); 213 } 214 else { 215 REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test"); 216 } 217 } 218 } // TestParallelLoopAux 219 220 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners. 221 /** The Body only needs to be suitable for tbb::parallel_for. */ 222 template<typename Body> 223 void TestParallelLoop() { 224 // The simple and auto partitioners should be const, but not the affinity partitioner. 225 TestParallelLoopAux<Body, const tbb::simple_partitioner >(); 226 TestParallelLoopAux<Body, const tbb::auto_partitioner >(); 227 #define __TBB_TEMPORARILY_DISABLED 1 228 #if !__TBB_TEMPORARILY_DISABLED 229 // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner 230 TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>(); 231 #endif 232 #undef __TBB_TEMPORARILY_DISABLED 233 } 234 235 class SimpleParForBody { 236 public: 237 void operator=(const SimpleParForBody&) = delete; 238 SimpleParForBody(const SimpleParForBody&) = default; 239 SimpleParForBody() = default; 240 241 void operator()( const range_type& r ) const { 242 utils::ConcurrencyTracker ct; 243 ++g_CurExecuted; 244 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 245 else g_NonMasterExecuted = true; 246 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 247 utils::doDummyWork(r.size()); 248 WaitUntilConcurrencyPeaks(); 249 ThrowTestException(1); 250 } 251 }; 252 253 void Test1() { 254 // non-nested parallel_for/reduce with throwing body, one context 255 TestParallelLoop<SimpleParForBody>(); 256 } // void Test1 () 257 258 class OuterParForBody { 259 public: 260 void operator=(const OuterParForBody&) = delete; 261 OuterParForBody(const OuterParForBody&) = default; 262 
OuterParForBody() = default; 263 void operator()( const range_type& ) const { 264 utils::ConcurrencyTracker ct; 265 ++g_OuterParCalls; 266 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() ); 267 } 268 }; 269 270 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block. 271 /** Inner algorithms are spawned inside the new bound context by default. Since 272 exceptions thrown from the inner parallel_for are not handled by the caller 273 (outer parallel_for body) in this test, they will cancel all the sibling inner 274 algorithms. **/ 275 void Test2 () { 276 TestParallelLoop<OuterParForBody>(); 277 } // void Test2 () 278 279 class OuterParForBodyWithIsolatedCtx { 280 public: 281 void operator()( const range_type& ) const { 282 tbb::task_group_context ctx(tbb::task_group_context::isolated); 283 ++g_OuterParCalls; 284 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 285 } 286 }; 287 288 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block. 289 /** Even though exceptions thrown from the inner parallel_for are not handled 290 by the caller in this test, they will not affect sibling inner algorithms 291 already running because of the isolated contexts. 
However because the first 292 exception cancels the root parallel_for only the first g_NumThreads subranges 293 will be processed (which launch inner parallel_fors) **/ 294 void Test3 () { 295 ResetGlobals(); 296 typedef OuterParForBodyWithIsolatedCtx body_type; 297 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 298 // we expect one thread to throw without counting, the rest to run to completion 299 // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the 300 // case; the SimpleParFor subranges are started up as part of the outer ones, and when 301 // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started. 302 // so we have to count the number of outer Pfors actually started. 303 minExecuted = (g_NumThreads - 1) * innerCalls; 304 TRY(); 305 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() ); 306 CATCH_AND_ASSERT(); 307 minExecuted = (g_OuterParCalls - 1) * innerCalls; // see above 308 309 // The first formula above assumes all ranges of the outer parallel for are executed, and one 310 // cancels. In the event, we have a smaller number of ranges that start before the exception 311 // is caught. 312 // 313 // g_SolitaryException:One inner range throws. Outer parallel_For is cancelled, but sibling 314 // parallel_fors continue to completion (unless the threads that execute 315 // are not allowed to throw, in which case we will not see any exceptions). 316 // !g_SolitaryException:multiple inner ranges may throw. Any which throws will stop, and the 317 // corresponding range of the outer pfor will stop also. 318 // 319 // In either case, once the outer pfor gets the exception it will stop executing further ranges. 320 321 // if the only threads executing were not allowed to throw, then not seeing an exception is okay. 
322 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 323 if ( g_SolitaryException ) { 324 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 325 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 326 REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception"); 327 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 328 } 329 else { 330 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 331 REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 332 } 333 } // void Test3 () 334 335 class OuterParForExceptionSafeBody { 336 public: 337 void operator()( const range_type& ) const { 338 tbb::task_group_context ctx(tbb::task_group_context::isolated); 339 ++g_OuterParCalls; 340 TRY(); 341 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 342 CATCH(); // this macro sets g_ExceptionCaught 343 } 344 }; 345 346 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block. 347 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 348 in this test, they do not affect neither other tasks of the the root parallel_for 349 nor sibling inner algorithms. **/ 350 void Test4 () { 351 ResetGlobals( true, true ); 352 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 353 outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN); 354 TRY(); 355 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() ); 356 CATCH(); 357 // g_SolitaryException : one inner pfor will throw, the rest will execute to completion. 
358 // so the count should be (outerCalls -1) * innerCalls, if a throw happened. 359 // !g_SolitaryException : possible multiple inner pfor throws. Should be approximately 360 // (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few 361 intptr_t minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 362 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 363 if ( g_SolitaryException ) { 364 // only one task had exception thrown. That task had at least one execution (the one that threw). 365 // There may be an arbitrary number of ranges executed after the throw but before the exception 366 // is caught in the scheduler and cancellation is signaled. (seen 9, 11 and 62 (!) for 8 threads) 367 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered"); 368 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 369 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 370 // a small number of threads can execute in a throwing sub-pfor, if the task which is 371 // to do the solitary throw swaps out after registering its intent to throw but before it 372 // actually does so. 
As a result, the number of extra tasks cannot exceed the number of thread 373 // for each nested pfor invication) 374 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 375 } 376 else { 377 REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions"); 378 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported"); 379 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions"); 380 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 381 } 382 } // void Test4 () 383 384 //! Testing parallel_for and parallel_reduce exception handling 385 //! \brief \ref error_guessing 386 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") { 387 for (auto concurrency_level: utils::concurrency_range()) { 388 g_NumThreads = static_cast<int>(concurrency_level); 389 g_Master = std::this_thread::get_id(); 390 if (g_NumThreads > 1) { 391 tbb::task_arena a(g_NumThreads); 392 a.execute([] { 393 // Execute in all the possible modes 394 for (size_t j = 0; j < 4; ++j) { 395 g_ExceptionInMaster = (j & 1) != 0; 396 g_SolitaryException = (j & 2) != 0; 397 398 Test0(); 399 } 400 }); 401 } 402 } 403 } 404 405 //! Testing parallel_for and parallel_reduce exception handling 406 //! 
\brief \ref error_guessing 407 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") { 408 for (auto concurrency_level: utils::concurrency_range()) { 409 g_NumThreads = static_cast<int>(concurrency_level); 410 g_Master = std::this_thread::get_id(); 411 if (g_NumThreads > 1) { 412 tbb::task_arena a(g_NumThreads); 413 a.execute([] { 414 // Execute in all the possible modes 415 for (size_t j = 0; j < 4; ++j) { 416 g_ExceptionInMaster = (j & 1) != 0; 417 g_SolitaryException = (j & 2) != 0; 418 419 Test1(); 420 } 421 }); 422 } 423 } 424 } 425 426 //! Testing parallel_for and parallel_reduce exception handling 427 //! \brief \ref error_guessing 428 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") { 429 for (auto concurrency_level: utils::concurrency_range()) { 430 g_NumThreads = static_cast<int>(concurrency_level); 431 g_Master = std::this_thread::get_id(); 432 if (g_NumThreads > 1) { 433 tbb::task_arena a(g_NumThreads); 434 a.execute([] { 435 // Execute in all the possible modes 436 for (size_t j = 0; j < 4; ++j) { 437 g_ExceptionInMaster = (j & 1) != 0; 438 g_SolitaryException = (j & 2) != 0; 439 440 Test2(); 441 } 442 }); 443 } 444 } 445 } 446 447 //! Testing parallel_for and parallel_reduce exception handling 448 //! \brief \ref error_guessing 449 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") { 450 for (auto concurrency_level: utils::concurrency_range()) { 451 g_NumThreads = static_cast<int>(concurrency_level); 452 g_Master = std::this_thread::get_id(); 453 if (g_NumThreads > 1) { 454 tbb::task_arena a(g_NumThreads); 455 a.execute([] { 456 // Execute in all the possible modes 457 for (size_t j = 0; j < 4; ++j) { 458 g_ExceptionInMaster = (j & 1) != 0; 459 g_SolitaryException = (j & 2) != 0; 460 461 Test3(); 462 } 463 }); 464 } 465 } 466 } 467 468 //! Testing parallel_for and parallel_reduce exception handling 469 //! 
\brief \ref error_guessing 470 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") { 471 for (auto concurrency_level: utils::concurrency_range()) { 472 g_NumThreads = static_cast<int>(concurrency_level); 473 g_Master = std::this_thread::get_id(); 474 if (g_NumThreads > 1) { 475 tbb::task_arena a(g_NumThreads); 476 a.execute([] { 477 // Execute in all the possible modes 478 for (size_t j = 0; j < 4; ++j) { 479 g_ExceptionInMaster = (j & 1) != 0; 480 g_SolitaryException = (j & 2) != 0; 481 482 Test4(); 483 } 484 }); 485 } 486 } 487 } 488 489 #endif /* TBB_USE_EXCEPTIONS */ 490 491 class ParForBodyToCancel { 492 public: 493 void operator()( const range_type& ) const { 494 ++g_CurExecuted; 495 Cancellator::WaitUntilReady(); 496 } 497 }; 498 499 template<class B> 500 class ParForLauncher { 501 tbb::task_group_context &my_ctx; 502 public: 503 void operator()() const { 504 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx ); 505 } 506 ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 507 }; 508 509 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 
510 void TestCancelation1 () { 511 ResetGlobals( false ); 512 RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 ); 513 } 514 515 class Cancellator2 { 516 tbb::task_group_context &m_GroupToCancel; 517 518 public: 519 void operator()() const { 520 utils::ConcurrencyTracker ct; 521 WaitUntilConcurrencyPeaks(); 522 m_GroupToCancel.cancel_group_execution(); 523 g_ExecutedAtLastCatch = g_CurExecuted.load(); 524 } 525 526 Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {} 527 }; 528 529 class ParForBodyToCancel2 { 530 public: 531 void operator()( const range_type& ) const { 532 ++g_CurExecuted; 533 utils::ConcurrencyTracker ct; 534 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 535 while( !tbb::is_current_task_group_canceling() ) 536 utils::yield(); 537 } 538 }; 539 540 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 541 /** This version also tests tbb::is_current_task_group_canceling() method. 
**/ 542 void TestCancelation2 () { 543 ResetGlobals(); 544 RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>(); 545 REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task"); 546 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 547 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation"); 548 } 549 550 //////////////////////////////////////////////////////////////////////////////// 551 // Regression test based on the contribution by the author of the following forum post: 552 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx 553 554 class Worker { 555 static const int max_nesting = 3; 556 static const int reduce_range = 1024; 557 static const int reduce_grain = 256; 558 public: 559 int DoWork (int level); 560 int Validate (int start_level) { 561 int expected = 1; // identity for multiplication 562 for(int i=start_level+1; i<max_nesting; ++i) 563 expected *= reduce_range; 564 return expected; 565 } 566 }; 567 568 class RecursiveParReduceBodyWithSharedWorker { 569 Worker * m_SharedWorker; 570 int m_NestingLevel; 571 int m_Result; 572 public: 573 RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split ) 574 : m_SharedWorker(src.m_SharedWorker) 575 , m_NestingLevel(src.m_NestingLevel) 576 , m_Result(0) 577 {} 578 RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer ) 579 : m_SharedWorker(w) 580 , m_NestingLevel(outer) 581 , m_Result(0) 582 {} 583 584 void operator() ( const tbb::blocked_range<size_t>& r ) { 585 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 586 else g_NonMasterExecuted = true; 587 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 588 for (size_t i = r.begin (); i != r.end (); ++i) { 589 m_Result += m_SharedWorker->DoWork (m_NestingLevel); 590 
} 591 } 592 void join (const RecursiveParReduceBodyWithSharedWorker & x) { 593 m_Result += x.m_Result; 594 } 595 int result () { return m_Result; } 596 }; 597 598 int Worker::DoWork ( int level ) { 599 ++level; 600 if ( level < max_nesting ) { 601 RecursiveParReduceBodyWithSharedWorker rt (this, level); 602 tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt); 603 return rt.result(); 604 } 605 else 606 return 1; 607 } 608 609 //! Regression test for hanging that occurred with the first version of cancellation propagation 610 void TestCancelation3 () { 611 Worker w; 612 int result = w.DoWork (0); 613 int expected = w.Validate(0); 614 REQUIRE_MESSAGE ( result == expected, "Wrong calculation result"); 615 } 616 617 struct StatsCounters { 618 std::atomic<size_t> my_total_created; 619 std::atomic<size_t> my_total_deleted; 620 StatsCounters() { 621 my_total_created = 0; 622 my_total_deleted = 0; 623 } 624 }; 625 626 class ParReduceBody { 627 StatsCounters* my_stats; 628 size_t my_id; 629 bool my_exception; 630 tbb::task_group_context& tgc; 631 632 public: 633 ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) : 634 my_stats(&s_), my_exception(e_), tgc(context) { 635 my_id = my_stats->my_total_created++; 636 } 637 638 ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) { 639 my_stats = lhs.my_stats; 640 my_id = my_stats->my_total_created++; 641 } 642 643 ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) { 644 my_stats = lhs.my_stats; 645 my_id = my_stats->my_total_created++; 646 } 647 648 ~ParReduceBody(){ ++my_stats->my_total_deleted; } 649 650 void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const { 651 //Do nothing, except for one task (chosen arbitrarily) 652 if( my_id >= 12 ) { 653 if( my_exception ) 654 ThrowTestException(1); 655 else 656 tgc.cancel_group_execution(); 657 } 658 } 659 660 void join( ParReduceBody& /*rhs*/ ) {} 661 }; 662 663 void TestCancelation4() { 664 
StatsCounters statsObj; 665 #if TBB_USE_EXCEPTIONS 666 try 667 #endif 668 { 669 tbb::task_group_context tgc1, tgc2; 670 ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true); 671 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 ); 672 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 ); 673 } 674 #if TBB_USE_EXCEPTIONS 675 catch(...) {} 676 #endif 677 REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed"); 678 } 679 680 //! Testing parallel_for and parallel_reduce cancellation 681 //! \brief \ref error_guessing 682 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") { 683 for (auto concurrency_level: utils::concurrency_range()) { 684 g_NumThreads = static_cast<int>(concurrency_level); 685 g_Master = std::this_thread::get_id(); 686 if (g_NumThreads > 1) { 687 tbb::task_arena a(g_NumThreads); 688 a.execute([] { 689 // Execute in all the possible modes 690 for (size_t j = 0; j < 4; ++j) { 691 g_ExceptionInMaster = (j & 1) != 0; 692 g_SolitaryException = (j & 2) != 0; 693 694 TestCancelation1(); 695 } 696 }); 697 } 698 } 699 } 700 701 //! Testing parallel_for and parallel_reduce cancellation 702 //! \brief \ref error_guessing 703 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") { 704 for (auto concurrency_level: utils::concurrency_range()) { 705 g_NumThreads = static_cast<int>(concurrency_level); 706 g_Master = std::this_thread::get_id(); 707 if (g_NumThreads > 1) { 708 tbb::task_arena a(g_NumThreads); 709 a.execute([] { 710 // Execute in all the possible modes 711 for (size_t j = 0; j < 4; ++j) { 712 g_ExceptionInMaster = (j & 1) != 0; 713 g_SolitaryException = (j & 2) != 0; 714 715 TestCancelation2(); 716 } 717 }); 718 } 719 } 720 } 721 722 //! 
Testing parallel_for and parallel_reduce cancellation 723 //! \brief \ref error_guessing 724 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") { 725 for (auto concurrency_level: utils::concurrency_range()) { 726 g_NumThreads = static_cast<int>(concurrency_level); 727 g_Master = std::this_thread::get_id(); 728 if (g_NumThreads > 1) { 729 tbb::task_arena a(g_NumThreads); 730 a.execute([] { 731 // Execute in all the possible modes 732 for (size_t j = 0; j < 4; ++j) { 733 g_ExceptionInMaster = (j & 1) != 0; 734 g_SolitaryException = (j & 2) != 0; 735 736 TestCancelation3(); 737 } 738 }); 739 } 740 } 741 } 742 743 //! Testing parallel_for and parallel_reduce cancellation 744 //! \brief \ref error_guessing 745 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") { 746 for (auto concurrency_level: utils::concurrency_range()) { 747 g_NumThreads = static_cast<int>(concurrency_level); 748 g_Master = std::this_thread::get_id(); 749 if (g_NumThreads > 1) { 750 tbb::task_arena a(g_NumThreads); 751 a.execute([] { 752 // Execute in all the possible modes 753 for (size_t j = 0; j < 4; ++j) { 754 g_ExceptionInMaster = (j & 1) != 0; 755 g_SolitaryException = (j & 2) != 0; 756 757 TestCancelation4(); 758 } 759 }); 760 } 761 } 762 } 763 764 //////////////////////////////////////////////////////////////////////////////// 765 // Tests for tbb::parallel_for_each 766 //////////////////////////////////////////////////////////////////////////////// 767 768 std::size_t get_iter_range_size() { 769 // Set the minimal iteration sequence size to 50 to improve test complexity on small machines 770 return std::max(50, g_NumThreads * 2); 771 } 772 773 template<typename Iterator> 774 struct adaptive_range { 775 std::vector<std::size_t> my_array; 776 777 adaptive_range(std::size_t size) : my_array(size + 1) {} 778 using iterator = Iterator; 779 780 iterator begin() const { 781 return iterator{&my_array.front()}; 782 } 783 iterator begin() { 784 return 
iterator{&my_array.front()}; 785 } 786 iterator end() const { 787 return iterator{&my_array.back()}; 788 } 789 iterator end() { 790 return iterator{&my_array.back()}; 791 } 792 }; 793 794 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) { 795 if (g_FedTasksCount < 50) { 796 ++g_FedTasksCount; 797 feeder.add(val); 798 } 799 } 800 801 #define RunWithSimpleBody(func, body) \ 802 func<utils::ForwardIterator<size_t>, body>(); \ 803 func<utils::ForwardIterator<size_t>, body##WithFeeder>() 804 805 #define RunWithTemplatedBody(func, body) \ 806 func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >(); \ 807 func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >() 808 809 #if TBB_USE_EXCEPTIONS 810 811 // Simple functor object with exception 812 class SimpleParForEachBody { 813 public: 814 void operator() ( size_t &value ) const { 815 ++g_CurExecuted; 816 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 817 else g_NonMasterExecuted = true; 818 if( tbb::is_current_task_group_canceling() ) { 819 g_TGCCancelled.increment(); 820 } 821 utils::ConcurrencyTracker ct; 822 value += 1000; 823 WaitUntilConcurrencyPeaks(); 824 ThrowTestException(1); 825 } 826 }; 827 828 // Simple functor object with exception and feeder 829 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody { 830 public: 831 void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const { 832 Feed(feeder, 0); 833 SimpleParForEachBody::operator()(value); 834 } 835 }; 836 837 // Tests exceptions without nesting 838 template <class Iterator, class simple_body> 839 void Test1_parallel_for_each () { 840 ResetGlobals(); 841 auto range = adaptive_range<Iterator>(get_iter_range_size()); 842 TRY(); 843 tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() ); 844 CATCH_AND_ASSERT(); 845 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 846 
g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 847 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 848 if ( !g_SolitaryException ) 849 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 850 851 } // void Test1_parallel_for_each () 852 853 template <class Iterator> 854 class OuterParForEachBody { 855 public: 856 void operator()( size_t& /*value*/ ) const { 857 ++g_OuterParCalls; 858 auto range = adaptive_range<Iterator>(get_iter_range_size()); 859 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody()); 860 } 861 }; 862 863 template <class Iterator> 864 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> { 865 public: 866 void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const { 867 Feed(feeder, 0); 868 OuterParForEachBody<Iterator>::operator()(value); 869 } 870 }; 871 872 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block. 873 /** Inner algorithms are spawned inside the new bound context by default. Since 874 exceptions thrown from the inner parallel_for_each are not handled by the caller 875 (outer parallel_for_each body) in this test, they will cancel all the sibling inner 876 algorithms. 
**/ 877 template <class Iterator, class outer_body> 878 void Test2_parallel_for_each () { 879 ResetGlobals(); 880 auto range = adaptive_range<Iterator>(get_iter_range_size()); 881 TRY(); 882 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() ); 883 CATCH_AND_ASSERT(); 884 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 885 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 886 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 887 if ( !g_SolitaryException ) 888 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 889 } // void Test2_parallel_for_each () 890 891 template <class Iterator> 892 class OuterParForEachBodyWithIsolatedCtx { 893 public: 894 void operator()( size_t& /*value*/ ) const { 895 tbb::task_group_context ctx(tbb::task_group_context::isolated); 896 ++g_OuterParCalls; 897 auto range = adaptive_range<Iterator>(get_iter_range_size()); 898 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx); 899 } 900 }; 901 902 template <class Iterator> 903 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> { 904 public: 905 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 906 Feed(feeder, 0); 907 OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value); 908 } 909 }; 910 911 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block. 912 /** Even though exceptions thrown from the inner parallel_for_each are not handled 913 by the caller in this test, they will not affect sibling inner algorithms 914 already running because of the isolated contexts. 
However because the first 915 exception cancels the root parallel_for_each, at most the first g_NumThreads subranges 916 will be processed (which launch inner parallel_for_eachs) **/ 917 template <class Iterator, class outer_body> 918 void Test3_parallel_for_each () { 919 ResetGlobals(); 920 auto range = adaptive_range<Iterator>(get_iter_range_size()); 921 intptr_t innerCalls = get_iter_range_size(), 922 // The assumption here is the same as in outer parallel fors. 923 minExecuted = (g_NumThreads - 1) * innerCalls; 924 g_Master = std::this_thread::get_id(); 925 TRY(); 926 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body()); 927 CATCH_AND_ASSERT(); 928 // figure actual number of expected executions given the number of outer PDos started. 929 minExecuted = (g_OuterParCalls - 1) * innerCalls; 930 // one extra thread may run a task that sees cancellation. Infrequent but possible 931 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 932 if ( g_SolitaryException ) { 933 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 934 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 935 } 936 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 937 if ( !g_SolitaryException ) 938 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 939 } // void Test3_parallel_for_each () 940 941 template <class Iterator> 942 class OuterParForEachWithEhBody { 943 public: 944 void operator()( size_t& /*value*/ ) const { 945 tbb::task_group_context ctx(tbb::task_group_context::isolated); 946 ++g_OuterParCalls; 947 auto range = adaptive_range<Iterator>(get_iter_range_size()); 948 TRY(); 949 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx); 950 CATCH(); 951 } 952 }; 953 954 template <class Iterator> 955 class 
OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> { 956 public: 957 void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete; 958 OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default; 959 OuterParForEachWithEhBodyWithFeeder() = default; 960 void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const { 961 Feed(feeder, 0); 962 OuterParForEachWithEhBody<Iterator>::operator()(value); 963 } 964 }; 965 966 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block. 967 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 968 in this test, they do not affect neither other tasks of the the root parallel_for 969 nor sibling inner algorithms. **/ 970 template <class Iterator, class outer_body_with_eh> 971 void Test4_parallel_for_each () { 972 ResetGlobals( true, true ); 973 auto range = adaptive_range<Iterator>(get_iter_range_size()); 974 g_Master = std::this_thread::get_id(); 975 TRY(); 976 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh()); 977 CATCH(); 978 REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body"); 979 intptr_t innerCalls = get_iter_range_size(), 980 outerCalls = get_iter_range_size() + g_FedTasksCount, 981 maxExecuted = outerCalls * innerCalls, 982 minExecuted = 0; 983 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 984 if ( g_SolitaryException ) { 985 minExecuted = maxExecuted - innerCalls; 986 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered"); 987 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 988 // This test has the same property as Test4 (parallel_for); the exception can be 989 // thrown, but some number of tasks from the outer Pdo can execute after the throw but 990 // before the cancellation is signaled (have seen 36). 
991 DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?"); 992 } 993 else { 994 minExecuted = g_NumExceptionsCaught; 995 REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions"); 996 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported"); 997 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions"); 998 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 999 } 1000 } // void Test4_parallel_for_each () 1001 1002 // This body throws an exception only if the task was added by feeder 1003 class ParForEachBodyWithThrowingFeederTasks { 1004 public: 1005 //! This form of the function call operator can be used when the body needs to add more work during the processing 1006 void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const { 1007 ++g_CurExecuted; 1008 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1009 else g_NonMasterExecuted = true; 1010 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1011 Feed(feeder, 1); 1012 if (value == 1) 1013 ThrowTestException(1); 1014 } 1015 }; // class ParForEachBodyWithThrowingFeederTasks 1016 1017 // Test exception in task, which was added by feeder. 1018 template <class Iterator> 1019 void Test5_parallel_for_each () { 1020 ResetGlobals(); 1021 auto range = adaptive_range<Iterator>(get_iter_range_size()); 1022 g_Master = std::this_thread::get_id(); 1023 TRY(); 1024 tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks()); 1025 CATCH(); 1026 if (g_SolitaryException) { 1027 // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range 1028 // are handled by the external thread. In this case no throw occurs. 
1029 REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel // we saw an exception 1030 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) // non-external trhead throws but none tried 1031 || (g_ExceptionInMaster && !g_MasterExecutedThrow)) // external thread throws but external thread didn't try 1032 , "At least one exception should occur"); 1033 } 1034 } // void Test5_parallel_for_each () 1035 1036 //! Testing parallel_for_each exception handling 1037 //! \brief \ref error_guessing 1038 TEST_CASE("parallel_for_each exception handling test #1") { 1039 for (auto concurrency_level: utils::concurrency_range()) { 1040 g_NumThreads = static_cast<int>(concurrency_level); 1041 g_Master = std::this_thread::get_id(); 1042 if (g_NumThreads > 1) { 1043 tbb::task_arena a(g_NumThreads); 1044 a.execute([] { 1045 // Execute in all the possible modes 1046 for (size_t j = 0; j < 4; ++j) { 1047 g_ExceptionInMaster = (j & 1) != 0; 1048 g_SolitaryException = (j & 2) != 0; 1049 1050 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody); 1051 } 1052 }); 1053 } 1054 } 1055 } 1056 1057 //! Testing parallel_for_each exception handling 1058 //! \brief \ref error_guessing 1059 TEST_CASE("parallel_for_each exception handling test #2") { 1060 for (auto concurrency_level: utils::concurrency_range()) { 1061 g_NumThreads = static_cast<int>(concurrency_level); 1062 g_Master = std::this_thread::get_id(); 1063 if (g_NumThreads > 1) { 1064 tbb::task_arena a(g_NumThreads); 1065 a.execute([] { 1066 // Execute in all the possible modes 1067 for (size_t j = 0; j < 4; ++j) { 1068 g_ExceptionInMaster = (j & 1) != 0; 1069 g_SolitaryException = (j & 2) != 0; 1070 1071 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody); 1072 } 1073 }); 1074 } 1075 } 1076 } 1077 1078 //! Testing parallel_for_each exception handling 1079 //! 
\brief \ref error_guessing 1080 TEST_CASE("parallel_for_each exception handling test #3") { 1081 for (auto concurrency_level: utils::concurrency_range()) { 1082 g_NumThreads = static_cast<int>(concurrency_level); 1083 g_Master = std::this_thread::get_id(); 1084 if (g_NumThreads > 1) { 1085 tbb::task_arena a(g_NumThreads); 1086 a.execute([] { 1087 // Execute in all the possible modes 1088 for (size_t j = 0; j < 4; ++j) { 1089 g_ExceptionInMaster = (j & 1) != 0; 1090 g_SolitaryException = (j & 2) != 0; 1091 1092 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx); 1093 } 1094 }); 1095 } 1096 } 1097 } 1098 1099 //! Testing parallel_for_each exception handling 1100 //! \brief \ref error_guessing 1101 TEST_CASE("parallel_for_each exception handling test #4") { 1102 for (auto concurrency_level: utils::concurrency_range()) { 1103 g_NumThreads = static_cast<int>(concurrency_level); 1104 g_Master = std::this_thread::get_id(); 1105 if (g_NumThreads > 1) { 1106 tbb::task_arena a(g_NumThreads); 1107 a.execute([] { 1108 // Execute in all the possible modes 1109 for (size_t j = 0; j < 4; ++j) { 1110 g_ExceptionInMaster = (j & 1) != 0; 1111 g_SolitaryException = (j & 2) != 0; 1112 1113 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody); 1114 } 1115 }); 1116 } 1117 } 1118 } 1119 1120 //! Testing parallel_for_each exception handling 1121 //! 
//! \brief \ref error_guessing
TEST_CASE("parallel_for_each exception handling test #5") {
    for (auto concurrency_level: utils::concurrency_range()) {
        g_NumThreads = static_cast<int>(concurrency_level);
        g_Master = std::this_thread::get_id();
        // Levels with a single thread are skipped.
        if (g_NumThreads > 1) {
            tbb::task_arena a(g_NumThreads);
            a.execute([] {
                // Execute in all the possible modes
                for (size_t j = 0; j < 4; ++j) {
                    g_ExceptionInMaster = (j & 1) != 0;
                    g_SolitaryException = (j & 2) != 0;

                    // The feeder-exception test is run with three iterator categories.
                    Test5_parallel_for_each<utils::InputIterator<size_t> >();
                    Test5_parallel_for_each<utils::ForwardIterator<size_t> >();
                    Test5_parallel_for_each<utils::RandomIterator<size_t> >();
                }
            });
        }
    }
}

#endif /* TBB_USE_EXCEPTIONS */

//! Body for the cancellation test: counts the execution, then calls Cancellator::WaitUntilReady().
class ParForEachBodyToCancel {
public:
    void operator()( size_t& /*value*/ ) const {
        ++g_CurExecuted;
        Cancellator::WaitUntilReady();
    }
};

//! Feeder flavour of ParForEachBodyToCancel: feeds one more item, then delegates to the base body.
class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel {
public:
    void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
        Feed(feeder, 0);
        ParForEachBodyToCancel::operator()(value);
    }
};

//! Functor that runs parallel_for_each with body B over a fresh range in the given context.
template<class B, class Iterator>
class ParForEachWorker {
    tbb::task_group_context &my_ctx;
public:
    void operator()() const {
        auto range = adaptive_range<Iterator>(get_iter_range_size());
        tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx );
    }

    ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {}
};

//! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
template <class Iterator, class body_to_cancel>
void TestCancelation1_parallel_for_each () {
    ResetGlobals( false );
    // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2
    intptr_t threshold = get_iter_range_size() / 4;
    REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation.");
    tbb::task_group tg;
    tbb::task_group_context  ctx;
    // The cancellator and the worker share ctx; both are launched through the task group.
    Cancellator cancellator(ctx, threshold);
    ParForEachWorker<body_to_cancel, Iterator> worker(ctx);
    tg.run( cancellator );
    utils::yield();
    tg.run( worker );
    TRY();
        tg.wait();
    CATCH_AND_FAIL();
    REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation");
}

//! Body that counts the execution and then spins until its task group reports cancellation.
class ParForEachBodyToCancel2 {
public:
    void operator()( size_t& /*value*/ ) const {
        ++g_CurExecuted;
        utils::ConcurrencyTracker ct;
        // The test will hang (and be timed out by the test system) if is_cancelled() is broken
        while( !tbb::is_current_task_group_canceling() )
            utils::yield();
    }
};

//! Feeder flavour of ParForEachBodyToCancel2: feeds one more item, then delegates to the base body.
class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 {
public:
    void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const {
        Feed(feeder, 0);
        ParForEachBodyToCancel2::operator()(value);
    }
};

//! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm).
/** This version also tests tbb::is_current_task_group_canceling() method. **/
template <class Iterator, class body_to_cancel>
void TestCancelation2_parallel_for_each () {
    ResetGlobals();
    RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>();
}

//! Testing parallel_for_each cancellation test
//!
//! \brief \ref error_guessing
TEST_CASE("parallel_for_each cancellation test #1") {
    for (auto concurrency_level: utils::concurrency_range()) {
        g_NumThreads = static_cast<int>(concurrency_level);
        g_Master = std::this_thread::get_id();
        // Levels with a single thread are skipped.
        if (g_NumThreads > 1) {
            tbb::task_arena a(g_NumThreads);
            a.execute([] {
                // Execute in all the possible modes
                for (size_t j = 0; j < 4; ++j) {
                    g_ExceptionInMaster = (j & 1) != 0;
                    g_SolitaryException = (j & 2) != 0;
                    RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel);
                }
            });
        }
    }
}

//! Testing parallel_for_each cancellation test
//! \brief \ref error_guessing
TEST_CASE("parallel_for_each cancellation test #2") {
    for (auto concurrency_level: utils::concurrency_range()) {
        g_NumThreads = static_cast<int>(concurrency_level);
        g_Master = std::this_thread::get_id();
        // Levels with a single thread are skipped.
        if (g_NumThreads > 1) {
            tbb::task_arena a(g_NumThreads);
            a.execute([] {
                // Execute in all the possible modes
                for (size_t j = 0; j < 4; ++j) {
                    g_ExceptionInMaster = (j & 1) != 0;
                    g_SolitaryException = (j & 2) != 0;

                    RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2);
                }
            });
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Tests for tbb::parallel_pipeline
////////////////////////////////////////////////////////////////////////////////
// Token limit passed to tbb::parallel_pipeline by the pipelines below.
int g_NumTokens = 0;

// Simple input filter class, it assigns 1 to all array members
// It stops the pipeline once get_iter_range_size() items have been handed out
class InputFilter {
    mutable std::atomic<size_t> m_Item{};
    mutable std::vector<size_t> m_Buffer;
public:
    InputFilter() : m_Buffer(get_iter_range_size()) {
        m_Item = 0;
        for (size_t i = 0; i < get_iter_range_size(); ++i )
            m_Buffer[i] = 1;
    }
    // Copies the current item counter and the first get_iter_range_size() buffer entries.
    InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()), m_Buffer(get_iter_range_size()) {
        for (size_t i = 0; i < get_iter_range_size(); ++i )
            m_Buffer[i] = other.m_Buffer[i];
    }

    // Returns a pointer to the next buffer slot, or stops the pipeline when the buffer is exhausted.
    void* operator()(tbb::flow_control& control) const {
        size_t item = m_Item++;
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
        // NOTE(review): m_Item++ yields 0 on the first call, so this fires on the second call.
        if(item == 1) {
            ++g_PipelinesStarted;   // count on emitting the first item.
        }
        if ( item >= get_iter_range_size() ) {
            control.stop();
            return nullptr;
        }
        m_Buffer[item] = 1;
        return &m_Buffer[item];
    }

    size_t* buffer() { return m_Buffer.data(); }
}; // class InputFilter

#if TBB_USE_EXCEPTIONS

// Simple filter with exception throwing.  If parallel, will wait until
// as many parallel filters start as there are threads.
class SimpleFilter {
    bool m_canThrow;
    bool m_serial;
public:
    SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
    void* operator()(void* item) const {
        ++g_CurExecuted;
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
        if ( m_canThrow ) {
            if ( !m_serial ) {
                utils::ConcurrencyTracker ct;
                // Concurrency cannot exceed the token limit, hence the min().
                WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
            }
            ThrowTestException(1);
        }
        return item;
    }
}; // class SimpleFilter

// This enumeration represents filters order in pipeline
struct FilterSet {
    tbb::filter_mode mode1, mode2;
    bool throw1, throw2;

    FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
        : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
    {}
}; // struct FilterSet

// Filter set used by the inner pipelines: serial in-order first filter, parallel second filter that may throw.
FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );

//! Three-stage pipeline: a parallel input filter followed by two filters configured from a FilterSet.
/** Each Filter is constructed with (can-throw flag, is-serial flag). **/
template<typename InFilter, typename Filter>
class CustomPipeline {
    InFilter inputFilter;
    Filter filter1;
    Filter filter2;
    FilterSet my_filters;
public:
    CustomPipeline( const FilterSet& filters )
        : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
          filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
          my_filters(filters)
    {}

    // Runs the pipeline without an explicit task_group_context argument.
    void run () {
        tbb::parallel_pipeline(
            g_NumTokens,
            tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
            tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
            tbb::make_filter<void*, void>(my_filters.mode2, filter2)
        );
    }
    // Runs the pipeline in the supplied context.
    void run ( tbb::task_group_context& ctx ) {
        tbb::parallel_pipeline(
            g_NumTokens,
            tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
            tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
            tbb::make_filter<void*, void>(my_filters.mode2, filter2),
            ctx
        );
    }
};

typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;

// Tests exceptions without nesting
void Test1_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    SimplePipeline testPipeline(filters);
    TRY();
        testPipeline.run();
        // Two SimpleFilter invocations per item means every item passed both filters.
        if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) {
            // all the items were processed, though an exception was supposed to occur.
            if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
                // if !g_ExceptionInMaster, the external thread is not allowed to throw.
                // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw.
                REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
                    "Unusual count");
            }
            // In case of all serial filters they might be all executed in the thread(s)
            // where exceptions are not allowed by the common test logic. So we just quit.
            return;
        }
    CATCH_AND_ASSERT();
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");

} // void Test1_pipeline ()

// Filter with nesting
class OuterFilter {
public:
    // Both constructor arguments are ignored; the signature matches what CustomPipeline passes.
    OuterFilter ( bool, bool ) {}

    void* operator()(void* item) const {
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        testPipeline.run();
        return item;
    }
}; // class OuterFilter

//! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
/** Inner algorithms are spawned inside the new bound context by default. Since
    exceptions thrown from the inner pipeline are not handled by the caller
    (outer pipeline body) in this test, they will cancel all the sibling inner
    algorithms.
**/ 1417 void Test2_pipeline ( const FilterSet& filters ) { 1418 ResetGlobals(); 1419 g_NestedPipelines = true; 1420 CustomPipeline<InputFilter, OuterFilter> testPipeline(filters); 1421 TRY(); 1422 testPipeline.run(); 1423 CATCH_AND_ASSERT(); 1424 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow); 1425 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test"); 1426 if ( !g_SolitaryException ) { 1427 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1428 } 1429 } // void Test2_pipeline () 1430 1431 //! creates isolated inner pipeline and runs it. 1432 class OuterFilterWithIsolatedCtx { 1433 public: 1434 OuterFilterWithIsolatedCtx( bool , bool ) {} 1435 1436 void* operator()(void* item) const { 1437 ++g_OuterParCalls; 1438 tbb::task_group_context ctx(tbb::task_group_context::isolated); 1439 // create inner pipeline with serial input, parallel output filter, second filter throws 1440 SimplePipeline testPipeline(serial_parallel); 1441 testPipeline.run(ctx); 1442 return item; 1443 } 1444 }; // class OuterFilterWithIsolatedCtx 1445 1446 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block. 1447 /** Even though exceptions thrown from the inner pipeline are not handled 1448 by the caller in this test, they will not affect sibling inner algorithms 1449 already running because of the isolated contexts. 
However because the first
    exception cancels the root pipeline only the first g_NumThreads subranges
    will be processed (which launch inner pipelines) **/
void Test3_pipeline ( const FilterSet& filters ) {
    // Up to four attempts: the function returns after the first attempt whose outcome can be checked.
    for( int nTries = 1; nTries <= 4; ++nTries) {
        ResetGlobals();
        g_NestedPipelines = true;
        g_Master = std::this_thread::get_id();
        intptr_t innerCalls = get_iter_range_size(),
                 minExecuted = (g_NumThreads - 1) * innerCalls;
        CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
        TRY();
            testPipeline.run();
        CATCH_AND_ASSERT();

        // True when the thread kind that is allowed to throw never executed a filter.
        bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
            (!g_ExceptionInMaster && !g_NonMasterExecuted);
        // only test assertions if the test threw an exception (or we don't care)
        bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
        if(testSucceeded) {
            if (g_SolitaryException) {

                // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
                // Each time the input filter of a pipeline delivers its first item, it increments
                // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
                // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
                // context will start; that is what we're testing.)
                //
                // There are two pipelines which will NOT run to completion when a solitary throw
                // happens in an isolated inner context: the outer pipeline and the pipeline which
                // throws.  All the other pipelines which start should run to completion.  But only
                // inner body invocations are counted.
                //
                // So g_CurExecuted should be about
                //
                //   (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1
                //   ^ executions for each completed pipeline
                //                   ^ completing pipelines (remembering two will not complete)
                //                                              ^ one for the inner throwing pipeline

                minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1;
                // each failing pipeline must execute at least two tasks
                REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
                // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
                // tasks not executing at throw were scheduled.
                g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
                REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
                // if we're only throwing from the external thread, and that thread didn't
                // participate in the pipelines, then no throw occurred.
            }
            REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
            REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
            return;
        }
    }
}

//! Outer filter that runs an inner pipeline in an isolated context between TRY() and CATCH().
class OuterFilterWithEhBody {
public:
    // Both constructor arguments are ignored; the signature matches what CustomPipeline passes.
    OuterFilterWithEhBody( bool, bool ){}

    void* operator()(void* item) const {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        TRY();
            testPipeline.run(ctx);
        CATCH();
        return item;
    }
}; // class OuterFilterWithEhBody

//! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
/** Since exception(s) thrown from the inner pipeline are handled by the caller
    in this test, they do not affect other tasks of the the root pipeline
    nor sibling inner algorithms.
**/ 1525 void Test4_pipeline ( const FilterSet& filters ) { 1526 #if __GNUC__ && !__INTEL_COMPILER 1527 if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) { 1528 MESSAGE("Known issue: one of exception handling tests is skipped."); 1529 return; 1530 } 1531 #endif 1532 ResetGlobals( true, true ); 1533 // each outer pipeline stage will start get_iter_range_size() inner pipelines. 1534 // each inner pipeline that doesn't throw will process get_iter_range_size() items. 1535 // for solitary exception there will be one pipeline that only processes one stage, one item. 1536 // innerCalls should be 2*get_iter_range_size() 1537 intptr_t innerCalls = 2*get_iter_range_size(), 1538 outerCalls = 2*get_iter_range_size(), 1539 maxExecuted = outerCalls * innerCalls; // the number of invocations of the inner pipelines 1540 CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters); 1541 TRY(); 1542 testPipeline.run(); 1543 CATCH_AND_ASSERT(); 1544 intptr_t minExecuted = 0; 1545 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) || 1546 (!g_ExceptionInMaster && !g_NonMasterExecuted); 1547 if ( g_SolitaryException ) { 1548 minExecuted = maxExecuted - innerCalls; // one throwing inner pipeline 1549 REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered"); 1550 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); // probably will assert. 1551 } 1552 else { 1553 // we assume throwing pipelines will not count 1554 minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 1555 REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions"); 1556 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported"); 1557 // too many already-scheduled tasks are started after the first exception is 1558 // thrown. And g_ExecutedAtLastCatch is updated every time an exception is caught. 
1559 // So with multiple exceptions there are a variable number of tasks that have been 1560 // discarded because of the signals. 1561 // each throw is caught, so we will see many cancelled tasks. g_ExecutedAtLastCatch is 1562 // updated with each throw, so the value will be the number of tasks executed at the last 1563 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions"); 1564 } 1565 } // void Test4_pipeline () 1566 1567 //! Tests pipeline function passed with different combination of filters 1568 template<void testFunc(const FilterSet&)> 1569 void TestWithDifferentFiltersAndConcurrency() { 1570 #if __TBB_USE_ADDRESS_SANITIZER 1571 // parallel_pipeline allocates tls that sporadically observed as a memory leak with 1572 // detached threads. So, use task_scheduler_handle to join threads with finalize 1573 tbb::task_scheduler_handle handle{ tbb::attach{} }; 1574 #endif 1575 for (auto concurrency_level: utils::concurrency_range()) { 1576 g_NumThreads = static_cast<int>(concurrency_level); 1577 g_Master = std::this_thread::get_id(); 1578 if (g_NumThreads > 1) { 1579 1580 const tbb::filter_mode modes[] = { 1581 tbb::filter_mode::parallel, 1582 tbb::filter_mode::serial_in_order, 1583 tbb::filter_mode::serial_out_of_order 1584 }; 1585 1586 const int NumFilterTypes = sizeof(modes)/sizeof(modes[0]); 1587 1588 // Execute in all the possible modes 1589 for ( size_t j = 0; j < 4; ++j ) { 1590 tbb::task_arena a(g_NumThreads); 1591 a.execute([&] { 1592 g_ExceptionInMaster = (j & 1) != 0; 1593 g_SolitaryException = (j & 2) != 0; 1594 g_NumTokens = 2 * g_NumThreads; 1595 1596 for (int i = 0; i < NumFilterTypes; ++i) { 1597 for (int n = 0; n < NumFilterTypes; ++n) { 1598 for (int k = 0; k < 2; ++k) 1599 testFunc(FilterSet(modes[i], modes[n], k == 0, k != 0)); 1600 } 1601 } 1602 }); 1603 } 1604 } 1605 } 1606 #if __TBB_USE_ADDRESS_SANITIZER 1607 tbb::finalize(handle); 1608 #endif 1609 } 1610 1611 //! 
Testing parallel_pipeline exception handling 1612 //! \brief \ref error_guessing 1613 TEST_CASE("parallel_pipeline exception handling test #1") { 1614 TestWithDifferentFiltersAndConcurrency<Test1_pipeline>(); 1615 } 1616 1617 //! Testing parallel_pipeline exception handling 1618 //! \brief \ref error_guessing 1619 TEST_CASE("parallel_pipeline exception handling test #2") { 1620 TestWithDifferentFiltersAndConcurrency<Test2_pipeline>(); 1621 } 1622 1623 //! Testing parallel_pipeline exception handling 1624 //! \brief \ref error_guessing 1625 TEST_CASE("parallel_pipeline exception handling test #3") { 1626 TestWithDifferentFiltersAndConcurrency<Test3_pipeline>(); 1627 } 1628 1629 //! Testing parallel_pipeline exception handling 1630 //! \brief \ref error_guessing 1631 TEST_CASE("parallel_pipeline exception handling test #4") { 1632 TestWithDifferentFiltersAndConcurrency<Test4_pipeline>(); 1633 } 1634 1635 #endif /* TBB_USE_EXCEPTIONS */ 1636 1637 class FilterToCancel { 1638 public: 1639 FilterToCancel() {} 1640 void* operator()(void* item) const { 1641 ++g_CurExecuted; 1642 Cancellator::WaitUntilReady(); 1643 return item; 1644 } 1645 }; // class FilterToCancel 1646 1647 template <class Filter_to_cancel> 1648 class PipelineLauncher { 1649 tbb::task_group_context &my_ctx; 1650 public: 1651 PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1652 1653 void operator()() const { 1654 // Run test when serial filter is the first non-input filter 1655 InputFilter inputFilter; 1656 Filter_to_cancel filterToCancel; 1657 tbb::parallel_pipeline( 1658 g_NumTokens, 1659 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1660 tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel), 1661 my_ctx 1662 ); 1663 } 1664 }; 1665 1666 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 
1667 void TestCancelation1_pipeline () { 1668 ResetGlobals(); 1669 g_ThrowException = false; 1670 // Threshold should leave more than max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2 1671 intptr_t threshold = get_iter_range_size() / 4; 1672 REQUIRE_MESSAGE(get_iter_range_size() - threshold > g_NumThreads, "Threshold should leave more than max_threads tasks to test the cancellation."); 1673 RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold); 1674 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 1675 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1676 } 1677 1678 class FilterToCancel2 { 1679 public: 1680 FilterToCancel2() {} 1681 1682 void* operator()(void* item) const { 1683 ++g_CurExecuted; 1684 utils::ConcurrencyTracker ct; 1685 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1686 while( !tbb::is_current_task_group_canceling() ) 1687 utils::yield(); 1688 return item; 1689 } 1690 }; 1691 1692 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1693 /** This version also tests task::is_cancelled() method. **/ 1694 void TestCancelation2_pipeline () { 1695 ResetGlobals(); 1696 RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>(); 1697 // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the 1698 // former, and g_CurExecuted is monotonic increasing. so the comparison should be at least ==. 1699 // If another filter is started after cancel but before cancellation is propagated, then the 1700 // number will be larger. 
1701 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation"); 1702 } 1703 1704 /** If min and max thread numbers specified on the command line are different, 1705 the test is run only for 2 sizes of the thread pool (MinThread and MaxThread) 1706 to be able to test the high and low contention modes while keeping the test reasonably fast **/ 1707 1708 //! Testing parallel_pipeline cancellation 1709 //! \brief \ref error_guessing 1710 TEST_CASE("parallel_pipeline cancellation test #1") { 1711 for (auto concurrency_level: utils::concurrency_range()) { 1712 g_NumThreads = static_cast<int>(concurrency_level); 1713 g_Master = std::this_thread::get_id(); 1714 if (g_NumThreads > 1) { 1715 tbb::task_arena a(g_NumThreads); 1716 a.execute([] { 1717 // Execute in all the possible modes 1718 for (size_t j = 0; j < 4; ++j) { 1719 g_ExceptionInMaster = (j & 1) != 0; 1720 g_SolitaryException = (j & 2) != 0; 1721 g_NumTokens = 2 * g_NumThreads; 1722 1723 TestCancelation1_pipeline(); 1724 } 1725 }); 1726 } 1727 } 1728 } 1729 1730 //! Testing parallel_pipeline cancellation 1731 //! \brief \ref error_guessing 1732 TEST_CASE("parallel_pipeline cancellation test #2") { 1733 for (auto concurrency_level: utils::concurrency_range()) { 1734 g_NumThreads = static_cast<int>(concurrency_level); 1735 g_Master = std::this_thread::get_id(); 1736 if (g_NumThreads > 1) { 1737 tbb::task_arena a(g_NumThreads); 1738 a.execute([] { 1739 // Execute in all the possible modes 1740 for (size_t j = 0; j < 4; ++j) { 1741 g_ExceptionInMaster = (j & 1) != 0; 1742 g_SolitaryException = (j & 2) != 0; 1743 g_NumTokens = 2 * g_NumThreads; 1744 1745 TestCancelation2_pipeline(); 1746 } 1747 }); 1748 } 1749 } 1750 } 1751