1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/dummy_body.h" 20 #include "common/spin_barrier.h" 21 #include "common/utils_concurrency_limit.h" 22 #include "common/cpu_usertime.h" 23 24 #include "tbb/task.h" 25 #include "tbb/task_group.h" 26 #include "tbb/parallel_for.h" 27 #include "tbb/cache_aligned_allocator.h" 28 #include "tbb/global_control.h" 29 #include "tbb/concurrent_vector.h" 30 31 #include <atomic> 32 #include <thread> 33 #include <thread> 34 35 //! \file test_task.cpp 36 //! \brief Test for [internal] functionality 37 38 struct EmptyBody { 39 void operator()() const {} 40 }; 41 42 #if _MSC_VER && !defined(__INTEL_COMPILER) 43 // unreachable code 44 #pragma warning( push ) 45 #pragma warning( disable: 4702 ) 46 #endif 47 48 template <typename Body = EmptyBody> 49 class CountingTask : public tbb::detail::d1::task { 50 public: 51 CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {} 52 53 CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {} 54 55 task* execute( tbb::detail::d1::execution_data& ) override { 56 ++my_execute_counter; 57 my_body(); 58 my_wait.release(); 59 return nullptr; 60 } 61 62 task* cancel( tbb::detail::d1::execution_data& ) override { 63 ++my_cancel_counter; 64 my_wait.release(); 65 return nullptr; 66 } 67 68 static void reset() { 69 my_execute_counter = 0; 70 my_cancel_counter = 0; 71 } 72 73 static std::size_t execute_counter() { return my_execute_counter; } 74 static std::size_t cancel_counter() { return my_cancel_counter; } 75 76 private: 77 Body my_body; 78 tbb::detail::d1::wait_context& my_wait; 79 80 static std::atomic<std::size_t> my_execute_counter; 81 static std::atomic<std::size_t> my_cancel_counter; 82 }; // struct CountingTask 83 84 85 #if _MSC_VER && !defined(__INTEL_COMPILER) 86 #pragma warning( pop ) 87 #endif // warning 4702 is back 88 89 template <typename Body> 90 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0); 91 92 template <typename Body> 93 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0); 94 95 #if TBB_USE_EXCEPTIONS 96 void test_cancellation_on_exception( bool reset_ctx ) { 97 tbb::detail::d1::wait_context wait(1); 98 tbb::task_group_context test_context; 99 auto throw_body = [] { 100 throw 1; 101 }; 102 CountingTask<decltype(throw_body)> task(throw_body, wait); 103 104 constexpr std::size_t iter_counter = 1000; 105 for (std::size_t i = 0; i < iter_counter; ++i) { 106 try { 107 tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context); 108 } catch(int ex) { 109 REQUIRE(ex == 1); 110 } 111 if (reset_ctx) { 112 test_context.reset(); 113 } 114 wait.reserve(1); 115 } 116 wait.release(1); 117 118 REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed"); 119 REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs"); 120 task.reset(); 121 } 122 #endif // TBB_USE_EXCEPTIONS 123 124 //! \brief \ref error_guessing 125 TEST_CASE("External threads sleep") { 126 if (utils::get_platform_max_threads() < 2) return; 127 utils::SpinBarrier barrier(2); 128 129 tbb::task_group test_gr; 130 131 test_gr.run([&] { 132 barrier.wait(); 133 TestCPUUserTime(2); 134 }); 135 136 barrier.wait(); 137 138 test_gr.wait(); 139 } 140 141 //! \brief \ref error_guessing 142 TEST_CASE("Test that task was executed p times") { 143 tbb::detail::d1::wait_context wait(1); 144 tbb::task_group_context test_context; 145 CountingTask<> test_task(wait); 146 147 constexpr std::size_t iter_counter = 10000; 148 for (std::size_t i = 0; i < iter_counter; ++i) { 149 tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context); 150 wait.reserve(1); 151 } 152 153 wait.release(1); 154 155 REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times"); 156 REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled"); 157 CountingTask<>::reset(); 158 } 159 160 #if TBB_USE_EXCEPTIONS 161 //! \brief \ref error_guessing 162 TEST_CASE("Test cancellation on exception") { 163 test_cancellation_on_exception(/*reset_ctx = */true); 164 test_cancellation_on_exception(/*reset_ctx = */false); 165 } 166 #endif // TBB_USE_EXCEPTIONS 167 168 //! \brief \ref error_guessing 169 TEST_CASE("Simple test parallelism usage") { 170 std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 171 utils::SpinBarrier barrier(threads_num); 172 173 auto barrier_wait = [&barrier] { 174 barrier.wait(); 175 }; 176 177 tbb::detail::d1::wait_context wait(threads_num); 178 tbb::detail::d1::task_group_context test_context; 179 using task_type = CountingTask<decltype(barrier_wait)>; 180 181 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 182 183 constexpr std::size_t iter_counter = 100; 184 for (std::size_t i = 0; i < iter_counter; ++i) { 185 for (std::size_t j = 0; j < threads_num; ++j) { 186 tbb::detail::d1::spawn(vector_test_task[j], test_context); 187 } 188 tbb::detail::d1::wait(wait, test_context); 189 wait.reserve(threads_num); 190 } 191 wait.release(threads_num); 192 193 REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed"); 194 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 195 task_type::reset(); 196 } 197 198 //! \brief \ref error_guessing 199 TEST_CASE("Test parallelism usage with parallel_for") { 200 std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 201 utils::SpinBarrier barrier(task_threads_num); 202 203 auto barrier_wait = [&barrier] { 204 barrier.wait(); 205 }; 206 207 std::size_t pfor_iter_count = 10000; 208 std::atomic<std::size_t> pfor_counter(0); 209 210 auto parallel_for_func = [&pfor_counter, pfor_iter_count] { 211 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count), 212 [&pfor_counter] (tbb::blocked_range<std::size_t>& range) { 213 for (auto it = range.begin(); it != range.end(); ++it) { 214 ++pfor_counter; 215 } 216 } 217 ); 218 }; 219 220 tbb::detail::d1::wait_context wait(task_threads_num); 221 tbb::detail::d1::task_group_context test_context; 222 using task_type = CountingTask<decltype(barrier_wait)>; 223 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait)); 224 225 constexpr std::size_t iter_count = 10; 226 constexpr std::size_t pfor_threads_num = 4; 227 for (std::size_t i = 0; i < iter_count; ++i) { 228 std::vector<std::thread> pfor_threads; 229 230 for (std::size_t j = 0; j < task_threads_num; ++j) { 231 tbb::detail::d1::spawn(vector_test_task[j], test_context); 232 } 233 234 for (std::size_t k = 0; k < pfor_threads_num; ++k) { 235 pfor_threads.emplace_back(parallel_for_func); 236 } 237 238 tbb::detail::d1::wait(wait, test_context); 239 240 for (auto& thread : pfor_threads) { 241 if (thread.joinable()) { 242 thread.join(); 243 } 244 } 245 246 wait.reserve(task_threads_num); 247 } 248 wait.release(task_threads_num); 249 250 REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed"); 251 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 252 REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished"); 253 task_type::reset(); 254 } 255 256 //! \brief \ref error_guessing 257 TEST_CASE("Test parallelism usage with spawn tasks in different threads") { 258 std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 259 utils::SpinBarrier barrier(threads_num); 260 261 auto barrier_wait = [&barrier] { 262 barrier.wait(); 263 }; 264 265 tbb::detail::d1::wait_context wait(threads_num); 266 tbb::detail::d1::task_group_context test_context; 267 using task_type = CountingTask<decltype(barrier_wait)>; 268 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 269 270 auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) { 271 tbb::detail::d1::spawn(vector_test_task[idx], test_context); 272 }; 273 274 constexpr std::size_t iter_count = 10; 275 for (std::size_t i = 0; i < iter_count; ++i) { 276 std::vector<std::thread> threads; 277 278 for (std::size_t k = 0; k < threads_num - 1; ++k) { 279 threads.emplace_back(thread_func, k); 280 } 281 282 for (auto& thread : threads) { 283 if (thread.joinable()) { 284 thread.join(); 285 } 286 } 287 288 tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context); 289 wait.reserve(threads_num); 290 } 291 wait.release(threads_num); 292 293 REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed"); 294 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 295 task_type::reset(); 296 } 297 298 class SpawningTaskBody; 299 300 using SpawningTask = CountingTask<SpawningTaskBody>; 301 302 class SpawningTaskBody { 303 public: 304 using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>; 305 306 SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx ) 307 : my_task_pool(task_pool), my_test_ctx(test_ctx) {} 308 309 void operator()() const { 310 std::size_t delta = 7; 311 std::size_t start_idx = my_current_task.fetch_add(delta); 312 313 if (start_idx < my_task_pool.size()) { 314 for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) { 315 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx); 316 } 317 } 318 } 319 private: 320 task_pool_type& my_task_pool; 321 tbb::task_group_context& my_test_ctx; 322 static std::atomic<std::size_t> my_current_task; 323 }; // class SpawningTaskBody 324 325 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0); 326 327 //! \brief \ref error_guessing 328 TEST_CASE("Actively adding tasks") { 329 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 330 331 tbb::detail::d1::wait_context wait(task_number + 1); 332 tbb::task_group_context test_context; 333 334 SpawningTaskBody::task_pool_type task_pool; 335 336 SpawningTaskBody task_body{task_pool, test_context}; 337 for (std::size_t i = 0; i < task_number; ++i) { 338 task_pool.emplace_back(task_body, wait); 339 } 340 341 SpawningTask first_task(task_body, wait); 342 tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context); 343 344 REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right? 345 REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled"); 346 } 347 348 #if __TBB_RESUMABLE_TASKS 349 350 struct suspended_task : public tbb::detail::d1::task { 351 352 suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait) 353 : my_suspend_tag(tag), my_wait(wait) 354 {} 355 356 task* execute(tbb::detail::d1::execution_data&) override { 357 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 358 [] (const tbb::blocked_range<std::size_t>& range) { 359 // Make some heavy work 360 std::atomic<int> sum{}; 361 for (auto it = range.begin(); it != range.end(); ++it) { 362 ++sum; 363 } 364 }, 365 tbb::static_partitioner{} 366 ); 367 368 my_wait.release(); 369 tbb::task::resume(my_suspend_tag); 370 return nullptr; 371 } 372 373 task* cancel(tbb::detail::d1::execution_data&) override { 374 FAIL("The function should never be called."); 375 return nullptr; 376 } 377 378 tbb::task::suspend_point my_suspend_tag; 379 tbb::detail::d1::wait_context& my_wait; 380 }; 381 382 //! \brief \ref error_guessing 383 TEST_CASE("Isolation + resumable tasks") { 384 std::atomic<int> suspend_flag{}; 385 tbb::task_group_context test_context; 386 387 std::atomic<int> suspend_count{}; 388 std::atomic<int> resume_count{}; 389 390 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 391 [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) { 392 int ticket = 0; 393 for (auto it = range.begin(); it != range.end(); ++it) { 394 ticket = suspend_flag++; 395 } 396 397 if (ticket % 5 == 0) { 398 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task; 399 tbb::detail::d1::wait_context wait(1); 400 ++suspend_count; 401 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] { 402 tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) { 403 test_task.emplace_back(tag, wait); 404 tbb::detail::d1::spawn(test_task[0], test_context); 405 }); 406 } 407 ); 408 tbb::detail::d1::wait(wait, test_context); 409 ++resume_count; 410 } 411 } 412 ); 413 414 CHECK(suspend_count == resume_count); 415 } 416 417 struct bypass_task : public tbb::detail::d1::task { 418 using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>; 419 420 bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool, 421 std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag) 422 : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag) 423 {} 424 425 task* execute(tbb::detail::d1::execution_data&) override { 426 utils::doDummyWork(10000); 427 428 int expected = 1; 429 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 430 tbb::task::resume(my_suspend_tag); 431 } 432 433 std::size_t ticket = my_current_task++; 434 task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr; 435 436 if (!next && my_resume_flag != 2) { 437 // Rarely all tasks can be executed before the suspend. 438 // So, wait for the suspend before leaving. 439 utils::SpinWaitWhileEq(my_resume_flag, 0); 440 expected = 1; 441 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 442 tbb::task::resume(my_suspend_tag); 443 } 444 } 445 446 my_wait.release(); 447 return next; 448 } 449 450 task* cancel(tbb::detail::d1::execution_data&) override { 451 FAIL("The function should never be called."); 452 return nullptr; 453 } 454 455 tbb::detail::d1::wait_context& my_wait; 456 task_pool_type& my_task_pool; 457 std::atomic<int>& my_resume_flag; 458 tbb::task::suspend_point& my_suspend_tag; 459 static std::atomic<int> my_current_task; 460 }; 461 462 std::atomic<int> bypass_task::my_current_task(0); 463 464 thread_local int test_tls = 0; 465 466 //! \brief \ref error_guessing 467 TEST_CASE("Bypass suspended by resume") { 468 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 469 tbb::task_group_context test_context; 470 tbb::detail::d1::wait_context wait(task_number + 1); 471 472 test_tls = 1; 473 474 std::atomic<int> resume_flag{0}; 475 tbb::task::suspend_point test_suspend_tag; 476 477 std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool; 478 479 for (std::size_t i = 0; i < task_number; ++i) { 480 test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag); 481 } 482 483 for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) { 484 std::size_t ticket = bypass_task::my_current_task++; 485 if (ticket < test_task_pool.size()) { 486 tbb::detail::d1::spawn(test_task_pool[ticket], test_context); 487 } 488 } 489 490 auto suspend_func = [&resume_flag, &test_suspend_tag] { 491 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 492 test_suspend_tag = tag; 493 resume_flag = 1; 494 }); 495 }; 496 using task_type = CountingTask<decltype(suspend_func)>; 497 task_type suspend_task(suspend_func, wait); 498 499 tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context); 500 CHECK(bypass_task::my_current_task >= test_task_pool.size()); 501 REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out"); 502 } 503 504 //! \brief \ref error_guessing 505 TEST_CASE("Critical tasks + resume") { 506 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 507 508 tbb::task_group_context test_context; 509 tbb::detail::d1::wait_context wait(task_number); 510 511 // The test expects at least one thread in test_arena 512 int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads())); 513 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena); 514 tbb::task_arena test_arena(num_threads_in_test_arena); 515 516 test_arena.initialize(); 517 518 std::atomic<bool> resume_flag{}; 519 tbb::task::suspend_point test_suspend_tag; 520 521 auto task_body = [&resume_flag, &test_suspend_tag] { 522 // Make some work 523 utils::doDummyWork(1000); 524 525 if (resume_flag.exchange(false)) { 526 tbb::task::resume(test_suspend_tag); 527 } 528 }; 529 530 using task_type = CountingTask<decltype(task_body)>; 531 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 532 533 for (std::size_t i = 0; i < task_number; ++i) { 534 test_tasks.emplace_back(task_body, wait); 535 } 536 537 for (std::size_t i = 0; i < task_number / 2; ++i) { 538 submit(test_tasks[i], test_arena, test_context, true); 539 } 540 541 auto suspend_func = [&resume_flag, &test_suspend_tag] { 542 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 543 test_suspend_tag = tag; 544 resume_flag.store(true, std::memory_order_release); 545 }); 546 }; 547 using suspend_task_type = CountingTask<decltype(suspend_func)>; 548 suspend_task_type suspend_task(suspend_func, wait); 549 550 submit(suspend_task, test_arena, test_context, true); 551 552 test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] { 553 tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] { 554 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1), 555 [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) { 556 for (std::size_t i = range.begin(); i != range.end(); ++i) { 557 submit(test_tasks[i], test_arena, test_context, true); 558 } 559 }); 560 }); 561 }); 562 563 tbb::detail::d1::wait(wait, test_context); 564 } 565 566 //! \brief \ref error_guessing 567 TEST_CASE("Stress testing") { 568 std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 569 570 tbb::task_group_context test_context; 571 tbb::detail::d1::wait_context wait(task_number); 572 573 tbb::task_arena test_arena; 574 575 test_arena.initialize(); 576 577 auto task_body = [] { 578 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) { 579 utils::doDummyWork(100); 580 }); 581 }; 582 using task_type = CountingTask<decltype(task_body)>; 583 584 std::size_t iter_counter = 20; 585 586 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 587 588 for (std::size_t j = 0; j < task_number; ++j) { 589 test_tasks.emplace_back(task_body, wait); 590 } 591 592 test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] { 593 for (std::size_t i = 0; i < iter_counter; ++i) { 594 595 for (std::size_t j = 0; j < task_number; ++j) { 596 test_arena.enqueue(task_body); 597 } 598 599 for (std::size_t j = 0; j < task_number / 2; ++j) { 600 tbb::detail::d1::spawn(test_tasks[j], test_context); 601 } 602 603 for (std::size_t j = task_number / 2; j < task_number; ++j) { 604 submit(test_tasks[j], test_arena, test_context, true); 605 } 606 607 tbb::detail::d1::wait(wait, test_context); 608 wait.reserve(task_number); 609 } 610 wait.release(task_number); 611 }); 612 613 614 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed"); 615 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 616 } 617 618 //! \brief \ref error_guessing 619 TEST_CASE("All workers sleep") { 620 std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 621 tbb::concurrent_vector<tbb::task::suspend_point> suspend_points; 622 623 tbb::task_group test_gr; 624 625 utils::SpinBarrier barrier(thread_number); 626 auto resumble_task = [&] { 627 barrier.wait(); 628 tbb::task::suspend([&] (tbb::task::suspend_point sp) { 629 suspend_points.push_back(sp); 630 barrier.wait(); 631 }); 632 }; 633 634 for (std::size_t i = 0; i < thread_number - 1; ++i) { 635 test_gr.run(resumble_task); 636 } 637 638 barrier.wait(); 639 barrier.wait(); 640 TestCPUUserTime(thread_number); 641 642 for (auto sp : suspend_points) 643 tbb::task::resume(sp); 644 test_gr.wait(); 645 } 646 647 #endif // __TBB_RESUMABLE_TASKS 648 649 //! \brief \ref error_guessing 650 TEST_CASE("Enqueue with exception") { 651 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 652 653 tbb::task_group_context test_context; 654 tbb::detail::d1::wait_context wait(task_number); 655 656 tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)}; 657 658 test_arena.initialize(); 659 660 auto task_body = [] { 661 utils::doDummyWork(100); 662 }; 663 664 std::atomic<bool> end_flag{false}; 665 auto check_body = [&end_flag] { 666 end_flag.store(true, std::memory_order_relaxed); 667 }; 668 669 using task_type = CountingTask<decltype(task_body)>; 670 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 671 672 for (std::size_t j = 0; j < task_number; ++j) { 673 test_tasks.emplace_back(task_body, wait); 674 } 675 676 { 677 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 678 test_arena.enqueue(task_body); 679 // Initialize implicit arena 680 tbb::parallel_for(0, 1, [] (int) {}); 681 tbb::task_arena test_arena2(tbb::task_arena::attach{}); 682 test_arena2.enqueue(task_body); 683 } 684 685 constexpr std::size_t iter_count = 10; 686 for (std::size_t k = 0; k < iter_count; ++k) { 687 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 688 test_arena.enqueue(check_body); 689 690 while (!end_flag.load(std::memory_order_relaxed)) ; 691 692 utils::Sleep(1); 693 end_flag.store(false, std::memory_order_relaxed); 694 695 test_arena.execute([&test_tasks, &wait, &test_context, task_number] { 696 for (std::size_t j = 0; j < task_number; ++j) { 697 tbb::detail::d1::spawn(test_tasks[j], test_context); 698 } 699 700 tbb::detail::d1::wait(wait, test_context); 701 wait.reserve(task_number); 702 }); 703 } 704 wait.release(task_number); 705 706 707 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed"); 708 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 709 } 710 711 struct resubmitting_task : public tbb::detail::d1::task { 712 tbb::task_arena& my_arena; 713 tbb::task_group_context& my_ctx; 714 std::atomic<int> counter{100000}; 715 716 resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx) 717 {} 718 719 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override { 720 if (counter-- > 0) { 721 submit(*this, my_arena, my_ctx, true); 722 } 723 return nullptr; 724 } 725 726 tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override { 727 FAIL("The function should never be called."); 728 return nullptr; 729 } 730 }; 731 732 //! \brief \ref error_guessing 733 TEST_CASE("Test with priority inversion") { 734 if (!utils::can_change_thread_priority()) return; 735 736 std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 737 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1); 738 739 tbb::task_arena test_arena(2 * thread_number, thread_number); 740 test_arena.initialize(); 741 utils::pinning_observer obsr(test_arena); 742 CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated"); 743 744 std::uint32_t critical_task_counter = 1000 * thread_number; 745 std::atomic<std::size_t> task_counter{0}; 746 747 tbb::task_group_context test_context; 748 tbb::detail::d1::wait_context wait(critical_task_counter); 749 750 auto critical_work = [&] { 751 utils::doDummyWork(10); 752 }; 753 754 using suspend_task_type = CountingTask<decltype(critical_work)>; 755 suspend_task_type critical_task(critical_work, wait); 756 757 auto high_priority_thread_func = [&] { 758 // Increase external threads priority 759 utils::increase_thread_priority(); 760 // pin external threads 761 test_arena.execute([]{}); 762 while (task_counter++ < critical_task_counter) { 763 submit(critical_task, test_arena, test_context, true); 764 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 765 } 766 }; 767 768 resubmitting_task worker_task(test_arena, test_context); 769 // warm up 770 // take first core on execute 771 utils::SpinBarrier barrier(thread_number + 1); 772 test_arena.execute([&] { 773 tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t&) { 774 barrier.wait(); 775 submit(worker_task, test_arena, test_context, true); 776 }); 777 }); 778 779 std::vector<std::thread> high_priority_threads; 780 for (std::size_t i = 0; i < thread_number - 1; ++i) { 781 high_priority_threads.emplace_back(high_priority_thread_func); 782 } 783 784 utils::increase_thread_priority(); 785 while (task_counter++ < critical_task_counter) { 786 submit(critical_task, test_arena, test_context, true); 787 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 788 } 789 790 tbb::detail::d1::wait(wait, test_context); 791 792 for (std::size_t i = 0; i < thread_number - 1; ++i) { 793 high_priority_threads[i].join(); 794 } 795 obsr.observe(false); 796 } 797 798 // Explicit test for raii_guard move ctor because of copy elision optimization 799 // TODO: consider better test file for the test case 800 //! \brief \ref interface 801 TEST_CASE("raii_guard move ctor") { 802 int count{0}; 803 auto func = [&count] { 804 count++; 805 CHECK(count == 1); 806 }; 807 808 tbb::detail::d0::raii_guard<decltype(func)> guard1(func); 809 tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1)); 810 } 811