1 /*
2     Copyright (c) 2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if _MSC_VER && !defined(__INTEL_COMPILER)
18 // unreachable code
19 #pragma warning( push )
20 #pragma warning( disable: 4702 )
21 #endif
22 
23 #if __INTEL_COMPILER && _MSC_VER
24 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
25 #endif
26 
27 #include "common/config.h"
28 
29 // Include first to check missed header dependencies
30 #include "tbb/collaborative_call_once.h"
31 
32 #include "common/test.h"
33 #include "common/exception_handling.h"
34 #include "common/utils_concurrency_limit.h"
35 
36 #include "tbb/parallel_invoke.h"
37 #include "tbb/parallel_reduce.h"
38 #include "tbb/task_arena.h"
39 
40 //! \file test_collaborative_call_once.cpp
41 //! \brief Tests for [algorithms.collaborative_call_once] functionality
42 
//! Functor that counts how many times it has been invoked.
struct increment_functor {
    int ct{0};

    void operator()() { ++ct; }
};
50 
//! Functor that accumulates the sum of every argument it is invoked with.
struct sum_functor {
    int sum{0};

    // Single-argument base case.
    template<typename T>
    void operator()(T first_op) {
        sum += first_op;
    }

    // Add the head directly, then recurse on the remaining arguments.
    template<typename T, typename... Args>
    void operator()(T first_op, Args&&... args) {
        sum += first_op;
        (*this)(std::forward<Args>(args)...);
    }
};
65 
//! Move-only wrapper around an observed (non-owned) pointer, used to verify that
//! collaborative_call_once forwards move-only arguments without copying.
//! Copy operations are implicitly deleted by the user-declared move constructor.
struct move_only_type {
    const int* my_pointer;
    //! Transfers the observed pointer and nulls out the source.
    //! noexcept: moving a pointer cannot throw, and marking it so lets
    //! containers/algorithms move instead of falling back to (deleted) copies.
    move_only_type(move_only_type && other) noexcept : my_pointer(other.my_pointer) { other.my_pointer=nullptr; }
    explicit move_only_type(const int* value): my_pointer(value) {}
};
71 
72 
//! Exception type thrown by test functors to exercise the error paths of collaborative_call_once.
struct call_once_exception : std::exception {};
74 
75 template<typename Fn, typename... Args>
76 void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
77     tbb::collaborative_once_flag flag;
78     for (std::size_t i = 0; i < N; ++i) {
79         tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
80     }
81 }
82 
//! Invokes collaborative_call_once on one shared flag from every iteration of a
//! tbb::parallel_for over [0, N); the body must still execute exactly once.
template<typename Fn, typename... Args>
void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Workaround: some GCC versions miscompile parameter pack expansion inside
    // lambdas, so the arguments are stored in a pack and replayed via detail::call.
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    tbb::parallel_for(tbb::blocked_range<size_t>(0, N), [&](const tbb::blocked_range<size_t>& range) {
        for (size_t i = range.begin(); i != range.end(); ++i) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            // Forwarding inside the loop is deliberate: the once-semantics mean the
            // arguments are actually consumed at most once, by the winning thread.
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif //__TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }
    });
}
101 
102 template<typename Fn, typename... Args>
103 void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
104     tbb::collaborative_once_flag flag;
105     std::vector<std::thread> threads;
106 
107 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
108     auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
109     auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
110 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
111 
112     for (std::size_t i = 0; i < N; ++i)
113     {
114         threads.push_back(std::thread([&]() {
115 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
116             tbb::collaborative_call_once(flag, func);
117 #else
118             tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
119 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
120         }));
121     }
122     for (auto& thread : threads) {
123         thread.join();
124     }
125 }
126 
127 //! Test for functor to be called only once
128 //! \brief \ref interface \ref requirement
129 TEST_CASE("only calls once 1") {
130     {
131         increment_functor f;
132 
133         call_once_in_for_loop(1024, f);
134 
135         REQUIRE( f.ct == 1);
136     }
137     {
138         increment_functor f;
139 
140         call_once_in_parallel_for(100, f);
141 
142         REQUIRE(f.ct == 1);
143     }
144     {
145         increment_functor f;
146 
147         call_once_threads(utils::get_platform_max_threads(), f);
148 
149         REQUIRE(f.ct == 1);
150     }
151 }
152 
153 //! Test for functor to be called only once
154 //! \brief \ref interface \ref requirement
155 TEST_CASE("only calls once 2") {
156     {
157         sum_functor f;
158 
159         call_once_in_for_loop(1024, f, 1, 2, 3 ,4);
160 
161         REQUIRE(f.sum == 10);
162     }
163     {
164         sum_functor f;
165 
166         call_once_in_parallel_for(512, f, 1000, -1000);
167 
168         REQUIRE(f.sum == 0);
169     }
170     {
171         sum_functor f;
172 
173         call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);
174 
175         REQUIRE(f.sum == -6);
176     }
177 }
178 
//! Test for correct handling move-only arguments
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once - move only argument") {
    const int value = 42;
    // Counts invocations cumulatively across both sub-scopes below (1, then 2).
    int ready{0};

    auto func = [&ready, &value] (move_only_type other) {
        // The moved-in argument must still observe the original pointer.
        REQUIRE(other.my_pointer == &value);
        ready++;
    };

    {
        move_only_type mv(&value);

        call_once_in_parallel_for(512, func, std::move(mv));

        REQUIRE(ready == 1);
        // mv was moved out exactly once, by the single invocation of func.
        REQUIRE(mv.my_pointer == nullptr);
    }

    {
        move_only_type mv(&value);

        call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));

        REQUIRE(ready == 2);
        REQUIRE(mv.my_pointer == nullptr);
    }
}
208 
//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__  || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
    // that makes impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    {
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                // Exactly one invocation must have completed per finished round.
                REQUIRE(f.ct == i);
                barrier.wait([&] {
                    // One thread re-creates the flag in place while every other
                    // thread is held at the barrier, so each round starts with a
                    // fresh, unset flag and no concurrent access to it.
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}
246 
247 #if TBB_USE_EXCEPTIONS
248 
249 //! Test for collaborative_call_once exception handling
250 //! \brief \ref error_guessing
251 TEST_CASE("handles exceptions - state reset") {
252     bool b{ false };
253     auto setB = [&b]() { b = true; };
254     auto setBAndCancel = [&b]() {
255         b = true;
256         throw call_once_exception{};
257     };
258 
259     tbb::collaborative_once_flag flag;
260     REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
261     REQUIRE(b);
262 
263     b = false;
264     REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
265     REQUIRE(b);
266 
267     b = false;
268     tbb::collaborative_call_once(flag, setB);
269     REQUIRE(b);
270 
271     b = false;
272     tbb::collaborative_call_once(flag, setB); // Now the call_once flag should be set.
273     REQUIRE(!b);
274 
275     b = false;
276     REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // Flag still set, so it shouldn't be called.
277     REQUIRE(!b);
278 }
279 
//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
    // that makes impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    // data is only mutated inside the once-functor; the test relies on that
    // execution being exclusive, so data itself needs no synchronization.
    int data{0};
    std::atomic<bool> run_again{true};

    auto throwing_func = [&] {
        utils::doDummyWork(10000);
        if (data < 100) {
            // Fail the first 100 attempts; each throw should reset the flag so
            // a later call can retry.
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while(run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // Only call_once_exception is expected here
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    // Exactly 100 failed attempts followed by one success.
    REQUIRE(data == 100);
}
321 
322 #endif
323 
//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        // All threads rendezvous first, so they hit collaborative_call_once
        // concurrently and all but one become potential helpers.
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            // The inner parallel_for needs num_threads barrier arrivals to
            // finish, which requires the threads blocked in
            // collaborative_call_once to moonlight and execute its iterations.
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}
341 
//! Test for collaborative work from different arenas
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    // num_threads - 1 workers in arena a1 wait (two barrier phases) until the
    // a2 thread below is inside the once-functor, then call
    // collaborative_call_once on the same flag from a different arena.
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            barrier.wait();
            barrier.wait();

            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. collaborative_once_flag must be already initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    barrier.wait();

    a2.execute([&] {
        utils::ConcurrencyTracker ct;
        // The a1 workers are still held at their second barrier, so the test
        // expects only this thread to be running here.
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            // Release the a1 workers; their collaborative_call_once calls must
            // join this in-flight invocation instead of running the FAIL functor.
            barrier.wait();
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}
380 
// One (once-flag, memoized value) slot per Fibonacci index.
using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;

//! Computes the n-th element (0-based, fib(0) = fib(1) = 1) of the Fibonacci
//! sequence. Each value is computed under its own collaborative_once_flag, so
//! concurrent recursive branches that reach the same index collaborate on a
//! single computation instead of repeating it.
std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
    if (n <= 1) {
        return 1;
    }
    tbb::collaborative_call_once(buffer[n].first, [&]() {
        std::uint64_t a, b;
        // Evaluate both subproblems in parallel; each is memoized the same way.
        tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
                             [&] { b = collaborative_recursive_fib(n - 1, buffer); });
        buffer[n].second = a + b;
    });
    return buffer[n].second;
}
394 
//! Entry point: allocates n flag/value slots and evaluates index n-1, the
//! highest valid slot in the buffer.
std::uint64_t collaborative_recursive_fib(int n) {
    FibBuffer buffer(n);
    return collaborative_recursive_fib(n-1, buffer);
}
399 
400 //! Correctness test for Fibonacci example
401 //! \brief \ref interface \ref requirement
402 TEST_CASE("fibonacci example") {
403     constexpr int N = 93;
404     constexpr std::uint64_t expected_result = 12200160415121876738ull;
405 
406     auto collaborative = collaborative_recursive_fib(N);
407 
408     REQUIRE(collaborative == expected_result);
409 }
410 
411 #if _MSC_VER && !defined(__INTEL_COMPILER)
412 #pragma warning( pop )
413 #endif
414