/*
    Copyright (c) 2022 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// Include first to check missed header dependencies
#include "tbb/collaborative_call_once.h"

#include "common/test.h"
#include "common/exception_handling.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/parallel_for.h" // used directly below (tbb::parallel_for, tbb::blocked_range)
#include "tbb/parallel_invoke.h"
#include "tbb/parallel_reduce.h"
#include "tbb/task_arena.h"

//! \file test_collaborative_call_once.cpp
//! \brief Tests for [algorithms.collaborative_call_once] functionality

struct increment_functor {
    int ct{0};

    void operator()() {
        ct++;
    }
};

struct sum_functor {
    int sum{0};

    template<typename T>
    void operator()(T first_op) {
        sum += first_op;
    }

    template<typename T, typename... Args>
    void operator()(T first_op, Args&&... args) {
        (*this)(first_op);
        (*this)(std::forward<Args>(args)...);
    }
};

struct move_only_type {
    const int* my_pointer;
    move_only_type(move_only_type&& other) : my_pointer(other.my_pointer) { other.my_pointer = nullptr; }
    explicit move_only_type(const int* value) : my_pointer(value) {}
};

class call_once_exception : public std::exception {};

template<typename Fn, typename... Args>
void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    for (std::size_t i = 0; i < N; ++i) {
        tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
    }
}

template<typename Fn, typename... Args>
void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    tbb::parallel_for(tbb::blocked_range<size_t>(0, N), [&](const tbb::blocked_range<size_t>& range) {
        for (size_t i = range.begin(); i != range.end(); ++i) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif //__TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }
    });
}
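
// Unlike the two helpers above, which run serially or through the TBB scheduler,
// this helper drives collaborative_call_once from N raw std::threads and joins
// them all before returning.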
template<typename Fn, typename... Args>
void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    std::vector<std::thread> threads;

#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    for (std::size_t i = 0; i < N; ++i) {
        threads.push_back(std::thread([&]() {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }));
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 1") {
    {
        increment_functor f;

        call_once_in_for_loop(1024, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        call_once_in_parallel_for(100, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        call_once_threads(utils::get_platform_max_threads(), f);

        REQUIRE(f.ct == 1);
    }
}

//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 2") {
    {
        sum_functor f;

        call_once_in_for_loop(1024, f, 1, 2, 3, 4);

        REQUIRE(f.sum == 10);
    }
    {
        sum_functor f;

        call_once_in_parallel_for(512, f, 1000, -1000);

        REQUIRE(f.sum == 0);
    }
    {
        sum_functor f;

        call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);

        REQUIRE(f.sum == -6);
    }
}

//! Test for correct handling of move-only arguments
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once - move only argument") {
    const int value = 42;
    int ready{0};

    auto func = [&ready, &value](move_only_type other) {
        REQUIRE(other.my_pointer == &value);
        ready++;
    };

    {
        move_only_type mv(&value);

        call_once_in_parallel_for(512, func, std::move(mv));

        REQUIRE(ready == 1);
        REQUIRE(mv.my_pointer == nullptr);
    }

    {
        move_only_type mv(&value);

        call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));

        REQUIRE(ready == 2);
        REQUIRE(mv.my_pointer == nullptr);
    }
}
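
// The stress test below reinitializes the once-flag in place (explicit destructor
// call followed by placement new) inside a barrier callback, so each of the 100
// iterations starts from a fresh flag while all threads stay in lockstep.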
//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __arm__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit
    // platforms, which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#elif __TBB_USE_THREAD_SANITIZER
    // Reduce execution time under Thread Sanitizer
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size + 64;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    {
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                REQUIRE(f.ct == i);
                barrier.wait([&] {
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}

#if TBB_USE_EXCEPTIONS

//! Test for collaborative_call_once exception handling
//! \brief \ref error_guessing
TEST_CASE("handles exceptions - state reset") {
    bool b{false};
    auto setB = [&b]() { b = true; };
    auto setBAndCancel = [&b]() {
        b = true;
        throw call_once_exception{};
    };

    tbb::collaborative_once_flag flag;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB); // Now the call_once flag should be set.
    REQUIRE(!b);

    b = false;
    REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // Flag still set, so it shouldn't be called.
    REQUIRE(!b);
}

//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __arm__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit
    // platforms, which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    int data{0};
    std::atomic<bool> run_again{true};

    auto throwing_func = [&] {
        utils::doDummyWork(10000);
        if (data < 100) {
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while (run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // Only call_once_exception is expected here
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    REQUIRE(data == 100);
}

#endif

//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}

//! Test for collaborative work from different arenas
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            barrier.wait();
            barrier.wait();

            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. collaborative_once_flag must already be initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    barrier.wait();

    a2.execute([&] {
        utils::ConcurrencyTracker ct;
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            barrier.wait();
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}
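
// Fibonacci with collaborative memoization: each buffer slot pairs a
// collaborative_once_flag with its computed value, so concurrent requests for
// the same index join (and help finish) a single computation instead of
// recomputing it.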
using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;
std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
    if (n <= 1) {
        return 1;
    }
    tbb::collaborative_call_once(buffer[n].first, [&]() {
        std::uint64_t a, b;
        tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
                             [&] { b = collaborative_recursive_fib(n - 1, buffer); });
        buffer[n].second = a + b;
    });
    return buffer[n].second;
}

std::uint64_t collaborative_recursive_fib(int n) {
    FibBuffer buffer(n);
    return collaborative_recursive_fib(n - 1, buffer);
}

//! Correctness test for Fibonacci example
//! \brief \ref interface \ref requirement
TEST_CASE("fibonacci example") {
    constexpr int N = 93;
    constexpr std::uint64_t expected_result = 12200160415121876738ull;

    auto collaborative = collaborative_recursive_fib(N);

    REQUIRE(collaborative == expected_result);
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif