1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/dummy_body.h" 20 #include "common/spin_barrier.h" 21 #include "common/utils_concurrency_limit.h" 22 #include "common/cpu_usertime.h" 23 24 #include "tbb/task.h" 25 #include "tbb/task_group.h" 26 #include "tbb/parallel_for.h" 27 #include "tbb/cache_aligned_allocator.h" 28 #include "tbb/global_control.h" 29 #include "tbb/concurrent_vector.h" 30 31 #include <atomic> 32 #include <thread> 33 #include <thread> 34 35 //! \file test_task.cpp 36 //! \brief Test for [internal] functionality 37 struct EmptyBody { 38 void operator()() const {} 39 }; 40 41 #if _MSC_VER && !defined(__INTEL_COMPILER) 42 // unreachable code 43 #pragma warning( push ) 44 #pragma warning( disable: 4702 ) 45 #endif 46 47 template <typename Body = EmptyBody> 48 class CountingTask : public tbb::detail::d1::task { 49 public: 50 CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {} 51 52 CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {} 53 54 task* execute( tbb::detail::d1::execution_data& ) override { 55 ++my_execute_counter; 56 my_body(); 57 my_wait.release(); 58 return nullptr; 59 } 60 61 task* cancel( tbb::detail::d1::execution_data& ) override { 62 ++my_cancel_counter; 63 my_wait.release(); 64 return nullptr; 65 } 66 67 static void reset() { 68 my_execute_counter = 0; 69 my_cancel_counter = 0; 70 } 71 72 static std::size_t execute_counter() { return my_execute_counter; } 73 static std::size_t cancel_counter() { return my_cancel_counter; } 74 75 private: 76 Body my_body; 77 tbb::detail::d1::wait_context& my_wait; 78 79 static std::atomic<std::size_t> my_execute_counter; 80 static std::atomic<std::size_t> my_cancel_counter; 81 }; // struct CountingTask 82 83 84 #if _MSC_VER && !defined(__INTEL_COMPILER) 85 #pragma warning( pop ) 86 #endif // warning 4702 is back 87 88 template <typename Body> 89 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0); 90 91 template <typename Body> 92 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0); 93 94 #if TBB_USE_EXCEPTIONS 95 void test_cancellation_on_exception( bool reset_ctx ) { 96 tbb::detail::d1::wait_context wait(1); 97 tbb::task_group_context test_context; 98 auto throw_body = [] { 99 throw 1; 100 }; 101 CountingTask<decltype(throw_body)> task(throw_body, wait); 102 103 constexpr std::size_t iter_counter = 1000; 104 for (std::size_t i = 0; i < iter_counter; ++i) { 105 try { 106 tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context); 107 } catch(int ex) { 108 REQUIRE(ex == 1); 109 } 110 if (reset_ctx) { 111 test_context.reset(); 112 } 113 wait.reserve(1); 114 } 115 wait.release(1); 116 117 REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed"); 118 REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs"); 119 task.reset(); 120 } 121 #endif // TBB_USE_EXCEPTIONS 122 123 //! \brief \ref error_guessing 124 TEST_CASE("External threads sleep") { 125 if (utils::get_platform_max_threads() < 2) return; 126 utils::SpinBarrier barrier(2); 127 128 tbb::task_group test_gr; 129 130 test_gr.run([&] { 131 barrier.wait(); 132 TestCPUUserTime(2); 133 }); 134 135 barrier.wait(); 136 137 test_gr.wait(); 138 } 139 140 //! \brief \ref error_guessing 141 TEST_CASE("Test that task was executed p times") { 142 tbb::detail::d1::wait_context wait(1); 143 tbb::task_group_context test_context; 144 CountingTask<> test_task(wait); 145 146 constexpr std::size_t iter_counter = 10000; 147 for (std::size_t i = 0; i < iter_counter; ++i) { 148 tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context); 149 wait.reserve(1); 150 } 151 152 wait.release(1); 153 154 REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times"); 155 REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled"); 156 CountingTask<>::reset(); 157 } 158 159 #if TBB_USE_EXCEPTIONS 160 //! \brief \ref error_guessing 161 TEST_CASE("Test cancellation on exception") { 162 test_cancellation_on_exception(/*reset_ctx = */true); 163 test_cancellation_on_exception(/*reset_ctx = */false); 164 } 165 #endif // TBB_USE_EXCEPTIONS 166 167 //! \brief \ref error_guessing 168 TEST_CASE("Simple test parallelism usage") { 169 std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 170 utils::SpinBarrier barrier(threads_num); 171 172 auto barrier_wait = [&barrier] { 173 barrier.wait(); 174 }; 175 176 tbb::detail::d1::wait_context wait(threads_num); 177 tbb::detail::d1::task_group_context test_context; 178 using task_type = CountingTask<decltype(barrier_wait)>; 179 180 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 181 182 constexpr std::size_t iter_counter = 100; 183 for (std::size_t i = 0; i < iter_counter; ++i) { 184 for (std::size_t j = 0; j < threads_num; ++j) { 185 tbb::detail::d1::spawn(vector_test_task[j], test_context); 186 } 187 tbb::detail::d1::wait(wait, test_context); 188 wait.reserve(threads_num); 189 } 190 wait.release(threads_num); 191 192 REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed"); 193 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 194 task_type::reset(); 195 } 196 197 //! \brief \ref error_guessing 198 TEST_CASE("Test parallelism usage with parallel_for") { 199 std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 200 utils::SpinBarrier barrier(task_threads_num); 201 202 auto barrier_wait = [&barrier] { 203 barrier.wait(); 204 }; 205 206 std::size_t pfor_iter_count = 10000; 207 std::atomic<std::size_t> pfor_counter(0); 208 209 auto parallel_for_func = [&pfor_counter, pfor_iter_count] { 210 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count), 211 [&pfor_counter] (tbb::blocked_range<std::size_t>& range) { 212 for (auto it = range.begin(); it != range.end(); ++it) { 213 ++pfor_counter; 214 } 215 } 216 ); 217 }; 218 219 tbb::detail::d1::wait_context wait(task_threads_num); 220 tbb::detail::d1::task_group_context test_context; 221 using task_type = CountingTask<decltype(barrier_wait)>; 222 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait)); 223 224 constexpr std::size_t iter_count = 10; 225 constexpr std::size_t pfor_threads_num = 4; 226 for (std::size_t i = 0; i < iter_count; ++i) { 227 std::vector<std::thread> pfor_threads; 228 229 for (std::size_t j = 0; j < task_threads_num; ++j) { 230 tbb::detail::d1::spawn(vector_test_task[j], test_context); 231 } 232 233 for (std::size_t k = 0; k < pfor_threads_num; ++k) { 234 pfor_threads.emplace_back(parallel_for_func); 235 } 236 237 tbb::detail::d1::wait(wait, test_context); 238 239 for (auto& thread : pfor_threads) { 240 if (thread.joinable()) { 241 thread.join(); 242 } 243 } 244 245 wait.reserve(task_threads_num); 246 } 247 wait.release(task_threads_num); 248 249 REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed"); 250 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 251 REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished"); 252 task_type::reset(); 253 } 254 255 //! \brief \ref error_guessing 256 TEST_CASE("Test parallelism usage with spawn tasks in different threads") { 257 std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 258 utils::SpinBarrier barrier(threads_num); 259 260 auto barrier_wait = [&barrier] { 261 barrier.wait(); 262 }; 263 264 tbb::detail::d1::wait_context wait(threads_num); 265 tbb::detail::d1::task_group_context test_context; 266 using task_type = CountingTask<decltype(barrier_wait)>; 267 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 268 269 auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) { 270 tbb::detail::d1::spawn(vector_test_task[idx], test_context); 271 }; 272 273 constexpr std::size_t iter_count = 10; 274 for (std::size_t i = 0; i < iter_count; ++i) { 275 std::vector<std::thread> threads; 276 277 for (std::size_t k = 0; k < threads_num - 1; ++k) { 278 threads.emplace_back(thread_func, k); 279 } 280 281 for (auto& thread : threads) { 282 if (thread.joinable()) { 283 thread.join(); 284 } 285 } 286 287 tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context); 288 wait.reserve(threads_num); 289 } 290 wait.release(threads_num); 291 292 REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed"); 293 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 294 task_type::reset(); 295 } 296 297 class SpawningTaskBody; 298 299 using SpawningTask = CountingTask<SpawningTaskBody>; 300 301 class SpawningTaskBody { 302 public: 303 using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>; 304 305 SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx ) 306 : my_task_pool(task_pool), my_test_ctx(test_ctx) {} 307 308 void operator()() const { 309 std::size_t delta = 7; 310 std::size_t start_idx = my_current_task.fetch_add(delta); 311 312 if (start_idx < my_task_pool.size()) { 313 for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) { 314 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx); 315 } 316 } 317 } 318 private: 319 task_pool_type& my_task_pool; 320 tbb::task_group_context& my_test_ctx; 321 static std::atomic<std::size_t> my_current_task; 322 }; // class SpawningTaskBody 323 324 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0); 325 326 //! \brief \ref error_guessing 327 TEST_CASE("Actively adding tasks") { 328 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 329 330 tbb::detail::d1::wait_context wait(task_number + 1); 331 tbb::task_group_context test_context; 332 333 SpawningTaskBody::task_pool_type task_pool; 334 335 SpawningTaskBody task_body{task_pool, test_context}; 336 for (std::size_t i = 0; i < task_number; ++i) { 337 task_pool.emplace_back(task_body, wait); 338 } 339 340 SpawningTask first_task(task_body, wait); 341 tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context); 342 343 REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right? 344 REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled"); 345 } 346 347 #if __TBB_RESUMABLE_TASKS 348 struct suspended_task : public tbb::detail::d1::task { 349 350 suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait) 351 : my_suspend_tag(tag), my_wait(wait) 352 {} 353 354 task* execute(tbb::detail::d1::execution_data&) override { 355 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 356 [] (const tbb::blocked_range<std::size_t>& range) { 357 // Make some heavy work 358 std::atomic<int> sum{}; 359 for (auto it = range.begin(); it != range.end(); ++it) { 360 ++sum; 361 } 362 }, 363 tbb::static_partitioner{} 364 ); 365 366 my_wait.release(); 367 tbb::task::resume(my_suspend_tag); 368 return nullptr; 369 } 370 371 task* cancel(tbb::detail::d1::execution_data&) override { 372 FAIL("The function should never be called."); 373 return nullptr; 374 } 375 376 tbb::task::suspend_point my_suspend_tag; 377 tbb::detail::d1::wait_context& my_wait; 378 }; 379 380 //! \brief \ref error_guessing 381 TEST_CASE("Isolation + resumable tasks") { 382 std::atomic<int> suspend_flag{}; 383 tbb::task_group_context test_context; 384 385 std::atomic<int> suspend_count{}; 386 std::atomic<int> resume_count{}; 387 388 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 389 [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) { 390 int ticket = 0; 391 for (auto it = range.begin(); it != range.end(); ++it) { 392 ticket = suspend_flag++; 393 } 394 395 if (ticket % 5 == 0) { 396 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task; 397 tbb::detail::d1::wait_context wait(1); 398 ++suspend_count; 399 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] { 400 auto thread_id = std::this_thread::get_id(); 401 tbb::task::suspend([&wait, &test_context, &test_task, thread_id] (tbb::task::suspend_point tag) { 402 CHECK(thread_id == std::this_thread::get_id()); 403 test_task.emplace_back(tag, wait); 404 tbb::detail::d1::spawn(test_task[0], test_context); 405 }); 406 } 407 ); 408 tbb::detail::d1::wait(wait, test_context); 409 ++resume_count; 410 } 411 } 412 ); 413 414 CHECK(suspend_count == resume_count); 415 } 416 417 struct bypass_task : public tbb::detail::d1::task { 418 using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>; 419 420 bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool, 421 std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag) 422 : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag) 423 {} 424 425 task* execute(tbb::detail::d1::execution_data&) override { 426 utils::doDummyWork(10000); 427 428 int expected = 1; 429 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 430 tbb::task::resume(my_suspend_tag); 431 } 432 433 std::size_t ticket = my_current_task++; 434 task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr; 435 436 if (!next && my_resume_flag != 2) { 437 // Rarely all tasks can be executed before the suspend. 438 // So, wait for the suspend before leaving. 439 utils::SpinWaitWhileEq(my_resume_flag, 0); 440 expected = 1; 441 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 442 tbb::task::resume(my_suspend_tag); 443 } 444 } 445 446 my_wait.release(); 447 return next; 448 } 449 450 task* cancel(tbb::detail::d1::execution_data&) override { 451 FAIL("The function should never be called."); 452 return nullptr; 453 } 454 455 tbb::detail::d1::wait_context& my_wait; 456 task_pool_type& my_task_pool; 457 std::atomic<int>& my_resume_flag; 458 tbb::task::suspend_point& my_suspend_tag; 459 static std::atomic<int> my_current_task; 460 }; 461 462 std::atomic<int> bypass_task::my_current_task(0); 463 464 thread_local int test_tls = 0; 465 466 //! \brief \ref error_guessing 467 TEST_CASE("Bypass suspended by resume") { 468 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 469 tbb::task_group_context test_context; 470 tbb::detail::d1::wait_context wait(task_number + 1); 471 472 test_tls = 1; 473 474 std::atomic<int> resume_flag{0}; 475 tbb::task::suspend_point test_suspend_tag; 476 477 std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool; 478 479 for (std::size_t i = 0; i < task_number; ++i) { 480 test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag); 481 } 482 483 for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) { 484 std::size_t ticket = bypass_task::my_current_task++; 485 if (ticket < test_task_pool.size()) { 486 tbb::detail::d1::spawn(test_task_pool[ticket], test_context); 487 } 488 } 489 490 auto suspend_func = [&resume_flag, &test_suspend_tag] { 491 auto thread_id = std::this_thread::get_id(); 492 tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) { 493 CHECK(thread_id == std::this_thread::get_id()); 494 test_suspend_tag = tag; 495 resume_flag = 1; 496 }); 497 }; 498 using task_type = CountingTask<decltype(suspend_func)>; 499 task_type suspend_task(suspend_func, wait); 500 501 tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context); 502 CHECK(bypass_task::my_current_task >= test_task_pool.size()); 503 REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out"); 504 } 505 506 //! \brief \ref error_guessing 507 TEST_CASE("Critical tasks + resume") { 508 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 509 510 tbb::task_group_context test_context; 511 tbb::detail::d1::wait_context wait{ 0 }; 512 513 // The test expects at least one thread in test_arena 514 int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads())); 515 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena); 516 tbb::task_arena test_arena(num_threads_in_test_arena); 517 518 test_arena.initialize(); 519 520 std::atomic<bool> resume_flag{}, resumed{}; 521 tbb::task::suspend_point test_suspend_tag; 522 523 auto task_body = [&resume_flag, &resumed, &test_suspend_tag] { 524 // Make some work 525 utils::doDummyWork(1000); 526 527 if (resume_flag.exchange(false)) { 528 tbb::task::resume(test_suspend_tag); 529 resumed = true; 530 } 531 }; 532 533 using task_type = CountingTask<decltype(task_body)>; 534 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 535 536 for (std::size_t i = 0; i < task_number; ++i) { 537 test_tasks.emplace_back(task_body, wait); 538 } 539 540 wait.reserve(task_number / 2); 541 for (std::size_t i = 0; i < task_number / 2; ++i) { 542 submit(test_tasks[i], test_arena, test_context, true); 543 } 544 545 auto suspend_func = [&resume_flag, &test_suspend_tag] { 546 auto thread_id = std::this_thread::get_id(); 547 tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) { 548 CHECK(thread_id == std::this_thread::get_id()); 549 test_suspend_tag = tag; 550 resume_flag.store(true, std::memory_order_release); 551 }); 552 }; 553 using suspend_task_type = CountingTask<decltype(suspend_func)>; 554 suspend_task_type suspend_task(suspend_func, wait); 555 556 wait.reserve(1); 557 submit(suspend_task, test_arena, test_context, true); 558 559 test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] { 560 tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] { 561 do { 562 wait.reserve(task_number / 2); 563 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number), 564 [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) { 565 for (std::size_t i = range.begin(); i != range.end(); ++i) { 566 submit(test_tasks[i], test_arena, test_context, true); 567 } 568 } 569 ); 570 } while (!resumed); 571 }); 572 }); 573 574 test_arena.execute([&wait, &test_context] { 575 tbb::detail::d1::wait(wait, test_context); 576 }); 577 } 578 579 //! \brief \ref error_guessing 580 TEST_CASE("Stress testing") { 581 std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 582 583 tbb::task_group_context test_context; 584 tbb::detail::d1::wait_context wait(task_number); 585 586 tbb::task_arena test_arena; 587 588 test_arena.initialize(); 589 590 auto task_body = [] { 591 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) { 592 utils::doDummyWork(100); 593 }); 594 }; 595 using task_type = CountingTask<decltype(task_body)>; 596 597 std::size_t iter_counter = 20; 598 599 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 600 601 for (std::size_t j = 0; j < task_number; ++j) { 602 test_tasks.emplace_back(task_body, wait); 603 } 604 605 test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] { 606 for (std::size_t i = 0; i < iter_counter; ++i) { 607 608 for (std::size_t j = 0; j < task_number; ++j) { 609 test_arena.enqueue(task_body); 610 } 611 612 for (std::size_t j = 0; j < task_number / 2; ++j) { 613 tbb::detail::d1::spawn(test_tasks[j], test_context); 614 } 615 616 for (std::size_t j = task_number / 2; j < task_number; ++j) { 617 submit(test_tasks[j], test_arena, test_context, true); 618 } 619 620 tbb::detail::d1::wait(wait, test_context); 621 wait.reserve(task_number); 622 } 623 wait.release(task_number); 624 }); 625 626 627 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed"); 628 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 629 } 630 631 //! \brief \ref error_guessing 632 TEST_CASE("All workers sleep") { 633 std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 634 tbb::concurrent_vector<tbb::task::suspend_point> suspend_points; 635 636 tbb::task_group test_gr; 637 638 utils::SpinBarrier barrier(thread_number); 639 auto resumble_task = [&] { 640 barrier.wait(); 641 auto thread_id = std::this_thread::get_id(); 642 tbb::task::suspend([&] (tbb::task::suspend_point sp) { 643 CHECK(thread_id == std::this_thread::get_id()); 644 suspend_points.push_back(sp); 645 barrier.wait(); 646 }); 647 }; 648 649 for (std::size_t i = 0; i < thread_number - 1; ++i) { 650 test_gr.run(resumble_task); 651 } 652 653 barrier.wait(); 654 barrier.wait(); 655 TestCPUUserTime(thread_number); 656 657 for (auto sp : suspend_points) 658 tbb::task::resume(sp); 659 test_gr.wait(); 660 } 661 662 #endif // __TBB_RESUMABLE_TASKS 663 664 //! \brief \ref error_guessing 665 TEST_CASE("Enqueue with exception") { 666 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 667 668 tbb::task_group_context test_context; 669 tbb::detail::d1::wait_context wait(task_number); 670 671 tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)}; 672 673 test_arena.initialize(); 674 675 auto task_body = [] { 676 utils::doDummyWork(100); 677 }; 678 679 std::atomic<bool> end_flag{false}; 680 auto check_body = [&end_flag] { 681 end_flag.store(true, std::memory_order_relaxed); 682 }; 683 684 using task_type = CountingTask<decltype(task_body)>; 685 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 686 687 for (std::size_t j = 0; j < task_number; ++j) { 688 test_tasks.emplace_back(task_body, wait); 689 } 690 691 { 692 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 693 test_arena.enqueue(task_body); 694 // Initialize implicit arena 695 tbb::parallel_for(0, 1, [] (int) {}); 696 tbb::task_arena test_arena2(tbb::task_arena::attach{}); 697 test_arena2.enqueue(task_body); 698 } 699 700 constexpr std::size_t iter_count = 10; 701 for (std::size_t k = 0; k < iter_count; ++k) { 702 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 703 test_arena.enqueue(check_body); 704 705 while (!end_flag.load(std::memory_order_relaxed)) ; 706 707 utils::Sleep(1); 708 end_flag.store(false, std::memory_order_relaxed); 709 710 test_arena.execute([&test_tasks, &wait, &test_context, task_number] { 711 for (std::size_t j = 0; j < task_number; ++j) { 712 tbb::detail::d1::spawn(test_tasks[j], test_context); 713 } 714 715 tbb::detail::d1::wait(wait, test_context); 716 wait.reserve(task_number); 717 }); 718 } 719 wait.release(task_number); 720 721 722 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed"); 723 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 724 } 725 726 struct resubmitting_task : public tbb::detail::d1::task { 727 tbb::task_arena& my_arena; 728 tbb::task_group_context& my_ctx; 729 std::atomic<int> counter{100000}; 730 731 resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx) 732 {} 733 734 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override { 735 if (counter-- > 0) { 736 submit(*this, my_arena, my_ctx, true); 737 } 738 return nullptr; 739 } 740 741 tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override { 742 FAIL("The function should never be called."); 743 return nullptr; 744 } 745 }; 746 747 //! \brief \ref error_guessing 748 TEST_CASE("Test with priority inversion") { 749 if (!utils::can_change_thread_priority()) return; 750 751 std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 752 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1); 753 754 tbb::task_arena test_arena(2 * thread_number, thread_number); 755 test_arena.initialize(); 756 utils::pinning_observer obsr(test_arena); 757 CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated"); 758 759 std::uint32_t critical_task_counter = 1000 * thread_number; 760 std::atomic<std::size_t> task_counter{0}; 761 762 tbb::task_group_context test_context; 763 tbb::detail::d1::wait_context wait(critical_task_counter); 764 765 auto critical_work = [&] { 766 utils::doDummyWork(10); 767 }; 768 769 using suspend_task_type = CountingTask<decltype(critical_work)>; 770 suspend_task_type critical_task(critical_work, wait); 771 772 auto high_priority_thread_func = [&] { 773 // Increase external threads priority 774 utils::increase_thread_priority(); 775 // pin external threads 776 test_arena.execute([]{}); 777 while (task_counter++ < critical_task_counter) { 778 submit(critical_task, test_arena, test_context, true); 779 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 780 } 781 }; 782 783 resubmitting_task worker_task(test_arena, test_context); 784 // warm up 785 // take first core on execute 786 utils::SpinBarrier barrier(thread_number + 1); 787 test_arena.execute([&] { 788 tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t) { 789 barrier.wait(); 790 submit(worker_task, test_arena, test_context, true); 791 }); 792 }); 793 794 std::vector<std::thread> high_priority_threads; 795 for (std::size_t i = 0; i < thread_number - 1; ++i) { 796 high_priority_threads.emplace_back(high_priority_thread_func); 797 } 798 799 utils::increase_thread_priority(); 800 while (task_counter++ < critical_task_counter) { 801 submit(critical_task, test_arena, test_context, true); 802 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 803 } 804 805 tbb::detail::d1::wait(wait, test_context); 806 807 for (std::size_t i = 0; i < thread_number - 1; ++i) { 808 high_priority_threads[i].join(); 809 } 810 } 811 812 // Explicit test for raii_guard move ctor because of copy elision optimization 813 // TODO: consider better test file for the test case 814 //! \brief \ref interface 815 TEST_CASE("raii_guard move ctor") { 816 int count{0}; 817 auto func = [&count] { 818 count++; 819 CHECK(count == 1); 820 }; 821 822 tbb::detail::d0::raii_guard<decltype(func)> guard1(func); 823 tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1)); 824 } 825 826 //! \brief \ref error_guessing 827 TEST_CASE("Check correct arena destruction with enqueue") { 828 for (int i = 0; i < 100; ++i) { 829 tbb::task_scheduler_handle handle{ tbb::attach{} }; 830 { 831 tbb::task_arena a(2, 0); 832 833 a.enqueue([] { 834 tbb::parallel_for(0, 100, [] (int) { std::this_thread::sleep_for(std::chrono::nanoseconds(10)); }); 835 }); 836 std::this_thread::sleep_for(std::chrono::microseconds(1)); 837 } 838 tbb::finalize(handle, std::nothrow_t{}); 839 } 840 } 841