1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/dummy_body.h" 20 #include "common/spin_barrier.h" 21 #include "common/utils_concurrency_limit.h" 22 #include "common/cpu_usertime.h" 23 24 #include "tbb/task.h" 25 #include "tbb/task_group.h" 26 #include "tbb/parallel_for.h" 27 #include "tbb/cache_aligned_allocator.h" 28 #include "tbb/global_control.h" 29 #include "tbb/concurrent_vector.h" 30 31 #include <atomic> 32 #include <thread> 33 #include <thread> 34 35 //! \file test_task.cpp 36 //! \brief Test for [internal] functionality 37 struct EmptyBody { 38 void operator()() const {} 39 }; 40 41 #if _MSC_VER && !defined(__INTEL_COMPILER) 42 // unreachable code 43 #pragma warning( push ) 44 #pragma warning( disable: 4702 ) 45 #endif 46 47 template <typename Body = EmptyBody> 48 class CountingTask : public tbb::detail::d1::task { 49 public: 50 CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {} 51 52 CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {} 53 54 task* execute( tbb::detail::d1::execution_data& ) override { 55 ++my_execute_counter; 56 my_body(); 57 my_wait.release(); 58 return nullptr; 59 } 60 61 task* cancel( tbb::detail::d1::execution_data& ) override { 62 ++my_cancel_counter; 63 my_wait.release(); 64 return nullptr; 65 } 66 67 static void reset() { 68 my_execute_counter = 0; 69 my_cancel_counter = 0; 70 } 71 72 static std::size_t execute_counter() { return my_execute_counter; } 73 static std::size_t cancel_counter() { return my_cancel_counter; } 74 75 private: 76 Body my_body; 77 tbb::detail::d1::wait_context& my_wait; 78 79 static std::atomic<std::size_t> my_execute_counter; 80 static std::atomic<std::size_t> my_cancel_counter; 81 }; // struct CountingTask 82 83 84 #if _MSC_VER && !defined(__INTEL_COMPILER) 85 #pragma warning( pop ) 86 #endif // warning 4702 is back 87 88 template <typename Body> 89 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0); 90 91 template <typename Body> 92 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0); 93 94 #if TBB_USE_EXCEPTIONS 95 void test_cancellation_on_exception( bool reset_ctx ) { 96 tbb::detail::d1::wait_context wait(1); 97 tbb::task_group_context test_context; 98 auto throw_body = [] { 99 throw 1; 100 }; 101 CountingTask<decltype(throw_body)> task(throw_body, wait); 102 103 constexpr std::size_t iter_counter = 1000; 104 for (std::size_t i = 0; i < iter_counter; ++i) { 105 try { 106 tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context); 107 } catch(int ex) { 108 REQUIRE(ex == 1); 109 } 110 if (reset_ctx) { 111 test_context.reset(); 112 } 113 wait.reserve(1); 114 } 115 wait.release(1); 116 117 REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed"); 118 REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs"); 119 task.reset(); 120 } 121 #endif // TBB_USE_EXCEPTIONS 122 123 //! \brief \ref error_guessing 124 TEST_CASE("External threads sleep") { 125 if (utils::get_platform_max_threads() < 2) return; 126 utils::SpinBarrier barrier(2); 127 128 tbb::task_group test_gr; 129 130 test_gr.run([&] { 131 barrier.wait(); 132 TestCPUUserTime(2); 133 }); 134 135 barrier.wait(); 136 137 test_gr.wait(); 138 } 139 140 //! \brief \ref error_guessing 141 TEST_CASE("Test that task was executed p times") { 142 tbb::detail::d1::wait_context wait(1); 143 tbb::task_group_context test_context; 144 CountingTask<> test_task(wait); 145 146 constexpr std::size_t iter_counter = 10000; 147 for (std::size_t i = 0; i < iter_counter; ++i) { 148 tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context); 149 wait.reserve(1); 150 } 151 152 wait.release(1); 153 154 REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times"); 155 REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled"); 156 CountingTask<>::reset(); 157 } 158 159 #if TBB_USE_EXCEPTIONS 160 //! \brief \ref error_guessing 161 TEST_CASE("Test cancellation on exception") { 162 test_cancellation_on_exception(/*reset_ctx = */true); 163 test_cancellation_on_exception(/*reset_ctx = */false); 164 } 165 #endif // TBB_USE_EXCEPTIONS 166 167 //! \brief \ref error_guessing 168 TEST_CASE("Simple test parallelism usage") { 169 std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 170 utils::SpinBarrier barrier(threads_num); 171 172 auto barrier_wait = [&barrier] { 173 barrier.wait(); 174 }; 175 176 tbb::detail::d1::wait_context wait(threads_num); 177 tbb::detail::d1::task_group_context test_context; 178 using task_type = CountingTask<decltype(barrier_wait)>; 179 180 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 181 182 constexpr std::size_t iter_counter = 100; 183 for (std::size_t i = 0; i < iter_counter; ++i) { 184 for (std::size_t j = 0; j < threads_num; ++j) { 185 tbb::detail::d1::spawn(vector_test_task[j], test_context); 186 } 187 tbb::detail::d1::wait(wait, test_context); 188 wait.reserve(threads_num); 189 } 190 wait.release(threads_num); 191 192 REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed"); 193 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 194 task_type::reset(); 195 } 196 197 //! \brief \ref error_guessing 198 TEST_CASE("Test parallelism usage with parallel_for") { 199 std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 200 utils::SpinBarrier barrier(task_threads_num); 201 202 auto barrier_wait = [&barrier] { 203 barrier.wait(); 204 }; 205 206 std::size_t pfor_iter_count = 10000; 207 std::atomic<std::size_t> pfor_counter(0); 208 209 auto parallel_for_func = [&pfor_counter, pfor_iter_count] { 210 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count), 211 [&pfor_counter] (tbb::blocked_range<std::size_t>& range) { 212 for (auto it = range.begin(); it != range.end(); ++it) { 213 ++pfor_counter; 214 } 215 } 216 ); 217 }; 218 219 tbb::detail::d1::wait_context wait(task_threads_num); 220 tbb::detail::d1::task_group_context test_context; 221 using task_type = CountingTask<decltype(barrier_wait)>; 222 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait)); 223 224 constexpr std::size_t iter_count = 10; 225 constexpr std::size_t pfor_threads_num = 4; 226 for (std::size_t i = 0; i < iter_count; ++i) { 227 std::vector<std::thread> pfor_threads; 228 229 for (std::size_t j = 0; j < task_threads_num; ++j) { 230 tbb::detail::d1::spawn(vector_test_task[j], test_context); 231 } 232 233 for (std::size_t k = 0; k < pfor_threads_num; ++k) { 234 pfor_threads.emplace_back(parallel_for_func); 235 } 236 237 tbb::detail::d1::wait(wait, test_context); 238 239 for (auto& thread : pfor_threads) { 240 if (thread.joinable()) { 241 thread.join(); 242 } 243 } 244 245 wait.reserve(task_threads_num); 246 } 247 wait.release(task_threads_num); 248 249 REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed"); 250 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 251 REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished"); 252 task_type::reset(); 253 } 254 255 //! \brief \ref error_guessing 256 TEST_CASE("Test parallelism usage with spawn tasks in different threads") { 257 std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 258 utils::SpinBarrier barrier(threads_num); 259 260 auto barrier_wait = [&barrier] { 261 barrier.wait(); 262 }; 263 264 tbb::detail::d1::wait_context wait(threads_num); 265 tbb::detail::d1::task_group_context test_context; 266 using task_type = CountingTask<decltype(barrier_wait)>; 267 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 268 269 auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) { 270 tbb::detail::d1::spawn(vector_test_task[idx], test_context); 271 }; 272 273 constexpr std::size_t iter_count = 10; 274 for (std::size_t i = 0; i < iter_count; ++i) { 275 std::vector<std::thread> threads; 276 277 for (std::size_t k = 0; k < threads_num - 1; ++k) { 278 threads.emplace_back(thread_func, k); 279 } 280 281 for (auto& thread : threads) { 282 if (thread.joinable()) { 283 thread.join(); 284 } 285 } 286 287 tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context); 288 wait.reserve(threads_num); 289 } 290 wait.release(threads_num); 291 292 REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed"); 293 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 294 task_type::reset(); 295 } 296 297 class SpawningTaskBody; 298 299 using SpawningTask = CountingTask<SpawningTaskBody>; 300 301 class SpawningTaskBody { 302 public: 303 using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>; 304 305 SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx ) 306 : my_task_pool(task_pool), my_test_ctx(test_ctx) {} 307 308 void operator()() const { 309 std::size_t delta = 7; 310 std::size_t start_idx = my_current_task.fetch_add(delta); 311 312 if (start_idx < my_task_pool.size()) { 313 for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) { 314 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx); 315 } 316 } 317 } 318 private: 319 task_pool_type& my_task_pool; 320 tbb::task_group_context& my_test_ctx; 321 static std::atomic<std::size_t> my_current_task; 322 }; // class SpawningTaskBody 323 324 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0); 325 326 //! \brief \ref error_guessing 327 TEST_CASE("Actively adding tasks") { 328 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 329 330 tbb::detail::d1::wait_context wait(task_number + 1); 331 tbb::task_group_context test_context; 332 333 SpawningTaskBody::task_pool_type task_pool; 334 335 SpawningTaskBody task_body{task_pool, test_context}; 336 for (std::size_t i = 0; i < task_number; ++i) { 337 task_pool.emplace_back(task_body, wait); 338 } 339 340 SpawningTask first_task(task_body, wait); 341 tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context); 342 343 REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right? 344 REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled"); 345 } 346 347 #if __TBB_RESUMABLE_TASKS 348 struct suspended_task : public tbb::detail::d1::task { 349 350 suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait) 351 : my_suspend_tag(tag), my_wait(wait) 352 {} 353 354 task* execute(tbb::detail::d1::execution_data&) override { 355 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 356 [] (const tbb::blocked_range<std::size_t>& range) { 357 // Make some heavy work 358 std::atomic<int> sum{}; 359 for (auto it = range.begin(); it != range.end(); ++it) { 360 ++sum; 361 } 362 }, 363 tbb::static_partitioner{} 364 ); 365 366 my_wait.release(); 367 tbb::task::resume(my_suspend_tag); 368 return nullptr; 369 } 370 371 task* cancel(tbb::detail::d1::execution_data&) override { 372 FAIL("The function should never be called."); 373 return nullptr; 374 } 375 376 tbb::task::suspend_point my_suspend_tag; 377 tbb::detail::d1::wait_context& my_wait; 378 }; 379 380 //! \brief \ref error_guessing 381 TEST_CASE("Isolation + resumable tasks") { 382 std::atomic<int> suspend_flag{}; 383 tbb::task_group_context test_context; 384 385 std::atomic<int> suspend_count{}; 386 std::atomic<int> resume_count{}; 387 388 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 389 [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) { 390 int ticket = 0; 391 for (auto it = range.begin(); it != range.end(); ++it) { 392 ticket = suspend_flag++; 393 } 394 395 if (ticket % 5 == 0) { 396 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task; 397 tbb::detail::d1::wait_context wait(1); 398 ++suspend_count; 399 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] { 400 tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) { 401 test_task.emplace_back(tag, wait); 402 tbb::detail::d1::spawn(test_task[0], test_context); 403 }); 404 } 405 ); 406 tbb::detail::d1::wait(wait, test_context); 407 ++resume_count; 408 } 409 } 410 ); 411 412 CHECK(suspend_count == resume_count); 413 } 414 415 struct bypass_task : public tbb::detail::d1::task { 416 using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>; 417 418 bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool, 419 std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag) 420 : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag) 421 {} 422 423 task* execute(tbb::detail::d1::execution_data&) override { 424 utils::doDummyWork(10000); 425 426 int expected = 1; 427 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 428 tbb::task::resume(my_suspend_tag); 429 } 430 431 std::size_t ticket = my_current_task++; 432 task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr; 433 434 if (!next && my_resume_flag != 2) { 435 // Rarely all tasks can be executed before the suspend. 436 // So, wait for the suspend before leaving. 437 utils::SpinWaitWhileEq(my_resume_flag, 0); 438 expected = 1; 439 if (my_resume_flag.compare_exchange_strong(expected, 2)) { 440 tbb::task::resume(my_suspend_tag); 441 } 442 } 443 444 my_wait.release(); 445 return next; 446 } 447 448 task* cancel(tbb::detail::d1::execution_data&) override { 449 FAIL("The function should never be called."); 450 return nullptr; 451 } 452 453 tbb::detail::d1::wait_context& my_wait; 454 task_pool_type& my_task_pool; 455 std::atomic<int>& my_resume_flag; 456 tbb::task::suspend_point& my_suspend_tag; 457 static std::atomic<int> my_current_task; 458 }; 459 460 std::atomic<int> bypass_task::my_current_task(0); 461 462 thread_local int test_tls = 0; 463 464 //! \brief \ref error_guessing 465 TEST_CASE("Bypass suspended by resume") { 466 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 467 tbb::task_group_context test_context; 468 tbb::detail::d1::wait_context wait(task_number + 1); 469 470 test_tls = 1; 471 472 std::atomic<int> resume_flag{0}; 473 tbb::task::suspend_point test_suspend_tag; 474 475 std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool; 476 477 for (std::size_t i = 0; i < task_number; ++i) { 478 test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag); 479 } 480 481 for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) { 482 std::size_t ticket = bypass_task::my_current_task++; 483 if (ticket < test_task_pool.size()) { 484 tbb::detail::d1::spawn(test_task_pool[ticket], test_context); 485 } 486 } 487 488 auto suspend_func = [&resume_flag, &test_suspend_tag] { 489 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 490 test_suspend_tag = tag; 491 resume_flag = 1; 492 }); 493 }; 494 using task_type = CountingTask<decltype(suspend_func)>; 495 task_type suspend_task(suspend_func, wait); 496 497 tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context); 498 CHECK(bypass_task::my_current_task >= test_task_pool.size()); 499 REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out"); 500 } 501 502 //! \brief \ref error_guessing 503 TEST_CASE("Critical tasks + resume") { 504 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 505 506 tbb::task_group_context test_context; 507 tbb::detail::d1::wait_context wait{ 0 }; 508 509 // The test expects at least one thread in test_arena 510 int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads())); 511 tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena); 512 tbb::task_arena test_arena(num_threads_in_test_arena); 513 514 test_arena.initialize(); 515 516 std::atomic<bool> resume_flag{}, resumed{}; 517 tbb::task::suspend_point test_suspend_tag; 518 519 auto task_body = [&resume_flag, &resumed, &test_suspend_tag] { 520 // Make some work 521 utils::doDummyWork(1000); 522 523 if (resume_flag.exchange(false)) { 524 tbb::task::resume(test_suspend_tag); 525 resumed = true; 526 } 527 }; 528 529 using task_type = CountingTask<decltype(task_body)>; 530 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 531 532 for (std::size_t i = 0; i < task_number; ++i) { 533 test_tasks.emplace_back(task_body, wait); 534 } 535 536 wait.reserve(task_number / 2); 537 for (std::size_t i = 0; i < task_number / 2; ++i) { 538 submit(test_tasks[i], test_arena, test_context, true); 539 } 540 541 auto suspend_func = [&resume_flag, &test_suspend_tag] { 542 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 543 test_suspend_tag = tag; 544 resume_flag.store(true, std::memory_order_release); 545 }); 546 }; 547 using suspend_task_type = CountingTask<decltype(suspend_func)>; 548 suspend_task_type suspend_task(suspend_func, wait); 549 550 wait.reserve(1); 551 submit(suspend_task, test_arena, test_context, true); 552 553 test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] { 554 tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] { 555 do { 556 wait.reserve(task_number / 2); 557 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number), 558 [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) { 559 for (std::size_t i = range.begin(); i != range.end(); ++i) { 560 submit(test_tasks[i], test_arena, test_context, true); 561 } 562 } 563 ); 564 } while (!resumed); 565 }); 566 }); 567 568 test_arena.execute([&wait, &test_context] { 569 tbb::detail::d1::wait(wait, test_context); 570 }); 571 } 572 573 //! \brief \ref error_guessing 574 TEST_CASE("Stress testing") { 575 std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 576 577 tbb::task_group_context test_context; 578 tbb::detail::d1::wait_context wait(task_number); 579 580 tbb::task_arena test_arena; 581 582 test_arena.initialize(); 583 584 auto task_body = [] { 585 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) { 586 utils::doDummyWork(100); 587 }); 588 }; 589 using task_type = CountingTask<decltype(task_body)>; 590 591 std::size_t iter_counter = 20; 592 593 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 594 595 for (std::size_t j = 0; j < task_number; ++j) { 596 test_tasks.emplace_back(task_body, wait); 597 } 598 599 test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] { 600 for (std::size_t i = 0; i < iter_counter; ++i) { 601 602 for (std::size_t j = 0; j < task_number; ++j) { 603 test_arena.enqueue(task_body); 604 } 605 606 for (std::size_t j = 0; j < task_number / 2; ++j) { 607 tbb::detail::d1::spawn(test_tasks[j], test_context); 608 } 609 610 for (std::size_t j = task_number / 2; j < task_number; ++j) { 611 submit(test_tasks[j], test_arena, test_context, true); 612 } 613 614 tbb::detail::d1::wait(wait, test_context); 615 wait.reserve(task_number); 616 } 617 wait.release(task_number); 618 }); 619 620 621 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed"); 622 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 623 } 624 625 //! \brief \ref error_guessing 626 TEST_CASE("All workers sleep") { 627 std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 628 tbb::concurrent_vector<tbb::task::suspend_point> suspend_points; 629 630 tbb::task_group test_gr; 631 632 utils::SpinBarrier barrier(thread_number); 633 auto resumble_task = [&] { 634 barrier.wait(); 635 tbb::task::suspend([&] (tbb::task::suspend_point sp) { 636 suspend_points.push_back(sp); 637 barrier.wait(); 638 }); 639 }; 640 641 for (std::size_t i = 0; i < thread_number - 1; ++i) { 642 test_gr.run(resumble_task); 643 } 644 645 barrier.wait(); 646 barrier.wait(); 647 TestCPUUserTime(thread_number); 648 649 for (auto sp : suspend_points) 650 tbb::task::resume(sp); 651 test_gr.wait(); 652 } 653 654 #endif // __TBB_RESUMABLE_TASKS 655 656 //! \brief \ref error_guessing 657 TEST_CASE("Enqueue with exception") { 658 std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads()); 659 660 tbb::task_group_context test_context; 661 tbb::detail::d1::wait_context wait(task_number); 662 663 tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)}; 664 665 test_arena.initialize(); 666 667 auto task_body = [] { 668 utils::doDummyWork(100); 669 }; 670 671 std::atomic<bool> end_flag{false}; 672 auto check_body = [&end_flag] { 673 end_flag.store(true, std::memory_order_relaxed); 674 }; 675 676 using task_type = CountingTask<decltype(task_body)>; 677 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 678 679 for (std::size_t j = 0; j < task_number; ++j) { 680 test_tasks.emplace_back(task_body, wait); 681 } 682 683 { 684 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 685 test_arena.enqueue(task_body); 686 // Initialize implicit arena 687 tbb::parallel_for(0, 1, [] (int) {}); 688 tbb::task_arena test_arena2(tbb::task_arena::attach{}); 689 test_arena2.enqueue(task_body); 690 } 691 692 constexpr std::size_t iter_count = 10; 693 for (std::size_t k = 0; k < iter_count; ++k) { 694 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 695 test_arena.enqueue(check_body); 696 697 while (!end_flag.load(std::memory_order_relaxed)) ; 698 699 utils::Sleep(1); 700 end_flag.store(false, std::memory_order_relaxed); 701 702 test_arena.execute([&test_tasks, &wait, &test_context, task_number] { 703 for (std::size_t j = 0; j < task_number; ++j) { 704 tbb::detail::d1::spawn(test_tasks[j], test_context); 705 } 706 707 tbb::detail::d1::wait(wait, test_context); 708 wait.reserve(task_number); 709 }); 710 } 711 wait.release(task_number); 712 713 714 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed"); 715 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 716 } 717 718 struct resubmitting_task : public tbb::detail::d1::task { 719 tbb::task_arena& my_arena; 720 tbb::task_group_context& my_ctx; 721 std::atomic<int> counter{100000}; 722 723 resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx) 724 {} 725 726 tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override { 727 if (counter-- > 0) { 728 submit(*this, my_arena, my_ctx, true); 729 } 730 return nullptr; 731 } 732 733 tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override { 734 FAIL("The function should never be called."); 735 return nullptr; 736 } 737 }; 738 739 //! \brief \ref error_guessing 740 TEST_CASE("Test with priority inversion") { 741 if (!utils::can_change_thread_priority()) return; 742 743 std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads()); 744 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1); 745 746 tbb::task_arena test_arena(2 * thread_number, thread_number); 747 test_arena.initialize(); 748 utils::pinning_observer obsr(test_arena); 749 CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated"); 750 751 std::uint32_t critical_task_counter = 1000 * thread_number; 752 std::atomic<std::size_t> task_counter{0}; 753 754 tbb::task_group_context test_context; 755 tbb::detail::d1::wait_context wait(critical_task_counter); 756 757 auto critical_work = [&] { 758 utils::doDummyWork(10); 759 }; 760 761 using suspend_task_type = CountingTask<decltype(critical_work)>; 762 suspend_task_type critical_task(critical_work, wait); 763 764 auto high_priority_thread_func = [&] { 765 // Increase external threads priority 766 utils::increase_thread_priority(); 767 // pin external threads 768 test_arena.execute([]{}); 769 while (task_counter++ < critical_task_counter) { 770 submit(critical_task, test_arena, test_context, true); 771 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 772 } 773 }; 774 775 resubmitting_task worker_task(test_arena, test_context); 776 // warm up 777 // take first core on execute 778 utils::SpinBarrier barrier(thread_number + 1); 779 test_arena.execute([&] { 780 tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t&) { 781 barrier.wait(); 782 submit(worker_task, test_arena, test_context, true); 783 }); 784 }); 785 786 std::vector<std::thread> high_priority_threads; 787 for (std::size_t i = 0; i < thread_number - 1; ++i) { 788 high_priority_threads.emplace_back(high_priority_thread_func); 789 } 790 791 utils::increase_thread_priority(); 792 while (task_counter++ < critical_task_counter) { 793 submit(critical_task, test_arena, test_context, true); 794 std::this_thread::sleep_for(std::chrono::milliseconds(1)); 795 } 796 797 tbb::detail::d1::wait(wait, test_context); 798 799 for (std::size_t i = 0; i < thread_number - 1; ++i) { 800 high_priority_threads[i].join(); 801 } 802 obsr.observe(false); 803 } 804 805 // Explicit test for raii_guard move ctor because of copy elision optimization 806 // TODO: consider better test file for the test case 807 //! \brief \ref interface 808 TEST_CASE("raii_guard move ctor") { 809 int count{0}; 810 auto func = [&count] { 811 count++; 812 CHECK(count == 1); 813 }; 814 815 tbb::detail::d0::raii_guard<decltype(func)> guard1(func); 816 tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1)); 817 } 818