1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/dummy_body.h" 20 #include "common/spin_barrier.h" 21 #include "common/utils_concurrency_limit.h" 22 #include "common/cpu_usertime.h" 23 24 #include "tbb/task.h" 25 #include "tbb/task_group.h" 26 #include "tbb/parallel_for.h" 27 #include "tbb/cache_aligned_allocator.h" 28 #include "tbb/global_control.h" 29 #include "tbb/concurrent_vector.h" 30 31 #include <atomic> 32 #include <thread> 33 #include <thread> 34 35 //! \file test_task.cpp 36 //! \brief Test for [internal] functionality 37 38 struct EmptyBody { 39 void operator()() const {} 40 }; 41 42 #if _MSC_VER && !defined(__INTEL_COMPILER) 43 // unreachable code 44 #pragma warning( push ) 45 #pragma warning( disable: 4702 ) 46 #endif 47 48 template <typename Body = EmptyBody> 49 class CountingTask : public tbb::detail::d1::task { 50 public: 51 CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {} 52 53 CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {} 54 55 task* execute( tbb::detail::d1::execution_data& ) override { 56 ++my_execute_counter; 57 my_body(); 58 my_wait.release(); 59 return nullptr; 60 } 61 62 task* cancel( tbb::detail::d1::execution_data& ) override { 63 ++my_cancel_counter; 64 my_wait.release(); 65 return nullptr; 66 } 67 68 static void reset() { 69 my_execute_counter = 0; 70 my_cancel_counter = 0; 71 } 72 73 static std::size_t execute_counter() { return my_execute_counter; } 74 static std::size_t cancel_counter() { return my_cancel_counter; } 75 76 private: 77 Body my_body; 78 tbb::detail::d1::wait_context& my_wait; 79 80 static std::atomic<std::size_t> my_execute_counter; 81 static std::atomic<std::size_t> my_cancel_counter; 82 }; // struct CountingTask 83 84 85 #if _MSC_VER && !defined(__INTEL_COMPILER) 86 #pragma warning( pop ) 87 #endif // warning 4702 is back 88 89 template <typename Body> 90 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0); 91 92 template <typename Body> 93 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0); 94 95 #if TBB_USE_EXCEPTIONS 96 void test_cancellation_on_exception( bool reset_ctx ) { 97 tbb::detail::d1::wait_context wait(1); 98 tbb::task_group_context test_context; 99 auto throw_body = [] { 100 throw 1; 101 }; 102 CountingTask<decltype(throw_body)> task(throw_body, wait); 103 104 constexpr std::size_t iter_counter = 1000; 105 for (std::size_t i = 0; i < iter_counter; ++i) { 106 try { 107 tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context); 108 } catch(int ex) { 109 REQUIRE(ex == 1); 110 } 111 if (reset_ctx) { 112 test_context.reset(); 113 } 114 wait.reserve(1); 115 } 116 wait.release(1); 117 118 REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed"); 119 REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs"); 120 task.reset(); 121 } 122 #endif // TBB_USE_EXCEPTIONS 123 124 //! \brief \ref error_guessing 125 TEST_CASE("External threads sleep") { 126 if (utils::get_platform_max_threads() < 2) return; 127 utils::SpinBarrier barrier(2); 128 129 tbb::task_group test_gr; 130 131 test_gr.run([&] { 132 barrier.wait(); 133 TestCPUUserTime(2); 134 }); 135 136 barrier.wait(); 137 138 test_gr.wait(); 139 } 140 141 //! \brief \ref error_guessing 142 TEST_CASE("Test that task was executed p times") { 143 tbb::detail::d1::wait_context wait(1); 144 tbb::task_group_context test_context; 145 CountingTask<> test_task(wait); 146 147 constexpr std::size_t iter_counter = 10000; 148 for (std::size_t i = 0; i < iter_counter; ++i) { 149 tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context); 150 wait.reserve(1); 151 } 152 153 wait.release(1); 154 155 REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times"); 156 REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled"); 157 CountingTask<>::reset(); 158 } 159 160 #if TBB_USE_EXCEPTIONS 161 //! \brief \ref error_guessing 162 TEST_CASE("Test cancellation on exception") { 163 test_cancellation_on_exception(/*reset_ctx = */true); 164 test_cancellation_on_exception(/*reset_ctx = */false); 165 } 166 #endif // TBB_USE_EXCEPTIONS 167 168 //! \brief \ref error_guessing 169 TEST_CASE("Simple test parallelism usage") { 170 std::size_t threads_num = utils::get_platform_max_threads(); 171 utils::SpinBarrier barrier(threads_num); 172 173 auto barrier_wait = [&barrier] { 174 barrier.wait(); 175 }; 176 177 tbb::detail::d1::wait_context wait(threads_num); 178 tbb::detail::d1::task_group_context test_context; 179 using task_type = CountingTask<decltype(barrier_wait)>; 180 181 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 182 183 constexpr std::size_t iter_counter = 100; 184 for (std::size_t i = 0; i < iter_counter; ++i) { 185 for (std::size_t j = 0; j < threads_num; ++j) { 186 tbb::detail::d1::spawn(vector_test_task[j], test_context); 187 } 188 tbb::detail::d1::wait(wait, test_context); 189 wait.reserve(threads_num); 190 } 191 wait.release(threads_num); 192 193 REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed"); 194 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 195 task_type::reset(); 196 } 197 198 //! \brief \ref error_guessing 199 TEST_CASE("Test parallelism usage with parallel_for") { 200 std::size_t task_threads_num = utils::get_platform_max_threads(); 201 utils::SpinBarrier barrier(task_threads_num); 202 203 auto barrier_wait = [&barrier] { 204 barrier.wait(); 205 }; 206 207 std::size_t pfor_iter_count = 10000; 208 std::atomic<std::size_t> pfor_counter(0); 209 210 auto parallel_for_func = [&pfor_counter, pfor_iter_count] { 211 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count), 212 [&pfor_counter] (tbb::blocked_range<std::size_t>& range) { 213 for (auto it = range.begin(); it != range.end(); ++it) { 214 ++pfor_counter; 215 } 216 } 217 ); 218 }; 219 220 tbb::detail::d1::wait_context wait(task_threads_num); 221 tbb::detail::d1::task_group_context test_context; 222 using task_type = CountingTask<decltype(barrier_wait)>; 223 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait)); 224 225 constexpr std::size_t iter_count = 10; 226 constexpr std::size_t pfor_threads_num = 4; 227 for (std::size_t i = 0; i < iter_count; ++i) { 228 std::vector<std::thread> pfor_threads; 229 230 for (std::size_t j = 0; j < task_threads_num; ++j) { 231 tbb::detail::d1::spawn(vector_test_task[j], test_context); 232 } 233 234 for (std::size_t k = 0; k < pfor_threads_num; ++k) { 235 pfor_threads.emplace_back(parallel_for_func); 236 } 237 238 tbb::detail::d1::wait(wait, test_context); 239 240 for (auto& thread : pfor_threads) { 241 if (thread.joinable()) { 242 thread.join(); 243 } 244 } 245 246 wait.reserve(task_threads_num); 247 } 248 wait.release(task_threads_num); 249 250 REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed"); 251 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 252 REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished"); 253 task_type::reset(); 254 } 255 256 //! \brief \ref error_guessing 257 TEST_CASE("Test parallelism usage with spawn tasks in different threads") { 258 std::size_t threads_num = utils::get_platform_max_threads(); 259 utils::SpinBarrier barrier(threads_num); 260 261 auto barrier_wait = [&barrier] { 262 barrier.wait(); 263 }; 264 265 tbb::detail::d1::wait_context wait(threads_num); 266 tbb::detail::d1::task_group_context test_context; 267 using task_type = CountingTask<decltype(barrier_wait)>; 268 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 269 270 auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) { 271 tbb::detail::d1::spawn(vector_test_task[idx], test_context); 272 }; 273 274 constexpr std::size_t iter_count = 10; 275 for (std::size_t i = 0; i < iter_count; ++i) { 276 std::vector<std::thread> threads; 277 278 for (std::size_t k = 0; k < threads_num - 1; ++k) { 279 threads.emplace_back(thread_func, k); 280 } 281 282 for (auto& thread : threads) { 283 if (thread.joinable()) { 284 thread.join(); 285 } 286 } 287 288 tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context); 289 wait.reserve(threads_num); 290 } 291 wait.release(threads_num); 292 293 REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed"); 294 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 295 task_type::reset(); 296 } 297 298 class SpawningTaskBody; 299 300 using SpawningTask = CountingTask<SpawningTaskBody>; 301 302 class SpawningTaskBody { 303 public: 304 using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>; 305 306 SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx ) 307 : my_task_pool(task_pool), my_test_ctx(test_ctx) {} 308 309 void operator()() const { 310 std::size_t delta = 7; 311 std::size_t start_idx = my_current_task.fetch_add(delta); 312 313 if (start_idx < my_task_pool.size()) { 314 for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) { 315 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx); 316 } 317 } 318 } 319 private: 320 task_pool_type& my_task_pool; 321 tbb::task_group_context& my_test_ctx; 322 static std::atomic<std::size_t> my_current_task; 323 }; // class SpawningTaskBody 324 325 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0); 326 327 //! \brief \ref error_guessing 328 TEST_CASE("Actively adding tasks") { 329 std::size_t task_number = 500 * utils::get_platform_max_threads(); 330 331 tbb::detail::d1::wait_context wait(task_number + 1); 332 tbb::task_group_context test_context; 333 334 SpawningTaskBody::task_pool_type task_pool; 335 336 SpawningTaskBody task_body{task_pool, test_context}; 337 for (std::size_t i = 0; i < task_number; ++i) { 338 task_pool.emplace_back(task_body, wait); 339 } 340 341 SpawningTask first_task(task_body, wait); 342 tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context); 343 344 REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right? 345 REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled"); 346 } 347 348 #if __TBB_RESUMABLE_TASKS 349 350 struct suspended_task : public tbb::detail::d1::task { 351 352 suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait) 353 : my_suspend_tag(tag), my_wait(wait) 354 {} 355 356 task* execute(tbb::detail::d1::execution_data&) override { 357 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 358 [] (const tbb::blocked_range<std::size_t>& range) { 359 // Make some heavy work 360 std::atomic<int> sum{}; 361 for (auto it = range.begin(); it != range.end(); ++it) { 362 ++sum; 363 } 364 }, 365 tbb::static_partitioner{} 366 ); 367 368 my_wait.release(); 369 tbb::task::resume(my_suspend_tag); 370 return nullptr; 371 } 372 373 task* cancel(tbb::detail::d1::execution_data&) override { 374 FAIL("The function should never be called."); 375 return nullptr; 376 } 377 378 tbb::task::suspend_point my_suspend_tag; 379 tbb::detail::d1::wait_context& my_wait; 380 }; 381 382 //! \brief \ref error_guessing 383 TEST_CASE("Isolation + resumable tasks") { 384 std::atomic<int> suspend_flag{}; 385 tbb::task_group_context test_context; 386 387 std::atomic<int> suspend_count{}; 388 std::atomic<int> resume_count{}; 389 390 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 391 [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) { 392 int ticket = 0; 393 for (auto it = range.begin(); it != range.end(); ++it) { 394 ticket = suspend_flag++; 395 } 396 397 if (ticket % 5 == 0) { 398 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task; 399 tbb::detail::d1::wait_context wait(1); 400 ++suspend_count; 401 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] { 402 tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) { 403 test_task.emplace_back(tag, wait); 404 tbb::detail::d1::spawn(test_task[0], test_context); 405 }); 406 } 407 ); 408 tbb::detail::d1::wait(wait, test_context); 409 ++resume_count; 410 } 411 } 412 ); 413 414 CHECK(suspend_count == resume_count); 415 } 416 417 struct bypass_task : public tbb::detail::d1::task { 418 using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>; 419 420 bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool, 421 std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag) 422 : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag) 423 {} 424 425 task* execute(tbb::detail::d1::execution_data&) override { 426 utils::doDummyWork(10000); 427 428 int expected = 1; 429 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 430 tbb::task::resume(my_suspend_tag); 431 } 432 433 std::size_t ticket = my_current_task++; 434 task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr; 435 436 if (!next && my_resume_flag != 2) { 437 // Rarely all tasks can be executed before the suspend. 438 // So, wait for the suspend before leaving. 439 utils::SpinWaitWhileEq(my_resume_flag, 0); 440 expected = 1; 441 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 442 tbb::task::resume(my_suspend_tag); 443 } 444 } 445 446 my_wait.release(); 447 return next; 448 } 449 450 task* cancel(tbb::detail::d1::execution_data&) override { 451 FAIL("The function should never be called."); 452 return nullptr; 453 } 454 455 tbb::detail::d1::wait_context& my_wait; 456 task_pool_type& my_task_pool; 457 std::atomic<int>& my_resume_flag; 458 tbb::task::suspend_point& my_suspend_tag; 459 static std::atomic<int> my_current_task; 460 }; 461 462 std::atomic<int> bypass_task::my_current_task(0); 463 464 thread_local int test_tls = 0; 465 466 //! \brief \ref error_guessing 467 TEST_CASE("Bypass suspended by resume") { 468 std::size_t task_number = 500 * utils::get_platform_max_threads(); 469 tbb::task_group_context test_context; 470 tbb::detail::d1::wait_context wait(task_number + 1); 471 472 test_tls = 1; 473 474 std::atomic<int> resume_flag{0}; 475 tbb::task::suspend_point test_suspend_tag; 476 477 std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool; 478 479 for (std::size_t i = 0; i < task_number; ++i) { 480 test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag); 481 } 482 483 for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) { 484 std::size_t ticket = bypass_task::my_current_task++; 485 if (ticket < test_task_pool.size()) { 486 tbb::detail::d1::spawn(test_task_pool[ticket], test_context); 487 } 488 } 489 490 auto suspend_func = [&resume_flag, &test_suspend_tag] { 491 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 492 test_suspend_tag = tag; 493 resume_flag = 1; 494 }); 495 }; 496 using task_type = CountingTask<decltype(suspend_func)>; 497 task_type suspend_task(suspend_func, wait); 498 499 tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context); 500 CHECK(bypass_task::my_current_task >= test_task_pool.size()); 501 REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out"); 502 } 503 504 //! \brief \ref error_guessing 505 TEST_CASE("Critical tasks + resume") { 506 std::size_t task_number = 500 * utils::get_platform_max_threads(); 507 508 tbb::task_group_context test_context; 509 tbb::detail::d1::wait_context wait(task_number); 510 511 tbb::task_arena test_arena; 512 513 test_arena.initialize(); 514 515 std::atomic<bool> resume_flag{}; 516 tbb::task::suspend_point test_suspend_tag; 517 518 auto task_body = [&resume_flag, &test_suspend_tag] { 519 // Make some work 520 utils::doDummyWork(1000); 521 522 if (resume_flag.exchange(false)) { 523 tbb::task::resume(test_suspend_tag); 524 } 525 }; 526 527 using task_type = CountingTask<decltype(task_body)>; 528 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 529 530 for (std::size_t i = 0; i < task_number; ++i) { 531 test_tasks.emplace_back(task_body, wait); 532 } 533 534 for (std::size_t i = 0; i < task_number / 2; ++i) { 535 submit(test_tasks[i], test_arena, test_context, true); 536 } 537 538 auto suspend_func = [&resume_flag, &test_suspend_tag] { 539 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 540 test_suspend_tag = tag; 541 resume_flag.store(true, std::memory_order_release); 542 }); 543 }; 544 using suspend_task_type = CountingTask<decltype(suspend_func)>; 545 suspend_task_type suspend_task(suspend_func, wait); 546 547 submit(suspend_task, test_arena, test_context, true); 548 549 test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] { 550 tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] { 551 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1), 552 [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) { 553 for (std::size_t i = range.begin(); i != range.end(); ++i) { 554 submit(test_tasks[i], test_arena, test_context, true); 555 } 556 }); 557 }); 558 }); 559 560 tbb::detail::d1::wait(wait, test_context); 561 } 562 563 //! \brief \ref error_guessing 564 TEST_CASE("Stress testing") { 565 std::size_t task_number = utils::get_platform_max_threads(); 566 567 tbb::task_group_context test_context; 568 tbb::detail::d1::wait_context wait(task_number); 569 570 tbb::task_arena test_arena; 571 572 test_arena.initialize(); 573 574 auto task_body = [] { 575 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) { 576 utils::doDummyWork(100); 577 }); 578 }; 579 using task_type = CountingTask<decltype(task_body)>; 580 581 std::size_t iter_counter = 20; 582 583 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 584 585 for (std::size_t j = 0; j < task_number; ++j) { 586 test_tasks.emplace_back(task_body, wait); 587 } 588 589 test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] { 590 for (std::size_t i = 0; i < iter_counter; ++i) { 591 592 for (std::size_t j = 0; j < task_number; ++j) { 593 test_arena.enqueue(task_body); 594 } 595 596 for (std::size_t j = 0; j < task_number / 2; ++j) { 597 tbb::detail::d1::spawn(test_tasks[j], test_context); 598 } 599 600 for (std::size_t j = task_number / 2; j < task_number; ++j) { 601 submit(test_tasks[j], test_arena, test_context, true); 602 } 603 604 tbb::detail::d1::wait(wait, test_context); 605 wait.reserve(task_number); 606 } 607 wait.release(task_number); 608 }); 609 610 611 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed"); 612 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 613 } 614 615 //! \brief \ref error_guessing 616 TEST_CASE("All workers sleep") { 617 std::size_t thread_number = utils::get_platform_max_threads(); 618 tbb::concurrent_vector<tbb::task::suspend_point> suspend_points; 619 620 tbb::task_group test_gr; 621 622 utils::SpinBarrier barrier(thread_number); 623 auto resumble_task = [&] { 624 barrier.wait(); 625 tbb::task::suspend([&] (tbb::task::suspend_point sp) { 626 suspend_points.push_back(sp); 627 barrier.wait(); 628 }); 629 }; 630 631 for (std::size_t i = 0; i < thread_number - 1; ++i) { 632 test_gr.run(resumble_task); 633 } 634 635 barrier.wait(); 636 barrier.wait(); 637 TestCPUUserTime(thread_number); 638 639 for (auto sp : suspend_points) 640 tbb::task::resume(sp); 641 test_gr.wait(); 642 } 643 644 #endif // __TBB_RESUMABLE_TASKS 645 646 //! \brief \ref error_guessing 647 TEST_CASE("Enqueue with exception") { 648 std::size_t task_number = 500 * utils::get_platform_max_threads(); 649 650 tbb::task_group_context test_context; 651 tbb::detail::d1::wait_context wait(task_number); 652 653 tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)}; 654 655 test_arena.initialize(); 656 657 auto task_body = [] { 658 utils::doDummyWork(100); 659 }; 660 661 std::atomic<bool> end_flag{false}; 662 auto check_body = [&end_flag] { 663 end_flag.store(true, std::memory_order_relaxed); 664 }; 665 666 using task_type = CountingTask<decltype(task_body)>; 667 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 668 669 for (std::size_t j = 0; j < task_number; ++j) { 670 test_tasks.emplace_back(task_body, wait); 671 } 672 673 { 674 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 675 test_arena.enqueue(task_body); 676 // Initialize implicit arena 677 tbb::parallel_for(0, 1, [] (int) {}); 678 tbb::task_arena test_arena2(tbb::task_arena::attach{}); 679 test_arena2.enqueue(task_body); 680 } 681 682 constexpr std::size_t iter_count = 10; 683 for (std::size_t k = 0; k < iter_count; ++k) { 684 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 685 test_arena.enqueue(check_body); 686 687 while (!end_flag.load(std::memory_order_relaxed)) ; 688 689 utils::Sleep(1); 690 end_flag.store(false, std::memory_order_relaxed); 691 692 test_arena.execute([&test_tasks, &wait, &test_context, task_number] { 693 for (std::size_t j = 0; j < task_number; ++j) { 694 tbb::detail::d1::spawn(test_tasks[j], test_context); 695 } 696 697 tbb::detail::d1::wait(wait, test_context); 698 wait.reserve(task_number); 699 }); 700 } 701 wait.release(task_number); 702 703 704 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed"); 705 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 706 } 707 708 struct resubmitting_task : public tbb::detail::d1::task { 709 tbb::task_arena& my_arena; 710 tbb::task_group_context& my_ctx; 711 std::atomic<int> counter{100000}; 712 713 resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx) 714 {} 715 716 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override { 717 if (counter-- > 0) { 718 submit(*this, my_arena, my_ctx, true); 719 } 720 return nullptr; 721 } 722 723 tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override { 724 FAIL("The function should never be called."); 725 return nullptr; 726 } 727 }; 728 729 //! \brief \ref error_guessing 730 TEST_CASE("Test with priority inversion") { 731 if (!utils::can_change_thread_priority()) return; 732 733 std::size_t thread_number = utils::get_platform_max_threads(); 734 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1); 735 736 tbb::task_arena test_arena(2 * thread_number, thread_number); 737 test_arena.initialize(); 738 utils::pinning_observer obsr(test_arena); 739 CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated"); 740 741 std::size_t critical_task_counter = 1000 * thread_number; 742 std::atomic<std::size_t> task_counter{0}; 743 744 tbb::task_group_context test_context; 745 tbb::detail::d1::wait_context wait(critical_task_counter); 746 747 auto critical_work = [&] { 748 utils::doDummyWork(10); 749 }; 750 751 using suspend_task_type = CountingTask<decltype(critical_work)>; 752 suspend_task_type critical_task(critical_work, wait); 753 754 auto high_priority_thread_func = [&] { 755 // Increase external threads priority 756 utils::increase_thread_priority(); 757 // pin external threads 758 test_arena.execute([]{}); 759 while (task_counter++ < critical_task_counter) { 760 submit(critical_task, test_arena, test_context, true); 761 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 762 } 763 }; 764 765 resubmitting_task worker_task(test_arena, test_context); 766 // warm up 767 // take first core on execute 768 utils::SpinBarrier barrier(thread_number + 1); 769 test_arena.execute([&] { 770 tbb::parallel_for(std::size_t(0), thread_number + 1, [&] (std::size_t&) { 771 barrier.wait(); 772 submit(worker_task, test_arena, test_context, true); 773 }); 774 }); 775 776 std::vector<std::thread> high_priority_threads; 777 for (std::size_t i = 0; i < thread_number - 1; ++i) { 778 high_priority_threads.emplace_back(high_priority_thread_func); 779 } 780 781 utils::increase_thread_priority(); 782 while (task_counter++ < critical_task_counter) { 783 submit(critical_task, test_arena, test_context, true); 784 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 785 } 786 787 tbb::detail::d1::wait(wait, test_context); 788 789 for (std::size_t i = 0; i < thread_number - 1; ++i) { 790 high_priority_threads[i].join(); 791 } 792 obsr.observe(false); 793 } 794