/*
    Copyright (c) 2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#define TBB_PREVIEW_COLLABORATIVE_CALL_ONCE 1

#if _MSC_VER && !defined(__INTEL_COMPILER)
// unreachable code
#pragma warning( push )
#pragma warning( disable: 4702 )
#endif

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// Include first to check missed header dependencies
#include "tbb/collaborative_call_once.h"

#include "common/test.h"
#include "common/exception_handling.h"
#include "common/utils_concurrency_limit.h"
// Test helpers used below: NativeParallelFor/doDummyWork, SpinBarrier, ConcurrencyTracker
#include "common/utils.h"
#include "common/spin_barrier.h"
#include "common/concurrency_tracker.h"

#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"
#include "tbb/parallel_invoke.h"
#include "tbb/parallel_reduce.h"
#include "tbb/task_arena.h"

#include <atomic>
#include <cstdint>
#include <exception>
#include <thread>
#include <utility>
#include <vector>

//! \file test_collaborative_call_once.cpp
//! \brief Tests for [preview] functionality

struct increment_functor {
    int ct{0};

    void operator()() {
        ct++;
    }
};

struct sum_functor {
    int sum{0};

    template<typename T>
    void operator()(T first_op) {
        sum += first_op;
    }

    template<typename T, typename... Args>
    void operator()(T first_op, Args&&... args) {
        (*this)(first_op);
        (*this)(std::forward<Args>(args)...);
    }
};

struct move_only_type {
    const int* my_pointer;
    move_only_type(move_only_type&& other) : my_pointer(other.my_pointer) { other.my_pointer = nullptr; }
    explicit move_only_type(const int* value) : my_pointer(value) {}
};

class call_once_exception : public std::exception {};
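// A minimal usage sketch (illustrative comment only, not exercised by the tests
// below): collaborative_call_once mirrors the std::call_once interface, but a
// thread that arrives while another thread is still running the initializing
// functor is not simply blocked; it may help execute TBB tasks spawned by that
// functor.
//
//     tbb::collaborative_once_flag flag;
//     tbb::collaborative_call_once(flag, [] { /* expensive one-time init */ });
//
// The helpers below invoke such a call N times from a serial loop, a
// parallel_for, and raw std::threads respectively, so the tests can verify
// single execution under different kinds of contention.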
template<typename Fn, typename... Args>
void call_once_in_for_loop(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    for (std::size_t i = 0; i < N; ++i) {
        tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
    }
}

template<typename Fn, typename... Args>
void call_once_in_parallel_for(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    // Work around a GCC bug: capture the parameter pack once, outside the lambda
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    tbb::parallel_for(tbb::blocked_range<std::size_t>(0, N), [&](const tbb::blocked_range<std::size_t>& range) {
        for (std::size_t i = range.begin(); i != range.end(); ++i) {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }
    });
}

template<typename Fn, typename... Args>
void call_once_threads(std::size_t N, Fn&& body, Args&&... args) {
    tbb::collaborative_once_flag flag;
    std::vector<std::thread> threads;

#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
    auto stored_pack = tbb::detail::d0::save_pack(std::forward<Args>(args)...);
    auto func = [&] { tbb::detail::d0::call(std::forward<Fn>(body), std::move(stored_pack)); };
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN

    for (std::size_t i = 0; i < N; ++i) {
        threads.push_back(std::thread([&]() {
#if __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
            tbb::collaborative_call_once(flag, func);
#else
            tbb::collaborative_call_once(flag, std::forward<Fn>(body), std::forward<Args>(args)...);
#endif // __TBB_GCC_PARAMETER_PACK_IN_LAMBDAS_BROKEN
        }));
    }
    for (auto& thread : threads) {
        thread.join();
    }
}

//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 1") {
    {
        increment_functor f;

        call_once_in_for_loop(1024, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        call_once_in_parallel_for(100, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        call_once_threads(utils::get_platform_max_threads(), f);

        REQUIRE(f.ct == 1);
    }
}

//! Test for functor to be called only once
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once 2") {
    {
        sum_functor f;

        call_once_in_for_loop(1024, f, 1, 2, 3, 4);

        REQUIRE(f.sum == 10);
    }
    {
        sum_functor f;

        call_once_in_parallel_for(512, f, 1000, -1000);

        REQUIRE(f.sum == 0);
    }
    {
        sum_functor f;

        call_once_threads(utils::get_platform_max_threads(), f, 0, -1, -5);

        REQUIRE(f.sum == -6);
    }
}

//! Test for correct handling of move-only arguments
//! \brief \ref interface \ref requirement
TEST_CASE("only calls once - move only argument") {
    const int value = 42;
    int ready{0};

    auto func = [&ready, &value](move_only_type other) {
        REQUIRE(other.my_pointer == &value);
        ready++;
    };

    {
        move_only_type mv(&value);

        call_once_in_parallel_for(512, func, std::move(mv));

        REQUIRE(ready == 1);
        REQUIRE(mv.my_pointer == nullptr);
    }

    {
        move_only_type mv(&value);

        call_once_threads(utils::get_platform_max_threads(), func, std::move(mv));

        REQUIRE(ready == 2);
        REQUIRE(mv.my_pointer == nullptr);
    }
}
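// Note: tbb::collaborative_once_flag is neither copyable nor movable, so the
// stress test below resets the flag between rounds by explicitly destroying it
// and re-constructing it in place. The barrier callback runs while all other
// threads are held at the barrier, so no thread observes a half-reset flag.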
//! Stress test for functor to be called only once
//! \brief \ref interface \ref requirement \ref stress
TEST_CASE("only calls once - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
    // which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif
    {
        increment_functor f;

        call_once_threads(N, f);

        REQUIRE(f.ct == 1);
    }
    {
        increment_functor f;

        utils::SpinBarrier barrier{N};
        tbb::collaborative_once_flag flag;
        utils::NativeParallelFor(N, [&](std::size_t) {
            for (int i = 0; i < 100; ++i) {
                REQUIRE(f.ct == i);
                // One thread resets the flag while the others are held at the barrier
                barrier.wait([&] {
                    flag.~collaborative_once_flag();
                    new (&flag) tbb::collaborative_once_flag{};
                });
                tbb::collaborative_call_once(flag, f);
            }
        });
    }
}

#if TBB_USE_EXCEPTIONS

//! Test for collaborative_call_once exception handling
//! \brief \ref error_guessing
TEST_CASE("handles exceptions - state reset") {
    bool b{false};
    auto setB = [&b]() { b = true; };
    auto setBAndCancel = [&b]() {
        b = true;
        throw call_once_exception{};
    };

    tbb::collaborative_once_flag flag;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    REQUIRE_THROWS_AS(tbb::collaborative_call_once(flag, setBAndCancel), call_once_exception);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB);
    REQUIRE(b);

    b = false;
    tbb::collaborative_call_once(flag, setB); // The flag was set by the successful call above; the functor must not run
    REQUIRE(!b);

    b = false;
    REQUIRE_NOTHROW(tbb::collaborative_call_once(flag, setBAndCancel)); // The flag is still set, so nothing is called and nothing throws
    REQUIRE(!b);
}

//! Stress test for collaborative_call_once exception handling
//! \brief \ref error_guessing \ref stress
TEST_CASE("handles exceptions - stress test") {
#if TBB_TEST_LOW_WORKLOAD
    constexpr std::size_t N = 32;
#elif __TBB_x86_32 || __aarch32__ || __ANDROID__
    // Some C++ implementations allocate 8MB stacks for std::thread on 32-bit platforms,
    // which makes it impossible to create more than ~500 threads.
    // Android has been added to decrease testing time.
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 2;
#else
    constexpr std::size_t N = tbb::detail::d0::max_nfs_size * 4;
#endif

    int data{0};
    std::atomic<bool> run_again{true};

    auto throwing_func = [&] {
        utils::doDummyWork(10000);
        if (data < 100) {
            data++;
            throw call_once_exception{};
        }
        run_again = false;
    };

    tbb::collaborative_once_flag flag;

    utils::NativeParallelFor(N, [&](std::size_t) {
        while (run_again) {
            try {
                tbb::collaborative_call_once(flag, throwing_func);
            } catch (const call_once_exception&) {
                // Only call_once_exception is expected here
            } catch (...) {
                FAIL("Unexpected exception");
            }
        }
    });
    REQUIRE(data == 100);
}

#endif // TBB_USE_EXCEPTIONS

//! Test for multiple help from moonlighting threads
//! \brief \ref interface \ref requirement
TEST_CASE("multiple help") {
    std::size_t num_threads = utils::get_platform_max_threads();
    utils::SpinBarrier barrier{num_threads};

    tbb::collaborative_once_flag flag;

    tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
        barrier.wait();
        tbb::collaborative_call_once(flag, [&] {
            tbb::parallel_for<std::size_t>(0, num_threads, [&](std::size_t) {
                barrier.wait();
            });
        });
    });
}
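// The "multiple help" test above terminates only because of the collaborative
// semantics: the single functor invocation runs a parallel_for whose iterations
// all wait on a barrier sized for every thread, so the threads blocked inside
// collaborative_call_once must moonlight and execute those iterations before the
// barrier can release. Under plain std::call_once semantics the test would
// deadlock. The "multiple arenas" test below checks the same collaboration when
// the waiting threads live in a different task_arena than the winning thread.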
//! \brief \ref interface \ref requirement
TEST_CASE("multiple arenas") {
    int num_threads = static_cast<int>(utils::get_platform_max_threads());
    utils::SpinBarrier barrier(num_threads);
    tbb::task_arena a1(num_threads), a2(num_threads);

    tbb::collaborative_once_flag flag;
    for (auto i = 0; i < num_threads - 1; ++i) {
        a1.enqueue([&] {
            barrier.wait();
            barrier.wait();

            tbb::collaborative_call_once(flag, [] {
                FAIL("Unreachable code. The collaborative_once_flag must already be initialized at this moment");
            });
            // To sync collaborative_once_flag lifetime
            barrier.wait();
        });
    }

    barrier.wait();

    a2.execute([&] {
        utils::ConcurrencyTracker ct;
        // All worker threads are parked in a1 at this point, so a2 runs serially
        tbb::parallel_for(0, num_threads, [&](int) {
            CHECK(utils::ConcurrencyTracker::PeakParallelism() == 1);
        });
        tbb::collaborative_call_once(flag, [&] {
            barrier.wait();
            tbb::parallel_for(0, num_threads, [&](int) {
                barrier.wait();
            });
        });
        // To sync collaborative_once_flag lifetime
        barrier.wait();
    });
}

// Memoized Fibonacci: one collaborative_once_flag guards each buffer entry, so
// every value is computed exactly once even though both recursive branches may
// request it concurrently.
using FibBuffer = std::vector<std::pair<tbb::collaborative_once_flag, std::uint64_t>>;

std::uint64_t collaborative_recursive_fib(int n, FibBuffer& buffer) {
    if (n <= 1) {
        return 1;
    }
    tbb::collaborative_call_once(buffer[n].first, [&]() {
        std::uint64_t a, b;
        tbb::parallel_invoke([&] { a = collaborative_recursive_fib(n - 2, buffer); },
                             [&] { b = collaborative_recursive_fib(n - 1, buffer); });
        buffer[n].second = a + b;
    });
    return buffer[n].second;
}

std::uint64_t collaborative_recursive_fib(int n) {
    FibBuffer buffer(n);
    return collaborative_recursive_fib(n - 1, buffer);
}

//! Correctness test for the Fibonacci example
//! \brief \ref interface \ref requirement
TEST_CASE("fibonacci example") {
    constexpr int N = 93;
    // Fibonacci(93) is the largest Fibonacci number representable in 64 bits
    constexpr std::uint64_t expected_result = 12200160415121876738ull;

    auto collaborative = collaborative_recursive_fib(N);

    REQUIRE(collaborative == expected_result);
}

#if _MSC_VER && !defined(__INTEL_COMPILER)
#pragma warning( pop )
#endif