1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/spin_barrier.h" 20 #include "common/utils_concurrency_limit.h" 21 #include "common/cpu_usertime.h" 22 23 #include "tbb/task.h" 24 #include "tbb/task_group.h" 25 #include "tbb/parallel_for.h" 26 #include "tbb/cache_aligned_allocator.h" 27 #include "tbb/global_control.h" 28 #include "tbb/concurrent_vector.h" 29 30 #include <atomic> 31 #include <thread> 32 #include <thread> 33 34 //! \file test_task.cpp 35 //! \brief Test for [internal] functionality 36 37 struct EmptyBody { 38 void operator()() const {} 39 }; 40 41 #if _MSC_VER && !defined(__INTEL_COMPILER) 42 // unreachable code 43 #pragma warning( push ) 44 #pragma warning( disable: 4702 ) 45 #endif 46 47 template <typename Body = EmptyBody> 48 class CountingTask : public tbb::detail::d1::task { 49 public: 50 CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {} 51 52 CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {} 53 54 task* execute( tbb::detail::d1::execution_data& ) override { 55 ++my_execute_counter; 56 my_body(); 57 my_wait.release(); 58 return nullptr; 59 } 60 61 task* cancel( tbb::detail::d1::execution_data& ) override { 62 ++my_cancel_counter; 63 my_wait.release(); 64 return nullptr; 65 } 66 67 static void reset() { 68 my_execute_counter = 0; 69 my_cancel_counter = 0; 70 } 71 72 static std::size_t execute_counter() { return my_execute_counter; } 73 static std::size_t cancel_counter() { return my_cancel_counter; } 74 75 private: 76 Body my_body; 77 tbb::detail::d1::wait_context& my_wait; 78 79 static std::atomic<std::size_t> my_execute_counter; 80 static std::atomic<std::size_t> my_cancel_counter; 81 }; // struct CountingTask 82 83 84 #if _MSC_VER && !defined(__INTEL_COMPILER) 85 #pragma warning( pop ) 86 #endif // warning 4702 is back 87 88 template <typename Body> 89 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0); 90 91 template <typename Body> 92 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0); 93 94 #if TBB_USE_EXCEPTIONS 95 void test_cancellation_on_exception( bool reset_ctx ) { 96 tbb::detail::d1::wait_context wait(1); 97 tbb::task_group_context test_context; 98 auto throw_body = [] { 99 throw 1; 100 }; 101 CountingTask<decltype(throw_body)> task(throw_body, wait); 102 103 constexpr std::size_t iter_counter = 1000; 104 for (std::size_t i = 0; i < iter_counter; ++i) { 105 try { 106 tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context); 107 } catch(int ex) { 108 REQUIRE(ex == 1); 109 } 110 if (reset_ctx) { 111 test_context.reset(); 112 } 113 wait.reserve(1); 114 } 115 wait.release(1); 116 117 REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed"); 118 REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs"); 119 task.reset(); 120 } 121 #endif // TBB_USE_EXCEPTIONS 122 123 //! \brief \ref error_guessing 124 TEST_CASE("Test that task was executed p times") { 125 tbb::detail::d1::wait_context wait(1); 126 tbb::task_group_context test_context; 127 CountingTask<> test_task(wait); 128 129 constexpr std::size_t iter_counter = 10000; 130 for (std::size_t i = 0; i < iter_counter; ++i) { 131 tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context); 132 wait.reserve(1); 133 } 134 135 wait.release(1); 136 137 REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times"); 138 REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled"); 139 CountingTask<>::reset(); 140 } 141 142 #if TBB_USE_EXCEPTIONS 143 //! \brief \ref error_guessing 144 TEST_CASE("Test cancellation on exception") { 145 test_cancellation_on_exception(/*reset_ctx = */true); 146 test_cancellation_on_exception(/*reset_ctx = */false); 147 } 148 #endif // TBB_USE_EXCEPTIONS 149 150 //! \brief \ref error_guessing 151 TEST_CASE("Simple test parallelism usage") { 152 std::size_t threads_num = utils::get_platform_max_threads(); 153 utils::SpinBarrier barrier(threads_num); 154 155 auto barrier_wait = [&barrier] { 156 barrier.wait(); 157 }; 158 159 tbb::detail::d1::wait_context wait(threads_num); 160 tbb::detail::d1::task_group_context test_context; 161 using task_type = CountingTask<decltype(barrier_wait)>; 162 163 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 164 165 constexpr std::size_t iter_counter = 100; 166 for (std::size_t i = 0; i < iter_counter; ++i) { 167 for (std::size_t j = 0; j < threads_num; ++j) { 168 tbb::detail::d1::spawn(vector_test_task[j], test_context); 169 } 170 tbb::detail::d1::wait(wait, test_context); 171 wait.reserve(threads_num); 172 } 173 wait.release(threads_num); 174 175 REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed"); 176 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 177 task_type::reset(); 178 } 179 180 //! \brief \ref error_guessing 181 TEST_CASE("Test parallelism usage with parallel_for") { 182 std::size_t task_threads_num = utils::get_platform_max_threads(); 183 utils::SpinBarrier barrier(task_threads_num); 184 185 auto barrier_wait = [&barrier] { 186 barrier.wait(); 187 }; 188 189 std::size_t pfor_iter_count = 10000; 190 std::atomic<std::size_t> pfor_counter(0); 191 192 auto parallel_for_func = [&pfor_counter, pfor_iter_count] { 193 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count), 194 [&pfor_counter] (tbb::blocked_range<std::size_t>& range) { 195 for (auto it = range.begin(); it != range.end(); ++it) { 196 ++pfor_counter; 197 } 198 } 199 ); 200 }; 201 202 tbb::detail::d1::wait_context wait(task_threads_num); 203 tbb::detail::d1::task_group_context test_context; 204 using task_type = CountingTask<decltype(barrier_wait)>; 205 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait)); 206 207 constexpr std::size_t iter_count = 10; 208 constexpr std::size_t pfor_threads_num = 4; 209 for (std::size_t i = 0; i < iter_count; ++i) { 210 std::vector<std::thread> pfor_threads; 211 212 for (std::size_t j = 0; j < task_threads_num; ++j) { 213 tbb::detail::d1::spawn(vector_test_task[j], test_context); 214 } 215 216 for (std::size_t k = 0; k < pfor_threads_num; ++k) { 217 pfor_threads.emplace_back(parallel_for_func); 218 } 219 220 tbb::detail::d1::wait(wait, test_context); 221 222 for (auto& thread : pfor_threads) { 223 if (thread.joinable()) { 224 thread.join(); 225 } 226 } 227 228 wait.reserve(task_threads_num); 229 } 230 wait.release(task_threads_num); 231 232 REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed"); 233 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 234 REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished"); 235 task_type::reset(); 236 } 237 238 //! \brief \ref error_guessing 239 TEST_CASE("Test parallelism usage with spawn tasks in different threads") { 240 std::size_t threads_num = utils::get_platform_max_threads(); 241 utils::SpinBarrier barrier(threads_num); 242 243 auto barrier_wait = [&barrier] { 244 barrier.wait(); 245 }; 246 247 tbb::detail::d1::wait_context wait(threads_num); 248 tbb::detail::d1::task_group_context test_context; 249 using task_type = CountingTask<decltype(barrier_wait)>; 250 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 251 252 auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) { 253 tbb::detail::d1::spawn(vector_test_task[idx], test_context); 254 }; 255 256 constexpr std::size_t iter_count = 10; 257 for (std::size_t i = 0; i < iter_count; ++i) { 258 std::vector<std::thread> threads; 259 260 for (std::size_t k = 0; k < threads_num - 1; ++k) { 261 threads.emplace_back(thread_func, k); 262 } 263 264 for (auto& thread : threads) { 265 if (thread.joinable()) { 266 thread.join(); 267 } 268 } 269 270 tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context); 271 wait.reserve(threads_num); 272 } 273 wait.release(threads_num); 274 275 REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed"); 276 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 277 task_type::reset(); 278 } 279 280 class SpawningTaskBody; 281 282 using SpawningTask = CountingTask<SpawningTaskBody>; 283 284 class SpawningTaskBody { 285 public: 286 using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>; 287 288 SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx ) 289 : my_task_pool(task_pool), my_test_ctx(test_ctx) {} 290 291 void operator()() const { 292 std::size_t delta = 7; 293 std::size_t start_idx = my_current_task.fetch_add(delta); 294 295 if (start_idx < my_task_pool.size()) { 296 for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) { 297 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx); 298 } 299 } 300 } 301 private: 302 task_pool_type& my_task_pool; 303 tbb::task_group_context& my_test_ctx; 304 static std::atomic<std::size_t> my_current_task; 305 }; // class SpawningTaskBody 306 307 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0); 308 309 //! \brief \ref error_guessing 310 TEST_CASE("Actively adding tasks") { 311 std::size_t task_number = 500 * utils::get_platform_max_threads(); 312 313 tbb::detail::d1::wait_context wait(task_number + 1); 314 tbb::task_group_context test_context; 315 316 SpawningTaskBody::task_pool_type task_pool; 317 318 SpawningTaskBody task_body{task_pool, test_context}; 319 for (std::size_t i = 0; i < task_number; ++i) { 320 task_pool.emplace_back(task_body, wait); 321 } 322 323 SpawningTask first_task(task_body, wait); 324 tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context); 325 326 REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right? 327 REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled"); 328 } 329 330 #if __TBB_RESUMABLE_TASKS 331 332 struct suspended_task : public tbb::detail::d1::task { 333 334 suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait) 335 : my_suspend_tag(tag), my_wait(wait) 336 {} 337 338 task* execute(tbb::detail::d1::execution_data&) override { 339 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 340 [] (const tbb::blocked_range<std::size_t>& range) { 341 // Make some heavy work 342 std::atomic<int> sum{}; 343 for (auto it = range.begin(); it != range.end(); ++it) { 344 ++sum; 345 } 346 }, 347 tbb::static_partitioner{} 348 ); 349 350 my_wait.release(); 351 tbb::task::resume(my_suspend_tag); 352 return nullptr; 353 } 354 355 task* cancel(tbb::detail::d1::execution_data&) override { 356 FAIL("The function should never be called."); 357 return nullptr; 358 } 359 360 tbb::task::suspend_point my_suspend_tag; 361 tbb::detail::d1::wait_context& my_wait; 362 }; 363 364 //! \brief \ref error_guessing 365 TEST_CASE("Isolation + resumable tasks") { 366 std::atomic<int> suspend_flag{}; 367 tbb::task_group_context test_context; 368 369 std::atomic<int> suspend_count{}; 370 std::atomic<int> resume_count{}; 371 372 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 373 [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) { 374 int ticket = 0; 375 for (auto it = range.begin(); it != range.end(); ++it) { 376 ticket = suspend_flag++; 377 } 378 379 if (ticket % 5 == 0) { 380 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task; 381 tbb::detail::d1::wait_context wait(1); 382 ++suspend_count; 383 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] { 384 tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) { 385 test_task.emplace_back(tag, wait); 386 tbb::detail::d1::spawn(test_task[0], test_context); 387 }); 388 } 389 ); 390 tbb::detail::d1::wait(wait, test_context); 391 ++resume_count; 392 } 393 } 394 ); 395 396 CHECK(suspend_count == resume_count); 397 } 398 399 struct bypass_task : public tbb::detail::d1::task { 400 using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>; 401 402 bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool, 403 std::atomic<bool>& resume_flag, tbb::task::suspend_point& suspend_tag) 404 : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag) 405 {} 406 407 task* execute(tbb::detail::d1::execution_data&) override { 408 std::atomic<int> sum{}; 409 410 // Make some heavy work 411 for (std::size_t i = 0; i < 100000; ++i) { 412 ++sum; 413 } 414 415 my_wait.release(); 416 417 if (my_resume_flag.exchange(false)) { 418 tbb::task::resume(my_suspend_tag); 419 } 420 421 std::size_t ticket = my_current_task++; 422 return ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr; 423 } 424 425 task* cancel(tbb::detail::d1::execution_data&) override { 426 FAIL("The function should never be called."); 427 return nullptr; 428 } 429 430 tbb::detail::d1::wait_context& my_wait; 431 task_pool_type& my_task_pool; 432 std::atomic<bool>& my_resume_flag; 433 tbb::task::suspend_point& my_suspend_tag; 434 static std::atomic<int> my_current_task; 435 }; 436 437 std::atomic<int> bypass_task::my_current_task(0); 438 439 thread_local int test_tls = 0; 440 441 //! \brief \ref error_guessing 442 TEST_CASE("Bypass suspended by resume") { 443 std::size_t task_number = 500 * utils::get_platform_max_threads(); 444 tbb::task_group_context test_context; 445 tbb::detail::d1::wait_context wait(task_number + 1); 446 447 test_tls = 1; 448 449 std::atomic<bool> resume_flag{false}; 450 tbb::task::suspend_point test_suspend_tag; 451 452 std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool; 453 454 for (std::size_t i = 0; i < task_number; ++i) { 455 test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag); 456 } 457 458 for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) { 459 tbb::detail::d1::spawn(test_task_pool[bypass_task::my_current_task++], test_context); 460 } 461 462 auto suspend_func = [&resume_flag, &test_suspend_tag] { 463 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 464 test_suspend_tag = tag; 465 resume_flag.store(true, std::memory_order_release); 466 }); 467 }; 468 using task_type = CountingTask<decltype(suspend_func)>; 469 task_type suspend_task(suspend_func, wait); 470 471 tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context); 472 CHECK(bypass_task::my_current_task >= test_task_pool.size()); 473 REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out"); 474 } 475 476 //! \brief \ref error_guessing 477 TEST_CASE("Critical tasks + resume") { 478 std::size_t task_number = 500 * utils::get_platform_max_threads(); 479 480 tbb::task_group_context test_context; 481 tbb::detail::d1::wait_context wait(task_number); 482 483 tbb::task_arena test_arena; 484 485 test_arena.initialize(); 486 487 std::atomic<bool> resume_flag{}; 488 tbb::task::suspend_point test_suspend_tag; 489 490 auto task_body = [&resume_flag, &test_suspend_tag] { 491 std::atomic<int> sum{}; 492 493 // Make some work 494 for (std::size_t i = 0; i < 1000; ++i) { 495 ++sum; 496 } 497 498 if (resume_flag.exchange(false)) { 499 tbb::task::resume(test_suspend_tag); 500 } 501 }; 502 503 using task_type = CountingTask<decltype(task_body)>; 504 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 505 506 for (std::size_t i = 0; i < task_number; ++i) { 507 test_tasks.emplace_back(task_body, wait); 508 } 509 510 for (std::size_t i = 0; i < task_number / 2; ++i) { 511 submit(test_tasks[i], test_arena, test_context, true); 512 } 513 514 auto suspend_func = [&resume_flag, &test_suspend_tag] { 515 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 516 test_suspend_tag = tag; 517 resume_flag.store(true, std::memory_order_release); 518 }); 519 }; 520 using suspend_task_type = CountingTask<decltype(suspend_func)>; 521 suspend_task_type suspend_task(suspend_func, wait); 522 523 submit(suspend_task, test_arena, test_context, true); 524 525 test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] { 526 tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] { 527 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1), 528 [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) { 529 for (std::size_t i = range.begin(); i != range.end(); ++i) { 530 submit(test_tasks[i], test_arena, test_context, true); 531 } 532 }); 533 }); 534 }); 535 536 tbb::detail::d1::wait(wait, test_context); 537 } 538 539 //! \brief \ref error_guessing 540 TEST_CASE("Stress testing") { 541 std::size_t task_number = utils::get_platform_max_threads(); 542 543 tbb::task_group_context test_context; 544 tbb::detail::d1::wait_context wait(task_number); 545 546 tbb::task_arena test_arena; 547 548 test_arena.initialize(); 549 550 auto task_body = [] { 551 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) { 552 std::atomic<int> sum{}; 553 // Make some work 554 for (std::size_t i = 0; i < 100; ++i) { 555 ++sum; 556 } 557 }); 558 }; 559 using task_type = CountingTask<decltype(task_body)>; 560 561 std::size_t iter_counter = 20; 562 563 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 564 565 for (std::size_t j = 0; j < task_number; ++j) { 566 test_tasks.emplace_back(task_body, wait); 567 } 568 569 test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] { 570 for (std::size_t i = 0; i < iter_counter; ++i) { 571 572 for (std::size_t j = 0; j < task_number; ++j) { 573 test_arena.enqueue(task_body); 574 } 575 576 for (std::size_t j = 0; j < task_number / 2; ++j) { 577 tbb::detail::d1::spawn(test_tasks[j], test_context); 578 } 579 580 for (std::size_t j = task_number / 2; j < task_number; ++j) { 581 submit(test_tasks[j], test_arena, test_context, true); 582 } 583 584 tbb::detail::d1::wait(wait, test_context); 585 wait.reserve(task_number); 586 } 587 wait.release(task_number); 588 }); 589 590 591 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed"); 592 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 593 } 594 595 //! \brief \ref error_guessing 596 TEST_CASE("All workers sleep") { 597 std::size_t thread_number = utils::get_platform_max_threads(); 598 tbb::concurrent_vector<tbb::task::suspend_point> suspend_points; 599 600 tbb::task_group test_gr; 601 602 utils::SpinBarrier barrier(thread_number); 603 auto resumble_task = [&] { 604 barrier.wait(); 605 tbb::task::suspend([&] (tbb::task::suspend_point sp) { 606 suspend_points.push_back(sp); 607 barrier.wait(); 608 }); 609 }; 610 611 for (std::size_t i = 0; i < thread_number - 1; ++i) { 612 test_gr.run(resumble_task); 613 } 614 615 barrier.wait(); 616 barrier.wait(); 617 TestCPUUserTime(thread_number); 618 619 for (auto sp : suspend_points) 620 tbb::task::resume(sp); 621 test_gr.wait(); 622 } 623 624 //! \brief \ref error_guessing 625 TEST_CASE("External threads sleep") { 626 if (utils::get_platform_max_threads() < 2) return; 627 utils::SpinBarrier barrier(2); 628 629 tbb::task_group test_gr; 630 631 test_gr.run([&] { 632 barrier.wait(); 633 TestCPUUserTime(2); 634 }); 635 636 barrier.wait(); 637 638 test_gr.wait(); 639 } 640 641 #endif // __TBB_RESUMABLE_TASKS 642 643 //! \brief \ref error_guessing 644 TEST_CASE("Enqueue with exception") { 645 std::size_t task_number = 500 * utils::get_platform_max_threads(); 646 647 tbb::task_group_context test_context; 648 tbb::detail::d1::wait_context wait(task_number); 649 650 tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)}; 651 652 test_arena.initialize(); 653 654 auto task_body = [] { 655 std::atomic<int> sum{}; 656 // Make some work 657 for (std::size_t i = 0; i < 100; ++i) { 658 ++sum; 659 } 660 }; 661 662 std::atomic<bool> end_flag{false}; 663 auto check_body = [&end_flag] { 664 end_flag.store(true, std::memory_order_relaxed); 665 }; 666 667 using task_type = CountingTask<decltype(task_body)>; 668 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 669 670 for (std::size_t j = 0; j < task_number; ++j) { 671 test_tasks.emplace_back(task_body, wait); 672 } 673 674 { 675 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 676 test_arena.enqueue(task_body); 677 // Initialize implicit arena 678 tbb::parallel_for(0, 1, [] (int) {}); 679 tbb::task_arena test_arena2(tbb::task_arena::attach{}); 680 test_arena2.enqueue(task_body); 681 } 682 683 constexpr std::size_t iter_count = 10; 684 for (std::size_t k = 0; k < iter_count; ++k) { 685 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 686 test_arena.enqueue(check_body); 687 688 while (!end_flag.load(std::memory_order_relaxed)) ; 689 690 utils::Sleep(1); 691 end_flag.store(false, std::memory_order_relaxed); 692 693 test_arena.execute([&test_tasks, &wait, &test_context, task_number] { 694 for (std::size_t j = 0; j < task_number; ++j) { 695 tbb::detail::d1::spawn(test_tasks[j], test_context); 696 } 697 698 tbb::detail::d1::wait(wait, test_context); 699 wait.reserve(task_number); 700 }); 701 } 702 wait.release(task_number); 703 704 705 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed"); 706 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 707 } 708