151c0b2f7Stbbdev /*
2a088cfa0SKonstantin Boyarinov Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev #include "common/test.h"
1851c0b2f7Stbbdev #include "common/utils.h"
194523a761Stbbdev #include "common/dummy_body.h"
2051c0b2f7Stbbdev #include "common/spin_barrier.h"
2151c0b2f7Stbbdev #include "common/utils_concurrency_limit.h"
2251c0b2f7Stbbdev #include "common/cpu_usertime.h"
2351c0b2f7Stbbdev
2451c0b2f7Stbbdev #include "tbb/task.h"
2551c0b2f7Stbbdev #include "tbb/task_group.h"
2651c0b2f7Stbbdev #include "tbb/parallel_for.h"
2751c0b2f7Stbbdev #include "tbb/cache_aligned_allocator.h"
2851c0b2f7Stbbdev #include "tbb/global_control.h"
2951c0b2f7Stbbdev #include "tbb/concurrent_vector.h"
3051c0b2f7Stbbdev
3151c0b2f7Stbbdev #include <atomic>
3251c0b2f7Stbbdev #include <thread>
3351c0b2f7Stbbdev #include <thread>
3451c0b2f7Stbbdev
3551c0b2f7Stbbdev //! \file test_task.cpp
3651c0b2f7Stbbdev //! \brief Test for [internal] functionality
3751c0b2f7Stbbdev struct EmptyBody {
operator ()EmptyBody3851c0b2f7Stbbdev void operator()() const {}
3951c0b2f7Stbbdev };
4051c0b2f7Stbbdev
4151c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
4251c0b2f7Stbbdev // unreachable code
4351c0b2f7Stbbdev #pragma warning( push )
4451c0b2f7Stbbdev #pragma warning( disable: 4702 )
4551c0b2f7Stbbdev #endif
4651c0b2f7Stbbdev
4751c0b2f7Stbbdev template <typename Body = EmptyBody>
4851c0b2f7Stbbdev class CountingTask : public tbb::detail::d1::task {
4951c0b2f7Stbbdev public:
CountingTask(Body body,tbb::detail::d1::wait_context & wait)5051c0b2f7Stbbdev CountingTask( Body body, tbb::detail::d1::wait_context& wait ) : my_body(body), my_wait(wait) {}
5151c0b2f7Stbbdev
CountingTask(tbb::detail::d1::wait_context & wait)5251c0b2f7Stbbdev CountingTask( tbb::detail::d1::wait_context& wait ) : my_wait(wait) {}
5351c0b2f7Stbbdev
execute(tbb::detail::d1::execution_data &)5451c0b2f7Stbbdev task* execute( tbb::detail::d1::execution_data& ) override {
5551c0b2f7Stbbdev ++my_execute_counter;
5651c0b2f7Stbbdev my_body();
5751c0b2f7Stbbdev my_wait.release();
5851c0b2f7Stbbdev return nullptr;
5951c0b2f7Stbbdev }
6051c0b2f7Stbbdev
cancel(tbb::detail::d1::execution_data &)6151c0b2f7Stbbdev task* cancel( tbb::detail::d1::execution_data& ) override {
6251c0b2f7Stbbdev ++my_cancel_counter;
6351c0b2f7Stbbdev my_wait.release();
6451c0b2f7Stbbdev return nullptr;
6551c0b2f7Stbbdev }
6651c0b2f7Stbbdev
reset()6751c0b2f7Stbbdev static void reset() {
6851c0b2f7Stbbdev my_execute_counter = 0;
6951c0b2f7Stbbdev my_cancel_counter = 0;
7051c0b2f7Stbbdev }
7151c0b2f7Stbbdev
execute_counter()7251c0b2f7Stbbdev static std::size_t execute_counter() { return my_execute_counter; }
cancel_counter()7351c0b2f7Stbbdev static std::size_t cancel_counter() { return my_cancel_counter; }
7451c0b2f7Stbbdev
7551c0b2f7Stbbdev private:
7651c0b2f7Stbbdev Body my_body;
7751c0b2f7Stbbdev tbb::detail::d1::wait_context& my_wait;
7851c0b2f7Stbbdev
7951c0b2f7Stbbdev static std::atomic<std::size_t> my_execute_counter;
8051c0b2f7Stbbdev static std::atomic<std::size_t> my_cancel_counter;
8151c0b2f7Stbbdev }; // struct CountingTask
8251c0b2f7Stbbdev
8351c0b2f7Stbbdev
8451c0b2f7Stbbdev #if _MSC_VER && !defined(__INTEL_COMPILER)
8551c0b2f7Stbbdev #pragma warning( pop )
8651c0b2f7Stbbdev #endif // warning 4702 is back
8751c0b2f7Stbbdev
8851c0b2f7Stbbdev template <typename Body>
8951c0b2f7Stbbdev std::atomic<std::size_t> CountingTask<Body>::my_execute_counter(0);
9051c0b2f7Stbbdev
9151c0b2f7Stbbdev template <typename Body>
9251c0b2f7Stbbdev std::atomic<std::size_t> CountingTask<Body>::my_cancel_counter(0);
9351c0b2f7Stbbdev
9451c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
test_cancellation_on_exception(bool reset_ctx)9551c0b2f7Stbbdev void test_cancellation_on_exception( bool reset_ctx ) {
9651c0b2f7Stbbdev tbb::detail::d1::wait_context wait(1);
9751c0b2f7Stbbdev tbb::task_group_context test_context;
9851c0b2f7Stbbdev auto throw_body = [] {
9951c0b2f7Stbbdev throw 1;
10051c0b2f7Stbbdev };
10151c0b2f7Stbbdev CountingTask<decltype(throw_body)> task(throw_body, wait);
10251c0b2f7Stbbdev
10351c0b2f7Stbbdev constexpr std::size_t iter_counter = 1000;
10451c0b2f7Stbbdev for (std::size_t i = 0; i < iter_counter; ++i) {
10551c0b2f7Stbbdev try {
10651c0b2f7Stbbdev tbb::detail::d1::execute_and_wait(task, test_context, wait, test_context);
10751c0b2f7Stbbdev } catch(int ex) {
10851c0b2f7Stbbdev REQUIRE(ex == 1);
10951c0b2f7Stbbdev }
11051c0b2f7Stbbdev if (reset_ctx) {
11151c0b2f7Stbbdev test_context.reset();
11251c0b2f7Stbbdev }
11351c0b2f7Stbbdev wait.reserve(1);
11451c0b2f7Stbbdev }
11551c0b2f7Stbbdev wait.release(1);
11651c0b2f7Stbbdev
11751c0b2f7Stbbdev REQUIRE_MESSAGE(task.execute_counter() == (reset_ctx ? iter_counter : 1), "Some task was not executed");
11851c0b2f7Stbbdev REQUIRE_MESSAGE(task.cancel_counter() == iter_counter, "Some task was not canceled after the exception occurs");
11951c0b2f7Stbbdev task.reset();
12051c0b2f7Stbbdev }
12151c0b2f7Stbbdev #endif // TBB_USE_EXCEPTIONS
12251c0b2f7Stbbdev
12351c0b2f7Stbbdev //! \brief \ref error_guessing
12449e08aacStbbdev TEST_CASE("External threads sleep") {
12549e08aacStbbdev if (utils::get_platform_max_threads() < 2) return;
12649e08aacStbbdev utils::SpinBarrier barrier(2);
12749e08aacStbbdev
12849e08aacStbbdev tbb::task_group test_gr;
12949e08aacStbbdev
__anon509d20bd0202null13049e08aacStbbdev test_gr.run([&] {
13149e08aacStbbdev barrier.wait();
13249e08aacStbbdev TestCPUUserTime(2);
13349e08aacStbbdev });
13449e08aacStbbdev
13549e08aacStbbdev barrier.wait();
13649e08aacStbbdev
13749e08aacStbbdev test_gr.wait();
13849e08aacStbbdev }
13949e08aacStbbdev
14049e08aacStbbdev //! \brief \ref error_guessing
14151c0b2f7Stbbdev TEST_CASE("Test that task was executed p times") {
14251c0b2f7Stbbdev tbb::detail::d1::wait_context wait(1);
14351c0b2f7Stbbdev tbb::task_group_context test_context;
14451c0b2f7Stbbdev CountingTask<> test_task(wait);
14551c0b2f7Stbbdev
14651c0b2f7Stbbdev constexpr std::size_t iter_counter = 10000;
14751c0b2f7Stbbdev for (std::size_t i = 0; i < iter_counter; ++i) {
14851c0b2f7Stbbdev tbb::detail::d1::execute_and_wait(test_task, test_context, wait, test_context);
14951c0b2f7Stbbdev wait.reserve(1);
15051c0b2f7Stbbdev }
15151c0b2f7Stbbdev
15251c0b2f7Stbbdev wait.release(1);
15351c0b2f7Stbbdev
15451c0b2f7Stbbdev REQUIRE_MESSAGE(CountingTask<>::execute_counter() == iter_counter, "The task was not executed necessary times");
15551c0b2f7Stbbdev REQUIRE_MESSAGE(CountingTask<>::cancel_counter() == 0, "Some instance of the task was canceled");
15651c0b2f7Stbbdev CountingTask<>::reset();
15751c0b2f7Stbbdev }
15851c0b2f7Stbbdev
15951c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
16051c0b2f7Stbbdev //! \brief \ref error_guessing
16151c0b2f7Stbbdev TEST_CASE("Test cancellation on exception") {
16251c0b2f7Stbbdev test_cancellation_on_exception(/*reset_ctx = */true);
16351c0b2f7Stbbdev test_cancellation_on_exception(/*reset_ctx = */false);
16451c0b2f7Stbbdev }
16551c0b2f7Stbbdev #endif // TBB_USE_EXCEPTIONS
16651c0b2f7Stbbdev
16751c0b2f7Stbbdev //! \brief \ref error_guessing
16851c0b2f7Stbbdev TEST_CASE("Simple test parallelism usage") {
16955f9b178SIvan Kochin std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
17051c0b2f7Stbbdev utils::SpinBarrier barrier(threads_num);
17151c0b2f7Stbbdev
__anon509d20bd0302null17251c0b2f7Stbbdev auto barrier_wait = [&barrier] {
17351c0b2f7Stbbdev barrier.wait();
17451c0b2f7Stbbdev };
17551c0b2f7Stbbdev
17651c0b2f7Stbbdev tbb::detail::d1::wait_context wait(threads_num);
17751c0b2f7Stbbdev tbb::detail::d1::task_group_context test_context;
17851c0b2f7Stbbdev using task_type = CountingTask<decltype(barrier_wait)>;
17951c0b2f7Stbbdev
18051c0b2f7Stbbdev std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
18151c0b2f7Stbbdev
18251c0b2f7Stbbdev constexpr std::size_t iter_counter = 100;
18351c0b2f7Stbbdev for (std::size_t i = 0; i < iter_counter; ++i) {
18451c0b2f7Stbbdev for (std::size_t j = 0; j < threads_num; ++j) {
18551c0b2f7Stbbdev tbb::detail::d1::spawn(vector_test_task[j], test_context);
18651c0b2f7Stbbdev }
18751c0b2f7Stbbdev tbb::detail::d1::wait(wait, test_context);
18851c0b2f7Stbbdev wait.reserve(threads_num);
18951c0b2f7Stbbdev }
19051c0b2f7Stbbdev wait.release(threads_num);
19151c0b2f7Stbbdev
19251c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::execute_counter() == iter_counter * threads_num, "Some task was not executed");
19351c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
19451c0b2f7Stbbdev task_type::reset();
19551c0b2f7Stbbdev }
19651c0b2f7Stbbdev
19751c0b2f7Stbbdev //! \brief \ref error_guessing
19851c0b2f7Stbbdev TEST_CASE("Test parallelism usage with parallel_for") {
19955f9b178SIvan Kochin std::uint32_t task_threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
20051c0b2f7Stbbdev utils::SpinBarrier barrier(task_threads_num);
20151c0b2f7Stbbdev
__anon509d20bd0402null20251c0b2f7Stbbdev auto barrier_wait = [&barrier] {
20351c0b2f7Stbbdev barrier.wait();
20451c0b2f7Stbbdev };
20551c0b2f7Stbbdev
20651c0b2f7Stbbdev std::size_t pfor_iter_count = 10000;
20751c0b2f7Stbbdev std::atomic<std::size_t> pfor_counter(0);
20851c0b2f7Stbbdev
__anon509d20bd0502null20951c0b2f7Stbbdev auto parallel_for_func = [&pfor_counter, pfor_iter_count] {
21051c0b2f7Stbbdev tbb::parallel_for(tbb::blocked_range<std::size_t>(0, pfor_iter_count),
21151c0b2f7Stbbdev [&pfor_counter] (tbb::blocked_range<std::size_t>& range) {
21251c0b2f7Stbbdev for (auto it = range.begin(); it != range.end(); ++it) {
21351c0b2f7Stbbdev ++pfor_counter;
21451c0b2f7Stbbdev }
21551c0b2f7Stbbdev }
21651c0b2f7Stbbdev );
21751c0b2f7Stbbdev };
21851c0b2f7Stbbdev
21951c0b2f7Stbbdev tbb::detail::d1::wait_context wait(task_threads_num);
22051c0b2f7Stbbdev tbb::detail::d1::task_group_context test_context;
22151c0b2f7Stbbdev using task_type = CountingTask<decltype(barrier_wait)>;
22251c0b2f7Stbbdev std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(task_threads_num, task_type(barrier_wait, wait));
22351c0b2f7Stbbdev
22451c0b2f7Stbbdev constexpr std::size_t iter_count = 10;
22551c0b2f7Stbbdev constexpr std::size_t pfor_threads_num = 4;
22651c0b2f7Stbbdev for (std::size_t i = 0; i < iter_count; ++i) {
22751c0b2f7Stbbdev std::vector<std::thread> pfor_threads;
22851c0b2f7Stbbdev
22951c0b2f7Stbbdev for (std::size_t j = 0; j < task_threads_num; ++j) {
23051c0b2f7Stbbdev tbb::detail::d1::spawn(vector_test_task[j], test_context);
23151c0b2f7Stbbdev }
23251c0b2f7Stbbdev
23351c0b2f7Stbbdev for (std::size_t k = 0; k < pfor_threads_num; ++k) {
23451c0b2f7Stbbdev pfor_threads.emplace_back(parallel_for_func);
23551c0b2f7Stbbdev }
23651c0b2f7Stbbdev
23751c0b2f7Stbbdev tbb::detail::d1::wait(wait, test_context);
23851c0b2f7Stbbdev
23951c0b2f7Stbbdev for (auto& thread : pfor_threads) {
24051c0b2f7Stbbdev if (thread.joinable()) {
24151c0b2f7Stbbdev thread.join();
24251c0b2f7Stbbdev }
24351c0b2f7Stbbdev }
24451c0b2f7Stbbdev
24551c0b2f7Stbbdev wait.reserve(task_threads_num);
24651c0b2f7Stbbdev }
24751c0b2f7Stbbdev wait.release(task_threads_num);
24851c0b2f7Stbbdev
24951c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::execute_counter() == task_threads_num * iter_count, "Some task was not executed");
25051c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
25151c0b2f7Stbbdev REQUIRE_MESSAGE(pfor_counter == iter_count * pfor_threads_num * pfor_iter_count, "Some parallel_for thread was not finished");
25251c0b2f7Stbbdev task_type::reset();
25351c0b2f7Stbbdev }
25451c0b2f7Stbbdev
25551c0b2f7Stbbdev //! \brief \ref error_guessing
25651c0b2f7Stbbdev TEST_CASE("Test parallelism usage with spawn tasks in different threads") {
25755f9b178SIvan Kochin std::uint32_t threads_num = static_cast<std::uint32_t>(utils::get_platform_max_threads());
25851c0b2f7Stbbdev utils::SpinBarrier barrier(threads_num);
25951c0b2f7Stbbdev
__anon509d20bd0702null26051c0b2f7Stbbdev auto barrier_wait = [&barrier] {
26151c0b2f7Stbbdev barrier.wait();
26251c0b2f7Stbbdev };
26351c0b2f7Stbbdev
26451c0b2f7Stbbdev tbb::detail::d1::wait_context wait(threads_num);
26551c0b2f7Stbbdev tbb::detail::d1::task_group_context test_context;
26651c0b2f7Stbbdev using task_type = CountingTask<decltype(barrier_wait)>;
26751c0b2f7Stbbdev std::vector<task_type, tbb::cache_aligned_allocator<task_type>> vector_test_task(threads_num, task_type(barrier_wait, wait));
26851c0b2f7Stbbdev
__anon509d20bd0802( std::size_t idx ) 26951c0b2f7Stbbdev auto thread_func = [&vector_test_task, &test_context] ( std::size_t idx ) {
27051c0b2f7Stbbdev tbb::detail::d1::spawn(vector_test_task[idx], test_context);
27151c0b2f7Stbbdev };
27251c0b2f7Stbbdev
27351c0b2f7Stbbdev constexpr std::size_t iter_count = 10;
27451c0b2f7Stbbdev for (std::size_t i = 0; i < iter_count; ++i) {
27551c0b2f7Stbbdev std::vector<std::thread> threads;
27651c0b2f7Stbbdev
27751c0b2f7Stbbdev for (std::size_t k = 0; k < threads_num - 1; ++k) {
27851c0b2f7Stbbdev threads.emplace_back(thread_func, k);
27951c0b2f7Stbbdev }
28051c0b2f7Stbbdev
28151c0b2f7Stbbdev for (auto& thread : threads) {
28251c0b2f7Stbbdev if (thread.joinable()) {
28351c0b2f7Stbbdev thread.join();
28451c0b2f7Stbbdev }
28551c0b2f7Stbbdev }
28651c0b2f7Stbbdev
28751c0b2f7Stbbdev tbb::detail::d1::execute_and_wait(vector_test_task[threads_num - 1], test_context, wait, test_context);
28851c0b2f7Stbbdev wait.reserve(threads_num);
28951c0b2f7Stbbdev }
29051c0b2f7Stbbdev wait.release(threads_num);
29151c0b2f7Stbbdev
29251c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::execute_counter() == iter_count * threads_num, "Some task was not executed");
29351c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
29451c0b2f7Stbbdev task_type::reset();
29551c0b2f7Stbbdev }
29651c0b2f7Stbbdev
29751c0b2f7Stbbdev class SpawningTaskBody;
29851c0b2f7Stbbdev
29951c0b2f7Stbbdev using SpawningTask = CountingTask<SpawningTaskBody>;
30051c0b2f7Stbbdev
30151c0b2f7Stbbdev class SpawningTaskBody {
30251c0b2f7Stbbdev public:
30351c0b2f7Stbbdev using task_pool_type = std::vector<SpawningTask, tbb::cache_aligned_allocator<SpawningTask>>;
30451c0b2f7Stbbdev
SpawningTaskBody(task_pool_type & task_pool,tbb::task_group_context & test_ctx)30551c0b2f7Stbbdev SpawningTaskBody( task_pool_type& task_pool, tbb::task_group_context& test_ctx )
30651c0b2f7Stbbdev : my_task_pool(task_pool), my_test_ctx(test_ctx) {}
30751c0b2f7Stbbdev
operator ()() const30851c0b2f7Stbbdev void operator()() const {
30951c0b2f7Stbbdev std::size_t delta = 7;
31051c0b2f7Stbbdev std::size_t start_idx = my_current_task.fetch_add(delta);
31151c0b2f7Stbbdev
31251c0b2f7Stbbdev if (start_idx < my_task_pool.size()) {
31351c0b2f7Stbbdev for (std::size_t idx = start_idx; idx != std::min(my_task_pool.size(), start_idx + delta); ++idx) {
31451c0b2f7Stbbdev tbb::detail::d1::spawn(my_task_pool[idx], my_test_ctx);
31551c0b2f7Stbbdev }
31651c0b2f7Stbbdev }
31751c0b2f7Stbbdev }
31851c0b2f7Stbbdev private:
31951c0b2f7Stbbdev task_pool_type& my_task_pool;
32051c0b2f7Stbbdev tbb::task_group_context& my_test_ctx;
32151c0b2f7Stbbdev static std::atomic<std::size_t> my_current_task;
32251c0b2f7Stbbdev }; // class SpawningTaskBody
32351c0b2f7Stbbdev
32451c0b2f7Stbbdev std::atomic<std::size_t> SpawningTaskBody::my_current_task(0);
32551c0b2f7Stbbdev
32651c0b2f7Stbbdev //! \brief \ref error_guessing
32751c0b2f7Stbbdev TEST_CASE("Actively adding tasks") {
32855f9b178SIvan Kochin std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
32951c0b2f7Stbbdev
33051c0b2f7Stbbdev tbb::detail::d1::wait_context wait(task_number + 1);
33151c0b2f7Stbbdev tbb::task_group_context test_context;
33251c0b2f7Stbbdev
33351c0b2f7Stbbdev SpawningTaskBody::task_pool_type task_pool;
33451c0b2f7Stbbdev
33551c0b2f7Stbbdev SpawningTaskBody task_body{task_pool, test_context};
33651c0b2f7Stbbdev for (std::size_t i = 0; i < task_number; ++i) {
33751c0b2f7Stbbdev task_pool.emplace_back(task_body, wait);
33851c0b2f7Stbbdev }
33951c0b2f7Stbbdev
34051c0b2f7Stbbdev SpawningTask first_task(task_body, wait);
34151c0b2f7Stbbdev tbb::detail::d1::execute_and_wait(first_task, test_context, wait, test_context);
34251c0b2f7Stbbdev
34351c0b2f7Stbbdev REQUIRE_MESSAGE(SpawningTask::execute_counter() == task_number + 1, "Some tasks were not executed"); // Is it right?
34451c0b2f7Stbbdev REQUIRE_MESSAGE(SpawningTask::cancel_counter() == 0, "Some tasks were canceled");
34551c0b2f7Stbbdev }
34651c0b2f7Stbbdev
34751c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
34851c0b2f7Stbbdev struct suspended_task : public tbb::detail::d1::task {
34951c0b2f7Stbbdev
suspended_tasksuspended_task35051c0b2f7Stbbdev suspended_task(tbb::task::suspend_point tag, tbb::detail::d1::wait_context& wait)
35151c0b2f7Stbbdev : my_suspend_tag(tag), my_wait(wait)
35251c0b2f7Stbbdev {}
35351c0b2f7Stbbdev
executesuspended_task35451c0b2f7Stbbdev task* execute(tbb::detail::d1::execution_data&) override {
35551c0b2f7Stbbdev tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
35651c0b2f7Stbbdev [] (const tbb::blocked_range<std::size_t>& range) {
35751c0b2f7Stbbdev // Make some heavy work
35851c0b2f7Stbbdev std::atomic<int> sum{};
35951c0b2f7Stbbdev for (auto it = range.begin(); it != range.end(); ++it) {
36051c0b2f7Stbbdev ++sum;
36151c0b2f7Stbbdev }
36251c0b2f7Stbbdev },
36351c0b2f7Stbbdev tbb::static_partitioner{}
36451c0b2f7Stbbdev );
36551c0b2f7Stbbdev
36651c0b2f7Stbbdev my_wait.release();
36751c0b2f7Stbbdev tbb::task::resume(my_suspend_tag);
36851c0b2f7Stbbdev return nullptr;
36951c0b2f7Stbbdev }
37051c0b2f7Stbbdev
cancelsuspended_task37151c0b2f7Stbbdev task* cancel(tbb::detail::d1::execution_data&) override {
37251c0b2f7Stbbdev FAIL("The function should never be called.");
37351c0b2f7Stbbdev return nullptr;
37451c0b2f7Stbbdev }
37551c0b2f7Stbbdev
37651c0b2f7Stbbdev tbb::task::suspend_point my_suspend_tag;
37751c0b2f7Stbbdev tbb::detail::d1::wait_context& my_wait;
37851c0b2f7Stbbdev };
37951c0b2f7Stbbdev
38051c0b2f7Stbbdev //! \brief \ref error_guessing
38151c0b2f7Stbbdev TEST_CASE("Isolation + resumable tasks") {
38251c0b2f7Stbbdev std::atomic<int> suspend_flag{};
38351c0b2f7Stbbdev tbb::task_group_context test_context;
38451c0b2f7Stbbdev
38551c0b2f7Stbbdev std::atomic<int> suspend_count{};
38651c0b2f7Stbbdev std::atomic<int> resume_count{};
38751c0b2f7Stbbdev
38851c0b2f7Stbbdev tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 100000),
__anon509d20bd0b02(const tbb::blocked_range<std::size_t>& range) 38951c0b2f7Stbbdev [&suspend_flag, &test_context, &suspend_count, &resume_count] (const tbb::blocked_range<std::size_t>& range) {
39051c0b2f7Stbbdev int ticket = 0;
39151c0b2f7Stbbdev for (auto it = range.begin(); it != range.end(); ++it) {
39251c0b2f7Stbbdev ticket = suspend_flag++;
39351c0b2f7Stbbdev }
39451c0b2f7Stbbdev
39551c0b2f7Stbbdev if (ticket % 5 == 0) {
39651c0b2f7Stbbdev std::vector<suspended_task, tbb::cache_aligned_allocator<suspended_task>> test_task;
39751c0b2f7Stbbdev tbb::detail::d1::wait_context wait(1);
39851c0b2f7Stbbdev ++suspend_count;
39951c0b2f7Stbbdev tbb::this_task_arena::isolate([&wait, &test_context, &test_task] {
400e77098d6SPavel Kumbrasev auto thread_id = std::this_thread::get_id();
401e77098d6SPavel Kumbrasev tbb::task::suspend([&wait, &test_context, &test_task, thread_id] (tbb::task::suspend_point tag) {
402e77098d6SPavel Kumbrasev CHECK(thread_id == std::this_thread::get_id());
40351c0b2f7Stbbdev test_task.emplace_back(tag, wait);
40451c0b2f7Stbbdev tbb::detail::d1::spawn(test_task[0], test_context);
40551c0b2f7Stbbdev });
40651c0b2f7Stbbdev }
40751c0b2f7Stbbdev );
40851c0b2f7Stbbdev tbb::detail::d1::wait(wait, test_context);
40951c0b2f7Stbbdev ++resume_count;
41051c0b2f7Stbbdev }
41151c0b2f7Stbbdev }
41251c0b2f7Stbbdev );
41351c0b2f7Stbbdev
41451c0b2f7Stbbdev CHECK(suspend_count == resume_count);
41551c0b2f7Stbbdev }
41651c0b2f7Stbbdev
41751c0b2f7Stbbdev struct bypass_task : public tbb::detail::d1::task {
41851c0b2f7Stbbdev using task_pool_type = std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>>;
41951c0b2f7Stbbdev
bypass_taskbypass_task42051c0b2f7Stbbdev bypass_task(tbb::detail::d1::wait_context& wait, task_pool_type& task_pool,
4214523a761Stbbdev std::atomic<int>& resume_flag, tbb::task::suspend_point& suspend_tag)
42251c0b2f7Stbbdev : my_wait(wait), my_task_pool(task_pool), my_resume_flag(resume_flag), my_suspend_tag(suspend_tag)
42351c0b2f7Stbbdev {}
42451c0b2f7Stbbdev
executebypass_task42551c0b2f7Stbbdev task* execute(tbb::detail::d1::execution_data&) override {
4264523a761Stbbdev utils::doDummyWork(10000);
42751c0b2f7Stbbdev
4284523a761Stbbdev int expected = 1;
4294523a761Stbbdev if (my_resume_flag.compare_exchange_strong(expected, 2)) {
43051c0b2f7Stbbdev tbb::task::resume(my_suspend_tag);
43151c0b2f7Stbbdev }
43251c0b2f7Stbbdev
43351c0b2f7Stbbdev std::size_t ticket = my_current_task++;
4344523a761Stbbdev task* next = ticket < my_task_pool.size() ? &my_task_pool[ticket] : nullptr;
4354523a761Stbbdev
4364523a761Stbbdev if (!next && my_resume_flag != 2) {
4374523a761Stbbdev // Rarely all tasks can be executed before the suspend.
4384523a761Stbbdev // So, wait for the suspend before leaving.
4394523a761Stbbdev utils::SpinWaitWhileEq(my_resume_flag, 0);
4404523a761Stbbdev expected = 1;
4414523a761Stbbdev if (my_resume_flag.compare_exchange_strong(expected, 2)) {
4424523a761Stbbdev tbb::task::resume(my_suspend_tag);
4434523a761Stbbdev }
4444523a761Stbbdev }
4454523a761Stbbdev
4464523a761Stbbdev my_wait.release();
4474523a761Stbbdev return next;
44851c0b2f7Stbbdev }
44951c0b2f7Stbbdev
cancelbypass_task45051c0b2f7Stbbdev task* cancel(tbb::detail::d1::execution_data&) override {
45151c0b2f7Stbbdev FAIL("The function should never be called.");
45251c0b2f7Stbbdev return nullptr;
45351c0b2f7Stbbdev }
45451c0b2f7Stbbdev
45551c0b2f7Stbbdev tbb::detail::d1::wait_context& my_wait;
45651c0b2f7Stbbdev task_pool_type& my_task_pool;
4574523a761Stbbdev std::atomic<int>& my_resume_flag;
45851c0b2f7Stbbdev tbb::task::suspend_point& my_suspend_tag;
45951c0b2f7Stbbdev static std::atomic<int> my_current_task;
46051c0b2f7Stbbdev };
46151c0b2f7Stbbdev
46251c0b2f7Stbbdev std::atomic<int> bypass_task::my_current_task(0);
46351c0b2f7Stbbdev
46451c0b2f7Stbbdev thread_local int test_tls = 0;
46551c0b2f7Stbbdev
46651c0b2f7Stbbdev //! \brief \ref error_guessing
46751c0b2f7Stbbdev TEST_CASE("Bypass suspended by resume") {
46855f9b178SIvan Kochin std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
46951c0b2f7Stbbdev tbb::task_group_context test_context;
47051c0b2f7Stbbdev tbb::detail::d1::wait_context wait(task_number + 1);
47151c0b2f7Stbbdev
47251c0b2f7Stbbdev test_tls = 1;
47351c0b2f7Stbbdev
4744523a761Stbbdev std::atomic<int> resume_flag{0};
47551c0b2f7Stbbdev tbb::task::suspend_point test_suspend_tag;
47651c0b2f7Stbbdev
47751c0b2f7Stbbdev std::vector<bypass_task, tbb::cache_aligned_allocator<bypass_task>> test_task_pool;
47851c0b2f7Stbbdev
47951c0b2f7Stbbdev for (std::size_t i = 0; i < task_number; ++i) {
48051c0b2f7Stbbdev test_task_pool.emplace_back(wait, test_task_pool, resume_flag, test_suspend_tag);
48151c0b2f7Stbbdev }
48251c0b2f7Stbbdev
48351c0b2f7Stbbdev for (std::size_t i = 0; i < utils::get_platform_max_threads(); ++i) {
4844523a761Stbbdev std::size_t ticket = bypass_task::my_current_task++;
4854523a761Stbbdev if (ticket < test_task_pool.size()) {
4864523a761Stbbdev tbb::detail::d1::spawn(test_task_pool[ticket], test_context);
4874523a761Stbbdev }
48851c0b2f7Stbbdev }
48951c0b2f7Stbbdev
__anon509d20bd0e02null49051c0b2f7Stbbdev auto suspend_func = [&resume_flag, &test_suspend_tag] {
491e77098d6SPavel Kumbrasev auto thread_id = std::this_thread::get_id();
492e77098d6SPavel Kumbrasev tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
493e77098d6SPavel Kumbrasev CHECK(thread_id == std::this_thread::get_id());
49451c0b2f7Stbbdev test_suspend_tag = tag;
4954523a761Stbbdev resume_flag = 1;
49651c0b2f7Stbbdev });
49751c0b2f7Stbbdev };
49851c0b2f7Stbbdev using task_type = CountingTask<decltype(suspend_func)>;
49951c0b2f7Stbbdev task_type suspend_task(suspend_func, wait);
50051c0b2f7Stbbdev
50151c0b2f7Stbbdev tbb::detail::d1::execute_and_wait(suspend_task, test_context, wait, test_context);
50251c0b2f7Stbbdev CHECK(bypass_task::my_current_task >= test_task_pool.size());
50351c0b2f7Stbbdev REQUIRE_MESSAGE(test_tls == 1, "The wrong thread came out");
50451c0b2f7Stbbdev }
50551c0b2f7Stbbdev
50651c0b2f7Stbbdev //! \brief \ref error_guessing
50751c0b2f7Stbbdev TEST_CASE("Critical tasks + resume") {
50855f9b178SIvan Kochin std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
50951c0b2f7Stbbdev
51051c0b2f7Stbbdev tbb::task_group_context test_context;
51143c1805eSAlex tbb::detail::d1::wait_context wait{ 0 };
51251c0b2f7Stbbdev
5138b6f831cStbbdev // The test expects at least one thread in test_arena
5148b6f831cStbbdev int num_threads_in_test_arena = std::max(2, int(utils::get_platform_max_threads()));
5158b6f831cStbbdev tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads_in_test_arena);
5168b6f831cStbbdev tbb::task_arena test_arena(num_threads_in_test_arena);
51751c0b2f7Stbbdev
51851c0b2f7Stbbdev test_arena.initialize();
51951c0b2f7Stbbdev
52043c1805eSAlex std::atomic<bool> resume_flag{}, resumed{};
52151c0b2f7Stbbdev tbb::task::suspend_point test_suspend_tag;
52251c0b2f7Stbbdev
__anon509d20bd1002null52343c1805eSAlex auto task_body = [&resume_flag, &resumed, &test_suspend_tag] {
52451c0b2f7Stbbdev // Make some work
5254523a761Stbbdev utils::doDummyWork(1000);
52651c0b2f7Stbbdev
52751c0b2f7Stbbdev if (resume_flag.exchange(false)) {
52851c0b2f7Stbbdev tbb::task::resume(test_suspend_tag);
52943c1805eSAlex resumed = true;
53051c0b2f7Stbbdev }
53151c0b2f7Stbbdev };
53251c0b2f7Stbbdev
53351c0b2f7Stbbdev using task_type = CountingTask<decltype(task_body)>;
53451c0b2f7Stbbdev std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
53551c0b2f7Stbbdev
53651c0b2f7Stbbdev for (std::size_t i = 0; i < task_number; ++i) {
53751c0b2f7Stbbdev test_tasks.emplace_back(task_body, wait);
53851c0b2f7Stbbdev }
53951c0b2f7Stbbdev
54043c1805eSAlex wait.reserve(task_number / 2);
54151c0b2f7Stbbdev for (std::size_t i = 0; i < task_number / 2; ++i) {
54251c0b2f7Stbbdev submit(test_tasks[i], test_arena, test_context, true);
54351c0b2f7Stbbdev }
54451c0b2f7Stbbdev
__anon509d20bd1102null54551c0b2f7Stbbdev auto suspend_func = [&resume_flag, &test_suspend_tag] {
546e77098d6SPavel Kumbrasev auto thread_id = std::this_thread::get_id();
547e77098d6SPavel Kumbrasev tbb::task::suspend([&resume_flag, &test_suspend_tag, thread_id] (tbb::task::suspend_point tag) {
548e77098d6SPavel Kumbrasev CHECK(thread_id == std::this_thread::get_id());
54951c0b2f7Stbbdev test_suspend_tag = tag;
55051c0b2f7Stbbdev resume_flag.store(true, std::memory_order_release);
55151c0b2f7Stbbdev });
55251c0b2f7Stbbdev };
55351c0b2f7Stbbdev using suspend_task_type = CountingTask<decltype(suspend_func)>;
55451c0b2f7Stbbdev suspend_task_type suspend_task(suspend_func, wait);
55551c0b2f7Stbbdev
55643c1805eSAlex wait.reserve(1);
55751c0b2f7Stbbdev submit(suspend_task, test_arena, test_context, true);
55851c0b2f7Stbbdev
__anon509d20bd1302null55943c1805eSAlex test_arena.execute([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
56043c1805eSAlex tbb::this_task_arena::isolate([&wait, &test_tasks, &test_arena, &test_context, &resumed, task_number] {
56143c1805eSAlex do {
56243c1805eSAlex wait.reserve(task_number / 2);
56343c1805eSAlex tbb::parallel_for(tbb::blocked_range<std::size_t>(task_number / 2, task_number),
56451c0b2f7Stbbdev [&test_tasks, &test_arena, &test_context] (tbb::blocked_range<std::size_t>& range) {
56551c0b2f7Stbbdev for (std::size_t i = range.begin(); i != range.end(); ++i) {
56651c0b2f7Stbbdev submit(test_tasks[i], test_arena, test_context, true);
56751c0b2f7Stbbdev }
56843c1805eSAlex }
56943c1805eSAlex );
57043c1805eSAlex } while (!resumed);
57151c0b2f7Stbbdev });
57251c0b2f7Stbbdev });
57351c0b2f7Stbbdev
__anon509d20bd1602null57443c1805eSAlex test_arena.execute([&wait, &test_context] {
57551c0b2f7Stbbdev tbb::detail::d1::wait(wait, test_context);
57643c1805eSAlex });
57751c0b2f7Stbbdev }
57851c0b2f7Stbbdev
57951c0b2f7Stbbdev //! \brief \ref error_guessing
58051c0b2f7Stbbdev TEST_CASE("Stress testing") {
58155f9b178SIvan Kochin std::uint32_t task_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
58251c0b2f7Stbbdev
58351c0b2f7Stbbdev tbb::task_group_context test_context;
58451c0b2f7Stbbdev tbb::detail::d1::wait_context wait(task_number);
58551c0b2f7Stbbdev
58651c0b2f7Stbbdev tbb::task_arena test_arena;
58751c0b2f7Stbbdev
58851c0b2f7Stbbdev test_arena.initialize();
58951c0b2f7Stbbdev
__anon509d20bd1702null59051c0b2f7Stbbdev auto task_body = [] {
59151c0b2f7Stbbdev tbb::parallel_for(tbb::blocked_range<std::size_t>(0, 1000), [] (tbb::blocked_range<std::size_t>&) {
5924523a761Stbbdev utils::doDummyWork(100);
59351c0b2f7Stbbdev });
59451c0b2f7Stbbdev };
59551c0b2f7Stbbdev using task_type = CountingTask<decltype(task_body)>;
59651c0b2f7Stbbdev
59751c0b2f7Stbbdev std::size_t iter_counter = 20;
59851c0b2f7Stbbdev
59951c0b2f7Stbbdev std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
60051c0b2f7Stbbdev
60151c0b2f7Stbbdev for (std::size_t j = 0; j < task_number; ++j) {
60251c0b2f7Stbbdev test_tasks.emplace_back(task_body, wait);
60351c0b2f7Stbbdev }
60451c0b2f7Stbbdev
__anon509d20bd1902null60551c0b2f7Stbbdev test_arena.execute([&test_tasks, &task_body, &wait, &test_context, &test_arena, iter_counter, task_number] {
60651c0b2f7Stbbdev for (std::size_t i = 0; i < iter_counter; ++i) {
60751c0b2f7Stbbdev
60851c0b2f7Stbbdev for (std::size_t j = 0; j < task_number; ++j) {
60951c0b2f7Stbbdev test_arena.enqueue(task_body);
61051c0b2f7Stbbdev }
61151c0b2f7Stbbdev
61251c0b2f7Stbbdev for (std::size_t j = 0; j < task_number / 2; ++j) {
61351c0b2f7Stbbdev tbb::detail::d1::spawn(test_tasks[j], test_context);
61451c0b2f7Stbbdev }
61551c0b2f7Stbbdev
61651c0b2f7Stbbdev for (std::size_t j = task_number / 2; j < task_number; ++j) {
61751c0b2f7Stbbdev submit(test_tasks[j], test_arena, test_context, true);
61851c0b2f7Stbbdev }
61951c0b2f7Stbbdev
62051c0b2f7Stbbdev tbb::detail::d1::wait(wait, test_context);
62151c0b2f7Stbbdev wait.reserve(task_number);
62251c0b2f7Stbbdev }
62351c0b2f7Stbbdev wait.release(task_number);
62451c0b2f7Stbbdev });
62551c0b2f7Stbbdev
62651c0b2f7Stbbdev
62751c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_counter, "Some task was not executed");
62851c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
62951c0b2f7Stbbdev }
63051c0b2f7Stbbdev
63151c0b2f7Stbbdev //! \brief \ref error_guessing
63251c0b2f7Stbbdev TEST_CASE("All workers sleep") {
63355f9b178SIvan Kochin std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
63451c0b2f7Stbbdev tbb::concurrent_vector<tbb::task::suspend_point> suspend_points;
63551c0b2f7Stbbdev
63651c0b2f7Stbbdev tbb::task_group test_gr;
63751c0b2f7Stbbdev
63851c0b2f7Stbbdev utils::SpinBarrier barrier(thread_number);
__anon509d20bd1a02null63951c0b2f7Stbbdev auto resumble_task = [&] {
64051c0b2f7Stbbdev barrier.wait();
641e77098d6SPavel Kumbrasev auto thread_id = std::this_thread::get_id();
64251c0b2f7Stbbdev tbb::task::suspend([&] (tbb::task::suspend_point sp) {
643e77098d6SPavel Kumbrasev CHECK(thread_id == std::this_thread::get_id());
64451c0b2f7Stbbdev suspend_points.push_back(sp);
64551c0b2f7Stbbdev barrier.wait();
64651c0b2f7Stbbdev });
64751c0b2f7Stbbdev };
64851c0b2f7Stbbdev
64951c0b2f7Stbbdev for (std::size_t i = 0; i < thread_number - 1; ++i) {
65051c0b2f7Stbbdev test_gr.run(resumble_task);
65151c0b2f7Stbbdev }
65251c0b2f7Stbbdev
65351c0b2f7Stbbdev barrier.wait();
65451c0b2f7Stbbdev barrier.wait();
65551c0b2f7Stbbdev TestCPUUserTime(thread_number);
65651c0b2f7Stbbdev
65751c0b2f7Stbbdev for (auto sp : suspend_points)
65851c0b2f7Stbbdev tbb::task::resume(sp);
65951c0b2f7Stbbdev test_gr.wait();
66051c0b2f7Stbbdev }
66151c0b2f7Stbbdev
66251c0b2f7Stbbdev #endif // __TBB_RESUMABLE_TASKS
66351c0b2f7Stbbdev
66451c0b2f7Stbbdev //! \brief \ref error_guessing
66551c0b2f7Stbbdev TEST_CASE("Enqueue with exception") {
66655f9b178SIvan Kochin std::uint32_t task_number = 500 * static_cast<std::uint32_t>(utils::get_platform_max_threads());
66751c0b2f7Stbbdev
66851c0b2f7Stbbdev tbb::task_group_context test_context;
66951c0b2f7Stbbdev tbb::detail::d1::wait_context wait(task_number);
67051c0b2f7Stbbdev
67151c0b2f7Stbbdev tbb::task_arena test_arena{int(std::thread::hardware_concurrency() + 1)};
67251c0b2f7Stbbdev
67351c0b2f7Stbbdev test_arena.initialize();
67451c0b2f7Stbbdev
__anon509d20bd1c02null67551c0b2f7Stbbdev auto task_body = [] {
6764523a761Stbbdev utils::doDummyWork(100);
67751c0b2f7Stbbdev };
67851c0b2f7Stbbdev
67951c0b2f7Stbbdev std::atomic<bool> end_flag{false};
__anon509d20bd1d02null68051c0b2f7Stbbdev auto check_body = [&end_flag] {
68151c0b2f7Stbbdev end_flag.store(true, std::memory_order_relaxed);
68251c0b2f7Stbbdev };
68351c0b2f7Stbbdev
68451c0b2f7Stbbdev using task_type = CountingTask<decltype(task_body)>;
68551c0b2f7Stbbdev std::vector<task_type, tbb::cache_aligned_allocator<task_type>> test_tasks;
68651c0b2f7Stbbdev
68751c0b2f7Stbbdev for (std::size_t j = 0; j < task_number; ++j) {
68851c0b2f7Stbbdev test_tasks.emplace_back(task_body, wait);
68951c0b2f7Stbbdev }
69051c0b2f7Stbbdev
69151c0b2f7Stbbdev {
69251c0b2f7Stbbdev tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
69351c0b2f7Stbbdev test_arena.enqueue(task_body);
69451c0b2f7Stbbdev // Initialize implicit arena
__anon509d20bd1e02(int) 69551c0b2f7Stbbdev tbb::parallel_for(0, 1, [] (int) {});
69651c0b2f7Stbbdev tbb::task_arena test_arena2(tbb::task_arena::attach{});
69751c0b2f7Stbbdev test_arena2.enqueue(task_body);
69851c0b2f7Stbbdev }
69951c0b2f7Stbbdev
70051c0b2f7Stbbdev constexpr std::size_t iter_count = 10;
70151c0b2f7Stbbdev for (std::size_t k = 0; k < iter_count; ++k) {
70251c0b2f7Stbbdev tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 1);
70351c0b2f7Stbbdev test_arena.enqueue(check_body);
70451c0b2f7Stbbdev
70551c0b2f7Stbbdev while (!end_flag.load(std::memory_order_relaxed)) ;
70651c0b2f7Stbbdev
70751c0b2f7Stbbdev utils::Sleep(1);
70851c0b2f7Stbbdev end_flag.store(false, std::memory_order_relaxed);
70951c0b2f7Stbbdev
__anon509d20bd1f02null71051c0b2f7Stbbdev test_arena.execute([&test_tasks, &wait, &test_context, task_number] {
71151c0b2f7Stbbdev for (std::size_t j = 0; j < task_number; ++j) {
71251c0b2f7Stbbdev tbb::detail::d1::spawn(test_tasks[j], test_context);
71351c0b2f7Stbbdev }
71451c0b2f7Stbbdev
71551c0b2f7Stbbdev tbb::detail::d1::wait(wait, test_context);
71651c0b2f7Stbbdev wait.reserve(task_number);
71751c0b2f7Stbbdev });
71851c0b2f7Stbbdev }
71951c0b2f7Stbbdev wait.release(task_number);
72051c0b2f7Stbbdev
72151c0b2f7Stbbdev
72251c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::execute_counter() == task_number * iter_count, "Some task was not executed");
72351c0b2f7Stbbdev REQUIRE_MESSAGE(task_type::cancel_counter() == 0, "Some task was canceled");
72451c0b2f7Stbbdev }
7254523a761Stbbdev
7264523a761Stbbdev struct resubmitting_task : public tbb::detail::d1::task {
7274523a761Stbbdev tbb::task_arena& my_arena;
7284523a761Stbbdev tbb::task_group_context& my_ctx;
7294523a761Stbbdev std::atomic<int> counter{100000};
7304523a761Stbbdev
resubmitting_taskresubmitting_task7314523a761Stbbdev resubmitting_task(tbb::task_arena& arena, tbb::task_group_context& ctx) : my_arena(arena), my_ctx(ctx)
7324523a761Stbbdev {}
7334523a761Stbbdev
executeresubmitting_task7344523a761Stbbdev tbb::detail::d1::task* execute(tbb::detail::d1::execution_data& ) override {
7354523a761Stbbdev if (counter-- > 0) {
7364523a761Stbbdev submit(*this, my_arena, my_ctx, true);
7374523a761Stbbdev }
7384523a761Stbbdev return nullptr;
7394523a761Stbbdev }
7404523a761Stbbdev
cancelresubmitting_task7414523a761Stbbdev tbb::detail::d1::task* cancel( tbb::detail::d1::execution_data& ) override {
7424523a761Stbbdev FAIL("The function should never be called.");
7434523a761Stbbdev return nullptr;
7444523a761Stbbdev }
7454523a761Stbbdev };
7464523a761Stbbdev
7474523a761Stbbdev //! \brief \ref error_guessing
7484523a761Stbbdev TEST_CASE("Test with priority inversion") {
7494523a761Stbbdev if (!utils::can_change_thread_priority()) return;
7504523a761Stbbdev
75155f9b178SIvan Kochin std::uint32_t thread_number = static_cast<std::uint32_t>(utils::get_platform_max_threads());
7524523a761Stbbdev tbb::global_control gc(tbb::global_control::max_allowed_parallelism, thread_number + 1);
7534523a761Stbbdev
7544523a761Stbbdev tbb::task_arena test_arena(2 * thread_number, thread_number);
7554523a761Stbbdev test_arena.initialize();
7564523a761Stbbdev utils::pinning_observer obsr(test_arena);
7574523a761Stbbdev CHECK_MESSAGE(obsr.is_observing(), "Arena observer has not been activated");
7584523a761Stbbdev
75955f9b178SIvan Kochin std::uint32_t critical_task_counter = 1000 * thread_number;
7604523a761Stbbdev std::atomic<std::size_t> task_counter{0};
7614523a761Stbbdev
7624523a761Stbbdev tbb::task_group_context test_context;
7634523a761Stbbdev tbb::detail::d1::wait_context wait(critical_task_counter);
7644523a761Stbbdev
__anon509d20bd2002null7654523a761Stbbdev auto critical_work = [&] {
7664523a761Stbbdev utils::doDummyWork(10);
7674523a761Stbbdev };
7684523a761Stbbdev
7694523a761Stbbdev using suspend_task_type = CountingTask<decltype(critical_work)>;
7704523a761Stbbdev suspend_task_type critical_task(critical_work, wait);
7714523a761Stbbdev
__anon509d20bd2102null7724523a761Stbbdev auto high_priority_thread_func = [&] {
7734523a761Stbbdev // Increase external threads priority
774*546fe273SIlya Isaev utils::increased_priority_guard guard{};
775*546fe273SIlya Isaev utils::suppress_unused_warning(guard);
7764523a761Stbbdev // pin external threads
7774523a761Stbbdev test_arena.execute([]{});
7784523a761Stbbdev while (task_counter++ < critical_task_counter) {
7794523a761Stbbdev submit(critical_task, test_arena, test_context, true);
7804523a761Stbbdev std::this_thread::sleep_for(std::chrono::milliseconds(1));
7814523a761Stbbdev }
7824523a761Stbbdev };
7834523a761Stbbdev
7844523a761Stbbdev resubmitting_task worker_task(test_arena, test_context);
7854523a761Stbbdev // warm up
7864523a761Stbbdev // take first core on execute
7874523a761Stbbdev utils::SpinBarrier barrier(thread_number + 1);
__anon509d20bd2302null7884523a761Stbbdev test_arena.execute([&] {
789a088cfa0SKonstantin Boyarinov tbb::parallel_for(std::uint32_t(0), thread_number + 1, [&] (std::uint32_t) {
7904523a761Stbbdev barrier.wait();
7914523a761Stbbdev submit(worker_task, test_arena, test_context, true);
7924523a761Stbbdev });
7934523a761Stbbdev });
7944523a761Stbbdev
7954523a761Stbbdev std::vector<std::thread> high_priority_threads;
7964523a761Stbbdev for (std::size_t i = 0; i < thread_number - 1; ++i) {
7974523a761Stbbdev high_priority_threads.emplace_back(high_priority_thread_func);
7984523a761Stbbdev }
7994523a761Stbbdev
800*546fe273SIlya Isaev utils::increased_priority_guard guard{};
801*546fe273SIlya Isaev utils::suppress_unused_warning(guard);
8024523a761Stbbdev while (task_counter++ < critical_task_counter) {
8034523a761Stbbdev submit(critical_task, test_arena, test_context, true);
8044523a761Stbbdev std::this_thread::sleep_for(std::chrono::milliseconds(1));
8054523a761Stbbdev }
8064523a761Stbbdev
8074523a761Stbbdev tbb::detail::d1::wait(wait, test_context);
8084523a761Stbbdev
8094523a761Stbbdev for (std::size_t i = 0; i < thread_number - 1; ++i) {
8104523a761Stbbdev high_priority_threads[i].join();
8114523a761Stbbdev }
8124523a761Stbbdev }
813112076d0SIlya Isaev
814112076d0SIlya Isaev // Explicit test for raii_guard move ctor because of copy elision optimization
815112076d0SIlya Isaev // TODO: consider better test file for the test case
816112076d0SIlya Isaev //! \brief \ref interface
817112076d0SIlya Isaev TEST_CASE("raii_guard move ctor") {
818112076d0SIlya Isaev int count{0};
__anon509d20bd2502null819112076d0SIlya Isaev auto func = [&count] {
820112076d0SIlya Isaev count++;
821112076d0SIlya Isaev CHECK(count == 1);
822112076d0SIlya Isaev };
823112076d0SIlya Isaev
824112076d0SIlya Isaev tbb::detail::d0::raii_guard<decltype(func)> guard1(func);
825112076d0SIlya Isaev tbb::detail::d0::raii_guard<decltype(func)> guard2(std::move(guard1));
826112076d0SIlya Isaev }
827c4568449SPavel Kumbrasev
828c4568449SPavel Kumbrasev //! \brief \ref error_guessing
829c4568449SPavel Kumbrasev TEST_CASE("Check correct arena destruction with enqueue") {
830c4568449SPavel Kumbrasev for (int i = 0; i < 100; ++i) {
831c4568449SPavel Kumbrasev tbb::task_scheduler_handle handle{ tbb::attach{} };
832c4568449SPavel Kumbrasev {
833c4568449SPavel Kumbrasev tbb::task_arena a(2, 0);
834c4568449SPavel Kumbrasev
__anon509d20bd2602null835c4568449SPavel Kumbrasev a.enqueue([] {
836c4568449SPavel Kumbrasev tbb::parallel_for(0, 100, [] (int) { std::this_thread::sleep_for(std::chrono::nanoseconds(10)); });
837c4568449SPavel Kumbrasev });
838c4568449SPavel Kumbrasev std::this_thread::sleep_for(std::chrono::microseconds(1));
839c4568449SPavel Kumbrasev }
840c4568449SPavel Kumbrasev tbb::finalize(handle, std::nothrow_t{});
841c4568449SPavel Kumbrasev }
842c4568449SPavel Kumbrasev }
843