1 /*
2 Copyright (c) 2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if _MSC_VER && !defined(__INTEL_COMPILER)
18 // unreachable code
19 #pragma warning( push )
20 #pragma warning( disable: 4702 )
21 #endif
22
23 #if __INTEL_COMPILER && _MSC_VER
24 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
25 #endif
26
27 #include "common/config.h"
28
29 // Include first to check missed header dependencies
30 #include "tbb/collaborative_call_once.h"
31
32 #include "common/test.h"
33 #include "common/exception_handling.h"
34 #include "common/utils_concurrency_limit.h"
35
36 #include "tbb/parallel_invoke.h"
37 #include "tbb/parallel_reduce.h"
38 #include "tbb/task_arena.h"
39
40 //! \file test_collaborative_call_once.cpp
41 //! \brief Tests for [algorithms.collaborative_call_once] functionality
42
// Callable that counts how many times it has been invoked; used to verify
// that collaborative_call_once runs its functor exactly once.
struct increment_functor {
    int ct{0};

    void operator()() {
        ct += 1;
    }
};
50
// Callable that accumulates the integer sum of every argument it is invoked
// with; used to verify argument forwarding through collaborative_call_once.
struct sum_functor {
    int sum{0};

    // Base case: fold a single argument into the running total.
    template<typename T>
    void operator()(T first_op) {
        sum += first_op;
    }

    // Recursive case: consume the head, then recurse on the tail of the pack.
    template<typename T, typename... Args>
    void operator()(T first_op, Args&&... args) {
        sum += first_op;
        (*this)(std::forward<Args>(args)...);
    }
};
65
// Non-copyable type used to check that collaborative_call_once forwards
// move-only arguments correctly. Holds a non-owning pointer that is
// transferred on move; the moved-from object is left with nullptr.
struct move_only_type {
    const int* my_pointer;
    // noexcept: this is a pure pointer transfer, and throwing moves would
    // needlessly pessimize any container/algorithm handling this type.
    move_only_type(move_only_type&& other) noexcept : my_pointer(other.my_pointer) { other.my_pointer = nullptr; }
    explicit move_only_type(const int* value) : my_pointer(value) {}
};
71
72
// Exception type thrown by test functors to exercise exception propagation
// and flag-state reset in collaborative_call_once.
class call_once_exception : public std::exception {};
74
// Serially invokes collaborative_call_once N times on the same flag;
// the functor must end up running exactly once.
// NOTE(review): std::forward is applied on every loop iteration. This is safe
// here because the tests pass lvalue functors/arguments (so Fn/Args deduce to
// lvalue references), but it would be a use-after-move hazard for rvalues.
template<typename Fn, typename... Args>
void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    for (std::size_t i = 0; i < N; ++i) {
        tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
    }
}
82
// Invokes collaborative_call_once on one shared flag from N parallel_for
// iterations; despite the concurrency the functor must run exactly once.
template<typename Fn, typename... Args>
void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Workaround for GCC issues expanding parameter packs inside lambdas:
    // capture the pack once and call through a stored-pack helper instead.
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    tbb::parallel_for(tbb::blocked_range<size_t>(0, N), [&](const tbb::blocked_range<size_t>& range) {
        for (size_t i = range.begin(); i != range.end(); ++i) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif //__TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }
    });
}
101
102 template<typename Fn, typename... Args>
call_once_threads(std::size_t N,Fn && body,Args &&...args)103 void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
104 tbb::collaborative_once_flag flag;
105 std::vector<std::thread> threads;
106
107 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
108 auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
109 auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
110 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
111
112 for (std::size_t i = 0; i < N; ++i)
113 {
114 threads.push_back(std::thread([&]() {
115 #if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
116 tbb::collaborative_call_once(flag, func);
117 #else
118 tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
119 #endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
120 }));
121 }
122 for (auto& thread : threads) {
123 thread.join();
124 }
125 }
126
//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 1") {
    {
        // Serial loop: 1024 calls on one flag, functor must fire exactly once.
        increment_functor f;

        call_once_in_for_loop(1024, f);

        REQUIRE( f.ct == 1);
    }
    {
        // tbb::parallel_for: concurrent calls on one flag, still exactly once.
        increment_functor f;

        call_once_in_parallel_for(100, f);

        REQUIRE(f.ct == 1);
    }
    {
        // Raw std::threads, one per hardware thread: still exactly once.
        increment_functor f;

        call_once_threads(utils::get_platform_max_threads(), f);

        REQUIRE(f.ct == 1);
    }
}
152
//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
// Same single-invocation guarantee as above, but with forwarded arguments:
// the accumulated sum reflects exactly one call with the given argument pack.
TEST_CASE("only calls once 2") {
    {
        sum_functor f;

        call_once_in_for_loop(1024, f, 1, 2, 3 ,4);

        REQUIRE(f.sum == 10);
    }
    {
        sum_functor f;

        call_once_in_parallel_for(512, f, 1000, -1000);

        REQUIRE(f.sum == 0);
    }
    {
        sum_functor f;

        call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);

        REQUIRE(f.sum == -6);
    }
}
178
//! Test for correct handling move-only arguments
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once - move only argument") {
    const int value = 42;
    int ready{0};

    auto func = [&ready, &value] (move_only_type other) {
        // The pointer must arrive intact through the forwarding chain.
        REQUIRE(other.my_pointer == &value);
        ready++;
    };

    {
        move_only_type mv(&value);

        call_once_in_parallel_for(512, func, std::move(mv));

        REQUIRE(ready == 1);
        // The argument was moved out of `mv` exactly once.
        REQUIRE(mv.my_pointer == nullptr);
    }

    {
        move_only_type mv(&value);

        call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));

        REQUIRE(ready == 2); // incremented once more by this second scenario
        REQUIRE(mv.my_pointer == nullptr);
    }
}
208
//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __arm__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
    // that makes impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#elif __TBB_USE_THREAD_SANITIZER
    // Reduce execution time under Thread Sanitizer
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size + 64;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    {
        // N native threads, one shared flag: functor runs exactly once.
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    {
        // Reuse one flag object for 100 rounds: each round all N threads
        // rendezvous, the flag is reset in place, and exactly one invocation
        // of f must happen before the next round.
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                // At the start of round i, exactly i invocations have happened.
                REQUIRE(f.ct == i);
                barrier.wait([&] {
                    // NOTE(review): assumes SpinBarrier::wait runs this callable
                    // exactly once per rendezvous (no other thread has passed
                    // the barrier yet), making the destroy + placement-new
                    // reset of the flag race-free — verify against SpinBarrier.
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}
249
250 #if TBB_USE_EXCEPTIONS
251
//! Test for collaborative_call_once exception handling
//! \brief \ref error_guessing
TEST_CASE("handles exceptions - state reset") {
    bool b{ false };
    auto setB = [&b]() { b = true; };
    auto setBAndCancel = [&b]() {
        b = true;
        throw call_once_exception{};
    };

    tbb::collaborative_once_flag flag;
    // A throwing functor propagates the exception and must leave the flag
    // unset, so the next call attempts initialization again.
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    // A successful call finally sets the flag.
    b = false;
    tbb::collaborative_call_once(flag, setB);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB); // Now the call_once flag should be set.
    REQUIRE(!b);

    b = false;
    REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // Flag still set, so it shouldn't be called.
    REQUIRE(!b);
}
282
//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __arm__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
    // that makes impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    // `data` is only mutated inside the once-functor; collaborative_call_once
    // runs it on a single thread at a time, so no extra synchronization here.
    int data{0};
    std::atomic<bool> run_again{true};

    auto throwing_func = [&] {
        utils::doDummyWork(10000);
        // Fail the first 100 attempts; each thrown exception must reset the
        // flag so a later attempt can run. The 101st attempt succeeds and
        // releases all the spinning threads.
        if (data < 100) {
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while(run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // We expect only call_once_exception here
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    REQUIRE(data == 100);
}
324
325 #endif
326
//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            // This inner parallel_for can only complete if the threads blocked
            // in collaborative_call_once moonlight, i.e. join in and execute
            // its iterations; otherwise the barrier would deadlock.
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}
344
//! Test for collaborative work from different arenas
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    // num_threads - 1 tasks in arena a1 wait until a2 has started the
    // initializing call; their own call must then never run the functor —
    // they either observe the flag set or collaborate on the ongoing call.
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            barrier.wait();
            barrier.wait();

            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. collaborative_once_flag must be already initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    barrier.wait();

    a2.execute([&] {
        utils::ConcurrencyTracker ct;
        // Only this thread is working in a2 so far, so peak parallelism
        // observed by the tracker must stay at 1.
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            barrier.wait();
            // Threads from arena a1 collaborate on these iterations even
            // though the call originated in a different arena (a2).
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}
383
// One (once-flag, memoized value) slot per Fibonacci index.
using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;
// Computes fib(n) (with fib(0) == fib(1) == 1), memoizing each value in
// `buffer` behind its own collaborative once-flag so that concurrent
// recursive branches collaborate on a value instead of recomputing it.
std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
    if (n <= 1) {
        return 1;
    }
    tbb::collaborative_call_once(buffer[n].first, [&]() {
        std::uint64_t a, b;
        // Both subproblems run in parallel; each is itself memoized.
        tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
                             [&] { b = collaborative_recursive_fib(n - 1, buffer); });
        buffer[n].second = a + b;
    });
    return buffer[n].second;
}
397
collaborative_recursive_fib(int n)398 std::uint64_t collaborative_recursive_fib(int n) {
399 FibBuffer buffer(n);
400 return collaborative_recursive_fib(n-1, buffer);
401 }
402
//! Correctness test for Fibonacci example
//! \brief \ref interface \ref requirement
TEST_CASE("fibonacci example") {
    constexpr int N = 93;
    // fib(93) under this test's convention (fib(0) == fib(1) == 1) is the
    // largest Fibonacci number that fits in an unsigned 64-bit integer.
    constexpr std::uint64_t expected_result = 12200160415121876738ull;

    auto collaborative = collaborative_recursive_fib(N);

    REQUIRE(collaborative == expected_result);
}
413
414 #if _MSC_VER && !defined(__INTEL_COMPILER)
415 #pragma warning( pop )
416 #endif
417