1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/config.h" 19 #include "common/utils_concurrency_limit.h" 20 #include "common/cpu_usertime.h" 21 22 #include "tbb/global_control.h" 23 #include "tbb/parallel_scan.h" 24 #include "tbb/blocked_range.h" 25 #include "tbb/tick_count.h" 26 #include <vector> 27 #include <atomic> 28 29 //! \file test_parallel_scan.cpp 30 //! \brief Test for [algorithms.parallel_scan] specification 31 32 using Range = tbb::blocked_range<long>; 33 34 static volatile bool ScanIsRunning = false; 35 36 //! Sum of 0..i with wrap around on overflow. 37 inline int TriangularSum( int i ) { 38 return i&1 ? ((i>>1)+1)*i : (i>>1)*(i+1); 39 } 40 41 //! Verify that sum is init plus sum of integers in closed interval [0..finish_index]. 42 /** line should be the source line of the caller */ 43 void VerifySum( int init, long finish_index, int sum, int line ) { 44 int expected = init + TriangularSum(finish_index); 45 CHECK_MESSAGE(expected == sum, "line " << line << ": sum[0.." << finish_index << "] should be = " << expected << ", but was computed as " << sum << "\n"); 46 } 47 48 const int MAXN = 20000; 49 50 enum AddendFlag { 51 UNUSED=0, 52 USED_NONFINAL=1, 53 USED_FINAL=2 54 }; 55 56 //! Array recording how each addend was used. 57 /** 'unsigned char' instead of AddendFlag for sake of compactness. */ 58 static unsigned char AddendHistory[MAXN]; 59 60 std::atomic<long> NumberOfLiveStorage; 61 62 template<typename T> 63 struct Storage { 64 T my_total; 65 Range my_range; 66 Storage(T init) : 67 my_total(init), my_range(-1, -1, 1) { 68 ++NumberOfLiveStorage; 69 } 70 ~Storage() { 71 --NumberOfLiveStorage; 72 } 73 Storage(const Storage& strg) : 74 my_total(strg.my_total), my_range(strg.my_range) { 75 ++NumberOfLiveStorage; 76 } 77 Storage & operator=(const Storage& strg) { 78 my_total = strg.my_total; 79 my_range = strg.my_range; 80 return *this; 81 } 82 }; 83 84 template<typename T> 85 void JoinStorages(const Storage<T>& left, Storage<T>& right) { 86 CHECK(ScanIsRunning); 87 CHECK(left.my_range.end() == right.my_range.begin()); 88 right.my_total += left.my_total; 89 right.my_range = Range(left.my_range.begin(), right.my_range.end(), 1); 90 CHECK(ScanIsRunning); 91 } 92 93 template<typename T> 94 void Scan(const Range & r, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) { 95 CHECK((!is_final || (storage.my_range.begin() == 0 && storage.my_range.end() == r.begin()) || (storage.my_range.empty() && r.begin() == 0))); 96 for (long i = r.begin(); i < r.end(); ++i) { 97 storage.my_total += addend[i]; 98 if (is_final) { 99 CHECK_MESSAGE(AddendHistory[i] < USED_FINAL, "addend used 'finally' twice?"); 100 AddendHistory[i] |= USED_FINAL; 101 sum[i] = storage.my_total; 102 VerifySum(42, i, int(sum[i]), __LINE__); 103 } 104 else { 105 CHECK_MESSAGE(AddendHistory[i] == UNUSED, "addend used too many times"); 106 AddendHistory[i] |= USED_NONFINAL; 107 } 108 } 109 if (storage.my_range.empty()) 110 storage.my_range = r; 111 else 112 storage.my_range = Range(storage.my_range.begin(), r.end(), 1); 113 } 114 115 template<typename T> 116 Storage<T> ScanWithInit(const Range & r, T init, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) { 117 if (r.begin() == 0) 118 storage.my_total = init; 119 Scan(r, is_final, storage, sum, addend); 120 return storage; 121 } 122 123 template<typename T> 124 class Accumulator { 125 const std::vector<T> &my_array; 126 std::vector<T> & my_sum; 127 Storage<T> storage; 128 enum state_type { 129 full, // Accumulator has sufficient information for final scan, 130 // i.e. has seen all iterations to its left. 131 // It's either the original Accumulator provided by the user 132 // or a Accumulator constructed by a splitting constructor *and* subsequently 133 // subjected to a reverse_join with a full accumulator. 134 135 partial, // Accumulator has only enough information for pre_scan. 136 // i.e. has not seen all iterations to its left. 137 // It's an Accumulator created by a splitting constructor that 138 // has not yet been subjected to a reverse_join with a full accumulator. 139 140 summary, // Accumulator has summary of iterations processed, but not necessarily 141 // the information required for a final_scan or pre_scan. 142 // It's the result of "assign". 143 144 trash // Accumulator with possibly no useful information. 145 // It was the source for "assign". 146 147 }; 148 mutable state_type my_state; 149 //! Equals this while object is fully constructed, NULL otherwise. 150 /** Used to detect premature destruction and accidental bitwise copy. */ 151 Accumulator* self; 152 Accumulator& operator= (const Accumulator& other); 153 public: 154 Accumulator( T init, const std::vector<T> & array, std::vector<T> & sum ) : 155 my_array(array), my_sum(sum), storage(init), my_state(full) 156 { 157 // Set self as last action of constructor, to indicate that object is fully constructed. 158 self = this; 159 } 160 ~Accumulator() { 161 // Clear self as first action of destructor, to indicate that object is not fully constructed. 162 self = 0; 163 } 164 Accumulator( Accumulator& a, tbb::split ) : 165 my_array(a.my_array), my_sum(a.my_sum), storage(0), my_state(partial) 166 { 167 if (!(a.my_state == partial)) 168 CHECK(a.my_state == full); 169 if (!(a.my_state == full)) 170 CHECK(a.my_state == partial); 171 CHECK(ScanIsRunning); 172 // Set self as last action of constructor, to indicate that object is fully constructed. 173 self = this; 174 } 175 template<typename Tag> 176 void operator()( const Range& r, Tag /*tag*/ ) { 177 if(Tag::is_final_scan()) 178 CHECK(my_state == full); 179 else 180 CHECK(my_state == partial); 181 Scan(r, Tag::is_final_scan(), storage, my_sum, my_array); 182 CHECK_MESSAGE(self==this, "this Accumulator corrupted or prematurely destroyed"); 183 } 184 void reverse_join( const Accumulator& left_body) { 185 const Storage<T> & left = left_body.storage; 186 Storage<T> & right = storage; 187 CHECK(my_state == partial); 188 CHECK( ((left_body.my_state == full) || (left_body.my_state==partial)) ); 189 190 JoinStorages(left, right); 191 192 CHECK(left_body.self == &left_body); 193 my_state = left_body.my_state; 194 } 195 void assign( const Accumulator& other ) { 196 CHECK(other.my_state == full); 197 CHECK(my_state == full); 198 storage.my_total = other.storage.my_total; 199 storage.my_range = other.storage.my_range; 200 CHECK(self == this); 201 CHECK_MESSAGE(other.self==&other, "other Accumulator corrupted or prematurely destroyed"); 202 my_state = summary; 203 other.my_state = trash; 204 } 205 T get_total() { 206 return storage.my_total; 207 } 208 }; 209 210 211 template<typename T, typename Scan, typename ReverseJoin> 212 T ParallelScanFunctionalInvoker(const Range& range, T idx, const Scan& scan, const ReverseJoin& reverse_join, int mode) { 213 switch (mode%3) { 214 case 0: 215 return tbb::parallel_scan(range, idx, scan, reverse_join); 216 break; 217 case 1: 218 return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::simple_partitioner()); 219 break; 220 default: 221 return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::auto_partitioner()); 222 } 223 } 224 225 template<typename T> 226 class ScanBody { 227 const std::vector<T> &my_addend; 228 std::vector<T> &my_sum; 229 const T my_init; 230 ScanBody& operator= (const ScanBody&); 231 public: 232 ScanBody(T init, const std::vector<T> &addend, std::vector<T> &sum) :my_addend(addend), my_sum(sum), my_init(init) {} 233 template<typename S, typename Tag> 234 Storage<S> operator()(const Range& r, Storage<S> storage, Tag) const { 235 return ScanWithInit(r, my_init, Tag::is_final_scan(), storage, my_sum, my_addend); 236 } 237 }; 238 239 class JoinBody { 240 public: 241 template<typename T> 242 Storage<T> operator()(const Storage<T>& left, Storage<T>& right) const { 243 JoinStorages(left, right); 244 return right; 245 } 246 }; 247 248 struct ParallelScanTemplateFunctor { 249 template<typename T> 250 T operator()(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) { 251 for (long i = 0; i<MAXN; ++i) { 252 AddendHistory[i] = UNUSED; 253 } 254 ScanIsRunning = true; 255 ScanBody<T> sb(init, addend, sum); 256 JoinBody jb; 257 Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), sb, jb, mode); 258 ScanIsRunning = false; 259 if (range.empty()) 260 res.my_total = init; 261 return res.my_total; 262 } 263 }; 264 265 struct ParallelScanLambda { 266 template<typename T> 267 T operator()(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) { 268 for (long i = 0; i<MAXN; ++i) { 269 AddendHistory[i] = UNUSED; 270 } 271 ScanIsRunning = true; 272 Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), 273 [&addend, &sum, init](const Range& r, Storage<T> storage, bool is_final_scan /*tag*/) -> Storage<T> { 274 return ScanWithInit(r, init, is_final_scan, storage, sum, addend); 275 }, 276 [](const Storage<T>& left, Storage<T>& right) -> Storage<T> { 277 JoinStorages(left, right); 278 return right; 279 }, 280 mode); 281 ScanIsRunning = false; 282 if (range.empty()) 283 res.my_total = init; 284 return res.my_total; 285 } 286 }; 287 288 void TestAccumulator( int mode ) { 289 typedef int T; 290 std::vector<T> addend(MAXN); 291 std::vector<T> sum(MAXN); 292 std::vector<T> control_sum(MAXN); 293 T control_total; 294 for( int n=0; n<=MAXN; n = n <=128? n+1: n*3) { 295 for( int gs : {1, 2, 100, 511, 12345, n/ 111, n/17, n-1, n}) { 296 if(gs<=0 || gs > n) 297 continue; 298 control_total = 42; 299 for( long i=0; i<MAXN; ++i ) { 300 addend[i] = -1; 301 sum[i] = -2; 302 control_sum[i] = -2; 303 AddendHistory[i] = UNUSED; 304 } 305 for (long i = 0; i<n; ++i) { 306 addend[i] = i; 307 control_total += addend[i]; 308 control_sum[i] = control_total; 309 } 310 311 Accumulator<T> acc( 42, addend, sum); 312 ScanIsRunning = true; 313 314 switch (mode) { 315 case 0: 316 tbb::parallel_scan( Range( 0, n, gs ), acc ); 317 break; 318 case 1: 319 tbb::parallel_scan( Range( 0, n, gs ), acc, tbb::simple_partitioner() ); 320 break; 321 case 2: 322 tbb::parallel_scan( Range( 0, n, gs ), acc, tbb::auto_partitioner() ); 323 break; 324 } 325 326 ScanIsRunning = false; 327 328 long used_once_count = 0; 329 for( long i=0; i<n; ++i ) 330 CHECK_MESSAGE((AddendHistory[i]&USED_FINAL), "failed to use addend[" << i << "] " << (AddendHistory[i] & USED_NONFINAL ? "(but used nonfinal)\n" : "\n")); 331 for( long i=0; i<n; ++i ) { 332 VerifySum( 42, i, sum[i], __LINE__ ); 333 used_once_count += AddendHistory[i]==USED_FINAL; 334 } 335 if( n ) 336 CHECK(acc.get_total()==sum[n-1]); 337 else 338 CHECK(acc.get_total()==42); 339 CHECK(control_total ==acc.get_total()); 340 CHECK(control_sum==sum); 341 } 342 } 343 } 344 345 template<typename ParallelScanWrapper> 346 void TestInterface( int mode, ParallelScanWrapper parallel_scan_wrapper ) { 347 using T = int; 348 std::vector<T> addend(MAXN); 349 std::vector<T> control_sum(MAXN); 350 T control_total(42); 351 for( long i=0; i<MAXN; ++i ) { 352 addend[i] = i; 353 control_total += addend[i]; 354 control_sum[i] = control_total; 355 AddendHistory[i] = UNUSED; 356 } 357 358 std::vector<T> sum(MAXN); 359 for (long i = 0; i<MAXN; ++i) 360 sum[i] = -2; 361 ScanIsRunning = true; 362 T total = parallel_scan_wrapper(Range(0, MAXN, 1), 42, addend, sum, mode); 363 ScanIsRunning = false; 364 365 CHECK_MESSAGE(control_total==total, "Parallel prefix sum is not equal to serial"); 366 CHECK_MESSAGE(control_sum==sum, "Parallel prefix vector is not equal to serial"); 367 } 368 369 370 #if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT 371 struct ParallelScanGenericLambda { 372 template<typename T> 373 T operator()(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) { 374 for (long i = 0; i<MAXN; ++i) { 375 AddendHistory[i] = UNUSED; 376 } 377 ScanIsRunning = true; 378 Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), 379 [&addend, &sum, init](const auto& rng, auto storage, auto scan_tag) { 380 return ScanWithInit(rng, init, scan_tag.is_final_scan(), storage, sum, addend); 381 }, 382 [](const auto& left, auto& right) { 383 JoinStorages(left, right); 384 return right; 385 }, 386 mode); 387 ScanIsRunning = false; 388 if (range.empty()) 389 res.my_total = init; 390 return res.my_total; 391 } 392 }; 393 #endif /* __TBB_CPP14_GENERIC_LAMBDAS_PRESENT */ 394 395 396 397 // Test for parallel_scan with with different partitioners 398 //! \brief \ref error_guessing \ref resource_usage 399 TEST_CASE("parallel_scan testing with different partitioners") { 400 for (auto concurrency_level : utils::concurrency_range()) { 401 tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); 402 for (int mode = 0; mode < 3; mode++) { 403 NumberOfLiveStorage = 0; 404 TestAccumulator(mode); 405 // Test that all workers sleep when no work 406 TestCPUUserTime(concurrency_level); 407 408 // Checking has to be done late, because when parallel_scan makes copies of 409 // the user's "Body", the copies might be destroyed slightly after parallel_scan 410 // returns. 411 CHECK(NumberOfLiveStorage == 0); 412 } 413 } 414 } 415 416 // Test for parallel_scan with template functors 417 //! \brief \ref error_guessing \ref interface \ref resource_usage 418 TEST_CASE("parallel_scan testing with template functor") { 419 for (auto concurrency_level : utils::concurrency_range()) { 420 tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); 421 for (int mode = 0; mode < 3; mode++) { 422 NumberOfLiveStorage = 0; 423 TestInterface(mode, ParallelScanTemplateFunctor()); 424 // Test that all workers sleep when no work 425 TestCPUUserTime(concurrency_level); 426 427 // Checking has to be done late, because when parallel_scan makes copies of 428 // the user's "Body", the copies might be destroyed slightly after parallel_scan 429 // returns. 430 CHECK(NumberOfLiveStorage == 0); 431 } 432 } 433 } 434 435 // Test for parallel_scan with lambdas 436 //! \brief \ref error_guessing \ref interface \ref resource_usage 437 TEST_CASE("parallel_scan testing with lambdas") { 438 for (auto concurrency_level : utils::concurrency_range()) { 439 tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); 440 for (int mode = 0; mode < 3; mode++) { 441 NumberOfLiveStorage = 0; 442 TestInterface(mode, ParallelScanLambda()); 443 444 // Test that all workers sleep when no work 445 TestCPUUserTime(concurrency_level); 446 447 // Checking has to be done late, because when parallel_scan makes copies of 448 // the user's "Body", the copies might be destroyed slightly after parallel_scan 449 // returns. 450 CHECK(NumberOfLiveStorage == 0); 451 } 452 } 453 } 454 455 #if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT 456 // Test for parallel_scan with genetic lambdas 457 //! \brief \ref error_guessing \ref interface \ref resource_usage 458 TEST_CASE("parallel_scan testing with generic lambdas") { 459 for (auto concurrency_level : utils::concurrency_range()) { 460 tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level); 461 for (int mode = 0; mode < 3; mode++) { 462 NumberOfLiveStorage = 0; 463 TestInterface(mode, ParallelScanGenericLambda()); 464 // Test that all workers sleep when no work 465 TestCPUUserTime(concurrency_level); 466 467 // Checking has to be done late, because when parallel_scan makes copies of 468 // the user's "Body", the copies might be destroyed slightly after parallel_scan 469 // returns. 470 CHECK(NumberOfLiveStorage == 0); 471 } 472 } 473 } 474 #endif /* __TBB_CPP14_GENERIC_LAMBDAS_PRESENT */ 475