xref: /oneTBB/test/tbb/test_parallel_for.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/test.h"
18 #include "common/config.h"
19 #include "common/utils.h"
20 #include "common/utils_concurrency_limit.h"
21 #include "common/utils_report.h"
22 #include "common/vector_types.h"
23 #include "common/cpu_usertime.h"
24 #include "common/spin_barrier.h"
25 #include "common/exception_handling.h"
26 
27 #include "tbb/tick_count.h"
28 #include "tbb/blocked_range.h"
29 #include "tbb/parallel_for.h"
30 #include "tbb/global_control.h"
31 #include "tbb/test_partitioner.h"
32 
33 #include <cstdio>
34 #include <vector>
35 #include <sstream>
36 
37 //! \file test_parallel_for.cpp
38 //! \brief Test for [algorithms.parallel_for] specification
39 
40 #if _MSC_VER
41 #pragma warning (push)
42 #if __TBB_MSVC_UNREACHABLE_CODE_IGNORED
43     // Suppress pointless "unreachable code" warning.
44     #pragma warning (disable: 4702)
45 #endif
46 #if defined(_Wp64)
47     // Workaround for overzealous compiler warnings in /Wp64 mode
48     #pragma warning (disable: 4267)
49 #endif
50 #define _SCL_SECURE_NO_WARNINGS
51 #endif //#if _MSC_VER
52 
53 
54 #if (HAVE_m128 || HAVE_m256)
55 template<typename ClassWithVectorType>
56 struct SSE_Functor {
57     ClassWithVectorType* Src, * Dst;
58     SSE_Functor( ClassWithVectorType* src, ClassWithVectorType* dst ) : Src(src), Dst(dst) {}
59 
60     void operator()( tbb::blocked_range<int>& r ) const {
61         for( int i=r.begin(); i!=r.end(); ++i )
62             Dst[i] = Src[i];
63     }
64 };
65 
66 //! Test that parallel_for works with stack-allocated __m128
67 template<typename ClassWithVectorType>
68 void TestVectorTypes() {
69     const int aSize = 300;
70     ClassWithVectorType Array1[aSize], Array2[aSize];
71     for( int i=0; i<aSize; ++i ) {
72         // VC8 does not properly align a temporary value; to work around, use explicit variable
73         ClassWithVectorType foo(i);
74         Array1[i] = foo;
75     }
76     tbb::parallel_for( tbb::blocked_range<int>(0,aSize), SSE_Functor<ClassWithVectorType>(Array1, Array2) );
77     for( int i=0; i<aSize; ++i ) {
78         ClassWithVectorType foo(i);
79         CHECK( Array2[i]==foo ) ;
80     }
81 }
82 #endif /* HAVE_m128 || HAVE_m256 */
83 
84 struct TestSimplePartitionerStabilityFunctor {
85   std::vector<int> & ranges;
86   TestSimplePartitionerStabilityFunctor(std::vector<int> & theRanges):ranges(theRanges){}
87   void operator()(tbb::blocked_range<size_t>& r)const{
88       ranges.at(r.begin()) = 1;
89   }
90 };
91 void TestSimplePartitionerStability(){
92     const std::size_t repeat_count= 10;
93     const std::size_t rangeToSplitSize=1000000;
94     const std::size_t grainsizeStep=rangeToSplitSize/repeat_count;
95     typedef TestSimplePartitionerStabilityFunctor FunctorType;
96 
97     for (std::size_t i=0 , grainsize=grainsizeStep; i<repeat_count;i++, grainsize+=grainsizeStep){
98         std::vector<int> firstSeries(rangeToSplitSize,0);
99         std::vector<int> secondSeries(rangeToSplitSize,0);
100 
101         tbb::parallel_for(tbb::blocked_range<size_t>(0,rangeToSplitSize,grainsize),FunctorType(firstSeries),tbb::simple_partitioner());
102         tbb::parallel_for(tbb::blocked_range<size_t>(0,rangeToSplitSize,grainsize),FunctorType(secondSeries),tbb::simple_partitioner());
103         std::stringstream str; str<<i;
104         CHECK_MESSAGE(firstSeries==secondSeries, ("splitting range with tbb::simple_partitioner must be reproducible; i=" +str.str()).c_str() );
105     }
106 }
107 
108 namespace various_range_implementations {
109 
110 using namespace test_partitioner_utils;
111 using namespace test_partitioner_utils::TestRanges;
112 
113 // Body ensures that initial work distribution is done uniformly through affinity mechanism and not through work stealing
114 class Body {
115     utils::SpinBarrier &m_sb;
116 public:
117     Body(utils::SpinBarrier& sb) : m_sb(sb) { }
118     Body(Body& b, tbb::split) : m_sb(b.m_sb) { }
119 
120     template <typename Range>
121     void operator()(Range& r) const {
122         INFO("Executing range [" << r.begin() << ", " << r.end() << "]");
123         m_sb.timedWait(10); // waiting for all threads
124     }
125 };
126 
127 namespace correctness {
128 
129 /* Testing only correctness (that is parallel_for does not hang) */
130 template <typename RangeType, bool /* feedback */, bool ensure_non_emptiness>
131 void test() {
132     RangeType range( 0, utils::get_platform_max_threads(), NULL, false, ensure_non_emptiness );
133     tbb::affinity_partitioner ap;
134     tbb::parallel_for( range, SimpleBody(), ap );
135 }
136 
137 } // namespace correctness
138 
139 namespace uniform_distribution {
140 
141 /* Body of parallel_for algorithm would hang if non-uniform work distribution happened  */
142 template <typename RangeType, bool feedback, bool ensure_non_emptiness>
143 void test() {
144     static const std::size_t thread_num = utils::get_platform_max_threads();
145     utils::SpinBarrier sb( thread_num );
146     RangeType range(0, thread_num, NULL, feedback, ensure_non_emptiness);
147     const Body sync_body( sb );
148     tbb::affinity_partitioner ap;
149     tbb::parallel_for( range, sync_body, ap );
150     tbb::parallel_for( range, sync_body, tbb::static_partitioner() );
151 }
152 
153 } // namespace uniform_distribution
154 
155 void test() {
156     const bool provide_feedback = false;
157     const bool ensure_non_empty_range = true;
158 
159     // BlockedRange does not take into account feedback and non-emptiness settings but uses the
160     // tbb::blocked_range implementation
161     uniform_distribution::test<BlockedRange, !provide_feedback, !ensure_non_empty_range>();
162     using correctness::test;
163 
164     {
165         test<RoundedDownRange, provide_feedback, ensure_non_empty_range>();
166         test<RoundedDownRange, provide_feedback, !ensure_non_empty_range>();
167     }
168 
169     {
170         test<RoundedUpRange, provide_feedback, ensure_non_empty_range>();
171         test<RoundedUpRange, provide_feedback, !ensure_non_empty_range>();
172     }
173 
174     // Testing that parallel_for algorithm works with such weird ranges
175     correctness::test<Range1_2, /* provide_feedback= */ false, !ensure_non_empty_range>();
176     correctness::test<Range1_999, /* provide_feedback= */ false, !ensure_non_empty_range>();
177     correctness::test<Range999_1, /* provide_feedback= */ false, !ensure_non_empty_range>();
178 
179     // The following ranges do not comply with the proportion suggested by partitioner. Therefore
180     // they have to provide the proportion in which they were actually split back to partitioner and
181     // ensure theirs non-emptiness
182     test<Range1_2, provide_feedback, ensure_non_empty_range>();
183     test<Range1_999, provide_feedback, ensure_non_empty_range>();
184     test<Range999_1, provide_feedback, ensure_non_empty_range>();
185 }
186 
187 } // namespace various_range_implementations
188 
189 namespace test_cancellation {
190 
191 struct FunctorToCancel {
192     static std::atomic<bool> need_to_wait;
193 
194     void operator()( std::size_t ) const {
195         ++g_CurExecuted;
196         if (need_to_wait) {
197             need_to_wait = Cancellator::WaitUntilReady();
198         }
199     }
200 
201     void operator()( const tbb::blocked_range<std::size_t>& ) const {
202         ++g_CurExecuted;
203         Cancellator::WaitUntilReady();
204     }
205 
206     static void reset() { need_to_wait = true; }
207 }; // struct FunctorToCancel
208 
209 std::atomic<bool> FunctorToCancel::need_to_wait(true);
210 
211 static constexpr std::size_t buffer_test_size = 1024;
212 static constexpr std::size_t maxParallelForRunnerMode = 14;
213 
214 template <std::size_t Mode>
215 class ParallelForRunner {
216     tbb::task_group_context& my_ctx;
217     const std::size_t worker_task_step = 1;
218 
219     static_assert(Mode >= 0 && Mode <= maxParallelForRunnerMode, "Incorrect mode for ParallelForRunner");
220 
221     template <typename Partitioner, typename... Args>
222     void run_parallel_for( Args&&... args ) const {
223         Partitioner part;
224         tbb::parallel_for(std::forward<Args>(args)..., part, my_ctx);
225     }
226 
227     template <typename... Args>
228     void run_overload( Args&&... args ) const {
229 
230         switch(Mode % 5) {
231             case 0 : {
232                 tbb::parallel_for(std::forward<Args>(args)..., my_ctx);
233                 break;
234             }
235             case 1 : {
236                 run_parallel_for<tbb::simple_partitioner>(std::forward<Args>(args)...);
237                 break;
238             }
239             case 2 : {
240                 run_parallel_for<tbb::auto_partitioner>(std::forward<Args>(args)...);
241                 break;
242             }
243             case 3 : {
244                 run_parallel_for<tbb::static_partitioner>(std::forward<Args>(args)...);
245                 break;
246             }
247             case 4 : {
248                 run_parallel_for<tbb::affinity_partitioner>(std::forward<Args>(args)...);
249                 break;
250             }
251         }
252     }
253 
254 public:
255     ParallelForRunner( tbb::task_group_context& ctx )
256         : my_ctx(ctx) {}
257 
258     ~ParallelForRunner() { FunctorToCancel::reset(); }
259 
260     void operator()() const {
261         if (Mode < 5) {
262             // Overload with blocked range
263             tbb::blocked_range<std::size_t> br(0, buffer_test_size);
264             run_overload(br, FunctorToCancel{});
265         } else if (Mode < 10) {
266             // Overload with two indexes
267             run_overload(std::size_t(0), buffer_test_size, FunctorToCancel{});
268         } else {
269             // Overload with two indexes and step
270             run_overload(std::size_t(0), buffer_test_size, worker_task_step, FunctorToCancel{});
271         }
272     }
273 }; // class ParallelForRunner
274 
275 template <std::size_t Mode>
276 void run_parallel_for_cancellation_test() {
277     // TODO: enable concurrency_range
278     ResetEhGlobals();
279     RunCancellationTest<ParallelForRunner<Mode>, Cancellator>();
280 }
281 
282 template <std::size_t Mode>
283 struct ParallelForTestRunner {
284     static void run() {
285         run_parallel_for_cancellation_test<Mode>();
286         ParallelForTestRunner<Mode + 1>::run();
287     }
288 }; // struct ParallelForTestRunner
289 
290 template <>
291 struct ParallelForTestRunner<maxParallelForRunnerMode> {
292     static void run() {
293         run_parallel_for_cancellation_test<maxParallelForRunnerMode>();
294     }
295 }; // struct ParallelForTestRunner<maxParallelForRunnerMode>
296 
297 } // namespace test_cancellation
298 
#if TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN && TBB_REVAMP_TODO
//! Testing exceptions
//! \brief \ref requirement
// NOTE(review): compiled only when TBB_REVAMP_TODO is set. TestExceptionsSupport, MinThread and
// MaxThread are not defined in this file; presumably they are expected from a common header.
// Confirm before enabling.
TEST_CASE("Exceptions support") {
    for ( int p = MinThread; p <= MaxThread; ++p ) {
        if ( p > 0 ) {
            // Limit parallelism to p threads for this iteration only (control is scoped to the block).
            tbb::global_control control(tbb::global_control::max_allowed_parallelism, p);
            TestExceptionsSupport();
        }
    }
}
#endif /* TBB_USE_EXCEPTIONS && !__TBB_THROW_ACROSS_MODULE_BOUNDARY_BROKEN && TBB_REVAMP_TODO */
311 
312 //! Testing cancellation
313 //! \brief \ref error_guessing
314 TEST_CASE("Vector types") {
315 #if HAVE_m128
316     TestVectorTypes<ClassWithSSE>();
317 #endif
318 #if HAVE_m256
319     if (have_AVX()) TestVectorTypes<ClassWithAVX>();
320 #endif
321 }
322 
323 //! Testing workers going to sleep
324 //! \brief \ref resource_usage
325 TEST_CASE("That all workers sleep when no work") {
326     const std::size_t N = 100000;
327     std::atomic<int> counter{};
328 
329     tbb::parallel_for(std::size_t(0), N, [&](std::size_t) {
330         for (volatile int i = 0; i < 1000; ++i) {
331             ++counter;
332         }
333     }, tbb::simple_partitioner());
334     TestCPUUserTime(utils::get_platform_max_threads());
335 }
336 
337 //! Testing simple partitioner stability
338 //! \brief \ref error_guessing
339 TEST_CASE("Simple partitioner stability") {
340     TestSimplePartitionerStability();
341 }
342 
343 //! Testing various range implementations
344 //! \brief \ref requirement
345 TEST_CASE("Various range implementations") {
346     various_range_implementations::test();
347 }
348 
349 //! Testing parallel_for with explicit task_group_context
350 //! \brief \ref interface \ref error_guessing
351 TEST_CASE("Сancellation test for tbb::parallel_for") {
352     test_cancellation::ParallelForTestRunner</*FirstMode = */0>::run();
353 }
354 
355 #if _MSC_VER
356 #pragma warning (pop)
357 #endif
358