/*
    Copyright (c) 2005-2023 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
1651c0b2f7Stbbdev 
#include "common/test.h"
#include "common/utils.h"
#include "common/dummy_body.h"
#include "common/spin_barrier.h"
#include "common/utils_concurrency_limit.h"
#include "common/cpu_usertime.h"

#include "tbb/task.h"
#include "tbb/task_group.h"
#include "tbb/parallel_for.h"
#include "tbb/cache_aligned_allocator.h"
#include "tbb/global_control.h"
#include "tbb/concurrent_vector.h"

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <thread>
#include <vector>
3451c0b2f7Stbbdev 
//! \file test_task.cpp
//! \brief Test for [internal] functionality
//! Callable that does nothing; the default body of CountingTask.
struct EmptyBody {
    //! No-op call operator.
    void operator()() const {}
};
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
4251c0b2f7Stbbdev // unreachable code
4351c0b2f7Stbbdev #pragma warning( push )
4451c0b2f7Stbbdev #pragma warning( disable: 4702 )
4551c0b2f7Stbbdev #endif
4651c0b2f7Stbbdev 
4751c0b2f7Stbbdev template <typename Body = EmptyBody>
4851c0b2f7Stbbdev class CountingTask : public tbb::detail::d1::task {
4951c0b2f7Stbbdev public:
CountingTask(Body body,tbb::detail::d1::wait_context & wait)5051c0b2f7Stbbdev     CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}
5151c0b2f7Stbbdev 
CountingTask(tbb::detail::d1::wait_context & wait)5251c0b2f7Stbbdev     CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}
5351c0b2f7Stbbdev 
execute(tbb::detail::d1::execution_data &)5451c0b2f7Stbbdev     task* execute( tbb::detail::d1::execution_data& ) override {
5551c0b2f7Stbbdev         ++my_execute_counter;
5651c0b2f7Stbbdev         my_body();
5751c0b2f7Stbbdev         my_wait.release();
5851c0b2f7Stbbdev         return nullptr;
5951c0b2f7Stbbdev     }
6051c0b2f7Stbbdev 
cancel(tbb::detail::d1::execution_data &)6151c0b2f7Stbbdev     task* cancel( tbb::detail::d1::execution_data& ) override {
6251c0b2f7Stbbdev         ++my_cancel_counter;
6351c0b2f7Stbbdev         my_wait.release();
6451c0b2f7Stbbdev         return nullptr;
6551c0b2f7Stbbdev     }
6651c0b2f7Stbbdev 
reset()6751c0b2f7Stbbdev     static void reset() {
6851c0b2f7Stbbdev         my_execute_counter = 0;
6951c0b2f7Stbbdev         my_cancel_counter = 0;
7051c0b2f7Stbbdev     }
7151c0b2f7Stbbdev 
execute_counter()7251c0b2f7Stbbdev     static std::size_t execute_counter() { return my_execute_counter; }
cancel_counter()7351c0b2f7Stbbdev     static std::size_t cancel_counter() { return my_cancel_counter; }
7451c0b2f7Stbbdev 
7551c0b2f7Stbbdev private:
7651c0b2f7Stbbdev     Body my_body;
7751c0b2f7Stbbdev     tbb::detail::d1::wait_context& my_wait;
7851c0b2f7Stbbdev 
7951c0b2f7Stbbdev     static std::atomic<std::size_t> my_execute_counter;
8051c0b2f7Stbbdev     static std::atomic<std::size_t> my_cancel_counter;
8151c0b2f7Stbbdev }; // struct CountingTask
8251c0b2f7Stbbdev 
8351c0b2f7Stbbdev 
8451c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
8551c0b2f7Stbbdev #pragma warning( pop )
8651c0b2f7Stbbdev #endif // warning 4702 is back
8751c0b2f7Stbbdev 
8851c0b2f7Stbbdev template <typename Body>
8951c0b2f7Stbbdev std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);
9051c0b2f7Stbbdev 
9151c0b2f7Stbbdev template <typename Body>
9251c0b2f7Stbbdev std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);
9351c0b2f7Stbbdev 
#if TBB_USE_EXCEPTIONS
//! Repeatedly runs a task whose body throws and checks the execute/cancel statistics.
/** \param reset_ctx when true, the task_group_context is reset after every iteration and the
           body is expected to run each time; when false, it is expected to run only once.
    In both modes cancel() is expected to be called once per iteration. */
void test_cancellation_on_exception( bool reset_ctx ) {
    constexpr std::size_t iter_counter = 1000;

    auto throw_body = [] {
        throw 1;
    };
    using task_type = CountingTask<decltype(throw_body)>;

    tbb::detail::d1::wait_context wait(1);
    tbb::task_group_context test_context;
    task_type task(throw_body, wait);

    std::size_t iteration = 0;
    while (iteration != iter_counter) {
        try {
            tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
        } catch(int ex) {
            // Only the value thrown by throw_body may reach this point.
            REQUIRE(ex == 1);
        }
        if (reset_ctx) {
            test_context.reset();
        }
        // Re-arm the wait_context for the next round.
        wait.reserve(1);
        ++iteration;
    }
    // Balance the reservation made by the last round.
    wait.release(1);

    REQUIRE_MESSAGE(task_type::execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
    REQUIRE_MESSAGE(task_type::cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs");
    task_type::reset();
}
#endif // TBB_USE_EXCEPTIONS
12251c0b2f7Stbbdev 
12351c0b2f7Stbbdev //! \brief \ref error_guessing
12449e08aacStbbdev TEST_CASE("External threads sleep") {
12549e08aacStbbdev     if (utils::get_platform_max_threads() < 2) return;
12649e08aacStbbdev     utils::SpinBarrier barrier(2);
12749e08aacStbbdev 
12849e08aacStbbdev     tbb::task_group test_gr;
12949e08aacStbbdev 
__anon509d20bd0202null13049e08aacStbbdev     test_gr.run([&] {
13149e08aacStbbdev         barrier.wait();
13249e08aacStbbdev         TestCPUUserTime(2);
13349e08aacStbbdev     });
13449e08aacStbbdev 
13549e08aacStbbdev     barrier.wait();
13649e08aacStbbdev 
13749e08aacStbbdev     test_gr.wait();
13849e08aacStbbdev }
13949e08aacStbbdev 
14049e08aacStbbdev //! \brief \ref error_guessing
14151c0b2f7Stbbdev TEST_CASE("Test that task was executed p times") {
14251c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(1);
14351c0b2f7Stbbdev     tbb::task_group_context test_context;
14451c0b2f7Stbbdev     CountingTask<> test_task(wait);
14551c0b2f7Stbbdev 
14651c0b2f7Stbbdev     constexpr std::size_t iter_counter = 10000;
14751c0b2f7Stbbdev     for (std::size_t i = 0; i < iter_counter; ++i) {
14851c0b2f7Stbbdev         tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
14951c0b2f7Stbbdev         wait.reserve(1);
15051c0b2f7Stbbdev     }
15151c0b2f7Stbbdev 
15251c0b2f7Stbbdev     wait.release(1);
15351c0b2f7Stbbdev 
15451c0b2f7Stbbdev     REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times");
15551c0b2f7Stbbdev     REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
15651c0b2f7Stbbdev     CountingTask<>::reset();
15751c0b2f7Stbbdev }
15851c0b2f7Stbbdev 
#if TBB_USE_EXCEPTIONS
//! \brief \ref error_guessing
TEST_CASE("Test cancellation on exception") {
    // First with the context reset after every iteration, then without any reset.
    constexpr bool with_context_reset = true;
    constexpr bool without_context_reset = false;

    test_cancellation_on_exception(with_context_reset);
    test_cancellation_on_exception(without_context_reset);
}
#endif // TBB_USE_EXCEPTIONS
16651c0b2f7Stbbdev 
16751c0b2f7Stbbdev //! \brief \ref error_guessing
16851c0b2f7Stbbdev TEST_CASE("Simple test parallelism usage") {
16955f9b178SIvan Kochin     std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
17051c0b2f7Stbbdev     utils::SpinBarrier barrier(threads_num);
17151c0b2f7Stbbdev 
__anon509d20bd0302null17251c0b2f7Stbbdev     auto barrier_wait = [&barrier] {
17351c0b2f7Stbbdev         barrier.wait();
17451c0b2f7Stbbdev     };
17551c0b2f7Stbbdev 
17651c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(threads_num);
17751c0b2f7Stbbdev     tbb::detail::d1::task_group_context test_context;
17851c0b2f7Stbbdev     using task_type = CountingTask<decltype(barrier_wait)>;
17951c0b2f7Stbbdev 
18051c0b2f7Stbbdev     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
18151c0b2f7Stbbdev 
18251c0b2f7Stbbdev     constexpr std::size_t iter_counter = 100;
18351c0b2f7Stbbdev     for (std::size_t i = 0; i < iter_counter; ++i) {
18451c0b2f7Stbbdev         for (std::size_t j = 0; j < threads_num; ++j) {
18551c0b2f7Stbbdev             tbb::detail::d1::spawn(vector_test_task[j], test_context);
18651c0b2f7Stbbdev         }
18751c0b2f7Stbbdev         tbb::detail::d1::wait(wait, test_context);
18851c0b2f7Stbbdev         wait.reserve(threads_num);
18951c0b2f7Stbbdev     }
19051c0b2f7Stbbdev     wait.release(threads_num);
19151c0b2f7Stbbdev 
19251c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
19351c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
19451c0b2f7Stbbdev     task_type::reset();
19551c0b2f7Stbbdev }
19651c0b2f7Stbbdev 
19751c0b2f7Stbbdev //! \brief \ref error_guessing
19851c0b2f7Stbbdev TEST_CASE("Test parallelism usage with parallel_for") {
19955f9b178SIvan Kochin     std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
20051c0b2f7Stbbdev     utils::SpinBarrier barrier(task_threads_num);
20151c0b2f7Stbbdev 
__anon509d20bd0402null20251c0b2f7Stbbdev     auto barrier_wait = [&barrier] {
20351c0b2f7Stbbdev         barrier.wait();
20451c0b2f7Stbbdev     };
20551c0b2f7Stbbdev 
20651c0b2f7Stbbdev     std::size_t pfor_iter_count = 10000;
20751c0b2f7Stbbdev     std::atomic<std::size_t> pfor_counter(0);
20851c0b2f7Stbbdev 
__anon509d20bd0502null20951c0b2f7Stbbdev     auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
21051c0b2f7Stbbdev         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
21151c0b2f7Stbbdev                           [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
21251c0b2f7Stbbdev                               for (auto it = range.begin(); it != range.end(); ++it) {
21351c0b2f7Stbbdev                                   ++pfor_counter;
21451c0b2f7Stbbdev                               }
21551c0b2f7Stbbdev                            }
21651c0b2f7Stbbdev         );
21751c0b2f7Stbbdev     };
21851c0b2f7Stbbdev 
21951c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(task_threads_num);
22051c0b2f7Stbbdev     tbb::detail::d1::task_group_context test_context;
22151c0b2f7Stbbdev     using task_type = CountingTask<decltype(barrier_wait)>;
22251c0b2f7Stbbdev     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));
22351c0b2f7Stbbdev 
22451c0b2f7Stbbdev     constexpr std::size_t iter_count = 10;
22551c0b2f7Stbbdev     constexpr std::size_t pfor_threads_num = 4;
22651c0b2f7Stbbdev     for (std::size_t i = 0; i < iter_count; ++i) {
22751c0b2f7Stbbdev         std::vector<std::thread> pfor_threads;
22851c0b2f7Stbbdev 
22951c0b2f7Stbbdev         for (std::size_t j = 0; j < task_threads_num; ++j) {
23051c0b2f7Stbbdev             tbb::detail::d1::spawn(vector_test_task[j], test_context);
23151c0b2f7Stbbdev         }
23251c0b2f7Stbbdev 
23351c0b2f7Stbbdev         for (std::size_t k = 0; k < pfor_threads_num; ++k) {
23451c0b2f7Stbbdev             pfor_threads.emplace_back(parallel_for_func);
23551c0b2f7Stbbdev         }
23651c0b2f7Stbbdev 
23751c0b2f7Stbbdev         tbb::detail::d1::wait(wait, test_context);
23851c0b2f7Stbbdev 
23951c0b2f7Stbbdev         for (auto& thread : pfor_threads) {
24051c0b2f7Stbbdev             if (thread.joinable()) {
24151c0b2f7Stbbdev                 thread.join();
24251c0b2f7Stbbdev             }
24351c0b2f7Stbbdev         }
24451c0b2f7Stbbdev 
24551c0b2f7Stbbdev         wait.reserve(task_threads_num);
24651c0b2f7Stbbdev     }
24751c0b2f7Stbbdev     wait.release(task_threads_num);
24851c0b2f7Stbbdev 
24951c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
25051c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
25151c0b2f7Stbbdev     REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished");
25251c0b2f7Stbbdev     task_type::reset();
25351c0b2f7Stbbdev }
25451c0b2f7Stbbdev 
25551c0b2f7Stbbdev //! \brief \ref error_guessing
25651c0b2f7Stbbdev TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
25755f9b178SIvan Kochin     std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
25851c0b2f7Stbbdev     utils::SpinBarrier barrier(threads_num);
25951c0b2f7Stbbdev 
__anon509d20bd0702null26051c0b2f7Stbbdev     auto barrier_wait = [&barrier] {
26151c0b2f7Stbbdev         barrier.wait();
26251c0b2f7Stbbdev     };
26351c0b2f7Stbbdev 
26451c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(threads_num);
26551c0b2f7Stbbdev     tbb::detail::d1::task_group_context test_context;
26651c0b2f7Stbbdev     using task_type = CountingTask<decltype(barrier_wait)>;
26751c0b2f7Stbbdev     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
26851c0b2f7Stbbdev 
__anon509d20bd0802( std::size_t idx ) 26951c0b2f7Stbbdev     auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
27051c0b2f7Stbbdev         tbb::detail::d1::spawn(vector_test_task[idx], test_context);
27151c0b2f7Stbbdev     };
27251c0b2f7Stbbdev 
27351c0b2f7Stbbdev     constexpr std::size_t iter_count = 10;
27451c0b2f7Stbbdev     for (std::size_t i = 0; i < iter_count; ++i) {
27551c0b2f7Stbbdev         std::vector<std::thread> threads;
27651c0b2f7Stbbdev 
27751c0b2f7Stbbdev         for (std::size_t k = 0; k < threads_num - 1; ++k) {
27851c0b2f7Stbbdev             threads.emplace_back(thread_func, k);
27951c0b2f7Stbbdev         }
28051c0b2f7Stbbdev 
28151c0b2f7Stbbdev         for (auto& thread : threads) {
28251c0b2f7Stbbdev             if (thread.joinable()) {
28351c0b2f7Stbbdev                 thread.join();
28451c0b2f7Stbbdev             }
28551c0b2f7Stbbdev         }
28651c0b2f7Stbbdev 
28751c0b2f7Stbbdev         tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
28851c0b2f7Stbbdev         wait.reserve(threads_num);
28951c0b2f7Stbbdev     }
29051c0b2f7Stbbdev     wait.release(threads_num);
29151c0b2f7Stbbdev 
29251c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
29351c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
29451c0b2f7Stbbdev     task_type::reset();
29551c0b2f7Stbbdev }
29651c0b2f7Stbbdev 
29751c0b2f7Stbbdev class SpawningTaskBody;
29851c0b2f7Stbbdev 
29951c0b2f7Stbbdev using SpawningTask = CountingTask<SpawningTaskBody>;
30051c0b2f7Stbbdev 
30151c0b2f7Stbbdev class SpawningTaskBody {
30251c0b2f7Stbbdev public:
30351c0b2f7Stbbdev     using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;
30451c0b2f7Stbbdev 
SpawningTaskBody(task_pool_type & task_pool,tbb::task_group_context & test_ctx)30551c0b2f7Stbbdev     SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
30651c0b2f7Stbbdev         : my_task_pool(task_pool), my_test_ctx(test_ctx) {}
30751c0b2f7Stbbdev 
operator ()() const30851c0b2f7Stbbdev     void operator()() const {
30951c0b2f7Stbbdev         std::size_t delta = 7;
31051c0b2f7Stbbdev         std::size_t start_idx = my_current_task.fetch_add(delta);
31151c0b2f7Stbbdev 
31251c0b2f7Stbbdev         if (start_idx < my_task_pool.size()) {
31351c0b2f7Stbbdev             for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
31451c0b2f7Stbbdev                 tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
31551c0b2f7Stbbdev             }
31651c0b2f7Stbbdev         }
31751c0b2f7Stbbdev     }
31851c0b2f7Stbbdev private:
31951c0b2f7Stbbdev     task_pool_type& my_task_pool;
32051c0b2f7Stbbdev     tbb::task_group_context& my_test_ctx;
32151c0b2f7Stbbdev     static std::atomic<std::size_t> my_current_task;
32251c0b2f7Stbbdev }; // class SpawningTaskBody
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);
32551c0b2f7Stbbdev 
32651c0b2f7Stbbdev //! \brief \ref error_guessing
32751c0b2f7Stbbdev TEST_CASE("Actively adding tasks") {
32855f9b178SIvan Kochin     std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
32951c0b2f7Stbbdev 
33051c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(task_number + 1);
33151c0b2f7Stbbdev     tbb::task_group_context test_context;
33251c0b2f7Stbbdev 
33351c0b2f7Stbbdev     SpawningTaskBody::task_pool_type task_pool;
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev     SpawningTaskBody task_body{task_pool, test_context};
33651c0b2f7Stbbdev     for (std::size_t i = 0; i < task_number; ++i) {
33751c0b2f7Stbbdev         task_pool.emplace_back(task_body, wait);
33851c0b2f7Stbbdev     }
33951c0b2f7Stbbdev 
34051c0b2f7Stbbdev     SpawningTask first_task(task_body, wait);
34151c0b2f7Stbbdev     tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);
34251c0b2f7Stbbdev 
34351c0b2f7Stbbdev     REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right?
34451c0b2f7Stbbdev     REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
34551c0b2f7Stbbdev }
34651c0b2f7Stbbdev 
34751c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
34851c0b2f7Stbbdev struct suspended_task : public tbb::detail::d1::task {
34951c0b2f7Stbbdev 
suspended_tasksuspended_task35051c0b2f7Stbbdev     suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
35151c0b2f7Stbbdev         : my_suspend_tag(tag), my_wait(wait)
35251c0b2f7Stbbdev     {}
35351c0b2f7Stbbdev 
executesuspended_task35451c0b2f7Stbbdev     task* execute(tbb::detail::d1::execution_data&) override {
35551c0b2f7Stbbdev         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
35651c0b2f7Stbbdev             [] (const tbb::blocked_range<std::size_t>& range) {
35751c0b2f7Stbbdev                 // Make some heavy work
35851c0b2f7Stbbdev                 std::atomic<int> sum{};
35951c0b2f7Stbbdev                 for (auto it = range.begin(); it != range.end(); ++it) {
36051c0b2f7Stbbdev                     ++sum;
36151c0b2f7Stbbdev                 }
36251c0b2f7Stbbdev             },
36351c0b2f7Stbbdev             tbb::static_partitioner{}
36451c0b2f7Stbbdev         );
36551c0b2f7Stbbdev 
36651c0b2f7Stbbdev         my_wait.release();
36751c0b2f7Stbbdev         tbb::task::resume(my_suspend_tag);
36851c0b2f7Stbbdev         return nullptr;
36951c0b2f7Stbbdev     }
37051c0b2f7Stbbdev 
cancelsuspended_task37151c0b2f7Stbbdev     task* cancel(tbb::detail::d1::execution_data&) override {
37251c0b2f7Stbbdev         FAIL("The function should never be called.");
37351c0b2f7Stbbdev         return nullptr;
37451c0b2f7Stbbdev     }
37551c0b2f7Stbbdev 
37651c0b2f7Stbbdev     tbb::task::suspend_point my_suspend_tag;
37751c0b2f7Stbbdev     tbb::detail::d1::wait_context& my_wait;
37851c0b2f7Stbbdev };
37951c0b2f7Stbbdev 
38051c0b2f7Stbbdev //! \brief \ref error_guessing
38151c0b2f7Stbbdev TEST_CASE("Isolation + resumable tasks") {
38251c0b2f7Stbbdev     std::atomic<int> suspend_flag{};
38351c0b2f7Stbbdev     tbb::task_group_context test_context;
38451c0b2f7Stbbdev 
38551c0b2f7Stbbdev     std::atomic<int> suspend_count{};
38651c0b2f7Stbbdev     std::atomic<int> resume_count{};
38751c0b2f7Stbbdev 
38851c0b2f7Stbbdev     tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
__anon509d20bd0b02(const tbb::blocked_range<std::size_t>& range) 38951c0b2f7Stbbdev         [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
39051c0b2f7Stbbdev             int ticket = 0;
39151c0b2f7Stbbdev             for (auto it = range.begin(); it != range.end(); ++it) {
39251c0b2f7Stbbdev                 ticket = suspend_flag++;
39351c0b2f7Stbbdev             }
39451c0b2f7Stbbdev 
39551c0b2f7Stbbdev             if (ticket % 5 == 0) {
39651c0b2f7Stbbdev                 std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
39751c0b2f7Stbbdev                 tbb::detail::d1::wait_context wait(1);
39851c0b2f7Stbbdev                 ++suspend_count;
39951c0b2f7Stbbdev                 tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
400e77098d6SPavel Kumbrasev                     auto thread_id = std::this_thread::get_id();
401e77098d6SPavel Kumbrasev                     tbb::task::suspend([&wait, &test_context, &test_task, thread_id] (tbb::task::suspend_point tag) {
402e77098d6SPavel Kumbrasev                         CHECK(thread_id == std::this_thread::get_id());
40351c0b2f7Stbbdev                         test_task.emplace_back(tag, wait);
40451c0b2f7Stbbdev                         tbb::detail::d1::spawn(test_task[0], test_context);
40551c0b2f7Stbbdev                     });
40651c0b2f7Stbbdev                     }
40751c0b2f7Stbbdev                 );
40851c0b2f7Stbbdev                 tbb::detail::d1::wait(wait, test_context);
40951c0b2f7Stbbdev                 ++resume_count;
41051c0b2f7Stbbdev             }
41151c0b2f7Stbbdev         }
41251c0b2f7Stbbdev     );
41351c0b2f7Stbbdev 
41451c0b2f7Stbbdev     CHECK(suspend_count == resume_count);
41551c0b2f7Stbbdev }
41651c0b2f7Stbbdev 
41751c0b2f7Stbbdev struct bypass_task : public tbb::detail::d1::task {
41851c0b2f7Stbbdev     using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;
41951c0b2f7Stbbdev 
bypass_taskbypass_task42051c0b2f7Stbbdev     bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
4214523a761Stbbdev                 std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
42251c0b2f7Stbbdev         : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
42351c0b2f7Stbbdev     {}
42451c0b2f7Stbbdev 
executebypass_task42551c0b2f7Stbbdev     task* execute(tbb::detail::d1::execution_data&) override {
4264523a761Stbbdev         utils::doDummyWork(10000);
42751c0b2f7Stbbdev 
4284523a761Stbbdev         int expected = 1;
4294523a761Stbbdev         if (my_resume_flag.compare_exchange_strong(expected, 2)) {
43051c0b2f7Stbbdev             tbb::task::resume(my_suspend_tag);
43151c0b2f7Stbbdev         }
43251c0b2f7Stbbdev 
43351c0b2f7Stbbdev         std::size_t ticket = my_current_task++;
4344523a761Stbbdev         task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;
4354523a761Stbbdev 
4364523a761Stbbdev         if (!next && my_resume_flag != 2) {
4374523a761Stbbdev             // Rarely all tasks can be executed before the suspend.
4384523a761Stbbdev             // So, wait for the suspend before leaving.
4394523a761Stbbdev             utils::SpinWaitWhileEq(my_resume_flag, 0);
4404523a761Stbbdev             expected = 1;
4414523a761Stbbdev             if (my_resume_flag.compare_exchange_strong(expected, 2)) {
4424523a761Stbbdev                 tbb::task::resume(my_suspend_tag);
4434523a761Stbbdev             }
4444523a761Stbbdev         }
4454523a761Stbbdev 
4464523a761Stbbdev         my_wait.release();
4474523a761Stbbdev         return next;
44851c0b2f7Stbbdev     }
44951c0b2f7Stbbdev 
cancelbypass_task45051c0b2f7Stbbdev     task* cancel(tbb::detail::d1::execution_data&) override {
45151c0b2f7Stbbdev         FAIL("The function should never be called.");
45251c0b2f7Stbbdev         return nullptr;
45351c0b2f7Stbbdev     }
45451c0b2f7Stbbdev 
45551c0b2f7Stbbdev     tbb::detail::d1::wait_context& my_wait;
45651c0b2f7Stbbdev     task_pool_type& my_task_pool;
4574523a761Stbbdev     std::atomic<int>& my_resume_flag;
45851c0b2f7Stbbdev     tbb::task::suspend_point& my_suspend_tag;
45951c0b2f7Stbbdev     static std::atomic<int> my_current_task;
46051c0b2f7Stbbdev };
46151c0b2f7Stbbdev 
46251c0b2f7Stbbdev std::atomic<int> bypass_task::my_current_task(0);
46351c0b2f7Stbbdev 
46451c0b2f7Stbbdev thread_local int test_tls = 0;
46551c0b2f7Stbbdev 
46651c0b2f7Stbbdev //! \brief \ref error_guessing
46751c0b2f7Stbbdev TEST_CASE("Bypass suspended by resume") {
46855f9b178SIvan Kochin     std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
46951c0b2f7Stbbdev     tbb::task_group_context test_context;
47051c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(task_number + 1);
47151c0b2f7Stbbdev 
47251c0b2f7Stbbdev     test_tls = 1;
47351c0b2f7Stbbdev 
4744523a761Stbbdev     std::atomic<int> resume_flag{0};
47551c0b2f7Stbbdev     tbb::task::suspend_point test_suspend_tag;
47651c0b2f7Stbbdev 
47751c0b2f7Stbbdev     std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;
47851c0b2f7Stbbdev 
47951c0b2f7Stbbdev     for (std::size_t i = 0; i < task_number; ++i) {
48051c0b2f7Stbbdev         test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
48151c0b2f7Stbbdev     }
48251c0b2f7Stbbdev 
48351c0b2f7Stbbdev     for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
4844523a761Stbbdev         std::size_t ticket = bypass_task::my_current_task++;
4854523a761Stbbdev         if (ticket < test_task_pool.size()) {
4864523a761Stbbdev             tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
4874523a761Stbbdev         }
48851c0b2f7Stbbdev     }
48951c0b2f7Stbbdev 
__anon509d20bd0e02null49051c0b2f7Stbbdev     auto suspend_func = [&resume_flag, &test_suspend_tag] {
491e77098d6SPavel Kumbrasev         auto thread_id = std::this_thread::get_id();
492e77098d6SPavel Kumbrasev         tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
493e77098d6SPavel Kumbrasev             CHECK(thread_id == std::this_thread::get_id());
49451c0b2f7Stbbdev             test_suspend_tag = tag;
4954523a761Stbbdev             resume_flag = 1;
49651c0b2f7Stbbdev         });
49751c0b2f7Stbbdev     };
49851c0b2f7Stbbdev     using task_type = CountingTask<decltype(suspend_func)>;
49951c0b2f7Stbbdev     task_type suspend_task(suspend_func, wait);
50051c0b2f7Stbbdev 
50151c0b2f7Stbbdev     tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
50251c0b2f7Stbbdev     CHECK(bypass_task::my_current_task >= test_task_pool.size());
50351c0b2f7Stbbdev     REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
50451c0b2f7Stbbdev }
50551c0b2f7Stbbdev 
50651c0b2f7Stbbdev //! \brief \ref error_guessing
50751c0b2f7Stbbdev TEST_CASE("Critical tasks + resume") {
50855f9b178SIvan Kochin     std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
50951c0b2f7Stbbdev 
51051c0b2f7Stbbdev     tbb::task_group_context test_context;
51143c1805eSAlex     tbb::detail::d1::wait_context wait{ 0 };
51251c0b2f7Stbbdev 
5138b6f831cStbbdev     // The test expects at least one thread in test_arena
5148b6f831cStbbdev     int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads()));
5158b6f831cStbbdev     tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena);
5168b6f831cStbbdev     tbb::task_arena test_arena(num_threads_in_test_arena);
51751c0b2f7Stbbdev 
51851c0b2f7Stbbdev     test_arena.initialize();
51951c0b2f7Stbbdev 
52043c1805eSAlex     std::atomic<bool> resume_flag{}, resumed{};
52151c0b2f7Stbbdev     tbb::task::suspend_point test_suspend_tag;
52251c0b2f7Stbbdev 
__anon509d20bd1002null52343c1805eSAlex     auto task_body = [&resume_flag, &resumed, &test_suspend_tag] {
52451c0b2f7Stbbdev         // Make some work
5254523a761Stbbdev         utils::doDummyWork(1000);
52651c0b2f7Stbbdev 
52751c0b2f7Stbbdev         if (resume_flag.exchange(false)) {
52851c0b2f7Stbbdev             tbb::task::resume(test_suspend_tag);
52943c1805eSAlex             resumed = true;
53051c0b2f7Stbbdev         }
53151c0b2f7Stbbdev     };
53251c0b2f7Stbbdev 
53351c0b2f7Stbbdev     using task_type = CountingTask<decltype(task_body)>;
53451c0b2f7Stbbdev     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
53551c0b2f7Stbbdev 
53651c0b2f7Stbbdev     for (std::size_t i = 0; i < task_number; ++i) {
53751c0b2f7Stbbdev         test_tasks.emplace_back(task_body, wait);
53851c0b2f7Stbbdev     }
53951c0b2f7Stbbdev 
54043c1805eSAlex     wait.reserve(task_number / 2);
54151c0b2f7Stbbdev     for (std::size_t i = 0; i < task_number / 2; ++i) {
54251c0b2f7Stbbdev         submit(test_tasks[i], test_arena, test_context, true);
54351c0b2f7Stbbdev     }
54451c0b2f7Stbbdev 
__anon509d20bd1102null54551c0b2f7Stbbdev     auto suspend_func = [&resume_flag, &test_suspend_tag] {
546e77098d6SPavel Kumbrasev         auto thread_id = std::this_thread::get_id();
547e77098d6SPavel Kumbrasev         tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
548e77098d6SPavel Kumbrasev             CHECK(thread_id == std::this_thread::get_id());
54951c0b2f7Stbbdev             test_suspend_tag = tag;
55051c0b2f7Stbbdev             resume_flag.store(true, std::memory_order_release);
55151c0b2f7Stbbdev         });
55251c0b2f7Stbbdev     };
55351c0b2f7Stbbdev     using suspend_task_type = CountingTask<decltype(suspend_func)>;
55451c0b2f7Stbbdev     suspend_task_type suspend_task(suspend_func, wait);
55551c0b2f7Stbbdev 
55643c1805eSAlex     wait.reserve(1);
55751c0b2f7Stbbdev     submit(suspend_task, test_arena, test_context, true);
55851c0b2f7Stbbdev 
__anon509d20bd1302null55943c1805eSAlex     test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
56043c1805eSAlex         tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
56143c1805eSAlex             do {
56243c1805eSAlex                 wait.reserve(task_number / 2);
56343c1805eSAlex                 tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number),
56451c0b2f7Stbbdev                     [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
56551c0b2f7Stbbdev                         for (std::size_t i = range.begin(); i != range.end(); ++i) {
56651c0b2f7Stbbdev                             submit(test_tasks[i], test_arena, test_context, true);
56751c0b2f7Stbbdev                         }
56843c1805eSAlex                     }
56943c1805eSAlex                 );
57043c1805eSAlex             } while (!resumed);
57151c0b2f7Stbbdev         });
57251c0b2f7Stbbdev     });
57351c0b2f7Stbbdev 
__anon509d20bd1602null57443c1805eSAlex     test_arena.execute([&wait, &test_context] {
57551c0b2f7Stbbdev         tbb::detail::d1::wait(wait, test_context);
57643c1805eSAlex     });
57751c0b2f7Stbbdev }
57851c0b2f7Stbbdev 
57951c0b2f7Stbbdev //! \brief \ref error_guessing
58051c0b2f7Stbbdev TEST_CASE("Stress testing") {
58155f9b178SIvan Kochin     std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
58251c0b2f7Stbbdev 
58351c0b2f7Stbbdev     tbb::task_group_context test_context;
58451c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(task_number);
58551c0b2f7Stbbdev 
58651c0b2f7Stbbdev     tbb::task_arena test_arena;
58751c0b2f7Stbbdev 
58851c0b2f7Stbbdev     test_arena.initialize();
58951c0b2f7Stbbdev 
__anon509d20bd1702null59051c0b2f7Stbbdev     auto task_body = [] {
59151c0b2f7Stbbdev         tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
5924523a761Stbbdev             utils::doDummyWork(100);
59351c0b2f7Stbbdev         });
59451c0b2f7Stbbdev     };
59551c0b2f7Stbbdev     using task_type = CountingTask<decltype(task_body)>;
59651c0b2f7Stbbdev 
59751c0b2f7Stbbdev     std::size_t iter_counter = 20;
59851c0b2f7Stbbdev 
59951c0b2f7Stbbdev     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
60051c0b2f7Stbbdev 
60151c0b2f7Stbbdev     for (std::size_t j = 0; j < task_number; ++j) {
60251c0b2f7Stbbdev         test_tasks.emplace_back(task_body, wait);
60351c0b2f7Stbbdev     }
60451c0b2f7Stbbdev 
__anon509d20bd1902null60551c0b2f7Stbbdev     test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
60651c0b2f7Stbbdev         for (std::size_t i = 0; i < iter_counter; ++i) {
60751c0b2f7Stbbdev 
60851c0b2f7Stbbdev             for (std::size_t j = 0; j < task_number; ++j) {
60951c0b2f7Stbbdev                 test_arena.enqueue(task_body);
61051c0b2f7Stbbdev             }
61151c0b2f7Stbbdev 
61251c0b2f7Stbbdev             for (std::size_t j = 0; j < task_number / 2; ++j) {
61351c0b2f7Stbbdev                 tbb::detail::d1::spawn(test_tasks[j], test_context);
61451c0b2f7Stbbdev             }
61551c0b2f7Stbbdev 
61651c0b2f7Stbbdev             for (std::size_t j = task_number / 2; j < task_number; ++j) {
61751c0b2f7Stbbdev                 submit(test_tasks[j], test_arena, test_context, true);
61851c0b2f7Stbbdev             }
61951c0b2f7Stbbdev 
62051c0b2f7Stbbdev             tbb::detail::d1::wait(wait, test_context);
62151c0b2f7Stbbdev             wait.reserve(task_number);
62251c0b2f7Stbbdev         }
62351c0b2f7Stbbdev         wait.release(task_number);
62451c0b2f7Stbbdev     });
62551c0b2f7Stbbdev 
62651c0b2f7Stbbdev 
62751c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
62851c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
62951c0b2f7Stbbdev }
63051c0b2f7Stbbdev 
63151c0b2f7Stbbdev //! \brief \ref error_guessing
63251c0b2f7Stbbdev TEST_CASE("All workers sleep") {
63355f9b178SIvan Kochin     std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
63451c0b2f7Stbbdev     tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;
63551c0b2f7Stbbdev 
63651c0b2f7Stbbdev     tbb::task_group test_gr;
63751c0b2f7Stbbdev 
63851c0b2f7Stbbdev     utils::SpinBarrier barrier(thread_number);
__anon509d20bd1a02null63951c0b2f7Stbbdev     auto resumble_task = [&] {
64051c0b2f7Stbbdev         barrier.wait();
641e77098d6SPavel Kumbrasev         auto thread_id = std::this_thread::get_id();
64251c0b2f7Stbbdev         tbb::task::suspend([&] (tbb::task::suspend_point sp) {
643e77098d6SPavel Kumbrasev             CHECK(thread_id == std::this_thread::get_id());
64451c0b2f7Stbbdev             suspend_points.push_back(sp);
64551c0b2f7Stbbdev             barrier.wait();
64651c0b2f7Stbbdev         });
64751c0b2f7Stbbdev     };
64851c0b2f7Stbbdev 
64951c0b2f7Stbbdev     for (std::size_t i = 0; i < thread_number - 1; ++i) {
65051c0b2f7Stbbdev         test_gr.run(resumble_task);
65151c0b2f7Stbbdev     }
65251c0b2f7Stbbdev 
65351c0b2f7Stbbdev     barrier.wait();
65451c0b2f7Stbbdev     barrier.wait();
65551c0b2f7Stbbdev     TestCPUUserTime(thread_number);
65651c0b2f7Stbbdev 
65751c0b2f7Stbbdev     for (auto sp : suspend_points)
65851c0b2f7Stbbdev         tbb::task::resume(sp);
65951c0b2f7Stbbdev     test_gr.wait();
66051c0b2f7Stbbdev }
66151c0b2f7Stbbdev 
66251c0b2f7Stbbdev #endif // __TBB_RESUMABLE_TASKS
66351c0b2f7Stbbdev 
66451c0b2f7Stbbdev //! \brief \ref error_guessing
66551c0b2f7Stbbdev TEST_CASE("Enqueue with exception") {
66655f9b178SIvan Kochin     std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
66751c0b2f7Stbbdev 
66851c0b2f7Stbbdev     tbb::task_group_context test_context;
66951c0b2f7Stbbdev     tbb::detail::d1::wait_context wait(task_number);
67051c0b2f7Stbbdev 
67151c0b2f7Stbbdev     tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};
67251c0b2f7Stbbdev 
67351c0b2f7Stbbdev     test_arena.initialize();
67451c0b2f7Stbbdev 
__anon509d20bd1c02null67551c0b2f7Stbbdev     auto task_body = [] {
6764523a761Stbbdev         utils::doDummyWork(100);
67751c0b2f7Stbbdev     };
67851c0b2f7Stbbdev 
67951c0b2f7Stbbdev     std::atomic<bool> end_flag{false};
__anon509d20bd1d02null68051c0b2f7Stbbdev     auto check_body = [&end_flag] {
68151c0b2f7Stbbdev         end_flag.store(true, std::memory_order_relaxed);
68251c0b2f7Stbbdev     };
68351c0b2f7Stbbdev 
68451c0b2f7Stbbdev     using task_type = CountingTask<decltype(task_body)>;
68551c0b2f7Stbbdev     std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
68651c0b2f7Stbbdev 
68751c0b2f7Stbbdev     for (std::size_t j = 0; j < task_number; ++j) {
68851c0b2f7Stbbdev         test_tasks.emplace_back(task_body, wait);
68951c0b2f7Stbbdev     }
69051c0b2f7Stbbdev 
69151c0b2f7Stbbdev     {
69251c0b2f7Stbbdev         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
69351c0b2f7Stbbdev         test_arena.enqueue(task_body);
69451c0b2f7Stbbdev         // Initialize implicit arena
__anon509d20bd1e02(int) 69551c0b2f7Stbbdev         tbb::parallel_for(0, 1, [] (int) {});
69651c0b2f7Stbbdev         tbb::task_arena test_arena2(tbb::task_arena::attach{});
69751c0b2f7Stbbdev         test_arena2.enqueue(task_body);
69851c0b2f7Stbbdev     }
69951c0b2f7Stbbdev 
70051c0b2f7Stbbdev     constexpr std::size_t iter_count = 10;
70151c0b2f7Stbbdev     for (std::size_t k = 0; k < iter_count; ++k) {
70251c0b2f7Stbbdev         tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
70351c0b2f7Stbbdev         test_arena.enqueue(check_body);
70451c0b2f7Stbbdev 
70551c0b2f7Stbbdev         while (!end_flag.load(std::memory_order_relaxed)) ;
70651c0b2f7Stbbdev 
70751c0b2f7Stbbdev         utils::Sleep(1);
70851c0b2f7Stbbdev         end_flag.store(false, std::memory_order_relaxed);
70951c0b2f7Stbbdev 
__anon509d20bd1f02null71051c0b2f7Stbbdev         test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
71151c0b2f7Stbbdev             for (std::size_t j = 0; j < task_number; ++j) {
71251c0b2f7Stbbdev                 tbb::detail::d1::spawn(test_tasks[j], test_context);
71351c0b2f7Stbbdev             }
71451c0b2f7Stbbdev 
71551c0b2f7Stbbdev             tbb::detail::d1::wait(wait, test_context);
71651c0b2f7Stbbdev             wait.reserve(task_number);
71751c0b2f7Stbbdev         });
71851c0b2f7Stbbdev     }
71951c0b2f7Stbbdev     wait.release(task_number);
72051c0b2f7Stbbdev 
72151c0b2f7Stbbdev 
72251c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
72351c0b2f7Stbbdev     REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
72451c0b2f7Stbbdev }
7254523a761Stbbdev 
7264523a761Stbbdev struct resubmitting_task : public tbb::detail::d1::task {
7274523a761Stbbdev     tbb::task_arena& my_arena;
7284523a761Stbbdev     tbb::task_group_context& my_ctx;
7294523a761Stbbdev     std::atomic<int> counter{100000};
7304523a761Stbbdev 
resubmitting_taskresubmitting_task7314523a761Stbbdev     resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
7324523a761Stbbdev     {}
7334523a761Stbbdev 
executeresubmitting_task7344523a761Stbbdev     tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override {
7354523a761Stbbdev         if (counter-- > 0) {
7364523a761Stbbdev             submit(*this, my_arena, my_ctx, true);
7374523a761Stbbdev         }
7384523a761Stbbdev         return nullptr;
7394523a761Stbbdev     }
7404523a761Stbbdev 
cancelresubmitting_task7414523a761Stbbdev     tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override {
7424523a761Stbbdev         FAIL("The function should never be called.");
7434523a761Stbbdev         return nullptr;
7444523a761Stbbdev     }
7454523a761Stbbdev };
7464523a761Stbbdev 
7474523a761Stbbdev //! \brief \ref error_guessing
7484523a761Stbbdev TEST_CASE("Test with priority inversion") {
7494523a761Stbbdev     if (!utils::can_change_thread_priority()) return;
7504523a761Stbbdev 
75155f9b178SIvan Kochin     std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
7524523a761Stbbdev     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);
7534523a761Stbbdev 
7544523a761Stbbdev     tbb::task_arena test_arena(2 * thread_number, thread_number);
7554523a761Stbbdev     test_arena.initialize();
7564523a761Stbbdev     utils::pinning_observer obsr(test_arena);
7574523a761Stbbdev     CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");
7584523a761Stbbdev 
75955f9b178SIvan Kochin     std::uint32_t  critical_task_counter = 1000 * thread_number;
7604523a761Stbbdev     std::atomic<std::size_t> task_counter{0};
7614523a761Stbbdev 
7624523a761Stbbdev     tbb::task_group_context test_context;
7634523a761Stbbdev     tbb::detail::d1::wait_context wait(critical_task_counter);
7644523a761Stbbdev 
__anon509d20bd2002null7654523a761Stbbdev     auto critical_work = [&] {
7664523a761Stbbdev         utils::doDummyWork(10);
7674523a761Stbbdev     };
7684523a761Stbbdev 
7694523a761Stbbdev     using suspend_task_type = CountingTask<decltype(critical_work)>;
7704523a761Stbbdev     suspend_task_type critical_task(critical_work, wait);
7714523a761Stbbdev 
__anon509d20bd2102null7724523a761Stbbdev     auto high_priority_thread_func = [&] {
7734523a761Stbbdev         // Increase external threads priority
774*546fe273SIlya Isaev         utils::increased_priority_guard guard{};
775*546fe273SIlya Isaev         utils::suppress_unused_warning(guard);
7764523a761Stbbdev         // pin external threads
7774523a761Stbbdev         test_arena.execute([]{});
7784523a761Stbbdev         while (task_counter++ < critical_task_counter) {
7794523a761Stbbdev             submit(critical_task, test_arena, test_context, true);
7804523a761Stbbdev             std::this_thread::sleep_for(std::chrono::milliseconds(1));
7814523a761Stbbdev         }
7824523a761Stbbdev     };
7834523a761Stbbdev 
7844523a761Stbbdev     resubmitting_task worker_task(test_arena, test_context);
7854523a761Stbbdev     // warm up
7864523a761Stbbdev     // take first core on execute
7874523a761Stbbdev     utils::SpinBarrier barrier(thread_number + 1);
__anon509d20bd2302null7884523a761Stbbdev     test_arena.execute([&] {
789a088cfa0SKonstantin Boyarinov         tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t) {
7904523a761Stbbdev             barrier.wait();
7914523a761Stbbdev             submit(worker_task, test_arena, test_context, true);
7924523a761Stbbdev         });
7934523a761Stbbdev     });
7944523a761Stbbdev 
7954523a761Stbbdev     std::vector<std::thread> high_priority_threads;
7964523a761Stbbdev     for (std::size_t i = 0; i < thread_number - 1; ++i) {
7974523a761Stbbdev         high_priority_threads.emplace_back(high_priority_thread_func);
7984523a761Stbbdev     }
7994523a761Stbbdev 
800*546fe273SIlya Isaev     utils::increased_priority_guard guard{};
801*546fe273SIlya Isaev     utils::suppress_unused_warning(guard);
8024523a761Stbbdev     while (task_counter++ < critical_task_counter) {
8034523a761Stbbdev         submit(critical_task, test_arena, test_context, true);
8044523a761Stbbdev         std::this_thread::sleep_for(std::chrono::milliseconds(1));
8054523a761Stbbdev     }
8064523a761Stbbdev 
8074523a761Stbbdev     tbb::detail::d1::wait(wait, test_context);
8084523a761Stbbdev 
8094523a761Stbbdev     for (std::size_t i = 0; i < thread_number - 1; ++i) {
8104523a761Stbbdev         high_priority_threads[i].join();
8114523a761Stbbdev     }
8124523a761Stbbdev }
813112076d0SIlya Isaev 
814112076d0SIlya Isaev // Explicit test for raii_guard move ctor because of copy elision optimization
815112076d0SIlya Isaev // TODO: consider better test file for the test case
816112076d0SIlya Isaev //! \brief \ref interface
817112076d0SIlya Isaev TEST_CASE("raii_guard move ctor") {
818112076d0SIlya Isaev     int count{0};
__anon509d20bd2502null819112076d0SIlya Isaev     auto func = [&count] {
820112076d0SIlya Isaev         count++;
821112076d0SIlya Isaev         CHECK(count == 1);
822112076d0SIlya Isaev     };
823112076d0SIlya Isaev 
824112076d0SIlya Isaev     tbb::detail::d0::raii_guard<decltype(func)> guard1(func);
825112076d0SIlya Isaev     tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1));
826112076d0SIlya Isaev }
827c4568449SPavel Kumbrasev 
828c4568449SPavel Kumbrasev //! \brief \ref error_guessing
829c4568449SPavel Kumbrasev TEST_CASE("Check correct arena destruction with enqueue") {
830c4568449SPavel Kumbrasev     for (int i = 0; i < 100; ++i) {
831c4568449SPavel Kumbrasev         tbb::task_scheduler_handle handle{ tbb::attach{} };
832c4568449SPavel Kumbrasev         {
833c4568449SPavel Kumbrasev             tbb::task_arena a(2, 0);
834c4568449SPavel Kumbrasev 
__anon509d20bd2602null835c4568449SPavel Kumbrasev             a.enqueue([] {
836c4568449SPavel Kumbrasev                 tbb::parallel_for(0, 100, [] (int) { std::this_thread::sleep_for(std::chrono::nanoseconds(10)); });
837c4568449SPavel Kumbrasev             });
838c4568449SPavel Kumbrasev             std::this_thread::sleep_for(std::chrono::microseconds(1));
839c4568449SPavel Kumbrasev         }
840c4568449SPavel Kumbrasev         tbb::finalize(handle, std::nothrow_t{});
841c4568449SPavel Kumbrasev     }
842c4568449SPavel Kumbrasev }
843