/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include <atomic>

#include "common/parallel_reduce_common.h"
#include "common/cpu_usertime.h"
#include "common/exception_handling.h"

//! \file test_parallel_reduce.cpp
//! \brief Test for [algorithms.parallel_reduce algorithms.parallel_deterministic_reduce] specification

using ValueType = uint64_t;

struct Sum {
    template<typename T>
    T operator() ( const T& v1, const T& v2 ) const {
        return v1 + v2;
    }
};

struct Accumulator {
    ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const {
        for ( ValueType* pv = r.begin(); pv != r.end(); ++pv )
            value += *pv;
        return value;
    }
};

class ParallelSumTester {
public:
    ParallelSumTester( const ParallelSumTester& ) = default;
    void operator=( const ParallelSumTester& ) = delete;

    ParallelSumTester() : m_range(nullptr, nullptr) {
        m_array = new ValueType[unsigned(count)];
        for ( ValueType i = 0; i < count; ++i )
            m_array[i] = i + 1;
        m_range = tbb::blocked_range<ValueType*>( m_array, m_array + count );
    }
    ~ParallelSumTester() { delete[] m_array; }

    template<typename Partitioner>
    void CheckParallelReduce() {
        Partitioner partitioner;
        ValueType result1 = reduce_invoker<ValueType>( m_range, Accumulator(), Sum(), partitioner );
        REQUIRE_MESSAGE( result1 == expected, "Wrong parallel summation result" );
        ValueType result2 = reduce_invoker<ValueType>( m_range,
            [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
                for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
                    value += *pv;
                return value;
            },
            Sum(),
            partitioner
        );
        REQUIRE_MESSAGE( result2 == expected, "Wrong parallel summation result" );
    }
private:
    ValueType* m_array;
    tbb::blocked_range<ValueType*> m_range;
    static const ValueType count, expected;
};

const ValueType ParallelSumTester::count = 1000000;
const ValueType ParallelSumTester::expected = count * (count + 1) / 2;

namespace test_cancellation {

struct ReduceToCancel {
    std::size_t operator()( const tbb::blocked_range<std::size_t>&, std::size_t ) const {
        ++g_CurExecuted;
        Cancellator::WaitUntilReady();
        return 1;
    }
}; // struct ReduceToCancel

struct JoinToCancel {
    std::size_t operator()( std::size_t, std::size_t ) const {
        ++g_CurExecuted;
        Cancellator::WaitUntilReady();
        return 1;
    }
}; // struct JoinToCancel

struct ReduceFunctorToCancel {
    std::size_t result;

    ReduceFunctorToCancel() : result(0) {}
    ReduceFunctorToCancel( ReduceFunctorToCancel&, tbb::split ) : result(0) {}

    void operator()( const tbb::blocked_range<std::size_t>& br ) {
        result = ReduceToCancel{}(br, result);
    }

    void join( ReduceFunctorToCancel& rhs ) {
        result = JoinToCancel{}(result, rhs.result);
    }
}; // struct ReduceFunctorToCancel
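// For reference, a minimal sketch (illustrative only, not part of this test)
// of the imperative Body protocol that ReduceFunctorToCancel follows: a
// splitting constructor, operator() over a subrange, and join:
//
//   struct SumBody {
//       int value = 0;
//       SumBody() = default;
//       SumBody( SumBody&, tbb::split ) {}                 // split off a fresh accumulator
//       void operator()( const tbb::blocked_range<const int*>& r ) {
//           for ( const int* p = r.begin(); p != r.end(); ++p )
//               value += *p;                               // accumulate one subrange
//       }
//       void join( SumBody& rhs ) { value += rhs.value; }  // fold the split-off result back in
//   };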
static constexpr std::size_t buffer_test_size = 1024;
static constexpr std::size_t maxParallelReduceRunnerMode = 9;

template <std::size_t Mode>
class ParallelReduceRunner {
    tbb::task_group_context& my_ctx;

    static_assert(Mode >= 0 && Mode <= maxParallelReduceRunnerMode, "Incorrect Mode for ParallelReduceRunner");

    template <typename... Args>
    void run_parallel_reduce( Args&&... args ) const {
        switch(Mode % 5) {
        case 0 : {
            tbb::parallel_reduce(std::forward<Args>(args)..., my_ctx);
            break;
        }
        case 1 : {
            tbb::parallel_reduce(std::forward<Args>(args)..., tbb::simple_partitioner{}, my_ctx);
            break;
        }
        case 2 : {
            tbb::parallel_reduce(std::forward<Args>(args)..., tbb::auto_partitioner{}, my_ctx);
            break;
        }
        case 3 : {
            tbb::parallel_reduce(std::forward<Args>(args)..., tbb::static_partitioner{}, my_ctx);
            break;
        }
        case 4 : {
            tbb::affinity_partitioner aff;
            tbb::parallel_reduce(std::forward<Args>(args)..., aff, my_ctx);
            break;
        }
        }
    }

public:
    ParallelReduceRunner( tbb::task_group_context& ctx )
        : my_ctx(ctx) {}

    void operator()() const {
        tbb::blocked_range<std::size_t> br(0, buffer_test_size);
        if (Mode < 5) {
            ReduceFunctorToCancel functor;
            run_parallel_reduce(br, functor);
        } else {
            run_parallel_reduce(br, 0, ReduceToCancel{}, JoinToCancel{});
        }
    }
}; // class ParallelReduceRunner
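// Modes 0-4 exercise the imperative (Body) form of tbb::parallel_reduce and
// modes 5-9 the functional (identity/reduce) form, each against the five
// partitioner variants of the Mode % 5 dispatch above. For example, Mode == 6
// resolves to:
//
//   tbb::parallel_reduce(br, 0, ReduceToCancel{}, JoinToCancel{},
//                        tbb::simple_partitioner{}, my_ctx);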
static constexpr std::size_t maxParallelDeterministicReduceRunnerMode = 5;

// TODO: unify with ParallelReduceRunner
template <std::size_t Mode>
class ParallelDeterministicReduceRunner {
    tbb::task_group_context& my_ctx;

    static_assert(Mode >= 0 && Mode <= maxParallelDeterministicReduceRunnerMode, "Incorrect Mode for ParallelDeterministicReduceRunner");

    template <typename... Args>
    void run_parallel_deterministic_reduce( Args&&... args ) const {
        switch(Mode % 3) {
        case 0 : {
            tbb::parallel_deterministic_reduce(std::forward<Args>(args)..., my_ctx);
            break;
        }
        case 1 : {
            tbb::parallel_deterministic_reduce(std::forward<Args>(args)..., tbb::simple_partitioner{}, my_ctx);
            break;
        }
        case 2 : {
            tbb::parallel_deterministic_reduce(std::forward<Args>(args)..., tbb::static_partitioner{}, my_ctx);
            break;
        }
        }
    }

public:
    ParallelDeterministicReduceRunner( tbb::task_group_context& ctx )
        : my_ctx(ctx) {}

    void operator()() const {
        tbb::blocked_range<std::size_t> br(0, buffer_test_size);
        if (Mode < 3) {
            ReduceFunctorToCancel functor;
            run_parallel_deterministic_reduce(br, functor);
        } else {
            run_parallel_deterministic_reduce(br, 0, ReduceToCancel{}, JoinToCancel{});
        }
    }
}; // class ParallelDeterministicReduceRunner

template <std::size_t Mode>
void run_parallel_reduce_cancellation_test() {
    for ( auto concurrency_level : utils::concurrency_range() ) {
        if (concurrency_level < 2) continue;

        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, concurrency_level);
        ResetEhGlobals();
        RunCancellationTest<ParallelReduceRunner<Mode>, Cancellator>();
    }
}

template <std::size_t Mode>
void run_parallel_deterministic_reduce_cancellation_test() {
    for ( auto concurrency_level : utils::concurrency_range() ) {
        if (concurrency_level < 2) continue;

        tbb::global_control gc(tbb::global_control::max_allowed_parallelism, concurrency_level);
        ResetEhGlobals();
        RunCancellationTest<ParallelDeterministicReduceRunner<Mode>, Cancellator>();
    }
}

template <std::size_t Mode>
struct ParallelReduceTestRunner {
    static void run() {
        run_parallel_reduce_cancellation_test<Mode>();
        ParallelReduceTestRunner<Mode + 1>::run();
    }
}; // struct ParallelReduceTestRunner

template <>
struct ParallelReduceTestRunner<maxParallelReduceRunnerMode> {
    static void run() {
        run_parallel_reduce_cancellation_test<maxParallelReduceRunnerMode>();
    }
}; // struct ParallelReduceTestRunner<maxParallelReduceRunnerMode>

template <std::size_t Mode>
struct ParallelDeterministicReduceTestRunner {
    static void run() {
        run_parallel_deterministic_reduce_cancellation_test<Mode>();
        ParallelDeterministicReduceTestRunner<Mode + 1>::run();
    }
}; // struct ParallelDeterministicReduceTestRunner

template <>
struct ParallelDeterministicReduceTestRunner<maxParallelDeterministicReduceRunnerMode> {
    static void run() {
        run_parallel_deterministic_reduce_cancellation_test<maxParallelDeterministicReduceRunnerMode>();
    }
}; // struct ParallelDeterministicReduceTestRunner<maxParallelDeterministicReduceRunnerMode>

} // namespace test_cancellation

//! Test parallel summation correctness
//! \brief \ref stress
TEST_CASE("Test parallel summation correctness") {
    ParallelSumTester pst;
    pst.CheckParallelReduce<utils_default_partitioner>();
    pst.CheckParallelReduce<tbb::simple_partitioner>();
    pst.CheckParallelReduce<tbb::auto_partitioner>();
    pst.CheckParallelReduce<tbb::affinity_partitioner>();
    pst.CheckParallelReduce<tbb::static_partitioner>();
}
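// A minimal usage sketch (illustrative only; `v` is a hypothetical
// std::vector<int>) of the functional tbb::parallel_reduce form that
// CheckParallelReduce verifies above:
//
//   int total = tbb::parallel_reduce(
//       tbb::blocked_range<std::size_t>(0, v.size()), 0,
//       [&v]( const tbb::blocked_range<std::size_t>& r, int init ) {
//           for ( std::size_t i = r.begin(); i != r.end(); ++i )
//               init += v[i];
//           return init;
//       },
//       std::plus<int>{} );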
static std::atomic<long> ForkCount;
static std::atomic<long> FooBodyCount;

//! Class whose public interface meets exactly the minimal requirements of the Range concept
class MinimalRange {
    size_t begin, end;
    friend class FooBody;
    explicit MinimalRange( size_t i ) : begin(0), end(i) {}
    template <typename Partitioner_> friend void TestSplitting( std::size_t nthread );
public:
    MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) {
        begin = r.end = (r.begin+r.end)/2;
    }
    bool is_divisible() const {return end-begin>=2;}
    bool empty() const {return begin==end;}
};

//! Class whose public interface meets exactly the minimal requirements of a parallel_reduce Body
class FooBody {
private:
    FooBody( const FooBody& );        // Deny access
    void operator=( const FooBody& ); // Deny access
    template <typename Partitioner_> friend void TestSplitting( std::size_t nthread );
    //! Parent that created this body via split operation. nullptr if original body.
    FooBody* parent;
    //! Total number of index values processed by body and its children.
    size_t sum;
    //! Number of join operations done so far on this body and its children.
    long join_count;
    //! Range that has been processed so far by this body and its children.
    size_t begin, end;
    //! True if body has not yet been processed by operator().
    bool is_new;
    //! 1 if body was created by split; 0 if original body.
    int forked;
    FooBody() {++FooBodyCount;}
public:
    ~FooBody() {
        forked = 0xDEADBEEF;
        sum = 0xDEADBEEF;
        join_count = 0xDEADBEEF;
        --FooBodyCount;
    }
    FooBody( FooBody& other, tbb::split ) {
        ++FooBodyCount;
        ++ForkCount;
        sum = 0;
        parent = &other;
        join_count = 0;
        is_new = true;
        forked = 1;
    }

    void init() {
        sum = 0;
        parent = nullptr;
        join_count = 0;
        is_new = true;
        forked = 0;
        begin = ~size_t(0);
        end = ~size_t(0);
    }

    void join( FooBody& s ) {
        REQUIRE( s.forked==1 );
        REQUIRE( this!=&s );
        REQUIRE( this==s.parent );
        REQUIRE( end==s.begin );
        end = s.end;
        sum += s.sum;
        join_count += s.join_count + 1;
        s.forked = 2;
    }
    void operator()( const MinimalRange& r ) {
        for( size_t k=r.begin; k<r.end; ++k )
            ++sum;
        if( is_new ) {
            is_new = false;
            begin = r.begin;
        } else
            REQUIRE( end==r.begin );
        end = r.end;
    }
};

template<typename Partitioner>
void TestSplitting( std::size_t nthread ) {
    ForkCount = 0;
    long join_count = 0;
    Partitioner partitioner;
    for( size_t i=0; i<=1000; ++i ) {
        FooBody f;
        f.init();
        REQUIRE_MESSAGE( FooBodyCount==1, "Wrong initial FooBodyCount value" );
        reduce_invoker(MinimalRange(i), f, partitioner);

        if (nthread == 1) REQUIRE_MESSAGE(ForkCount==0, "Body was split during single-threaded execution");

        join_count += f.join_count;
        REQUIRE_MESSAGE( FooBodyCount==1, "Some copies of FooBody were not removed after reduction" );
        REQUIRE_MESSAGE( f.sum==i, "Incorrect reduction" );
        REQUIRE_MESSAGE( f.begin==(i==0 ? ~size_t(0) : 0), "Incorrect range borders" );
        REQUIRE_MESSAGE( f.end==(i==0 ? ~size_t(0) : i), "Incorrect range borders" );
    }
}
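// The split/join discipline that TestSplitting verifies can be pictured as the
// following recursion (a sketch of the pairing invariant, not the library
// implementation):
//
//   void reduce_recursively( MinimalRange& r, FooBody& body ) {
//       if ( r.is_divisible() ) {
//           MinimalRange rhs( r, tbb::split{} );  // r keeps the left half
//           FooBody rhs_body( body, tbb::split{} );
//           reduce_recursively( r, body );        // left half, same body
//           reduce_recursively( rhs, rhs_body );  // right half, forked body
//           body.join( rhs_body );                // every split is paired with a join
//       } else if ( !r.empty() ) {
//           body( r );
//       }
//   }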
//! Test splitting range and body during reduction, test that all workers sleep when no work
//! \brief \ref resource_usage \ref error_guessing
TEST_CASE("Test splitting range and body during reduction, test that all workers sleep when no work") {
    for ( auto concurrency_level : utils::concurrency_range() ) {
        tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level);

        TestSplitting<tbb::simple_partitioner>(concurrency_level);
        TestSplitting<tbb::static_partitioner>(concurrency_level);
        TestSplitting<tbb::auto_partitioner>(concurrency_level);
        TestSplitting<tbb::affinity_partitioner>(concurrency_level);
        TestSplitting<utils_default_partitioner>(concurrency_level);

        // Test that all workers sleep when no work
        TestCPUUserTime(concurrency_level);
    }
}

//! Define overloads of parallel_deterministic_reduce that accept "undesired" types of partitioners
namespace unsupported {
    template<typename Range, typename Body>
    void parallel_deterministic_reduce(const Range&, Body&, const tbb::auto_partitioner&) { }
    template<typename Range, typename Body>
    void parallel_deterministic_reduce(const Range&, Body&, tbb::affinity_partitioner&) { }

    template<typename Range, typename Value, typename RealBody, typename Reduction>
    Value parallel_deterministic_reduce(const Range&, const Value& identity, const RealBody&, const Reduction&, const tbb::auto_partitioner&) {
        return identity;
    }
    template<typename Range, typename Value, typename RealBody, typename Reduction>
    Value parallel_deterministic_reduce(const Range&, const Value& identity, const RealBody&, const Reduction&, tbb::affinity_partitioner&) {
        return identity;
    }
}

struct Body {
    float value;
    Body() : value(0) {}
    Body(Body&, tbb::split) { value = 0; }
    void operator()(const tbb::blocked_range<int>&) {}
    void join(Body&) {}
};

//! Check that other types of partitioners are not supported (auto, affinity)
//! In the case of "unsupported" API unexpectedly sneaking into namespace tbb,
//! this test should result in a compilation error due to overload resolution ambiguity
//! \brief \ref negative \ref error_guessing
TEST_CASE("Test Unsupported Partitioners") {
    using namespace tbb;
    using namespace unsupported;
    Body body;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, tbb::auto_partitioner());

    tbb::affinity_partitioner ap;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, ap);

    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        tbb::auto_partitioner()
    );
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        ap
    );
}

//! Testing tbb::parallel_reduce with tbb::task_group_context
//! \brief \ref interface \ref error_guessing
TEST_CASE("cancellation test for tbb::parallel_reduce") {
    test_cancellation::ParallelReduceTestRunner</*First mode = */0>::run();
}

//! Testing tbb::parallel_deterministic_reduce with tbb::task_group_context
//! \brief \ref interface \ref error_guessing
TEST_CASE("cancellation test for tbb::parallel_deterministic_reduce") {
    test_cancellation::ParallelDeterministicReduceTestRunner</*First mode = */0>::run();
}