1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/test.h" 18 #include "common/utils.h" 19 #include "common/spin_barrier.h" 20 #include "common/utils_concurrency_limit.h" 21 #include "common/cpu_usertime.h" 22 23 #include "tbb/task.h" 24 #include "tbb/task_group.h" 25 #include "tbb/parallel_for.h" 26 #include "tbb/cache_aligned_allocator.h" 27 #include "tbb/global_control.h" 28 #include "tbb/concurrent_vector.h" 29 30 #include <atomic> 31 #include <thread> 32 #include <thread> 33 34 //! \file test_task.cpp 35 //! \brief Test for [internal] functionality 36 37 struct EmptyBody { 38 void operator()() const {} 39 }; 40 41 #if _MSC_VER && !defined(__INTEL_COMPILER) 42 // unreachable code 43 #pragma warning( push ) 44 #pragma warning( disable: 4702 ) 45 #endif 46 47 template <typename Body = EmptyBody> 48 class CountingTask : public tbb::detail::d1::task { 49 public: 50 CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {} 51 52 CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {} 53 54 task* execute( tbb::detail::d1::execution_data& ) override { 55 ++my_execute_counter; 56 my_body(); 57 my_wait.release(); 58 return nullptr; 59 } 60 61 task* cancel( tbb::detail::d1::execution_data& ) override { 62 ++my_cancel_counter; 63 my_wait.release(); 64 return nullptr; 65 } 66 67 static void reset() { 68 my_execute_counter = 0; 69 my_cancel_counter = 0; 70 } 71 72 static std::size_t execute_counter() { return my_execute_counter; } 73 static std::size_t cancel_counter() { return my_cancel_counter; } 74 75 private: 76 Body my_body; 77 tbb::detail::d1::wait_context& my_wait; 78 79 static std::atomic<std::size_t> my_execute_counter; 80 static std::atomic<std::size_t> my_cancel_counter; 81 }; // struct CountingTask 82 83 84 #if _MSC_VER && !defined(__INTEL_COMPILER) 85 #pragma warning( pop ) 86 #endif // warning 4702 is back 87 88 template <typename Body> 89 std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0); 90 91 template <typename Body> 92 std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0); 93 94 #if TBB_USE_EXCEPTIONS 95 void test_cancellation_on_exception( bool reset_ctx ) { 96 tbb::detail::d1::wait_context wait(1); 97 tbb::task_group_context test_context; 98 auto throw_body = [] { 99 throw 1; 100 }; 101 CountingTask<decltype(throw_body)> task(throw_body, wait); 102 103 constexpr std::size_t iter_counter = 1000; 104 for (std::size_t i = 0; i < iter_counter; ++i) { 105 try { 106 tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context); 107 } catch(int ex) { 108 REQUIRE(ex == 1); 109 } 110 if (reset_ctx) { 111 test_context.reset(); 112 } 113 wait.reserve(1); 114 } 115 wait.release(1); 116 117 REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed"); 118 REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs"); 119 task.reset(); 120 } 121 #endif // TBB_USE_EXCEPTIONS 122 123 //! \brief \ref error_guessing 124 TEST_CASE("External threads sleep") { 125 if (utils::get_platform_max_threads() < 2) return; 126 utils::SpinBarrier barrier(2); 127 128 tbb::task_group test_gr; 129 130 test_gr.run([&] { 131 barrier.wait(); 132 TestCPUUserTime(2); 133 }); 134 135 barrier.wait(); 136 137 test_gr.wait(); 138 } 139 140 //! \brief \ref error_guessing 141 TEST_CASE("Test that task was executed p times") { 142 tbb::detail::d1::wait_context wait(1); 143 tbb::task_group_context test_context; 144 CountingTask<> test_task(wait); 145 146 constexpr std::size_t iter_counter = 10000; 147 for (std::size_t i = 0; i < iter_counter; ++i) { 148 tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context); 149 wait.reserve(1); 150 } 151 152 wait.release(1); 153 154 REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times"); 155 REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled"); 156 CountingTask<>::reset(); 157 } 158 159 #if TBB_USE_EXCEPTIONS 160 //! \brief \ref error_guessing 161 TEST_CASE("Test cancellation on exception") { 162 test_cancellation_on_exception(/*reset_ctx = */true); 163 test_cancellation_on_exception(/*reset_ctx = */false); 164 } 165 #endif // TBB_USE_EXCEPTIONS 166 167 //! \brief \ref error_guessing 168 TEST_CASE("Simple test parallelism usage") { 169 std::size_t threads_num = utils::get_platform_max_threads(); 170 utils::SpinBarrier barrier(threads_num); 171 172 auto barrier_wait = [&barrier] { 173 barrier.wait(); 174 }; 175 176 tbb::detail::d1::wait_context wait(threads_num); 177 tbb::detail::d1::task_group_context test_context; 178 using task_type = CountingTask<decltype(barrier_wait)>; 179 180 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 181 182 constexpr std::size_t iter_counter = 100; 183 for (std::size_t i = 0; i < iter_counter; ++i) { 184 for (std::size_t j = 0; j < threads_num; ++j) { 185 tbb::detail::d1::spawn(vector_test_task[j], test_context); 186 } 187 tbb::detail::d1::wait(wait, test_context); 188 wait.reserve(threads_num); 189 } 190 wait.release(threads_num); 191 192 REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed"); 193 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 194 task_type::reset(); 195 } 196 197 //! \brief \ref error_guessing 198 TEST_CASE("Test parallelism usage with parallel_for") { 199 std::size_t task_threads_num = utils::get_platform_max_threads(); 200 utils::SpinBarrier barrier(task_threads_num); 201 202 auto barrier_wait = [&barrier] { 203 barrier.wait(); 204 }; 205 206 std::size_t pfor_iter_count = 10000; 207 std::atomic<std::size_t> pfor_counter(0); 208 209 auto parallel_for_func = [&pfor_counter, pfor_iter_count] { 210 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count), 211 [&pfor_counter] (tbb::blocked_range<std::size_t>& range) { 212 for (auto it = range.begin(); it != range.end(); ++it) { 213 ++pfor_counter; 214 } 215 } 216 ); 217 }; 218 219 tbb::detail::d1::wait_context wait(task_threads_num); 220 tbb::detail::d1::task_group_context test_context; 221 using task_type = CountingTask<decltype(barrier_wait)>; 222 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait)); 223 224 constexpr std::size_t iter_count = 10; 225 constexpr std::size_t pfor_threads_num = 4; 226 for (std::size_t i = 0; i < iter_count; ++i) { 227 std::vector<std::thread> pfor_threads; 228 229 for (std::size_t j = 0; j < task_threads_num; ++j) { 230 tbb::detail::d1::spawn(vector_test_task[j], test_context); 231 } 232 233 for (std::size_t k = 0; k < pfor_threads_num; ++k) { 234 pfor_threads.emplace_back(parallel_for_func); 235 } 236 237 tbb::detail::d1::wait(wait, test_context); 238 239 for (auto& thread : pfor_threads) { 240 if (thread.joinable()) { 241 thread.join(); 242 } 243 } 244 245 wait.reserve(task_threads_num); 246 } 247 wait.release(task_threads_num); 248 249 REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed"); 250 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 251 REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished"); 252 task_type::reset(); 253 } 254 255 //! \brief \ref error_guessing 256 TEST_CASE("Test parallelism usage with spawn tasks in different threads") { 257 std::size_t threads_num = utils::get_platform_max_threads(); 258 utils::SpinBarrier barrier(threads_num); 259 260 auto barrier_wait = [&barrier] { 261 barrier.wait(); 262 }; 263 264 tbb::detail::d1::wait_context wait(threads_num); 265 tbb::detail::d1::task_group_context test_context; 266 using task_type = CountingTask<decltype(barrier_wait)>; 267 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait)); 268 269 auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) { 270 tbb::detail::d1::spawn(vector_test_task[idx], test_context); 271 }; 272 273 constexpr std::size_t iter_count = 10; 274 for (std::size_t i = 0; i < iter_count; ++i) { 275 std::vector<std::thread> threads; 276 277 for (std::size_t k = 0; k < threads_num - 1; ++k) { 278 threads.emplace_back(thread_func, k); 279 } 280 281 for (auto& thread : threads) { 282 if (thread.joinable()) { 283 thread.join(); 284 } 285 } 286 287 tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context); 288 wait.reserve(threads_num); 289 } 290 wait.release(threads_num); 291 292 REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed"); 293 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 294 task_type::reset(); 295 } 296 297 class SpawningTaskBody; 298 299 using SpawningTask = CountingTask<SpawningTaskBody>; 300 301 class SpawningTaskBody { 302 public: 303 using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>; 304 305 SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx ) 306 : my_task_pool(task_pool), my_test_ctx(test_ctx) {} 307 308 void operator()() const { 309 std::size_t delta = 7; 310 std::size_t start_idx = my_current_task.fetch_add(delta); 311 312 if (start_idx < my_task_pool.size()) { 313 for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) { 314 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx); 315 } 316 } 317 } 318 private: 319 task_pool_type& my_task_pool; 320 tbb::task_group_context& my_test_ctx; 321 static std::atomic<std::size_t> my_current_task; 322 }; // class SpawningTaskBody 323 324 std::atomic<std::size_t> SpawningTaskBody::my_current_task(0); 325 326 //! \brief \ref error_guessing 327 TEST_CASE("Actively adding tasks") { 328 std::size_t task_number = 500 * utils::get_platform_max_threads(); 329 330 tbb::detail::d1::wait_context wait(task_number + 1); 331 tbb::task_group_context test_context; 332 333 SpawningTaskBody::task_pool_type task_pool; 334 335 SpawningTaskBody task_body{task_pool, test_context}; 336 for (std::size_t i = 0; i < task_number; ++i) { 337 task_pool.emplace_back(task_body, wait); 338 } 339 340 SpawningTask first_task(task_body, wait); 341 tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context); 342 343 REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right? 344 REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled"); 345 } 346 347 #if __TBB_RESUMABLE_TASKS 348 349 struct suspended_task : public tbb::detail::d1::task { 350 351 suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait) 352 : my_suspend_tag(tag), my_wait(wait) 353 {} 354 355 task* execute(tbb::detail::d1::execution_data&) override { 356 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 357 [] (const tbb::blocked_range<std::size_t>& range) { 358 // Make some heavy work 359 std::atomic<int> sum{}; 360 for (auto it = range.begin(); it != range.end(); ++it) { 361 ++sum; 362 } 363 }, 364 tbb::static_partitioner{} 365 ); 366 367 my_wait.release(); 368 tbb::task::resume(my_suspend_tag); 369 return nullptr; 370 } 371 372 task* cancel(tbb::detail::d1::execution_data&) override { 373 FAIL("The function should never be called."); 374 return nullptr; 375 } 376 377 tbb::task::suspend_point my_suspend_tag; 378 tbb::detail::d1::wait_context& my_wait; 379 }; 380 381 //! \brief \ref error_guessing 382 TEST_CASE("Isolation + resumable tasks") { 383 std::atomic<int> suspend_flag{}; 384 tbb::task_group_context test_context; 385 386 std::atomic<int> suspend_count{}; 387 std::atomic<int> resume_count{}; 388 389 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000), 390 [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) { 391 int ticket = 0; 392 for (auto it = range.begin(); it != range.end(); ++it) { 393 ticket = suspend_flag++; 394 } 395 396 if (ticket % 5 == 0) { 397 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task; 398 tbb::detail::d1::wait_context wait(1); 399 ++suspend_count; 400 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] { 401 tbb::task::suspend([&wait, &test_context, &test_task] (tbb::task::suspend_point tag) { 402 test_task.emplace_back(tag, wait); 403 tbb::detail::d1::spawn(test_task[0], test_context); 404 }); 405 } 406 ); 407 tbb::detail::d1::wait(wait, test_context); 408 ++resume_count; 409 } 410 } 411 ); 412 413 CHECK(suspend_count == resume_count); 414 } 415 416 struct bypass_task : public tbb::detail::d1::task { 417 using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>; 418 419 bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool, 420 std::atomic<bool>& resume_flag, tbb::task::suspend_point& suspend_tag) 421 : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag) 422 {} 423 424 task* execute(tbb::detail::d1::execution_data&) override { 425 std::atomic<int> sum{}; 426 427 // Make some heavy work 428 for (std::size_t i = 0; i < 100000; ++i) { 429 ++sum; 430 } 431 432 my_wait.release(); 433 434 if (my_resume_flag.exchange(false)) { 435 tbb::task::resume(my_suspend_tag); 436 } 437 438 std::size_t ticket = my_current_task++; 439 return ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr; 440 } 441 442 task* cancel(tbb::detail::d1::execution_data&) override { 443 FAIL("The function should never be called."); 444 return nullptr; 445 } 446 447 tbb::detail::d1::wait_context& my_wait; 448 task_pool_type& my_task_pool; 449 std::atomic<bool>& my_resume_flag; 450 tbb::task::suspend_point& my_suspend_tag; 451 static std::atomic<int> my_current_task; 452 }; 453 454 std::atomic<int> bypass_task::my_current_task(0); 455 456 thread_local int test_tls = 0; 457 458 //! \brief \ref error_guessing 459 TEST_CASE("Bypass suspended by resume") { 460 std::size_t task_number = 500 * utils::get_platform_max_threads(); 461 tbb::task_group_context test_context; 462 tbb::detail::d1::wait_context wait(task_number + 1); 463 464 test_tls = 1; 465 466 std::atomic<bool> resume_flag{false}; 467 tbb::task::suspend_point test_suspend_tag; 468 469 std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool; 470 471 for (std::size_t i = 0; i < task_number; ++i) { 472 test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag); 473 } 474 475 for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) { 476 tbb::detail::d1::spawn(test_task_pool[bypass_task::my_current_task++], test_context); 477 } 478 479 auto suspend_func = [&resume_flag, &test_suspend_tag] { 480 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 481 test_suspend_tag = tag; 482 resume_flag.store(true, std::memory_order_release); 483 }); 484 }; 485 using task_type = CountingTask<decltype(suspend_func)>; 486 task_type suspend_task(suspend_func, wait); 487 488 tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context); 489 CHECK(bypass_task::my_current_task >= test_task_pool.size()); 490 REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out"); 491 } 492 493 //! \brief \ref error_guessing 494 TEST_CASE("Critical tasks + resume") { 495 std::size_t task_number = 500 * utils::get_platform_max_threads(); 496 497 tbb::task_group_context test_context; 498 tbb::detail::d1::wait_context wait(task_number); 499 500 tbb::task_arena test_arena; 501 502 test_arena.initialize(); 503 504 std::atomic<bool> resume_flag{}; 505 tbb::task::suspend_point test_suspend_tag; 506 507 auto task_body = [&resume_flag, &test_suspend_tag] { 508 std::atomic<int> sum{}; 509 510 // Make some work 511 for (std::size_t i = 0; i < 1000; ++i) { 512 ++sum; 513 } 514 515 if (resume_flag.exchange(false)) { 516 tbb::task::resume(test_suspend_tag); 517 } 518 }; 519 520 using task_type = CountingTask<decltype(task_body)>; 521 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 522 523 for (std::size_t i = 0; i < task_number; ++i) { 524 test_tasks.emplace_back(task_body, wait); 525 } 526 527 for (std::size_t i = 0; i < task_number / 2; ++i) { 528 submit(test_tasks[i], test_arena, test_context, true); 529 } 530 531 auto suspend_func = [&resume_flag, &test_suspend_tag] { 532 tbb::task::suspend([&resume_flag, &test_suspend_tag] (tbb::task::suspend_point tag) { 533 test_suspend_tag = tag; 534 resume_flag.store(true, std::memory_order_release); 535 }); 536 }; 537 using suspend_task_type = CountingTask<decltype(suspend_func)>; 538 suspend_task_type suspend_task(suspend_func, wait); 539 540 submit(suspend_task, test_arena, test_context, true); 541 542 test_arena.execute([&test_tasks, &test_arena, &test_context, task_number] { 543 tbb::this_task_arena::isolate([&test_tasks, &test_arena, &test_context, task_number] { 544 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number - 1), 545 [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) { 546 for (std::size_t i = range.begin(); i != range.end(); ++i) { 547 submit(test_tasks[i], test_arena, test_context, true); 548 } 549 }); 550 }); 551 }); 552 553 tbb::detail::d1::wait(wait, test_context); 554 } 555 556 //! \brief \ref error_guessing 557 TEST_CASE("Stress testing") { 558 std::size_t task_number = utils::get_platform_max_threads(); 559 560 tbb::task_group_context test_context; 561 tbb::detail::d1::wait_context wait(task_number); 562 563 tbb::task_arena test_arena; 564 565 test_arena.initialize(); 566 567 auto task_body = [] { 568 tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) { 569 std::atomic<int> sum{}; 570 // Make some work 571 for (std::size_t i = 0; i < 100; ++i) { 572 ++sum; 573 } 574 }); 575 }; 576 using task_type = CountingTask<decltype(task_body)>; 577 578 std::size_t iter_counter = 20; 579 580 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 581 582 for (std::size_t j = 0; j < task_number; ++j) { 583 test_tasks.emplace_back(task_body, wait); 584 } 585 586 test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] { 587 for (std::size_t i = 0; i < iter_counter; ++i) { 588 589 for (std::size_t j = 0; j < task_number; ++j) { 590 test_arena.enqueue(task_body); 591 } 592 593 for (std::size_t j = 0; j < task_number / 2; ++j) { 594 tbb::detail::d1::spawn(test_tasks[j], test_context); 595 } 596 597 for (std::size_t j = task_number / 2; j < task_number; ++j) { 598 submit(test_tasks[j], test_arena, test_context, true); 599 } 600 601 tbb::detail::d1::wait(wait, test_context); 602 wait.reserve(task_number); 603 } 604 wait.release(task_number); 605 }); 606 607 608 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed"); 609 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 610 } 611 612 //! \brief \ref error_guessing 613 TEST_CASE("All workers sleep") { 614 std::size_t thread_number = utils::get_platform_max_threads(); 615 tbb::concurrent_vector<tbb::task::suspend_point> suspend_points; 616 617 tbb::task_group test_gr; 618 619 utils::SpinBarrier barrier(thread_number); 620 auto resumble_task = [&] { 621 barrier.wait(); 622 tbb::task::suspend([&] (tbb::task::suspend_point sp) { 623 suspend_points.push_back(sp); 624 barrier.wait(); 625 }); 626 }; 627 628 for (std::size_t i = 0; i < thread_number - 1; ++i) { 629 test_gr.run(resumble_task); 630 } 631 632 barrier.wait(); 633 barrier.wait(); 634 TestCPUUserTime(thread_number); 635 636 for (auto sp : suspend_points) 637 tbb::task::resume(sp); 638 test_gr.wait(); 639 } 640 641 #endif // __TBB_RESUMABLE_TASKS 642 643 //! \brief \ref error_guessing 644 TEST_CASE("Enqueue with exception") { 645 std::size_t task_number = 500 * utils::get_platform_max_threads(); 646 647 tbb::task_group_context test_context; 648 tbb::detail::d1::wait_context wait(task_number); 649 650 tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)}; 651 652 test_arena.initialize(); 653 654 auto task_body = [] { 655 std::atomic<int> sum{}; 656 // Make some work 657 for (std::size_t i = 0; i < 100; ++i) { 658 ++sum; 659 } 660 }; 661 662 std::atomic<bool> end_flag{false}; 663 auto check_body = [&end_flag] { 664 end_flag.store(true, std::memory_order_relaxed); 665 }; 666 667 using task_type = CountingTask<decltype(task_body)>; 668 std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks; 669 670 for (std::size_t j = 0; j < task_number; ++j) { 671 test_tasks.emplace_back(task_body, wait); 672 } 673 674 { 675 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 676 test_arena.enqueue(task_body); 677 // Initialize implicit arena 678 tbb::parallel_for(0, 1, [] (int) {}); 679 tbb::task_arena test_arena2(tbb::task_arena::attach{}); 680 test_arena2.enqueue(task_body); 681 } 682 683 constexpr std::size_t iter_count = 10; 684 for (std::size_t k = 0; k < iter_count; ++k) { 685 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1); 686 test_arena.enqueue(check_body); 687 688 while (!end_flag.load(std::memory_order_relaxed)) ; 689 690 utils::Sleep(1); 691 end_flag.store(false, std::memory_order_relaxed); 692 693 test_arena.execute([&test_tasks, &wait, &test_context, task_number] { 694 for (std::size_t j = 0; j < task_number; ++j) { 695 tbb::detail::d1::spawn(test_tasks[j], test_context); 696 } 697 698 tbb::detail::d1::wait(wait, test_context); 699 wait.reserve(task_number); 700 }); 701 } 702 wait.release(task_number); 703 704 705 REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed"); 706 REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled"); 707 } 708