1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/concurrency_tracker.h" 19 #include "common/iterator.h" 20 #include "common/utils_concurrency_limit.h" 21 22 #include <limits.h> // for INT_MAX 23 #include <thread> 24 25 #include "tbb/parallel_for.h" 26 #include "tbb/parallel_reduce.h" 27 #include "tbb/parallel_for_each.h" 28 #include "tbb/parallel_pipeline.h" 29 #include "tbb/blocked_range.h" 30 #include "tbb/task_group.h" 31 #include "tbb/global_control.h" 32 #include "tbb/concurrent_unordered_map.h" 33 #include "tbb/task.h" 34 35 //! \file test_eh_algorithms.cpp 36 //! 
\brief Test for [algorithms.parallel_for algorithms.parallel_reduce algorithms.parallel_deterministic_reduce algorithms.parallel_for_each algorithms.parallel_pipeline algorithms.parallel_pipeline.flow_control] specifications 37 38 #define FLAT_RANGE 100000 39 #define FLAT_GRAIN 100 40 #define OUTER_RANGE 100 41 #define OUTER_GRAIN 10 42 #define INNER_RANGE (FLAT_RANGE / OUTER_RANGE) 43 #define INNER_GRAIN (FLAT_GRAIN / OUTER_GRAIN) 44 45 struct context_specific_counter { 46 tbb::concurrent_unordered_map<tbb::task_group_context*, std::atomic<unsigned>> context_map{}; 47 48 void increment() { 49 tbb::task_group_context* ctx = tbb::task::current_context(); 50 REQUIRE(ctx != nullptr); 51 context_map[ctx]++; 52 } 53 54 void reset() { 55 context_map.clear(); 56 } 57 58 void validate(unsigned expected_count, const char* msg) { 59 for (auto it = context_map.begin(); it != context_map.end(); it++) { 60 REQUIRE_MESSAGE( it->second <= expected_count, msg); 61 } 62 } 63 }; 64 65 std::atomic<intptr_t> g_FedTasksCount{}; // number of tasks added by parallel_for_each feeder 66 std::atomic<intptr_t> g_OuterParCalls{}; // number of actual invocations of the outer construct executed. 67 context_specific_counter g_TGCCancelled{}; // Number of times a task sees its group cancelled at start 68 69 #include "common/exception_handling.h" 70 71 /******************************** 72 Variables in test 73 74 __ Test control variables 75 g_ExceptionInMaster -- only the master thread is allowed to throw. If false, the master cannot throw 76 g_SolitaryException -- only one throw may be executed. 77 78 -- controls for ThrowTestException for pipeline tests 79 g_NestedPipelines -- are inner pipelines being run? 80 g_PipelinesStarted -- how many pipelines have run their first filter at least once. 81 82 -- Information variables 83 84 g_Master -- Thread ID of the "master" thread 85 In pipelines sometimes the master thread does not participate, so the tests have to be resilient to this. 
86 87 -- Measurement variables 88 89 g_OuterParCalls -- how many outer parallel ranges or filters started 90 g_TGCCancelled -- how many inner parallel ranges or filters saw task::self().is_cancelled() 91 g_ExceptionsThrown -- number of throws executed (counted in ThrowTestException) 92 g_MasterExecutedThrow -- number of times master thread actually executed a throw 93 g_NonMasterExecutedThrow -- number of times non-master thread actually executed a throw 94 g_ExceptionCaught -- one of PropagatedException or unknown exception was caught. (Other exceptions cause assertions.) 95 96 -- Tallies for the task bodies which have executed (counted in each inner body, sampled in ThrowTestException) 97 g_CurExecuted -- total number of inner ranges or filters which executed 98 g_ExecutedAtLastCatch -- value of g_CurExecuted when last catch was made, 0 if none. 99 g_ExecutedAtFirstCatch -- value of g_CurExecuted when first catch is made, 0 if none. 100 *********************************/ 101 102 inline void ResetGlobals ( bool throwException = true, bool flog = false ) { 103 ResetEhGlobals( throwException, flog ); 104 g_FedTasksCount = 0; 105 g_OuterParCalls = 0; 106 g_NestedPipelines = false; 107 g_TGCCancelled.reset(); 108 } 109 110 //////////////////////////////////////////////////////////////////////////////// 111 // Tests for tbb::parallel_for and tbb::parallel_reduce 112 //////////////////////////////////////////////////////////////////////////////// 113 114 typedef size_t count_type; 115 typedef tbb::blocked_range<count_type> range_type; 116 117 inline intptr_t CountSubranges(range_type r) { 118 if(!r.is_divisible()) return intptr_t(1); 119 range_type r2(r,tbb::split()); 120 return CountSubranges(r) + CountSubranges(r2); 121 } 122 123 inline intptr_t NumSubranges ( intptr_t length, intptr_t grain ) { 124 return CountSubranges(range_type(0,length,grain)); 125 } 126 127 template<class Body> 128 intptr_t TestNumSubrangesCalculation ( intptr_t length, intptr_t grain, intptr_t 
inner_length, intptr_t inner_grain ) { 129 ResetGlobals(); 130 g_ThrowException = false; 131 intptr_t outerCalls = NumSubranges(length, grain), 132 innerCalls = NumSubranges(inner_length, inner_grain), 133 maxExecuted = outerCalls * (innerCalls + 1); 134 tbb::parallel_for( range_type(0, length, grain), Body() ); 135 REQUIRE_MESSAGE (g_CurExecuted == maxExecuted, "Wrong estimation of bodies invocation count"); 136 return maxExecuted; 137 } 138 139 class NoThrowParForBody { 140 public: 141 void operator()( const range_type& r ) const { 142 volatile count_type x = 0; 143 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 144 else g_NonMasterExecuted = true; 145 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 146 count_type end = r.end(); 147 for( count_type i=r.begin(); i<end; ++i ) 148 x += i; 149 } 150 }; 151 152 #if TBB_USE_EXCEPTIONS 153 154 void Test0 () { 155 ResetGlobals(); 156 tbb::simple_partitioner p; 157 for( size_t i=0; i<10; ++i ) { 158 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody() ); 159 tbb::parallel_for( range_type(0, 0, 1), NoThrowParForBody(), p ); 160 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody() ); 161 tbb::parallel_for( range_type(0, 128, 8), NoThrowParForBody(), p ); 162 } 163 } // void Test0 () 164 165 //! Template that creates a functor suitable for parallel_reduce from a functor for parallel_for. 166 template<typename ParForBody> 167 class SimpleParReduceBody { 168 ParForBody m_Body; 169 public: 170 void operator=(const SimpleParReduceBody&) = delete; 171 SimpleParReduceBody(const SimpleParReduceBody&) = default; 172 SimpleParReduceBody() = default; 173 174 void operator()( const range_type& r ) const { m_Body(r); } 175 SimpleParReduceBody( SimpleParReduceBody& left, tbb::split ) : m_Body(left.m_Body) {} 176 void join( SimpleParReduceBody& /*right*/ ) {} 177 }; // SimpleParReduceBody 178 179 //! Test parallel_for and parallel_reduce for a given partitioner. 
180 /** The Body need only be suitable for a parallel_for. */ 181 template<typename ParForBody, typename Partitioner> 182 void TestParallelLoopAux() { 183 Partitioner partitioner; 184 for( int i=0; i<2; ++i ) { 185 ResetGlobals(); 186 TRY(); 187 if( i==0 ) 188 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), ParForBody(), partitioner ); 189 else { 190 SimpleParReduceBody<ParForBody> rb; 191 tbb::parallel_reduce( range_type(0, FLAT_RANGE, FLAT_GRAIN), rb, partitioner ); 192 } 193 CATCH_AND_ASSERT(); 194 // two cases: g_SolitaryException and !g_SolitaryException 195 // 1) g_SolitaryException: only one thread actually threw. There is only one context, so the exception 196 // (when caught) will cause that context to be cancelled. After this event, there may be one or 197 // more threads which are "in-flight", up to g_NumThreads, but no more will be started. The threads, 198 // when they start, if they see they are cancelled, TGCCancelled is incremented. 199 // 2) !g_SolitaryException: more than one thread can throw. The number of threads that actually 200 // threw is g_MasterExecutedThrow if only the master is allowed, else g_NonMasterExecutedThrow. 201 // Only one context, so TGCCancelled should be <= g_NumThreads. 202 // 203 // the reasoning is similar for nested algorithms in a single context (Test2). 204 // 205 // If a thread throws in a context, more than one subsequent task body may see the 206 // cancelled state (if they are scheduled before the state is propagated.) this is 207 // infrequent, but it occurs. So what was to be an assertion must be a remark. 208 g_TGCCancelled.validate( g_NumThreads, "Too many tasks ran after exception thrown"); 209 REQUIRE_MESSAGE(g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 210 if ( g_SolitaryException ) { 211 REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 212 REQUIRE_MESSAGE(g_NumExceptionsCaught == (g_ExceptionInMaster ? 
g_MasterExecutedThrow : g_NonMasterExecutedThrow), 213 "Not all throws were caught"); 214 REQUIRE_MESSAGE(g_ExecutedAtFirstCatch == g_ExecutedAtLastCatch, "Too many exceptions occurred"); 215 } 216 else { 217 REQUIRE_MESSAGE(g_NumExceptionsCaught >= 1, "No try blocks in any body expected in this test"); 218 } 219 } 220 } // TestParallelLoopAux 221 222 //! Test with parallel_for and parallel_reduce, over all three kinds of partitioners. 223 /** The Body only needs to be suitable for tbb::parallel_for. */ 224 template<typename Body> 225 void TestParallelLoop() { 226 // The simple and auto partitioners should be const, but not the affinity partitioner. 227 TestParallelLoopAux<Body, const tbb::simple_partitioner >(); 228 TestParallelLoopAux<Body, const tbb::auto_partitioner >(); 229 #define __TBB_TEMPORARILY_DISABLED 1 230 #if !__TBB_TEMPORARILY_DISABLED 231 // TODO: Improve the test so that it tolerates delayed start of tasks with affinity_partitioner 232 TestParallelLoopAux<Body, /***/ tbb::affinity_partitioner>(); 233 #endif 234 #undef __TBB_TEMPORARILY_DISABLED 235 } 236 237 class SimpleParForBody { 238 public: 239 void operator=(const SimpleParForBody&) = delete; 240 SimpleParForBody(const SimpleParForBody&) = default; 241 SimpleParForBody() = default; 242 243 void operator()( const range_type& r ) const { 244 utils::ConcurrencyTracker ct; 245 volatile long x = 0; 246 ++g_CurExecuted; 247 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 248 else g_NonMasterExecuted = true; 249 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 250 for( count_type i = r.begin(); i != r.end(); ++i ) 251 x += 0; 252 WaitUntilConcurrencyPeaks(); 253 ThrowTestException(1); 254 } 255 }; 256 257 void Test1() { 258 // non-nested parallel_for/reduce with throwing body, one context 259 TestParallelLoop<SimpleParForBody>(); 260 } // void Test1 () 261 262 class OuterParForBody { 263 public: 264 void operator=(const OuterParForBody&) = delete; 265 
OuterParForBody(const OuterParForBody&) = default; 266 OuterParForBody() = default; 267 void operator()( const range_type& ) const { 268 utils::ConcurrencyTracker ct; 269 ++g_OuterParCalls; 270 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody() ); 271 } 272 }; 273 274 //! Uses parallel_for body containing an inner parallel_for with the default context not wrapped by a try-block. 275 /** Inner algorithms are spawned inside the new bound context by default. Since 276 exceptions thrown from the inner parallel_for are not handled by the caller 277 (outer parallel_for body) in this test, they will cancel all the sibling inner 278 algorithms. **/ 279 void Test2 () { 280 TestParallelLoop<OuterParForBody>(); 281 } // void Test2 () 282 283 class OuterParForBodyWithIsolatedCtx { 284 public: 285 void operator()( const range_type& ) const { 286 tbb::task_group_context ctx(tbb::task_group_context::isolated); 287 ++g_OuterParCalls; 288 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 289 } 290 }; 291 292 //! Uses parallel_for body invoking an inner parallel_for with an isolated context without a try-block. 293 /** Even though exceptions thrown from the inner parallel_for are not handled 294 by the caller in this test, they will not affect sibling inner algorithms 295 already running because of the isolated contexts. 
However because the first 296 exception cancels the root parallel_for only the first g_NumThreads subranges 297 will be processed (which launch inner parallel_fors) **/ 298 void Test3 () { 299 ResetGlobals(); 300 typedef OuterParForBodyWithIsolatedCtx body_type; 301 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 302 // we expect one thread to throw without counting, the rest to run to completion 303 // this formula assumes g_numThreads outer pfor ranges will be started, but that is not the 304 // case; the SimpleParFor subranges are started up as part of the outer ones, and when 305 // the amount of concurrency reaches g_NumThreads no more outer Pfor ranges are started. 306 // so we have to count the number of outer Pfors actually started. 307 minExecuted = (g_NumThreads - 1) * innerCalls; 308 TRY(); 309 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), body_type() ); 310 CATCH_AND_ASSERT(); 311 minExecuted = (g_OuterParCalls - 1) * innerCalls; // see above 312 313 // The first formula above assumes all ranges of the outer parallel for are executed, and one 314 // cancels. In the event, we have a smaller number of ranges that start before the exception 315 // is caught. 316 // 317 // g_SolitaryException:One inner range throws. Outer parallel_For is cancelled, but sibling 318 // parallel_fors continue to completion (unless the threads that execute 319 // are not allowed to throw, in which case we will not see any exceptions). 320 // !g_SolitaryException:multiple inner ranges may throw. Any which throws will stop, and the 321 // corresponding range of the outer pfor will stop also. 322 // 323 // In either case, once the outer pfor gets the exception it will stop executing further ranges. 324 325 // if the only threads executing were not allowed to throw, then not seeing an exception is okay. 
326 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 327 if ( g_SolitaryException ) { 328 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 329 REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception"); 330 REQUIRE_MESSAGE ((g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads)), "Too many tasks survived exception"); 331 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 332 } 333 else { 334 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception"); 335 REQUIRE_MESSAGE ((g_NumExceptionsCaught >= 1 || okayNoExceptionsCaught), "No try_blocks in any body expected in this test"); 336 } 337 } // void Test3 () 338 339 class OuterParForExceptionSafeBody { 340 public: 341 void operator()( const range_type& ) const { 342 tbb::task_group_context ctx(tbb::task_group_context::isolated); 343 ++g_OuterParCalls; 344 TRY(); 345 tbb::parallel_for( tbb::blocked_range<size_t>(0, INNER_RANGE, INNER_GRAIN), SimpleParForBody(), tbb::simple_partitioner(), ctx ); 346 CATCH(); // this macro sets g_ExceptionCaught 347 } 348 }; 349 350 //! Uses parallel_for body invoking an inner parallel_for (with isolated context) inside a try-block. 351 /** Since exception(s) thrown from the inner parallel_for are handled by the caller 352 in this test, they do not affect neither other tasks of the the root parallel_for 353 nor sibling inner algorithms. **/ 354 void Test4 () { 355 ResetGlobals( true, true ); 356 intptr_t innerCalls = NumSubranges(INNER_RANGE, INNER_GRAIN), 357 outerCalls = NumSubranges(OUTER_RANGE, OUTER_GRAIN); 358 TRY(); 359 tbb::parallel_for( range_type(0, OUTER_RANGE, OUTER_GRAIN), OuterParForExceptionSafeBody() ); 360 CATCH(); 361 // g_SolitaryException : one inner pfor will throw, the rest will execute to completion. 
362 // so the count should be (outerCalls -1) * innerCalls, if a throw happened. 363 // !g_SolitaryException : possible multiple inner pfor throws. Should be approximately 364 // (outerCalls - g_NumExceptionsCaught) * innerCalls, give or take a few 365 intptr_t minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls; 366 bool okayNoExceptionsCaught = (g_ExceptionInMaster && !g_MasterExecuted) || (!g_ExceptionInMaster && !g_NonMasterExecuted); 367 if ( g_SolitaryException ) { 368 // only one task had exception thrown. That task had at least one execution (the one that threw). 369 // There may be an arbitrary number of ranges executed after the throw but before the exception 370 // is caught in the scheduler and cancellation is signaled. (seen 9, 11 and 62 (!) for 8 threads) 371 REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionsCaught), "No exception registered"); 372 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed"); 373 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception"); 374 // a small number of threads can execute in a throwing sub-pfor, if the task which is 375 // to do the solitary throw swaps out after registering its intent to throw but before it 376 // actually does so. 
As a result, the number of extra tasks cannot exceed the number of thread 377 // for each nested pfor invication) 378 REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_NumThreads-1)*g_NumThreads/2, "Too many tasks survived exception"); 379 } 380 else { 381 REQUIRE_MESSAGE (((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionsCaught), "Unexpected actual number of exceptions"); 382 REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few executed tasks reported"); 383 REQUIRE_MESSAGE ((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived multiple exceptions"); 384 REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception"); 385 } 386 } // void Test4 () 387 388 //! Testing parallel_for and parallel_reduce exception handling 389 //! \brief \ref error_guessing 390 TEST_CASE("parallel_for and parallel_reduce exception handling test #0") { 391 for (auto concurrency_level: utils::concurrency_range()) { 392 g_NumThreads = static_cast<int>(concurrency_level); 393 g_Master = std::this_thread::get_id(); 394 if (g_NumThreads > 1) { 395 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 396 // Execute in all the possible modes 397 for ( size_t j = 0; j < 4; ++j ) { 398 g_ExceptionInMaster = (j & 1) != 0; 399 g_SolitaryException = (j & 2) != 0; 400 401 Test0(); 402 } 403 } 404 } 405 } 406 407 //! Testing parallel_for and parallel_reduce exception handling 408 //! 
\brief \ref error_guessing 409 TEST_CASE("parallel_for and parallel_reduce exception handling test #1") { 410 for (auto concurrency_level: utils::concurrency_range()) { 411 g_NumThreads = static_cast<int>(concurrency_level); 412 g_Master = std::this_thread::get_id(); 413 if (g_NumThreads > 1) { 414 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 415 // Execute in all the possible modes 416 for ( size_t j = 0; j < 4; ++j ) { 417 g_ExceptionInMaster = (j & 1) != 0; 418 g_SolitaryException = (j & 2) != 0; 419 420 Test1(); 421 } 422 } 423 } 424 } 425 426 //! Testing parallel_for and parallel_reduce exception handling 427 //! \brief \ref error_guessing 428 TEST_CASE("parallel_for and parallel_reduce exception handling test #2") { 429 for (auto concurrency_level: utils::concurrency_range()) { 430 g_NumThreads = static_cast<int>(concurrency_level); 431 g_Master = std::this_thread::get_id(); 432 if (g_NumThreads > 1) { 433 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 434 // Execute in all the possible modes 435 for ( size_t j = 0; j < 4; ++j ) { 436 g_ExceptionInMaster = (j & 1) != 0; 437 g_SolitaryException = (j & 2) != 0; 438 439 Test2(); 440 } 441 } 442 } 443 } 444 445 //! Testing parallel_for and parallel_reduce exception handling 446 //! \brief \ref error_guessing 447 TEST_CASE("parallel_for and parallel_reduce exception handling test #3") { 448 for (auto concurrency_level: utils::concurrency_range()) { 449 g_NumThreads = static_cast<int>(concurrency_level); 450 g_Master = std::this_thread::get_id(); 451 if (g_NumThreads > 1) { 452 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 453 // Execute in all the possible modes 454 for ( size_t j = 0; j < 4; ++j ) { 455 g_ExceptionInMaster = (j & 1) != 0; 456 g_SolitaryException = (j & 2) != 0; 457 458 Test3(); 459 } 460 } 461 } 462 } 463 464 //! 
Testing parallel_for and parallel_reduce exception handling 465 //! \brief \ref error_guessing 466 TEST_CASE("parallel_for and parallel_reduce exception handling test #4") { 467 for (auto concurrency_level: utils::concurrency_range()) { 468 g_NumThreads = static_cast<int>(concurrency_level); 469 g_Master = std::this_thread::get_id(); 470 if (g_NumThreads > 1) { 471 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 472 // Execute in all the possible modes 473 for ( size_t j = 0; j < 4; ++j ) { 474 g_ExceptionInMaster = (j & 1) != 0; 475 g_SolitaryException = (j & 2) != 0; 476 477 Test4(); 478 } 479 } 480 } 481 } 482 483 #endif /* TBB_USE_EXCEPTIONS */ 484 485 class ParForBodyToCancel { 486 public: 487 void operator()( const range_type& ) const { 488 ++g_CurExecuted; 489 Cancellator::WaitUntilReady(); 490 } 491 }; 492 493 template<class B> 494 class ParForLauncher { 495 tbb::task_group_context &my_ctx; 496 public: 497 void operator()() const { 498 tbb::parallel_for( range_type(0, FLAT_RANGE, FLAT_GRAIN), B(), tbb::simple_partitioner(), my_ctx ); 499 } 500 ParForLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 501 }; 502 503 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 
504 void TestCancelation1 () { 505 ResetGlobals( false ); 506 RunCancellationTest<ParForLauncher<ParForBodyToCancel>, Cancellator>( NumSubranges(FLAT_RANGE, FLAT_GRAIN) / 4 ); 507 } 508 509 class Cancellator2 { 510 tbb::task_group_context &m_GroupToCancel; 511 512 public: 513 void operator()() const { 514 utils::ConcurrencyTracker ct; 515 WaitUntilConcurrencyPeaks(); 516 m_GroupToCancel.cancel_group_execution(); 517 g_ExecutedAtLastCatch = g_CurExecuted.load(); 518 } 519 520 Cancellator2 ( tbb::task_group_context& ctx, intptr_t ) : m_GroupToCancel(ctx) {} 521 }; 522 523 class ParForBodyToCancel2 { 524 public: 525 void operator()( const range_type& ) const { 526 ++g_CurExecuted; 527 utils::ConcurrencyTracker ct; 528 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 529 while( !tbb::is_current_task_group_canceling() ) 530 std::this_thread::yield(); 531 } 532 }; 533 534 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 535 /** This version also tests tbb::is_current_task_group_canceling() method. 
**/ 536 void TestCancelation2 () { 537 ResetGlobals(); 538 RunCancellationTest<ParForLauncher<ParForBodyToCancel2>, Cancellator2>(); 539 REQUIRE_MESSAGE (g_ExecutedAtLastCatch < g_NumThreads, "Somehow worker tasks started their execution before the cancellator task"); 540 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 541 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Some tasks were executed after cancellation"); 542 } 543 544 //////////////////////////////////////////////////////////////////////////////// 545 // Regression test based on the contribution by the author of the following forum post: 546 // http://softwarecommunity.intel.com/isn/Community/en-US/forums/thread/30254959.aspx 547 548 class Worker { 549 static const int max_nesting = 3; 550 static const int reduce_range = 1024; 551 static const int reduce_grain = 256; 552 public: 553 int DoWork (int level); 554 int Validate (int start_level) { 555 int expected = 1; // identity for multiplication 556 for(int i=start_level+1; i<max_nesting; ++i) 557 expected *= reduce_range; 558 return expected; 559 } 560 }; 561 562 class RecursiveParReduceBodyWithSharedWorker { 563 Worker * m_SharedWorker; 564 int m_NestingLevel; 565 int m_Result; 566 public: 567 RecursiveParReduceBodyWithSharedWorker ( RecursiveParReduceBodyWithSharedWorker& src, tbb::split ) 568 : m_SharedWorker(src.m_SharedWorker) 569 , m_NestingLevel(src.m_NestingLevel) 570 , m_Result(0) 571 {} 572 RecursiveParReduceBodyWithSharedWorker ( Worker *w, int outer ) 573 : m_SharedWorker(w) 574 , m_NestingLevel(outer) 575 , m_Result(0) 576 {} 577 578 void operator() ( const tbb::blocked_range<size_t>& r ) { 579 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 580 else g_NonMasterExecuted = true; 581 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 582 for (size_t i = r.begin (); i != r.end (); ++i) { 583 m_Result += m_SharedWorker->DoWork (m_NestingLevel); 584 
} 585 } 586 void join (const RecursiveParReduceBodyWithSharedWorker & x) { 587 m_Result += x.m_Result; 588 } 589 int result () { return m_Result; } 590 }; 591 592 int Worker::DoWork ( int level ) { 593 ++level; 594 if ( level < max_nesting ) { 595 RecursiveParReduceBodyWithSharedWorker rt (this, level); 596 tbb::parallel_reduce (tbb::blocked_range<size_t>(0, reduce_range, reduce_grain), rt); 597 return rt.result(); 598 } 599 else 600 return 1; 601 } 602 603 //! Regression test for hanging that occurred with the first version of cancellation propagation 604 void TestCancelation3 () { 605 Worker w; 606 int result = w.DoWork (0); 607 int expected = w.Validate(0); 608 REQUIRE_MESSAGE ( result == expected, "Wrong calculation result"); 609 } 610 611 struct StatsCounters { 612 std::atomic<size_t> my_total_created; 613 std::atomic<size_t> my_total_deleted; 614 StatsCounters() { 615 my_total_created = 0; 616 my_total_deleted = 0; 617 } 618 }; 619 620 class ParReduceBody { 621 StatsCounters* my_stats; 622 size_t my_id; 623 bool my_exception; 624 tbb::task_group_context& tgc; 625 626 public: 627 ParReduceBody( StatsCounters& s_, tbb::task_group_context& context, bool e_ ) : 628 my_stats(&s_), my_exception(e_), tgc(context) { 629 my_id = my_stats->my_total_created++; 630 } 631 632 ParReduceBody( const ParReduceBody& lhs ) : tgc(lhs.tgc) { 633 my_stats = lhs.my_stats; 634 my_id = my_stats->my_total_created++; 635 } 636 637 ParReduceBody( ParReduceBody& lhs, tbb::split ) : tgc(lhs.tgc) { 638 my_stats = lhs.my_stats; 639 my_id = my_stats->my_total_created++; 640 } 641 642 ~ParReduceBody(){ ++my_stats->my_total_deleted; } 643 644 void operator()( const tbb::blocked_range<std::size_t>& /*range*/ ) const { 645 //Do nothing, except for one task (chosen arbitrarily) 646 if( my_id >= 12 ) { 647 if( my_exception ) 648 ThrowTestException(1); 649 else 650 tgc.cancel_group_execution(); 651 } 652 } 653 654 void join( ParReduceBody& /*rhs*/ ) {} 655 }; 656 657 void TestCancelation4() { 658 
StatsCounters statsObj; 659 #if TBB_USE_EXCEPTIONS 660 try 661 #endif 662 { 663 tbb::task_group_context tgc1, tgc2; 664 ParReduceBody body_for_cancellation(statsObj, tgc1, false), body_for_exception(statsObj, tgc2, true); 665 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_cancellation, tbb::simple_partitioner(), tgc1 ); 666 tbb::parallel_reduce( tbb::blocked_range<std::size_t>(0,100000000,100), body_for_exception, tbb::simple_partitioner(), tgc2 ); 667 } 668 #if TBB_USE_EXCEPTIONS 669 catch(...) {} 670 #endif 671 REQUIRE_MESSAGE ( statsObj.my_total_created==statsObj.my_total_deleted, "Not all parallel_reduce body objects created were reclaimed"); 672 } 673 674 //! Testing parallel_for and parallel_reduce cancellation 675 //! \brief \ref error_guessing 676 TEST_CASE("parallel_for and parallel_reduce cancellation test #1") { 677 for (auto concurrency_level: utils::concurrency_range()) { 678 g_NumThreads = static_cast<int>(concurrency_level); 679 g_Master = std::this_thread::get_id(); 680 if (g_NumThreads > 1) { 681 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 682 // Execute in all the possible modes 683 for ( size_t j = 0; j < 4; ++j ) { 684 g_ExceptionInMaster = (j & 1) != 0; 685 g_SolitaryException = (j & 2) != 0; 686 687 TestCancelation1(); 688 } 689 } 690 } 691 } 692 693 //! Testing parallel_for and parallel_reduce cancellation 694 //! 
\brief \ref error_guessing 695 TEST_CASE("parallel_for and parallel_reduce cancellation test #2") { 696 for (auto concurrency_level: utils::concurrency_range()) { 697 g_NumThreads = static_cast<int>(concurrency_level); 698 g_Master = std::this_thread::get_id(); 699 if (g_NumThreads > 1) { 700 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 701 // Execute in all the possible modes 702 for ( size_t j = 0; j < 4; ++j ) { 703 g_ExceptionInMaster = (j & 1) != 0; 704 g_SolitaryException = (j & 2) != 0; 705 706 TestCancelation2(); 707 } 708 } 709 } 710 } 711 712 //! Testing parallel_for and parallel_reduce cancellation 713 //! \brief \ref error_guessing 714 TEST_CASE("parallel_for and parallel_reduce cancellation test #3") { 715 for (auto concurrency_level: utils::concurrency_range()) { 716 g_NumThreads = static_cast<int>(concurrency_level); 717 g_Master = std::this_thread::get_id(); 718 if (g_NumThreads > 1) { 719 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 720 // Execute in all the possible modes 721 for ( size_t j = 0; j < 4; ++j ) { 722 g_ExceptionInMaster = (j & 1) != 0; 723 g_SolitaryException = (j & 2) != 0; 724 725 TestCancelation3(); 726 } 727 } 728 } 729 } 730 731 //! Testing parallel_for and parallel_reduce cancellation 732 //! 
\brief \ref error_guessing 733 TEST_CASE("parallel_for and parallel_reduce cancellation test #4") { 734 for (auto concurrency_level: utils::concurrency_range()) { 735 g_NumThreads = static_cast<int>(concurrency_level); 736 g_Master = std::this_thread::get_id(); 737 if (g_NumThreads > 1) { 738 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 739 // Execute in all the possible modes 740 for ( size_t j = 0; j < 4; ++j ) { 741 g_ExceptionInMaster = (j & 1) != 0; 742 g_SolitaryException = (j & 2) != 0; 743 744 TestCancelation4(); 745 } 746 } 747 } 748 } 749 750 //////////////////////////////////////////////////////////////////////////////// 751 // Tests for tbb::parallel_for_each 752 //////////////////////////////////////////////////////////////////////////////// 753 754 #define ITER_RANGE 1000 755 #define ITEMS_TO_FEED 50 756 #define INNER_ITER_RANGE 100 757 #define OUTER_ITER_RANGE 50 758 759 #define PREPARE_RANGE(Iterator, rangeSize) \ 760 size_t test_vector[rangeSize + 1]; \ 761 for (int i =0; i < rangeSize; i++) \ 762 test_vector[i] = i; \ 763 Iterator begin(&test_vector[0]); \ 764 Iterator end(&test_vector[rangeSize]) 765 766 void Feed ( tbb::feeder<size_t> &feeder, size_t val ) { 767 if (g_FedTasksCount < ITEMS_TO_FEED) { 768 ++g_FedTasksCount; 769 feeder.add(val); 770 } 771 } 772 773 #define RunWithSimpleBody(func, body) \ 774 func<utils::ForwardIterator<size_t>, body>(); \ 775 func<utils::ForwardIterator<size_t>, body##WithFeeder>() 776 777 #define RunWithTemplatedBody(func, body) \ 778 func<utils::ForwardIterator<size_t>, body<utils::ForwardIterator<size_t> > >(); \ 779 func<utils::ForwardIterator<size_t>, body##WithFeeder<utils::ForwardIterator<size_t> > >() 780 781 #if TBB_USE_EXCEPTIONS 782 783 // Simple functor object with exception 784 class SimpleParForEachBody { 785 public: 786 void operator() ( size_t &value ) const { 787 ++g_CurExecuted; 788 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 789 
else g_NonMasterExecuted = true; 790 if( tbb::is_current_task_group_canceling() ) { 791 g_TGCCancelled.increment(); 792 } 793 utils::ConcurrencyTracker ct; 794 value += 1000; 795 WaitUntilConcurrencyPeaks(); 796 ThrowTestException(1); 797 } 798 }; 799 800 // Simple functor object with exception and feeder 801 class SimpleParForEachBodyWithFeeder : SimpleParForEachBody { 802 public: 803 void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const { 804 Feed(feeder, 0); 805 SimpleParForEachBody::operator()(value); 806 } 807 }; 808 809 // Tests exceptions without nesting 810 template <class Iterator, class simple_body> 811 void Test1_parallel_for_each () { 812 ResetGlobals(); 813 PREPARE_RANGE(Iterator, ITER_RANGE); 814 TRY(); 815 tbb::parallel_for_each<Iterator, simple_body>(begin, end, simple_body() ); 816 CATCH_AND_ASSERT(); 817 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 818 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 819 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 820 if ( !g_SolitaryException ) 821 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 822 823 } // void Test1_parallel_for_each () 824 825 template <class Iterator> 826 class OuterParForEachBody { 827 public: 828 void operator()( size_t& /*value*/ ) const { 829 ++g_OuterParCalls; 830 PREPARE_RANGE(Iterator, INNER_ITER_RANGE); 831 tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody()); 832 } 833 }; 834 835 template <class Iterator> 836 class OuterParForEachBodyWithFeeder : OuterParForEachBody<Iterator> { 837 public: 838 void operator()( size_t& value, tbb::feeder<size_t>& feeder ) const { 839 Feed(feeder, 0); 840 OuterParForEachBody<Iterator>::operator()(value); 841 } 842 }; 843 844 //! 
Uses parallel_for_each body containing an inner parallel_for_each with the default context not wrapped by a try-block. 845 /** Inner algorithms are spawned inside the new bound context by default. Since 846 exceptions thrown from the inner parallel_for_each are not handled by the caller 847 (outer parallel_for_each body) in this test, they will cancel all the sibling inner 848 algorithms. **/ 849 template <class Iterator, class outer_body> 850 void Test2_parallel_for_each () { 851 ResetGlobals(); 852 PREPARE_RANGE(Iterator, ITER_RANGE); 853 TRY(); 854 tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body() ); 855 CATCH_AND_ASSERT(); 856 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 857 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 858 REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test"); 859 if ( !g_SolitaryException ) 860 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception"); 861 } // void Test2_parallel_for_each () 862 863 template <class Iterator> 864 class OuterParForEachBodyWithIsolatedCtx { 865 public: 866 void operator()( size_t& /*value*/ ) const { 867 tbb::task_group_context ctx(tbb::task_group_context::isolated); 868 ++g_OuterParCalls; 869 PREPARE_RANGE(Iterator, INNER_ITER_RANGE); 870 tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx); 871 } 872 }; 873 874 template <class Iterator> 875 class OuterParForEachBodyWithIsolatedCtxWithFeeder : OuterParForEachBodyWithIsolatedCtx<Iterator> { 876 public: 877 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 878 Feed(feeder, 0); 879 OuterParForEachBodyWithIsolatedCtx<Iterator>::operator()(value); 880 } 881 }; 882 883 //! Uses parallel_for_each body invoking an inner parallel_for_each with an isolated context without a try-block. 
/** Even though exceptions thrown from the inner parallel_for_each are not handled
    by the caller in this test, they will not affect sibling inner algorithms
    already running because of the isolated contexts. However because the first
    exception cancels the root parallel_for_each, at most the first g_NumThreads subranges
    will be processed (each of which launches an inner parallel_for_each) **/
template <class Iterator, class outer_body>
void Test3_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
    intptr_t innerCalls = INNER_ITER_RANGE,
        // The assumption here is the same as in outer parallel fors.
        // This initial estimate is overwritten after the algorithm has run.
        minExecuted = (g_NumThreads - 1) * innerCalls;
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, outer_body >(begin, end, outer_body());
    CATCH_AND_ASSERT();
    // Figure the actual number of expected executions given the number of outer
    // body invocations that were started (all but one are expected to complete).
    minExecuted = (g_OuterParCalls - 1) * innerCalls;
    // one extra thread may run a task that sees cancellation.  Infrequent but possible
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    if ( g_SolitaryException ) {
        REQUIRE_MESSAGE (g_CurExecuted > minExecuted, "Too few tasks survived exception");
        REQUIRE_MESSAGE (g_CurExecuted <= minExecuted + (g_ExecutedAtLastCatch + g_NumThreads), "Too many tasks survived exception");
    }
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
} // void Test3_parallel_for_each ()

// Outer body that runs the inner parallel_for_each in an isolated context and
// wraps it in TRY()/CATCH(), so exceptions do not leave this body.
template <class Iterator>
class OuterParForEachWithEhBody {
public:
    void operator()( size_t& /*value*/ ) const {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        PREPARE_RANGE(Iterator, INNER_ITER_RANGE);
        TRY();
            tbb::parallel_for_each<Iterator, SimpleParForEachBody>(begin, end, SimpleParForEachBody(), ctx);
        CATCH();
    }
};

// Feeder flavour of OuterParForEachWithEhBody: feeds one extra item, then delegates.
// Copy construction and default construction are explicitly defaulted; assignment is deleted.
template <class Iterator>
class OuterParForEachWithEhBodyWithFeeder : OuterParForEachWithEhBody<Iterator> {
public:
    void operator=(const OuterParForEachWithEhBodyWithFeeder&) = delete;
    OuterParForEachWithEhBodyWithFeeder(const OuterParForEachWithEhBodyWithFeeder&) = default;
    OuterParForEachWithEhBodyWithFeeder() = default;
    void operator()( size_t &value, tbb::feeder<size_t> &feeder ) const {
        Feed(feeder, 0);
        OuterParForEachWithEhBody<Iterator>::operator()(value);
    }
};

//! Uses parallel_for_each body invoking an inner parallel_for_each (with an isolated context) inside a try-block.
/** Since exception(s) thrown from the inner parallel_for_each are handled by the caller
    in this test, they affect neither other tasks of the root parallel_for_each
    nor sibling inner algorithms. **/
template <class Iterator, class outer_body_with_eh>
void Test4_parallel_for_each () {
    ResetGlobals( true, true );
    PREPARE_RANGE(Iterator, OUTER_ITER_RANGE);
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, outer_body_with_eh>(begin, end, outer_body_with_eh());
    CATCH();
    // NOTE(review): l_ExceptionCaughtAtCurrentLevel is not declared in this function;
    // presumably it is introduced by TRY() -- confirm in common/exception_handling.h.
    REQUIRE_MESSAGE (!l_ExceptionCaughtAtCurrentLevel, "All exceptions must have been handled in the parallel_for_each body");
    // Items added through the feeder also invoke the outer body, so they count as outer calls.
    intptr_t innerCalls = INNER_ITER_RANGE,
             outerCalls = OUTER_ITER_RANGE + g_FedTasksCount,
             maxExecuted = outerCalls * innerCalls,
             minExecuted = 0;
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    if ( g_SolitaryException ) {
        // Lower bound: every inner algorithm except one runs all of its items.
        minExecuted = maxExecuted - innerCalls;
        REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No exception registered");
        REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too few tasks executed");
        // This test has the same property as Test4 (parallel_for); the exception can be
        // thrown, but some number of tasks from the outer algorithm can execute after the throw but
        // before the cancellation is signaled (have seen 36).
        DOCTEST_WARN_MESSAGE(g_CurExecuted < maxExecuted, "All tasks survived exception. Oversubscription?");
    }
    else {
        minExecuted = g_NumExceptionsCaught;
        REQUIRE_MESSAGE ((g_NumExceptionsCaught > 1 && g_NumExceptionsCaught <= outerCalls), "Unexpected actual number of exceptions");
        REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
        REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads + outerCalls, "Too many tasks survived multiple exceptions");
        REQUIRE_MESSAGE (g_CurExecuted <= outerCalls * (1 + g_NumThreads), "Too many tasks survived exception");
    }
} // void Test4_parallel_for_each ()

// This body calls ThrowTestException only for items whose value is 1, which is
// the value this body passes to Feed() (items of the initial range hold 0..N-1,
// so the range item with value 1 reaches that call as well).
class ParForEachBodyWithThrowingFeederTasks {
public:
    //! This form of the function call operator can be used when the body needs to add more work during the processing
    void operator() ( size_t &value, tbb::feeder<size_t> &feeder ) const {
        ++g_CurExecuted;
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
        Feed(feeder, 1);
        if (value == 1)
            ThrowTestException(1);
    }
}; // class ParForEachBodyWithThrowingFeederTasks

// Test exception in task, which was added by feeder.
template <class Iterator>
void Test5_parallel_for_each () {
    ResetGlobals();
    PREPARE_RANGE(Iterator, ITER_RANGE);
    g_Master = std::this_thread::get_id();
    TRY();
        tbb::parallel_for_each<Iterator, ParForEachBodyWithThrowingFeederTasks>(begin, end, ParForEachBodyWithThrowingFeederTasks());
    CATCH();
    if (g_SolitaryException) {
        // A missing exception is tolerated when the only threads allowed to throw never
        // reached the throwing call: e.g. g_ExceptionInMaster is false, but all the items
        // with value 1 were handled by the master thread. In this case no throw occurs.
        REQUIRE_MESSAGE ((l_ExceptionCaughtAtCurrentLevel                // we saw an exception
            || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow)       // non-master throws but none tried
            || (g_ExceptionInMaster && !g_MasterExecutedThrow))          // master throws but master didn't try
            , "At least one exception should occur");
    }
} // void Test5_parallel_for_each ()

//! Testing parallel_for_each exception handling
//!
\brief \ref error_guessing 1010 TEST_CASE("parallel_for_each exception handling test #1") { 1011 for (auto concurrency_level: utils::concurrency_range()) { 1012 g_NumThreads = static_cast<int>(concurrency_level); 1013 g_Master = std::this_thread::get_id(); 1014 if (g_NumThreads > 1) { 1015 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1016 // Execute in all the possible modes 1017 for ( size_t j = 0; j < 4; ++j ) { 1018 g_ExceptionInMaster = (j & 1) != 0; 1019 g_SolitaryException = (j & 2) != 0; 1020 1021 RunWithSimpleBody(Test1_parallel_for_each, SimpleParForEachBody); 1022 } 1023 } 1024 } 1025 } 1026 1027 //! Testing parallel_for_each exception handling 1028 //! \brief \ref error_guessing 1029 TEST_CASE("parallel_for_each exception handling test #2") { 1030 for (auto concurrency_level: utils::concurrency_range()) { 1031 g_NumThreads = static_cast<int>(concurrency_level); 1032 g_Master = std::this_thread::get_id(); 1033 if (g_NumThreads > 1) { 1034 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1035 // Execute in all the possible modes 1036 for ( size_t j = 0; j < 4; ++j ) { 1037 g_ExceptionInMaster = (j & 1) != 0; 1038 g_SolitaryException = (j & 2) != 0; 1039 1040 RunWithTemplatedBody(Test2_parallel_for_each, OuterParForEachBody); 1041 } 1042 } 1043 } 1044 } 1045 1046 //! Testing parallel_for_each exception handling 1047 //! 
\brief \ref error_guessing 1048 TEST_CASE("parallel_for_each exception handling test #3") { 1049 for (auto concurrency_level: utils::concurrency_range()) { 1050 g_NumThreads = static_cast<int>(concurrency_level); 1051 g_Master = std::this_thread::get_id(); 1052 if (g_NumThreads > 1) { 1053 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1054 // Execute in all the possible modes 1055 for ( size_t j = 0; j < 4; ++j ) { 1056 g_ExceptionInMaster = (j & 1) != 0; 1057 g_SolitaryException = (j & 2) != 0; 1058 1059 RunWithTemplatedBody(Test3_parallel_for_each, OuterParForEachBodyWithIsolatedCtx); 1060 } 1061 } 1062 } 1063 } 1064 1065 //! Testing parallel_for_each exception handling 1066 //! \brief \ref error_guessing 1067 TEST_CASE("parallel_for_each exception handling test #4") { 1068 for (auto concurrency_level: utils::concurrency_range()) { 1069 g_NumThreads = static_cast<int>(concurrency_level); 1070 g_Master = std::this_thread::get_id(); 1071 if (g_NumThreads > 1) { 1072 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1073 // Execute in all the possible modes 1074 for ( size_t j = 0; j < 4; ++j ) { 1075 g_ExceptionInMaster = (j & 1) != 0; 1076 g_SolitaryException = (j & 2) != 0; 1077 1078 RunWithTemplatedBody(Test4_parallel_for_each, OuterParForEachWithEhBody); 1079 } 1080 } 1081 } 1082 } 1083 1084 //! Testing parallel_for_each exception handling 1085 //! 
\brief \ref error_guessing 1086 TEST_CASE("parallel_for_each exception handling test #5") { 1087 for (auto concurrency_level: utils::concurrency_range()) { 1088 g_NumThreads = static_cast<int>(concurrency_level); 1089 g_Master = std::this_thread::get_id(); 1090 if (g_NumThreads > 1) { 1091 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1092 // Execute in all the possible modes 1093 for ( size_t j = 0; j < 4; ++j ) { 1094 g_ExceptionInMaster = (j & 1) != 0; 1095 g_SolitaryException = (j & 2) != 0; 1096 1097 Test5_parallel_for_each<utils::InputIterator<size_t> >(); 1098 Test5_parallel_for_each<utils::ForwardIterator<size_t> >(); 1099 Test5_parallel_for_each<utils::RandomIterator<size_t> >(); 1100 } 1101 } 1102 } 1103 } 1104 1105 #endif /* TBB_USE_EXCEPTIONS */ 1106 1107 class ParForEachBodyToCancel { 1108 public: 1109 void operator()( size_t& /*value*/ ) const { 1110 ++g_CurExecuted; 1111 Cancellator::WaitUntilReady(); 1112 } 1113 }; 1114 1115 class ParForEachBodyToCancelWithFeeder : ParForEachBodyToCancel { 1116 public: 1117 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1118 Feed(feeder, 0); 1119 ParForEachBodyToCancel::operator()(value); 1120 } 1121 }; 1122 1123 template<class B, class Iterator> 1124 class ParForEachWorker { 1125 tbb::task_group_context &my_ctx; 1126 public: 1127 void operator()() const { 1128 PREPARE_RANGE(Iterator, INNER_ITER_RANGE); 1129 tbb::parallel_for_each<Iterator, B>( begin, end, B(), my_ctx ); 1130 } 1131 1132 ParForEachWorker ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1133 }; 1134 1135 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 
1136 template <class Iterator, class body_to_cancel> 1137 void TestCancelation1_parallel_for_each () { 1138 ResetGlobals( false ); 1139 intptr_t threshold = 10; 1140 tbb::task_group tg; 1141 tbb::task_group_context ctx; 1142 Cancellator cancellator(ctx, threshold); 1143 ParForEachWorker<body_to_cancel, Iterator> worker(ctx); 1144 tg.run( cancellator ); 1145 std::this_thread::yield(); 1146 tg.run( worker ); 1147 TRY(); 1148 tg.wait(); 1149 CATCH_AND_FAIL(); 1150 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1151 } 1152 1153 class ParForEachBodyToCancel2 { 1154 public: 1155 void operator()( size_t& /*value*/ ) const { 1156 ++g_CurExecuted; 1157 utils::ConcurrencyTracker ct; 1158 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1159 while( !tbb::is_current_task_group_canceling() ) 1160 std::this_thread::yield(); 1161 } 1162 }; 1163 1164 class ParForEachBodyToCancel2WithFeeder : ParForEachBodyToCancel2 { 1165 public: 1166 void operator()( size_t& value, tbb::feeder<size_t> &feeder ) const { 1167 Feed(feeder, 0); 1168 ParForEachBodyToCancel2::operator()(value); 1169 } 1170 }; 1171 1172 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1173 /** This version also tests tbb::is_current_task_group_canceling() method. **/ 1174 template <class Iterator, class body_to_cancel> 1175 void TestCancelation2_parallel_for_each () { 1176 ResetGlobals(); 1177 RunCancellationTest<ParForEachWorker<body_to_cancel, Iterator>, Cancellator2>(); 1178 } 1179 1180 //! Testing parallel_for_each cancellation test 1181 //! 
\brief \ref error_guessing 1182 TEST_CASE("parallel_for_each cancellation test #1") { 1183 for (auto concurrency_level: utils::concurrency_range()) { 1184 g_NumThreads = static_cast<int>(concurrency_level); 1185 g_Master = std::this_thread::get_id(); 1186 if (g_NumThreads > 1) { 1187 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1188 // Execute in all the possible modes 1189 for ( size_t j = 0; j < 4; ++j ) { 1190 g_ExceptionInMaster = (j & 1) != 0; 1191 g_SolitaryException = (j & 2) != 0; 1192 1193 RunWithSimpleBody(TestCancelation1_parallel_for_each, ParForEachBodyToCancel); 1194 } 1195 } 1196 } 1197 } 1198 1199 //! Testing parallel_for_each cancellation test 1200 //! \brief \ref error_guessing 1201 TEST_CASE("parallel_for_each cancellation test #2") { 1202 for (auto concurrency_level: utils::concurrency_range()) { 1203 g_NumThreads = static_cast<int>(concurrency_level); 1204 g_Master = std::this_thread::get_id(); 1205 if (g_NumThreads > 1) { 1206 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1207 // Execute in all the possible modes 1208 for ( size_t j = 0; j < 4; ++j ) { 1209 g_ExceptionInMaster = (j & 1) != 0; 1210 g_SolitaryException = (j & 2) != 0; 1211 1212 RunWithSimpleBody(TestCancelation2_parallel_for_each, ParForEachBodyToCancel2); 1213 } 1214 } 1215 } 1216 } 1217 1218 //////////////////////////////////////////////////////////////////////////////// 1219 // Tests for tbb::parallel_pipeline 1220 //////////////////////////////////////////////////////////////////////////////// 1221 #define NUM_ITEMS 100 1222 1223 const size_t c_DataEndTag = size_t(~0); 1224 1225 int g_NumTokens = 0; 1226 1227 // Simple input filter class, it assigns 1 to all array members 1228 // It stops when it receives item equal to -1 1229 class InputFilter { 1230 mutable std::atomic<size_t> m_Item{}; 1231 mutable size_t m_Buffer[NUM_ITEMS + 1]; 1232 public: 1233 InputFilter() { 1234 m_Item = 0; 1235 
for (size_t i = 0; i < NUM_ITEMS; ++i ) 1236 m_Buffer[i] = 1; 1237 m_Buffer[NUM_ITEMS] = c_DataEndTag; 1238 } 1239 InputFilter(const InputFilter& other) : m_Item(other.m_Item.load()) { 1240 for (size_t i = 0; i < NUM_ITEMS; ++i ) 1241 m_Buffer[i] = other.m_Buffer[i]; 1242 } 1243 1244 void* operator()(tbb::flow_control& control) const { 1245 size_t item = m_Item++; 1246 if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true; 1247 else g_NonMasterExecuted = true; 1248 if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment(); 1249 if(item == 1) { 1250 ++g_PipelinesStarted; // count on emitting the first item. 1251 } 1252 if ( item >= NUM_ITEMS ) { 1253 control.stop(); 1254 return nullptr; 1255 } 1256 m_Buffer[item] = 1; 1257 return &m_Buffer[item]; 1258 } 1259 1260 size_t* buffer() { return m_Buffer; } 1261 }; // class InputFilter 1262 1263 #if TBB_USE_EXCEPTIONS 1264 1265 // Simple filter with exception throwing. If parallel, will wait until 1266 // as many parallel filters start as there are threads. 
class SimpleFilter {
    bool m_canThrow;   // whether operator() calls ThrowTestException
    bool m_serial;     // whether the filter is used in a non-parallel mode (see CustomPipeline)
public:
    SimpleFilter (bool canThrow, bool serial ) : m_canThrow(canThrow), m_serial(serial) {}
    // Records the execution, optionally calls ThrowTestException, and passes the item through.
    void* operator()(void* item) const {
        ++g_CurExecuted;
        if(g_Master == std::this_thread::get_id()) g_MasterExecuted = true;
        else g_NonMasterExecuted = true;
        if( tbb::is_current_task_group_canceling() ) g_TGCCancelled.increment();
        if ( m_canThrow ) {
            if ( !m_serial ) {
                utils::ConcurrencyTracker ct;
                // The concurrency to wait for is bounded by both the token count and the thread count.
                WaitUntilConcurrencyPeaks( std::min(g_NumTokens, g_NumThreads) );
            }
            ThrowTestException(1);
        }
        return item;
    }
}; // class SimpleFilter

// Describes the two non-input filters of a test pipeline: the mode of each
// filter and whether each one is allowed to throw.
struct FilterSet {
    tbb::filter_mode mode1, mode2;
    bool throw1, throw2;

    FilterSet( tbb::filter_mode m1, tbb::filter_mode m2, bool t1, bool t2 )
        : mode1(m1), mode2(m2), throw1(t1), throw2(t2)
    {}
}; // struct FilterSet

// Filter set used by the nested (inner) pipelines: serial_in_order first filter
// that does not throw, parallel second filter that throws.
FilterSet serial_parallel( tbb::filter_mode::serial_in_order, tbb::filter_mode::parallel, /*throw1*/false, /*throw2*/true );

// Three-stage pipeline: an input filter run in parallel mode followed by two
// filters of type Filter configured from a FilterSet. Each Filter is constructed
// with (canThrow, isSerial) arguments.
template<typename InFilter, typename Filter>
class CustomPipeline {
    InFilter inputFilter;
    Filter filter1;
    Filter filter2;
    FilterSet my_filters;
public:
    CustomPipeline( const FilterSet& filters )
        : filter1(filters.throw1, filters.mode1 != tbb::filter_mode::parallel),
          filter2(filters.throw2, filters.mode2 != tbb::filter_mode::parallel),
          my_filters(filters)
    {}

    // Runs the pipeline with g_NumTokens live tokens, without passing a context.
    void run () {
        tbb::parallel_pipeline(
            g_NumTokens,
            tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
            tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
            tbb::make_filter<void*, void>(my_filters.mode2, filter2)
        );
    }
    // Same as run(), but executes the pipeline in the supplied task_group_context.
    void run ( tbb::task_group_context& ctx ) {
        tbb::parallel_pipeline(
            g_NumTokens,
            tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) &
            tbb::make_filter<void*, void*>(my_filters.mode1, filter1) &
            tbb::make_filter<void*, void>(my_filters.mode2, filter2),
            ctx
        );
    }
};

typedef CustomPipeline<InputFilter, SimpleFilter> SimplePipeline;

// Tests exceptions without nesting
void Test1_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    SimplePipeline testPipeline(filters);
    TRY();
        testPipeline.run();
        // Reaching this point means run() returned without throwing.
        if ( g_CurExecuted == 2 * NUM_ITEMS ) {
            // all the items were processed, though an exception was supposed to occur.
            if(!g_ExceptionInMaster && g_NonMasterExecutedThrow > 0) {
                // if !g_ExceptionInMaster, the master thread is not allowed to throw.
                // if g_NonMasterExecutedThrow > 0 then a thread besides the master tried to throw.
                REQUIRE_MESSAGE((filters.mode1 != tbb::filter_mode::parallel && filters.mode2 != tbb::filter_mode::parallel),
                    "Unusual count");
            }
            // In case of all serial filters they might be all executed in the thread(s)
            // where exceptions are not allowed by the common test logic. So we just quit.
            return;
        }
    CATCH_AND_ASSERT();
    g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    REQUIRE_MESSAGE (g_NumExceptionsCaught == 1, "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException )
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");

} // void Test1_pipeline ()

// Filter with nesting: every invocation runs an inner SimplePipeline
// (serial_parallel filter set) without passing a context.
class OuterFilter {
public:
    // The (canThrow, isSerial) arguments passed by CustomPipeline are ignored.
    OuterFilter ( bool, bool ) {}

    void* operator()(void* item) const {
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        testPipeline.run();
        return item;
    }
}; // class OuterFilter

//! Uses pipeline containing an inner pipeline with the default context not wrapped by a try-block.
/** Inner algorithms are spawned inside the new bound context by default. Since
    exceptions thrown from the inner pipeline are not handled by the caller
    (outer pipeline body) in this test, they will cancel all the sibling inner
    algorithms. **/
void Test2_pipeline ( const FilterSet& filters ) {
    ResetGlobals();
    g_NestedPipelines = true;
    CustomPipeline<InputFilter, OuterFilter> testPipeline(filters);
    TRY();
        testPipeline.run();
    CATCH_AND_ASSERT();
    // No exception is acceptable when the only thread(s) allowed to throw never attempted to.
    bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecutedThrow) || (!g_ExceptionInMaster && !g_NonMasterExecutedThrow);
    REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
    if ( !g_SolitaryException ) {
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived exception");
    }
} // void Test2_pipeline ()

//! creates isolated inner pipeline and runs it.
class OuterFilterWithIsolatedCtx {
public:
    // The (canThrow, isSerial) arguments passed by CustomPipeline are ignored.
    OuterFilterWithIsolatedCtx( bool , bool ) {}

    void* operator()(void* item) const {
        ++g_OuterParCalls;
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        // create inner pipeline with serial first filter and parallel second filter; the second filter throws
        SimplePipeline testPipeline(serial_parallel);
        testPipeline.run(ctx);
        return item;
    }
}; // class OuterFilterWithIsolatedCtx

//! Uses pipeline invoking an inner pipeline with an isolated context without a try-block.
/** Even though exceptions thrown from the inner pipeline are not handled
    by the caller in this test, they will not affect sibling inner algorithms
    already running because of the isolated contexts. However because the first
    exception cancels the root pipeline only the first g_NumThreads items
    will be processed (which launch inner pipelines) **/
void Test3_pipeline ( const FilterSet& filters ) {
    // The scenario is attempted up to four times; the first attempt that either threw
    // or legitimately could not throw is checked and ends the test.
    for( int nTries = 1; nTries <= 4; ++nTries) {
        ResetGlobals();
        g_NestedPipelines = true;
        g_Master = std::this_thread::get_id();
        intptr_t innerCalls = NUM_ITEMS,
                 minExecuted = (g_NumThreads - 1) * innerCalls;
        CustomPipeline<InputFilter, OuterFilterWithIsolatedCtx> testPipeline(filters);
        TRY();
            testPipeline.run();
        CATCH_AND_ASSERT();

        // No exception is acceptable when the only thread(s) allowed to throw never ran a filter.
        bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
            (!g_ExceptionInMaster && !g_NonMasterExecuted);
        // only test assertions if the test threw an exception (or we don't care)
        bool testSucceeded = okayNoExceptionCaught || g_NumExceptionsCaught > 0;
        if(testSucceeded) {
            if (g_SolitaryException) {

                // The test is one outer pipeline with two NestedFilters that each start an inner pipeline.
                // Each time the input filter of a pipeline delivers its first item, it increments
                // g_PipelinesStarted.  When g_SolitaryException, the throw will not occur until
                // g_PipelinesStarted >= 3.  (This is so at least a second pipeline in its own isolated
                // context will start; that is what we're testing.)
                //
                // There are two pipelines which will NOT run to completion when a solitary throw
                // happens in an isolated inner context: the outer pipeline and the pipeline which
                // throws.  All the other pipelines which start should run to completion.  But only
                // inner body invocations are counted.
                //
                // So g_CurExecuted should be about
                //
                //   (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1
                //   ^ executions for each completed pipeline
                //                   ^ completing pipelines (remembering two will not complete)
                //                                              ^ one for the inner throwing pipeline

                minExecuted = (2*NUM_ITEMS) * (g_PipelinesStarted - 2) + 1;
                // each failing pipeline must execute at least two tasks
                REQUIRE_MESSAGE(g_CurExecuted >= minExecuted, "Too few tasks survived exception");
                // no more than g_NumThreads tasks will be executed in a cancelled context.  Otherwise
                // tasks not executing at throw were scheduled.
                g_TGCCancelled.validate(g_NumThreads, "Tasks not in-flight were executed");
                REQUIRE_MESSAGE(g_NumExceptionsCaught == 1, "Should have only one exception");
                // if we're only throwing from the master thread, and that thread didn't
                // participate in the pipelines, then no throw occurred.
            }
            REQUIRE_MESSAGE ((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No try_blocks in any body expected in this test");
            REQUIRE_MESSAGE (((g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads) || okayNoExceptionCaught), "Too many tasks survived exception");
            return;
        }
    }
}

// Outer filter that runs the inner pipeline in an isolated context and wraps it
// in TRY()/CATCH(), so exceptions do not leave this filter.
class OuterFilterWithEhBody {
public:
    // The (canThrow, isSerial) arguments passed by CustomPipeline are ignored.
    OuterFilterWithEhBody( bool, bool ){}

    void* operator()(void* item) const {
        tbb::task_group_context ctx(tbb::task_group_context::isolated);
        ++g_OuterParCalls;
        SimplePipeline testPipeline(serial_parallel);
        TRY();
            testPipeline.run(ctx);
        CATCH();
        return item;
    }
}; // class OuterFilterWithEhBody

//! Uses pipeline body invoking an inner pipeline (with isolated context) inside a try-block.
/** Since exception(s) thrown from the inner pipeline are handled by the caller
    in this test, they do not affect other tasks of the root pipeline
    nor sibling inner algorithms. **/
void Test4_pipeline ( const FilterSet& filters ) {
#if __GNUC__ && !__INTEL_COMPILER
    // NOTE(review): strncmp is declared in <cstring>, which is not among the includes visible
    // at the top of this file -- presumably it arrives transitively; confirm.
    if ( strncmp(__VERSION__, "4.1.0", 5) == 0 ) {
        MESSAGE("Known issue: one of exception handling tests is skipped.");
        return;
    }
#endif
    ResetGlobals( true, true );
    // each outer pipeline stage will start NUM_ITEMS inner pipelines.
    // each inner pipeline that doesn't throw will process NUM_ITEMS items.
    // for solitary exception there will be one pipeline that only processes one stage, one item.
    // innerCalls should be 2*NUM_ITEMS
    intptr_t innerCalls = 2*NUM_ITEMS,
             outerCalls = 2*NUM_ITEMS,
             maxExecuted = outerCalls * innerCalls;  // the number of invocations of the inner pipelines
    CustomPipeline<InputFilter, OuterFilterWithEhBody> testPipeline(filters);
    TRY();
        testPipeline.run();
    CATCH_AND_ASSERT();
    intptr_t  minExecuted = 0;
    // No exception is acceptable when the only thread(s) allowed to throw never ran a filter.
    bool okayNoExceptionCaught = (g_ExceptionInMaster && !g_MasterExecuted) ||
        (!g_ExceptionInMaster && !g_NonMasterExecuted);
    if ( g_SolitaryException ) {
        minExecuted = maxExecuted - innerCalls;  // one throwing inner pipeline
        REQUIRE_MESSAGE((g_NumExceptionsCaught == 1 || okayNoExceptionCaught), "No exception registered");
        g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived exception");  // probably will assert.
    }
    else {
        // we assume throwing pipelines will not count
        minExecuted = (outerCalls - g_NumExceptionsCaught) * innerCalls;
        REQUIRE_MESSAGE(((g_NumExceptionsCaught >= 1 && g_NumExceptionsCaught <= outerCalls) || okayNoExceptionCaught), "Unexpected actual number of exceptions");
        REQUIRE_MESSAGE (g_CurExecuted >= minExecuted, "Too many executed tasks reported");
        // too many already-scheduled tasks are started after the first exception is
        // thrown.  And g_ExecutedAtLastCatch is updated every time an exception is caught.
        // So with multiple exceptions there are a variable number of tasks that have been
        // discarded because of the signals.
        // each throw is caught, so we will see many cancelled tasks.  g_ExecutedAtLastCatch is
        // updated with each throw, so the value will be the number of tasks executed at the last
        // catch.
        REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks survived multiple exceptions");
    }
} // void Test4_pipeline ()

//!
Tests pipeline function passed with different combination of filters 1530 template<void testFunc(const FilterSet&)> 1531 void TestWithDifferentFiltersAndConcurrency() { 1532 for (auto concurrency_level: utils::concurrency_range()) { 1533 g_NumThreads = static_cast<int>(concurrency_level); 1534 g_Master = std::this_thread::get_id(); 1535 if (g_NumThreads > 1) { 1536 // Execute in all the possible modes 1537 for ( size_t j = 0; j < 4; ++j ) { 1538 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1539 g_ExceptionInMaster = (j & 1) != 0; 1540 g_SolitaryException = (j & 2) != 0; 1541 g_NumTokens = 2 * g_NumThreads; 1542 const int NumFilterTypes = 3; 1543 const tbb::filter_mode modes[NumFilterTypes] = { 1544 tbb::filter_mode::parallel, 1545 tbb::filter_mode::serial_in_order, 1546 tbb::filter_mode::serial_out_of_order 1547 }; 1548 1549 for ( int i = 0; i < NumFilterTypes; ++i ) { 1550 for ( int n = 0; n < NumFilterTypes; ++n ) { 1551 for ( int k = 0; k < 2; ++k ) 1552 testFunc( FilterSet(modes[i], modes[j], k == 0, k != 0) ); 1553 } 1554 } 1555 } 1556 } 1557 } 1558 } 1559 1560 //! Testing parallel_pipeline exception handling 1561 //! \brief \ref error_guessing 1562 TEST_CASE("parallel_pipeline exception handling test #1") { 1563 TestWithDifferentFiltersAndConcurrency<Test1_pipeline>(); 1564 } 1565 1566 //! Testing parallel_pipeline exception handling 1567 //! \brief \ref error_guessing 1568 TEST_CASE("parallel_pipeline exception handling test #2") { 1569 TestWithDifferentFiltersAndConcurrency<Test2_pipeline>(); 1570 } 1571 1572 //! Testing parallel_pipeline exception handling 1573 //! \brief \ref error_guessing 1574 TEST_CASE("parallel_pipeline exception handling test #3") { 1575 TestWithDifferentFiltersAndConcurrency<Test3_pipeline>(); 1576 } 1577 1578 //! Testing parallel_pipeline exception handling 1579 //! 
\brief \ref error_guessing 1580 TEST_CASE("parallel_pipeline exception handling test #4") { 1581 TestWithDifferentFiltersAndConcurrency<Test4_pipeline>(); 1582 } 1583 1584 #endif /* TBB_USE_EXCEPTIONS */ 1585 1586 class FilterToCancel { 1587 public: 1588 FilterToCancel() {} 1589 void* operator()(void* item) const { 1590 ++g_CurExecuted; 1591 Cancellator::WaitUntilReady(); 1592 return item; 1593 } 1594 }; // class FilterToCancel 1595 1596 template <class Filter_to_cancel> 1597 class PipelineLauncher { 1598 tbb::task_group_context &my_ctx; 1599 public: 1600 PipelineLauncher ( tbb::task_group_context& ctx ) : my_ctx(ctx) {} 1601 1602 void operator()() const { 1603 // Run test when serial filter is the first non-input filter 1604 InputFilter inputFilter; 1605 Filter_to_cancel filterToCancel; 1606 tbb::parallel_pipeline( 1607 g_NumTokens, 1608 tbb::make_filter<void, void*>(tbb::filter_mode::parallel, inputFilter) & 1609 tbb::make_filter<void*, void>(tbb::filter_mode::parallel, filterToCancel), 1610 my_ctx 1611 ); 1612 } 1613 }; 1614 1615 //! Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1616 void TestCancelation1_pipeline () { 1617 ResetGlobals(); 1618 g_ThrowException = false; 1619 RunCancellationTest<PipelineLauncher<FilterToCancel>, Cancellator>(10); 1620 g_TGCCancelled.validate(g_NumThreads, "Too many tasks survived cancellation"); 1621 REQUIRE_MESSAGE (g_CurExecuted < g_ExecutedAtLastCatch + g_NumThreads, "Too many tasks were executed after cancellation"); 1622 } 1623 1624 class FilterToCancel2 { 1625 public: 1626 FilterToCancel2() {} 1627 1628 void* operator()(void* item) const { 1629 ++g_CurExecuted; 1630 utils::ConcurrencyTracker ct; 1631 // The test will hang (and be timed out by the test system) if is_cancelled() is broken 1632 while( !tbb::is_current_task_group_canceling() ) 1633 std::this_thread::yield(); 1634 return item; 1635 } 1636 }; 1637 1638 //! 
Test for cancelling an algorithm from outside (from a task running in parallel with the algorithm). 1639 /** This version also tests task::is_cancelled() method. **/ 1640 void TestCancelation2_pipeline () { 1641 ResetGlobals(); 1642 RunCancellationTest<PipelineLauncher<FilterToCancel2>, Cancellator2>(); 1643 // g_CurExecuted is always >= g_ExecutedAtLastCatch, because the latter is always a snapshot of the 1644 // former, and g_CurExecuted is monotonic increasing. so the comparison should be at least ==. 1645 // If another filter is started after cancel but before cancellation is propagated, then the 1646 // number will be larger. 1647 REQUIRE_MESSAGE (g_CurExecuted <= g_ExecutedAtLastCatch, "Some tasks were executed after cancellation"); 1648 } 1649 1650 /** If min and max thread numbers specified on the command line are different, 1651 the test is run only for 2 sizes of the thread pool (MinThread and MaxThread) 1652 to be able to test the high and low contention modes while keeping the test reasonably fast **/ 1653 1654 //! Testing parallel_pipeline cancellation 1655 //! \brief \ref error_guessing 1656 TEST_CASE("parallel_pipeline cancellation test #1") { 1657 for (auto concurrency_level: utils::concurrency_range()) { 1658 g_NumThreads = static_cast<int>(concurrency_level); 1659 g_Master = std::this_thread::get_id(); 1660 if (g_NumThreads > 1) { 1661 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1662 // Execute in all the possible modes 1663 for ( size_t j = 0; j < 4; ++j ) { 1664 g_ExceptionInMaster = (j & 1) != 0; 1665 g_SolitaryException = (j & 2) != 0; 1666 g_NumTokens = 2 * g_NumThreads; 1667 1668 TestCancelation1_pipeline(); 1669 } 1670 } 1671 } 1672 } 1673 1674 //! Testing parallel_pipeline cancellation 1675 //! 
\brief \ref error_guessing 1676 TEST_CASE("parallel_pipeline cancellation test #2") { 1677 for (auto concurrency_level: utils::concurrency_range()) { 1678 g_NumThreads = static_cast<int>(concurrency_level); 1679 g_Master = std::this_thread::get_id(); 1680 if (g_NumThreads > 1) { 1681 tbb::global_control control(tbb::global_control::max_allowed_parallelism, g_NumThreads); 1682 // Execute in all the possible modes 1683 for ( size_t j = 0; j < 4; ++j ) { 1684 g_ExceptionInMaster = (j & 1) != 0; 1685 g_SolitaryException = (j & 2) != 0; 1686 g_NumTokens = 2 * g_NumThreads; 1687 1688 TestCancelation2_pipeline(); 1689 } 1690 } 1691 } 1692 } 1693