/*
    Copyright (c) 2018-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

#include "tbb/flow_graph.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/task_arena.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_concurrency_limit.h"
#include "common/spin_barrier.h"

#include <vector>
#include <cstdlib>
#include <random>
#include <algorithm>
#include <memory>
#include <queue>
#include <mutex>
#include <thread>


//! \file test_flow_graph_priorities.cpp
//! \brief Test for [flow_graph.copy_body flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.async_node] specification


using namespace tbb::flow;

struct TaskInfo {
    TaskInfo() : my_priority(-1), my_task_index(-1) {}
    TaskInfo( int priority, int task_index )
        : my_priority(priority), my_task_index(task_index) {}
    int my_priority;
    int my_task_index;
};

std::vector<TaskInfo> g_task_info;

std::atomic<unsigned> g_task_num;

void spin_for( double delta ) {
    tbb::tick_count start = tbb::tick_count::now();
    while( (tbb::tick_count::now() - start).seconds() < delta ) ;
}

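// Test scenario: a broadcast_node feeds node_num functional nodes, the middle third of which is
// constructed with ascending priorities. Node bodies spin until all tasks are spawned and record
// the order in which they run, so the checks below can verify that prioritized nodes are executed
// ahead of non-prioritized ones.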
namespace PriorityNodesTakePrecedence {

std::atomic<bool> g_work_submitted;

const unsigned node_num = 100;
const unsigned start_index = node_num / 3;
const unsigned end_index = node_num * 2 / 3;
std::atomic<unsigned> g_priority_task_index;

void body_func( int priority, utils::SpinBarrier& my_barrier ) {
    while( !g_work_submitted.load(std::memory_order_acquire) )
        tbb::detail::d0::yield();
    int current_task_index = g_task_num++;
    if( priority != no_priority )
        g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
    const bool all_threads_will_come =
        unsigned(current_task_index) < node_num - (node_num % tbb::this_task_arena::max_concurrency());
    if( all_threads_will_come )
        my_barrier.wait();
}

typedef multifunction_node< int, std::tuple<int> > multi_node;

template <typename T>
struct Body {
    Body( int priority, utils::SpinBarrier& barrier )
        : my_priority( priority ), my_barrier( barrier ) {}
    T operator()( const T& msg ) const {
        body_func( my_priority, my_barrier );
        return msg;
    }
    void operator()( int msg, multi_node::output_ports_type& op ) const {
        body_func( my_priority, my_barrier );
        std::get<0>(op).try_put( msg );
    }
private:
    int my_priority;
    utils::SpinBarrier& my_barrier;
};

template<typename NodeType, typename BodyType>
struct node_creator_t {
    NodeType* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
        if( start_index <= index && index < end_index )
            return new NodeType( g, unlimited, BodyType(index, barrier), node_priority_t(index) );
        else
            return new NodeType( g, unlimited, BodyType(no_priority, barrier) );
    }
};

template<typename BodyType>
struct node_creator_t< continue_node<continue_msg>, BodyType > {
    continue_node<continue_msg>* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
        if( start_index <= index && index < end_index )
            return new continue_node<continue_msg>( g, BodyType(index, barrier), node_priority_t(index) );
        else
            return new continue_node<continue_msg>( g, BodyType(no_priority, barrier) );
    }
};


struct passthru_body {
    template<typename T>
    continue_msg operator()( T ) const { return continue_msg(); }
};

template<typename NodeType, typename NodeTypeCreator>
void test_node( NodeTypeCreator node_creator ) {
    const int num_threads = tbb::this_task_arena::max_concurrency();
    utils::SpinBarrier barrier( num_threads );
    graph g;
    broadcast_node<typename NodeType::input_type> bn(g);
    function_node<typename NodeType::input_type> tn(g, unlimited, passthru_body());
    // Using pointers to nodes to avoid compilation errors with compilers that try to generate
    // an assignment operator for the nodes.
    std::vector< std::unique_ptr<NodeType> > nodes;
    for( unsigned i = 0; i < node_num; ++i ) {
        nodes.push_back(std::unique_ptr<NodeType>( node_creator(g, i, barrier) ));
        make_edge( bn, *nodes.back() );
        make_edge( *nodes.back(), tn );
    }

    const size_t repeats = 10;
    const size_t priority_nodes_num = end_index - start_index;
    size_t global_order_failures = 0;
    for( size_t repeat = 0; repeat < repeats; ++repeat ) {
        g_work_submitted.store( false, std::memory_order_release );
        g_task_num = g_priority_task_index = 0;
        g_task_info.clear(); g_task_info.resize( priority_nodes_num );

        bn.try_put( typename NodeType::input_type{} );
        // Setting the flag relies on the knowledge that the calling thread broadcasts the message
        // to successor nodes, so once the calling thread returns from the try_put() call all
        // necessary tasks have been spawned. This makes the test a whitebox test to some extent.
        g_work_submitted.store( true, std::memory_order_release );

        g.wait_for_all();

        CHECK_MESSAGE( (g_priority_task_index == g_task_info.size()), "Incorrect number of tasks with priority." );
        CHECK_MESSAGE( (priority_nodes_num == g_task_info.size()), "Incorrect number of tasks with priority executed." );

        for( unsigned i = 0; i < g_priority_task_index; i += num_threads ) {
            bool found = false;
            unsigned highest_priority_within_group = end_index - i - 1;
            for( unsigned j = i; j < i+num_threads; ++j ) {
                if( g_task_info[j].my_priority == int(highest_priority_within_group) ) {
                    found = true;
                    break;
                }
            }
            CHECK_MESSAGE( found, "Highest priority task within a group was not found" );
        }
        for( unsigned i = 0; i < g_priority_task_index; ++i ) {
            // This check might fail because priorities do not guarantee ordering: the assumption
            // that every priority node increments the task counter before any subsequent
            // no-priority node is not correct. In the worst case, a thread that took a priority
            // node might be preempted and become the last to increment the counter. That is why
            // passing the test is statistical and, unfortunately, can be affected by machine
            // overload.
            // TODO revamp: reconsider the following check for this test
            if( g_task_info[i].my_task_index > int(priority_nodes_num + num_threads) )
                ++global_order_failures;
        }
    }
    float failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
    CHECK_MESSAGE(
        failure_ratio <= 0.1f,
        "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes."
    );
}

template<typename NodeType, typename NodeBody>
void call_within_arena( tbb::task_arena& arena ) {
    arena.execute(
        [&]() {
            test_node<NodeType>( node_creator_t<NodeType, NodeBody>() );
        }
    );
}

void test( int num_threads ) {
    INFO( "Testing that execution of nodes with priority takes precedence (num_threads=" << num_threads << ") - " );
    tbb::task_arena arena(num_threads);
    call_within_arena< function_node<int,int>, Body<int> >( arena );
    call_within_arena< multi_node, Body<int> >( arena );
    call_within_arena< continue_node<continue_msg>, Body<continue_msg> >( arena );
}

} /* namespace PriorityNodesTakePrecedence */

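// Test scenario: an async_node offloads messages to a service thread while a CPU subgraph keeps
// the worker threads busy with nested parallel_for tasks. A prioritized decider node re-injects
// the asynchronous work, and the checks verify that worker threads pick up these prioritized
// tasks without long delays relative to the surrounding CPU tasks.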
namespace ThreadsEagerReaction {

// TODO revamp: combine with similar queue from test_async_node
template <typename T>
class concurrent_queue {
public:
    bool try_pop(T& item) {
        std::lock_guard<queue_mutex> lock(mutex);
        if ( q.empty() )
            return false;
        item = q.front();
        q.pop();
        return true;
    }

    void push(const T& item) {
        std::lock_guard<queue_mutex> lock(mutex);
        q.push(item);
    }
private:
    std::queue<T> q;
    using queue_mutex = std::mutex;
    std::mutex mutex;
};

using utils::SpinBarrier;

enum task_type_t { no_task, regular_task, async_task };

struct profile_t {
    task_type_t task_type;
    unsigned global_task_id;
    double elapsed;
};

std::vector<unsigned> g_async_task_ids;

typedef unsigned data_type;
typedef async_node<data_type, data_type> async_node_type;
typedef multifunction_node<
    data_type, std::tuple<data_type, data_type> > decider_node_type;
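
// Emulates an external asynchronous activity: a dedicated service thread pops submitted work,
// records the global task index, puts the result back through the node's gateway, and joins the
// barrier together with the TBB threads.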
struct AsyncActivity {
    typedef async_node_type::gateway_type gateway_type;

    struct work_type { data_type input; gateway_type* gateway; };
    std::atomic<bool> done;
    concurrent_queue<work_type> my_queue;
    std::thread my_service_thread;

    struct ServiceThreadFunc {
        SpinBarrier& my_barrier;
        ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
        void operator()(AsyncActivity* activity) {
            while (!activity->done) {
                work_type work;
                while (activity->my_queue.try_pop(work)) {
                    g_async_task_ids.push_back( ++g_task_num );
                    work.gateway->try_put(work.input);
                    work.gateway->release_wait();
                    my_barrier.wait();
                }
            }
        }
    };
    void stop_and_wait() { done = true; my_service_thread.join(); }

    void submit(data_type input, gateway_type* gateway) {
        work_type work = { input, gateway };
        gateway->reserve_wait();
        my_queue.push(work);
    }
    AsyncActivity(SpinBarrier& barrier)
        : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
};

struct StartBody {
    bool has_run;
    data_type operator()(tbb::flow_control& fc) {
        if (has_run){
            fc.stop();
            return data_type();
        }
        has_run = true;
        return 1;
    }
    StartBody() : has_run(false) {}
};

struct ParallelForBody {
    SpinBarrier& my_barrier;
    const data_type& my_input;
    ParallelForBody(SpinBarrier& barrier, const data_type& input)
        : my_barrier(barrier), my_input(input) {}
    void operator()(const data_type&) const {
        my_barrier.wait();
        ++g_task_num;
    }
};

struct CpuWorkBody {
    SpinBarrier& my_barrier;
    const int my_tasks_count;
    data_type operator()(const data_type& input) {
        tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
        return input;
    }
    CpuWorkBody(SpinBarrier& barrier, int tasks_count)
        : my_barrier(barrier), my_tasks_count(tasks_count) {}
};

struct DeciderBody {
    const data_type my_limit;
    DeciderBody( const data_type& limit ) : my_limit( limit ) {}
    void operator()(data_type input, decider_node_type::output_ports_type& ports) {
        if (input < my_limit)
            std::get<0>(ports).try_put(input + 1);
    }
};

struct AsyncSubmissionBody {
    AsyncActivity* my_activity;
    // It is important that the async_node in this test executes without spawning a TBB task,
    // because it passes the work to the asynchronous thread, which unlocks the barrier that every
    // execution thread waits on (the asynchronous thread and any TBB worker or main thread).
    // This is why the async_node's body is marked noexcept.
    void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept {
        my_activity->submit(input, &gateway);
    }
    AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
};

void test( unsigned num_threads ) {
    INFO( "Testing threads react eagerly to asynchronous tasks (num_threads=" << num_threads << ") - " );
    if( num_threads == std::thread::hardware_concurrency() ) {
        // one thread is required for the asynchronous compute resource
        INFO("skipping test since it is designed to work on fewer threads than "
             "hardware concurrency allows\n");
        return;
    }
    const unsigned cpu_threads = unsigned(num_threads);
    const unsigned cpu_tasks_per_thread = 4;
    const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
    const unsigned async_subgraph_reruns = 8;
    const unsigned cpu_subgraph_reruns = 2;

    SpinBarrier barrier(cpu_threads + /*async thread=*/1);
    g_task_num = 0;
    g_async_task_ids.clear();
    g_async_task_ids.reserve(async_subgraph_reruns);

    tbb::task_arena arena(cpu_threads);
    arena.execute(
        [&]() {
            AsyncActivity activity(barrier);
            graph g;

            input_node<data_type> starter_node(g, StartBody());
            function_node<data_type, data_type> cpu_work_node(
                g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
            decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
            async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
            decider_node_type async_restarter_node(
                g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
            );

            make_edge(starter_node, cpu_work_node);
            make_edge(cpu_work_node, cpu_restarter_node);
            make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);

            make_edge(starter_node, async_node);
            make_edge(async_node, async_restarter_node);
            make_edge(output_port<0>(async_restarter_node), async_node);

            starter_node.activate();
            g.wait_for_all();
            activity.stop_and_wait();

            const size_t async_task_num = size_t(async_subgraph_reruns);
            CHECK_MESSAGE( (g_async_task_ids.size() == async_task_num), "Incorrect number of async tasks." );
            unsigned max_span = unsigned(2 * cpu_threads + 1);
            for( size_t idx = 1; idx < async_task_num; ++idx ) {
                CHECK_MESSAGE( (g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span),
                               "Async tasks were not able to interfere with CPU tasks." );
            }
        }
    );
    INFO("done\n");
}
} /* namespace ThreadsEagerReaction */

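// Test scenario: a chain of continue_nodes performs large non-prioritized parallel work while a
// single prioritized node runs smaller isolated work. The nested bodies track whether the thread
// that started the prioritized work gets diverted to non-prioritized work before finishing it.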
namespace LimitingExecutionToPriorityTask {

enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };

struct execution_tracker_t {
    execution_tracker_t() { reset(); }
    void reset() {
        prioritized_work_submitter = std::thread::id();
        prioritized_work_started = false;
        prioritized_work_finished = false;
        prioritized_work_interrupted = false;
    }
    std::thread::id prioritized_work_submitter;
    std::atomic<bool> prioritized_work_started;
    bool prioritized_work_finished;
    bool prioritized_work_interrupted;
} exec_tracker;

template<work_type_t work_type>
void do_node_work( int work_size );

template<work_type_t>
void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& subrange );

template<work_type_t work_type>
struct CommonBody {
    CommonBody() : my_body_size( 0 ) { }
    CommonBody( int body_size ) : my_body_size( body_size ) { }
    continue_msg operator()( const continue_msg& msg ) const {
        do_node_work<work_type>(my_body_size);
        return msg;
    }
    void operator()( const tbb::blocked_range<int>& subrange ) const {
        do_nested_work<work_type>( /*tid=*/std::this_thread::get_id(), subrange );
    }
    int my_body_size;
};

template<work_type_t work_type>
void do_node_work(int work_size) {
    tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
                       tbb::simple_partitioner() );
}

template<work_type_t>
void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
    // This is non-prioritized work...
    if( !exec_tracker.prioritized_work_started || exec_tracker.prioritized_work_submitter != tid )
        return;
    // ...being executed by the thread that initially started the prioritized work...
    CHECK_MESSAGE( exec_tracker.prioritized_work_started,
                   "Prioritized work should have been started by that time." );
    // ...after the prioritized work has already been started...
    if( exec_tracker.prioritized_work_finished )
        return;
    // ...but before it has been finished
    exec_tracker.prioritized_work_interrupted = true;
}

struct IsolationFunctor {
    int work_size;
    IsolationFunctor(int ws) : work_size(ws) {}
    void operator()() const {
        tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
                           tbb::simple_partitioner() );
    }
};

template<>
void do_node_work<PRIORITIZED_WORK>(int work_size) {
    exec_tracker.prioritized_work_submitter = std::this_thread::get_id();
    exec_tracker.prioritized_work_started = true;
    tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
    exec_tracker.prioritized_work_finished = true;
}

template<>
void do_nested_work<PRIORITIZED_WORK>( const std::thread::id& tid,
                                       const tbb::blocked_range<int>& /*subrange*/ ) {
    if( exec_tracker.prioritized_work_started && exec_tracker.prioritized_work_submitter == tid ) {
        CHECK_MESSAGE( !exec_tracker.prioritized_work_interrupted,
                       "Thread was not fully devoted to processing of prioritized task." );
    } else {
        // prolong processing of the prioritized work so that the thread that started it has a
        // higher probability of helping with the non-prioritized work.
        spin_for(0.1);
    }
}

// Using pointers to nodes to avoid compilation errors with compilers that try to generate an
// assignment operator for the nodes.
typedef std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes_container_t;

void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
    for( int i = 0; i < num; ++i )
        nodes.push_back(
            std::unique_ptr<continue_node<continue_msg>>(
                new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
            )
        );
}

void test( int num_threads ) {
    INFO( "Testing limiting execution to priority tasks (num_threads=" << num_threads << ") - " );

    tbb::task_arena arena( num_threads );
    arena.execute(
        [&]() {
            const int nodes_num = 100;
            const int priority_node_position_part = 10;
            const int pivot = nodes_num / priority_node_position_part;
            const int nodes_in_lane = 3 * num_threads;
            const int small_problem_size = 100;
            const int large_problem_size = 1000;

            graph g;
            nodes_container_t nodes;
            create_nodes( nodes, g, pivot, large_problem_size );
            nodes.push_back(
                std::unique_ptr<continue_node<continue_msg>>(
                    new continue_node<continue_msg>(
                        g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
                    )
                )
            );
            create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );

            broadcast_node<continue_msg> bn(g);
            for( int i = 0; i < nodes_num; ++i )
                if( i % nodes_in_lane == 0 )
                    make_edge( bn, *nodes[i] );
                else
                    make_edge( *nodes[i-1], *nodes[i] );
            exec_tracker.reset();
            bn.try_put( continue_msg() );
            g.wait_for_all();
        }
    );

    INFO( "done\n" );
}

} /* namespace LimitingExecutionToPriorityTask */

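// Test scenario: each node of an outer graph builds and runs an inner graph with prioritized
// nodes, either in the same arena as the outer graph or in a separately initialized inner arena,
// across the platform's concurrency range.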
namespace NestedCase {

using tbb::task_arena;

struct InnerBody {
    continue_msg operator()( const continue_msg& ) const {
        return continue_msg();
    }
};

struct OuterBody {
    int my_max_threads;
    task_arena** my_inner_arena;
    OuterBody( int max_threads, task_arena** inner_arena )
        : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
    // copy constructor to please some old compilers
    OuterBody( const OuterBody& rhs )
        : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
    int operator()( const int& ) {
        graph inner_graph;
        continue_node<continue_msg> start_node(inner_graph, InnerBody());
        continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
        continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
        continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
        make_edge( start_node, mid_node1 );
        make_edge( mid_node1, end_node );
        make_edge( start_node, mid_node2 );
        make_edge( mid_node2, end_node );
        (*my_inner_arena)->execute( [&inner_graph]{ inner_graph.reset(); } );
        start_node.try_put( continue_msg() );
        inner_graph.wait_for_all();
        return 13;
    }
};

void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
                          graph& outer_graph, function_node<int,int>& start_node ) {
    if( same_arena ) {
        start_node.try_put( 42 );
        outer_graph.wait_for_all();
        return;
    }

    auto threads_range = utils::concurrency_range(max_threads);
    for( auto num_threads : threads_range ) {
        inner_arena.initialize( static_cast<int>(num_threads) );
        start_node.try_put( 42 );
        outer_graph.wait_for_all();
        inner_arena.terminate();
    }
}

void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena,
                    graph& outer_graph, function_node<int, int>& start_node ) {
    bool same_arena = &outer_arena == &inner_arena;
    auto threads_range = utils::concurrency_range(max_threads);
    for( auto num_threads : threads_range ) {
        INFO( "Testing nested nodes with specified priority in " << (same_arena ? "same" : "different")
              << " arenas (num_threads=" << num_threads << ") - " );
        outer_arena.initialize( static_cast<int>(num_threads) );
        outer_arena.execute( [&outer_graph]{ outer_graph.reset(); } );
        execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, start_node );
        outer_arena.terminate();
        INFO( "done\n" );
    }
}

void test( int max_threads ) {
    task_arena outer_arena; task_arena inner_arena;
    task_arena* inner_arena_pointer = &outer_arena; // make it the same as the outer arena in the beginning

    graph outer_graph;
    const unsigned num_outer_nodes = 10;
    const size_t concurrency = unlimited;
    std::vector< std::unique_ptr<function_node<int,int>> > outer_nodes;
    for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
        node_priority_t priority = no_priority;
        if( node_index == num_outer_nodes / 2 )
            priority = 10;

        outer_nodes.push_back(
            std::unique_ptr< function_node<int, int> >(
                new function_node<int,int>(
                    outer_graph, concurrency, OuterBody(max_threads, &inner_arena_pointer), priority
                )
            )
        );
    }

    for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
        for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
            make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );

    test_in_arena( max_threads, outer_arena, outer_arena, outer_graph, *outer_nodes[0] );

    inner_arena_pointer = &inner_arena;

    test_in_arena( max_threads, outer_arena, inner_arena, outer_graph, *outer_nodes[0] );
}
} // namespace NestedCase


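// Test scenario: a starter node has two successors with priorities 1 and 2. The task for the
// higher-priority successor is returned for bypass, and the test verifies that it is still
// executed before the lower-priority task that was spawned.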
namespace BypassPrioritizedTask {

void common_body( int priority ) {
    int current_task_index = g_task_num++;
    g_task_info.push_back( TaskInfo( priority, current_task_index ) );
}

struct Body {
    Body( int priority ) : my_priority( priority ) {}
    continue_msg operator()(const continue_msg&) {
        common_body( my_priority );
        return continue_msg();
    }
    int my_priority;
};

struct InputNodeBody {
    continue_msg operator()( tbb::flow_control& fc ){
        static bool is_source_executed = false;

        if( is_source_executed ) {
            fc.stop();
            return continue_msg();
        }

        common_body( 0 );
        is_source_executed = true;

        return continue_msg();
    }
};

template<typename StarterNodeType>
StarterNodeType create_starter_node(graph& g) {
    return continue_node<continue_msg>( g, Body(0) );
}

template<>
input_node<continue_msg> create_starter_node<input_node<continue_msg>>(graph& g) {
    return input_node<continue_msg>( g, InputNodeBody() );
}

template<typename StarterNodeType>
void start_graph( StarterNodeType& starter ) {
    starter.try_put( continue_msg() );
}

template<>
void start_graph<input_node<continue_msg>>( input_node<continue_msg>& starter ) {
    starter.activate();
}

template<typename StarterNodeType>
void test_use_case() {
    g_task_info.clear();
    g_task_num = 0;
    graph g;
    StarterNodeType starter = create_starter_node<StarterNodeType>(g);
    continue_node<continue_msg> spawn_successor( g, Body(1), node_priority_t(1) );
    continue_node<continue_msg> bypass_successor( g, Body(2), node_priority_t(2) );

    make_edge( starter, spawn_successor );
    make_edge( starter, bypass_successor );

    start_graph<StarterNodeType>( starter );
    g.wait_for_all();

    CHECK_MESSAGE( g_task_info.size() == 3, "" );
    CHECK_MESSAGE( g_task_info[0].my_task_index == 0, "" );
    CHECK_MESSAGE( g_task_info[1].my_task_index == 1, "" );
    CHECK_MESSAGE( g_task_info[2].my_task_index == 2, "" );

    CHECK_MESSAGE( g_task_info[0].my_priority == 0, "" );
    CHECK_MESSAGE( g_task_info[1].my_priority == 2, "Bypassed task with higher priority executed in wrong order." );
    CHECK_MESSAGE( g_task_info[2].my_priority == 1, "" );
}

//! The test checks that the bypassed task from the node with higher priority is executed before
//! the task spawned with lower priority.
void test() {
    test_use_case<continue_node<continue_msg>>();
    test_use_case<input_node<continue_msg>>();
}

} // namespace BypassPrioritizedTask

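// Test scenario: a broadcast_node gets a shuffled mix of prioritized and non-prioritized
// successors. The prioritized bodies synchronize on an atomic counter, so any non-prioritized
// body that executes before all of them have run would be detected.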
namespace ManySuccessors {

struct no_priority_node_body {
    void operator()(continue_msg) {
        CHECK_MESSAGE(
            barrier == 0, "Non-priority successor has to be executed after all priority successors"
        );
    }
    std::atomic<int>& barrier;
};

struct priority_node_body {
    void operator()(continue_msg) {
        --barrier;
        while (barrier)
            tbb::detail::d0::yield();
    }
    std::atomic<int>& barrier;
};

void test(int num_threads) {
    tbb::task_arena arena( num_threads );
    arena.execute(
        [&]() {
            graph g;
            broadcast_node<continue_msg> bn(g);
            std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes;
            std::atomic<int> barrier;
            for (int i = 0; i < 2 * num_threads; ++i)
                nodes.push_back(
                    std::unique_ptr<continue_node<continue_msg>>(
                        new continue_node<continue_msg>(g, no_priority_node_body{ barrier })
                    )
                );
            for (int i = 0; i < num_threads; ++i)
                nodes.push_back(
                    std::unique_ptr<continue_node<continue_msg>>(
                        new continue_node<continue_msg>(g, priority_node_body{ barrier }, /*priority*/1)
                    )
                );

            std::random_device rd;
            std::mt19937 gen(rd());

            for (int trial = 0; trial < 10; ++trial) {
                barrier = num_threads;
                std::shuffle(nodes.begin(), nodes.end(), gen);
                for (auto& n : nodes)
                    make_edge(bn, *n);
                bn.try_put(continue_msg());
                g.wait_for_all();
                for (auto& n : nodes)
                    remove_edge(bn, *n);
            }
        }
    );
}

} // namespace ManySuccessors

#if TBB_USE_EXCEPTIONS
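// Test scenario: messages flow from a prioritized continue_node into a prioritized function_node
// that throws on randomly chosen messages. The test verifies that the exception propagates and
// that the graph is marked as cancelled.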
namespace Exceptions {
void test() {
    using namespace tbb::flow;
    graph g;
    std::srand(42);
    const unsigned num_messages = 50;
    std::vector<unsigned> throwing_msgs;
    std::atomic<unsigned> msg_count(0);
    continue_node<unsigned> c(g, [&msg_count](continue_msg) {
        return ++msg_count;
    }, 2);
    function_node<unsigned> f(g, unlimited, [&throwing_msgs](unsigned v) {
        for( auto i : throwing_msgs ) {
            if( i == v )
                throw std::runtime_error("Exception::test");
        }
    }, 1);
    make_edge(c, f);
    for (int i = 0; i < 10; ++i) {
        msg_count = 0;
        g.reset();
        throwing_msgs.push_back(std::rand() % num_messages);
        try {
            for (unsigned j = 0; j < num_messages; ++j) {
                c.try_put(continue_msg());
            }
            g.wait_for_all();
            FAIL("Unreachable code. The exception is expected");
        } catch (std::runtime_error&) {
            CHECK(g.is_cancelled());
            CHECK(g.exception_thrown());
        } catch (...) {
            FAIL("Unexpected exception");
        }
    }
}
} // namespace Exceptions
#endif

//! Test node prioritization
//! \brief \ref requirement
TEST_CASE("Priority nodes take precedence"){
    for( auto p : utils::concurrency_range() ) {
        PriorityNodesTakePrecedence::test( static_cast<int>(p) );
    }
}

//! Test thread eager reaction
//! \brief \ref error_guessing
TEST_CASE("Thread eager reaction"){
    for( auto p : utils::concurrency_range() ) {
        ThreadsEagerReaction::test( static_cast<int>(p) );
    }
}

//! Test prioritization under concurrency limits
//! \brief \ref error_guessing
TEST_CASE("Limiting execution to prioritized work") {
    for( auto p : utils::concurrency_range() ) {
        LimitingExecutionToPriorityTask::test( static_cast<int>(p) );
    }
}

//! Test nested graphs
//! \brief \ref error_guessing
TEST_CASE("Nested test case") {
    std::size_t max_threads = utils::get_platform_max_threads();
    // The stepping for the threads is done inside.
    NestedCase::test( static_cast<int>(max_threads) );
}

//! Test bypassed task with higher priority
//! \brief \ref error_guessing
TEST_CASE("Bypass prioritized task"){
    tbb::global_control gc( tbb::global_control::max_allowed_parallelism, 1 );
    BypassPrioritizedTask::test();
}

//! Test mixing prioritized and ordinary successors
//! \brief \ref error_guessing
TEST_CASE("Many successors") {
    for( auto p : utils::concurrency_range() ) {
        ManySuccessors::test( static_cast<int>(p) );
    }
}

#if TBB_USE_EXCEPTIONS
//! Test for exceptions
//! \brief \ref error_guessing
TEST_CASE("Exceptions") {
    Exceptions::test();
}
#endif