1 /*
2     Copyright (c) 2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #define TBB_PREVIEW_COLLABORATIVE_CALL_ONCE 1
18 
19 #if _MSC_VER && !defined(__INTEL_COMPILER)
20 // unreachable code
21 #pragma warning( push )
22 #pragma warning( disable: 4702 )
23 #endif
24 
25 #if __INTEL_COMPILER && _MSC_VER
26 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
27 #endif
28 
29 #include "common/config.h"
30 
31 // Include first to check missed header dependencies
32 #include "tbb/collaborative_call_once.h"
33 
34 #include "common/test.h"
35 #include "common/exception_handling.h"
36 #include "common/utils_concurrency_limit.h"
37 
38 #include "tbb/parallel_invoke.h"
39 #include "tbb/parallel_reduce.h"
40 #include "tbb/task_arena.h"
41 
42 //! \file test_collaborative_call_once.cpp
43 //! \brief Tests for [preview] functionality
44 
//! Callable that counts its own invocations; \c ct starts at zero.
struct increment_functor {
    int ct = 0;

    void operator()() {
        ++ct;
    }
};
52 
//! Callable that adds every argument it receives to \c sum.
//! A call with several arguments adds them one at a time, left to right.
struct sum_functor {
    int sum = 0;

    //! Base case: add a single value.
    template<typename Value>
    void operator()(Value value) {
        sum += value;
    }

    //! Recursive case: add the head, then recurse on the remaining arguments.
    template<typename Head, typename... Tail>
    void operator()(Head head, Tail&&... tail) {
        (*this)(head);
        (*this)(std::forward<Tail>(tail)...);
    }
};
67 
//! Move-only argument type. Moving transfers \c my_pointer and resets it to nullptr in the
//! source object, so tests can check that the argument was moved into the callable.
struct move_only_type {
    const int* my_pointer;

    //! Non-throwing move: steals the pointer and clears it in \p other.
    move_only_type(move_only_type&& other) noexcept : my_pointer(other.my_pointer) { other.my_pointer = nullptr; }
    explicit move_only_type(const int* value) : my_pointer(value) {}

    // The user-declared move constructor already deletes these implicitly; stated explicitly
    // so the move-only intent is visible at a glance.
    move_only_type(const move_only_type&) = delete;
    move_only_type& operator=(const move_only_type&) = delete;
};
73 
74 
//! Exception type thrown by the test callables to exercise exception propagation.
struct call_once_exception : std::exception {};
76 
77 template<typename Fn, typename... Args>
78 void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
79     tbb::collaborative_once_flag flag;
80     for (std::size_t i = 0; i < N; ++i) {
81         tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
82     }
83 }
84 
85 template<typename Fn, typename... Args>
86 void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
87     tbb::collaborative_once_flag flag;
88 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
89     auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
90     auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
91 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
92 
93     tbb::parallel_for(tbb::blocked_range<size_t>(0, N), [&](const tbb::blocked_range<size_t>& range) {
94         for (size_t i = range.begin(); i != range.end(); ++i) {
95 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
96             tbb::collaborative_call_once(flag, func);
97 #else
98             tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
99 #endif //__TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
100         }
101     });
102 }
103 
104 template<typename Fn, typename... Args>
105 void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
106     tbb::collaborative_once_flag flag;
107     std::vector<std::thread> threads;
108 
109 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
110     auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
111     auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
112 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
113 
114     for (std::size_t i = 0; i < N; ++i)
115     {
116         threads.push_back(std::thread([&]() {
117 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
118             tbb::collaborative_call_once(flag, func);
119 #else
120             tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
121 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
122         }));
123     }
124     for (auto& thread : threads) {
125         thread.join();
126     }
127 }
128 
129 //! Test for functor to be called only once
130 //! \brief \ref interface \ref requirement
131 TEST_CASE("only calls once 1") {
132     {
133         increment_functor f;
134 
135         call_once_in_for_loop(1024, f);
136 
137         REQUIRE( f.ct == 1);
138     }
139     {
140         increment_functor f;
141 
142         call_once_in_parallel_for(100, f);
143 
144         REQUIRE(f.ct == 1);
145     }
146     {
147         increment_functor f;
148 
149         call_once_threads(utils::get_platform_max_threads(), f);
150 
151         REQUIRE(f.ct == 1);
152     }
153 }
154 
155 //! Test for functor to be called only once
156 //! \brief \ref interface \ref requirement
157 TEST_CASE("only calls once 2") {
158     {
159         sum_functor f;
160 
161         call_once_in_for_loop(1024, f, 1, 2, 3 ,4);
162 
163         REQUIRE(f.sum == 10);
164     }
165     {
166         sum_functor f;
167 
168         call_once_in_parallel_for(512, f, 1000, -1000);
169 
170         REQUIRE(f.sum == 0);
171     }
172     {
173         sum_functor f;
174 
175         call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);
176 
177         REQUIRE(f.sum == -6);
178     }
179 }
180 
181 //! Test for correct handling move-only arguments
182 //! \brief \ref interface \ref requirement
183 TEST_CASE("only calls once - move only argument") {
184     const int value = 42;
185     int ready{0};
186 
187     auto func = [&ready, &value] (move_only_type other) {
188         REQUIRE(other.my_pointer == &value);
189         ready++;
190     };
191 
192     {
193         move_only_type mv(&value);
194 
195         call_once_in_parallel_for(512, func, std::move(mv));
196 
197         REQUIRE(ready == 1);
198         REQUIRE(mv.my_pointer == nullptr);
199     }
200 
201     {
202         move_only_type mv(&value);
203 
204         call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));
205 
206         REQUIRE(ready == 2);
207         REQUIRE(mv.my_pointer == nullptr);
208     }
209 }
210 
//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
    // Number of native threads to create; smaller for low-workload runs and on the platforms below.
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__  || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
    // which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    // N native threads race on one flag; the functor must run exactly once.
    {
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    // 100 rounds of N threads. Each round uses a freshly constructed flag in the same storage,
    // so the functor must run exactly once per round.
    {
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                // Exactly one invocation per completed round so far.
                REQUIRE(f.ct == i);
                // All N threads meet here before the next round. The callback destroys the flag
                // and re-creates it in place (placement new) to reset it to the "not called" state.
                // This assumes SpinBarrier runs the callback on one thread while the others are
                // still held in wait(), i.e. nobody is inside collaborative_call_once at that
                // moment -- NOTE(review): confirm against utils::SpinBarrier.
                barrier.wait([&] {
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}
248 
249 #if TBB_USE_EXCEPTIONS
250 
251 //! Test for collaborative_call_once exception handling
252 //! \brief \ref error_guessing
253 TEST_CASE("handles exceptions - state reset") {
254     bool b{ false };
255     auto setB = [&b]() { b = true; };
256     auto setBAndCancel = [&b]() {
257         b = true;
258         throw call_once_exception{};
259     };
260 
261     tbb::collaborative_once_flag flag;
262     REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
263     REQUIRE(b);
264 
265     b = false;
266     REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
267     REQUIRE(b);
268 
269     b = false;
270     tbb::collaborative_call_once(flag, setB);
271     REQUIRE(b);
272 
273     b = false;
274     tbb::collaborative_call_once(flag, setB); // Now the call_once flag should be set.
275     REQUIRE(!b);
276 
277     b = false;
278     REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // Flag still set, so it shouldn't be called.
279     REQUIRE(!b);
280 }
281 
//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
    // Number of native threads to create; smaller for low-workload runs and on the platforms below.
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
    // which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    // Counts invocations that threw. It is only touched inside the once-functor and after all
    // threads have joined; the test relies on collaborative_call_once never running two
    // invocations of the functor concurrently.
    int data{0};
    // Cleared by the first invocation that returns without throwing.
    std::atomic<bool> run_again{true};

    // Throws on its first 100 invocations, which leaves the flag unset so threads keep retrying.
    // The next invocation returns normally (setting the flag) and stops the retry loops.
    auto throwing_func = [&] {
        // Busy work, presumably to widen the window in which other threads pile up on the flag.
        utils::doDummyWork(10000);
        if (data < 100) {
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while(run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // Expected: call_once_exception is the only exception that may propagate here.
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    // Exactly 100 throwing invocations were counted.
    REQUIRE(data == 100);
}
323 
324 #endif
325 
//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    // Each barrier phase below completes only when num_threads threads have arrived.
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        // Make sure num_threads distinct threads are in the outer loop before any of them
        // reaches collaborative_call_once.
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            // Only one thread runs this functor. Each of its num_threads iterations blocks on the
            // barrier, so they can only all complete if the threads waiting in
            // collaborative_call_once pick up (help with) these iterations; otherwise the test hangs.
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}
343 
//! Test for collaborative work from different arenas
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    // Shared by the main thread and the num_threads - 1 tasks enqueued into a1.
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            // Phase 1: all enqueued tasks are running, each on its own thread.
            barrier.wait();
            // Phase 2: released by the main thread from inside its once-functor, i.e. while the
            // flag is already being initialized from arena a2.
            barrier.wait();

            // Must join the in-flight initialization started from a2 instead of running this functor.
            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. collaborative_once_flag must be already initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    // Phase 1 (main thread side).
    barrier.wait();

    a2.execute([&] {
        utils::ConcurrencyTracker ct;
        // No other thread is expected to join a2 here, presumably because every worker thread is
        // occupied by the tasks blocked in arena a1 -- NOTE(review): confirm.
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            // Phase 2: let the a1 tasks proceed to collaborative_call_once on the same flag.
            barrier.wait();
            // Phase 3: num_threads iterations must be at the barrier simultaneously, which is only
            // possible if the a1 threads help execute this parallel_for across arenas.
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}
382 
383 using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;
384 std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
385     if (n <= 1) {
386         return 1;
387     }
388     tbb::collaborative_call_once(buffer[n].first, [&]() {
389         std::uint64_t a, b;
390         tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
391                              [&] { b = collaborative_recursive_fib(n - 1, buffer); });
392         buffer[n].second = a + b;
393     });
394     return buffer[n].second;
395 }
396 
397 std::uint64_t collaborative_recursive_fib(int n) {
398     FibBuffer buffer(n);
399     return collaborative_recursive_fib(n-1, buffer);
400 }
401 
402 //! Correctness test for Fibonacci example
403 //! \brief \ref interface \ref requirement
404 TEST_CASE("fibonacci example") {
405     constexpr int N = 93;
406     constexpr std::uint64_t expected_result = 12200160415121876738ull;
407 
408     auto collaborative = collaborative_recursive_fib(N);
409 
410     REQUIRE(collaborative == expected_result);
411 }
412 
413 #if _MSC_VER && !defined(__INTEL_COMPILER)
414 #pragma warning( pop )
415 #endif
416