1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/concurrency_tracker.h" 19 #include "common/iterator.h" 20 #include "common/utils_concurrency_limit.h" 21 22 #include <limits.h> // for INT_MAX 23 #include <thread> 24 #include <vector> 25 26 #include "tbb/parallel_for.h" 27 #include "tbb/parallel_reduce.h" 28 #include "tbb/parallel_for_each.h" 29 #include "tbb/parallel_pipeline.h" 30 #include "tbb/blocked_range.h" 31 #include "tbb/task_group.h" 32 #include "tbb/global_control.h" 33 #include "tbb/concurrent_unordered_map.h" 34 #include "tbb/task.h" 35 36 //! \file test_eh_algorithms.cpp 37 //! \brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications 38 39 #define FLAT_RANGE 100000 40 #define FLAT_GRAIN 100 41 #define OUTER_RANGE 100 42 #define OUTER_GRAIN 10 43 #define INNER_RANGE (FLAT_RANGE / OUTER_RANGE) 44 #define INNER_GRAIN (FLAT_GRAIN / OUTER_GRAIN) 45 46 struct context_specific_counter { 47 tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{}; 48 49 void increment() { 50 tbb::task_group_context* ctx = tbb::task::current_context(); 51 REQUIRE(ctx != nullptr); 52 context_map[ctx]++; 53 } 54 55 void reset() { 56 context_map.clear(); 57 } 58 59 void validate(unsigned expected_count, const char* msg) { 60 for (auto it = context_map.begin(); it != context_map.end(); it++) { 61 REQUIRE_MESSAGE( it->second <= expected_count, msg); 62 } 63 } 64 }; 65 66 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder 67 std::atomic<intptr_t> g_OuterParCalls{}; // number of actual invocations of the outer construct executed. 68 context_specific_counter g_TGCCancelled{}; // Number of times a task sees its group cancelled at start 69 70 #include "common/exception_handling.h" 71 72 /******************************** 73 Variables in test 74 75 __ Test control variables 76 g_ExceptionInMaster -- only the external thread is allowed to throw. If false, the external cannot throw 77 g_SolitaryException -- only one throw may be executed. 78 79 -- controls for ThrowTestException for pipeline tests 80 g_NestedPipelines -- are inner pipelines being run? 81 g_PipelinesStarted -- how many pipelines have run their first filter at least once. 82 83 -- Information variables 84 85 g_Master -- Thread ID of the "external" thread 86 In pipelines sometimes the external thread does not participate, so the tests have to be resilient to this. 87 88 -- Measurement variables 89 90 g_OuterParCalls -- how many outer parallel ranges or filters started 91 g_TGCCancelled -- how many inner parallel ranges or filters saw task::self().is_cancelled() 92 g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException) 93 g_MasterExecutedThrow -- number of times external thread actually executed a throw 94 g_NonMasterExecutedThrow -- number of times non-external thread actually executed a throw 95 g_ExceptionCaught -- one of PropagatedException or unknown exception was caught. (Other exceptions cause assertions.) 96 97 -- Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException) 98 g_CurExecuted -- total number of inner ranges or filters which executed 99 g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none. 100 g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none. 101 *********************************/ 102 103 inline void ResetGlobals ( bool throwException = true, bool flog = false ) { 104 ResetEhGlobals( throwException, flog ); 105 g_FedTasksCount = 0; 106 g_OuterParCalls = 0; 107 g_NestedPipelines = false; 108 g_TGCCancelled.reset(); 109 } 110 111 //////////////////////////////////////////////////////////////////////////////// 112 // Tests for tbb::parallel_for and tbb::parallel_reduce 113 //////////////////////////////////////////////////////////////////////////////// 114 115 typedef size_t count_type; 116 typedef tbb::blocked_range<count_type> range_type; 117 118 inline intptr_t CountSubranges(range_type r) { 119 if(!r.is_divisible()) return intptr_t(1); 120 range_type r2(r,tbb::split()); 121 return CountSubranges(r) + CountSubranges(r2); 122 } 123 124 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) { 125 return CountSubranges(range_type(0,length,grain)); 126 } 127 128 template<class Body> 129 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t inner_length, intptr_t inner_grain ) { 130 ResetGlobals(); 131 g_ThrowException = false; 132 intptr_t outerCalls = NumSubranges(length, grain), 133 innerCalls = NumSubranges(inner_length, inner_grain), 134 maxExecuted = outerCalls * (innerCalls + 1); 135 tbb::parallel_for( range_type(0, length, grain), Body() ); 136 REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count"); 137 return maxExecuted; 138 } 139 140 class NoThrowParForBody { 141 public: 142 void operator()( const range_type& r ) const { 143 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 144 else g_NonMasterExecuted = true; 145 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 146 utils::doDummyWork(r.size()); 147 } 148 }; 149 150 #if TBB_USE_EXCEPTIONS 151 152 void Test0 () { 153 ResetGlobals(); 154 tbb::simple_partitioner p; 155 for( size_t i=0; i<10; ++i ) { 156 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() ); 157 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p ); 158 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() ); 159 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p ); 160 } 161 } // void Test0 () 162 163 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for. 164 template<typename ParForBody> 165 class SimpleParReduceBody { 166 ParForBody m_Body; 167 public: 168 void operator=(const SimpleParReduceBody&) = delete; 169 SimpleParReduceBody(const SimpleParReduceBody&) = default; 170 SimpleParReduceBody() = default; 171 172 void operator()( const range_type& r ) const { m_Body(r); } 173 SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {} 174 void join( SimpleParReduceBody& /*right*/ ) {} 175 }; // SimpleParReduceBody 176 177 //! Test parallel_for and parallel_reduce for a given partitioner. 178 /** The Body need only be suitable for a parallel_for. */ 179 template<typename ParForBody, typename Partitioner> 180 void TestParallelLoopAux() { 181 Partitioner partitioner; 182 for( int i=0; i<2; ++i ) { 183 ResetGlobals(); 184 TRY(); 185 if( i==0 ) 186 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner ); 187 else { 188 SimpleParReduceBody<ParForBody> rb; 189 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner ); 190 } 191 CATCH_AND_ASSERT(); 192 // two cases: g_SolitaryException and !g_SolitaryException 193 // 1) g_SolitaryException: only one thread actually threw. There is only one context, so the exception 194 // (when caught) will cause that context to be cancelled. After this event, there may be one or 195 // more threads which are "in-flight", up to g_NumThreads, but no more will be started. The threads, 196 // when they start, if they see they are cancelled, TGCCancelled is incremented. 197 // 2) !g_SolitaryException: more than one thread can throw. The number of threads that actually 198 // threw is g_MasterExecutedThrow if only the external thread is allowed, else g_NonMasterExecutedThrow. 199 // Only one context, so TGCCancelled should be <= g_NumThreads. 200 // 201 // the reasoning is similar for nested algorithms in a single context (Test2). 202 // 203 // If a thread throws in a context, more than one subsequent task body may see the 204 // cancelled state (if they are scheduled before the state is propagated.) this is 205 // infrequent, but it occurs. So what was to be an assertion must be a remark. 206 g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown"); 207 REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 208 if ( g_SolitaryException ) { 209 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 210 REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? g_MasterExecutedThrow : g_NonMasterExecutedThrow), 211 "Not all throws were caught"); 212 REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred"); 213 } 214 else { 215 REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test"); 216 } 217 } 218 } // TestParallelLoopAux 219 220 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners. 221 /** The Body only needs to be suitable for tbb::parallel_for. */ 222 template<typename Body> 223 void TestParallelLoop() { 224 // The simple and auto partitioners should be const, but not the affinity partitioner. 225 TestParallelLoopAux<Body, const tbb::simple_partitioner >(); 226 TestParallelLoopAux<Body, const tbb::auto_partitioner >(); 227 #define __TBB_TEMPORARILY_DISABLED 1 228 #if !__TBB_TEMPORARILY_DISABLED 229 // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner 230 TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>(); 231 #endif 232 #undef __TBB_TEMPORARILY_DISABLED 233 } 234 235 class SimpleParForBody { 236 public: 237 void operator=(const SimpleParForBody&) = delete; 238 SimpleParForBody(const SimpleParForBody&) = default; 239 SimpleParForBody() = default; 240 241 void operator()( const range_type& r ) const { 242 utils::ConcurrencyTracker ct; 243 ++g_CurExecuted; 244 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 245 else g_NonMasterExecuted = true; 246 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 247 utils::doDummyWork(r.size()); 248 WaitUntilConcurrencyPeaks(); 249 ThrowTestException(1); 250 } 251 }; 252 253 void Test1() { 254 // non-nested parallel_for/reduce with throwing body, one context 255 TestParallelLoop<SimpleParForBody>(); 256 } // void Test1 () 257 258 class OuterParForBody { 259 public: 260 void operator=(const OuterParForBody&) = delete; 261 OuterParForBody(const OuterParForBody&) = default; 262 OuterParForBody() = default; 263 void operator()( const range_type& ) const { 264 utils::ConcurrencyTracker ct; 265 ++g_OuterParCalls; 266 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() ); 267 } 268 }; 269 270 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block. 271 /** Inner algorithms are spawned inside the new bound context by default. Since 272 exceptions thrown from the inner parallel_for are not handled by the caller 273 (outer parallel_for body) in this test, they will cancel all the sibling inner 274 algorithms. **/ 275 void Test2 () { 276 TestParallelLoop<OuterParForBody>(); 277 } // void Test2 () 278 279 class OuterParForBodyWithIsolatedCtx { 280 public: 281 void operator()( const range_type& ) const { 282 tbb::task_group_context ctx(tbb::task_group_context::isolated); 283 ++g_OuterParCalls; 284 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 285 } 286 }; 287 288 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block. 289 /** Even though exceptions thrown from the inner parallel_for are not handled 290 by the caller in this test, they will not affect sibling inner algorithms 291 already running because of the isolated contexts. However because the first 292 exception cancels the root parallel_for only the first g_NumThreads subranges 293 will be processed (which launch inner parallel_fors) **/ 294 void Test3 () { 295 ResetGlobals(); 296 typedef OuterParForBodyWithIsolatedCtx body_type; 297 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 298 // we expect one thread to throw without counting, the rest to run to completion 299 // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the 300 // case; the SimpleParFor subranges are started up as part of the outer ones, and when 301 // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started. 302 // so we have to count the number of outer Pfors actually started. 303 minExecuted = (g_NumThreads - 1) * innerCalls; 304 TRY(); 305 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() ); 306 CATCH_AND_ASSERT(); 307 minExecuted = (g_OuterParCalls - 1) * innerCalls; // see above 308 309 // The first formula above assumes all ranges of the outer parallel for are executed, and one 310 // cancels. In the event, we have a smaller number of ranges that start before the exception 311 // is caught. 312 // 313 // g_SolitaryException:One inner range throws. Outer parallel_For is cancelled, but sibling 314 // parallel_fors continue to completion (unless the threads that execute 315 // are not allowed to throw, in which case we will not see any exceptions). 316 // !g_SolitaryException:multiple inner ranges may throw. Any which throws will stop, and the 317 // corresponding range of the outer pfor will stop also. 318 // 319 // In either case, once the outer pfor gets the exception it will stop executing further ranges. 320 321 // if the only threads executing were not allowed to throw, then not seeing an exception is okay. 322 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 323 if ( g_SolitaryException ) { 324 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 325 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 326 REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception"); 327 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 328 } 329 else { 330 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 331 REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 332 } 333 } // void Test3 () 334 335 class OuterParForExceptionSafeBody { 336 public: 337 void operator()( const range_type& ) const { 338 tbb::task_group_context ctx(tbb::task_group_context::isolated); 339 ++g_OuterParCalls; 340 TRY(); 341 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 342 CATCH(); // this macro sets g_ExceptionCaught 343 } 344 }; 345 346 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block. 347 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 348 in this test, they do not affect neither other tasks of the the root parallel_for 349 nor sibling inner algorithms. **/ 350 void Test4 () { 351 ResetGlobals( true, true ); 352 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 353 outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN); 354 TRY(); 355 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() ); 356 CATCH(); 357 // g_SolitaryException : one inner pfor will throw, the rest will execute to completion. 358 // so the count should be (outerCalls -1) * innerCalls, if a throw happened. 359 // !g_SolitaryException : possible multiple inner pfor throws. Should be approximately 360 // (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few 361 intptr_t minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 362 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 363 if ( g_SolitaryException ) { 364 // only one task had exception thrown. That task had at least one execution (the one that threw). 365 // There may be an arbitrary number of ranges executed after the throw but before the exception 366 // is caught in the scheduler and cancellation is signaled. (seen 9, 11 and 62 (!) for 8 threads) 367 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered"); 368 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 369 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 370 // a small number of threads can execute in a throwing sub-pfor, if the task which is 371 // to do the solitary throw swaps out after registering its intent to throw but before it 372 // actually does so. As a result, the number of extra tasks cannot exceed the number of thread 373 // for each nested pfor invication) 374 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_NumThreads-1)*g_NumThreads/2, "Too many tasks survived exception"); 375 } 376 else { 377 REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions"); 378 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported"); 379 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions"); 380 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 381 } 382 } // void Test4 () 383 384 //! Testing parallel_for and parallel_reduce exception handling 385 //! \brief \ref error_guessing 386 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") { 387 for (auto concurrency_level: utils::concurrency_range()) { 388 g_NumThreads = static_cast<int>(concurrency_level); 389 g_Master = std::this_thread::get_id(); 390 if (g_NumThreads > 1) { 391 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 392 // Execute in all the possible modes 393 for ( size_t j = 0; j < 4; ++j ) { 394 g_ExceptionInMaster = (j & 1) != 0; 395 g_SolitaryException = (j & 2) != 0; 396 397 Test0(); 398 } 399 } 400 } 401 } 402 403 //! Testing parallel_for and parallel_reduce exception handling 404 //! \brief \ref error_guessing 405 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") { 406 for (auto concurrency_level: utils::concurrency_range()) { 407 g_NumThreads = static_cast<int>(concurrency_level); 408 g_Master = std::this_thread::get_id(); 409 if (g_NumThreads > 1) { 410 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 411 // Execute in all the possible modes 412 for ( size_t j = 0; j < 4; ++j ) { 413 g_ExceptionInMaster = (j & 1) != 0; 414 g_SolitaryException = (j & 2) != 0; 415 416 Test1(); 417 } 418 } 419 } 420 } 421 422 //! Testing parallel_for and parallel_reduce exception handling 423 //! \brief \ref error_guessing 424 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") { 425 for (auto concurrency_level: utils::concurrency_range()) { 426 g_NumThreads = static_cast<int>(concurrency_level); 427 g_Master = std::this_thread::get_id(); 428 if (g_NumThreads > 1) { 429 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 430 // Execute in all the possible modes 431 for ( size_t j = 0; j < 4; ++j ) { 432 g_ExceptionInMaster = (j & 1) != 0; 433 g_SolitaryException = (j & 2) != 0; 434 435 Test2(); 436 } 437 } 438 } 439 } 440 441 //! Testing parallel_for and parallel_reduce exception handling 442 //! \brief \ref error_guessing 443 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") { 444 for (auto concurrency_level: utils::concurrency_range()) { 445 g_NumThreads = static_cast<int>(concurrency_level); 446 g_Master = std::this_thread::get_id(); 447 if (g_NumThreads > 1) { 448 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 449 // Execute in all the possible modes 450 for ( size_t j = 0; j < 4; ++j ) { 451 g_ExceptionInMaster = (j & 1) != 0; 452 g_SolitaryException = (j & 2) != 0; 453 454 Test3(); 455 } 456 } 457 } 458 } 459 460 //! Testing parallel_for and parallel_reduce exception handling 461 //! \brief \ref error_guessing 462 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") { 463 for (auto concurrency_level: utils::concurrency_range()) { 464 g_NumThreads = static_cast<int>(concurrency_level); 465 g_Master = std::this_thread::get_id(); 466 if (g_NumThreads > 1) { 467 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 468 // Execute in all the possible modes 469 for ( size_t j = 0; j < 4; ++j ) { 470 g_ExceptionInMaster = (j & 1) != 0; 471 g_SolitaryException = (j & 2) != 0; 472 473 Test4(); 474 } 475 } 476 } 477 } 478 479 #endif /* TBB_USE_EXCEPTIONS */ 480 481 class ParForBodyToCancel { 482 public: 483 void operator()( const range_type& ) const { 484 ++g_CurExecuted; 485 Cancellator::WaitUntilReady(); 486 } 487 }; 488 489 template<class B> 490 class ParForLauncher { 491 tbb::task_group_context &my_ctx; 492 public: 493 void operator()() const { 494 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx ); 495 } 496 ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 497 }; 498 499 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 500 void TestCancelation1 () { 501 ResetGlobals( false ); 502 RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 ); 503 } 504 505 class Cancellator2 { 506 tbb::task_group_context &m_GroupToCancel; 507 508 public: 509 void operator()() const { 510 utils::ConcurrencyTracker ct; 511 WaitUntilConcurrencyPeaks(); 512 m_GroupToCancel.cancel_group_execution(); 513 g_ExecutedAtLastCatch = g_CurExecuted.load(); 514 } 515 516 Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {} 517 }; 518 519 class ParForBodyToCancel2 { 520 public: 521 void operator()( const range_type& ) const { 522 ++g_CurExecuted; 523 utils::ConcurrencyTracker ct; 524 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 525 while( !tbb::is_current_task_group_canceling() ) 526 utils::yield(); 527 } 528 }; 529 530 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 531 /** This version also tests tbb::is_current_task_group_canceling() method. **/ 532 void TestCancelation2 () { 533 ResetGlobals(); 534 RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>(); 535 REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task"); 536 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 537 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation"); 538 } 539 540 //////////////////////////////////////////////////////////////////////////////// 541 // Regression test based on the contribution by the author of the following forum post: 542 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx 543 544 class Worker { 545 static const int max_nesting = 3; 546 static const int reduce_range = 1024; 547 static const int reduce_grain = 256; 548 public: 549 int DoWork (int level); 550 int Validate (int start_level) { 551 int expected = 1; // identity for multiplication 552 for(int i=start_level+1; i<max_nesting; ++i) 553 expected *= reduce_range; 554 return expected; 555 } 556 }; 557 558 class RecursiveParReduceBodyWithSharedWorker { 559 Worker * m_SharedWorker; 560 int m_NestingLevel; 561 int m_Result; 562 public: 563 RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split ) 564 : m_SharedWorker(src.m_SharedWorker) 565 , m_NestingLevel(src.m_NestingLevel) 566 , m_Result(0) 567 {} 568 RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer ) 569 : m_SharedWorker(w) 570 , m_NestingLevel(outer) 571 , m_Result(0) 572 {} 573 574 void operator() ( const tbb::blocked_range<size_t>& r ) { 575 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 576 else g_NonMasterExecuted = true; 577 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 578 for (size_t i = r.begin (); i != r.end (); ++i) { 579 m_Result += m_SharedWorker->DoWork (m_NestingLevel); 580 } 581 } 582 void join (const RecursiveParReduceBodyWithSharedWorker & x) { 583 m_Result += x.m_Result; 584 } 585 int result () { return m_Result; } 586 }; 587 588 int Worker::DoWork ( int level ) { 589 ++level; 590 if ( level < max_nesting ) { 591 RecursiveParReduceBodyWithSharedWorker rt (this, level); 592 tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt); 593 return rt.result(); 594 } 595 else 596 return 1; 597 } 598 599 //! Regression test for hanging that occurred with the first version of cancellation propagation 600 void TestCancelation3 () { 601 Worker w; 602 int result = w.DoWork (0); 603 int expected = w.Validate(0); 604 REQUIRE_MESSAGE ( result == expected, "Wrong calculation result"); 605 } 606 607 struct StatsCounters { 608 std::atomic<size_t> my_total_created; 609 std::atomic<size_t> my_total_deleted; 610 StatsCounters() { 611 my_total_created = 0; 612 my_total_deleted = 0; 613 } 614 }; 615 616 class ParReduceBody { 617 StatsCounters* my_stats; 618 size_t my_id; 619 bool my_exception; 620 tbb::task_group_context& tgc; 621 622 public: 623 ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) : 624 my_stats(&s_), my_exception(e_), tgc(context) { 625 my_id = my_stats->my_total_created++; 626 } 627 628 ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) { 629 my_stats = lhs.my_stats; 630 my_id = my_stats->my_total_created++; 631 } 632 633 ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) { 634 my_stats = lhs.my_stats; 635 my_id = my_stats->my_total_created++; 636 } 637 638 ~ParReduceBody(){ ++my_stats->my_total_deleted; } 639 640 void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const { 641 //Do nothing, except for one task (chosen arbitrarily) 642 if( my_id >= 12 ) { 643 if( my_exception ) 644 ThrowTestException(1); 645 else 646 tgc.cancel_group_execution(); 647 } 648 } 649 650 void join( ParReduceBody& /*rhs*/ ) {} 651 }; 652 653 void TestCancelation4() { 654 StatsCounters statsObj; 655 #if TBB_USE_EXCEPTIONS 656 try 657 #endif 658 { 659 tbb::task_group_context tgc1, tgc2; 660 ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true); 661 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 ); 662 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 ); 663 } 664 #if TBB_USE_EXCEPTIONS 665 catch(...) {} 666 #endif 667 REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed"); 668 } 669 670 //! Testing parallel_for and parallel_reduce cancellation 671 //! \brief \ref error_guessing 672 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") { 673 for (auto concurrency_level: utils::concurrency_range()) { 674 g_NumThreads = static_cast<int>(concurrency_level); 675 g_Master = std::this_thread::get_id(); 676 if (g_NumThreads > 1) { 677 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 678 // Execute in all the possible modes 679 for ( size_t j = 0; j < 4; ++j ) { 680 g_ExceptionInMaster = (j & 1) != 0; 681 g_SolitaryException = (j & 2) != 0; 682 683 TestCancelation1(); 684 } 685 } 686 } 687 } 688 689 //! Testing parallel_for and parallel_reduce cancellation 690 //! \brief \ref error_guessing 691 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") { 692 for (auto concurrency_level: utils::concurrency_range()) { 693 g_NumThreads = static_cast<int>(concurrency_level); 694 g_Master = std::this_thread::get_id(); 695 if (g_NumThreads > 1) { 696 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 697 // Execute in all the possible modes 698 for ( size_t j = 0; j < 4; ++j ) { 699 g_ExceptionInMaster = (j & 1) != 0; 700 g_SolitaryException = (j & 2) != 0; 701 702 TestCancelation2(); 703 } 704 } 705 } 706 } 707 708 //! Testing parallel_for and parallel_reduce cancellation 709 //! \brief \ref error_guessing 710 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") { 711 for (auto concurrency_level: utils::concurrency_range()) { 712 g_NumThreads = static_cast<int>(concurrency_level); 713 g_Master = std::this_thread::get_id(); 714 if (g_NumThreads > 1) { 715 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 716 // Execute in all the possible modes 717 for ( size_t j = 0; j < 4; ++j ) { 718 g_ExceptionInMaster = (j & 1) != 0; 719 g_SolitaryException = (j & 2) != 0; 720 721 TestCancelation3(); 722 } 723 } 724 } 725 } 726 727 //! Testing parallel_for and parallel_reduce cancellation 728 //! \brief \ref error_guessing 729 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") { 730 for (auto concurrency_level: utils::concurrency_range()) { 731 g_NumThreads = static_cast<int>(concurrency_level); 732 g_Master = std::this_thread::get_id(); 733 if (g_NumThreads > 1) { 734 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 735 // Execute in all the possible modes 736 for ( size_t j = 0; j < 4; ++j ) { 737 g_ExceptionInMaster = (j & 1) != 0; 738 g_SolitaryException = (j & 2) != 0; 739 740 TestCancelation4(); 741 } 742 } 743 } 744 } 745 746 //////////////////////////////////////////////////////////////////////////////// 747 // Tests for tbb::parallel_for_each 748 //////////////////////////////////////////////////////////////////////////////// 749 750 std::size_t get_iter_range_size() { 751 // Set the minimal iteration sequence size to 50 to improve test complexity on small machines 752 return std::max(50, g_NumThreads * 2); 753 } 754 755 template<typename Iterator> 756 struct adaptive_range { 757 std::vector<std::size_t> my_array; 758 759 adaptive_range(std::size_t size) : my_array(size + 1) {} 760 using iterator = Iterator; 761 762 iterator begin() const { 763 return iterator{&my_array.front()}; 764 } 765 iterator begin() { 766 return iterator{&my_array.front()}; 767 } 768 iterator end() const { 769 return iterator{&my_array.back()}; 770 } 771 iterator end() { 772 return iterator{&my_array.back()}; 773 } 774 }; 775 776 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) { 777 if (g_FedTasksCount < 50) { 778 ++g_FedTasksCount; 779 feeder.add(val); 780 } 781 } 782 783 #define RunWithSimpleBody(func, body) \ 784 func<utils::ForwardIterator<size_t>, body>(); \ 785 func<utils::ForwardIterator<size_t>, body##WithFeeder>() 786 787 #define RunWithTemplatedBody(func, body) \ 788 func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >(); \ 789 func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >() 790 791 #if TBB_USE_EXCEPTIONS 792 793 // Simple functor object with exception 794 class SimpleParForEachBody { 795 public: 796 void operator() ( size_t &value ) const { 797 ++g_CurExecuted; 798 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 799 else g_NonMasterExecuted = true; 800 if( tbb::is_current_task_group_canceling() ) { 801 g_TGCCancelled.increment(); 802 } 803 utils::ConcurrencyTracker ct; 804 value += 1000; 805 WaitUntilConcurrencyPeaks(); 806 ThrowTestException(1); 807 } 808 }; 809 810 // Simple functor object with exception and feeder 811 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody { 812 public: 813 void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const { 814 Feed(feeder, 0); 815 SimpleParForEachBody::operator()(value); 816 } 817 }; 818 819 // Tests exceptions without nesting 820 template <class Iterator, class simple_body> 821 void Test1_parallel_for_each () { 822 ResetGlobals(); 823 auto range = adaptive_range<Iterator>(get_iter_range_size()); 824 TRY(); 825 tbb::parallel_for_each(std::begin(range), std::end(range), simple_body() ); 826 CATCH_AND_ASSERT(); 827 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 828 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 829 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 830 if ( !g_SolitaryException ) 831 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 832 833 } // void Test1_parallel_for_each () 834 835 template <class Iterator> 836 class OuterParForEachBody { 837 public: 838 void operator()( size_t& /*value*/ ) const { 839 ++g_OuterParCalls; 840 auto range = adaptive_range<Iterator>(get_iter_range_size()); 841 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody()); 842 } 843 }; 844 845 template <class Iterator> 846 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> { 847 public: 848 void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const { 849 Feed(feeder, 0); 850 OuterParForEachBody<Iterator>::operator()(value); 851 } 852 }; 853 854 //! Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block. 855 /** Inner algorithms are spawned inside the new bound context by default. Since 856 exceptions thrown from the inner parallel_for_each are not handled by the caller 857 (outer parallel_for_each body) in this test, they will cancel all the sibling inner 858 algorithms. **/ 859 template <class Iterator, class outer_body> 860 void Test2_parallel_for_each () { 861 ResetGlobals(); 862 auto range = adaptive_range<Iterator>(get_iter_range_size()); 863 TRY(); 864 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body() ); 865 CATCH_AND_ASSERT(); 866 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 867 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 868 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 869 if ( !g_SolitaryException ) 870 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 871 } // void Test2_parallel_for_each () 872 873 template <class Iterator> 874 class OuterParForEachBodyWithIsolatedCtx { 875 public: 876 void operator()( size_t& /*value*/ ) const { 877 tbb::task_group_context ctx(tbb::task_group_context::isolated); 878 ++g_OuterParCalls; 879 auto range = adaptive_range<Iterator>(get_iter_range_size()); 880 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx); 881 } 882 }; 883 884 template <class Iterator> 885 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> { 886 public: 887 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 888 Feed(feeder, 0); 889 OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value); 890 } 891 }; 892 893 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block. 894 /** Even though exceptions thrown from the inner parallel_for_each are not handled 895 by the caller in this test, they will not affect sibling inner algorithms 896 already running because of the isolated contexts. However because the first 897 exception cancels the root parallel_for_each, at most the first g_NumThreads subranges 898 will be processed (which launch inner parallel_for_eachs) **/ 899 template <class Iterator, class outer_body> 900 void Test3_parallel_for_each () { 901 ResetGlobals(); 902 auto range = adaptive_range<Iterator>(get_iter_range_size()); 903 intptr_t innerCalls = get_iter_range_size(), 904 // The assumption here is the same as in outer parallel fors. 905 minExecuted = (g_NumThreads - 1) * innerCalls; 906 g_Master = std::this_thread::get_id(); 907 TRY(); 908 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body()); 909 CATCH_AND_ASSERT(); 910 // figure actual number of expected executions given the number of outer PDos started. 911 minExecuted = (g_OuterParCalls - 1) * innerCalls; 912 // one extra thread may run a task that sees cancellation. Infrequent but possible 913 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 914 if ( g_SolitaryException ) { 915 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 916 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 917 } 918 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 919 if ( !g_SolitaryException ) 920 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 921 } // void Test3_parallel_for_each () 922 923 template <class Iterator> 924 class OuterParForEachWithEhBody { 925 public: 926 void operator()( size_t& /*value*/ ) const { 927 tbb::task_group_context ctx(tbb::task_group_context::isolated); 928 ++g_OuterParCalls; 929 auto range = adaptive_range<Iterator>(get_iter_range_size()); 930 TRY(); 931 tbb::parallel_for_each(std::begin(range), std::end(range), SimpleParForEachBody(), ctx); 932 CATCH(); 933 } 934 }; 935 936 template <class Iterator> 937 class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> { 938 public: 939 void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete; 940 OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default; 941 OuterParForEachWithEhBodyWithFeeder() = default; 942 void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const { 943 Feed(feeder, 0); 944 OuterParForEachWithEhBody<Iterator>::operator()(value); 945 } 946 }; 947 948 //! Uses parallel_for body invoking an inner parallel_for (with default bound context) inside a try-block. 949 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 950 in this test, they do not affect neither other tasks of the the root parallel_for 951 nor sibling inner algorithms. **/ 952 template <class Iterator, class outer_body_with_eh> 953 void Test4_parallel_for_each () { 954 ResetGlobals( true, true ); 955 auto range = adaptive_range<Iterator>(get_iter_range_size()); 956 g_Master = std::this_thread::get_id(); 957 TRY(); 958 tbb::parallel_for_each(std::begin(range), std::end(range), outer_body_with_eh()); 959 CATCH(); 960 REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body"); 961 intptr_t innerCalls = get_iter_range_size(), 962 outerCalls = get_iter_range_size() + g_FedTasksCount, 963 maxExecuted = outerCalls * innerCalls, 964 minExecuted = 0; 965 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 966 if ( g_SolitaryException ) { 967 minExecuted = maxExecuted - innerCalls; 968 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered"); 969 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 970 // This test has the same property as Test4 (parallel_for); the exception can be 971 // thrown, but some number of tasks from the outer Pdo can execute after the throw but 972 // before the cancellation is signaled (have seen 36). 973 DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?"); 974 } 975 else { 976 minExecuted = g_NumExceptionsCaught; 977 REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions"); 978 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported"); 979 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions"); 980 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 981 } 982 } // void Test4_parallel_for_each () 983 984 // This body throws an exception only if the task was added by feeder 985 class ParForEachBodyWithThrowingFeederTasks { 986 public: 987 //! This form of the function call operator can be used when the body needs to add more work during the processing 988 void operator() (const size_t &value, tbb::feeder<size_t> &feeder ) const { 989 ++g_CurExecuted; 990 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 991 else g_NonMasterExecuted = true; 992 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 993 Feed(feeder, 1); 994 if (value == 1) 995 ThrowTestException(1); 996 } 997 }; // class ParForEachBodyWithThrowingFeederTasks 998 999 // Test exception in task, which was added by feeder. 1000 template <class Iterator> 1001 void Test5_parallel_for_each () { 1002 ResetGlobals(); 1003 auto range = adaptive_range<Iterator>(get_iter_range_size()); 1004 g_Master = std::this_thread::get_id(); 1005 TRY(); 1006 tbb::parallel_for_each(std::begin(range), std::end(range), ParForEachBodyWithThrowingFeederTasks()); 1007 CATCH(); 1008 if (g_SolitaryException) { 1009 // Failure occurs when g_ExceptionInMaster is false, but all the 1 values in the range 1010 // are handled by the external thread. In this case no throw occurs. 1011 REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel // we saw an exception 1012 || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow) // non-external trhead throws but none tried 1013 || (g_ExceptionInMaster && !g_MasterExecutedThrow)) // external thread throws but external thread didn't try 1014 , "At least one exception should occur"); 1015 } 1016 } // void Test5_parallel_for_each () 1017 1018 //! Testing parallel_for_each exception handling 1019 //! \brief \ref error_guessing 1020 TEST_CASE("parallel_for_each exception handling test #1") { 1021 for (auto concurrency_level: utils::concurrency_range()) { 1022 g_NumThreads = static_cast<int>(concurrency_level); 1023 g_Master = std::this_thread::get_id(); 1024 if (g_NumThreads > 1) { 1025 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1026 // Execute in all the possible modes 1027 for ( size_t j = 0; j < 4; ++j ) { 1028 g_ExceptionInMaster = (j & 1) != 0; 1029 g_SolitaryException = (j & 2) != 0; 1030 1031 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody); 1032 } 1033 } 1034 } 1035 } 1036 1037 //! Testing parallel_for_each exception handling 1038 //! \brief \ref error_guessing 1039 TEST_CASE("parallel_for_each exception handling test #2") { 1040 for (auto concurrency_level: utils::concurrency_range()) { 1041 g_NumThreads = static_cast<int>(concurrency_level); 1042 g_Master = std::this_thread::get_id(); 1043 if (g_NumThreads > 1) { 1044 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1045 // Execute in all the possible modes 1046 for ( size_t j = 0; j < 4; ++j ) { 1047 g_ExceptionInMaster = (j & 1) != 0; 1048 g_SolitaryException = (j & 2) != 0; 1049 1050 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody); 1051 } 1052 } 1053 } 1054 } 1055 1056 //! Testing parallel_for_each exception handling 1057 //! \brief \ref error_guessing 1058 TEST_CASE("parallel_for_each exception handling test #3") { 1059 for (auto concurrency_level: utils::concurrency_range()) { 1060 g_NumThreads = static_cast<int>(concurrency_level); 1061 g_Master = std::this_thread::get_id(); 1062 if (g_NumThreads > 1) { 1063 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1064 // Execute in all the possible modes 1065 for ( size_t j = 0; j < 4; ++j ) { 1066 g_ExceptionInMaster = (j & 1) != 0; 1067 g_SolitaryException = (j & 2) != 0; 1068 1069 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx); 1070 } 1071 } 1072 } 1073 } 1074 1075 //! Testing parallel_for_each exception handling 1076 //! \brief \ref error_guessing 1077 TEST_CASE("parallel_for_each exception handling test #4") { 1078 for (auto concurrency_level: utils::concurrency_range()) { 1079 g_NumThreads = static_cast<int>(concurrency_level); 1080 g_Master = std::this_thread::get_id(); 1081 if (g_NumThreads > 1) { 1082 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1083 // Execute in all the possible modes 1084 for ( size_t j = 0; j < 4; ++j ) { 1085 g_ExceptionInMaster = (j & 1) != 0; 1086 g_SolitaryException = (j & 2) != 0; 1087 1088 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody); 1089 } 1090 } 1091 } 1092 } 1093 1094 //! Testing parallel_for_each exception handling 1095 //! \brief \ref error_guessing 1096 TEST_CASE("parallel_for_each exception handling test #5") { 1097 for (auto concurrency_level: utils::concurrency_range()) { 1098 g_NumThreads = static_cast<int>(concurrency_level); 1099 g_Master = std::this_thread::get_id(); 1100 if (g_NumThreads > 1) { 1101 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1102 // Execute in all the possible modes 1103 for ( size_t j = 0; j < 4; ++j ) { 1104 g_ExceptionInMaster = (j & 1) != 0; 1105 g_SolitaryException = (j & 2) != 0; 1106 1107 Test5_parallel_for_each<utils::InputIterator<size_t> >(); 1108 Test5_parallel_for_each<utils::ForwardIterator<size_t> >(); 1109 Test5_parallel_for_each<utils::RandomIterator<size_t> >(); 1110 } 1111 } 1112 } 1113 } 1114 1115 #endif /* TBB_USE_EXCEPTIONS */ 1116 1117 class ParForEachBodyToCancel { 1118 public: 1119 void operator()( size_t& /*value*/ ) const { 1120 ++g_CurExecuted; 1121 Cancellator::WaitUntilReady(); 1122 } 1123 }; 1124 1125 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel { 1126 public: 1127 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1128 Feed(feeder, 0); 1129 ParForEachBodyToCancel::operator()(value); 1130 } 1131 }; 1132 1133 template<class B, class Iterator> 1134 class ParForEachWorker { 1135 tbb::task_group_context &my_ctx; 1136 public: 1137 void operator()() const { 1138 auto range = adaptive_range<Iterator>(get_iter_range_size()); 1139 tbb::parallel_for_each( std::begin(range), std::end(range), B(), my_ctx ); 1140 } 1141 1142 ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1143 }; 1144 1145 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1146 template <class Iterator, class body_to_cancel> 1147 void TestCancelation1_parallel_for_each () { 1148 ResetGlobals( false ); 1149 // Threshold should leave more then max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2 1150 intptr_t threshold = get_iter_range_size() / 4; 1151 tbb::task_group tg; 1152 tbb::task_group_context ctx; 1153 Cancellator cancellator(ctx, threshold); 1154 ParForEachWorker<body_to_cancel, Iterator> worker(ctx); 1155 tg.run( cancellator ); 1156 utils::yield(); 1157 tg.run( worker ); 1158 TRY(); 1159 tg.wait(); 1160 CATCH_AND_FAIL(); 1161 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1162 } 1163 1164 class ParForEachBodyToCancel2 { 1165 public: 1166 void operator()( size_t& /*value*/ ) const { 1167 ++g_CurExecuted; 1168 utils::ConcurrencyTracker ct; 1169 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1170 while( !tbb::is_current_task_group_canceling() ) 1171 utils::yield(); 1172 } 1173 }; 1174 1175 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 { 1176 public: 1177 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1178 Feed(feeder, 0); 1179 ParForEachBodyToCancel2::operator()(value); 1180 } 1181 }; 1182 1183 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1184 /** This version also tests tbb::is_current_task_group_canceling() method. **/ 1185 template <class Iterator, class body_to_cancel> 1186 void TestCancelation2_parallel_for_each () { 1187 ResetGlobals(); 1188 RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>(); 1189 } 1190 1191 //! Testing parallel_for_each cancellation test 1192 //! \brief \ref error_guessing 1193 TEST_CASE("parallel_for_each cancellation test #1") { 1194 for (auto concurrency_level: utils::concurrency_range()) { 1195 g_NumThreads = static_cast<int>(concurrency_level); 1196 g_Master = std::this_thread::get_id(); 1197 if (g_NumThreads > 1) { 1198 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1199 // Execute in all the possible modes 1200 for ( size_t j = 0; j < 4; ++j ) { 1201 g_ExceptionInMaster = (j & 1) != 0; 1202 g_SolitaryException = (j & 2) != 0; 1203 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel); 1204 } 1205 } 1206 } 1207 } 1208 1209 //! Testing parallel_for_each cancellation test 1210 //! \brief \ref error_guessing 1211 TEST_CASE("parallel_for_each cancellation test #2") { 1212 for (auto concurrency_level: utils::concurrency_range()) { 1213 g_NumThreads = static_cast<int>(concurrency_level); 1214 g_Master = std::this_thread::get_id(); 1215 if (g_NumThreads > 1) { 1216 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1217 // Execute in all the possible modes 1218 for ( size_t j = 0; j < 4; ++j ) { 1219 g_ExceptionInMaster = (j & 1) != 0; 1220 g_SolitaryException = (j & 2) != 0; 1221 1222 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2); 1223 } 1224 } 1225 } 1226 } 1227 1228 //////////////////////////////////////////////////////////////////////////////// 1229 // Tests for tbb::parallel_pipeline 1230 //////////////////////////////////////////////////////////////////////////////// 1231 int g_NumTokens = 0; 1232 1233 // Simple input filter class, it assigns 1 to all array members 1234 // It stops when it receives item equal to -1 1235 class InputFilter { 1236 mutable std::atomic<size_t> m_Item{}; 1237 mutable std::vector<size_t> m_Buffer; 1238 public: 1239 InputFilter() { 1240 m_Item = 0; 1241 m_Buffer.resize(get_iter_range_size()); 1242 for (size_t i = 0; i < get_iter_range_size(); ++i ) 1243 m_Buffer[i] = 1; 1244 } 1245 InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()) { 1246 m_Buffer.resize(get_iter_range_size()); 1247 for (size_t i = 0; i < get_iter_range_size(); ++i ) 1248 m_Buffer[i] = other.m_Buffer[i]; 1249 } 1250 1251 void* operator()(tbb::flow_control& control) const { 1252 size_t item = m_Item++; 1253 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1254 else g_NonMasterExecuted = true; 1255 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1256 if(item == 1) { 1257 ++g_PipelinesStarted; // count on emitting the first item. 1258 } 1259 if ( item >= get_iter_range_size() ) { 1260 control.stop(); 1261 return nullptr; 1262 } 1263 m_Buffer[item] = 1; 1264 return &m_Buffer[item]; 1265 } 1266 1267 size_t* buffer() { return m_Buffer.data(); } 1268 }; // class InputFilter 1269 1270 #if TBB_USE_EXCEPTIONS 1271 1272 // Simple filter with exception throwing. If parallel, will wait until 1273 // as many parallel filters start as there are threads. 1274 class SimpleFilter { 1275 bool m_canThrow; 1276 bool m_serial; 1277 public: 1278 SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {} 1279 void* operator()(void* item) const { 1280 ++g_CurExecuted; 1281 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1282 else g_NonMasterExecuted = true; 1283 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1284 if ( m_canThrow ) { 1285 if ( !m_serial ) { 1286 utils::ConcurrencyTracker ct; 1287 WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) ); 1288 } 1289 ThrowTestException(1); 1290 } 1291 return item; 1292 } 1293 }; // class SimpleFilter 1294 1295 // This enumeration represents filters order in pipeline 1296 struct FilterSet { 1297 tbb::filter_mode mode1, mode2; 1298 bool throw1, throw2; 1299 1300 FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 ) 1301 : mode1(m1), mode2(m2), throw1(t1), throw2(t2) 1302 {} 1303 }; // struct FilterSet 1304 1305 FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true ); 1306 1307 template<typename InFilter, typename Filter> 1308 class CustomPipeline { 1309 InFilter inputFilter; 1310 Filter filter1; 1311 Filter filter2; 1312 FilterSet my_filters; 1313 public: 1314 CustomPipeline( const FilterSet& filters ) 1315 : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel), 1316 filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel), 1317 my_filters(filters) 1318 {} 1319 1320 void run () { 1321 tbb::parallel_pipeline( 1322 g_NumTokens, 1323 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1324 tbb::make_filter<void*, void*>(my_filters.mode1, filter1) & 1325 tbb::make_filter<void*, void>(my_filters.mode2, filter2) 1326 ); 1327 } 1328 void run ( tbb::task_group_context& ctx ) { 1329 tbb::parallel_pipeline( 1330 g_NumTokens, 1331 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1332 tbb::make_filter<void*, void*>(my_filters.mode1, filter1) & 1333 tbb::make_filter<void*, void>(my_filters.mode2, filter2), 1334 ctx 1335 ); 1336 } 1337 }; 1338 1339 typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline; 1340 1341 // Tests exceptions without nesting 1342 void Test1_pipeline ( const FilterSet& filters ) { 1343 ResetGlobals(); 1344 SimplePipeline testPipeline(filters); 1345 TRY(); 1346 testPipeline.run(); 1347 if ( g_CurExecuted == 2 * static_cast<int>(get_iter_range_size()) ) { 1348 // all the items were processed, though an exception was supposed to occur. 1349 if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) { 1350 // if !g_ExceptionInMaster, the external thread is not allowed to throw. 1351 // if g_nonMasterExcutedThrow > 0 then a thread besides the external thread tried to throw. 1352 REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel), 1353 "Unusual count"); 1354 } 1355 // In case of all serial filters they might be all executed in the thread(s) 1356 // where exceptions are not allowed by the common test logic. So we just quit. 1357 return; 1358 } 1359 CATCH_AND_ASSERT(); 1360 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 1361 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1362 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 1363 if ( !g_SolitaryException ) 1364 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1365 1366 } // void Test1_pipeline () 1367 1368 // Filter with nesting 1369 class OuterFilter { 1370 public: 1371 OuterFilter ( bool, bool ) {} 1372 1373 void* operator()(void* item) const { 1374 ++g_OuterParCalls; 1375 SimplePipeline testPipeline(serial_parallel); 1376 testPipeline.run(); 1377 return item; 1378 } 1379 }; // class OuterFilter 1380 1381 //! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block. 1382 /** Inner algorithms are spawned inside the new bound context by default. Since 1383 exceptions thrown from the inner pipeline are not handled by the caller 1384 (outer pipeline body) in this test, they will cancel all the sibling inner 1385 algorithms. **/ 1386 void Test2_pipeline ( const FilterSet& filters ) { 1387 ResetGlobals(); 1388 g_NestedPipelines = true; 1389 CustomPipeline<InputFilter, OuterFilter> testPipeline(filters); 1390 TRY(); 1391 testPipeline.run(); 1392 CATCH_AND_ASSERT(); 1393 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow); 1394 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test"); 1395 if ( !g_SolitaryException ) { 1396 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 1397 } 1398 } // void Test2_pipeline () 1399 1400 //! creates isolated inner pipeline and runs it. 1401 class OuterFilterWithIsolatedCtx { 1402 public: 1403 OuterFilterWithIsolatedCtx( bool , bool ) {} 1404 1405 void* operator()(void* item) const { 1406 ++g_OuterParCalls; 1407 tbb::task_group_context ctx(tbb::task_group_context::isolated); 1408 // create inner pipeline with serial input, parallel output filter, second filter throws 1409 SimplePipeline testPipeline(serial_parallel); 1410 testPipeline.run(ctx); 1411 return item; 1412 } 1413 }; // class OuterFilterWithIsolatedCtx 1414 1415 //! Uses pipeline invoking an inner pipeline with an isolated context without a try-block. 1416 /** Even though exceptions thrown from the inner pipeline are not handled 1417 by the caller in this test, they will not affect sibling inner algorithms 1418 already running because of the isolated contexts. However because the first 1419 exception cancels the root parallel_for_each only the first g_NumThreads subranges 1420 will be processed (which launch inner pipelines) **/ 1421 void Test3_pipeline ( const FilterSet& filters ) { 1422 for( int nTries = 1; nTries <= 4; ++nTries) { 1423 ResetGlobals(); 1424 g_NestedPipelines = true; 1425 g_Master = std::this_thread::get_id(); 1426 intptr_t innerCalls = get_iter_range_size(), 1427 minExecuted = (g_NumThreads - 1) * innerCalls; 1428 CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters); 1429 TRY(); 1430 testPipeline.run(); 1431 CATCH_AND_ASSERT(); 1432 1433 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) || 1434 (!g_ExceptionInMaster && !g_NonMasterExecuted); 1435 // only test assertions if the test threw an exception (or we don't care) 1436 bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0; 1437 if(testSucceeded) { 1438 if (g_SolitaryException) { 1439 1440 // The test is one outer pipeline with two NestedFilters that each start an inner pipeline. 1441 // Each time the input filter of a pipeline delivers its first item, it increments 1442 // g_PipelinesStarted. When g_SolitaryException, the throw will not occur until 1443 // g_PipelinesStarted >= 3. (This is so at least a second pipeline in its own isolated 1444 // context will start; that is what we're testing.) 1445 // 1446 // There are two pipelines which will NOT run to completion when a solitary throw 1447 // happens in an isolated inner context: the outer pipeline and the pipeline which 1448 // throws. All the other pipelines which start should run to completion. But only 1449 // inner body invocations are counted. 1450 // 1451 // So g_CurExecuted should be about 1452 // 1453 // (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1 1454 // ^ executions for each completed pipeline 1455 // ^ completing pipelines (remembering two will not complete) 1456 // ^ one for the inner throwing pipeline 1457 1458 minExecuted = (2*get_iter_range_size()) * (g_PipelinesStarted - 2) + 1; 1459 // each failing pipeline must execute at least two tasks 1460 REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception"); 1461 // no more than g_NumThreads tasks will be executed in a cancelled context. Otherwise 1462 // tasks not executing at throw were scheduled. 1463 g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed"); 1464 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception"); 1465 // if we're only throwing from the external thread, and that thread didn't 1466 // participate in the pipelines, then no throw occurred. 1467 } 1468 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test"); 1469 REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception"); 1470 return; 1471 } 1472 } 1473 } 1474 1475 class OuterFilterWithEhBody { 1476 public: 1477 OuterFilterWithEhBody( bool, bool ){} 1478 1479 void* operator()(void* item) const { 1480 tbb::task_group_context ctx(tbb::task_group_context::isolated); 1481 ++g_OuterParCalls; 1482 SimplePipeline testPipeline(serial_parallel); 1483 TRY(); 1484 testPipeline.run(ctx); 1485 CATCH(); 1486 return item; 1487 } 1488 }; // class OuterFilterWithEhBody 1489 1490 //! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block. 1491 /** Since exception(s) thrown from the inner pipeline are handled by the caller 1492 in this test, they do not affect other tasks of the the root pipeline 1493 nor sibling inner algorithms. **/ 1494 void Test4_pipeline ( const FilterSet& filters ) { 1495 #if __GNUC__ && !__INTEL_COMPILER 1496 if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) { 1497 MESSAGE("Known issue: one of exception handling tests is skipped."); 1498 return; 1499 } 1500 #endif 1501 ResetGlobals( true, true ); 1502 // each outer pipeline stage will start get_iter_range_size() inner pipelines. 1503 // each inner pipeline that doesn't throw will process get_iter_range_size() items. 1504 // for solitary exception there will be one pipeline that only processes one stage, one item. 1505 // innerCalls should be 2*get_iter_range_size() 1506 intptr_t innerCalls = 2*get_iter_range_size(), 1507 outerCalls = 2*get_iter_range_size(), 1508 maxExecuted = outerCalls * innerCalls; // the number of invocations of the inner pipelines 1509 CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters); 1510 TRY(); 1511 testPipeline.run(); 1512 CATCH_AND_ASSERT(); 1513 intptr_t minExecuted = 0; 1514 bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) || 1515 (!g_ExceptionInMaster && !g_NonMasterExecuted); 1516 if ( g_SolitaryException ) { 1517 minExecuted = maxExecuted - innerCalls; // one throwing inner pipeline 1518 REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered"); 1519 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); // probably will assert. 1520 } 1521 else { 1522 // we assume throwing pipelines will not count 1523 minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 1524 REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions"); 1525 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported"); 1526 // too many already-scheduled tasks are started after the first exception is 1527 // thrown. And g_ExecutedAtLastCatch is updated every time an exception is caught. 1528 // So with multiple exceptions there are a variable number of tasks that have been 1529 // discarded because of the signals. 1530 // each throw is caught, so we will see many cancelled tasks. g_ExecutedAtLastCatch is 1531 // updated with each throw, so the value will be the number of tasks executed at the last 1532 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions"); 1533 } 1534 } // void Test4_pipeline () 1535 1536 //! Tests pipeline function passed with different combination of filters 1537 template<void testFunc(const FilterSet&)> 1538 void TestWithDifferentFiltersAndConcurrency() { 1539 for (auto concurrency_level: utils::concurrency_range()) { 1540 g_NumThreads = static_cast<int>(concurrency_level); 1541 g_Master = std::this_thread::get_id(); 1542 if (g_NumThreads > 1) { 1543 // Execute in all the possible modes 1544 for ( size_t j = 0; j < 4; ++j ) { 1545 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1546 g_ExceptionInMaster = (j & 1) != 0; 1547 g_SolitaryException = (j & 2) != 0; 1548 g_NumTokens = 2 * g_NumThreads; 1549 const int NumFilterTypes = 3; 1550 const tbb::filter_mode modes[NumFilterTypes] = { 1551 tbb::filter_mode::parallel, 1552 tbb::filter_mode::serial_in_order, 1553 tbb::filter_mode::serial_out_of_order 1554 }; 1555 1556 for ( int i = 0; i < NumFilterTypes; ++i ) { 1557 for ( int n = 0; n < NumFilterTypes; ++n ) { 1558 for ( int k = 0; k < 2; ++k ) 1559 testFunc( FilterSet(modes[i], modes[j], k == 0, k != 0) ); 1560 } 1561 } 1562 } 1563 } 1564 } 1565 } 1566 1567 //! Testing parallel_pipeline exception handling 1568 //! \brief \ref error_guessing 1569 TEST_CASE("parallel_pipeline exception handling test #1") { 1570 TestWithDifferentFiltersAndConcurrency<Test1_pipeline>(); 1571 } 1572 1573 //! Testing parallel_pipeline exception handling 1574 //! \brief \ref error_guessing 1575 TEST_CASE("parallel_pipeline exception handling test #2") { 1576 TestWithDifferentFiltersAndConcurrency<Test2_pipeline>(); 1577 } 1578 1579 //! Testing parallel_pipeline exception handling 1580 //! \brief \ref error_guessing 1581 TEST_CASE("parallel_pipeline exception handling test #3") { 1582 TestWithDifferentFiltersAndConcurrency<Test3_pipeline>(); 1583 } 1584 1585 //! Testing parallel_pipeline exception handling 1586 //! \brief \ref error_guessing 1587 TEST_CASE("parallel_pipeline exception handling test #4") { 1588 TestWithDifferentFiltersAndConcurrency<Test4_pipeline>(); 1589 } 1590 1591 #endif /* TBB_USE_EXCEPTIONS */ 1592 1593 class FilterToCancel { 1594 public: 1595 FilterToCancel() {} 1596 void* operator()(void* item) const { 1597 ++g_CurExecuted; 1598 Cancellator::WaitUntilReady(); 1599 return item; 1600 } 1601 }; // class FilterToCancel 1602 1603 template <class Filter_to_cancel> 1604 class PipelineLauncher { 1605 tbb::task_group_context &my_ctx; 1606 public: 1607 PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1608 1609 void operator()() const { 1610 // Run test when serial filter is the first non-input filter 1611 InputFilter inputFilter; 1612 Filter_to_cancel filterToCancel; 1613 tbb::parallel_pipeline( 1614 g_NumTokens, 1615 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1616 tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel), 1617 my_ctx 1618 ); 1619 } 1620 }; 1621 1622 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1623 void TestCancelation1_pipeline () { 1624 ResetGlobals(); 1625 g_ThrowException = false; 1626 // Threshold should leave more then max_threads tasks to test the cancellation. Set the threshold to iter_range_size()/4 since iter_range_size >= max_threads*2 1627 intptr_t threshold = get_iter_range_size() / 4; 1628 RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(threshold); 1629 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 1630 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1631 } 1632 1633 class FilterToCancel2 { 1634 public: 1635 FilterToCancel2() {} 1636 1637 void* operator()(void* item) const { 1638 ++g_CurExecuted; 1639 utils::ConcurrencyTracker ct; 1640 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1641 while( !tbb::is_current_task_group_canceling() ) 1642 utils::yield(); 1643 return item; 1644 } 1645 }; 1646 1647 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1648 /** This version also tests task::is_cancelled() method. **/ 1649 void TestCancelation2_pipeline () { 1650 ResetGlobals(); 1651 RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>(); 1652 // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the 1653 // former, and g_CurExecuted is monotonic increasing. so the comparison should be at least ==. 1654 // If another filter is started after cancel but before cancellation is propagated, then the 1655 // number will be larger. 1656 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation"); 1657 } 1658 1659 /** If min and max thread numbers specified on the command line are different, 1660 the test is run only for 2 sizes of the thread pool (MinThread and MaxThread) 1661 to be able to test the high and low contention modes while keeping the test reasonably fast **/ 1662 1663 //! Testing parallel_pipeline cancellation 1664 //! \brief \ref error_guessing 1665 TEST_CASE("parallel_pipeline cancellation test #1") { 1666 for (auto concurrency_level: utils::concurrency_range()) { 1667 g_NumThreads = static_cast<int>(concurrency_level); 1668 g_Master = std::this_thread::get_id(); 1669 if (g_NumThreads > 1) { 1670 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1671 // Execute in all the possible modes 1672 for ( size_t j = 0; j < 4; ++j ) { 1673 g_ExceptionInMaster = (j & 1) != 0; 1674 g_SolitaryException = (j & 2) != 0; 1675 g_NumTokens = 2 * g_NumThreads; 1676 1677 TestCancelation1_pipeline(); 1678 } 1679 } 1680 } 1681 } 1682 1683 //! Testing parallel_pipeline cancellation 1684 //! \brief \ref error_guessing 1685 TEST_CASE("parallel_pipeline cancellation test #2") { 1686 for (auto concurrency_level: utils::concurrency_range()) { 1687 g_NumThreads = static_cast<int>(concurrency_level); 1688 g_Master = std::this_thread::get_id(); 1689 if (g_NumThreads > 1) { 1690 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1691 // Execute in all the possible modes 1692 for ( size_t j = 0; j < 4; ++j ) { 1693 g_ExceptionInMaster = (j & 1) != 0; 1694 g_SolitaryException = (j & 2) != 0; 1695 g_NumTokens = 2 * g_NumThreads; 1696 1697 TestCancelation2_pipeline(); 1698 } 1699 } 1700 } 1701 } 1702