xref: /oneTBB/test/tbb/test_parallel_scan.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/config.h"
19 #include "common/utils_concurrency_limit.h"
20 #include "common/cpu_usertime.h"
21 
22 #include "tbb/global_control.h"
23 #include "tbb/parallel_scan.h"
24 #include "tbb/blocked_range.h"
25 #include "tbb/tick_count.h"
26 #include <vector>
27 #include <atomic>
28 
29 //! \file test_parallel_scan.cpp
30 //! \brief Test for [algorithms.parallel_scan] specification
31 
32 using Range = tbb::blocked_range<long>;
33 
34 static volatile bool ScanIsRunning = false;
35 
//! Sum of the integers 0..i, i.e. i*(i+1)/2, wrapped around to int on overflow.
/** The product is formed in long long, which is exact for every 32-bit int i, so no
    signed overflow (undefined behaviour) can occur. The final narrowing to int reduces
    the value modulo 2^32. That is guaranteed since C++20 and is what mainstream compilers
    do for earlier standards. */
inline int TriangularSum( int i ) {
    const long long wide = i;
    return static_cast<int>(wide * (wide + 1) / 2);
}
40 
41 //! Verify that sum is init plus sum of integers in closed interval [0..finish_index].
42 /** line should be the source line of the caller */
43 void VerifySum( int init, long finish_index, int sum, int line ) {
44     int expected = init + TriangularSum(finish_index);
45     CHECK_MESSAGE(expected == sum, "line " << line << ": sum[0.." << finish_index << "] should be = " << expected << ", but was computed as " << sum << "\n");
46 }
47 
//! Number of addends used by the tests; also the size of AddendHistory.
constexpr int MAXN = 20000;

//! Bit flags recorded per addend in AddendHistory.
enum AddendFlag {
    UNUSED = 0,              // not yet consumed by any pass
    USED_NONFINAL = 1 << 0,  // consumed by a pre-scan (non-final) pass
    USED_FINAL = 1 << 1      // consumed by a final-scan pass
};

//! Per-addend record of how it was used (a combination of AddendFlag bits).
/** Stored as 'unsigned char' rather than AddendFlag to keep the array compact. */
static unsigned char AddendHistory[MAXN] = {};

//! Number of Storage objects currently alive; the tests expect it to return to 0.
std::atomic<long> NumberOfLiveStorage{0};
61 
62 template<typename T>
63 struct Storage {
64     T my_total;
65     Range my_range;
66     Storage(T init) :
67         my_total(init), my_range(-1, -1, 1) {
68         ++NumberOfLiveStorage;
69     }
70     ~Storage() {
71         --NumberOfLiveStorage;
72     }
73     Storage(const Storage& strg) :
74         my_total(strg.my_total), my_range(strg.my_range) {
75         ++NumberOfLiveStorage;
76     }
77     Storage & operator=(const Storage& strg) {
78         my_total = strg.my_total;
79         my_range = strg.my_range;
80         return *this;
81     }
82 };
83 
84 template<typename T>
85 void JoinStorages(const Storage<T>& left, Storage<T>& right) {
86     CHECK(ScanIsRunning);
87     CHECK(left.my_range.end() == right.my_range.begin());
88     right.my_total += left.my_total;
89     right.my_range = Range(left.my_range.begin(), right.my_range.end(), 1);
90     CHECK(ScanIsRunning);
91 }
92 
93 template<typename T>
94 void Scan(const Range & r, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) {
95     CHECK((!is_final || (storage.my_range.begin() == 0 && storage.my_range.end() == r.begin()) || (storage.my_range.empty() && r.begin() == 0)));
96     for (long i = r.begin(); i < r.end(); ++i) {
97         storage.my_total += addend[i];
98         if (is_final) {
99             CHECK_MESSAGE(AddendHistory[i] < USED_FINAL, "addend used 'finally' twice?");
100             AddendHistory[i] |= USED_FINAL;
101             sum[i] = storage.my_total;
102             VerifySum(42, i, int(sum[i]), __LINE__);
103         }
104         else {
105             CHECK_MESSAGE(AddendHistory[i] == UNUSED, "addend used too many times");
106             AddendHistory[i] |= USED_NONFINAL;
107         }
108     }
109     if (storage.my_range.empty())
110         storage.my_range = r;
111     else
112         storage.my_range = Range(storage.my_range.begin(), r.end(), 1);
113 }
114 
115 template<typename T>
116 Storage<T> ScanWithInit(const Range & r, T init, bool is_final, Storage<T> & storage, std::vector<T> & sum, const std::vector<T> & addend) {
117     if (r.begin() == 0)
118         storage.my_total = init;
119     Scan(r, is_final, storage, sum, addend);
120     return storage;
121 }
122 
123 template<typename T>
124 class Accumulator {
125     const  std::vector<T> &my_array;
126     std::vector<T> & my_sum;
127     Storage<T> storage;
128     enum state_type {
129         full,       // Accumulator has sufficient information for final scan,
130                     // i.e. has seen all iterations to its left.
131                     // It's either the original Accumulator provided by the user
132                     // or a Accumulator constructed by a splitting constructor *and* subsequently
133                     // subjected to a reverse_join with a full accumulator.
134 
135         partial,    // Accumulator has only enough information for pre_scan.
136                     // i.e. has not seen all iterations to its left.
137                     // It's an Accumulator created by a splitting constructor that
138                     // has not yet been subjected to a reverse_join with a full accumulator.
139 
140         summary,    // Accumulator has summary of iterations processed, but not necessarily
141                     // the information required for a final_scan or pre_scan.
142                     // It's the result of "assign".
143 
144         trash       // Accumulator with possibly no useful information.
145                     // It was the source for "assign".
146 
147     };
148     mutable state_type my_state;
149     //! Equals this while object is fully constructed, NULL otherwise.
150     /** Used to detect premature destruction and accidental bitwise copy. */
151     Accumulator* self;
152     Accumulator& operator= (const Accumulator& other);
153 public:
154     Accumulator( T init, const std::vector<T> & array, std::vector<T> & sum ) :
155         my_array(array), my_sum(sum), storage(init), my_state(full)
156     {
157         // Set self as last action of constructor, to indicate that object is fully constructed.
158         self = this;
159     }
160     ~Accumulator() {
161         // Clear self as first action of destructor, to indicate that object is not fully constructed.
162         self = 0;
163     }
164     Accumulator( Accumulator& a, tbb::split ) :
165         my_array(a.my_array), my_sum(a.my_sum), storage(0), my_state(partial)
166     {
167         if (!(a.my_state == partial))
168             CHECK(a.my_state == full);
169         if (!(a.my_state == full))
170             CHECK(a.my_state == partial);
171         CHECK(ScanIsRunning);
172         // Set self as last action of constructor, to indicate that object is fully constructed.
173         self = this;
174     }
175     template<typename Tag>
176     void operator()( const Range& r, Tag /*tag*/ ) {
177         if(Tag::is_final_scan())
178             CHECK(my_state == full);
179         else
180             CHECK(my_state == partial);
181         Scan(r, Tag::is_final_scan(), storage, my_sum, my_array);
182         CHECK_MESSAGE(self==this, "this Accumulator corrupted or prematurely destroyed");
183     }
184     void reverse_join( const Accumulator& left_body) {
185         const Storage<T> & left = left_body.storage;
186         Storage<T> & right = storage;
187         CHECK(my_state == partial);
188         CHECK( ((left_body.my_state == full) || (left_body.my_state==partial)) );
189 
190         JoinStorages(left, right);
191 
192         CHECK(left_body.self == &left_body);
193         my_state = left_body.my_state;
194     }
195     void assign( const Accumulator& other ) {
196         CHECK(other.my_state == full);
197         CHECK(my_state == full);
198         storage.my_total = other.storage.my_total;
199         storage.my_range = other.storage.my_range;
200         CHECK(self == this);
201         CHECK_MESSAGE(other.self==&other, "other Accumulator corrupted or prematurely destroyed");
202         my_state = summary;
203         other.my_state = trash;
204     }
205     T get_total() {
206         return storage.my_total;
207     }
208 };
209 
210 
211 template<typename T, typename Scan, typename ReverseJoin>
212 T ParallelScanFunctionalInvoker(const Range& range, T idx, const Scan& scan, const ReverseJoin& reverse_join, int mode) {
213     switch (mode%3) {
214     case 0:
215         return tbb::parallel_scan(range, idx, scan, reverse_join);
216         break;
217     case 1:
218         return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::simple_partitioner());
219         break;
220     default:
221         return tbb::parallel_scan(range, idx, scan, reverse_join, tbb::auto_partitioner());
222     }
223 }
224 
225 template<typename T>
226 class ScanBody {
227     const std::vector<T> &my_addend;
228     std::vector<T> &my_sum;
229     const T my_init;
230     ScanBody& operator= (const ScanBody&);
231 public:
232     ScanBody(T init, const std::vector<T> &addend, std::vector<T> &sum) :my_addend(addend), my_sum(sum), my_init(init) {}
233     template<typename S, typename Tag>
234     Storage<S> operator()(const Range& r, Storage<S> storage, Tag) const {
235         return ScanWithInit(r, my_init, Tag::is_final_scan(), storage, my_sum, my_addend);
236     }
237 };
238 
239 class JoinBody {
240 public:
241     template<typename T>
242     Storage<T> operator()(const Storage<T>& left, Storage<T>& right) const {
243         JoinStorages(left, right);
244         return right;
245     }
246 };
247 
248 struct ParallelScanTemplateFunctor {
249     template<typename T>
250     T operator()(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
251         for (long i = 0; i<MAXN; ++i) {
252             AddendHistory[i] = UNUSED;
253         }
254         ScanIsRunning = true;
255         ScanBody<T> sb(init, addend, sum);
256         JoinBody jb;
257         Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), sb, jb, mode);
258         ScanIsRunning = false;
259         if (range.empty())
260             res.my_total = init;
261         return res.my_total;
262     }
263 };
264 
265 struct ParallelScanLambda {
266     template<typename T>
267     T operator()(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
268         for (long i = 0; i<MAXN; ++i) {
269             AddendHistory[i] = UNUSED;
270         }
271         ScanIsRunning = true;
272         Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0),
273             [&addend, &sum, init](const Range& r, Storage<T> storage, bool is_final_scan /*tag*/) -> Storage<T> {
274                 return ScanWithInit(r, init, is_final_scan, storage, sum, addend);
275             },
276             [](const Storage<T>& left, Storage<T>& right) -> Storage<T> {
277                 JoinStorages(left, right);
278                 return right;
279             },
280             mode);
281         ScanIsRunning = false;
282         if (range.empty())
283             res.my_total = init;
284         return res.my_total;
285     }
286 };
287 
288 void TestAccumulator( int mode ) {
289     typedef int T;
290     std::vector<T> addend(MAXN);
291     std::vector<T> sum(MAXN);
292     std::vector<T> control_sum(MAXN);
293     T control_total;
294     for( int n=0; n<=MAXN; n = n <=128? n+1: n*3) {
295         for( int gs : {1, 2, 100, 511, 12345, n/ 111, n/17, n-1, n}) {
296             if(gs<=0 || gs > n)
297                 continue;
298             control_total = 42;
299             for( long i=0; i<MAXN; ++i ) {
300                 addend[i] = -1;
301                 sum[i] = -2;
302                 control_sum[i] = -2;
303                 AddendHistory[i] = UNUSED;
304             }
305             for (long i = 0; i<n; ++i) {
306                 addend[i] = i;
307                 control_total += addend[i];
308                 control_sum[i] = control_total;
309             }
310 
311             Accumulator<T> acc( 42, addend, sum);
312             ScanIsRunning = true;
313 
314             switch (mode) {
315                 case 0:
316                     tbb::parallel_scan( Range( 0, n,  gs ), acc );
317                 break;
318                 case 1:
319                     tbb::parallel_scan( Range( 0, n, gs ), acc, tbb::simple_partitioner() );
320                 break;
321                 case 2:
322                     tbb::parallel_scan( Range( 0, n, gs ), acc, tbb::auto_partitioner() );
323                 break;
324             }
325 
326             ScanIsRunning = false;
327 
328             long used_once_count = 0;
329             for( long i=0; i<n; ++i )
330                 CHECK_MESSAGE((AddendHistory[i]&USED_FINAL), "failed to use addend[" << i << "] " << (AddendHistory[i] & USED_NONFINAL ? "(but used nonfinal)\n" : "\n"));
331             for( long i=0; i<n; ++i ) {
332                 VerifySum( 42, i, sum[i], __LINE__ );
333                 used_once_count += AddendHistory[i]==USED_FINAL;
334             }
335             if( n )
336                 CHECK(acc.get_total()==sum[n-1]);
337             else
338                 CHECK(acc.get_total()==42);
339             CHECK(control_total ==acc.get_total());
340             CHECK(control_sum==sum);
341         }
342     }
343 }
344 
345 template<typename ParallelScanWrapper>
346 void TestInterface( int mode, ParallelScanWrapper parallel_scan_wrapper ) {
347     using T = int;
348     std::vector<T> addend(MAXN);
349     std::vector<T> control_sum(MAXN);
350     T control_total(42);
351     for( long i=0; i<MAXN; ++i ) {
352         addend[i] = i;
353         control_total += addend[i];
354         control_sum[i] = control_total;
355         AddendHistory[i] = UNUSED;
356     }
357 
358     std::vector<T> sum(MAXN);
359     for (long i = 0; i<MAXN; ++i)
360         sum[i] = -2;
361     ScanIsRunning = true;
362     T total = parallel_scan_wrapper(Range(0, MAXN, 1), 42, addend, sum, mode);
363     ScanIsRunning = false;
364 
365     CHECK_MESSAGE(control_total==total, "Parallel prefix sum is not equal to serial");
366     CHECK_MESSAGE(control_sum==sum, "Parallel prefix vector is not equal to serial");
367 }
368 
369 
#if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT
//! Wrapper that runs the functional parallel_scan with C++14 generic lambdas as scan and join bodies.
/** Resets AddendHistory, scans `range` starting from the identity Storage<T>(0), and returns
    the final total. When `range` is empty it returns `init` instead. */
struct ParallelScanGenericLambda {
    template<typename T>
    T operator()(Range range, T init, const std::vector<T> &addend, std::vector<T> &sum, int mode) {
        for (auto& history : AddendHistory)
            history = UNUSED;

        // The tag's is_final_scan() reports whether this is the final pass.
        auto scan = [&addend, &sum, init](const auto& rng, auto storage, auto scan_tag) {
            return ScanWithInit(rng, init, scan_tag.is_final_scan(), storage, sum, addend);
        };
        auto reverse_join = [](const auto& left, auto& right) {
            JoinStorages(left, right);
            return right;
        };

        ScanIsRunning = true;
        Storage<T> res = ParallelScanFunctionalInvoker(range, Storage<T>(0), scan, reverse_join, mode);
        ScanIsRunning = false;

        return range.empty() ? init : res.my_total;
    }
};
#endif /* __TBB_CPP14_GENERIC_LAMBDAS_PRESENT */
394 
395 
396 
397 // Test for parallel_scan with with different partitioners
398 //! \brief \ref error_guessing \ref resource_usage
399 TEST_CASE("parallel_scan testing with different partitioners") {
400     for (auto concurrency_level : utils::concurrency_range()) {
401         tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level);
402         for (int mode = 0; mode < 3; mode++) {
403             NumberOfLiveStorage = 0;
404             TestAccumulator(mode);
405             // Test that all workers sleep when no work
406             TestCPUUserTime(concurrency_level);
407 
408             // Checking has to be done late, because when parallel_scan makes copies of
409             // the user's "Body", the copies might be destroyed slightly after parallel_scan
410             // returns.
411             CHECK(NumberOfLiveStorage == 0);
412         }
413     }
414 }
415 
416 // Test for parallel_scan with template functors
417 //! \brief \ref error_guessing \ref interface \ref resource_usage
418 TEST_CASE("parallel_scan testing with template functor") {
419     for (auto concurrency_level : utils::concurrency_range()) {
420         tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level);
421         for (int mode = 0; mode < 3; mode++) {
422             NumberOfLiveStorage = 0;
423             TestInterface(mode,  ParallelScanTemplateFunctor());
424             // Test that all workers sleep when no work
425             TestCPUUserTime(concurrency_level);
426 
427             // Checking has to be done late, because when parallel_scan makes copies of
428             // the user's "Body", the copies might be destroyed slightly after parallel_scan
429             // returns.
430             CHECK(NumberOfLiveStorage == 0);
431         }
432     }
433 }
434 
435 // Test for parallel_scan with lambdas
436 //! \brief \ref error_guessing \ref interface \ref resource_usage
437 TEST_CASE("parallel_scan testing with lambdas") {
438     for (auto concurrency_level : utils::concurrency_range()) {
439         tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level);
440         for (int mode = 0; mode < 3; mode++) {
441             NumberOfLiveStorage = 0;
442             TestInterface(mode,  ParallelScanLambda());
443 
444             // Test that all workers sleep when no work
445             TestCPUUserTime(concurrency_level);
446 
447             // Checking has to be done late, because when parallel_scan makes copies of
448             // the user's "Body", the copies might be destroyed slightly after parallel_scan
449             // returns.
450             CHECK(NumberOfLiveStorage == 0);
451         }
452     }
453 }
454 
#if __TBB_CPP14_GENERIC_LAMBDAS_PRESENT
// Test for parallel_scan with generic lambdas
//! \brief \ref error_guessing \ref interface \ref resource_usage
TEST_CASE("parallel_scan testing with generic lambdas") {
    for (auto concurrency_level : utils::concurrency_range()) {
        tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level);
        for (int mode : {0, 1, 2}) {   // no explicit / simple / auto partitioner
            NumberOfLiveStorage = 0;
            TestInterface(mode, ParallelScanGenericLambda());
            // Once there is no work left, all workers are expected to go to sleep.
            TestCPUUserTime(concurrency_level);

            // Checked late on purpose: copies of the user's "Body" made by parallel_scan
            // might be destroyed slightly after parallel_scan returns.
            CHECK(NumberOfLiveStorage == 0);
        }
    }
}
#endif /* __TBB_CPP14_GENERIC_LAMBDAS_PRESENT */
475