xref: /oneTBB/test/tbb/test_parallel_reduce.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
#include <atomic>
#include <vector>

#include "common/parallel_reduce_common.h"
#include "common/cpu_usertime.h"
#include "common/exception_handling.h"
22 
23 //! \file test_parallel_reduce.cpp
24 //! \brief Test for [algorithms.parallel_reduce algorithms.parallel_deterministic_reduce] specification
25 
26 using ValueType = uint64_t;
27 
//! Generic binary reduction operation: combines two partial results with operator+.
struct Sum {
    template<typename T>
    T operator() ( const T& lhs, const T& rhs ) const {
        return lhs + rhs;
    }
};
34 
35 struct Accumulator {
36     ValueType operator() ( const tbb::blocked_range<ValueType*>& r, ValueType value ) const {
37         for ( ValueType* pv = r.begin(); pv != r.end(); ++pv )
38             value += *pv;
39         return value;
40     }
41 };
42 
43 class ParallelSumTester {
44 public:
45     ParallelSumTester( const ParallelSumTester& ) = default;
46     void operator=( const ParallelSumTester& ) = delete;
47 
48     ParallelSumTester() : m_range(nullptr, nullptr) {
49         m_array = new ValueType[unsigned(count)];
50         for ( ValueType i = 0; i < count; ++i )
51             m_array[i] = i + 1;
52         m_range = tbb::blocked_range<ValueType*>( m_array, m_array + count );
53     }
54     ~ParallelSumTester() { delete[] m_array; }
55 
56     template<typename Partitioner>
57     void CheckParallelReduce() {
58         Partitioner partitioner;
59         ValueType result1 = reduce_invoker<ValueType>( m_range, Accumulator(), Sum(), partitioner );
60         REQUIRE_MESSAGE( result1 == expected, "Wrong parallel summation result" );
61         ValueType result2 = reduce_invoker<ValueType>( m_range,
62             [](const tbb::blocked_range<ValueType*>& r, ValueType value) -> ValueType {
63                 for ( const ValueType* pv = r.begin(); pv != r.end(); ++pv )
64                     value += *pv;
65                 return value;
66             },
67             Sum(),
68             partitioner
69         );
70         REQUIRE_MESSAGE( result2 == expected, "Wrong parallel summation result" );
71     }
72 private:
73     ValueType* m_array;
74     tbb::blocked_range<ValueType*> m_range;
75     static const ValueType count, expected;
76 };
77 
// Out-of-class definitions for the in-class static constant declarations.
const ValueType ParallelSumTester::count = 1000000;
// Sum of 1..count (Gauss' formula).
const ValueType ParallelSumTester::expected = count * (count + 1) / 2;
80 
81 namespace test_cancellation {
82 
//! Reduction body functor that blocks until the cancellator is ready, so a
//! cancellation request can be delivered while reduction work is in flight.
struct ReduceToCancel {
    std::size_t operator()( const tbb::blocked_range<std::size_t>&, std::size_t ) const {
        ++g_CurExecuted;                // count executed body invocations (global test counter)
        Cancellator::WaitUntilReady();  // park until the test triggers cancellation
        return 1;
    }
}; // struct ReduceToCancel
90 
//! Join functor counterpart of ReduceToCancel: also blocks until cancellation.
struct JoinToCancel {
    std::size_t operator()( std::size_t, std::size_t ) const {
        ++g_CurExecuted;                // count executed join invocations
        Cancellator::WaitUntilReady();  // park until the test triggers cancellation
        return 1;
    }
}; // struct JoinToCancel
98 
//! Imperative-form Body for parallel_reduce that delegates to the cancellable
//! functors above; used by the functor-based overloads in the runners below.
struct ReduceFunctorToCancel {
    std::size_t result;

    ReduceFunctorToCancel() : result(0) {}
    //! Splitting constructor required by the parallel_reduce Body concept.
    ReduceFunctorToCancel( ReduceFunctorToCancel&, tbb::split ) : result(0) {}

    void operator()( const tbb::blocked_range<std::size_t>& br ) {
        result = ReduceToCancel{}(br, result);
    }

    void join( ReduceFunctorToCancel& rhs ) {
        result = JoinToCancel{}(result, rhs.result);
    }
}; // struct ReduceFunctorToCancel
113 
// Size of the iteration space fed to the cancellation tests.
static constexpr std::size_t buffer_test_size = 1024;
// Modes 0-4 exercise the functor-body overloads, 5-9 the functional-form ones.
static constexpr std::size_t maxParallelReduceRunnerMode = 9;
116 
117 template <std::size_t Mode>
118 class ParallelReduceRunner {
119     tbb::task_group_context& my_ctx;
120 
121     static_assert(Mode >= 0 && Mode <= maxParallelReduceRunnerMode, "Incorrect mode for ParallelReduceTask");
122 
123     template <typename... Args>
124     void run_parallel_reduce( Args&&... args ) const {
125         switch(Mode % 5) {
126             case 0 : {
127                 tbb::parallel_reduce(std::forward<Args>(args)..., my_ctx);
128                 break;
129             }
130             case 1 : {
131                 tbb::parallel_reduce(std::forward<Args>(args)..., tbb::simple_partitioner{}, my_ctx);
132                 break;
133             }
134             case 2 : {
135                 tbb::parallel_reduce(std::forward<Args>(args)..., tbb::auto_partitioner{}, my_ctx);
136                 break;
137             }
138             case 3 : {
139                 tbb::parallel_reduce(std::forward<Args>(args)..., tbb::static_partitioner{}, my_ctx);
140                 break;
141             }
142             case 4 : {
143                 tbb::affinity_partitioner aff;
144                 tbb::parallel_reduce(std::forward<Args>(args)..., aff, my_ctx);
145                 break;
146             }
147         }
148     }
149 
150 public:
151     ParallelReduceRunner( tbb::task_group_context& ctx )
152         : my_ctx(ctx) {}
153 
154     void operator()() const {
155         tbb::blocked_range<std::size_t> br(0, buffer_test_size);
156         if (Mode < 5) {
157             ReduceFunctorToCancel functor;
158             run_parallel_reduce(br, functor);
159         } else {
160             run_parallel_reduce(br, 0, ReduceToCancel{}, JoinToCancel{});
161         }
162     }
163 }; // class ParallelReduceRunner
164 
165 static constexpr std::size_t maxParallelDeterministicReduceRunnerMode = 5;
166 
167 // TODO: unify with ParallelReduceRunner
168 template <std::size_t Mode>
169 class ParallelDeterministicReduceRunner {
170     tbb::task_group_context& my_ctx;
171 
172     static_assert(Mode >= 0 && Mode <= maxParallelDeterministicReduceRunnerMode, "Incorrect Mode for deterministic_reduce task");
173 
174     template <typename... Args>
175     void run_parallel_deterministic_reduce( Args&&... args ) const {
176         switch(Mode % 3) {
177             case 0 : {
178                 tbb::parallel_deterministic_reduce(std::forward<Args>(args)..., my_ctx);
179                 break;
180             }
181             case 1 : {
182                 tbb::parallel_deterministic_reduce(std::forward<Args>(args)..., tbb::simple_partitioner{}, my_ctx);
183                 break;
184             }
185             case 2 : {
186                 tbb::parallel_deterministic_reduce(std::forward<Args>(args)..., tbb::static_partitioner{}, my_ctx);
187                 break;
188             }
189         }
190     }
191 
192 public:
193     ParallelDeterministicReduceRunner( tbb::task_group_context& ctx )
194         : my_ctx(ctx) {}
195 
196     void operator()() const {
197         tbb::blocked_range<std::size_t> br(0, buffer_test_size);
198         if (Mode < 3) {
199             ReduceFunctorToCancel functor;
200             run_parallel_deterministic_reduce(br, functor);
201         } else {
202             run_parallel_deterministic_reduce(br, 0, ReduceToCancel{}, JoinToCancel{});
203         }
204     }
205 }; // class ParallelDeterministicReduceRunner
206 
207 template <std::size_t Mode>
208 void run_parallel_reduce_cancellation_test() {
209     for ( auto concurrency_level : utils::concurrency_range() ) {
210         if (concurrency_level < 2) continue;
211 
212         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, concurrency_level);
213         ResetEhGlobals();
214         RunCancellationTest<ParallelReduceRunner<Mode>, Cancellator>();
215     }
216 }
217 
218 template <std::size_t Mode>
219 void run_parallel_deterministic_reduce_cancellation_test() {
220     for ( auto concurrency_level : utils::concurrency_range() ) {
221         if (concurrency_level < 2) continue;
222 
223         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, concurrency_level);
224         ResetEhGlobals();
225         RunCancellationTest<ParallelDeterministicReduceRunner<Mode>, Cancellator>();
226     }
227 }
228 
//! Recursively instantiates and runs the parallel_reduce cancellation test for
//! every mode from Mode up to maxParallelReduceRunnerMode.
template <std::size_t Mode>
struct ParallelReduceTestRunner {
    static void run() {
        run_parallel_reduce_cancellation_test<Mode>();
        ParallelReduceTestRunner<Mode + 1>::run();
    }
}; // struct ParallelReduceTestRunner
236 
//! Recursion terminator: runs the last mode without further instantiation.
template <>
struct ParallelReduceTestRunner<maxParallelReduceRunnerMode> {
    static void run() {
        run_parallel_reduce_cancellation_test<maxParallelReduceRunnerMode>();
    }
}; // struct ParallelReduceTestRunner<maxParallelReduceRunnerMode>
243 
//! Recursively instantiates and runs the deterministic-reduce cancellation test
//! for every mode from Mode up to maxParallelDeterministicReduceRunnerMode.
template <std::size_t Mode>
struct ParallelDeterministicReduceTestRunner {
    static void run() {
        run_parallel_deterministic_reduce_cancellation_test<Mode>();
        ParallelDeterministicReduceTestRunner<Mode + 1>::run();
    }
}; // struct ParallelDeterministicReduceTestRunner
251 
//! Recursion terminator: runs the last mode without further instantiation.
template <>
struct ParallelDeterministicReduceTestRunner<maxParallelDeterministicReduceRunnerMode> {
    static void run() {
        run_parallel_deterministic_reduce_cancellation_test<maxParallelDeterministicReduceRunnerMode>();
    }
}; // struct ParallelDeterministicReduceTestRunner<maxParallelDeterministicReduceRunnerMode>
258 
259 } // namespace test_cancellation
260 
261 //! Test parallel summation correctness
262 //! \brief \ref stress
TEST_CASE("Test parallel summation correctness") {
    ParallelSumTester pst;
    // Exercise every supported partitioner with the same tester instance.
    pst.CheckParallelReduce<utils_default_partitioner>();
    pst.CheckParallelReduce<tbb::simple_partitioner>();
    pst.CheckParallelReduce<tbb::auto_partitioner>();
    pst.CheckParallelReduce<tbb::affinity_partitioner>();
    pst.CheckParallelReduce<tbb::static_partitioner>();
}
271 
// Number of body splits observed during a reduction (reset per TestSplitting run).
static std::atomic<long> ForkCount;
// Number of live FooBody instances; used to detect leaked body copies.
static std::atomic<long> FooBodyCount;
274 
//! Class with public interface that is exactly minimal requirements for Range concept:
//! a splitting constructor, is_divisible(), and empty().
class MinimalRange {
    size_t begin, end;
    friend class FooBody;
    // Construction of the initial range is restricted to TestSplitting (friend).
    explicit MinimalRange( size_t i ) : begin(0), end(i) {}
    template <typename Partitioner_> friend void TestSplitting( std::size_t nthread );
public:
    //! Splitting constructor: takes the upper half, leaving the lower half in r.
    MinimalRange( MinimalRange& r, tbb::split ) : end(r.end) {
        begin = r.end = (r.begin+r.end)/2;
    }
    bool is_divisible() const {return end-begin>=2;}
    bool empty() const {return begin==end;}
};
288 
//! Class with public interface that is exactly minimal requirements for Body of a parallel_reduce.
//! Tracks split/join bookkeeping so TestSplitting can verify the Body-concept contract.
class FooBody {
private:
    FooBody( const FooBody& );          // Deny access
    void operator=( const FooBody& );   // Deny access
    template <typename Partitioner_> friend void TestSplitting( std::size_t nthread );
    //! Parent that created this body via split operation.  NULL if original body.
    FooBody* parent;
    //! Total number of index values processed by body and its children.
    size_t sum;
    //! Number of join operations done so far on this body and its children.
    long join_count;
    //! Range that has been processed so far by this body and its children.
    size_t begin, end;
    //! True if body has not yet been processed at least once by operator().
    bool is_new;
    //! 1 if body was created by split; 0 if original body.
    int forked;
    FooBody() {++FooBodyCount;}
public:
    //! Poison the fields so use-after-destroy is detectable, and update the live count.
    ~FooBody() {
        forked = 0xDEADBEEF;
        sum=0xDEADBEEF;
        join_count=0xDEADBEEF;
        --FooBodyCount;
    }
    //! Splitting constructor required by the parallel_reduce Body concept.
    //! Records the parent and marks this instance as forked.
    FooBody( FooBody& other, tbb::split ) {
        ++FooBodyCount;
        ++ForkCount;
        sum = 0;
        parent = &other;
        join_count = 0;
        is_new = true;
        forked = 1;
    }

    //! Reset to the state of an original (un-split, never-run) body.
    //! begin/end are set to ~0 as "untouched" sentinels.
    void init() {
        sum = 0;
        parent = nullptr;
        join_count = 0;
        is_new = true;
        forked = 0;
        begin = ~size_t(0);
        end = ~size_t(0);
    }

    //! Merge a split-off child back in, checking Body-concept invariants:
    //! joins happen parent<-child, exactly once, on adjacent subranges.
    void join( FooBody& s ) {
        REQUIRE( s.forked==1 );
        REQUIRE( this!=&s );
        REQUIRE( this==s.parent );
        REQUIRE( end==s.begin );
        end = s.end;
        sum += s.sum;
        join_count += s.join_count + 1;
        s.forked = 2;   // mark the child as already joined
    }
    //! Accumulate a subrange; checks that subranges arrive in order, without gaps.
    void operator()( const MinimalRange& r ) {
        for( size_t k=r.begin; k<r.end; ++k )
            ++sum;
        if( is_new ) {
            is_new = false;
            begin = r.begin;
        } else
            REQUIRE( end==r.begin );
        end = r.end;
    }
};
356 
357 template<typename Partitioner>
358 void TestSplitting( std::size_t nthread ) {
359     ForkCount = 0;
360     long join_count = 0;
361     Partitioner partitioner;
362     for( size_t i=0; i<=1000; ++i ) {
363         FooBody f;
364         f.init();
365         REQUIRE_MESSAGE( FooBodyCount==1, "Wrong initial BodyCount value" );
366         reduce_invoker(MinimalRange(i), f, partitioner);
367 
368         if (nthread == 1) REQUIRE_MESSAGE(ForkCount==0, "Body was split during 1 thread execution");
369 
370         join_count += f.join_count;
371         REQUIRE_MESSAGE( FooBodyCount==1, "Some copies of FooBody was not removed after reduction");
372         REQUIRE_MESSAGE( f.sum==i, "Incorrect reduction" );
373         REQUIRE_MESSAGE( f.begin==(i==0 ? ~size_t(0) : 0), "Incorrect range borders" );
374         REQUIRE_MESSAGE( f.end==(i==0 ? ~size_t(0) : i), "Incorrect range borders" );
375     }
376 }
377 
378 //! Test splitting range and body during reduction, test that all workers sleep when no work
379 //! \brief \ref resource_usage \ref error_guessing
TEST_CASE("Test splitting range and body during reduction, test that all workers sleep when no work") {
    for ( auto concurrency_level : utils::concurrency_range() ) {
        tbb::global_control control(tbb::global_control::max_allowed_parallelism, concurrency_level);

        // Check split/join bookkeeping under every supported partitioner.
        TestSplitting<tbb::simple_partitioner>(concurrency_level);
        TestSplitting<tbb::static_partitioner>(concurrency_level);
        TestSplitting<tbb::auto_partitioner>(concurrency_level);
        TestSplitting<tbb::affinity_partitioner>(concurrency_level);
        TestSplitting<utils_default_partitioner>(concurrency_level);

        // Test that all workers sleep when no work
        TestCPUUserTime(concurrency_level);
    }
}
394 
//! Define overloads of parallel_deterministic_reduce that accept "undesired" types of partitioners.
//! If tbb itself ever gained such overloads, calls in the test below would become ambiguous.
namespace unsupported {
    // Imperative-body overloads: intentionally no-ops.
    template<typename Range, typename Body>
    void parallel_deterministic_reduce(const Range&, Body&, const tbb::auto_partitioner&) { }
    template<typename Range, typename Body>
    void parallel_deterministic_reduce(const Range&, Body&, tbb::affinity_partitioner&) { }

    // Functional-form overloads: return the identity unchanged.
    template<typename Range, typename Value, typename RealBody, typename Reduction>
    Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , const tbb::auto_partitioner&) {
        return identity;
    }
    template<typename Range, typename Value, typename RealBody, typename Reduction>
    Value parallel_deterministic_reduce(const Range& , const Value& identity, const RealBody& , const Reduction& , tbb::affinity_partitioner&) {
        return identity;
    }
}
411 
412 struct Body {
413     float value;
414     Body() : value(0) {}
415     Body(Body&, tbb::split) { value = 0; }
416     void operator()(const tbb::blocked_range<int>&) {}
417     void join(Body&) {}
418 };
419 
420 //! Check that other types of partitioners are not supported (auto, affinity)
421 //! In the case of "unsupported" API unexpectedly sneaking into namespace tbb,
422 //! this test should result in a compilation error due to overload resolution ambiguity
423 //! \brief \ref negative \ref error_guessing
TEST_CASE("Test Unsupported Partitioners") {
    using namespace tbb;
    using namespace unsupported;
    Body body;
    // Each call must resolve to the stub in namespace unsupported; an ambiguity
    // here would mean tbb unexpectedly gained such an overload.
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, tbb::auto_partitioner());

    tbb::affinity_partitioner ap;
    parallel_deterministic_reduce(blocked_range<int>(0, 10), body, ap);

    // Functional-form overloads with identity, body, and reduction lambdas.
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        tbb::auto_partitioner()
    );
    parallel_deterministic_reduce(
        blocked_range<int>(0, 10),
        0,
        [](const blocked_range<int>&, int init)->int {
            return init;
        },
        [](int x, int y)->int {
            return x + y;
        },
        ap
    );
}
456 
457 //! Testing tbb::parallel_reduce with tbb::task_group_context
458 //! \brief \ref interface \ref error_guessing
TEST_CASE("cancellation test for tbb::parallel_reduce") {
    // Recursively covers all modes 0..maxParallelReduceRunnerMode.
    test_cancellation::ParallelReduceTestRunner</*First mode = */0>::run();
}
462 
463 //! Testing tbb::parallel_deterministic_reduce with tbb::task_group_context
464 //! \brief \ref interface \ref error_guessing
TEST_CASE("cancellation test for tbb::parallel_deterministic_reduce") {
    // Recursively covers all modes 0..maxParallelDeterministicReduceRunnerMode.
    test_cancellation::ParallelDeterministicReduceTestRunner</*First mode = */0>::run();
}
468