1 /*
2     Copyright (c) 2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if _MSC_VER && !defined(__INTEL_COMPILER)
18 // unreachable code
19 #pragma warning( push )
20 #pragma warning( disable: 4702 )
21 #endif
22 
23 #if __INTEL_COMPILER && _MSC_VER
24 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
25 #endif
26 
27 #include "common/config.h"
28 
29 // Include first to check missed header dependencies
30 #include "tbb/collaborative_call_once.h"
31 
32 #include "common/test.h"
33 #include "common/exception_handling.h"
34 #include "common/utils_concurrency_limit.h"
35 
36 #include "tbb/parallel_invoke.h"
37 #include "tbb/parallel_reduce.h"
38 #include "tbb/task_arena.h"
39 
40 //! \file test_collaborative_call_once.cpp
41 //! \brief Tests for [algorithms.collaborative_call_once] functionality
42 
// Callable that records how many times it has been invoked.
struct increment_functor {
    int ct{0};

    void operator()() {
        ++ct;
    }
};
50 
// Callable that accumulates the sum of every argument it is invoked with.
struct sum_functor {
    int sum{0};

    // Single-argument form: fold one value into the running total.
    template<typename T>
    void operator()(T first_op) {
        sum += first_op;
    }

    // Variadic form: add the head, then expand the tail via a C++11
    // pack-expansion expander array instead of recursing.
    template<typename T, typename... Args>
    void operator()(T first_op, Args&&... args) {
        sum += first_op;
        const int expander[] = {0, ((sum += args), 0)...};
        (void)expander;
    }
};
65 
// Move-only wrapper around a raw pointer, used to verify that
// collaborative_call_once forwards rvalue arguments without copying.
// The moved-from object is observable (pointer nulled) so tests can
// REQUIRE that a move actually happened.
struct move_only_type {
    const int* my_pointer;
    // noexcept so standard containers/algorithms move instead of falling back
    // to the (deleted) copy; also completes the Rule of Five explicitly.
    move_only_type(move_only_type&& other) noexcept : my_pointer(other.my_pointer) { other.my_pointer = nullptr; }
    move_only_type& operator=(move_only_type&& other) noexcept {
        if (this != &other) {
            my_pointer = other.my_pointer;
            other.my_pointer = nullptr;
        }
        return *this;
    }
    move_only_type(const move_only_type&) = delete;
    move_only_type& operator=(const move_only_type&) = delete;
    explicit move_only_type(const int* value): my_pointer(value) {}
};
71 
72 
// Exception type thrown by test functors to exercise collaborative_call_once error paths.
class call_once_exception : public std::exception {};
74 
// Serially invokes collaborative_call_once N times on one fresh flag.
// Only the first invocation may run `body`; the rest must be no-ops.
template<typename Fn, typename... Args>
void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    for (std::size_t i = 0; i < N; ++i) {
        // Forwarding on every iteration is safe here only because the functor runs
        // at most once per flag, so rvalue arguments are consumed a single time.
        tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
    }
}
82 
// Races collaborative_call_once on one flag from within a tbb::parallel_for over
// N iterations, so multiple TBB worker threads contend for the same flag.
template<typename Fn, typename... Args>
void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Workaround for GCC bugs expanding parameter packs inside lambdas:
    // store the pack once and invoke through a pack-free wrapper lambda.
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    tbb::parallel_for(tbb::blocked_range<size_t>(0, N), [&](const tbb::blocked_range<size_t>& range) {
        for (size_t i = range.begin(); i != range.end(); ++i) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            // Safe to forward repeatedly: the functor runs at most once per flag.
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif //__TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }
    });
}
101 
// Launches N plain std::threads that all race on one collaborative_once_flag.
// Exercises callers that are external threads rather than TBB workers; joins all
// threads before returning so the flag outlives every call.
template<typename Fn, typename... Args>
void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    std::vector<std::thread> threads;

#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Workaround for GCC bugs expanding parameter packs inside lambdas:
    // store the pack once and invoke through a pack-free wrapper lambda.
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    for (std::size_t i = 0; i < N; ++i)
    {
        threads.push_back(std::thread([&]() {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            // Safe to forward repeatedly: the functor runs at most once per flag.
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }));
    }
    for (auto& thread : threads) {
        thread.join();
    }
}
126 
127 //! Test for functor to be called only once
128 //! \brief \ref interface \ref requirement
129 TEST_CASE("only calls once 1") {
130     {
131         increment_functor f;
132 
133         call_once_in_for_loop(1024, f);
134 
135         REQUIRE( f.ct == 1);
136     }
137     {
138         increment_functor f;
139 
140         call_once_in_parallel_for(100, f);
141 
142         REQUIRE(f.ct == 1);
143     }
144     {
145         increment_functor f;
146 
147         call_once_threads(utils::get_platform_max_threads(), f);
148 
149         REQUIRE(f.ct == 1);
150     }
151 }
152 
153 //! Test for functor to be called only once
154 //! \brief \ref interface \ref requirement
155 TEST_CASE("only calls once 2") {
156     {
157         sum_functor f;
158 
159         call_once_in_for_loop(1024, f, 1, 2, 3 ,4);
160 
161         REQUIRE(f.sum == 10);
162     }
163     {
164         sum_functor f;
165 
166         call_once_in_parallel_for(512, f, 1000, -1000);
167 
168         REQUIRE(f.sum == 0);
169     }
170     {
171         sum_functor f;
172 
173         call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);
174 
175         REQUIRE(f.sum == -6);
176     }
177 }
178 
179 //! Test for correct handling move-only arguments
180 //! \brief \ref interface \ref requirement
181 TEST_CASE("only calls once - move only argument") {
182     const int value = 42;
183     int ready{0};
184 
185     auto func = [&ready, &value] (move_only_type other) {
186         REQUIRE(other.my_pointer == &value);
187         ready++;
188     };
189 
190     {
191         move_only_type mv(&value);
192 
193         call_once_in_parallel_for(512, func, std::move(mv));
194 
195         REQUIRE(ready == 1);
196         REQUIRE(mv.my_pointer == nullptr);
197     }
198 
199     {
200         move_only_type mv(&value);
201 
202         call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));
203 
204         REQUIRE(ready == 2);
205         REQUIRE(mv.my_pointer == nullptr);
206     }
207 }
208 
//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __arm__  || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms,
    // which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#elif __TBB_USE_THREAD_SANITIZER
    // Reduce execution time under Thread Sanitizer
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size + 64;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    {
        // N raw threads racing on a single fresh flag: functor must run exactly once.
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                // Every thread agrees on the invocation count before the flag is reset.
                REQUIRE(f.ct == i);
                // Destroy and re-construct the flag in place to get a fresh once-flag
                // for each of the 100 rounds.
                // NOTE(review): relies on SpinBarrier::wait running the callback exactly
                // once per rendezvous while all threads are parked — confirm in test utils.
                barrier.wait([&] {
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}
249 
250 #if TBB_USE_EXCEPTIONS
251 
252 //! Test for collaborative_call_once exception handling
253 //! \brief \ref error_guessing
254 TEST_CASE("handles exceptions - state reset") {
255     bool b{ false };
256     auto setB = [&b]() { b = true; };
257     auto setBAndCancel = [&b]() {
258         b = true;
259         throw call_once_exception{};
260     };
261 
262     tbb::collaborative_once_flag flag;
263     REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
264     REQUIRE(b);
265 
266     b = false;
267     REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
268     REQUIRE(b);
269 
270     b = false;
271     tbb::collaborative_call_once(flag, setB);
272     REQUIRE(b);
273 
274     b = false;
275     tbb::collaborative_call_once(flag, setB); // Now the call_once flag should be set.
276     REQUIRE(!b);
277 
278     b = false;
279     REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // Flag still set, so it shouldn't be called.
280     REQUIRE(!b);
281 }
282 
//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __arm__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms,
    // which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    // `data` counts failed (throwing) attempts; the functor only succeeds on the
    // 101st invocation, proving the flag is reset after every exception.
    // NOTE(review): `data` is non-atomic — presumably safe because only the single
    // winning invocation per round mutates it; confirm collaborative_call_once
    // serializes the functor.
    int data{0};
    std::atomic<bool> run_again{true};

    auto throwing_func = [&] {
        utils::doDummyWork(10000);
        if (data < 100) {
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while(run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // Expected: the functor deliberately throws for the first 100 attempts.
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    // Exactly 100 throwing attempts must have occurred before the success.
    REQUIRE(data == 100);
}
324 
325 #endif
326 
//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        // All workers rendezvous, then race on the flag together.
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            // The inner parallel_for can only complete if the threads blocked on the
            // once-flag moonlight (help execute the winner's nested work) instead of
            // idling: every thread must reach the inner barrier for the waits to resolve.
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}
344 
//! Test for collaborative work from different arenas
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    // num_threads - 1 tasks in arena a1 hold at the barrier until the a2 thread has
    // started initializing the flag; they must then observe the initialization
    // performed in the other arena.
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            barrier.wait();
            barrier.wait();

            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. collaborative_once_flag must be already initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    barrier.wait();

    a2.execute([&] {
        // NOTE(review): PeakParallelism() == 1 appears to check that the a1 tasks are
        // still parked on the barrier and not stealing this arena's work yet — confirm
        // against ConcurrencyTracker semantics in test utils.
        utils::ConcurrencyTracker ct;
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            // Release the a1 tasks (their second wait), then require them to
            // collaborate on the nested parallel_for while the flag initializes.
            barrier.wait();
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}
383 
// Memoization table: one (once-flag, value) pair per Fibonacci index.
using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;
// Returns the n-th number of the 1-based Fibonacci-like sequence (fib(0) == fib(1) == 1).
// Each buffer slot is computed exactly once; concurrent requesters collaborate on the
// recursive work instead of blocking.
std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
    if (n <= 1) {
        return 1;
    }
    tbb::collaborative_call_once(buffer[n].first, [&]() {
        std::uint64_t a, b;
        // The two sub-problems may run in parallel; each memoizes through its own flag.
        tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
                             [&] { b = collaborative_recursive_fib(n - 1, buffer); });
        buffer[n].second = a + b;
    });
    return buffer[n].second;
}
397 
398 std::uint64_t collaborative_recursive_fib(int n) {
399     FibBuffer buffer(n);
400     return collaborative_recursive_fib(n-1, buffer);
401 }
402 
403 //! Correctness test for Fibonacci example
404 //! \brief \ref interface \ref requirement
405 TEST_CASE("fibonacci example") {
406     constexpr int N = 93;
407     constexpr std::uint64_t expected_result = 12200160415121876738ull;
408 
409     auto collaborative = collaborative_recursive_fib(N);
410 
411     REQUIRE(collaborative == expected_result);
412 }
413 
414 #if _MSC_VER && !defined(__INTEL_COMPILER)
415 #pragma warning( pop )
416 #endif
417