xref: /oneTBB/test/tbb/test_resumable_tasks.cpp (revision e77098d6)
151c0b2f7Stbbdev /*
2*e77098d6SPavel Kumbrasev     Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev //! \file test_resumable_tasks.cpp
1851c0b2f7Stbbdev //! \brief Test for [scheduler.resumable_tasks] specification
1951c0b2f7Stbbdev 
2051c0b2f7Stbbdev #include "common/test.h"
2151c0b2f7Stbbdev #include "common/utils.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "tbb/task.h"
2451c0b2f7Stbbdev 
2551c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev #include "tbb/global_control.h"
2851c0b2f7Stbbdev #include "tbb/task_arena.h"
2951c0b2f7Stbbdev #include "tbb/parallel_for.h"
3051c0b2f7Stbbdev #include "tbb/task_scheduler_observer.h"
3151c0b2f7Stbbdev #include "tbb/task_group.h"
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev #include <algorithm>
3451c0b2f7Stbbdev #include <thread>
3551c0b2f7Stbbdev #include <queue>
3651c0b2f7Stbbdev #include <condition_variable>
3751c0b2f7Stbbdev 
// Iteration count used by the parallel loops below and as the number of
// external threads in TestCleanupMaster.
constexpr int N = 10;
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev // External activity used in all tests, which resumes suspended execution point
4151c0b2f7Stbbdev class AsyncActivity {
4251c0b2f7Stbbdev public:
AsyncActivity(int num_)4351c0b2f7Stbbdev     AsyncActivity(int num_) : m_numAsyncThreads(num_) {
4451c0b2f7Stbbdev         for (int i = 0; i < m_numAsyncThreads ; ++i) {
4551c0b2f7Stbbdev             m_asyncThreads.push_back( new std::thread(AsyncActivity::asyncLoop, this) );
4651c0b2f7Stbbdev         }
4751c0b2f7Stbbdev     }
~AsyncActivity()4851c0b2f7Stbbdev     ~AsyncActivity() {
4951c0b2f7Stbbdev         {
5051c0b2f7Stbbdev             std::lock_guard<std::mutex> lock(m_mutex);
5151c0b2f7Stbbdev             for (int i = 0; i < m_numAsyncThreads; ++i) {
5251c0b2f7Stbbdev                 m_tagQueue.push(nullptr);
5351c0b2f7Stbbdev             }
5451c0b2f7Stbbdev             m_condvar.notify_all();
5551c0b2f7Stbbdev         }
5651c0b2f7Stbbdev         for (int i = 0; i < m_numAsyncThreads; ++i) {
5751c0b2f7Stbbdev             m_asyncThreads[i]->join();
5851c0b2f7Stbbdev             delete m_asyncThreads[i];
5951c0b2f7Stbbdev         }
6051c0b2f7Stbbdev         CHECK(m_tagQueue.empty());
6151c0b2f7Stbbdev     }
submit(tbb::task::suspend_point ctx)6251c0b2f7Stbbdev     void submit(tbb::task::suspend_point ctx) {
6351c0b2f7Stbbdev         std::lock_guard<std::mutex> lock(m_mutex);
6451c0b2f7Stbbdev         m_tagQueue.push(ctx);
6551c0b2f7Stbbdev         m_condvar.notify_one();
6651c0b2f7Stbbdev     }
6751c0b2f7Stbbdev 
6851c0b2f7Stbbdev private:
asyncLoop(AsyncActivity * async)6951c0b2f7Stbbdev     static void asyncLoop(AsyncActivity* async) {
7051c0b2f7Stbbdev         tbb::task::suspend_point tag;
7151c0b2f7Stbbdev         for (;;) {
7251c0b2f7Stbbdev             {
7351c0b2f7Stbbdev                 std::unique_lock<std::mutex> lock(async->m_mutex);
7451c0b2f7Stbbdev                 async->m_condvar.wait(lock, [async] {return !async->m_tagQueue.empty(); });
7551c0b2f7Stbbdev                 tag = async->m_tagQueue.front();
7651c0b2f7Stbbdev                 async->m_tagQueue.pop();
7751c0b2f7Stbbdev             }
7851c0b2f7Stbbdev             if (!tag) {
7951c0b2f7Stbbdev                 break;
8051c0b2f7Stbbdev             }
8151c0b2f7Stbbdev             tbb::task::resume(tag);
8251c0b2f7Stbbdev         };
8351c0b2f7Stbbdev     }
8451c0b2f7Stbbdev 
8551c0b2f7Stbbdev     const int m_numAsyncThreads;
8651c0b2f7Stbbdev     std::mutex m_mutex;
8751c0b2f7Stbbdev     std::condition_variable m_condvar;
8851c0b2f7Stbbdev     std::queue<tbb::task::suspend_point> m_tagQueue;
8951c0b2f7Stbbdev     std::vector<std::thread*> m_asyncThreads;
9051c0b2f7Stbbdev };
9151c0b2f7Stbbdev 
9251c0b2f7Stbbdev struct SuspendBody {
SuspendBodySuspendBody93*e77098d6SPavel Kumbrasev     SuspendBody(AsyncActivity& a_, std::thread::id id) :
94*e77098d6SPavel Kumbrasev         m_asyncActivity(a_), thread_id(id) {}
operator ()SuspendBody9551c0b2f7Stbbdev     void operator()(tbb::task::suspend_point tag) {
96*e77098d6SPavel Kumbrasev         CHECK(thread_id == std::this_thread::get_id());
9751c0b2f7Stbbdev         m_asyncActivity.submit(tag);
9851c0b2f7Stbbdev     }
9951c0b2f7Stbbdev 
10051c0b2f7Stbbdev private:
10151c0b2f7Stbbdev     AsyncActivity& m_asyncActivity;
102*e77098d6SPavel Kumbrasev     std::thread::id thread_id;
10351c0b2f7Stbbdev };
10451c0b2f7Stbbdev 
10551c0b2f7Stbbdev class InnermostArenaBody {
10651c0b2f7Stbbdev public:
InnermostArenaBody(AsyncActivity & a_)10751c0b2f7Stbbdev     InnermostArenaBody(AsyncActivity& a_) : m_asyncActivity(a_) {}
10851c0b2f7Stbbdev 
operator ()()10951c0b2f7Stbbdev     void operator()() {
11051c0b2f7Stbbdev         InnermostOuterParFor inner_outer_body(m_asyncActivity);
11151c0b2f7Stbbdev         tbb::parallel_for(0, N, inner_outer_body );
11251c0b2f7Stbbdev     }
11351c0b2f7Stbbdev 
11451c0b2f7Stbbdev private:
11551c0b2f7Stbbdev     struct InnermostInnerParFor {
InnermostInnerParForInnermostArenaBody::InnermostInnerParFor11651c0b2f7Stbbdev         InnermostInnerParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
operator ()InnermostArenaBody::InnermostInnerParFor11751c0b2f7Stbbdev         void operator()(int) const {
118*e77098d6SPavel Kumbrasev             tbb::task::suspend(SuspendBody(m_asyncActivity, std::this_thread::get_id()));
11951c0b2f7Stbbdev         }
12051c0b2f7Stbbdev         AsyncActivity& m_asyncActivity;
12151c0b2f7Stbbdev     };
12251c0b2f7Stbbdev     struct InnermostOuterParFor {
InnermostOuterParForInnermostArenaBody::InnermostOuterParFor12351c0b2f7Stbbdev         InnermostOuterParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
operator ()InnermostArenaBody::InnermostOuterParFor12451c0b2f7Stbbdev         void operator()(int) const {
125*e77098d6SPavel Kumbrasev             tbb::task::suspend(SuspendBody(m_asyncActivity, std::this_thread::get_id()));
12651c0b2f7Stbbdev             InnermostInnerParFor inner_inner_body(m_asyncActivity);
12751c0b2f7Stbbdev             tbb::parallel_for(0, N, inner_inner_body);
12851c0b2f7Stbbdev         }
12951c0b2f7Stbbdev         AsyncActivity& m_asyncActivity;
13051c0b2f7Stbbdev     };
13151c0b2f7Stbbdev     AsyncActivity& m_asyncActivity;
13251c0b2f7Stbbdev };
13351c0b2f7Stbbdev 
13451c0b2f7Stbbdev #include "tbb/enumerable_thread_specific.h"
13551c0b2f7Stbbdev 
13651c0b2f7Stbbdev class OutermostArenaBody {
13751c0b2f7Stbbdev public:
OutermostArenaBody(AsyncActivity & a_,tbb::task_arena & o_,tbb::task_arena & i_,tbb::task_arena & id_,tbb::enumerable_thread_specific<int> & ets)13851c0b2f7Stbbdev     OutermostArenaBody(AsyncActivity& a_, tbb::task_arena& o_, tbb::task_arena& i_
13951c0b2f7Stbbdev             , tbb::task_arena& id_, tbb::enumerable_thread_specific<int>& ets) :
14051c0b2f7Stbbdev         m_asyncActivity(a_), m_outermostArena(o_), m_innermostArena(i_), m_innermostArenaDefault(id_), m_local(ets) {}
14151c0b2f7Stbbdev 
operator ()()14251c0b2f7Stbbdev     void operator()() {
14351c0b2f7Stbbdev         tbb::parallel_for(0, 32, *this);
14451c0b2f7Stbbdev     }
14551c0b2f7Stbbdev 
operator ()(int i) const14651c0b2f7Stbbdev     void operator()(int i) const {
147*e77098d6SPavel Kumbrasev         tbb::task::suspend([&] (tbb::task::suspend_point sp) { m_asyncActivity.submit(sp); });
14851c0b2f7Stbbdev 
14951c0b2f7Stbbdev         tbb::task_arena& nested_arena = (i % 3 == 0) ?
15051c0b2f7Stbbdev             m_outermostArena : (i % 3 == 1 ? m_innermostArena : m_innermostArenaDefault);
15151c0b2f7Stbbdev 
15251c0b2f7Stbbdev         if (i % 3 != 0) {
15351c0b2f7Stbbdev             // We can only guarantee recall coorectness for "not-same" nested arenas entry
15451c0b2f7Stbbdev             m_local.local() = i;
15551c0b2f7Stbbdev         }
15651c0b2f7Stbbdev         InnermostArenaBody innermost_arena_body(m_asyncActivity);
15751c0b2f7Stbbdev         nested_arena.execute(innermost_arena_body);
15851c0b2f7Stbbdev         if (i % 3 != 0) {
15951c0b2f7Stbbdev             CHECK_MESSAGE(i == m_local.local(), "Original thread wasn't recalled for innermost nested arena.");
16051c0b2f7Stbbdev         }
16151c0b2f7Stbbdev     }
16251c0b2f7Stbbdev 
16351c0b2f7Stbbdev private:
16451c0b2f7Stbbdev     AsyncActivity& m_asyncActivity;
16551c0b2f7Stbbdev     tbb::task_arena& m_outermostArena;
16651c0b2f7Stbbdev     tbb::task_arena& m_innermostArena;
16751c0b2f7Stbbdev     tbb::task_arena& m_innermostArenaDefault;
16851c0b2f7Stbbdev     tbb::enumerable_thread_specific<int>& m_local;
16951c0b2f7Stbbdev };
17051c0b2f7Stbbdev 
TestNestedArena()17151c0b2f7Stbbdev void TestNestedArena() {
17251c0b2f7Stbbdev     AsyncActivity asyncActivity(4);
17351c0b2f7Stbbdev 
17451c0b2f7Stbbdev     tbb::task_arena outermost_arena;
17551c0b2f7Stbbdev     tbb::task_arena innermost_arena(2,2);
17651c0b2f7Stbbdev     tbb::task_arena innermost_arena_default;
17751c0b2f7Stbbdev 
17851c0b2f7Stbbdev     outermost_arena.initialize();
17951c0b2f7Stbbdev     innermost_arena_default.initialize();
18051c0b2f7Stbbdev     innermost_arena.initialize();
18151c0b2f7Stbbdev 
18251c0b2f7Stbbdev     tbb::enumerable_thread_specific<int> ets;
18351c0b2f7Stbbdev 
18451c0b2f7Stbbdev     OutermostArenaBody outer_arena_body(asyncActivity, outermost_arena, innermost_arena, innermost_arena_default, ets);
18551c0b2f7Stbbdev     outermost_arena.execute(outer_arena_body);
18651c0b2f7Stbbdev }
18751c0b2f7Stbbdev 
18851c0b2f7Stbbdev // External activity used in all tests, which resumes suspended execution point
18951c0b2f7Stbbdev class EpochAsyncActivity {
19051c0b2f7Stbbdev public:
EpochAsyncActivity(int num_,std::atomic<int> & e_)19151c0b2f7Stbbdev     EpochAsyncActivity(int num_, std::atomic<int>& e_) : m_numAsyncThreads(num_), m_globalEpoch(e_) {
19251c0b2f7Stbbdev         for (int i = 0; i < m_numAsyncThreads ; ++i) {
19351c0b2f7Stbbdev             m_asyncThreads.push_back( new std::thread(EpochAsyncActivity::asyncLoop, this) );
19451c0b2f7Stbbdev         }
19551c0b2f7Stbbdev     }
~EpochAsyncActivity()19651c0b2f7Stbbdev     ~EpochAsyncActivity() {
19751c0b2f7Stbbdev         {
19851c0b2f7Stbbdev             std::lock_guard<std::mutex> lock(m_mutex);
19951c0b2f7Stbbdev             for (int i = 0; i < m_numAsyncThreads; ++i) {
20051c0b2f7Stbbdev                 m_tagQueue.push(nullptr);
20151c0b2f7Stbbdev             }
20251c0b2f7Stbbdev             m_condvar.notify_all();
20351c0b2f7Stbbdev         }
20451c0b2f7Stbbdev         for (int i = 0; i < m_numAsyncThreads; ++i) {
20551c0b2f7Stbbdev             m_asyncThreads[i]->join();
20651c0b2f7Stbbdev             delete m_asyncThreads[i];
20751c0b2f7Stbbdev         }
20851c0b2f7Stbbdev         CHECK(m_tagQueue.empty());
20951c0b2f7Stbbdev     }
submit(tbb::task::suspend_point ctx)21051c0b2f7Stbbdev     void submit(tbb::task::suspend_point ctx) {
21151c0b2f7Stbbdev         std::lock_guard<std::mutex> lock(m_mutex);
21251c0b2f7Stbbdev         m_tagQueue.push(ctx);
21351c0b2f7Stbbdev         m_condvar.notify_one();
21451c0b2f7Stbbdev     }
21551c0b2f7Stbbdev 
21651c0b2f7Stbbdev private:
asyncLoop(EpochAsyncActivity * async)21751c0b2f7Stbbdev     static void asyncLoop(EpochAsyncActivity* async) {
21851c0b2f7Stbbdev         tbb::task::suspend_point tag;
21951c0b2f7Stbbdev         for (;;) {
22051c0b2f7Stbbdev             {
22151c0b2f7Stbbdev                 std::unique_lock<std::mutex> lock(async->m_mutex);
22251c0b2f7Stbbdev                 async->m_condvar.wait(lock, [async] {return !async->m_tagQueue.empty(); });
22351c0b2f7Stbbdev                 tag = async->m_tagQueue.front();
22451c0b2f7Stbbdev                 async->m_tagQueue.pop();
22551c0b2f7Stbbdev             }
22651c0b2f7Stbbdev             if (!tag) {
22751c0b2f7Stbbdev                 break;
22851c0b2f7Stbbdev             }
22951c0b2f7Stbbdev             // Track the global epoch
23051c0b2f7Stbbdev             async->m_globalEpoch++;
23151c0b2f7Stbbdev             tbb::task::resume(tag);
23251c0b2f7Stbbdev         };
23351c0b2f7Stbbdev     }
23451c0b2f7Stbbdev 
23551c0b2f7Stbbdev     const int m_numAsyncThreads;
23651c0b2f7Stbbdev     std::atomic<int>& m_globalEpoch;
23751c0b2f7Stbbdev     std::mutex m_mutex;
23851c0b2f7Stbbdev     std::condition_variable m_condvar;
23951c0b2f7Stbbdev     std::queue<tbb::task::suspend_point> m_tagQueue;
24051c0b2f7Stbbdev     std::vector<std::thread*> m_asyncThreads;
24151c0b2f7Stbbdev };
24251c0b2f7Stbbdev 
24351c0b2f7Stbbdev struct EpochSuspendBody {
EpochSuspendBodyEpochSuspendBody24451c0b2f7Stbbdev     EpochSuspendBody(EpochAsyncActivity& a_, std::atomic<int>& e_, int& le_) :
24551c0b2f7Stbbdev         m_asyncActivity(a_), m_globalEpoch(e_), m_localEpoch(le_) {}
24651c0b2f7Stbbdev 
operator ()EpochSuspendBody24751c0b2f7Stbbdev     void operator()(tbb::task::suspend_point ctx) {
24851c0b2f7Stbbdev         m_localEpoch = m_globalEpoch;
24951c0b2f7Stbbdev         m_asyncActivity.submit(ctx);
25051c0b2f7Stbbdev     }
25151c0b2f7Stbbdev 
25251c0b2f7Stbbdev private:
25351c0b2f7Stbbdev     EpochAsyncActivity& m_asyncActivity;
25451c0b2f7Stbbdev     std::atomic<int>& m_globalEpoch;
25551c0b2f7Stbbdev     int& m_localEpoch;
25651c0b2f7Stbbdev };
25751c0b2f7Stbbdev 
25851c0b2f7Stbbdev // Simple test for basic resumable tasks functionality
TestSuspendResume()25951c0b2f7Stbbdev void TestSuspendResume() {
260f81abcb6SIlya Isaev #if __TBB_USE_SANITIZERS
261f81abcb6SIlya Isaev     constexpr int iter_size = 100;
262f81abcb6SIlya Isaev #else
263f81abcb6SIlya Isaev     constexpr int iter_size = 50000;
264f81abcb6SIlya Isaev #endif
265f81abcb6SIlya Isaev 
26651c0b2f7Stbbdev     std::atomic<int> global_epoch; global_epoch = 0;
26751c0b2f7Stbbdev     EpochAsyncActivity async(4, global_epoch);
26851c0b2f7Stbbdev 
26951c0b2f7Stbbdev     tbb::enumerable_thread_specific<int, tbb::cache_aligned_allocator<int>, tbb::ets_suspend_aware> ets_fiber;
27051c0b2f7Stbbdev     std::atomic<int> inner_par_iters, outer_par_iters;
27151c0b2f7Stbbdev     inner_par_iters = outer_par_iters = 0;
27251c0b2f7Stbbdev 
27351c0b2f7Stbbdev     tbb::parallel_for(0, N, [&](int) {
274f81abcb6SIlya Isaev         for (int i = 0; i < iter_size; ++i) {
27551c0b2f7Stbbdev             ets_fiber.local() = i;
27651c0b2f7Stbbdev 
27751c0b2f7Stbbdev             int local_epoch;
27851c0b2f7Stbbdev             tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
27951c0b2f7Stbbdev             CHECK(local_epoch < global_epoch);
28051c0b2f7Stbbdev             CHECK(ets_fiber.local() == i);
28151c0b2f7Stbbdev 
28251c0b2f7Stbbdev             tbb::parallel_for(0, N, [&](int) {
28351c0b2f7Stbbdev                 int local_epoch2;
28451c0b2f7Stbbdev                 tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch2));
28551c0b2f7Stbbdev                 CHECK(local_epoch2 < global_epoch);
28651c0b2f7Stbbdev                 ++inner_par_iters;
28751c0b2f7Stbbdev             });
28851c0b2f7Stbbdev 
28951c0b2f7Stbbdev             ets_fiber.local() = i;
29051c0b2f7Stbbdev             tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
29151c0b2f7Stbbdev             CHECK(local_epoch < global_epoch);
29251c0b2f7Stbbdev             CHECK(ets_fiber.local() == i);
29351c0b2f7Stbbdev         }
29451c0b2f7Stbbdev         ++outer_par_iters;
29551c0b2f7Stbbdev     });
29651c0b2f7Stbbdev     CHECK(outer_par_iters == N);
297f81abcb6SIlya Isaev     CHECK(inner_par_iters == N*N*iter_size);
29851c0b2f7Stbbdev }
29951c0b2f7Stbbdev 
300b15aabb3Stbbdev // During cleanup external thread's local task pool may
30151c0b2f7Stbbdev // e.g. contain proxies of affinitized tasks, but can be recalled
TestCleanupMaster()30251c0b2f7Stbbdev void TestCleanupMaster() {
30351c0b2f7Stbbdev     if (tbb::this_task_arena::max_concurrency() == 1) {
30451c0b2f7Stbbdev         // The test requires at least 2 threads
30551c0b2f7Stbbdev         return;
30651c0b2f7Stbbdev     }
30751c0b2f7Stbbdev     AsyncActivity asyncActivity(4);
30851c0b2f7Stbbdev     tbb::task_group tg;
30951c0b2f7Stbbdev     std::atomic<int> iter_spawned;
31051c0b2f7Stbbdev     std::atomic<int> iter_executed;
31151c0b2f7Stbbdev 
31251c0b2f7Stbbdev     for (int i = 0; i < 100; i++) {
31351c0b2f7Stbbdev         iter_spawned = 0;
31451c0b2f7Stbbdev         iter_executed = 0;
31551c0b2f7Stbbdev 
31651c0b2f7Stbbdev         utils::NativeParallelFor(N, [&asyncActivity, &tg, &iter_spawned, &iter_executed](int j) {
31751c0b2f7Stbbdev             for (int k = 0; k < j*10 + 1; ++k) {
31851c0b2f7Stbbdev                 tg.run([&asyncActivity, j, &iter_executed] {
319b15aabb3Stbbdev                     utils::doDummyWork(j * 10);
320*e77098d6SPavel Kumbrasev                     tbb::task::suspend(SuspendBody(asyncActivity, std::this_thread::get_id()));
32151c0b2f7Stbbdev                     iter_executed++;
32251c0b2f7Stbbdev                 });
32351c0b2f7Stbbdev                 iter_spawned++;
32451c0b2f7Stbbdev             }
32551c0b2f7Stbbdev         });
32651c0b2f7Stbbdev         CHECK(iter_spawned == 460);
32751c0b2f7Stbbdev         tg.wait();
32851c0b2f7Stbbdev         CHECK(iter_executed == 460);
32951c0b2f7Stbbdev     }
33051c0b2f7Stbbdev }
33151c0b2f7Stbbdev class ParForSuspendBody {
33251c0b2f7Stbbdev     AsyncActivity& asyncActivity;
33351c0b2f7Stbbdev     int m_numIters;
33451c0b2f7Stbbdev public:
ParForSuspendBody(AsyncActivity & a_,int iters)33551c0b2f7Stbbdev     ParForSuspendBody(AsyncActivity& a_, int iters) : asyncActivity(a_), m_numIters(iters) {}
operator ()(int) const33651c0b2f7Stbbdev     void operator()(int) const {
337b15aabb3Stbbdev         utils::doDummyWork(m_numIters);
338*e77098d6SPavel Kumbrasev         tbb::task::suspend(SuspendBody(asyncActivity, std::this_thread::get_id()));
33951c0b2f7Stbbdev     }
34051c0b2f7Stbbdev };
34151c0b2f7Stbbdev 
TestNativeThread()34251c0b2f7Stbbdev void TestNativeThread() {
34351c0b2f7Stbbdev     AsyncActivity asyncActivity(4);
34451c0b2f7Stbbdev 
34551c0b2f7Stbbdev     tbb::task_arena arena;
34651c0b2f7Stbbdev     tbb::task_group tg;
34751c0b2f7Stbbdev     std::atomic<int> iter{};
34851c0b2f7Stbbdev     utils::NativeParallelFor(arena.max_concurrency() / 2, [&arena, &tg, &asyncActivity, &iter](int) {
34951c0b2f7Stbbdev         for (int i = 0; i < 10; i++) {
35051c0b2f7Stbbdev             arena.execute([&tg, &asyncActivity, &iter]() {
35151c0b2f7Stbbdev                 tg.run([&asyncActivity]() {
352*e77098d6SPavel Kumbrasev                     tbb::task::suspend(SuspendBody(asyncActivity, std::this_thread::get_id()));
35351c0b2f7Stbbdev                 });
35451c0b2f7Stbbdev                 iter++;
35551c0b2f7Stbbdev             });
35651c0b2f7Stbbdev         }
35751c0b2f7Stbbdev     });
35851c0b2f7Stbbdev 
35951c0b2f7Stbbdev     CHECK(iter == (arena.max_concurrency() / 2 * 10));
36051c0b2f7Stbbdev     arena.execute([&tg](){
36151c0b2f7Stbbdev         tg.wait();
36251c0b2f7Stbbdev     });
36351c0b2f7Stbbdev }
36451c0b2f7Stbbdev 
36551c0b2f7Stbbdev class ObserverTracker : public tbb::task_scheduler_observer {
36651c0b2f7Stbbdev     static thread_local bool is_in_arena;
36751c0b2f7Stbbdev public:
36851c0b2f7Stbbdev     std::atomic<int> counter;
36951c0b2f7Stbbdev 
ObserverTracker(tbb::task_arena & a)37051c0b2f7Stbbdev     ObserverTracker(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
37151c0b2f7Stbbdev         counter = 0;
37251c0b2f7Stbbdev         observe(true);
37351c0b2f7Stbbdev     }
on_scheduler_entry(bool)37451c0b2f7Stbbdev     void on_scheduler_entry(bool) override {
37551c0b2f7Stbbdev         bool& l = is_in_arena;
37651c0b2f7Stbbdev         CHECK_MESSAGE(l == false, "The thread must call on_scheduler_entry only one time.");
37751c0b2f7Stbbdev         l = true;
37851c0b2f7Stbbdev         ++counter;
37951c0b2f7Stbbdev     }
on_scheduler_exit(bool)38051c0b2f7Stbbdev     void on_scheduler_exit(bool) override {
38151c0b2f7Stbbdev         bool& l = is_in_arena;
38251c0b2f7Stbbdev         CHECK_MESSAGE(l == true, "The thread must call on_scheduler_entry before calling on_scheduler_exit.");
38351c0b2f7Stbbdev         l = false;
38451c0b2f7Stbbdev     }
38551c0b2f7Stbbdev };
38651c0b2f7Stbbdev 
38751c0b2f7Stbbdev thread_local bool ObserverTracker::is_in_arena;
38851c0b2f7Stbbdev 
TestObservers()38951c0b2f7Stbbdev void TestObservers() {
39051c0b2f7Stbbdev     tbb::task_arena arena;
39151c0b2f7Stbbdev     ObserverTracker tracker(arena);
39251c0b2f7Stbbdev     do {
39351c0b2f7Stbbdev         arena.execute([] {
39451c0b2f7Stbbdev             tbb::parallel_for(0, 10, [](int) {
395*e77098d6SPavel Kumbrasev                 auto thread_id = std::this_thread::get_id();
396*e77098d6SPavel Kumbrasev                 tbb::task::suspend([thread_id](tbb::task::suspend_point tag) {
397*e77098d6SPavel Kumbrasev                     CHECK(thread_id == std::this_thread::get_id());
39851c0b2f7Stbbdev                     tbb::task::resume(tag);
39951c0b2f7Stbbdev                 });
40051c0b2f7Stbbdev             }, tbb::simple_partitioner());
40151c0b2f7Stbbdev         });
40251c0b2f7Stbbdev     } while (tracker.counter < 100);
40351c0b2f7Stbbdev     tracker.observe(false);
40451c0b2f7Stbbdev }
40551c0b2f7Stbbdev 
40651c0b2f7Stbbdev class TestCaseGuard {
40751c0b2f7Stbbdev     static thread_local bool m_local;
40851c0b2f7Stbbdev     tbb::global_control m_threadLimit;
40951c0b2f7Stbbdev     tbb::global_control m_stackLimit;
41051c0b2f7Stbbdev public:
TestCaseGuard()41151c0b2f7Stbbdev     TestCaseGuard()
41251c0b2f7Stbbdev         : m_threadLimit(tbb::global_control::max_allowed_parallelism, std::max(tbb::this_task_arena::max_concurrency(), 16))
41351c0b2f7Stbbdev         , m_stackLimit(tbb::global_control::thread_stack_size, 128*1024)
41451c0b2f7Stbbdev     {
41551c0b2f7Stbbdev         CHECK(m_local == false);
41651c0b2f7Stbbdev         m_local = true;
41751c0b2f7Stbbdev     }
~TestCaseGuard()41851c0b2f7Stbbdev     ~TestCaseGuard() {
41951c0b2f7Stbbdev         CHECK(m_local == true);
42051c0b2f7Stbbdev         m_local = false;
42151c0b2f7Stbbdev     }
42251c0b2f7Stbbdev };
42351c0b2f7Stbbdev 
42451c0b2f7Stbbdev thread_local bool TestCaseGuard::m_local = false;
42551c0b2f7Stbbdev 
42651c0b2f7Stbbdev //! Nested test for suspend and resume
42751c0b2f7Stbbdev //! \brief \ref error_guessing
42851c0b2f7Stbbdev TEST_CASE("Nested test for suspend and resume") {
42951c0b2f7Stbbdev     TestCaseGuard guard;
43051c0b2f7Stbbdev     TestSuspendResume();
43151c0b2f7Stbbdev }
43251c0b2f7Stbbdev 
43351c0b2f7Stbbdev //! Nested arena complex test
43451c0b2f7Stbbdev //! \brief \ref error_guessing
43551c0b2f7Stbbdev TEST_CASE("Nested arena") {
43651c0b2f7Stbbdev     TestCaseGuard guard;
43751c0b2f7Stbbdev     TestNestedArena();
43851c0b2f7Stbbdev }
43951c0b2f7Stbbdev 
44051c0b2f7Stbbdev //! Test with external threads
44151c0b2f7Stbbdev //! \brief \ref error_guessing
44251c0b2f7Stbbdev TEST_CASE("External threads") {
44351c0b2f7Stbbdev     TestNativeThread();
44451c0b2f7Stbbdev }
44551c0b2f7Stbbdev 
44651c0b2f7Stbbdev //! Stress test with external threads
44751c0b2f7Stbbdev //! \brief \ref stress
44851c0b2f7Stbbdev TEST_CASE("Stress test with external threads") {
44951c0b2f7Stbbdev     TestCleanupMaster();
45051c0b2f7Stbbdev }
45151c0b2f7Stbbdev 
45251c0b2f7Stbbdev //! Test with an arena observer
45351c0b2f7Stbbdev //! \brief \ref error_guessing
45451c0b2f7Stbbdev TEST_CASE("Arena observer") {
45551c0b2f7Stbbdev     TestObservers();
45651c0b2f7Stbbdev }
45751c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */
458