151c0b2f7Stbbdev /*
2f8f7f738SPavel Kumbrasev Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev #include "common/test.h"
1851c0b2f7Stbbdev
1951c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1
20b15aabb3Stbbdev #include "common/concurrency_tracker.h"
21c4568449SPavel Kumbrasev #include "common/cpu_usertime.h"
2251c0b2f7Stbbdev #include "common/spin_barrier.h"
2351c0b2f7Stbbdev #include "common/utils.h"
2451c0b2f7Stbbdev #include "common/utils_report.h"
2551c0b2f7Stbbdev #include "common/utils_concurrency_limit.h"
2651c0b2f7Stbbdev
2751c0b2f7Stbbdev #include "tbb/task_arena.h"
2851c0b2f7Stbbdev #include "tbb/task_scheduler_observer.h"
2951c0b2f7Stbbdev #include "tbb/enumerable_thread_specific.h"
3051c0b2f7Stbbdev #include "tbb/parallel_for.h"
3151c0b2f7Stbbdev #include "tbb/global_control.h"
3251c0b2f7Stbbdev #include "tbb/concurrent_set.h"
3351c0b2f7Stbbdev #include "tbb/spin_mutex.h"
3451c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
35478de5b1Stbbdev #include "tbb/task_group.h"
3651c0b2f7Stbbdev
3751c0b2f7Stbbdev #include <atomic>
38c4568449SPavel Kumbrasev #include <condition_variable>
39c4568449SPavel Kumbrasev #include <cstdio>
40c4568449SPavel Kumbrasev #include <cstdlib>
41c4568449SPavel Kumbrasev #include <stdexcept>
42c4568449SPavel Kumbrasev #include <thread>
43c4568449SPavel Kumbrasev #include <vector>
4451c0b2f7Stbbdev
4551c0b2f7Stbbdev //#include "harness_fp.h"
4651c0b2f7Stbbdev
4751c0b2f7Stbbdev //! \file test_task_arena.cpp
4851c0b2f7Stbbdev //! \brief Test for [scheduler.task_arena scheduler.task_scheduler_observer] specification
4951c0b2f7Stbbdev
5051c0b2f7Stbbdev //--------------------------------------------------//
5151c0b2f7Stbbdev // Test that task_arena::initialize and task_arena::terminate work when doing nothing else.
5251c0b2f7Stbbdev /* maxthread is treated as the biggest possible concurrency level. */
InitializeAndTerminate(int maxthread)5351c0b2f7Stbbdev void InitializeAndTerminate( int maxthread ) {
5451c0b2f7Stbbdev for( int i=0; i<200; ++i ) {
5551c0b2f7Stbbdev switch( i&3 ) {
5651c0b2f7Stbbdev // Arena is created inactive, initialization is always explicit. Lazy initialization is covered by other test functions.
5751c0b2f7Stbbdev // Explicit initialization can either keep the original values or change those.
5851c0b2f7Stbbdev // Arena termination can be explicit or implicit (in the destructor).
5951c0b2f7Stbbdev // TODO: extend with concurrency level checks if such a method is added.
6051c0b2f7Stbbdev default: {
6151c0b2f7Stbbdev tbb::task_arena arena( std::rand() % maxthread + 1 );
6251c0b2f7Stbbdev CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
6351c0b2f7Stbbdev arena.initialize();
6451c0b2f7Stbbdev CHECK(arena.is_active());
6551c0b2f7Stbbdev arena.terminate();
6651c0b2f7Stbbdev CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
6751c0b2f7Stbbdev break;
6851c0b2f7Stbbdev }
6951c0b2f7Stbbdev case 0: {
7051c0b2f7Stbbdev tbb::task_arena arena( 1 );
7151c0b2f7Stbbdev CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
7251c0b2f7Stbbdev arena.initialize( std::rand() % maxthread + 1 ); // change the parameters
7351c0b2f7Stbbdev CHECK(arena.is_active());
7451c0b2f7Stbbdev break;
7551c0b2f7Stbbdev }
7651c0b2f7Stbbdev case 1: {
7751c0b2f7Stbbdev tbb::task_arena arena( tbb::task_arena::automatic );
7851c0b2f7Stbbdev CHECK(!arena.is_active());
7951c0b2f7Stbbdev arena.initialize();
8051c0b2f7Stbbdev CHECK(arena.is_active());
8151c0b2f7Stbbdev break;
8251c0b2f7Stbbdev }
8351c0b2f7Stbbdev case 2: {
8451c0b2f7Stbbdev tbb::task_arena arena;
8551c0b2f7Stbbdev CHECK_MESSAGE(!arena.is_active(), "arena should not be active until initialized");
8651c0b2f7Stbbdev arena.initialize( std::rand() % maxthread + 1 );
8751c0b2f7Stbbdev CHECK(arena.is_active());
8851c0b2f7Stbbdev arena.terminate();
8951c0b2f7Stbbdev CHECK_MESSAGE(!arena.is_active(), "arena should not be active; it was terminated");
9051c0b2f7Stbbdev break;
9151c0b2f7Stbbdev }
9251c0b2f7Stbbdev }
9351c0b2f7Stbbdev }
9451c0b2f7Stbbdev }
9551c0b2f7Stbbdev
9651c0b2f7Stbbdev //--------------------------------------------------//
9751c0b2f7Stbbdev
9851c0b2f7Stbbdev // Definitions used in more than one test
9951c0b2f7Stbbdev typedef tbb::blocked_range<int> Range;
10051c0b2f7Stbbdev
10151c0b2f7Stbbdev // slot_id value: -1 is reserved by current_slot(), -2 is set in on_scheduler_exit() below
10251c0b2f7Stbbdev static tbb::enumerable_thread_specific<int> local_id, old_id, slot_id(-3);
10351c0b2f7Stbbdev
ResetTLS()10451c0b2f7Stbbdev void ResetTLS() {
10551c0b2f7Stbbdev local_id.clear();
10651c0b2f7Stbbdev old_id.clear();
10751c0b2f7Stbbdev slot_id.clear();
10851c0b2f7Stbbdev }
10951c0b2f7Stbbdev
// Observer bound to one arena. On every entry/exit it maintains the TLS
// variables above (local_id/old_id/slot_id) and checks slot-index invariants,
// catching double entries and broken arena nesting.
class ArenaObserver : public tbb::task_scheduler_observer {
    int myId; // unique observer/arena id within a test
    int myMaxConcurrency; // concurrency of the associated arena
    int myNumReservedSlots; // reserved slots in the associated arena
    void on_scheduler_entry( bool is_worker ) override {
        int current_index = tbb::this_task_arena::current_thread_index();
        // Even a concurrency-1 arena has at least 2 slots, hence the max with 2.
        CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
        if (is_worker) {
            // Workers must not occupy slots reserved for external threads.
            CHECK(current_index >= myNumReservedSlots);
        }
        CHECK_MESSAGE(!old_id.local(), "double call to on_scheduler_entry");
        // Save the id of the arena the thread came from so exit can restore it.
        old_id.local() = local_id.local();
        CHECK_MESSAGE(old_id.local() != myId, "double entry to the same arena");
        local_id.local() = myId;
        slot_id.local() = current_index;
    }
    void on_scheduler_exit( bool /*is_worker*/ ) override {
        CHECK_MESSAGE(local_id.local() == myId, "nesting of arenas is broken");
        CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
        slot_id.local() = -2; // -2 marks "exited"; see the comment above slot_id's definition
        local_id.local() = old_id.local(); // restore the outer arena's id
        old_id.local() = 0;
    }
public:
    // a: arena to observe; maxConcurrency/numReservedSlots: the arena's expected
    // parameters; id: nonzero "unique" id recorded in TLS while inside the arena.
    ArenaObserver(tbb::task_arena &a, int maxConcurrency, int numReservedSlots, int id)
        : tbb::task_scheduler_observer(a)
        , myId(id)
        , myMaxConcurrency(maxConcurrency)
        , myNumReservedSlots(numReservedSlots) {
        CHECK(myId);
        observe(true);
    }
    ~ArenaObserver () {
        observe(false); // synchronously stop observation before checking TLS state
        CHECK_MESSAGE(!old_id.local(), "inconsistent observer state");
    }
};
14751c0b2f7Stbbdev
14851c0b2f7Stbbdev struct IndexTrackingBody { // Must be used together with ArenaObserver
operator ()IndexTrackingBody14951c0b2f7Stbbdev void operator() ( const Range& ) const {
15051c0b2f7Stbbdev CHECK(slot_id.local() == tbb::this_task_arena::current_thread_index());
151b15aabb3Stbbdev utils::doDummyWork(50000);
15251c0b2f7Stbbdev }
15351c0b2f7Stbbdev };
15451c0b2f7Stbbdev
15551c0b2f7Stbbdev struct AsynchronousWork {
15651c0b2f7Stbbdev utils::SpinBarrier &my_barrier;
15751c0b2f7Stbbdev bool my_is_blocking;
AsynchronousWorkAsynchronousWork15851c0b2f7Stbbdev AsynchronousWork(utils::SpinBarrier &a_barrier, bool blocking = true)
15951c0b2f7Stbbdev : my_barrier(a_barrier), my_is_blocking(blocking) {}
operator ()AsynchronousWork16051c0b2f7Stbbdev void operator()() const {
16151c0b2f7Stbbdev CHECK_MESSAGE(local_id.local() != 0, "not in explicit arena");
16251c0b2f7Stbbdev tbb::parallel_for(Range(0,500), IndexTrackingBody(), tbb::simple_partitioner());
163b15aabb3Stbbdev if(my_is_blocking) my_barrier.wait(); // must be asynchronous to an external thread
16451c0b2f7Stbbdev else my_barrier.signalNoWait();
16551c0b2f7Stbbdev }
16651c0b2f7Stbbdev };
16751c0b2f7Stbbdev
16851c0b2f7Stbbdev //-----------------------------------------------------------------------------------------//
16951c0b2f7Stbbdev
// Test that task_arenas might be created and used from multiple application threads.
// Also tests arena observers. The parameter idx is the index of an app thread running this test.
// Body run by each native thread: builds two observed arenas plus local observers,
// then mixes enqueue/execute work across them. idx makes the observer ids unique.
void TestConcurrentArenasFunc(int idx) {
    // A regression test for observer activation order:
    // check that arena observer can be activated before local observer
    struct LocalObserver : public tbb::task_scheduler_observer {
        LocalObserver() : tbb::task_scheduler_observer() { observe(true); }
        LocalObserver(tbb::task_arena& a) : tbb::task_scheduler_observer(a) {
            observe(true);
        }
        ~LocalObserver() {
            observe(false);
        }
    };

    tbb::task_arena a1;
    a1.initialize(1,0); // one slot, none reserved for external threads
    ArenaObserver o1(a1, 1, 0, idx*2+1); // the last argument is a "unique" observer/arena id for the test
    CHECK_MESSAGE(o1.is_observing(), "Arena observer has not been activated");

    tbb::task_arena a2(2,1); // two slots, one reserved
    ArenaObserver o2(a2, 2, 1, idx*2+2);
    CHECK_MESSAGE(o2.is_observing(), "Arena observer has not been activated");

    // Global (not arena-bound) observer activated after the arena observers above.
    LocalObserver lo1;
    CHECK_MESSAGE(lo1.is_observing(), "Local observer has not been activated");

    tbb::task_arena a3(1, 0);
    LocalObserver lo2(a3); // arena-bound observer of the local type
    CHECK_MESSAGE(lo2.is_observing(), "Local observer has not been activated");

    utils::SpinBarrier barrier(2); // this thread + the task enqueued into a1
    AsynchronousWork work(barrier);

    a1.enqueue(work); // put async work
    barrier.wait(); // rendezvous with the enqueued task

    a2.enqueue(work); // another work
    a2.execute(work); // synchronous entry into a2 while the enqueued work is pending

    a3.execute([] {
        utils::doDummyWork(100);
    });

    a1.debug_wait_until_empty();
    a2.debug_wait_until_empty();
}
21751c0b2f7Stbbdev
TestConcurrentArenas(int p)21851c0b2f7Stbbdev void TestConcurrentArenas(int p) {
21951c0b2f7Stbbdev // TODO REVAMP fix with global control
22051c0b2f7Stbbdev ResetTLS();
22151c0b2f7Stbbdev utils::NativeParallelFor( p, &TestConcurrentArenasFunc );
22251c0b2f7Stbbdev }
22351c0b2f7Stbbdev //--------------------------------------------------//
22451c0b2f7Stbbdev // Test multiple application threads working with a single arena at the same time.
// Each native thread first executes non-blocking work synchronously in the shared
// arena, then — after all p threads pass barrier b1 — enqueues the same work.
// Barrier b2 collects the 2*p completion signals plus the test thread.
class MultipleMastersPart1 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;
public:
    MultipleMastersPart1( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    void operator()(int) const {
        my_a.execute(AsynchronousWork(my_b2, /*blocking=*/false));
        my_b1.wait(); // all threads finish their execute() before anyone enqueues
        // A regression test for bugs 1954 & 1971
        my_a.enqueue(AsynchronousWork(my_b2, /*blocking=*/false));
    }
};
23851c0b2f7Stbbdev
23951c0b2f7Stbbdev class MultipleMastersPart2 : utils::NoAssign {
24051c0b2f7Stbbdev tbb::task_arena &my_a;
24151c0b2f7Stbbdev utils::SpinBarrier &my_b;
24251c0b2f7Stbbdev public:
MultipleMastersPart2(tbb::task_arena & a,utils::SpinBarrier & b)24351c0b2f7Stbbdev MultipleMastersPart2( tbb::task_arena &a, utils::SpinBarrier &b) : my_a(a), my_b(b) {}
operator ()(int) const24451c0b2f7Stbbdev void operator()(int) const {
24551c0b2f7Stbbdev my_a.execute(AsynchronousWork(my_b, /*blocking=*/false));
24651c0b2f7Stbbdev }
24751c0b2f7Stbbdev };
24851c0b2f7Stbbdev
// Stress pairing of enqueue with a blocking execute: each native thread repeatedly
// enqueues a Runner and then waits for it from inside the arena via a wait_context.
class MultipleMastersPart3 : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b;
    using wait_context = tbb::detail::d1::wait_context;

    // Enqueued task: does some busy work, then releases one wait_context reference.
    struct Runner : NoAssign {
        wait_context& myWait;
        Runner(wait_context& w) : myWait(w) {}
        void operator()() const {
            utils::doDummyWork(10000);
            myWait.release(); // unblocks the matching Waiter
        }
    };

    // Executed inside the arena: blocks until the paired Runner releases the context.
    struct Waiter : NoAssign {
        wait_context& myWait;
        Waiter(wait_context& w) : myWait(w) {}
        void operator()() const {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait(myWait, ctx);
        }
    };

public:
    MultipleMastersPart3(tbb::task_arena &a, utils::SpinBarrier &b)
        : my_a(a), my_b(b) {}
    void operator()(int) const {
        wait_context wait(0); // zero references until reserve() is called below
        my_b.wait(); // increases chances for task_arena initialization contention
        for( int i=0; i<100; ++i) {
            wait.reserve(); // one reference per enqueued Runner
            my_a.enqueue(Runner(wait));
            my_a.execute(Waiter(wait)); // may process other tasks while waiting
        }
        my_b.wait();
    }
};
28651c0b2f7Stbbdev
// Test multiple application threads sharing single arenas; three scenarios with
// different arena shapes and submission patterns (see MultipleMastersPart1-3).
void TestMultipleMasters(int p) {
    {
        // Scenario 1: single-slot arena, p threads each execute + enqueue (Part1).
        ResetTLS();
        tbb::task_arena a(1,0);
        a.initialize();
        ArenaObserver o(a, 1, 0, 1);
        utils::SpinBarrier barrier1(p), barrier2(2*p+1); // each of p threads will submit two tasks signaling the barrier
        NativeParallelFor( p, MultipleMastersPart1(a, barrier1, barrier2) );
        barrier2.wait();
        a.debug_wait_until_empty();
    } {
        // Scenario 2: (2,1) arena with the worker slot deliberately occupied (Part2).
        ResetTLS();
        tbb::task_arena a(2,1);
        ArenaObserver o(a, 2, 1, 2);
        utils::SpinBarrier barrier(p+2);
        a.enqueue(AsynchronousWork(barrier, /*blocking=*/true)); // occupy the worker, a regression test for bug 1981
        // TODO: buggy test. Worker threads need time to occupy the slot to prevent an external thread from taking an enqueued task.
        utils::Sleep(10);
        NativeParallelFor( p, MultipleMastersPart2(a, barrier) );
        barrier.wait();
        a.debug_wait_until_empty();
    } {
        // Regression test for the bug 1981 part 2 (task_arena::execute() with wait_for_all for an enqueued task)
        tbb::task_arena a(p,1);
        utils::SpinBarrier barrier(p+1); // for external threads to avoid endless waiting at least in some runs
        // "Oversubscribe" the arena by 1 external thread
        NativeParallelFor( p+1, MultipleMastersPart3(a, barrier) );
        a.debug_wait_until_empty();
    }
}
31751c0b2f7Stbbdev
31851c0b2f7Stbbdev //--------------------------------------------------//
// TestArenaEntryConsistency: exercises every way a thread can enter task_arena::execute()
// (direct, delegated to a worker or to another master, nested in the same or an alien
// task_group_context), verifying FP-mode handling and exception delivery for each.
32051c0b2f7Stbbdev #include <sstream>
32151c0b2f7Stbbdev #include <stdexcept>
32249e08aacStbbdev #include "oneapi/tbb/detail/_exception.h"
32351c0b2f7Stbbdev #include "common/fp_control.h"
32451c0b2f7Stbbdev
// Functor for task_arena::execute(): records which entry type it observed into my_id,
// asserts the arena's captured FP mode, and optionally throws to test that exceptions
// propagate out of execute() back to the caller.
struct TestArenaEntryBody : FPModeContext {
    std::atomic<int> &my_stage; // each execute increases it
    std::stringstream my_id; // diagnostic tag: "<idx>:<i>@<entry type>"
    bool is_caught, is_expected; // whether an exception was caught / should be thrown
    enum { arenaFPMode = 1 }; // FP mode the test arena is initialized with

    TestArenaEntryBody(std::atomic<int> &s, int idx, int i) // init thread-specific instance
        : FPModeContext(idx+i)
        , my_stage(s)
        , is_caught(false)
#if TBB_USE_EXCEPTIONS
        // bit i of idx decides whether this stage throws; covers all 2^5 combinations
        , is_expected( (idx&(1<<i)) != 0 )
#else
        , is_expected(false)
#endif
    {
        my_id << idx << ':' << i << '@';
    }
    void operator()() { // inside task_arena::execute()
        // synchronize with other stages
        int stage = my_stage++;
        int slot = tbb::this_task_arena::current_thread_index();
        CHECK(slot >= 0);
        CHECK(slot <= 1); // arena(2,1) has exactly two slots
        // wait until the third stage is delegated and then starts on slot 0
        while(my_stage < 2+slot) utils::yield();
        // deduct its entry type and put it into id, it helps to find source of a problem
        my_id << (stage < 3 ? (tbb::this_task_arena::current_thread_index()?
                  "delegated_to_worker" : stage < 2? "direct" : "delegated_to_master")
                  : stage == 3? "nested_same_ctx" : "nested_alien_ctx");
        AssertFPMode(arenaFPMode); // the arena must apply its captured FP settings
        if (is_expected) {
            TBB_TEST_THROW(std::logic_error(my_id.str()));
        }
        // no code can be put here since exceptions can be thrown
    }
    void on_exception(const char *e) { // outside arena, in catch block
        is_caught = true;
        CHECK(my_id.str() == e); // the exception must carry the id of the throwing stage
        assertFPMode(); // caller's FP mode must be restored despite the exception
    }
    void after_execute() { // outside arena and catch block
        CHECK(is_caught == is_expected);
        assertFPMode();
    }
};
37151c0b2f7Stbbdev
// Drives TestArenaEntryBody through all entry types: cross-thread entries via
// NativeParallelFor, a nested same-context execute, and a nested alien-context
// entry via parallel_for.
class ForEachArenaEntryBody : utils::NoAssign {
    tbb::task_arena &my_a; // expected task_arena(2,1)
    std::atomic<int> &my_stage; // each execute increases it
    int my_idx; // current iteration; its bits select which stages throw

public:
    ForEachArenaEntryBody(tbb::task_arena &a, std::atomic<int> &c)
        : my_a(a), my_stage(c), my_idx(0) {}

    // Runs one full pass: 3 cross-thread entries, then 2 nested entries.
    void test(int idx) {
        my_idx = idx;
        my_stage = 0;
        NativeParallelFor(3, *this); // test cross-arena calls
        CHECK(my_stage == 3);
        my_a.execute(*this); // test nested calls
        CHECK(my_stage == 5);
    }

    // task_arena functor for nested tests
    void operator()() const {
        test_arena_entry(3); // in current task group context
        tbb::parallel_for(4, 5, *this); // in different context
    }

    // NativeParallelFor & parallel_for functor
    void operator()(int i) const {
        test_arena_entry(i);
    }

private:
    // One execute() with a scoped FP mode; checks exception delivery when expected.
    void test_arena_entry(int i) const {
        GetRoundingMode(); // NOTE(review): result discarded — presumably touches FP state; confirm intent
        TestArenaEntryBody scoped_functor(my_stage, my_idx, i);
        GetRoundingMode();
#if TBB_USE_EXCEPTIONS
        try {
            my_a.execute(scoped_functor);
        } catch(std::logic_error &e) {
            scoped_functor.on_exception(e.what());
        } catch(...) { CHECK_MESSAGE(false, "Unexpected exception type"); }
#else
        my_a.execute(scoped_functor);
#endif
        scoped_functor.after_execute();
    }
};
41851c0b2f7Stbbdev
TestArenaEntryConsistency()41951c0b2f7Stbbdev void TestArenaEntryConsistency() {
42051c0b2f7Stbbdev tbb::task_arena a(2, 1);
42151c0b2f7Stbbdev std::atomic<int> c;
42251c0b2f7Stbbdev ForEachArenaEntryBody body(a, c);
42351c0b2f7Stbbdev
42451c0b2f7Stbbdev FPModeContext fp_scope(TestArenaEntryBody::arenaFPMode);
42551c0b2f7Stbbdev a.initialize(); // capture FP settings to arena
42651c0b2f7Stbbdev fp_scope.setNextFPMode();
42751c0b2f7Stbbdev
42851c0b2f7Stbbdev for (int i = 0; i < 100; i++) // not less than 32 = 2^5 of entry types
42951c0b2f7Stbbdev body.test(i);
43051c0b2f7Stbbdev }
43151c0b2f7Stbbdev //--------------------------------------------------
43251c0b2f7Stbbdev // Test that the requested degree of concurrency for task_arena is achieved in various conditions
// Checks that an arena delivers exactly its requested concurrency, with external
// threads confined to reserved slots and workers to the rest. The optional barriers
// select which of the three scenarios in TestArenaConcurrency is being run.
class TestArenaConcurrencyBody : utils::NoAssign {
    tbb::task_arena &my_a;
    int my_max_concurrency; // requested arena concurrency
    int my_reserved_slots; // slots reserved for external threads
    utils::SpinBarrier *my_barrier; // all participants; may be null (oversubscription case)
    utils::SpinBarrier *my_worker_barrier; // workers only; non-null in the mixed scenario
public:
    TestArenaConcurrencyBody( tbb::task_arena &a, int max_concurrency, int reserved_slots, utils::SpinBarrier *b = nullptr, utils::SpinBarrier *wb = nullptr )
        : my_a(a), my_max_concurrency(max_concurrency), my_reserved_slots(reserved_slots), my_barrier(b), my_worker_barrier(wb) {}
    // NativeParallelFor's functor
    void operator()( int ) const {
        CHECK_MESSAGE( local_id.local() == 0, "TLS was not cleaned?" );
        local_id.local() = 1; // mark this thread as external for the arena functor below
        my_a.execute( *this );
    }
    // Arena's functor
    void operator()() const {
        int idx = tbb::this_task_arena::current_thread_index();
        // Even a concurrency-1 arena exposes 2 slots, hence the max with 2.
        REQUIRE( idx < (my_max_concurrency > 1 ? my_max_concurrency : 2) );
        REQUIRE( my_a.max_concurrency() == tbb::this_task_arena::max_concurrency() );
        int max_arena_concurrency = tbb::this_task_arena::max_concurrency();
        REQUIRE( max_arena_concurrency == my_max_concurrency );
        if ( my_worker_barrier ) {
            if ( local_id.local() == 1 ) {
                // External thread in a reserved slot
                CHECK_MESSAGE( idx < my_reserved_slots, "External threads are supposed to use only reserved slots in this test" );
            } else {
                // Worker thread
                CHECK( idx >= my_reserved_slots );
                my_worker_barrier->wait();
            }
        } else if ( my_barrier )
            CHECK_MESSAGE( local_id.local() == 1, "Workers are not supposed to enter the arena in this test" );
        if ( my_barrier ) my_barrier->wait();
        else utils::Sleep( 1 ); // oversubscription case: just hold the slot briefly
    }
};
47051c0b2f7Stbbdev
// Test that the requested concurrency (p) is achieved for every reserved-slot count,
// in three scenarios: workers + reserved external threads, external threads only,
// and oversubscription by 2*p external threads.
void TestArenaConcurrency( int p, int reserved = 0, int step = 1) {
    for (; reserved <= p; reserved += step) {
        tbb::task_arena a( p, reserved );
        // Only run the mixed scenario when enough workers can actually be supplied.
        if (p - reserved < tbb::this_task_arena::max_concurrency()) {
            // Check concurrency with worker & reserved external threads.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::SpinBarrier wb( p-reserved );
            TestArenaConcurrencyBody test( a, p, reserved, &b, &wb );
            for ( int i = reserved; i < p; ++i ) // requests p-reserved worker threads
                a.enqueue( test );
            if ( reserved==1 )
                test( 0 ); // calls execute()
            else
                utils::NativeParallelFor( reserved, test );
            a.debug_wait_until_empty();
        } { // Check if multiple external threads alone can achieve maximum concurrency.
            ResetTLS();
            utils::SpinBarrier b( p );
            utils::NativeParallelFor( p, TestArenaConcurrencyBody( a, p, reserved, &b ) );
            a.debug_wait_until_empty();
        } { // Check oversubscription by external threads.
            // Both #if blocks below guard the same brace-enclosed statement: the
            // conditions, when enabled, skip the run for configurations that cannot
            // host that many native threads.
#if !_WIN32 || !_WIN64
            // Some C++ implementations allocate 8MB stacks for std::thread on 32 bit platforms
            // that makes impossible to create more than ~500 threads.
            if ( !(sizeof(std::size_t) == 4 && p > 200) )
#endif
#if TBB_TEST_LOW_WORKLOAD
            if ( p <= 16 )
#endif
            {
                ResetTLS();
                utils::NativeParallelFor(2 * p, TestArenaConcurrencyBody(a, p, reserved));
                a.debug_wait_until_empty();
            }
        }
    }
}
50951c0b2f7Stbbdev
510b15aabb3Stbbdev struct TestMandatoryConcurrencyObserver : public tbb::task_scheduler_observer {
511b15aabb3Stbbdev utils::SpinBarrier& m_barrier;
512b15aabb3Stbbdev
TestMandatoryConcurrencyObserverTestMandatoryConcurrencyObserver513b15aabb3Stbbdev TestMandatoryConcurrencyObserver(tbb::task_arena& a, utils::SpinBarrier& barrier)
514b15aabb3Stbbdev : tbb::task_scheduler_observer(a), m_barrier(barrier) {
515b15aabb3Stbbdev observe(true);
516b15aabb3Stbbdev }
~TestMandatoryConcurrencyObserverTestMandatoryConcurrencyObserver51774207e5dSAnton Potapov ~TestMandatoryConcurrencyObserver() {
51874207e5dSAnton Potapov observe(false);
51974207e5dSAnton Potapov }
on_scheduler_exitTestMandatoryConcurrencyObserver520b15aabb3Stbbdev void on_scheduler_exit(bool worker) override {
521b15aabb3Stbbdev if (worker) {
522b15aabb3Stbbdev m_barrier.wait();
523b15aabb3Stbbdev }
524b15aabb3Stbbdev }
525b15aabb3Stbbdev };
526b15aabb3Stbbdev
// Test mandatory concurrency: tasks enqueued into a concurrency-1 arena from
// inside it must still be executed (the CHECKs expect the arena to report
// concurrency 2 while such tasks exist), and tasks must run one at a time.
void TestMandatoryConcurrency() {
    tbb::task_arena a(1);
    a.execute([&a] {
        int n_threads = 4;
        utils::SpinBarrier exit_barrier(2); // this thread + the single worker
        TestMandatoryConcurrencyObserver observer(a, exit_barrier);
        for (int j = 0; j < 5; ++j) {
            // Between rounds no worker should be active in the arena.
            utils::ExactConcurrencyLevel::check(1);
            std::atomic<int> num_tasks{ 0 }, curr_tasks{ 0 };
            utils::SpinBarrier barrier(n_threads);
            utils::NativeParallelFor(n_threads, [&](int) {
                for (int i = 0; i < 5; ++i) {
                    barrier.wait(); // all submitters enqueue in lockstep
                    a.enqueue([&] {
                        CHECK(tbb::this_task_arena::max_concurrency() == 2);
                        CHECK(a.max_concurrency() == 2);
                        // curr_tasks must stay at 1: only one task may run at a time
                        ++curr_tasks;
                        CHECK(curr_tasks == 1);
                        utils::doDummyWork(1000);
                        CHECK(curr_tasks == 1);
                        --curr_tasks;
                        ++num_tasks;
                    });
                    barrier.wait();
                }
            });
            // Wait for worker exits until every enqueued task has been processed.
            do {
                exit_barrier.wait();
            } while (num_tasks < n_threads * 5);
        }
    });
}
559b15aabb3Stbbdev
TestConcurrentFunctionality(int min_thread_num=1,int max_thread_num=3)56051c0b2f7Stbbdev void TestConcurrentFunctionality(int min_thread_num = 1, int max_thread_num = 3) {
561b15aabb3Stbbdev TestMandatoryConcurrency();
56251c0b2f7Stbbdev InitializeAndTerminate(max_thread_num);
56351c0b2f7Stbbdev for (int p = min_thread_num; p <= max_thread_num; ++p) {
56451c0b2f7Stbbdev TestConcurrentArenas(p);
56551c0b2f7Stbbdev TestMultipleMasters(p);
56651c0b2f7Stbbdev TestArenaConcurrency(p);
56751c0b2f7Stbbdev }
56851c0b2f7Stbbdev }
56951c0b2f7Stbbdev
57051c0b2f7Stbbdev //--------------------------------------------------//
57151c0b2f7Stbbdev // Test creation/initialization of a task_arena that references an existing arena (aka attach).
57251c0b2f7Stbbdev // This part of the test uses the knowledge of task_arena internals
57351c0b2f7Stbbdev
// Helper for the attach tests: snapshots the current slot index at construction
// and exposes the arena's debug internals for validation.
// This part of the test uses the knowledge of task_arena internals.
struct TaskArenaValidator {
    int my_slot_at_construction; // slot index of the constructing thread
    const tbb::task_arena& my_arena;
    TaskArenaValidator( const tbb::task_arena& other )
        : my_slot_at_construction(tbb::this_task_arena::current_thread_index())
        , my_arena(other)
    {}
    // Inspect the internal state
    int concurrency() { return my_arena.debug_max_concurrency(); }
    int reserved_for_masters() { return my_arena.debug_reserved_slots(); }

    // This method should be called in task_arena::execute() for a captured arena
    // by the same thread that created the validator.
    void operator()() {
        CHECK_MESSAGE( tbb::this_task_arena::current_thread_index()==my_slot_at_construction,
                       "Current thread index has changed since the validator construction" );
    }
};
59251c0b2f7Stbbdev
// Verify that an arena created/initialized by attach reflects the expected state:
// activation, concurrency, and number of reserved (master) slots. For threads
// already inside an arena, also checks that the slot index is stable across execute().
void ValidateAttachedArena( tbb::task_arena& arena, bool expect_activated,
                            int expect_concurrency, int expect_masters ) {
    CHECK_MESSAGE( arena.is_active()==expect_activated, "Unexpected activation state" );
    if( arena.is_active() ) {
        TaskArenaValidator validator( arena );
        CHECK_MESSAGE( validator.concurrency()==expect_concurrency, "Unexpected arena size" );
        CHECK_MESSAGE( validator.reserved_for_masters()==expect_masters, "Unexpected # of reserved slots" );
        if ( tbb::this_task_arena::current_thread_index() != tbb::task_arena::not_initialized ) {
            CHECK(tbb::this_task_arena::current_thread_index() >= 0);
            // for threads already in arena, check that the thread index remains the same
            arena.execute( validator );
        } else { // not_initialized
            // Test the deprecated method
            CHECK(tbb::this_task_arena::current_thread_index() == -1);
        }

        // Ideally, there should be a check for having the same internal arena object,
        // but that object is not easily accessible for implicit arenas.
    }
}
61351c0b2f7Stbbdev
// Exercises task_arena "attach" semantics from native threads, from inside
// another arena, and from inside a parallel_for. The three operator()
// overloads are selected by the respective driver (NativeParallelFor,
// task_arena::execute, tbb::parallel_for).
struct TestAttachBody : utils::NoAssign {
    static thread_local int my_idx; // safe to modify and use within the NativeParallelFor functor
    const int maxthread;            // concurrency used for the explicitly initialized arena
    TestAttachBody( int max_thr ) : maxthread(max_thr) {}

    // The functor body for NativeParallelFor
    void operator()( int idx ) const {
        my_idx = idx;

        int default_threads = tbb::this_task_arena::max_concurrency();

        // A native thread has no arena yet, so attach produces an inactive arena.
        tbb::task_arena arena{tbb::task_arena::attach()};
        ValidateAttachedArena( arena, false, -1, -1 ); // Nothing yet to attach to

        arena.terminate();
        ValidateAttachedArena( arena, false, -1, -1 );

        // attach to an auto-initialized arena
        tbb::parallel_for(0, 1, [](int) {});

        // Now the thread has an implicit arena; attach must capture it.
        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, default_threads, 1 );

        // Same check via the initialize(tbb::attach()) overload.
        tbb::task_arena arena3;
        arena3.initialize(tbb::attach());
        ValidateAttachedArena( arena3, true, default_threads, 1 );


        // attach to another task_arena
        arena.initialize( maxthread, std::min(maxthread,idx) );
        arena.execute( *this );
    }

    // The functor body for task_arena::execute above
    void operator()() const {
        tbb::task_arena arena2{tbb::task_arena::attach()};
        ValidateAttachedArena( arena2, true, maxthread, std::min(maxthread,my_idx) );
    }

    // The functor body for tbb::parallel_for
    void operator()( const Range& r ) const {
        for( int i = r.begin(); i<r.end(); ++i ) {
            tbb::task_arena arena2{tbb::task_arena::attach()};
            ValidateAttachedArena( arena2, true, tbb::this_task_arena::max_concurrency(), 1 );
        }
    }
};
66151c0b2f7Stbbdev
// Out-of-class definition of the per-thread index written by the NativeParallelFor overload.
thread_local int TestAttachBody::my_idx;
66351c0b2f7Stbbdev
TestAttach(int maxthread)66451c0b2f7Stbbdev void TestAttach( int maxthread ) {
66551c0b2f7Stbbdev // Externally concurrent, but no concurrency within a thread
66651c0b2f7Stbbdev utils::NativeParallelFor( std::max(maxthread,4), TestAttachBody( maxthread ) );
66751c0b2f7Stbbdev // Concurrent within the current arena; may also serve as a stress test
66851c0b2f7Stbbdev tbb::parallel_for( Range(0,10000*maxthread), TestAttachBody( maxthread ) );
66951c0b2f7Stbbdev }
67051c0b2f7Stbbdev
67151c0b2f7Stbbdev //--------------------------------------------------//
67251c0b2f7Stbbdev
67351c0b2f7Stbbdev // Test that task_arena::enqueue does not tolerate a non-const functor.
67451c0b2f7Stbbdev // TODO: can it be reworked as SFINAE-based compile-time check?
// Functor with both const and non-const call operators. The non-const one
// fails the test if invoked: the library is expected to call only the const
// overload from task_arena::enqueue.
struct TestFunctor {
    void operator()() { CHECK_MESSAGE( false, "Non-const operator called" ); }
    void operator()() const { /* library requires this overload only */ }
};
67951c0b2f7Stbbdev
TestConstantFunctorRequirement()68051c0b2f7Stbbdev void TestConstantFunctorRequirement() {
68151c0b2f7Stbbdev tbb::task_arena a;
68251c0b2f7Stbbdev TestFunctor tf;
68351c0b2f7Stbbdev a.enqueue( tf );
68451c0b2f7Stbbdev }
68551c0b2f7Stbbdev
68651c0b2f7Stbbdev //--------------------------------------------------//
68751c0b2f7Stbbdev
68851c0b2f7Stbbdev #include "tbb/parallel_reduce.h"
68951c0b2f7Stbbdev #include "tbb/parallel_invoke.h"
69051c0b2f7Stbbdev
// Test this_task_arena::isolate
namespace TestIsolatedExecuteNS {
    // Runs a small nested parallel_for with the given partitioner; used as the
    // inner workload both with and without isolation.
    template <typename NestedPartitioner>
    class NestedParFor : utils::NoAssign {
    public:
        NestedParFor() {}
        void operator()() const {
            NestedPartitioner p;
            tbb::parallel_for( 0, 10, utils::DummyBody( 10 ), p );
        }
    };
70251c0b2f7Stbbdev
    // Outer parallel_for body. Uses a thread-local entry counter to detect when a
    // thread re-enters this body while already inside it — which can only happen
    // if the nested loop stole another outer-level iteration on the same thread.
    template <typename NestedPartitioner>
    class ParForBody : utils::NoAssign {
        bool myOuterIsolation;                       // true: isolation applied at the outer level instead of here
        tbb::enumerable_thread_specific<int> &myEts; // per-thread nesting counter
        std::atomic<bool> &myIsStolen;               // set when an outer-level task was stolen into a nested level
    public:
        ParForBody( bool outer_isolation, tbb::enumerable_thread_specific<int> &ets, std::atomic<bool> &is_stolen )
            : myOuterIsolation( outer_isolation ), myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int ) const {
            int &e = myEts.local();
            // e > 0 on entry means this thread is already executing the body:
            // the nested level picked up another outer iteration.
            if ( e++ > 0 ) myIsStolen = true;
            if ( myOuterIsolation )
                NestedParFor<NestedPartitioner>()();
            else
                tbb::this_task_arena::isolate( NestedParFor<NestedPartitioner>() );
            --e;
        }
    };
72151c0b2f7Stbbdev
    // Runs the outer parallel_for of 1000 ParForBody iterations with a fresh
    // per-thread counter, using the given outer partitioner.
    template <typename OuterPartitioner, typename NestedPartitioner>
    class OuterParFor : utils::NoAssign {
        bool myOuterIsolation;
        std::atomic<bool> &myIsStolen;
    public:
        OuterParFor( bool outer_isolation, std::atomic<bool> &is_stolen ) : myOuterIsolation( outer_isolation ), myIsStolen( is_stolen ) {}
        void operator()() const {
            tbb::enumerable_thread_specific<int> ets( 0 );
            OuterPartitioner p;
            tbb::parallel_for( 0, 1000, ParForBody<NestedPartitioner>( myOuterIsolation, ets, myIsStolen ), p );
        }
    };
73451c0b2f7Stbbdev
    // Two nested loops with a chosen partitioner combination.
    // - outer_isolation == true: isolation wraps the whole outer loop; nested
    //   loops are NOT isolated, so stealing into them is allowed and expected
    //   to happen eventually (only a warning is reported otherwise, since the
    //   scheduler is not obliged to steal).
    // - outer_isolation == false: each nested loop is isolated, so an
    //   outer-level task must never be executed inside a nested loop (hard error).
    template <typename OuterPartitioner, typename NestedPartitioner>
    void TwoLoopsTest( bool outer_isolation ) {
        std::atomic<bool> is_stolen;
        is_stolen = false;
        const int max_repeats = 100;
        if ( outer_isolation ) {
            // Repeat until stealing is observed (or give up after max_repeats).
            for ( int i = 0; i <= max_repeats; ++i ) {
                tbb::this_task_arena::isolate( OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen ) );
                if ( is_stolen ) break;
            }
            // TODO: was ASSERT_WARNING
            if (!is_stolen) {
                REPORT("Warning: isolate() should not block stealing on nested levels without isolation\n");
            }
        } else {
            for ( int i = 0; i <= max_repeats; ++i ) {
                OuterParFor<OuterPartitioner, NestedPartitioner>( outer_isolation, is_stolen )();
            }
            REQUIRE_MESSAGE( !is_stolen, "isolate() on nested levels should prevent stealing from outer levels" );
        }
    }
75651c0b2f7Stbbdev
    // Runs the templated test for all four combinations of simple/affinity
    // partitioners on the outer and nested levels.
    void TwoLoopsTest( bool outer_isolation ) {
        TwoLoopsTest<tbb::simple_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::simple_partitioner, tbb::affinity_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::simple_partitioner>( outer_isolation );
        TwoLoopsTest<tbb::affinity_partitioner, tbb::affinity_partitioner>( outer_isolation );
    }
76351c0b2f7Stbbdev
    // Entry point: covers both the outer-isolation and nested-isolation modes.
    void TwoLoopsTest() {
        TwoLoopsTest( true );
        TwoLoopsTest( false );
    }
76851c0b2f7Stbbdev //--------------------------------------------------//
    //--------------------------------------------------//
    // Randomized recursive stress body: at each nesting level (up to 20) it
    // randomly picks a parallel algorithm (parallel_for or parallel_invoke), a
    // partitioner, and whether to run the next level inside
    // this_task_arena::isolate. The per-thread `isolated_level` records the
    // deepest isolation currently in effect on that thread; the check in
    // operator() verifies that no task from a level at or above the current
    // isolation boundary is executed inside it.
    class HeavyMixTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<utils::FastRandom<>>& myRandom;  // per-thread RNG
        tbb::enumerable_thread_specific<int>& myIsolatedLevel;           // per-thread current isolation level
        int myNestedLevel;                                               // this body's nesting depth (starts at 1)

        // Randomly runs the body via either parallel_for (optionally with an
        // explicit context) or parallel_invoke.
        template <typename Partitioner, typename Body>
        static void RunTwoBodies( utils::FastRandom<>& rnd, const Body &body, Partitioner& p, tbb::task_group_context* ctx = nullptr ) {
            if ( rnd.get() % 2 ) {
                if (ctx )
                    tbb::parallel_for( 0, 2, body, p, *ctx );
                else
                    tbb::parallel_for( 0, 2, body, p );
            } else {
                tbb::parallel_invoke( body, body );
            }
        }

        // Functor passed to this_task_arena::isolate: runs the next nesting level.
        template <typename Partitioner>
        class IsolatedBody : utils::NoAssign {
            const HeavyMixTestBody &myHeavyMixTestBody;
            Partitioner &myPartitioner;
        public:
            IsolatedBody( const HeavyMixTestBody &body, Partitioner &partitioner )
                : myHeavyMixTestBody( body ), myPartitioner( partitioner ) {}
            void operator()() const {
                RunTwoBodies( myHeavyMixTestBody.myRandom.local(),
                    HeavyMixTestBody( myHeavyMixTestBody.myRandom, myHeavyMixTestBody.myIsolatedLevel,
                                      myHeavyMixTestBody.myNestedLevel + 1 ),
                    myPartitioner );
            }
        };

        // Randomly runs the next level either plainly (with a fresh context) or
        // inside isolate(), updating the thread's isolation level around the call.
        template <typename Partitioner>
        void RunNextLevel( utils::FastRandom<>& rnd, int &isolated_level ) const {
            Partitioner p;
            switch ( rnd.get() % 2 ) {
            case 0: {
                // No features
                tbb::task_group_context ctx;
                RunTwoBodies( rnd, HeavyMixTestBody(myRandom, myIsolatedLevel, myNestedLevel + 1), p, &ctx );
                break;
            }
            case 1: {
                // Isolation
                int previous_isolation = isolated_level;
                isolated_level = myNestedLevel;
                tbb::this_task_arena::isolate( IsolatedBody<Partitioner>( *this, p ) );
                isolated_level = previous_isolation;  // restore on exit from the isolated region
                break;
            }
            }
        }
    public:
        HeavyMixTestBody( tbb::enumerable_thread_specific<utils::FastRandom<>>& random,
                          tbb::enumerable_thread_specific<int>& isolated_level, int nested_level )
            : myRandom( random ), myIsolatedLevel( isolated_level )
            , myNestedLevel( nested_level ) {}
        void operator()() const {
            int &isolated_level = myIsolatedLevel.local();
            // A task created above the isolation boundary must not run inside it.
            CHECK_FAST_MESSAGE( myNestedLevel > isolated_level, "The outer-level task should not be stolen on isolated level" );
            if ( myNestedLevel == 20 )  // recursion depth cap
                return;
            utils::FastRandom<>& rnd = myRandom.local();
            if ( rnd.get() % 2 == 1 ) {
                RunNextLevel<tbb::auto_partitioner>( rnd, isolated_level );
            } else {
                RunNextLevel<tbb::affinity_partitioner>( rnd, isolated_level );
            }
        }
        // parallel_for adapter: forwards to the nullary overload.
        void operator()(int) const {
            this->operator()();
        }
    };
84251c0b2f7Stbbdev
    // ETS initializer: seeds each thread's RNG with its arena slot index so
    // different threads get different random sequences.
    struct RandomInitializer {
        utils::FastRandom<> operator()() {
            return utils::FastRandom<>( tbb::this_task_arena::current_thread_index() );
        }
    };
84851c0b2f7Stbbdev
HeavyMixTest()84951c0b2f7Stbbdev void HeavyMixTest() {
85051c0b2f7Stbbdev std::size_t num_threads = tbb::this_task_arena::max_concurrency() < 3 ? 3 : tbb::this_task_arena::max_concurrency();
85151c0b2f7Stbbdev tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
85251c0b2f7Stbbdev
85351c0b2f7Stbbdev RandomInitializer init_random;
85451c0b2f7Stbbdev tbb::enumerable_thread_specific<utils::FastRandom<>> random( init_random );
85551c0b2f7Stbbdev tbb::enumerable_thread_specific<int> isolated_level( 0 );
85651c0b2f7Stbbdev for ( int i = 0; i < 5; ++i ) {
85751c0b2f7Stbbdev HeavyMixTestBody b( random, isolated_level, 1 );
85851c0b2f7Stbbdev b( 0 );
85951c0b2f7Stbbdev }
86051c0b2f7Stbbdev }
86151c0b2f7Stbbdev
86251c0b2f7Stbbdev //--------------------------------------------------//
#if TBB_USE_EXCEPTIONS
    // Exception type thrown from inside an isolated region.
    struct MyException {};

    // Functor for this_task_arena::isolate that always throws MyException.
    struct IsolatedBodyThrowsException {
        void operator()() const {
#if _MSC_VER && !__INTEL_COMPILER
            // Workaround an unreachable code warning in task_arena_function.
            volatile bool workaround = true;
            if (workaround)
#endif
            {
                throw MyException();
            }
        }
    };

    // parallel_for body: verifies that an exception thrown inside isolate()
    // propagates to the caller (and nothing else escapes), and that nested,
    // non-isolated algorithms can still steal outer-level tasks.
    struct ExceptionTestBody : utils::NoAssign {
        tbb::enumerable_thread_specific<int>& myEts;  // per-thread nesting counter
        std::atomic<bool>& myIsStolen;                // set when an outer task was stolen on a nested level
        ExceptionTestBody( tbb::enumerable_thread_specific<int>& ets, std::atomic<bool>& is_stolen )
            : myEts( ets ), myIsStolen( is_stolen ) {}
        void operator()( int i ) const {
            try {
                tbb::this_task_arena::isolate( IsolatedBodyThrowsException() );
                REQUIRE_MESSAGE( false, "The exception has been lost" );
            }
            // Catch by const reference (C++ Core Guidelines E.15) instead of by value.
            catch ( const MyException& ) {}
            catch ( ... ) {
                REQUIRE_MESSAGE( false, "Unexpected exception" );
            }
            // Check that nested algorithms can steal outer-level tasks
            int &e = myEts.local();
            if ( e++ > 0 ) myIsStolen = true;
            // work imbalance increases chances for stealing
            tbb::parallel_for( 0, 10+i, utils::DummyBody( 100 ) );
            --e;
        }
    };

#endif /* TBB_USE_EXCEPTIONS */
    // Repeats the exception/stealing test until stealing from the outer level
    // is actually observed (isolation must not prevent it here).
    // NOTE(review): the loop has no iteration cap; it relies on stealing
    // eventually happening, so if it never does the test hangs rather than
    // fails — confirm this is the intended trade-off.
    void ExceptionTest() {
#if TBB_USE_EXCEPTIONS
        tbb::enumerable_thread_specific<int> ets;
        std::atomic<bool> is_stolen;
        is_stolen = false;
        for ( ;; ) {
            tbb::parallel_for( 0, 1000, ExceptionTestBody( ets, is_stolen ) );
            if ( is_stolen ) break;
        }
        REQUIRE_MESSAGE( is_stolen, "isolate should not affect non-isolated work" );
#endif /* TBB_USE_EXCEPTIONS */
    }
91351c0b2f7Stbbdev
91451c0b2f7Stbbdev struct NonConstBody {
91551c0b2f7Stbbdev unsigned int state;
operator ()TestIsolatedExecuteNS::NonConstBody91651c0b2f7Stbbdev void operator()() {
91751c0b2f7Stbbdev state ^= ~0u;
91851c0b2f7Stbbdev }
91951c0b2f7Stbbdev };
92051c0b2f7Stbbdev
    // Checks that this_task_arena::isolate accepts a non-const functor and
    // actually executes it: the final state must be the bitwise complement
    // (0x93682a12 == ~0x6c97d5ed) of the initial value.
    void TestNonConstBody() {
        NonConstBody body;
        body.state = 0x6c97d5ed;
        tbb::this_task_arena::isolate(body);
        REQUIRE_MESSAGE(body.state == 0x93682a12, "The wrong state");
    }
92751c0b2f7Stbbdev
    // TODO: Consider tbb::task_group instead of explicit task API.
    // Low-level task (tbb::detail::d1::task) that, when executed, enqueues N
    // tasks into the given arena. Each enqueued task records execution in the
    // per-thread flag, bumps the completion counter, yields a while, and
    // releases one reference on the shared wait_context.
    class TestEnqueueTask : public tbb::detail::d1::task {
        using wait_context = tbb::detail::d1::wait_context;

        tbb::enumerable_thread_specific<bool>& executed;  // per-thread "I ran an enqueued task" flag
        std::atomic<int>& completed;                      // total completed enqueued tasks

    public:
        wait_context& waiter;    // released once per enqueued task
        tbb::task_arena& arena;  // destination arena for enqueue
        static const int N = 100;

        TestEnqueueTask(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, wait_context& w, tbb::task_arena& a)
            : executed(exe), completed(c), waiter(w), arena(a) {}

        tbb::detail::d1::task* execute(tbb::detail::d1::execution_data&) override {
            for (int i = 0; i < N; ++i) {
                arena.enqueue([&]() {
                    executed.local() = true;
                    ++completed;
                    // Spin-yield to keep the task alive long enough for interleaving.
                    for (int j = 0; j < 100; j++) utils::yield();
                    waiter.release(1);
                });
            }
            return nullptr;
        }
        tbb::detail::d1::task* cancel(tbb::detail::d1::execution_data&) override { return nullptr; }
    };
95651c0b2f7Stbbdev
    // Runs a TestEnqueueTask via execute_and_wait: blocks until all N tasks it
    // enqueues have released the wait_context.
    class TestEnqueueIsolateBody : utils::NoCopy {
        tbb::enumerable_thread_specific<bool>& executed;
        std::atomic<int>& completed;
        tbb::task_arena& arena;
    public:
        static const int N = 100;  // must match TestEnqueueTask::N (initial wait_context count)

        TestEnqueueIsolateBody(tbb::enumerable_thread_specific<bool>& exe, std::atomic<int>& c, tbb::task_arena& a)
            : executed(exe), completed(c), arena(a) {}
        void operator()() {
            tbb::task_group_context ctx;
            tbb::detail::d1::wait_context waiter(N);

            TestEnqueueTask root(executed, completed, waiter, arena);
            tbb::detail::d1::execute_and_wait(root, ctx, waiter, ctx);
        }
    };
97451c0b2f7Stbbdev
    // Interaction of task_arena::enqueue with isolation:
    //  1) the main thread may process tasks it enqueued outside isolation;
    //  2) tasks enqueued before entering isolate() must NOT be executed by the
    //     thread while it is inside the isolated region.
    void TestEnqueue() {
        tbb::enumerable_thread_specific<bool> executed(false);
        std::atomic<int> completed;
        tbb::task_arena arena{tbb::task_arena::attach()};

        // Check that the main thread can process enqueued tasks.
        completed = 0;
        TestEnqueueIsolateBody b1(executed, completed, arena);
        b1();

        if (!executed.local()) {
            REPORT("Warning: No one enqueued task has executed by the main thread.\n");
        }

        executed.local() = false;
        completed = 0;
        const int N = 100;
        // Create enqueued tasks out of isolation.

        tbb::task_group_context ctx;
        tbb::detail::d1::wait_context waiter(N);
        for (int i = 0; i < N; ++i) {
            arena.enqueue([&]() {
                executed.local() = true;
                ++completed;
                utils::yield();
                waiter.release(1);
            });
        }
        // Inside isolate() the thread must not pick up the tasks enqueued above.
        TestEnqueueIsolateBody b2(executed, completed, arena);
        tbb::this_task_arena::isolate(b2);
        REQUIRE_MESSAGE(executed.local() == false, "An enqueued task was executed within isolate.");

        // Wait for the out-of-isolation tasks to finish before returning.
        tbb::detail::d1::wait(waiter, ctx);
        // while (completed < TestEnqueueTask::N + N) utils::yield();
    }
} // namespace TestIsolatedExecuteNS
101251c0b2f7Stbbdev
// Driver for the this_task_arena::isolate tests, run under two different
// parallelism limits.
void TestIsolatedExecute() {
    // At least 3 threads (owner + 2 thieves) are required to reproduce a situation when the owner steals outer
    // level task on a nested level. If we have only one thief then it will execute outer level tasks first and
    // the owner will not have a possibility to steal outer level tasks.
    int platform_max_thread = tbb::this_task_arena::max_concurrency();
    int num_threads = utils::min( platform_max_thread, 3 );
    {
        // Too many threads require too many work to reproduce the stealing from outer level.
        tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, utils::max(num_threads, 7));
        TestIsolatedExecuteNS::TwoLoopsTest();
        TestIsolatedExecuteNS::HeavyMixTest();
        TestIsolatedExecuteNS::ExceptionTest();
    }
    // Re-run the stress test with the minimal thread count as well.
    tbb::global_control ctl(tbb::global_control::max_allowed_parallelism, num_threads);
    TestIsolatedExecuteNS::HeavyMixTest();
    TestIsolatedExecuteNS::TestNonConstBody();
    TestIsolatedExecuteNS::TestEnqueue();
}
103151c0b2f7Stbbdev
103251c0b2f7Stbbdev //-----------------------------------------------------------------------------------------//
103351c0b2f7Stbbdev
// Two-thread body: thread 0 first parks the arena's workers on barrier b1,
// then hammers delegated spawn/wait (task_group::run via arena.execute)
// 100000 times; thread 1 just waits on barrier b2 so the test cannot exit
// while thread 0 is still working.
class TestDelegatedSpawnWaitBody : utils::NoAssign {
    tbb::task_arena &my_a;
    utils::SpinBarrier &my_b1, &my_b2;  // b1: 3 participants (thread 0 + 2 workers); b2: both native threads
public:
    TestDelegatedSpawnWaitBody( tbb::task_arena &a, utils::SpinBarrier &b1, utils::SpinBarrier &b2)
        : my_a(a), my_b1(b1), my_b2(b2) {}
    // NativeParallelFor's functor
    void operator()(int idx) const {
        if ( idx==0 ) { // thread 0 works in the arena, thread 1 waits for it (to prevent test hang)
            for (int i = 0; i < 2; ++i) {
                my_a.enqueue([this] { my_b1.wait(); }); // tasks to sync with workers
            }
            tbb::task_group tg;
            my_b1.wait(); // sync with the workers
            // Spawn many empty tasks through delegated execution to provoke the
            // missed-wakeup scenario this test regresses against.
            for( int i=0; i<100000; ++i) {
                my_a.execute([&tg] { tg.run([] {}); });
            }
            my_a.execute([&tg] {tg.wait(); });
        }

        my_b2.wait(); // sync both threads
    }
};
105751c0b2f7Stbbdev
// Runs TestDelegatedSpawnWaitBody on two native threads against a 2-slot,
// 0-reserved arena (so both slots are available for workers).
void TestDelegatedSpawnWait() {
    if (tbb::this_task_arena::max_concurrency() < 3) {
        // The test requires at least 2 worker threads
        return;
    }
    // Regression test for a bug with missed wakeup notification from a delegated task
    tbb::task_arena a(2,0);
    a.initialize();
    utils::SpinBarrier barrier1(3), barrier2(2);
    utils::NativeParallelFor( 2, TestDelegatedSpawnWaitBody(a, barrier1, barrier2) );
    a.debug_wait_until_empty();
}
107051c0b2f7Stbbdev
107151c0b2f7Stbbdev //-----------------------------------------------------------------------------------------//
107251c0b2f7Stbbdev
// Executed inside the arena by each test thread: blocks on its own
// wait_context (unless it belongs to the final bunch, which has none), then
// releases the wait_context of a thread from the previous bunch, forming a
// chain of nested waits across bunches.
class TestMultipleWaitsArenaWait : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsArenaWait( int idx, int bunch_size, int num_tasks, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_idx( idx ), my_bunch_size( bunch_size ), my_num_tasks(num_tasks), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()() const {
        ++my_processed;  // signal the driver that this thread reached the arena
        // Wait for all tasks
        if ( my_idx < my_num_tasks ) {
            tbb::detail::d1::wait(*my_waiters[my_idx], my_context);
        }
        // Signal waiting tasks
        if ( my_idx >= my_bunch_size ) {
            my_waiters[my_idx-my_bunch_size]->release();
        }
    }
private:
    int my_idx;          // this thread's global index across all bunches
    int my_bunch_size;
    int my_num_tasks;    // total number of wait_contexts (last bunch has none)
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};
109751c0b2f7Stbbdev
// std::thread entry point: enters the arena to run TestMultipleWaitsArenaWait,
// then decrements the shared counter so the driver can tell when all threads
// have fully left the arena.
class TestMultipleWaitsThreadBody : utils::NoAssign {
    using wait_context = tbb::detail::d1::wait_context;
public:
    TestMultipleWaitsThreadBody( int bunch_size, int num_tasks, tbb::task_arena& a, std::vector<wait_context*>& waiters, std::atomic<int>& processed, tbb::task_group_context& tgc )
        : my_bunch_size( bunch_size ), my_num_tasks( num_tasks ), my_arena( a ), my_waiters( waiters ), my_processed( processed ), my_context(tgc) {}
    void operator()( int idx ) const {
        my_arena.execute( TestMultipleWaitsArenaWait( idx, my_bunch_size, my_num_tasks, my_waiters, my_processed, my_context ) );
        --my_processed;  // balances the ++ done inside the arena body
    }
private:
    int my_bunch_size;
    int my_num_tasks;
    tbb::task_arena& my_arena;
    std::vector<wait_context*>& my_waiters;
    std::atomic<int>& my_processed;
    tbb::task_group_context& my_context;
};
111551c0b2f7Stbbdev
// Launches num_bunches bunches of bunch_size detached threads into one arena;
// each thread (except those in the last bunch) blocks on its wait_context and
// is released by a thread from the following bunch, exercising many
// concurrent waits inside a single arena. Repeated 10 times.
// NOTE(review): the wait_contexts are managed with raw new/delete; switching
// to unique_ptr would require changing the vector<wait_context*> signatures
// of the two helper classes above.
void TestMultipleWaits( int num_threads, int num_bunches, int bunch_size ) {
    tbb::task_arena a( num_threads );
    const int num_tasks = (num_bunches-1)*bunch_size;  // the last bunch has no wait_contexts

    tbb::task_group_context tgc;
    std::vector<tbb::detail::d1::wait_context*> waiters(num_tasks);
    for (auto& w : waiters) w = new tbb::detail::d1::wait_context(0);

    std::atomic<int> processed(0);
    for ( int repeats = 0; repeats<10; ++repeats ) {
        int idx = 0;
        for ( int bunch = 0; bunch < num_bunches-1; ++bunch ) {
            // Sync with the previous bunch of waiters to prevent "false" nested dependencies (when a nested task waits for an outer task).
            while ( processed < bunch*bunch_size ) utils::yield();
            // Run the bunch of threads/waiters that depend on the next bunch of threads/waiters.
            for ( int i = 0; i<bunch_size; ++i ) {
                waiters[idx]->reserve();  // one reference, released by a thread of the next bunch
                std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
            }
        }
        // No sync because the threads of the last bunch do not call wait_for_all.
        // Run the last bunch of threads.
        for ( int i = 0; i<bunch_size; ++i )
            std::thread( TestMultipleWaitsThreadBody( bunch_size, num_tasks, a, waiters, processed, tgc ), idx++ ).detach();
        // Wait until every detached thread has fully left the arena.
        while ( processed ) utils::yield();
    }
    for (auto w : waiters) delete w;
}
114451c0b2f7Stbbdev
TestMultipleWaits()114551c0b2f7Stbbdev void TestMultipleWaits() {
114651c0b2f7Stbbdev // Limit the number of threads to prevent heavy oversubscription.
114751c0b2f7Stbbdev #if TBB_TEST_LOW_WORKLOAD
114851c0b2f7Stbbdev const int max_threads = std::min( 4, tbb::this_task_arena::max_concurrency() );
114951c0b2f7Stbbdev #else
115051c0b2f7Stbbdev const int max_threads = std::min( 16, tbb::this_task_arena::max_concurrency() );
115151c0b2f7Stbbdev #endif
115251c0b2f7Stbbdev
115351c0b2f7Stbbdev utils::FastRandom<> rnd(1234);
115451c0b2f7Stbbdev for ( int threads = 1; threads <= max_threads; threads += utils::max( threads/2, 1 ) ) {
115551c0b2f7Stbbdev for ( int i = 0; i<3; ++i ) {
115651c0b2f7Stbbdev const int num_bunches = 3 + rnd.get()%3;
115751c0b2f7Stbbdev const int bunch_size = max_threads + rnd.get()%max_threads;
115851c0b2f7Stbbdev TestMultipleWaits( threads, num_bunches, bunch_size );
115951c0b2f7Stbbdev }
116051c0b2f7Stbbdev }
116151c0b2f7Stbbdev }
116251c0b2f7Stbbdev
116351c0b2f7Stbbdev //--------------------------------------------------//
116451c0b2f7Stbbdev
TestSmallStackSize()116551c0b2f7Stbbdev void TestSmallStackSize() {
116651c0b2f7Stbbdev tbb::global_control gc(tbb::global_control::thread_stack_size,
116751c0b2f7Stbbdev tbb::global_control::active_value(tbb::global_control::thread_stack_size) / 2 );
116851c0b2f7Stbbdev // The test produces the warning (not a error) if fails. So the test is run many times
116951c0b2f7Stbbdev // to make the log annoying (to force to consider it as an error).
117051c0b2f7Stbbdev for (int i = 0; i < 100; ++i) {
117151c0b2f7Stbbdev tbb::task_arena a;
117251c0b2f7Stbbdev a.initialize();
117351c0b2f7Stbbdev }
117451c0b2f7Stbbdev }
117551c0b2f7Stbbdev
117651c0b2f7Stbbdev //--------------------------------------------------//
117751c0b2f7Stbbdev
117851c0b2f7Stbbdev namespace TestMoveSemanticsNS {
117951c0b2f7Stbbdev struct TestFunctor {
operator ()TestMoveSemanticsNS::TestFunctor118051c0b2f7Stbbdev void operator()() const {};
118151c0b2f7Stbbdev };
118251c0b2f7Stbbdev
118351c0b2f7Stbbdev struct MoveOnlyFunctor : utils::MoveOnly, TestFunctor {
MoveOnlyFunctorTestMoveSemanticsNS::MoveOnlyFunctor118451c0b2f7Stbbdev MoveOnlyFunctor() : utils::MoveOnly() {};
MoveOnlyFunctorTestMoveSemanticsNS::MoveOnlyFunctor118551c0b2f7Stbbdev MoveOnlyFunctor(MoveOnlyFunctor&& other) : utils::MoveOnly(std::move(other)) {};
118651c0b2f7Stbbdev };
118751c0b2f7Stbbdev
118851c0b2f7Stbbdev struct MovePreferableFunctor : utils::Movable, TestFunctor {
MovePreferableFunctorTestMoveSemanticsNS::MovePreferableFunctor118951c0b2f7Stbbdev MovePreferableFunctor() : utils::Movable() {};
MovePreferableFunctorTestMoveSemanticsNS::MovePreferableFunctor119051c0b2f7Stbbdev MovePreferableFunctor(MovePreferableFunctor&& other) : utils::Movable( std::move(other) ) {};
MovePreferableFunctorTestMoveSemanticsNS::MovePreferableFunctor119151c0b2f7Stbbdev MovePreferableFunctor(const MovePreferableFunctor& other) : utils::Movable(other) {};
119251c0b2f7Stbbdev };
119351c0b2f7Stbbdev
119451c0b2f7Stbbdev struct NoMoveNoCopyFunctor : utils::NoCopy, TestFunctor {
NoMoveNoCopyFunctorTestMoveSemanticsNS::NoMoveNoCopyFunctor119551c0b2f7Stbbdev NoMoveNoCopyFunctor() : utils::NoCopy() {};
119651c0b2f7Stbbdev // mv ctor is not allowed as cp ctor from parent NoCopy
119751c0b2f7Stbbdev private:
119851c0b2f7Stbbdev NoMoveNoCopyFunctor(NoMoveNoCopyFunctor&&);
119951c0b2f7Stbbdev };
120051c0b2f7Stbbdev
TestFunctors()120151c0b2f7Stbbdev void TestFunctors() {
120251c0b2f7Stbbdev tbb::task_arena ta;
120351c0b2f7Stbbdev MovePreferableFunctor mpf;
120451c0b2f7Stbbdev // execute() doesn't have any copies or moves of arguments inside the impl
120551c0b2f7Stbbdev ta.execute( NoMoveNoCopyFunctor() );
120651c0b2f7Stbbdev
120751c0b2f7Stbbdev ta.enqueue( MoveOnlyFunctor() );
120851c0b2f7Stbbdev ta.enqueue( mpf );
120951c0b2f7Stbbdev REQUIRE_MESSAGE(mpf.alive, "object was moved when was passed by lval");
121051c0b2f7Stbbdev mpf.Reset();
121151c0b2f7Stbbdev ta.enqueue( std::move(mpf) );
121251c0b2f7Stbbdev REQUIRE_MESSAGE(!mpf.alive, "object was copied when was passed by rval");
121351c0b2f7Stbbdev mpf.Reset();
121451c0b2f7Stbbdev }
121551c0b2f7Stbbdev }
121651c0b2f7Stbbdev
TestMoveSemantics()121751c0b2f7Stbbdev void TestMoveSemantics() {
121851c0b2f7Stbbdev TestMoveSemanticsNS::TestFunctors();
121951c0b2f7Stbbdev }
122051c0b2f7Stbbdev
122151c0b2f7Stbbdev //--------------------------------------------------//
122251c0b2f7Stbbdev
122351c0b2f7Stbbdev #include <vector>
122451c0b2f7Stbbdev
122551c0b2f7Stbbdev #include "common/state_trackable.h"
122651c0b2f7Stbbdev
122751c0b2f7Stbbdev namespace TestReturnValueNS {
122851c0b2f7Stbbdev struct noDefaultTag {};
122951c0b2f7Stbbdev class ReturnType : public StateTrackable<> {
123051c0b2f7Stbbdev static const int SIZE = 42;
123151c0b2f7Stbbdev std::vector<int> data;
123251c0b2f7Stbbdev public:
ReturnType(noDefaultTag)123351c0b2f7Stbbdev ReturnType(noDefaultTag) : StateTrackable<>(0) {}
123451c0b2f7Stbbdev // Define copy constructor to test that it is never called
ReturnType(const ReturnType & r)123551c0b2f7Stbbdev ReturnType(const ReturnType& r) : StateTrackable<>(r), data(r.data) {}
ReturnType(ReturnType && r)123651c0b2f7Stbbdev ReturnType(ReturnType&& r) : StateTrackable<>(std::move(r)), data(std::move(r.data)) {}
123751c0b2f7Stbbdev
fill()123851c0b2f7Stbbdev void fill() {
123951c0b2f7Stbbdev for (int i = 0; i < SIZE; ++i)
124051c0b2f7Stbbdev data.push_back(i);
124151c0b2f7Stbbdev }
check()124251c0b2f7Stbbdev void check() {
124351c0b2f7Stbbdev REQUIRE(data.size() == unsigned(SIZE));
124451c0b2f7Stbbdev for (int i = 0; i < SIZE; ++i)
124551c0b2f7Stbbdev REQUIRE(data[i] == i);
124651c0b2f7Stbbdev StateTrackableCounters::counters_type& cnts = StateTrackableCounters::counters;
124751c0b2f7Stbbdev REQUIRE(cnts[StateTrackableBase::DefaultInitialized] == 0);
124851c0b2f7Stbbdev REQUIRE(cnts[StateTrackableBase::DirectInitialized] == 1);
124951c0b2f7Stbbdev std::size_t copied = cnts[StateTrackableBase::CopyInitialized];
125051c0b2f7Stbbdev std::size_t moved = cnts[StateTrackableBase::MoveInitialized];
125151c0b2f7Stbbdev REQUIRE(cnts[StateTrackableBase::Destroyed] == copied + moved);
12525d4a4acfSIvan Kochin // The number of copies/moves should not exceed 3 if copy elision takes a place:
12535d4a4acfSIvan Kochin // function return, store to an internal storage, acquire internal storage.
12545d4a4acfSIvan Kochin // For compilation, without copy elision, this number may be grown up to 7.
12555d4a4acfSIvan Kochin REQUIRE((copied == 0 && moved <= 7));
12565d4a4acfSIvan Kochin WARN_MESSAGE(moved <= 3,
12575d4a4acfSIvan Kochin "Warning: The number of copies/moves should not exceed 3 if copy elision takes a place."
12585d4a4acfSIvan Kochin "Take an attention to this warning only if copy elision is enabled."
12595d4a4acfSIvan Kochin );
126051c0b2f7Stbbdev }
126151c0b2f7Stbbdev };
126251c0b2f7Stbbdev
126351c0b2f7Stbbdev template <typename R>
function()126451c0b2f7Stbbdev R function() {
126551c0b2f7Stbbdev noDefaultTag tag;
126651c0b2f7Stbbdev R r(tag);
126751c0b2f7Stbbdev r.fill();
126851c0b2f7Stbbdev return r;
126951c0b2f7Stbbdev }
127051c0b2f7Stbbdev
127151c0b2f7Stbbdev template <>
function()127251c0b2f7Stbbdev void function<void>() {}
127351c0b2f7Stbbdev
127451c0b2f7Stbbdev template <typename R>
127551c0b2f7Stbbdev struct Functor {
operator ()TestReturnValueNS::Functor127651c0b2f7Stbbdev R operator()() const {
127751c0b2f7Stbbdev return function<R>();
127851c0b2f7Stbbdev }
127951c0b2f7Stbbdev };
128051c0b2f7Stbbdev
arena()128151c0b2f7Stbbdev tbb::task_arena& arena() {
128251c0b2f7Stbbdev static tbb::task_arena a;
128351c0b2f7Stbbdev return a;
128451c0b2f7Stbbdev }
128551c0b2f7Stbbdev
128651c0b2f7Stbbdev template <typename F>
TestExecute(F & f)128751c0b2f7Stbbdev void TestExecute(F &f) {
128851c0b2f7Stbbdev StateTrackableCounters::reset();
12895d4a4acfSIvan Kochin ReturnType r{arena().execute(f)};
129051c0b2f7Stbbdev r.check();
129151c0b2f7Stbbdev }
129251c0b2f7Stbbdev
129351c0b2f7Stbbdev template <typename F>
TestExecute(const F & f)129451c0b2f7Stbbdev void TestExecute(const F &f) {
129551c0b2f7Stbbdev StateTrackableCounters::reset();
12965d4a4acfSIvan Kochin ReturnType r{arena().execute(f)};
129751c0b2f7Stbbdev r.check();
129851c0b2f7Stbbdev }
129951c0b2f7Stbbdev template <typename F>
TestIsolate(F & f)130051c0b2f7Stbbdev void TestIsolate(F &f) {
130151c0b2f7Stbbdev StateTrackableCounters::reset();
13025d4a4acfSIvan Kochin ReturnType r{tbb::this_task_arena::isolate(f)};
130351c0b2f7Stbbdev r.check();
130451c0b2f7Stbbdev }
130551c0b2f7Stbbdev
130651c0b2f7Stbbdev template <typename F>
TestIsolate(const F & f)130751c0b2f7Stbbdev void TestIsolate(const F &f) {
130851c0b2f7Stbbdev StateTrackableCounters::reset();
13095d4a4acfSIvan Kochin ReturnType r{tbb::this_task_arena::isolate(f)};
131051c0b2f7Stbbdev r.check();
131151c0b2f7Stbbdev }
131251c0b2f7Stbbdev
Test()131351c0b2f7Stbbdev void Test() {
131451c0b2f7Stbbdev TestExecute(Functor<ReturnType>());
131551c0b2f7Stbbdev Functor<ReturnType> f1;
131651c0b2f7Stbbdev TestExecute(f1);
131751c0b2f7Stbbdev TestExecute(function<ReturnType>);
131851c0b2f7Stbbdev
131951c0b2f7Stbbdev arena().execute(Functor<void>());
132051c0b2f7Stbbdev Functor<void> f2;
132151c0b2f7Stbbdev arena().execute(f2);
132251c0b2f7Stbbdev arena().execute(function<void>);
132351c0b2f7Stbbdev TestIsolate(Functor<ReturnType>());
132451c0b2f7Stbbdev TestIsolate(f1);
132551c0b2f7Stbbdev TestIsolate(function<ReturnType>);
132651c0b2f7Stbbdev tbb::this_task_arena::isolate(Functor<void>());
132751c0b2f7Stbbdev tbb::this_task_arena::isolate(f2);
132851c0b2f7Stbbdev tbb::this_task_arena::isolate(function<void>);
132951c0b2f7Stbbdev }
133051c0b2f7Stbbdev }
133151c0b2f7Stbbdev
TestReturnValue()133251c0b2f7Stbbdev void TestReturnValue() {
133351c0b2f7Stbbdev TestReturnValueNS::Test();
133451c0b2f7Stbbdev }
133551c0b2f7Stbbdev
133651c0b2f7Stbbdev //--------------------------------------------------//
133751c0b2f7Stbbdev
133851c0b2f7Stbbdev // MyObserver checks if threads join to the same arena
133951c0b2f7Stbbdev struct MyObserver: public tbb::task_scheduler_observer {
134051c0b2f7Stbbdev tbb::enumerable_thread_specific<tbb::task_arena*>& my_tls;
134151c0b2f7Stbbdev tbb::task_arena& my_arena;
134251c0b2f7Stbbdev std::atomic<int>& my_failure_counter;
134351c0b2f7Stbbdev std::atomic<int>& my_counter;
1344b15aabb3Stbbdev utils::SpinBarrier& m_barrier;
134551c0b2f7Stbbdev
MyObserverMyObserver134651c0b2f7Stbbdev MyObserver(tbb::task_arena& a,
134751c0b2f7Stbbdev tbb::enumerable_thread_specific<tbb::task_arena*>& tls,
134851c0b2f7Stbbdev std::atomic<int>& failure_counter,
1349b15aabb3Stbbdev std::atomic<int>& counter,
1350b15aabb3Stbbdev utils::SpinBarrier& barrier)
135151c0b2f7Stbbdev : tbb::task_scheduler_observer(a), my_tls(tls), my_arena(a),
1352b15aabb3Stbbdev my_failure_counter(failure_counter), my_counter(counter), m_barrier(barrier) {
135351c0b2f7Stbbdev observe(true);
135451c0b2f7Stbbdev }
~MyObserverMyObserver135574207e5dSAnton Potapov ~MyObserver(){
135674207e5dSAnton Potapov observe(false);
135774207e5dSAnton Potapov }
on_scheduler_entryMyObserver135851c0b2f7Stbbdev void on_scheduler_entry(bool worker) override {
135951c0b2f7Stbbdev if (worker) {
136051c0b2f7Stbbdev ++my_counter;
136151c0b2f7Stbbdev tbb::task_arena*& cur_arena = my_tls.local();
136257f524caSIlya Isaev if (cur_arena != nullptr && cur_arena != &my_arena) {
136351c0b2f7Stbbdev ++my_failure_counter;
136451c0b2f7Stbbdev }
136551c0b2f7Stbbdev cur_arena = &my_arena;
136651c0b2f7Stbbdev m_barrier.wait();
136751c0b2f7Stbbdev }
136851c0b2f7Stbbdev }
on_scheduler_exitMyObserver1369b15aabb3Stbbdev void on_scheduler_exit(bool worker) override {
1370b15aabb3Stbbdev if (worker) {
1371b15aabb3Stbbdev m_barrier.wait(); // before wakeup
1372b15aabb3Stbbdev m_barrier.wait(); // after wakeup
137351c0b2f7Stbbdev }
137451c0b2f7Stbbdev }
137551c0b2f7Stbbdev };
137651c0b2f7Stbbdev
TestArenaWorkersMigrationWithNumThreads(int n_threads=0)137751c0b2f7Stbbdev void TestArenaWorkersMigrationWithNumThreads(int n_threads = 0) {
137851c0b2f7Stbbdev if (n_threads == 0) {
137951c0b2f7Stbbdev n_threads = tbb::this_task_arena::max_concurrency();
138051c0b2f7Stbbdev }
1381b15aabb3Stbbdev
138251c0b2f7Stbbdev const int max_n_arenas = 8;
138351c0b2f7Stbbdev int n_arenas = 2;
1384b15aabb3Stbbdev if(n_threads > 16) {
138551c0b2f7Stbbdev n_arenas = max_n_arenas;
1386b15aabb3Stbbdev } else if (n_threads > 8) {
138751c0b2f7Stbbdev n_arenas = 4;
1388b15aabb3Stbbdev }
1389b15aabb3Stbbdev
1390b15aabb3Stbbdev int n_workers = n_threads - 1;
1391b15aabb3Stbbdev n_workers = n_arenas * (n_workers / n_arenas);
1392b15aabb3Stbbdev if (n_workers == 0) {
1393b15aabb3Stbbdev return;
1394b15aabb3Stbbdev }
1395b15aabb3Stbbdev
1396b15aabb3Stbbdev n_threads = n_workers + 1;
1397b15aabb3Stbbdev tbb::global_control control(tbb::global_control::max_allowed_parallelism, n_threads);
1398b15aabb3Stbbdev
1399b15aabb3Stbbdev const int n_repetitions = 20;
1400b15aabb3Stbbdev const int n_outer_repetitions = 100;
140151c0b2f7Stbbdev std::multiset<float> failure_ratio; // for median calculating
1402b15aabb3Stbbdev utils::SpinBarrier barrier(n_threads);
1403b15aabb3Stbbdev utils::SpinBarrier worker_barrier(n_workers);
140451c0b2f7Stbbdev MyObserver* observer[max_n_arenas];
140551c0b2f7Stbbdev std::vector<tbb::task_arena> arenas(n_arenas);
140651c0b2f7Stbbdev std::atomic<int> failure_counter;
140751c0b2f7Stbbdev std::atomic<int> counter;
140851c0b2f7Stbbdev tbb::enumerable_thread_specific<tbb::task_arena*> tls;
1409b15aabb3Stbbdev
141051c0b2f7Stbbdev for (int i = 0; i < n_arenas; ++i) {
1411b15aabb3Stbbdev arenas[i].initialize(n_workers / n_arenas + 1); // +1 for master
1412b15aabb3Stbbdev observer[i] = new MyObserver(arenas[i], tls, failure_counter, counter, barrier);
141351c0b2f7Stbbdev }
1414b15aabb3Stbbdev
141551c0b2f7Stbbdev int ii = 0;
141651c0b2f7Stbbdev for (; ii < n_outer_repetitions; ++ii) {
141751c0b2f7Stbbdev failure_counter = 0;
141851c0b2f7Stbbdev counter = 0;
1419b15aabb3Stbbdev
142051c0b2f7Stbbdev // Main code
1421b15aabb3Stbbdev auto wakeup = [&arenas] { for (auto& a : arenas) a.enqueue([]{}); };
1422b15aabb3Stbbdev wakeup();
1423b15aabb3Stbbdev for (int j = 0; j < n_repetitions; ++j) {
1424b15aabb3Stbbdev barrier.wait(); // entry
1425b15aabb3Stbbdev barrier.wait(); // exit1
1426b15aabb3Stbbdev wakeup();
1427b15aabb3Stbbdev barrier.wait(); // exit2
1428b15aabb3Stbbdev }
1429b15aabb3Stbbdev barrier.wait(); // entry
1430b15aabb3Stbbdev barrier.wait(); // exit1
1431b15aabb3Stbbdev barrier.wait(); // exit2
1432b15aabb3Stbbdev
1433b15aabb3Stbbdev failure_ratio.insert(float(failure_counter) / counter);
143451c0b2f7Stbbdev tls.clear();
143551c0b2f7Stbbdev // collect 3 elements in failure_ratio before calculating median
143651c0b2f7Stbbdev if (ii > 1) {
143751c0b2f7Stbbdev std::multiset<float>::iterator it = failure_ratio.begin();
143851c0b2f7Stbbdev std::advance(it, failure_ratio.size() / 2);
143951c0b2f7Stbbdev if (*it < 0.02)
144051c0b2f7Stbbdev break;
144151c0b2f7Stbbdev }
144251c0b2f7Stbbdev }
144351c0b2f7Stbbdev for (int i = 0; i < n_arenas; ++i) {
144451c0b2f7Stbbdev delete observer[i];
144551c0b2f7Stbbdev }
144651c0b2f7Stbbdev // check if median is so big
144751c0b2f7Stbbdev std::multiset<float>::iterator it = failure_ratio.begin();
144851c0b2f7Stbbdev std::advance(it, failure_ratio.size() / 2);
144951c0b2f7Stbbdev // TODO: decrease constants 0.05 and 0.3 by setting ratio between n_threads and n_arenas
145051c0b2f7Stbbdev if (*it > 0.05) {
145151c0b2f7Stbbdev REPORT("Warning: So many cases when threads join to different arenas.\n");
145251c0b2f7Stbbdev REQUIRE_MESSAGE(*it <= 0.3, "A lot of cases when threads join to different arenas.\n");
145351c0b2f7Stbbdev }
145451c0b2f7Stbbdev }
145551c0b2f7Stbbdev
TestArenaWorkersMigration()145651c0b2f7Stbbdev void TestArenaWorkersMigration() {
145751c0b2f7Stbbdev TestArenaWorkersMigrationWithNumThreads(4);
145851c0b2f7Stbbdev if (tbb::this_task_arena::max_concurrency() != 4) {
145951c0b2f7Stbbdev TestArenaWorkersMigrationWithNumThreads();
146051c0b2f7Stbbdev }
146151c0b2f7Stbbdev }
146251c0b2f7Stbbdev
146351c0b2f7Stbbdev //--------------------------------------------------//
TestDefaultCreatedWorkersAmount()146451c0b2f7Stbbdev void TestDefaultCreatedWorkersAmount() {
146551c0b2f7Stbbdev int threads = tbb::this_task_arena::max_concurrency();
146651c0b2f7Stbbdev utils::NativeParallelFor(1, [threads](int idx) {
146751c0b2f7Stbbdev REQUIRE_MESSAGE(idx == 0, "more than 1 thread is going to reset TLS");
146851c0b2f7Stbbdev utils::SpinBarrier barrier(threads);
146951c0b2f7Stbbdev ResetTLS();
147031199975SPavel for (auto blocked : { false, true }) {
147131199975SPavel for (int trail = 0; trail < (blocked ? 10 : 10000); ++trail) {
147231199975SPavel tbb::parallel_for(0, threads, [threads, blocked, &barrier](int) {
147331199975SPavel CHECK_FAST_MESSAGE(threads == tbb::this_task_arena::max_concurrency(), "concurrency level is not equal specified threadnum");
147431199975SPavel CHECK_FAST_MESSAGE(tbb::this_task_arena::current_thread_index() < tbb::this_task_arena::max_concurrency(), "amount of created threads is more than specified by default");
147551c0b2f7Stbbdev local_id.local() = 1;
147631199975SPavel if (blocked) {
147751c0b2f7Stbbdev // If there is more threads than expected, 'sleep' gives a chance to join unexpected threads.
147851c0b2f7Stbbdev utils::Sleep(1);
147951c0b2f7Stbbdev barrier.wait();
148031199975SPavel }
148151c0b2f7Stbbdev }, tbb::simple_partitioner());
148231199975SPavel REQUIRE_MESSAGE(local_id.size() <= size_t(threads), "amount of created threads is not equal to default num");
148331199975SPavel if (blocked) {
148451c0b2f7Stbbdev REQUIRE_MESSAGE(local_id.size() == size_t(threads), "amount of created threads is not equal to default num");
148551c0b2f7Stbbdev }
148631199975SPavel }
148731199975SPavel }
148851c0b2f7Stbbdev });
148951c0b2f7Stbbdev }
149051c0b2f7Stbbdev
TestAbilityToCreateWorkers(int thread_num)149151c0b2f7Stbbdev void TestAbilityToCreateWorkers(int thread_num) {
149251c0b2f7Stbbdev tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, thread_num);
1493b15aabb3Stbbdev // Checks only some part of reserved-external threads amount:
149451c0b2f7Stbbdev // 0 and 1 reserved threads are important cases but it is also needed
149551c0b2f7Stbbdev // to collect some statistic data with other amount and to not consume
1496*c4a799dfSJhaShweta1 // whole test session time checking each amount
149751c0b2f7Stbbdev TestArenaConcurrency(thread_num - 1, 0, int(thread_num / 2.72));
149851c0b2f7Stbbdev TestArenaConcurrency(thread_num, 1, int(thread_num / 3.14));
149951c0b2f7Stbbdev }
150051c0b2f7Stbbdev
TestDefaultWorkersLimit()150151c0b2f7Stbbdev void TestDefaultWorkersLimit() {
150251c0b2f7Stbbdev TestDefaultCreatedWorkersAmount();
150351c0b2f7Stbbdev #if TBB_TEST_LOW_WORKLOAD
150451c0b2f7Stbbdev TestAbilityToCreateWorkers(24);
150551c0b2f7Stbbdev #else
150651c0b2f7Stbbdev TestAbilityToCreateWorkers(256);
150751c0b2f7Stbbdev #endif
150851c0b2f7Stbbdev }
150951c0b2f7Stbbdev
151051c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
151151c0b2f7Stbbdev
ExceptionInExecute()151251c0b2f7Stbbdev void ExceptionInExecute() {
151351c0b2f7Stbbdev std::size_t thread_number = utils::get_platform_max_threads();
151455f9b178SIvan Kochin int arena_concurrency = static_cast<int>(thread_number) / 2;
151555f9b178SIvan Kochin tbb::task_arena test_arena(arena_concurrency, arena_concurrency);
151651c0b2f7Stbbdev
151751c0b2f7Stbbdev std::atomic<int> canceled_task{};
151851c0b2f7Stbbdev
151951c0b2f7Stbbdev auto parallel_func = [&test_arena, &canceled_task] (std::size_t) {
152051c0b2f7Stbbdev for (std::size_t i = 0; i < 1000; ++i) {
152151c0b2f7Stbbdev try {
152251c0b2f7Stbbdev test_arena.execute([] {
152351c0b2f7Stbbdev volatile bool suppress_unreachable_code_warning = true;
152451c0b2f7Stbbdev if (suppress_unreachable_code_warning) {
152551c0b2f7Stbbdev throw -1;
152651c0b2f7Stbbdev }
152751c0b2f7Stbbdev });
152851c0b2f7Stbbdev FAIL("An exception should have thrown.");
152951c0b2f7Stbbdev } catch (int) {
153051c0b2f7Stbbdev ++canceled_task;
153151c0b2f7Stbbdev } catch (...) {
153251c0b2f7Stbbdev FAIL("Wrong type of exception.");
153351c0b2f7Stbbdev }
153451c0b2f7Stbbdev }
153551c0b2f7Stbbdev };
153651c0b2f7Stbbdev
153751c0b2f7Stbbdev utils::NativeParallelFor(thread_number, parallel_func);
153851c0b2f7Stbbdev CHECK(canceled_task == thread_number * 1000);
153951c0b2f7Stbbdev }
154051c0b2f7Stbbdev
154151c0b2f7Stbbdev #endif // TBB_USE_EXCEPTIONS
154251c0b2f7Stbbdev
154351c0b2f7Stbbdev class simple_observer : public tbb::task_scheduler_observer {
154451c0b2f7Stbbdev static std::atomic<int> idx_counter;
154551c0b2f7Stbbdev int my_idx;
154651c0b2f7Stbbdev int myMaxConcurrency; // concurrency of the associated arena
154751c0b2f7Stbbdev int myNumReservedSlots; // reserved slots in the associated arena
on_scheduler_entry(bool is_worker)154851c0b2f7Stbbdev void on_scheduler_entry( bool is_worker ) override {
154951c0b2f7Stbbdev int current_index = tbb::this_task_arena::current_thread_index();
155051c0b2f7Stbbdev CHECK(current_index < (myMaxConcurrency > 1 ? myMaxConcurrency : 2));
155151c0b2f7Stbbdev if (is_worker) {
155251c0b2f7Stbbdev CHECK(current_index >= myNumReservedSlots);
155351c0b2f7Stbbdev }
155451c0b2f7Stbbdev }
on_scheduler_exit(bool)155551c0b2f7Stbbdev void on_scheduler_exit( bool /*is_worker*/ ) override
155651c0b2f7Stbbdev {}
155751c0b2f7Stbbdev public:
simple_observer(tbb::task_arena & a,int maxConcurrency,int numReservedSlots)155851c0b2f7Stbbdev simple_observer(tbb::task_arena &a, int maxConcurrency, int numReservedSlots)
155951c0b2f7Stbbdev : tbb::task_scheduler_observer(a), my_idx(idx_counter++)
156051c0b2f7Stbbdev , myMaxConcurrency(maxConcurrency)
156151c0b2f7Stbbdev , myNumReservedSlots(numReservedSlots) {
156251c0b2f7Stbbdev observe(true);
156351c0b2f7Stbbdev }
156451c0b2f7Stbbdev
~simple_observer()156574207e5dSAnton Potapov ~simple_observer(){
156674207e5dSAnton Potapov observe(false);
156774207e5dSAnton Potapov }
156874207e5dSAnton Potapov
operator <(const simple_observer & lhs,const simple_observer & rhs)156951c0b2f7Stbbdev friend bool operator<(const simple_observer& lhs, const simple_observer& rhs) {
157051c0b2f7Stbbdev return lhs.my_idx < rhs.my_idx;
157151c0b2f7Stbbdev }
157251c0b2f7Stbbdev };
157351c0b2f7Stbbdev
157451c0b2f7Stbbdev std::atomic<int> simple_observer::idx_counter{};
157551c0b2f7Stbbdev
157651c0b2f7Stbbdev struct arena_handler {
157751c0b2f7Stbbdev enum arena_status {
157851c0b2f7Stbbdev alive,
157951c0b2f7Stbbdev deleting,
158051c0b2f7Stbbdev deleted
158151c0b2f7Stbbdev };
158251c0b2f7Stbbdev
158351c0b2f7Stbbdev tbb::task_arena* arena;
158451c0b2f7Stbbdev
158551c0b2f7Stbbdev std::atomic<arena_status> status{alive};
158651c0b2f7Stbbdev tbb::spin_rw_mutex arena_in_use{};
158751c0b2f7Stbbdev
158851c0b2f7Stbbdev tbb::concurrent_set<simple_observer> observers;
158951c0b2f7Stbbdev
arena_handlerarena_handler159051c0b2f7Stbbdev arena_handler(tbb::task_arena* ptr) : arena(ptr)
159151c0b2f7Stbbdev {}
159251c0b2f7Stbbdev
operator <(const arena_handler & lhs,const arena_handler & rhs)159351c0b2f7Stbbdev friend bool operator<(const arena_handler& lhs, const arena_handler& rhs) {
159451c0b2f7Stbbdev return lhs.arena < rhs.arena;
159551c0b2f7Stbbdev }
159651c0b2f7Stbbdev };
159751c0b2f7Stbbdev
159851c0b2f7Stbbdev // TODO: Add observer operations
StressTestMixFunctionality()159951c0b2f7Stbbdev void StressTestMixFunctionality() {
160051c0b2f7Stbbdev enum operation_type {
160151c0b2f7Stbbdev create_arena,
160251c0b2f7Stbbdev delete_arena,
160351c0b2f7Stbbdev attach_observer,
160451c0b2f7Stbbdev detach_observer,
160551c0b2f7Stbbdev arena_execute,
160651c0b2f7Stbbdev enqueue_task,
160751c0b2f7Stbbdev last_operation_marker
160851c0b2f7Stbbdev };
160951c0b2f7Stbbdev
161051c0b2f7Stbbdev std::size_t operations_number = last_operation_marker;
161151c0b2f7Stbbdev std::size_t thread_number = utils::get_platform_max_threads();
161251c0b2f7Stbbdev utils::FastRandom<> operation_rnd(42);
161351c0b2f7Stbbdev tbb::spin_mutex random_operation_guard;
161451c0b2f7Stbbdev
161551c0b2f7Stbbdev auto get_random_operation = [&operation_rnd, &random_operation_guard, operations_number] () {
161651c0b2f7Stbbdev tbb::spin_mutex::scoped_lock lock(random_operation_guard);
161751c0b2f7Stbbdev return static_cast<operation_type>(operation_rnd.get() % operations_number);
161851c0b2f7Stbbdev };
161951c0b2f7Stbbdev
162051c0b2f7Stbbdev utils::FastRandom<> arena_rnd(42);
162151c0b2f7Stbbdev tbb::spin_mutex random_arena_guard;
162251c0b2f7Stbbdev auto get_random_arena = [&arena_rnd, &random_arena_guard] () {
162351c0b2f7Stbbdev tbb::spin_mutex::scoped_lock lock(random_arena_guard);
162451c0b2f7Stbbdev return arena_rnd.get();
162551c0b2f7Stbbdev };
162651c0b2f7Stbbdev
162751c0b2f7Stbbdev tbb::concurrent_set<arena_handler> arenas_pool;
162851c0b2f7Stbbdev
162951c0b2f7Stbbdev std::vector<std::thread> thread_pool;
163051c0b2f7Stbbdev
163151c0b2f7Stbbdev utils::SpinBarrier thread_barrier(thread_number);
1632b15aabb3Stbbdev std::size_t max_operations = 20000;
163351c0b2f7Stbbdev std::atomic<std::size_t> curr_operation{};
16342ca2a5f2SAlex
16352ca2a5f2SAlex auto find_arena = [&arenas_pool](tbb::spin_rw_mutex::scoped_lock& lock) -> decltype(arenas_pool.begin()) {
16362ca2a5f2SAlex for (auto curr_arena = arenas_pool.begin(); curr_arena != arenas_pool.end(); ++curr_arena) {
16372ca2a5f2SAlex if (lock.try_acquire(curr_arena->arena_in_use, /*writer*/ false)) {
16382ca2a5f2SAlex if (curr_arena->status == arena_handler::alive) {
16392ca2a5f2SAlex return curr_arena;
16402ca2a5f2SAlex }
16412ca2a5f2SAlex else {
16422ca2a5f2SAlex lock.release();
16432ca2a5f2SAlex }
16442ca2a5f2SAlex }
16452ca2a5f2SAlex }
16462ca2a5f2SAlex return arenas_pool.end();
16472ca2a5f2SAlex };
16482ca2a5f2SAlex
164951c0b2f7Stbbdev auto thread_func = [&] () {
165051c0b2f7Stbbdev arenas_pool.emplace(new tbb::task_arena());
165151c0b2f7Stbbdev thread_barrier.wait();
165251c0b2f7Stbbdev while (curr_operation++ < max_operations) {
165351c0b2f7Stbbdev switch (get_random_operation()) {
165451c0b2f7Stbbdev case create_arena :
165551c0b2f7Stbbdev {
165651c0b2f7Stbbdev arenas_pool.emplace(new tbb::task_arena());
165751c0b2f7Stbbdev break;
165851c0b2f7Stbbdev }
165951c0b2f7Stbbdev case delete_arena :
166051c0b2f7Stbbdev {
166151c0b2f7Stbbdev auto curr_arena = arenas_pool.begin();
166251c0b2f7Stbbdev for (; curr_arena != arenas_pool.end(); ++curr_arena) {
166351c0b2f7Stbbdev arena_handler::arena_status curr_status = arena_handler::alive;
166451c0b2f7Stbbdev if (curr_arena->status.compare_exchange_strong(curr_status, arena_handler::deleting)) {
166551c0b2f7Stbbdev break;
166651c0b2f7Stbbdev }
166751c0b2f7Stbbdev }
166851c0b2f7Stbbdev
166951c0b2f7Stbbdev if (curr_arena == arenas_pool.end()) break;
167051c0b2f7Stbbdev
167151c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock lock(curr_arena->arena_in_use, /*writer*/ true);
167251c0b2f7Stbbdev
167351c0b2f7Stbbdev delete curr_arena->arena;
167451c0b2f7Stbbdev curr_arena->status.store(arena_handler::deleted);
167551c0b2f7Stbbdev
167651c0b2f7Stbbdev break;
167751c0b2f7Stbbdev }
167851c0b2f7Stbbdev case attach_observer :
167951c0b2f7Stbbdev {
168051c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock lock{};
168151c0b2f7Stbbdev
16822ca2a5f2SAlex auto curr_arena = find_arena(lock);
16832ca2a5f2SAlex if (curr_arena != arenas_pool.end()) {
168451c0b2f7Stbbdev curr_arena->observers.emplace(*curr_arena->arena, thread_number, 1);
168551c0b2f7Stbbdev }
168651c0b2f7Stbbdev break;
168751c0b2f7Stbbdev }
168851c0b2f7Stbbdev case detach_observer:
168951c0b2f7Stbbdev {
169051c0b2f7Stbbdev auto arena_number = get_random_arena() % arenas_pool.size();
169151c0b2f7Stbbdev auto curr_arena = arenas_pool.begin();
169251c0b2f7Stbbdev std::advance(curr_arena, arena_number);
169351c0b2f7Stbbdev
169451c0b2f7Stbbdev for (auto it = curr_arena->observers.begin(); it != curr_arena->observers.end(); ++it) {
169551c0b2f7Stbbdev if (it->is_observing()) {
169651c0b2f7Stbbdev it->observe(false);
169751c0b2f7Stbbdev break;
169851c0b2f7Stbbdev }
169951c0b2f7Stbbdev }
170051c0b2f7Stbbdev
170151c0b2f7Stbbdev break;
170251c0b2f7Stbbdev }
170351c0b2f7Stbbdev case arena_execute:
170451c0b2f7Stbbdev {
170551c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock lock{};
17062ca2a5f2SAlex auto curr_arena = find_arena(lock);
170751c0b2f7Stbbdev
17082ca2a5f2SAlex if (curr_arena != arenas_pool.end()) {
170951c0b2f7Stbbdev curr_arena->arena->execute([]() {
1710a080baf9SAlex tbb::affinity_partitioner aff;
17112ca2a5f2SAlex tbb::parallel_for(0, 10000, utils::DummyBody(10), tbb::auto_partitioner{});
17122ca2a5f2SAlex tbb::parallel_for(0, 10000, utils::DummyBody(10), aff);
171351c0b2f7Stbbdev });
17142ca2a5f2SAlex }
171551c0b2f7Stbbdev
171651c0b2f7Stbbdev break;
171751c0b2f7Stbbdev }
171851c0b2f7Stbbdev case enqueue_task:
171951c0b2f7Stbbdev {
172051c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock lock{};
17212ca2a5f2SAlex auto curr_arena = find_arena(lock);
172251c0b2f7Stbbdev
17232ca2a5f2SAlex if (curr_arena != arenas_pool.end()) {
17242ca2a5f2SAlex curr_arena->arena->enqueue([] { utils::doDummyWork(1000); });
17252ca2a5f2SAlex }
172651c0b2f7Stbbdev
172751c0b2f7Stbbdev break;
172851c0b2f7Stbbdev }
172951c0b2f7Stbbdev case last_operation_marker :
173051c0b2f7Stbbdev break;
173151c0b2f7Stbbdev }
173251c0b2f7Stbbdev }
173351c0b2f7Stbbdev };
173451c0b2f7Stbbdev
173551c0b2f7Stbbdev for (std::size_t i = 0; i < thread_number - 1; ++i) {
173651c0b2f7Stbbdev thread_pool.emplace_back(thread_func);
173751c0b2f7Stbbdev }
173851c0b2f7Stbbdev
173951c0b2f7Stbbdev thread_func();
174051c0b2f7Stbbdev
174151c0b2f7Stbbdev for (std::size_t i = 0; i < thread_number - 1; ++i) {
174251c0b2f7Stbbdev if (thread_pool[i].joinable()) thread_pool[i].join();
174351c0b2f7Stbbdev }
174451c0b2f7Stbbdev
174551c0b2f7Stbbdev for (auto& handler : arenas_pool) {
174651c0b2f7Stbbdev if (handler.status != arena_handler::deleted) delete handler.arena;
174751c0b2f7Stbbdev }
174851c0b2f7Stbbdev }
174951c0b2f7Stbbdev
175051c0b2f7Stbbdev struct enqueue_test_helper {
enqueue_test_helperenqueue_test_helper175151c0b2f7Stbbdev enqueue_test_helper(tbb::task_arena& arena, tbb::enumerable_thread_specific<bool>& ets , std::atomic<std::size_t>& task_counter)
175251c0b2f7Stbbdev : my_arena(arena), my_ets(ets), my_task_counter(task_counter)
175351c0b2f7Stbbdev {}
175451c0b2f7Stbbdev
enqueue_test_helperenqueue_test_helper175551c0b2f7Stbbdev enqueue_test_helper(const enqueue_test_helper& ef) : my_arena(ef.my_arena), my_ets(ef.my_ets), my_task_counter(ef.my_task_counter)
175651c0b2f7Stbbdev {}
175751c0b2f7Stbbdev
operator ()enqueue_test_helper175851c0b2f7Stbbdev void operator() () const {
175951c0b2f7Stbbdev CHECK(my_ets.local());
176051c0b2f7Stbbdev if (my_task_counter++ < 100000) my_arena.enqueue(enqueue_test_helper(my_arena, my_ets, my_task_counter));
1761b15aabb3Stbbdev utils::yield();
176251c0b2f7Stbbdev }
176351c0b2f7Stbbdev
176451c0b2f7Stbbdev tbb::task_arena& my_arena;
176551c0b2f7Stbbdev tbb::enumerable_thread_specific<bool>& my_ets;
176651c0b2f7Stbbdev std::atomic<std::size_t>& my_task_counter;
176751c0b2f7Stbbdev };
176834c7d170SAlex
test_threads_sleep(int concurrency,int reserved_slots,int num_external_threads)1769c4568449SPavel Kumbrasev void test_threads_sleep(int concurrency, int reserved_slots, int num_external_threads) {
1770c4568449SPavel Kumbrasev tbb::task_arena a(concurrency, reserved_slots);
1771c4568449SPavel Kumbrasev std::mutex m;
1772c4568449SPavel Kumbrasev std::condition_variable cond_var;
1773c4568449SPavel Kumbrasev bool completed{ false };
1774c4568449SPavel Kumbrasev utils::SpinBarrier barrier( concurrency - reserved_slots + 1 );
1775c4568449SPavel Kumbrasev
1776c4568449SPavel Kumbrasev auto body = [&] {
1777c4568449SPavel Kumbrasev std::unique_lock<std::mutex> lock(m);
1778c4568449SPavel Kumbrasev cond_var.wait(lock, [&] { return completed == true; });
1779c4568449SPavel Kumbrasev };
1780c4568449SPavel Kumbrasev
1781c4568449SPavel Kumbrasev for (int i = 0; i < concurrency - reserved_slots; ++i) {
1782c4568449SPavel Kumbrasev a.enqueue([&] {
1783c4568449SPavel Kumbrasev body();
1784c4568449SPavel Kumbrasev barrier.signalNoWait();
1785c4568449SPavel Kumbrasev });
1786c4568449SPavel Kumbrasev }
1787c4568449SPavel Kumbrasev std::vector<std::thread> threads;
1788c4568449SPavel Kumbrasev for (int i = 0; i < num_external_threads; ++i) {
1789c4568449SPavel Kumbrasev threads.emplace_back([&]() { a.execute(body); });
1790c4568449SPavel Kumbrasev }
1791c4568449SPavel Kumbrasev TestCPUUserTime(concurrency);
1792c4568449SPavel Kumbrasev
1793c4568449SPavel Kumbrasev {
1794c4568449SPavel Kumbrasev std::lock_guard<std::mutex> lock(m);
1795c4568449SPavel Kumbrasev completed = true;
1796c4568449SPavel Kumbrasev cond_var.notify_all();
1797c4568449SPavel Kumbrasev }
1798c4568449SPavel Kumbrasev for (auto& t : threads) {
1799c4568449SPavel Kumbrasev t.join();
1800c4568449SPavel Kumbrasev }
1801c4568449SPavel Kumbrasev barrier.wait();
1802c4568449SPavel Kumbrasev }
1803c4568449SPavel Kumbrasev
test_threads_sleep(int concurrency,int reserved_slots)1804c4568449SPavel Kumbrasev void test_threads_sleep(int concurrency, int reserved_slots) {
1805c4568449SPavel Kumbrasev test_threads_sleep(concurrency, reserved_slots, reserved_slots);
1806c4568449SPavel Kumbrasev test_threads_sleep(concurrency, reserved_slots, 2 * concurrency);
1807c4568449SPavel Kumbrasev }
1808c4568449SPavel Kumbrasev
180951c0b2f7Stbbdev //--------------------------------------------------//
1810552f342bSPavel
1811c6045a4fSPavel // This test requires TBB in an uninitialized state
1812552f342bSPavel //! \brief \ref requirement
1813552f342bSPavel TEST_CASE("task_arena initialize soft limit ignoring affinity mask") {
1814c6045a4fSPavel REQUIRE_MESSAGE((tbb::this_task_arena::current_thread_index() == tbb::task_arena::not_initialized), "TBB was initialized state");
1815552f342bSPavel tbb::enumerable_thread_specific<int> ets;
1816552f342bSPavel
1817552f342bSPavel tbb::task_arena arena(int(utils::get_platform_max_threads() * 2));
__anon1d1198c31c02null1818552f342bSPavel arena.execute([&ets] {
1819552f342bSPavel tbb::parallel_for(0, 10000000, [&ets](int){
1820552f342bSPavel ets.local() = 1;
1821552f342bSPavel utils::doDummyWork(100);
1822552f342bSPavel });
1823552f342bSPavel });
1824552f342bSPavel
1825552f342bSPavel CHECK(ets.combine(std::plus<int>{}) <= int(utils::get_platform_max_threads()));
1826552f342bSPavel }
1827552f342bSPavel
//! Test for task arena in concurrent cases
//! \brief \ref requirement
TEST_CASE("Test for concurrent functionality") {
    // Delegates to the helper defined earlier in this file.
    TestConcurrentFunctionality();
}
183351c0b2f7Stbbdev
#if !EMSCRIPTEN
//! For emscripten, FPU control state has not been set correctly
//! Test for arena entry consistency
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Test for task arena entry consistency") {
    // Delegates to the helper defined earlier in this file.
    TestArenaEntryConsistency();
}
#endif
184251c0b2f7Stbbdev
//! Test for task arena attach functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Test for the attach functionality") {
    // The argument is the maximum concurrency used by the helper; see TestAttach above.
    TestAttach(4);
}
184851c0b2f7Stbbdev
//! Test for constant functor requirements
//! \brief \ref requirement \ref interface
TEST_CASE("Test for constant functor requirement") {
    // Delegates to the helper defined earlier in this file.
    TestConstantFunctorRequirement();
}
185451c0b2f7Stbbdev
//! Test for move semantics support
//! \brief \ref requirement \ref interface
TEST_CASE("Move semantics support") {
    // Delegates to the helper defined earlier in this file.
    TestMoveSemantics();
}
186051c0b2f7Stbbdev
//! Test for different return value types
//! \brief \ref requirement \ref interface
TEST_CASE("Return value test") {
    // Delegates to the helper defined earlier in this file.
    TestReturnValue();
}
186651c0b2f7Stbbdev
//! Test for delegated task spawn in case of unsuccessful slot attach
//! \brief \ref error_guessing
TEST_CASE("Delegated spawn wait") {
    // Delegates to the helper defined earlier in this file.
    TestDelegatedSpawnWait();
}
187251c0b2f7Stbbdev
#if !EMSCRIPTEN
//! For emscripten, FPU control state has not been set correctly
//! Test task arena isolation functionality
//! \brief \ref requirement \ref interface
TEST_CASE("Isolated execute") {
    // Isolation test cases are valid only for more than 2 threads
    if (tbb::this_task_arena::max_concurrency() > 2) {
        TestIsolatedExecute();
    }
}
#endif
188451c0b2f7Stbbdev
//! Test for TBB Workers creation limits
//! \brief \ref requirement
TEST_CASE("Default workers limit") {
    // Delegates to the helper defined earlier in this file.
    TestDefaultWorkersLimit();
}
189051c0b2f7Stbbdev
//! Test for workers migration between arenas
//! \brief \ref error_guessing \ref stress
TEST_CASE("Arena workers migration") {
    // Delegates to the helper defined earlier in this file.
    TestArenaWorkersMigration();
}
189651c0b2f7Stbbdev
#if !EMSCRIPTEN
//! For emscripten, FPU control state has not been set correctly
//! Test for multiple waits, threads should not block each other
//! \brief \ref requirement
TEST_CASE("Multiple waits") {
    // Delegates to the helper defined earlier in this file.
    TestMultipleWaits();
}
#endif
190551c0b2f7Stbbdev
//! Test for small stack size settings and arena initialization
//! \brief \ref error_guessing
TEST_CASE("Small stack size") {
    // Delegates to the helper defined earlier in this file.
    TestSmallStackSize();
}
191151c0b2f7Stbbdev
#if TBB_USE_EXCEPTIONS
//! \brief \ref requirement \ref stress
TEST_CASE("Test for exceptions during execute.") {
    // Delegates to the helper defined earlier in this file.
    ExceptionInExecute();
}
191749e08aacStbbdev
191849e08aacStbbdev //! \brief \ref error_guessing
191949e08aacStbbdev TEST_CASE("Exception thrown during tbb::task_arena::execute call") {
192049e08aacStbbdev struct throwing_obj {
throwing_objthrowing_obj192149e08aacStbbdev throwing_obj() {
192249e08aacStbbdev volatile bool flag = true;
192349e08aacStbbdev if (flag) throw std::exception{};
192449e08aacStbbdev }
192549e08aacStbbdev throwing_obj(const throwing_obj&) = default;
~throwing_objthrowing_obj192649e08aacStbbdev ~throwing_obj() { FAIL("An destructor was called."); }
192749e08aacStbbdev };
192849e08aacStbbdev
192949e08aacStbbdev tbb::task_arena arena;
193049e08aacStbbdev
__anon1d1198c31e02null193149e08aacStbbdev REQUIRE_THROWS_AS( [&] {
193249e08aacStbbdev arena.execute([] {
193349e08aacStbbdev return throwing_obj{};
193449e08aacStbbdev });
193549e08aacStbbdev }(), std::exception );
193649e08aacStbbdev }
193751c0b2f7Stbbdev #endif // TBB_USE_EXCEPTIONS
1938c4568449SPavel Kumbrasev
//! \brief \ref stress
TEST_CASE("Stress test with mixing functionality") {
    // Delegates to the helper defined earlier in this file.
    StressTestMixFunctionality();
}
1943c4568449SPavel Kumbrasev
//! \brief \ref stress
TEST_CASE("Workers oversubscription") {
    std::size_t num_threads = utils::get_platform_max_threads();
    tbb::enumerable_thread_specific<bool> ets;
    // Allow twice the HW concurrency and size the arena to match.
    tbb::global_control gl(tbb::global_control::max_allowed_parallelism, num_threads * 2);
    tbb::task_arena arena(static_cast<int>(num_threads) * 2);

    utils::SpinBarrier barrier(num_threads * 2);

    // Phase 1: force all 2*N workers into the arena at once; each marks its
    // thread-local flag and rendezvouses on the barrier.
    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                ets.local() = true;
                barrier.wait();
            }
        );
    });

    utils::yield();

    // Phase 2: keep the arena busy via enqueue_test_helper (defined earlier in
    // this file; presumably it keeps re-enqueueing work and bumping task_counter).
    std::atomic<std::size_t> task_counter{0};
    for (std::size_t i = 0; i < num_threads / 4 + 1; ++i) {
        arena.enqueue(enqueue_test_helper(arena, ets, task_counter));
    }

    // Spin until enough enqueued tasks have executed.
    while (task_counter < 100000) utils::yield();

    // Phase 3: every thread joining now must already carry the flag set in
    // phase 1, i.e. the same pool of threads came back (no extra threads).
    arena.execute([&] {
        tbb::parallel_for(std::size_t(0), num_threads * 2,
            [&] (const std::size_t&) {
                CHECK(ets.local());
                barrier.wait();
            }
        );
    });
}
1980c4568449SPavel Kumbrasev
#if TBB_USE_EXCEPTIONS
//! The test for error in scheduling empty task_handle
//! \brief \ref requirement
TEST_CASE("Empty task_handle cannot be scheduled"
    * doctest::should_fail() // Test needs to be revised as the implementation uses assertions instead of exceptions
    * doctest::skip() // skip the test for now, to not pollute the test log
){
    tbb::task_arena ta;

    // Both the member and the namespace-level enqueue must reject a default-constructed handle.
    CHECK_THROWS_WITH_AS(ta.enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
    CHECK_THROWS_WITH_AS(tbb::this_task_arena::enqueue(tbb::task_handle{}), "Attempt to schedule empty task_handle", std::runtime_error);
}
#endif
199474b7fc74SAnton Potapov
19957cee2251SJhaShweta1 #if !EMSCRIPTEN
19967cee2251SJhaShweta1 //! For emscripten, FPU control state has not been set correctly
1997c4568449SPavel Kumbrasev //! \brief \ref error_guessing
1998c4568449SPavel Kumbrasev TEST_CASE("Test threads sleep") {
1999c4568449SPavel Kumbrasev for (auto concurrency_level : utils::concurrency_range()) {
2000c4568449SPavel Kumbrasev int conc = int(concurrency_level);
2001c4568449SPavel Kumbrasev test_threads_sleep(conc, 0);
2002c4568449SPavel Kumbrasev test_threads_sleep(conc, 1);
2003c4568449SPavel Kumbrasev test_threads_sleep(conc, conc/2);
2004c4568449SPavel Kumbrasev test_threads_sleep(conc, conc);
2005c4568449SPavel Kumbrasev }
2006c4568449SPavel Kumbrasev }
20077cee2251SJhaShweta1 #endif
2008c4568449SPavel Kumbrasev
200974b7fc74SAnton Potapov #if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
2010478de5b1Stbbdev
2011478de5b1Stbbdev //! Basic test for is_inside_task in task_group
2012478de5b1Stbbdev //! \brief \ref interface \ref requirement
//! Basic test for is_inside_task in task_group
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in task_group"){
    // Outside of any task the query must be false.
    CHECK( false == tbb::is_inside_task());

    tbb::task_group tg;
    tg.run_and_wait([&]{
        // The task_group body runs as a task, so the query must be true here.
        CHECK( true == tbb::is_inside_task());
    });
}
2021478de5b1Stbbdev
//! Basic test for is_inside_task in arena::execute
//! \brief \ref interface \ref requirement
TEST_CASE("is_inside_task in arena::execute"){
    CHECK( false == tbb::is_inside_task());

    tbb::task_arena arena;

    arena.execute([&]{
        // The execute method is processed outside of any task
        CHECK( false == tbb::is_inside_task());
    });
}
2034478de5b1Stbbdev
//! The test for is_inside_task in arena::execute when inside other task
//! \brief \ref error_guessing
TEST_CASE("is_inside_task in arena::execute") {
    CHECK(false == tbb::is_inside_task());

    tbb::task_arena arena;
    tbb::task_group tg;
    tg.run_and_wait([&] {
        // Even when called from inside a task_group task, entering another
        // arena via execute leaves the task context behind.
        arena.execute([&] {
            // The execute method is processed outside of any task
            CHECK(false == tbb::is_inside_task());
        });
    });
}
2049478de5b1Stbbdev #endif //__TBB_PREVIEW_TASK_GROUP_EXTENSIONS
2050f8f7f738SPavel Kumbrasev
2051f8f7f738SPavel Kumbrasev //! \brief \ref interface \ref requirement \ref regression
2052f8f7f738SPavel Kumbrasev TEST_CASE("worker threads occupy slots in correct range") {
2053f8f7f738SPavel Kumbrasev std::vector<tbb::task_arena> arenas(42);
2054f8f7f738SPavel Kumbrasev for (auto& arena : arenas) {
2055f8f7f738SPavel Kumbrasev arena.initialize(1, 0);
2056f8f7f738SPavel Kumbrasev }
2057f8f7f738SPavel Kumbrasev
20583f657a34SPavel Kumbrasev std::atomic<int> counter{0};
2059f8f7f738SPavel Kumbrasev for (auto& arena : arenas) {
__anon1d1198c32802null20603f657a34SPavel Kumbrasev arena.enqueue([&] {
2061f8f7f738SPavel Kumbrasev CHECK(tbb::this_task_arena::current_thread_index() == 0);
20623f657a34SPavel Kumbrasev ++counter;
2063f8f7f738SPavel Kumbrasev });
2064f8f7f738SPavel Kumbrasev }
2065f8f7f738SPavel Kumbrasev
20663f657a34SPavel Kumbrasev while (counter < 42) { utils::yield(); }
2067f8f7f738SPavel Kumbrasev }
2068