151c0b2f7Stbbdev /*
2*e77098d6SPavel Kumbrasev Copyright (c) 2005-2022 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev //! \file test_resumable_tasks.cpp
1851c0b2f7Stbbdev //! \brief Test for [scheduler.resumable_tasks] specification
1951c0b2f7Stbbdev
2051c0b2f7Stbbdev #include "common/test.h"
2151c0b2f7Stbbdev #include "common/utils.h"
2251c0b2f7Stbbdev
2351c0b2f7Stbbdev #include "tbb/task.h"
2451c0b2f7Stbbdev
2551c0b2f7Stbbdev #if __TBB_RESUMABLE_TASKS
2651c0b2f7Stbbdev
2751c0b2f7Stbbdev #include "tbb/global_control.h"
2851c0b2f7Stbbdev #include "tbb/task_arena.h"
2951c0b2f7Stbbdev #include "tbb/parallel_for.h"
3051c0b2f7Stbbdev #include "tbb/task_scheduler_observer.h"
3151c0b2f7Stbbdev #include "tbb/task_group.h"
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev #include <algorithm>
3451c0b2f7Stbbdev #include <thread>
3551c0b2f7Stbbdev #include <queue>
3651c0b2f7Stbbdev #include <condition_variable>
3751c0b2f7Stbbdev
3851c0b2f7Stbbdev const int N = 10;
3951c0b2f7Stbbdev
4051c0b2f7Stbbdev // External activity used in all tests, which resumes suspended execution point
4151c0b2f7Stbbdev class AsyncActivity {
4251c0b2f7Stbbdev public:
AsyncActivity(int num_)4351c0b2f7Stbbdev AsyncActivity(int num_) : m_numAsyncThreads(num_) {
4451c0b2f7Stbbdev for (int i = 0; i < m_numAsyncThreads ; ++i) {
4551c0b2f7Stbbdev m_asyncThreads.push_back( new std::thread(AsyncActivity::asyncLoop, this) );
4651c0b2f7Stbbdev }
4751c0b2f7Stbbdev }
~AsyncActivity()4851c0b2f7Stbbdev ~AsyncActivity() {
4951c0b2f7Stbbdev {
5051c0b2f7Stbbdev std::lock_guard<std::mutex> lock(m_mutex);
5151c0b2f7Stbbdev for (int i = 0; i < m_numAsyncThreads; ++i) {
5251c0b2f7Stbbdev m_tagQueue.push(nullptr);
5351c0b2f7Stbbdev }
5451c0b2f7Stbbdev m_condvar.notify_all();
5551c0b2f7Stbbdev }
5651c0b2f7Stbbdev for (int i = 0; i < m_numAsyncThreads; ++i) {
5751c0b2f7Stbbdev m_asyncThreads[i]->join();
5851c0b2f7Stbbdev delete m_asyncThreads[i];
5951c0b2f7Stbbdev }
6051c0b2f7Stbbdev CHECK(m_tagQueue.empty());
6151c0b2f7Stbbdev }
submit(tbb::task::suspend_point ctx)6251c0b2f7Stbbdev void submit(tbb::task::suspend_point ctx) {
6351c0b2f7Stbbdev std::lock_guard<std::mutex> lock(m_mutex);
6451c0b2f7Stbbdev m_tagQueue.push(ctx);
6551c0b2f7Stbbdev m_condvar.notify_one();
6651c0b2f7Stbbdev }
6751c0b2f7Stbbdev
6851c0b2f7Stbbdev private:
asyncLoop(AsyncActivity * async)6951c0b2f7Stbbdev static void asyncLoop(AsyncActivity* async) {
7051c0b2f7Stbbdev tbb::task::suspend_point tag;
7151c0b2f7Stbbdev for (;;) {
7251c0b2f7Stbbdev {
7351c0b2f7Stbbdev std::unique_lock<std::mutex> lock(async->m_mutex);
7451c0b2f7Stbbdev async->m_condvar.wait(lock, [async] {return !async->m_tagQueue.empty(); });
7551c0b2f7Stbbdev tag = async->m_tagQueue.front();
7651c0b2f7Stbbdev async->m_tagQueue.pop();
7751c0b2f7Stbbdev }
7851c0b2f7Stbbdev if (!tag) {
7951c0b2f7Stbbdev break;
8051c0b2f7Stbbdev }
8151c0b2f7Stbbdev tbb::task::resume(tag);
8251c0b2f7Stbbdev };
8351c0b2f7Stbbdev }
8451c0b2f7Stbbdev
8551c0b2f7Stbbdev const int m_numAsyncThreads;
8651c0b2f7Stbbdev std::mutex m_mutex;
8751c0b2f7Stbbdev std::condition_variable m_condvar;
8851c0b2f7Stbbdev std::queue<tbb::task::suspend_point> m_tagQueue;
8951c0b2f7Stbbdev std::vector<std::thread*> m_asyncThreads;
9051c0b2f7Stbbdev };
9151c0b2f7Stbbdev
9251c0b2f7Stbbdev struct SuspendBody {
SuspendBodySuspendBody93*e77098d6SPavel Kumbrasev SuspendBody(AsyncActivity& a_, std::thread::id id) :
94*e77098d6SPavel Kumbrasev m_asyncActivity(a_), thread_id(id) {}
operator ()SuspendBody9551c0b2f7Stbbdev void operator()(tbb::task::suspend_point tag) {
96*e77098d6SPavel Kumbrasev CHECK(thread_id == std::this_thread::get_id());
9751c0b2f7Stbbdev m_asyncActivity.submit(tag);
9851c0b2f7Stbbdev }
9951c0b2f7Stbbdev
10051c0b2f7Stbbdev private:
10151c0b2f7Stbbdev AsyncActivity& m_asyncActivity;
102*e77098d6SPavel Kumbrasev std::thread::id thread_id;
10351c0b2f7Stbbdev };
10451c0b2f7Stbbdev
10551c0b2f7Stbbdev class InnermostArenaBody {
10651c0b2f7Stbbdev public:
InnermostArenaBody(AsyncActivity & a_)10751c0b2f7Stbbdev InnermostArenaBody(AsyncActivity& a_) : m_asyncActivity(a_) {}
10851c0b2f7Stbbdev
operator ()()10951c0b2f7Stbbdev void operator()() {
11051c0b2f7Stbbdev InnermostOuterParFor inner_outer_body(m_asyncActivity);
11151c0b2f7Stbbdev tbb::parallel_for(0, N, inner_outer_body );
11251c0b2f7Stbbdev }
11351c0b2f7Stbbdev
11451c0b2f7Stbbdev private:
11551c0b2f7Stbbdev struct InnermostInnerParFor {
InnermostInnerParForInnermostArenaBody::InnermostInnerParFor11651c0b2f7Stbbdev InnermostInnerParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
operator ()InnermostArenaBody::InnermostInnerParFor11751c0b2f7Stbbdev void operator()(int) const {
118*e77098d6SPavel Kumbrasev tbb::task::suspend(SuspendBody(m_asyncActivity, std::this_thread::get_id()));
11951c0b2f7Stbbdev }
12051c0b2f7Stbbdev AsyncActivity& m_asyncActivity;
12151c0b2f7Stbbdev };
12251c0b2f7Stbbdev struct InnermostOuterParFor {
InnermostOuterParForInnermostArenaBody::InnermostOuterParFor12351c0b2f7Stbbdev InnermostOuterParFor(AsyncActivity& a_) : m_asyncActivity(a_) {}
operator ()InnermostArenaBody::InnermostOuterParFor12451c0b2f7Stbbdev void operator()(int) const {
125*e77098d6SPavel Kumbrasev tbb::task::suspend(SuspendBody(m_asyncActivity, std::this_thread::get_id()));
12651c0b2f7Stbbdev InnermostInnerParFor inner_inner_body(m_asyncActivity);
12751c0b2f7Stbbdev tbb::parallel_for(0, N, inner_inner_body);
12851c0b2f7Stbbdev }
12951c0b2f7Stbbdev AsyncActivity& m_asyncActivity;
13051c0b2f7Stbbdev };
13151c0b2f7Stbbdev AsyncActivity& m_asyncActivity;
13251c0b2f7Stbbdev };
13351c0b2f7Stbbdev
13451c0b2f7Stbbdev #include "tbb/enumerable_thread_specific.h"
13551c0b2f7Stbbdev
13651c0b2f7Stbbdev class OutermostArenaBody {
13751c0b2f7Stbbdev public:
OutermostArenaBody(AsyncActivity & a_,tbb::task_arena & o_,tbb::task_arena & i_,tbb::task_arena & id_,tbb::enumerable_thread_specific<int> & ets)13851c0b2f7Stbbdev OutermostArenaBody(AsyncActivity& a_, tbb::task_arena& o_, tbb::task_arena& i_
13951c0b2f7Stbbdev , tbb::task_arena& id_, tbb::enumerable_thread_specific<int>& ets) :
14051c0b2f7Stbbdev m_asyncActivity(a_), m_outermostArena(o_), m_innermostArena(i_), m_innermostArenaDefault(id_), m_local(ets) {}
14151c0b2f7Stbbdev
operator ()()14251c0b2f7Stbbdev void operator()() {
14351c0b2f7Stbbdev tbb::parallel_for(0, 32, *this);
14451c0b2f7Stbbdev }
14551c0b2f7Stbbdev
operator ()(int i) const14651c0b2f7Stbbdev void operator()(int i) const {
147*e77098d6SPavel Kumbrasev tbb::task::suspend([&] (tbb::task::suspend_point sp) { m_asyncActivity.submit(sp); });
14851c0b2f7Stbbdev
14951c0b2f7Stbbdev tbb::task_arena& nested_arena = (i % 3 == 0) ?
15051c0b2f7Stbbdev m_outermostArena : (i % 3 == 1 ? m_innermostArena : m_innermostArenaDefault);
15151c0b2f7Stbbdev
15251c0b2f7Stbbdev if (i % 3 != 0) {
15351c0b2f7Stbbdev // We can only guarantee recall coorectness for "not-same" nested arenas entry
15451c0b2f7Stbbdev m_local.local() = i;
15551c0b2f7Stbbdev }
15651c0b2f7Stbbdev InnermostArenaBody innermost_arena_body(m_asyncActivity);
15751c0b2f7Stbbdev nested_arena.execute(innermost_arena_body);
15851c0b2f7Stbbdev if (i % 3 != 0) {
15951c0b2f7Stbbdev CHECK_MESSAGE(i == m_local.local(), "Original thread wasn't recalled for innermost nested arena.");
16051c0b2f7Stbbdev }
16151c0b2f7Stbbdev }
16251c0b2f7Stbbdev
16351c0b2f7Stbbdev private:
16451c0b2f7Stbbdev AsyncActivity& m_asyncActivity;
16551c0b2f7Stbbdev tbb::task_arena& m_outermostArena;
16651c0b2f7Stbbdev tbb::task_arena& m_innermostArena;
16751c0b2f7Stbbdev tbb::task_arena& m_innermostArenaDefault;
16851c0b2f7Stbbdev tbb::enumerable_thread_specific<int>& m_local;
16951c0b2f7Stbbdev };
17051c0b2f7Stbbdev
TestNestedArena()17151c0b2f7Stbbdev void TestNestedArena() {
17251c0b2f7Stbbdev AsyncActivity asyncActivity(4);
17351c0b2f7Stbbdev
17451c0b2f7Stbbdev tbb::task_arena outermost_arena;
17551c0b2f7Stbbdev tbb::task_arena innermost_arena(2,2);
17651c0b2f7Stbbdev tbb::task_arena innermost_arena_default;
17751c0b2f7Stbbdev
17851c0b2f7Stbbdev outermost_arena.initialize();
17951c0b2f7Stbbdev innermost_arena_default.initialize();
18051c0b2f7Stbbdev innermost_arena.initialize();
18151c0b2f7Stbbdev
18251c0b2f7Stbbdev tbb::enumerable_thread_specific<int> ets;
18351c0b2f7Stbbdev
18451c0b2f7Stbbdev OutermostArenaBody outer_arena_body(asyncActivity, outermost_arena, innermost_arena, innermost_arena_default, ets);
18551c0b2f7Stbbdev outermost_arena.execute(outer_arena_body);
18651c0b2f7Stbbdev }
18751c0b2f7Stbbdev
18851c0b2f7Stbbdev // External activity used in all tests, which resumes suspended execution point
18951c0b2f7Stbbdev class EpochAsyncActivity {
19051c0b2f7Stbbdev public:
EpochAsyncActivity(int num_,std::atomic<int> & e_)19151c0b2f7Stbbdev EpochAsyncActivity(int num_, std::atomic<int>& e_) : m_numAsyncThreads(num_), m_globalEpoch(e_) {
19251c0b2f7Stbbdev for (int i = 0; i < m_numAsyncThreads ; ++i) {
19351c0b2f7Stbbdev m_asyncThreads.push_back( new std::thread(EpochAsyncActivity::asyncLoop, this) );
19451c0b2f7Stbbdev }
19551c0b2f7Stbbdev }
~EpochAsyncActivity()19651c0b2f7Stbbdev ~EpochAsyncActivity() {
19751c0b2f7Stbbdev {
19851c0b2f7Stbbdev std::lock_guard<std::mutex> lock(m_mutex);
19951c0b2f7Stbbdev for (int i = 0; i < m_numAsyncThreads; ++i) {
20051c0b2f7Stbbdev m_tagQueue.push(nullptr);
20151c0b2f7Stbbdev }
20251c0b2f7Stbbdev m_condvar.notify_all();
20351c0b2f7Stbbdev }
20451c0b2f7Stbbdev for (int i = 0; i < m_numAsyncThreads; ++i) {
20551c0b2f7Stbbdev m_asyncThreads[i]->join();
20651c0b2f7Stbbdev delete m_asyncThreads[i];
20751c0b2f7Stbbdev }
20851c0b2f7Stbbdev CHECK(m_tagQueue.empty());
20951c0b2f7Stbbdev }
submit(tbb::task::suspend_point ctx)21051c0b2f7Stbbdev void submit(tbb::task::suspend_point ctx) {
21151c0b2f7Stbbdev std::lock_guard<std::mutex> lock(m_mutex);
21251c0b2f7Stbbdev m_tagQueue.push(ctx);
21351c0b2f7Stbbdev m_condvar.notify_one();
21451c0b2f7Stbbdev }
21551c0b2f7Stbbdev
21651c0b2f7Stbbdev private:
asyncLoop(EpochAsyncActivity * async)21751c0b2f7Stbbdev static void asyncLoop(EpochAsyncActivity* async) {
21851c0b2f7Stbbdev tbb::task::suspend_point tag;
21951c0b2f7Stbbdev for (;;) {
22051c0b2f7Stbbdev {
22151c0b2f7Stbbdev std::unique_lock<std::mutex> lock(async->m_mutex);
22251c0b2f7Stbbdev async->m_condvar.wait(lock, [async] {return !async->m_tagQueue.empty(); });
22351c0b2f7Stbbdev tag = async->m_tagQueue.front();
22451c0b2f7Stbbdev async->m_tagQueue.pop();
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev if (!tag) {
22751c0b2f7Stbbdev break;
22851c0b2f7Stbbdev }
22951c0b2f7Stbbdev // Track the global epoch
23051c0b2f7Stbbdev async->m_globalEpoch++;
23151c0b2f7Stbbdev tbb::task::resume(tag);
23251c0b2f7Stbbdev };
23351c0b2f7Stbbdev }
23451c0b2f7Stbbdev
23551c0b2f7Stbbdev const int m_numAsyncThreads;
23651c0b2f7Stbbdev std::atomic<int>& m_globalEpoch;
23751c0b2f7Stbbdev std::mutex m_mutex;
23851c0b2f7Stbbdev std::condition_variable m_condvar;
23951c0b2f7Stbbdev std::queue<tbb::task::suspend_point> m_tagQueue;
24051c0b2f7Stbbdev std::vector<std::thread*> m_asyncThreads;
24151c0b2f7Stbbdev };
24251c0b2f7Stbbdev
24351c0b2f7Stbbdev struct EpochSuspendBody {
EpochSuspendBodyEpochSuspendBody24451c0b2f7Stbbdev EpochSuspendBody(EpochAsyncActivity& a_, std::atomic<int>& e_, int& le_) :
24551c0b2f7Stbbdev m_asyncActivity(a_), m_globalEpoch(e_), m_localEpoch(le_) {}
24651c0b2f7Stbbdev
operator ()EpochSuspendBody24751c0b2f7Stbbdev void operator()(tbb::task::suspend_point ctx) {
24851c0b2f7Stbbdev m_localEpoch = m_globalEpoch;
24951c0b2f7Stbbdev m_asyncActivity.submit(ctx);
25051c0b2f7Stbbdev }
25151c0b2f7Stbbdev
25251c0b2f7Stbbdev private:
25351c0b2f7Stbbdev EpochAsyncActivity& m_asyncActivity;
25451c0b2f7Stbbdev std::atomic<int>& m_globalEpoch;
25551c0b2f7Stbbdev int& m_localEpoch;
25651c0b2f7Stbbdev };
25751c0b2f7Stbbdev
25851c0b2f7Stbbdev // Simple test for basic resumable tasks functionality
TestSuspendResume()25951c0b2f7Stbbdev void TestSuspendResume() {
260f81abcb6SIlya Isaev #if __TBB_USE_SANITIZERS
261f81abcb6SIlya Isaev constexpr int iter_size = 100;
262f81abcb6SIlya Isaev #else
263f81abcb6SIlya Isaev constexpr int iter_size = 50000;
264f81abcb6SIlya Isaev #endif
265f81abcb6SIlya Isaev
26651c0b2f7Stbbdev std::atomic<int> global_epoch; global_epoch = 0;
26751c0b2f7Stbbdev EpochAsyncActivity async(4, global_epoch);
26851c0b2f7Stbbdev
26951c0b2f7Stbbdev tbb::enumerable_thread_specific<int, tbb::cache_aligned_allocator<int>, tbb::ets_suspend_aware> ets_fiber;
27051c0b2f7Stbbdev std::atomic<int> inner_par_iters, outer_par_iters;
27151c0b2f7Stbbdev inner_par_iters = outer_par_iters = 0;
27251c0b2f7Stbbdev
27351c0b2f7Stbbdev tbb::parallel_for(0, N, [&](int) {
274f81abcb6SIlya Isaev for (int i = 0; i < iter_size; ++i) {
27551c0b2f7Stbbdev ets_fiber.local() = i;
27651c0b2f7Stbbdev
27751c0b2f7Stbbdev int local_epoch;
27851c0b2f7Stbbdev tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
27951c0b2f7Stbbdev CHECK(local_epoch < global_epoch);
28051c0b2f7Stbbdev CHECK(ets_fiber.local() == i);
28151c0b2f7Stbbdev
28251c0b2f7Stbbdev tbb::parallel_for(0, N, [&](int) {
28351c0b2f7Stbbdev int local_epoch2;
28451c0b2f7Stbbdev tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch2));
28551c0b2f7Stbbdev CHECK(local_epoch2 < global_epoch);
28651c0b2f7Stbbdev ++inner_par_iters;
28751c0b2f7Stbbdev });
28851c0b2f7Stbbdev
28951c0b2f7Stbbdev ets_fiber.local() = i;
29051c0b2f7Stbbdev tbb::task::suspend(EpochSuspendBody(async, global_epoch, local_epoch));
29151c0b2f7Stbbdev CHECK(local_epoch < global_epoch);
29251c0b2f7Stbbdev CHECK(ets_fiber.local() == i);
29351c0b2f7Stbbdev }
29451c0b2f7Stbbdev ++outer_par_iters;
29551c0b2f7Stbbdev });
29651c0b2f7Stbbdev CHECK(outer_par_iters == N);
297f81abcb6SIlya Isaev CHECK(inner_par_iters == N*N*iter_size);
29851c0b2f7Stbbdev }
29951c0b2f7Stbbdev
300b15aabb3Stbbdev // During cleanup external thread's local task pool may
30151c0b2f7Stbbdev // e.g. contain proxies of affinitized tasks, but can be recalled
TestCleanupMaster()30251c0b2f7Stbbdev void TestCleanupMaster() {
30351c0b2f7Stbbdev if (tbb::this_task_arena::max_concurrency() == 1) {
30451c0b2f7Stbbdev // The test requires at least 2 threads
30551c0b2f7Stbbdev return;
30651c0b2f7Stbbdev }
30751c0b2f7Stbbdev AsyncActivity asyncActivity(4);
30851c0b2f7Stbbdev tbb::task_group tg;
30951c0b2f7Stbbdev std::atomic<int> iter_spawned;
31051c0b2f7Stbbdev std::atomic<int> iter_executed;
31151c0b2f7Stbbdev
31251c0b2f7Stbbdev for (int i = 0; i < 100; i++) {
31351c0b2f7Stbbdev iter_spawned = 0;
31451c0b2f7Stbbdev iter_executed = 0;
31551c0b2f7Stbbdev
31651c0b2f7Stbbdev utils::NativeParallelFor(N, [&asyncActivity, &tg, &iter_spawned, &iter_executed](int j) {
31751c0b2f7Stbbdev for (int k = 0; k < j*10 + 1; ++k) {
31851c0b2f7Stbbdev tg.run([&asyncActivity, j, &iter_executed] {
319b15aabb3Stbbdev utils::doDummyWork(j * 10);
320*e77098d6SPavel Kumbrasev tbb::task::suspend(SuspendBody(asyncActivity, std::this_thread::get_id()));
32151c0b2f7Stbbdev iter_executed++;
32251c0b2f7Stbbdev });
32351c0b2f7Stbbdev iter_spawned++;
32451c0b2f7Stbbdev }
32551c0b2f7Stbbdev });
32651c0b2f7Stbbdev CHECK(iter_spawned == 460);
32751c0b2f7Stbbdev tg.wait();
32851c0b2f7Stbbdev CHECK(iter_executed == 460);
32951c0b2f7Stbbdev }
33051c0b2f7Stbbdev }
33151c0b2f7Stbbdev class ParForSuspendBody {
33251c0b2f7Stbbdev AsyncActivity& asyncActivity;
33351c0b2f7Stbbdev int m_numIters;
33451c0b2f7Stbbdev public:
ParForSuspendBody(AsyncActivity & a_,int iters)33551c0b2f7Stbbdev ParForSuspendBody(AsyncActivity& a_, int iters) : asyncActivity(a_), m_numIters(iters) {}
operator ()(int) const33651c0b2f7Stbbdev void operator()(int) const {
337b15aabb3Stbbdev utils::doDummyWork(m_numIters);
338*e77098d6SPavel Kumbrasev tbb::task::suspend(SuspendBody(asyncActivity, std::this_thread::get_id()));
33951c0b2f7Stbbdev }
34051c0b2f7Stbbdev };
34151c0b2f7Stbbdev
TestNativeThread()34251c0b2f7Stbbdev void TestNativeThread() {
34351c0b2f7Stbbdev AsyncActivity asyncActivity(4);
34451c0b2f7Stbbdev
34551c0b2f7Stbbdev tbb::task_arena arena;
34651c0b2f7Stbbdev tbb::task_group tg;
34751c0b2f7Stbbdev std::atomic<int> iter{};
34851c0b2f7Stbbdev utils::NativeParallelFor(arena.max_concurrency() / 2, [&arena, &tg, &asyncActivity, &iter](int) {
34951c0b2f7Stbbdev for (int i = 0; i < 10; i++) {
35051c0b2f7Stbbdev arena.execute([&tg, &asyncActivity, &iter]() {
35151c0b2f7Stbbdev tg.run([&asyncActivity]() {
352*e77098d6SPavel Kumbrasev tbb::task::suspend(SuspendBody(asyncActivity, std::this_thread::get_id()));
35351c0b2f7Stbbdev });
35451c0b2f7Stbbdev iter++;
35551c0b2f7Stbbdev });
35651c0b2f7Stbbdev }
35751c0b2f7Stbbdev });
35851c0b2f7Stbbdev
35951c0b2f7Stbbdev CHECK(iter == (arena.max_concurrency() / 2 * 10));
36051c0b2f7Stbbdev arena.execute([&tg](){
36151c0b2f7Stbbdev tg.wait();
36251c0b2f7Stbbdev });
36351c0b2f7Stbbdev }
36451c0b2f7Stbbdev
36551c0b2f7Stbbdev class ObserverTracker : public tbb::task_scheduler_observer {
36651c0b2f7Stbbdev static thread_local bool is_in_arena;
36751c0b2f7Stbbdev public:
36851c0b2f7Stbbdev std::atomic<int> counter;
36951c0b2f7Stbbdev
ObserverTracker(tbb::task_arena & a)37051c0b2f7Stbbdev ObserverTracker(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
37151c0b2f7Stbbdev counter = 0;
37251c0b2f7Stbbdev observe(true);
37351c0b2f7Stbbdev }
on_scheduler_entry(bool)37451c0b2f7Stbbdev void on_scheduler_entry(bool) override {
37551c0b2f7Stbbdev bool& l = is_in_arena;
37651c0b2f7Stbbdev CHECK_MESSAGE(l == false, "The thread must call on_scheduler_entry only one time.");
37751c0b2f7Stbbdev l = true;
37851c0b2f7Stbbdev ++counter;
37951c0b2f7Stbbdev }
on_scheduler_exit(bool)38051c0b2f7Stbbdev void on_scheduler_exit(bool) override {
38151c0b2f7Stbbdev bool& l = is_in_arena;
38251c0b2f7Stbbdev CHECK_MESSAGE(l == true, "The thread must call on_scheduler_entry before calling on_scheduler_exit.");
38351c0b2f7Stbbdev l = false;
38451c0b2f7Stbbdev }
38551c0b2f7Stbbdev };
38651c0b2f7Stbbdev
38751c0b2f7Stbbdev thread_local bool ObserverTracker::is_in_arena;
38851c0b2f7Stbbdev
TestObservers()38951c0b2f7Stbbdev void TestObservers() {
39051c0b2f7Stbbdev tbb::task_arena arena;
39151c0b2f7Stbbdev ObserverTracker tracker(arena);
39251c0b2f7Stbbdev do {
39351c0b2f7Stbbdev arena.execute([] {
39451c0b2f7Stbbdev tbb::parallel_for(0, 10, [](int) {
395*e77098d6SPavel Kumbrasev auto thread_id = std::this_thread::get_id();
396*e77098d6SPavel Kumbrasev tbb::task::suspend([thread_id](tbb::task::suspend_point tag) {
397*e77098d6SPavel Kumbrasev CHECK(thread_id == std::this_thread::get_id());
39851c0b2f7Stbbdev tbb::task::resume(tag);
39951c0b2f7Stbbdev });
40051c0b2f7Stbbdev }, tbb::simple_partitioner());
40151c0b2f7Stbbdev });
40251c0b2f7Stbbdev } while (tracker.counter < 100);
40351c0b2f7Stbbdev tracker.observe(false);
40451c0b2f7Stbbdev }
40551c0b2f7Stbbdev
40651c0b2f7Stbbdev class TestCaseGuard {
40751c0b2f7Stbbdev static thread_local bool m_local;
40851c0b2f7Stbbdev tbb::global_control m_threadLimit;
40951c0b2f7Stbbdev tbb::global_control m_stackLimit;
41051c0b2f7Stbbdev public:
TestCaseGuard()41151c0b2f7Stbbdev TestCaseGuard()
41251c0b2f7Stbbdev : m_threadLimit(tbb::global_control::max_allowed_parallelism, std::max(tbb::this_task_arena::max_concurrency(), 16))
41351c0b2f7Stbbdev , m_stackLimit(tbb::global_control::thread_stack_size, 128*1024)
41451c0b2f7Stbbdev {
41551c0b2f7Stbbdev CHECK(m_local == false);
41651c0b2f7Stbbdev m_local = true;
41751c0b2f7Stbbdev }
~TestCaseGuard()41851c0b2f7Stbbdev ~TestCaseGuard() {
41951c0b2f7Stbbdev CHECK(m_local == true);
42051c0b2f7Stbbdev m_local = false;
42151c0b2f7Stbbdev }
42251c0b2f7Stbbdev };
42351c0b2f7Stbbdev
42451c0b2f7Stbbdev thread_local bool TestCaseGuard::m_local = false;
42551c0b2f7Stbbdev
42651c0b2f7Stbbdev //! Nested test for suspend and resume
42751c0b2f7Stbbdev //! \brief \ref error_guessing
42851c0b2f7Stbbdev TEST_CASE("Nested test for suspend and resume") {
42951c0b2f7Stbbdev TestCaseGuard guard;
43051c0b2f7Stbbdev TestSuspendResume();
43151c0b2f7Stbbdev }
43251c0b2f7Stbbdev
43351c0b2f7Stbbdev //! Nested arena complex test
43451c0b2f7Stbbdev //! \brief \ref error_guessing
43551c0b2f7Stbbdev TEST_CASE("Nested arena") {
43651c0b2f7Stbbdev TestCaseGuard guard;
43751c0b2f7Stbbdev TestNestedArena();
43851c0b2f7Stbbdev }
43951c0b2f7Stbbdev
44051c0b2f7Stbbdev //! Test with external threads
44151c0b2f7Stbbdev //! \brief \ref error_guessing
44251c0b2f7Stbbdev TEST_CASE("External threads") {
44351c0b2f7Stbbdev TestNativeThread();
44451c0b2f7Stbbdev }
44551c0b2f7Stbbdev
44651c0b2f7Stbbdev //! Stress test with external threads
44751c0b2f7Stbbdev //! \brief \ref stress
44851c0b2f7Stbbdev TEST_CASE("Stress test with external threads") {
44951c0b2f7Stbbdev TestCleanupMaster();
45051c0b2f7Stbbdev }
45151c0b2f7Stbbdev
45251c0b2f7Stbbdev //! Test with an arena observer
45351c0b2f7Stbbdev //! \brief \ref error_guessing
45451c0b2f7Stbbdev TEST_CASE("Arena observer") {
45551c0b2f7Stbbdev TestObservers();
45651c0b2f7Stbbdev }
45751c0b2f7Stbbdev #endif /* __TBB_RESUMABLE_TASKS */
458