/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// Include first to check missed header dependencies
#include "tbb/collaborative_call_once.h"

#include "common/test.h"
#include "common/exception_handling.h"
#include "common/utils_concurrency_limit.h"

#include "tbb/parallel_invoke.h"
#include "tbb/parallel_reduce.h"
#include "tbb/task_arena.h"

//! \file test_collaborative_call_once.cpp
//! \brief Tests for [algorithms.collaborative_call_once] functionality

// Counts how many times it was invoked; used to verify the once-only guarantee.
struct increment_functor {
    int ct{0};

    void operator()() {
        ct++;
    }
};

// Accumulates all of its arguments into `sum`; used to verify that
// collaborative_call_once forwards an arbitrary argument pack correctly.
struct sum_functor {
    int sum{0};

    template<typename T>
    void operator()(T first_op) {
        sum += first_op;
    }

    template<typename T, typename... Args>
    void operator()(T first_op, Args&&... args) {
        (*this)(first_op);
        (*this)(std::forward<Args>(args)...);
    }
};

// Movable, non-copyable wrapper around a pointer; the moved-from object is
// reset to nullptr so tests can observe that a move actually took place.
struct move_only_type {
    const int* my_pointer;
    move_only_type(move_only_type && other): my_pointer(other.my_pointer){ other.my_pointer=nullptr; }
    explicit move_only_type(const int* value): my_pointer(value) {}
};


class call_once_exception : public std::exception {};

// Invokes collaborative_call_once N times sequentially on a single flag;
// the body must run exactly once regardless of N.
template<typename Fn, typename... Args>
void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    for (std::size_t i = 0; i < N; ++i) {
        tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
    }
}

// Invokes collaborative_call_once concurrently from a parallel_for over [0, N);
// only one invocation may execute the body.
template<typename Fn, typename... Args>
void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Workaround for GCC versions that mishandle parameter packs captured in
    // lambdas: pack the arguments into a tuple up front and unpack at the call.
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    tbb::parallel_for(tbb::blocked_range<size_t>(0, N), [&](const tbb::blocked_range<size_t>& range) {
        for (size_t i = range.begin(); i != range.end(); ++i) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif //__TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }
    });
}

// Invokes collaborative_call_once from N plain std::threads (outside any TBB
// arena); only one thread may execute the body.
template<typename Fn, typename... Args>
void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    std::vector<std::thread> threads;

#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Same GCC parameter-pack-in-lambda workaround as in call_once_in_parallel_for.
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    for (std::size_t i = 0; i < N; ++i)
    {
        threads.push_back(std::thread([&]() {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }));
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 1") {
    {
        increment_functor f;

        call_once_in_for_loop(1024, f);

        REQUIRE( f.ct == 1);
    }
    {
        increment_functor f;

        call_once_in_parallel_for(100, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        call_once_threads(utils::get_platform_max_threads(), f);

        REQUIRE(f.ct == 1);
    }
}

//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 2") {
    {
        sum_functor f;

        call_once_in_for_loop(1024, f, 1, 2, 3 ,4);

        REQUIRE(f.sum == 10);
    }
    {
        sum_functor f;

        call_once_in_parallel_for(512, f, 1000, -1000);

        REQUIRE(f.sum == 0);
    }
    {
        sum_functor f;

        call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);

        REQUIRE(f.sum == -6);
    }
}

//! Test for correct handling move-only arguments
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once - move only argument") {
    const int value = 42;
    int ready{0};

    auto func = [&ready, &value] (move_only_type other) {
        REQUIRE(other.my_pointer == &value);
        ready++;
    };

    {
        move_only_type mv(&value);

        call_once_in_parallel_for(512, func, std::move(mv));

        REQUIRE(ready == 1);
        // The moved-from object must have been reset, proving the argument
        // was moved (not copied) into the once-invocation.
        REQUIRE(mv.my_pointer == nullptr);
    }

    {
        move_only_type mv(&value);

        call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));

        // ready is cumulative across both sub-blocks, hence 2 here.
        REQUIRE(ready == 2);
        REQUIRE(mv.my_pointer == nullptr);
    }
}

//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
    // that makes impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    {
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                // All N threads have passed the previous iteration's call,
                // so exactly i invocations have happened so far.
                REQUIRE(f.ct == i);
                // While all threads are blocked on the barrier, one of them
                // reconstructs the flag in place so every iteration races on
                // a fresh collaborative_once_flag.
                barrier.wait([&] {
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}

#if TBB_USE_EXCEPTIONS

//! Test for collaborative_call_once exception handling
//! \brief \ref error_guessing
TEST_CASE("handles exceptions - state reset") {
    bool b{ false };
    auto setB = [&b]() { b = true; };
    auto setBAndCancel = [&b]() {
        b = true;
        throw call_once_exception{};
    };

    // A body that throws must leave the flag unset, so subsequent calls retry.
    tbb::collaborative_once_flag flag;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB); // Now the call_once flag should be set.
    REQUIRE(!b);

    b = false;
    REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // Flag still set, so it shouldn't be called.
    REQUIRE(!b);
}

//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
    // that makes impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    int data{0};
    std::atomic<bool> run_again{true};

    // Throws on the first 100 successful entries; only the 101st invocation
    // completes and sets the flag. `data` is safe to mutate without a lock
    // because only one thread at a time executes the once-body.
    auto throwing_func = [&] {
        utils::doDummyWork(10000);
        if (data < 100) {
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while(run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // Only call_once_exception is expected here
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    REQUIRE(data == 100);
}

#endif

//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    // The inner parallel_for can only finish if the threads blocked in
    // collaborative_call_once join (collaborate on) the winner's work.
    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}

//! Test for collaborative work from different arenas
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    // num_threads - 1 tasks in arena a1 wait until the a2 thread owns the
    // once-body, then help it to completion from a foreign arena.
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            barrier.wait();
            barrier.wait();

            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. collaborative_once_flag must be already initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    barrier.wait();

    a2.execute([&] {
        utils::ConcurrencyTracker ct;
        // Sanity check: at this point only the a2 thread is doing work.
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            barrier.wait();
            // Completing this barrier-per-iteration loop requires the a1
            // threads to participate, i.e. cross-arena collaboration works.
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}

// One (flag, value) slot per Fibonacci index; each slot is computed at most once.
using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;
std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
    if (n <= 1) {
        return 1;
    }
    // Concurrent callers for the same n collaborate on the parallel_invoke
    // below instead of blocking, so the recursion cannot deadlock.
    tbb::collaborative_call_once(buffer[n].first, [&]() {
        std::uint64_t a, b;
        tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
                             [&] { b = collaborative_recursive_fib(n - 1, buffer); });
        buffer[n].second = a + b;
    });
    return buffer[n].second;
}

std::uint64_t collaborative_recursive_fib(int n) {
    FibBuffer buffer(n);
    return collaborative_recursive_fib(n-1, buffer);
}

//! Correctness test for Fibonacci example
//! \brief \ref interface \ref requirement
TEST_CASE("fibonacci example") {
    // N = 93 is the largest index whose value (in this 1-based variant)
    // still fits in std::uint64_t.
    constexpr int N = 93;
    constexpr std::uint64_t expected_result = 12200160415121876738ull;

    auto collaborative = collaborative_recursive_fib(N);

    REQUIRE(collaborative == expected_result);
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif