151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2018-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev #include "tbb/parallel_for.h"
25b15aabb3Stbbdev #include "tbb/global_control.h"
26b15aabb3Stbbdev #include "tbb/task_arena.h"
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev #include "common/test.h"
2951c0b2f7Stbbdev #include "common/utils.h"
308dcbd5b1Stbbdev #include "common/utils_concurrency_limit.h"
3151c0b2f7Stbbdev #include "common/spin_barrier.h"
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev #include <vector>
3451c0b2f7Stbbdev #include <cstdlib>
3551c0b2f7Stbbdev #include <random>
3651c0b2f7Stbbdev #include <algorithm>
37b15aabb3Stbbdev #include <memory>
3851c0b2f7Stbbdev 
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev //! \file test_flow_graph_priorities.cpp
4151c0b2f7Stbbdev //! \brief Test for [flow_graph.copy_body flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.async_node] specification
4251c0b2f7Stbbdev 
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev using namespace tbb::flow;
4551c0b2f7Stbbdev 
4651c0b2f7Stbbdev struct TaskInfo {
TaskInfoTaskInfo4751c0b2f7Stbbdev     TaskInfo() : my_priority(-1), my_task_index(-1) {}
TaskInfoTaskInfo4851c0b2f7Stbbdev     TaskInfo( int priority, int task_index )
4951c0b2f7Stbbdev         : my_priority(priority), my_task_index(task_index) {}
5051c0b2f7Stbbdev     int my_priority;
5151c0b2f7Stbbdev     int my_task_index;
5251c0b2f7Stbbdev };
5351c0b2f7Stbbdev 
5451c0b2f7Stbbdev std::vector<TaskInfo> g_task_info;
5551c0b2f7Stbbdev 
5651c0b2f7Stbbdev std::atomic<unsigned> g_task_num;
5751c0b2f7Stbbdev 
spin_for(double delta)5851c0b2f7Stbbdev void spin_for( double delta ) {
5951c0b2f7Stbbdev     tbb::tick_count start = tbb::tick_count::now();
6051c0b2f7Stbbdev     while( (tbb::tick_count::now() - start).seconds() < delta ) ;
6151c0b2f7Stbbdev }
6251c0b2f7Stbbdev 
6351c0b2f7Stbbdev namespace PriorityNodesTakePrecedence {
6451c0b2f7Stbbdev 
6551c0b2f7Stbbdev std::atomic<bool> g_work_submitted;
6651c0b2f7Stbbdev 
6751c0b2f7Stbbdev const unsigned node_num = 100;
6851c0b2f7Stbbdev const unsigned start_index = node_num / 3;
6951c0b2f7Stbbdev const unsigned end_index = node_num * 2 / 3;
7051c0b2f7Stbbdev std::atomic<unsigned> g_priority_task_index;
7151c0b2f7Stbbdev 
body_func(int priority,utils::SpinBarrier & my_barrier)7251c0b2f7Stbbdev void body_func( int priority, utils::SpinBarrier& my_barrier ) {
73b15aabb3Stbbdev     while( !g_work_submitted.load(std::memory_order_acquire) )
74b15aabb3Stbbdev         tbb::detail::d0::yield();
7551c0b2f7Stbbdev     int current_task_index = g_task_num++;
7651c0b2f7Stbbdev     if( priority != no_priority )
7751c0b2f7Stbbdev         g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
7851c0b2f7Stbbdev     const bool all_threads_will_come =
7951c0b2f7Stbbdev         unsigned(current_task_index) < node_num - (node_num % tbb::this_task_arena::max_concurrency());
8051c0b2f7Stbbdev     if( all_threads_will_come )
8151c0b2f7Stbbdev         my_barrier.wait();
8251c0b2f7Stbbdev }
8351c0b2f7Stbbdev 
8451c0b2f7Stbbdev typedef multifunction_node< int, std::tuple<int> > multi_node;
8551c0b2f7Stbbdev 
8651c0b2f7Stbbdev template <typename T>
8751c0b2f7Stbbdev struct Body {
BodyPriorityNodesTakePrecedence::Body8851c0b2f7Stbbdev     Body( int priority, utils::SpinBarrier& barrier )
8951c0b2f7Stbbdev         : my_priority( priority ), my_barrier( barrier ) {}
operator ()PriorityNodesTakePrecedence::Body9051c0b2f7Stbbdev     T operator()( const T& msg ) const {
9151c0b2f7Stbbdev         body_func( my_priority, my_barrier );
9251c0b2f7Stbbdev         return msg;
9351c0b2f7Stbbdev     }
operator ()PriorityNodesTakePrecedence::Body9451c0b2f7Stbbdev     void operator()( int msg, multi_node::output_ports_type& op ) const {
9551c0b2f7Stbbdev         body_func( my_priority, my_barrier );
9651c0b2f7Stbbdev         std::get<0>(op).try_put( msg );
9751c0b2f7Stbbdev     }
9851c0b2f7Stbbdev private:
9951c0b2f7Stbbdev     int my_priority;
10051c0b2f7Stbbdev     utils::SpinBarrier& my_barrier;
10151c0b2f7Stbbdev };
10251c0b2f7Stbbdev 
10351c0b2f7Stbbdev template<typename NodeType, typename BodyType>
10451c0b2f7Stbbdev struct node_creator_t {
operator ()PriorityNodesTakePrecedence::node_creator_t10551c0b2f7Stbbdev     NodeType* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
10651c0b2f7Stbbdev         if( start_index <= index && index < end_index )
10751c0b2f7Stbbdev             return new NodeType( g, unlimited, BodyType(index, barrier), node_priority_t(index) );
10851c0b2f7Stbbdev         else
10951c0b2f7Stbbdev             return new NodeType( g, unlimited, BodyType(no_priority, barrier) );
11051c0b2f7Stbbdev     }
11151c0b2f7Stbbdev };
11251c0b2f7Stbbdev 
11351c0b2f7Stbbdev template<typename BodyType>
11451c0b2f7Stbbdev struct node_creator_t< continue_node<continue_msg>, BodyType > {
operator ()PriorityNodesTakePrecedence::node_creator_t11551c0b2f7Stbbdev     continue_node<continue_msg>* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
11651c0b2f7Stbbdev         if( start_index <= index && index < end_index )
11751c0b2f7Stbbdev             return new continue_node<continue_msg>( g, BodyType(index, barrier), node_priority_t(index) );
11851c0b2f7Stbbdev         else
11951c0b2f7Stbbdev             return new continue_node<continue_msg>( g, BodyType(no_priority, barrier) );
12051c0b2f7Stbbdev     }
12151c0b2f7Stbbdev };
12251c0b2f7Stbbdev 
12351c0b2f7Stbbdev 
12451c0b2f7Stbbdev struct passthru_body {
12551c0b2f7Stbbdev     template<typename T>
operator ()PriorityNodesTakePrecedence::passthru_body12651c0b2f7Stbbdev     continue_msg operator()( T ) const { return continue_msg(); }
12751c0b2f7Stbbdev };
12851c0b2f7Stbbdev 
12951c0b2f7Stbbdev template<typename NodeType, typename NodeTypeCreator>
test_node(NodeTypeCreator node_creator)13051c0b2f7Stbbdev void test_node( NodeTypeCreator node_creator ) {
13151c0b2f7Stbbdev     const int num_threads = tbb::this_task_arena::max_concurrency();
13251c0b2f7Stbbdev     utils::SpinBarrier barrier( num_threads );
13351c0b2f7Stbbdev     graph g;
13451c0b2f7Stbbdev     broadcast_node<typename NodeType::input_type> bn(g);
13551c0b2f7Stbbdev     function_node<typename NodeType::input_type> tn(g, unlimited, passthru_body());
13651c0b2f7Stbbdev     // Using pointers to nodes to avoid errors on compilers, which try to generate assignment
13751c0b2f7Stbbdev     // operator for the nodes
138b15aabb3Stbbdev     std::vector< std::unique_ptr<NodeType> > nodes;
13951c0b2f7Stbbdev     for( unsigned i = 0; i < node_num; ++i ) {
140b15aabb3Stbbdev         nodes.push_back(std::unique_ptr<NodeType>( node_creator(g, i, barrier) ));
14151c0b2f7Stbbdev         make_edge( bn, *nodes.back() );
14251c0b2f7Stbbdev         make_edge( *nodes.back(), tn );
14351c0b2f7Stbbdev     }
14451c0b2f7Stbbdev 
14551c0b2f7Stbbdev     const size_t repeats = 10;
14651c0b2f7Stbbdev     const size_t priority_nodes_num = end_index - start_index;
14751c0b2f7Stbbdev     size_t global_order_failures = 0;
14851c0b2f7Stbbdev     for( size_t repeat = 0; repeat < repeats; ++repeat ) {
14951c0b2f7Stbbdev         g_work_submitted.store( false, std::memory_order_release );
15051c0b2f7Stbbdev         g_task_num = g_priority_task_index = 0;
15151c0b2f7Stbbdev         g_task_info.clear(); g_task_info.resize( priority_nodes_num );
15251c0b2f7Stbbdev 
15351c0b2f7Stbbdev         bn.try_put( typename NodeType::input_type{} );
15451c0b2f7Stbbdev         // Setting of the flag is based on the knowledge that the calling thread broadcasts the
15551c0b2f7Stbbdev         // message to successor nodes. Thus, once the calling thread returns from try_put() call all
15651c0b2f7Stbbdev         // necessary tasks are spawned. Thus, this makes this test to be a whitebox test to some
15751c0b2f7Stbbdev         // extent.
15851c0b2f7Stbbdev         g_work_submitted.store( true, std::memory_order_release );
15951c0b2f7Stbbdev 
16051c0b2f7Stbbdev         g.wait_for_all();
16151c0b2f7Stbbdev 
16251c0b2f7Stbbdev         CHECK_MESSAGE( (g_priority_task_index == g_task_info.size()), "Incorrect number of tasks with priority." );
16351c0b2f7Stbbdev         CHECK_MESSAGE( (priority_nodes_num == g_task_info.size()), "Incorrect number of tasks with priority executed." );
16451c0b2f7Stbbdev 
16551c0b2f7Stbbdev         for( unsigned i = 0; i < g_priority_task_index; i += num_threads ) {
16651c0b2f7Stbbdev             bool found = false;
16751c0b2f7Stbbdev             unsigned highest_priority_within_group = end_index - i - 1;
16851c0b2f7Stbbdev             for( unsigned j = i; j < i+num_threads; ++j ) {
16951c0b2f7Stbbdev                 if( g_task_info[j].my_priority == int(highest_priority_within_group) ) {
17051c0b2f7Stbbdev                     found = true;
17151c0b2f7Stbbdev                     break;
17251c0b2f7Stbbdev                 }
17351c0b2f7Stbbdev             }
17451c0b2f7Stbbdev             CHECK_MESSAGE( found, "Highest priority task within a group was not found" );
17551c0b2f7Stbbdev         }
17651c0b2f7Stbbdev         for( unsigned i = 0; i < g_priority_task_index; ++i ) {
17751c0b2f7Stbbdev             // This check might fail because priorities do not guarantee ordering, i.e. assumption
17851c0b2f7Stbbdev             // that all priority nodes should increment the task counter before any subsequent
17951c0b2f7Stbbdev             // no-priority node is not correct. In the worst case, a thread that took a priority
18051c0b2f7Stbbdev             // node might be preempted and become the last to increment the counter. That's why the
18151c0b2f7Stbbdev             // test passing is based on statistics, which could be affected by machine overload
18251c0b2f7Stbbdev             // unfortunately.
18351c0b2f7Stbbdev             // TODO revamp: reconsider the following check for this test
184b15aabb3Stbbdev             if( g_task_info[i].my_task_index > int(priority_nodes_num + num_threads) )
18551c0b2f7Stbbdev                 ++global_order_failures;
18651c0b2f7Stbbdev         }
18751c0b2f7Stbbdev     }
18851c0b2f7Stbbdev     float failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
18951c0b2f7Stbbdev     CHECK_MESSAGE(
19051c0b2f7Stbbdev         failure_ratio <= 0.1f,
19151c0b2f7Stbbdev         "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes."
19251c0b2f7Stbbdev     );
19351c0b2f7Stbbdev }
19451c0b2f7Stbbdev 
19551c0b2f7Stbbdev template<typename NodeType, typename NodeBody>
call_within_arena(tbb::task_arena & arena)19651c0b2f7Stbbdev void call_within_arena( tbb::task_arena& arena ) {
19751c0b2f7Stbbdev     arena.execute(
19851c0b2f7Stbbdev         [&]() {
19951c0b2f7Stbbdev             test_node<NodeType>( node_creator_t<NodeType, NodeBody>() );
20051c0b2f7Stbbdev         }
20151c0b2f7Stbbdev     );
20251c0b2f7Stbbdev }
20351c0b2f7Stbbdev 
test(int num_threads)20451c0b2f7Stbbdev void test( int num_threads ) {
20551c0b2f7Stbbdev     INFO( "Testing execution of nodes with priority takes precedence (num_threads=" << num_threads << ") - " );
20651c0b2f7Stbbdev     tbb::task_arena arena(num_threads);
20751c0b2f7Stbbdev     call_within_arena< function_node<int,int>, Body<int> >( arena );
20851c0b2f7Stbbdev     call_within_arena< multi_node, Body<int> >( arena );
20951c0b2f7Stbbdev     call_within_arena< continue_node<continue_msg>, Body<continue_msg> >( arena );
21051c0b2f7Stbbdev }
21151c0b2f7Stbbdev 
21251c0b2f7Stbbdev } /* namespace PriorityNodesTakePrecedence */
21351c0b2f7Stbbdev 
21451c0b2f7Stbbdev namespace ThreadsEagerReaction {
21551c0b2f7Stbbdev 
21651c0b2f7Stbbdev // TODO revamp: combine with similar queue from test_async_node
21751c0b2f7Stbbdev template <typename T>
21851c0b2f7Stbbdev class concurrent_queue {
21951c0b2f7Stbbdev public:
try_pop(T & item)22051c0b2f7Stbbdev     bool try_pop(T& item) {
22151c0b2f7Stbbdev         std::lock_guard<queue_mutex> lock(mutex);
22251c0b2f7Stbbdev         if ( q.empty() )
22351c0b2f7Stbbdev             return false;
22451c0b2f7Stbbdev         item = q.front();
22551c0b2f7Stbbdev         q.pop();
22651c0b2f7Stbbdev         return true;
22751c0b2f7Stbbdev     }
22851c0b2f7Stbbdev 
push(const T & item)22951c0b2f7Stbbdev     void push(const T& item) {
23051c0b2f7Stbbdev         std::lock_guard<queue_mutex> lock(mutex);
23151c0b2f7Stbbdev         q.push(item);
23251c0b2f7Stbbdev     }
23351c0b2f7Stbbdev private:
23451c0b2f7Stbbdev     std::queue<T> q;
23551c0b2f7Stbbdev     using queue_mutex = std::mutex;
23651c0b2f7Stbbdev     std::mutex mutex;
23751c0b2f7Stbbdev };
23851c0b2f7Stbbdev 
23951c0b2f7Stbbdev using utils::SpinBarrier;
24051c0b2f7Stbbdev 
24151c0b2f7Stbbdev enum task_type_t { no_task, regular_task, async_task };
24251c0b2f7Stbbdev 
24351c0b2f7Stbbdev struct profile_t {
24451c0b2f7Stbbdev     task_type_t task_type;
24551c0b2f7Stbbdev     unsigned global_task_id;
24651c0b2f7Stbbdev     double elapsed;
24751c0b2f7Stbbdev };
24851c0b2f7Stbbdev 
24951c0b2f7Stbbdev std::vector<unsigned> g_async_task_ids;
25051c0b2f7Stbbdev 
25151c0b2f7Stbbdev typedef unsigned data_type;
25251c0b2f7Stbbdev typedef async_node<data_type, data_type> async_node_type;
25351c0b2f7Stbbdev typedef multifunction_node<
25451c0b2f7Stbbdev     data_type, std::tuple<data_type, data_type> > decider_node_type;
25551c0b2f7Stbbdev struct AsyncActivity {
25651c0b2f7Stbbdev     typedef async_node_type::gateway_type gateway_type;
25751c0b2f7Stbbdev 
25851c0b2f7Stbbdev     struct work_type { data_type input; gateway_type* gateway; };
25959ac78faSAlex     std::atomic<bool> done;
26051c0b2f7Stbbdev     concurrent_queue<work_type> my_queue;
26151c0b2f7Stbbdev     std::thread my_service_thread;
26251c0b2f7Stbbdev 
26351c0b2f7Stbbdev     struct ServiceThreadFunc {
26451c0b2f7Stbbdev         SpinBarrier& my_barrier;
ServiceThreadFuncThreadsEagerReaction::AsyncActivity::ServiceThreadFunc26551c0b2f7Stbbdev         ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
operator ()ThreadsEagerReaction::AsyncActivity::ServiceThreadFunc26651c0b2f7Stbbdev         void operator()(AsyncActivity* activity) {
26751c0b2f7Stbbdev             while (!activity->done) {
26851c0b2f7Stbbdev                 work_type work;
26951c0b2f7Stbbdev                 while (activity->my_queue.try_pop(work)) {
27051c0b2f7Stbbdev                     g_async_task_ids.push_back( ++g_task_num );
27151c0b2f7Stbbdev                     work.gateway->try_put(work.input);
27251c0b2f7Stbbdev                     work.gateway->release_wait();
27351c0b2f7Stbbdev                     my_barrier.wait();
27451c0b2f7Stbbdev                 }
27551c0b2f7Stbbdev             }
27651c0b2f7Stbbdev         }
27751c0b2f7Stbbdev     };
stop_and_waitThreadsEagerReaction::AsyncActivity27851c0b2f7Stbbdev     void stop_and_wait() { done = true; my_service_thread.join(); }
27951c0b2f7Stbbdev 
submitThreadsEagerReaction::AsyncActivity28051c0b2f7Stbbdev     void submit(data_type input, gateway_type* gateway) {
28151c0b2f7Stbbdev         work_type work = { input, gateway };
28251c0b2f7Stbbdev         gateway->reserve_wait();
28351c0b2f7Stbbdev         my_queue.push(work);
28451c0b2f7Stbbdev     }
AsyncActivityThreadsEagerReaction::AsyncActivity28551c0b2f7Stbbdev     AsyncActivity(SpinBarrier& barrier)
28651c0b2f7Stbbdev         : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
28751c0b2f7Stbbdev };
28851c0b2f7Stbbdev 
28951c0b2f7Stbbdev struct StartBody {
29051c0b2f7Stbbdev     bool has_run;
operator ()ThreadsEagerReaction::StartBody29151c0b2f7Stbbdev     data_type operator()(tbb::flow_control& fc) {
29251c0b2f7Stbbdev         if (has_run){
29351c0b2f7Stbbdev             fc.stop();
29451c0b2f7Stbbdev             return data_type();
29551c0b2f7Stbbdev         }
29651c0b2f7Stbbdev         has_run = true;
29751c0b2f7Stbbdev         return 1;
29851c0b2f7Stbbdev     }
StartBodyThreadsEagerReaction::StartBody29951c0b2f7Stbbdev     StartBody() : has_run(false) {}
30051c0b2f7Stbbdev };
30151c0b2f7Stbbdev 
30251c0b2f7Stbbdev struct ParallelForBody {
30351c0b2f7Stbbdev     SpinBarrier& my_barrier;
30451c0b2f7Stbbdev     const data_type& my_input;
ParallelForBodyThreadsEagerReaction::ParallelForBody30551c0b2f7Stbbdev     ParallelForBody(SpinBarrier& barrier, const data_type& input)
30651c0b2f7Stbbdev         : my_barrier(barrier), my_input(input) {}
operator ()ThreadsEagerReaction::ParallelForBody30751c0b2f7Stbbdev     void operator()(const data_type&) const {
30851c0b2f7Stbbdev         my_barrier.wait();
30951c0b2f7Stbbdev         ++g_task_num;
31051c0b2f7Stbbdev     }
31151c0b2f7Stbbdev };
31251c0b2f7Stbbdev 
31351c0b2f7Stbbdev struct CpuWorkBody {
31451c0b2f7Stbbdev     SpinBarrier& my_barrier;
31551c0b2f7Stbbdev     const int my_tasks_count;
operator ()ThreadsEagerReaction::CpuWorkBody31651c0b2f7Stbbdev     data_type operator()(const data_type& input) {
31751c0b2f7Stbbdev         tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
31851c0b2f7Stbbdev         return input;
31951c0b2f7Stbbdev     }
CpuWorkBodyThreadsEagerReaction::CpuWorkBody32051c0b2f7Stbbdev     CpuWorkBody(SpinBarrier& barrier, int tasks_count)
32151c0b2f7Stbbdev         : my_barrier(barrier), my_tasks_count(tasks_count) {}
32251c0b2f7Stbbdev };
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev struct DeciderBody {
32551c0b2f7Stbbdev     const data_type my_limit;
DeciderBodyThreadsEagerReaction::DeciderBody32651c0b2f7Stbbdev     DeciderBody( const data_type& limit ) : my_limit( limit ) {}
operator ()ThreadsEagerReaction::DeciderBody32751c0b2f7Stbbdev     void operator()(data_type input, decider_node_type::output_ports_type& ports) {
32851c0b2f7Stbbdev         if (input < my_limit)
32951c0b2f7Stbbdev             std::get<0>(ports).try_put(input + 1);
33051c0b2f7Stbbdev     }
33151c0b2f7Stbbdev };
33251c0b2f7Stbbdev 
33351c0b2f7Stbbdev struct AsyncSubmissionBody {
33451c0b2f7Stbbdev     AsyncActivity* my_activity;
335*324afd9eSIlya Mishin     // It is important that async_node in the test executes without spawning a TBB task, because
336*324afd9eSIlya Mishin     // it passes the work to asynchronous thread, which unlocks the barrier that is waited
337*324afd9eSIlya Mishin     // by every execution thread (asynchronous thread and any TBB worker or main thread).
338*324afd9eSIlya Mishin     // This is why async_node's body marked noexcept.
operator ()ThreadsEagerReaction::AsyncSubmissionBody339*324afd9eSIlya Mishin     void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept {
34051c0b2f7Stbbdev         my_activity->submit(input, &gateway);
34151c0b2f7Stbbdev     }
AsyncSubmissionBodyThreadsEagerReaction::AsyncSubmissionBody34251c0b2f7Stbbdev     AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
34351c0b2f7Stbbdev };
34451c0b2f7Stbbdev 
test(unsigned num_threads)34551c0b2f7Stbbdev void test( unsigned num_threads ) {
34651c0b2f7Stbbdev     INFO( "Testing threads react eagerly on asynchronous tasks (num_threads=" << num_threads << ") - " );
34751c0b2f7Stbbdev     if( num_threads == std::thread::hardware_concurrency() ) {
34851c0b2f7Stbbdev         // one thread is required for asynchronous compute resource
34951c0b2f7Stbbdev         INFO("skipping test since it is designed to work on less number of threads than "
35051c0b2f7Stbbdev              "hardware concurrency allows\n");
35151c0b2f7Stbbdev         return;
35251c0b2f7Stbbdev     }
35351c0b2f7Stbbdev     const unsigned cpu_threads = unsigned(num_threads);
35451c0b2f7Stbbdev     const unsigned cpu_tasks_per_thread = 4;
35551c0b2f7Stbbdev     const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
35651c0b2f7Stbbdev     const unsigned async_subgraph_reruns = 8;
35751c0b2f7Stbbdev     const unsigned cpu_subgraph_reruns = 2;
35851c0b2f7Stbbdev 
35951c0b2f7Stbbdev     SpinBarrier barrier(cpu_threads + /*async thread=*/1);
36051c0b2f7Stbbdev     g_task_num = 0;
36151c0b2f7Stbbdev     g_async_task_ids.clear();
36251c0b2f7Stbbdev     g_async_task_ids.reserve(async_subgraph_reruns);
36351c0b2f7Stbbdev 
36451c0b2f7Stbbdev     tbb::task_arena arena(cpu_threads);
36551c0b2f7Stbbdev 	arena.execute(
36651c0b2f7Stbbdev         [&]() {
36751c0b2f7Stbbdev             AsyncActivity activity(barrier);
36851c0b2f7Stbbdev             graph g;
36951c0b2f7Stbbdev 
37051c0b2f7Stbbdev             input_node<data_type> starter_node(g, StartBody());
37151c0b2f7Stbbdev             function_node<data_type, data_type> cpu_work_node(
37251c0b2f7Stbbdev                 g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
37351c0b2f7Stbbdev             decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
37451c0b2f7Stbbdev             async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
37551c0b2f7Stbbdev             decider_node_type async_restarter_node(
37651c0b2f7Stbbdev                 g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
37751c0b2f7Stbbdev             );
37851c0b2f7Stbbdev 
37951c0b2f7Stbbdev             make_edge(starter_node, cpu_work_node);
38051c0b2f7Stbbdev             make_edge(cpu_work_node, cpu_restarter_node);
38151c0b2f7Stbbdev             make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);
38251c0b2f7Stbbdev 
38351c0b2f7Stbbdev             make_edge(starter_node, async_node);
38451c0b2f7Stbbdev             make_edge(async_node, async_restarter_node);
38551c0b2f7Stbbdev             make_edge(output_port<0>(async_restarter_node), async_node);
38651c0b2f7Stbbdev 
38751c0b2f7Stbbdev             starter_node.activate();
38851c0b2f7Stbbdev             g.wait_for_all();
38951c0b2f7Stbbdev             activity.stop_and_wait();
39051c0b2f7Stbbdev 
39151c0b2f7Stbbdev             const size_t async_task_num = size_t(async_subgraph_reruns);
39251c0b2f7Stbbdev             CHECK_MESSAGE( ( g_async_task_ids.size() == async_task_num), "Incorrect number of async tasks." );
39351c0b2f7Stbbdev             unsigned max_span = unsigned(2 * cpu_threads + 1);
39451c0b2f7Stbbdev             for( size_t idx = 1; idx < async_task_num; ++idx ) {
39551c0b2f7Stbbdev                 CHECK_MESSAGE( (g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span),
39651c0b2f7Stbbdev                                "Async tasks were not able to interfere with CPU tasks." );
39751c0b2f7Stbbdev 
39851c0b2f7Stbbdev             }
39951c0b2f7Stbbdev         }
40051c0b2f7Stbbdev     );
40151c0b2f7Stbbdev     INFO("done\n");
40251c0b2f7Stbbdev }
40351c0b2f7Stbbdev } /* ThreadsEagerReaction */
40451c0b2f7Stbbdev 
40551c0b2f7Stbbdev namespace LimitingExecutionToPriorityTask {
40651c0b2f7Stbbdev 
40751c0b2f7Stbbdev enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };
40851c0b2f7Stbbdev 
40951c0b2f7Stbbdev struct execution_tracker_t {
execution_tracker_tLimitingExecutionToPriorityTask::execution_tracker_t41051c0b2f7Stbbdev     execution_tracker_t() { reset(); }
resetLimitingExecutionToPriorityTask::execution_tracker_t41151c0b2f7Stbbdev     void reset() {
41251c0b2f7Stbbdev         prioritized_work_submitter = std::thread::id();
41351c0b2f7Stbbdev         prioritized_work_started = false;
41451c0b2f7Stbbdev         prioritized_work_finished = false;
41551c0b2f7Stbbdev         prioritized_work_interrupted = false;
41651c0b2f7Stbbdev     }
41751c0b2f7Stbbdev     std::thread::id prioritized_work_submitter;
41859ac78faSAlex     std::atomic<bool> prioritized_work_started;
41951c0b2f7Stbbdev     bool prioritized_work_finished;
42051c0b2f7Stbbdev     bool prioritized_work_interrupted;
42151c0b2f7Stbbdev } exec_tracker;
42251c0b2f7Stbbdev 
42351c0b2f7Stbbdev template<work_type_t work_type>
42451c0b2f7Stbbdev void do_node_work( int work_size );
42551c0b2f7Stbbdev 
42651c0b2f7Stbbdev template<work_type_t>
42751c0b2f7Stbbdev void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& subrange );
42851c0b2f7Stbbdev 
42951c0b2f7Stbbdev template<work_type_t work_type>
43051c0b2f7Stbbdev struct CommonBody {
CommonBodyLimitingExecutionToPriorityTask::CommonBody43151c0b2f7Stbbdev     CommonBody() : my_body_size( 0 ) { }
CommonBodyLimitingExecutionToPriorityTask::CommonBody43251c0b2f7Stbbdev     CommonBody( int body_size ) : my_body_size( body_size ) { }
operator ()LimitingExecutionToPriorityTask::CommonBody43351c0b2f7Stbbdev     continue_msg operator()( const continue_msg& msg ) const {
43451c0b2f7Stbbdev         do_node_work<work_type>(my_body_size);
43551c0b2f7Stbbdev         return msg;
43651c0b2f7Stbbdev     }
operator ()LimitingExecutionToPriorityTask::CommonBody43751c0b2f7Stbbdev     void operator()( const tbb::blocked_range<int>& subrange ) const {
43851c0b2f7Stbbdev         do_nested_work<work_type>( /*tid=*/std::this_thread::get_id(), subrange );
43951c0b2f7Stbbdev     }
44051c0b2f7Stbbdev     int my_body_size;
44151c0b2f7Stbbdev };
44251c0b2f7Stbbdev 
44351c0b2f7Stbbdev template<work_type_t work_type>
do_node_work(int work_size)44451c0b2f7Stbbdev void do_node_work(int work_size) {
44551c0b2f7Stbbdev     tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
44651c0b2f7Stbbdev                        tbb::simple_partitioner() );
44751c0b2f7Stbbdev }
44851c0b2f7Stbbdev 
44951c0b2f7Stbbdev template<work_type_t>
do_nested_work(const std::thread::id & tid,const tbb::blocked_range<int> &)45051c0b2f7Stbbdev void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
45151c0b2f7Stbbdev     // This is non-prioritized work...
45259ac78faSAlex     if( !exec_tracker.prioritized_work_started || exec_tracker.prioritized_work_submitter != tid )
45351c0b2f7Stbbdev         return;
45451c0b2f7Stbbdev     // ...being executed by the thread that initially started prioritized one...
45551c0b2f7Stbbdev     CHECK_MESSAGE( exec_tracker.prioritized_work_started,
45651c0b2f7Stbbdev                    "Prioritized work should have been started by that time." );
45751c0b2f7Stbbdev     // ...prioritized work has been started already...
45851c0b2f7Stbbdev     if( exec_tracker.prioritized_work_finished )
45951c0b2f7Stbbdev         return;
46051c0b2f7Stbbdev     // ...but has not been finished yet
46151c0b2f7Stbbdev     exec_tracker.prioritized_work_interrupted = true;
46251c0b2f7Stbbdev }
46351c0b2f7Stbbdev 
46451c0b2f7Stbbdev struct IsolationFunctor {
46551c0b2f7Stbbdev     int work_size;
IsolationFunctorLimitingExecutionToPriorityTask::IsolationFunctor46651c0b2f7Stbbdev     IsolationFunctor(int ws) : work_size(ws) {}
operator ()LimitingExecutionToPriorityTask::IsolationFunctor46751c0b2f7Stbbdev     void operator()() const {
46851c0b2f7Stbbdev         tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
46951c0b2f7Stbbdev                            tbb::simple_partitioner() );
47051c0b2f7Stbbdev     }
47151c0b2f7Stbbdev };
47251c0b2f7Stbbdev 
47351c0b2f7Stbbdev template<>
do_node_work(int work_size)47451c0b2f7Stbbdev void do_node_work<PRIORITIZED_WORK>(int work_size) {
47551c0b2f7Stbbdev     exec_tracker.prioritized_work_submitter = std::this_thread::get_id();
47651c0b2f7Stbbdev     exec_tracker.prioritized_work_started = true;
47751c0b2f7Stbbdev     tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
47851c0b2f7Stbbdev     exec_tracker.prioritized_work_finished = true;
47951c0b2f7Stbbdev }
48051c0b2f7Stbbdev 
48151c0b2f7Stbbdev template<>
do_nested_work(const std::thread::id & tid,const tbb::blocked_range<int> &)48251c0b2f7Stbbdev void do_nested_work<PRIORITIZED_WORK>( const std::thread::id& tid,
48351c0b2f7Stbbdev                                        const tbb::blocked_range<int>& /*subrange*/ ) {
48459ac78faSAlex     if( exec_tracker.prioritized_work_started && exec_tracker.prioritized_work_submitter == tid ) {
48551c0b2f7Stbbdev         CHECK_MESSAGE( !exec_tracker.prioritized_work_interrupted,
48651c0b2f7Stbbdev                        "Thread was not fully devoted to processing of prioritized task." );
48751c0b2f7Stbbdev     } else {
48851c0b2f7Stbbdev         // prolong processing of prioritized work so that the thread that started
48951c0b2f7Stbbdev         // prioritized work has higher probability to help with non-prioritized one.
49051c0b2f7Stbbdev         spin_for(0.1);
49151c0b2f7Stbbdev     }
49251c0b2f7Stbbdev }
49351c0b2f7Stbbdev 
49451c0b2f7Stbbdev // Using pointers to nodes to avoid errors on compilers, which try to generate assignment operator
49551c0b2f7Stbbdev // for the nodes
496b15aabb3Stbbdev typedef std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes_container_t;
49751c0b2f7Stbbdev 
create_nodes(nodes_container_t & nodes,graph & g,int num,int body_size)49851c0b2f7Stbbdev void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
49951c0b2f7Stbbdev     for( int i = 0; i < num; ++i )
50051c0b2f7Stbbdev         nodes.push_back(
501b15aabb3Stbbdev             std::unique_ptr<continue_node<continue_msg>>(
50251c0b2f7Stbbdev                 new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
503b15aabb3Stbbdev             )
50451c0b2f7Stbbdev         );
50551c0b2f7Stbbdev }
50651c0b2f7Stbbdev 
test(int num_threads)50751c0b2f7Stbbdev void test( int num_threads ) {
50851c0b2f7Stbbdev     INFO( "Testing limit execution to priority tasks (num_threads=" << num_threads << ") - " );
50951c0b2f7Stbbdev 
51051c0b2f7Stbbdev     tbb::task_arena arena( num_threads );
51151c0b2f7Stbbdev 	arena.execute(
51251c0b2f7Stbbdev         [&]() {
51351c0b2f7Stbbdev             const int nodes_num = 100;
51451c0b2f7Stbbdev             const int priority_node_position_part = 10;
51551c0b2f7Stbbdev             const int pivot = nodes_num / priority_node_position_part;
51651c0b2f7Stbbdev             const int nodes_in_lane = 3 * num_threads;
51751c0b2f7Stbbdev             const int small_problem_size = 100;
51851c0b2f7Stbbdev             const int large_problem_size = 1000;
51951c0b2f7Stbbdev 
52051c0b2f7Stbbdev             graph g;
52151c0b2f7Stbbdev             nodes_container_t nodes;
52251c0b2f7Stbbdev             create_nodes( nodes, g, pivot, large_problem_size );
52351c0b2f7Stbbdev             nodes.push_back(
524b15aabb3Stbbdev                 std::unique_ptr<continue_node<continue_msg>>(
52551c0b2f7Stbbdev                     new continue_node<continue_msg>(
52651c0b2f7Stbbdev                         g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
52751c0b2f7Stbbdev                     )
528b15aabb3Stbbdev                 )
52951c0b2f7Stbbdev             );
53051c0b2f7Stbbdev             create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );
53151c0b2f7Stbbdev 
53251c0b2f7Stbbdev             broadcast_node<continue_msg> bn(g);
53351c0b2f7Stbbdev             for( int i = 0; i < nodes_num; ++i )
53451c0b2f7Stbbdev                 if( i % nodes_in_lane == 0 )
53551c0b2f7Stbbdev                     make_edge( bn, *nodes[i] );
53651c0b2f7Stbbdev                 else
53751c0b2f7Stbbdev                     make_edge( *nodes[i-1], *nodes[i] );
53851c0b2f7Stbbdev             exec_tracker.reset();
53951c0b2f7Stbbdev             bn.try_put( continue_msg() );
54051c0b2f7Stbbdev             g.wait_for_all();
54151c0b2f7Stbbdev         }
54251c0b2f7Stbbdev 	);
543b15aabb3Stbbdev 
544b15aabb3Stbbdev     INFO( "done\n" );
54551c0b2f7Stbbdev }
54651c0b2f7Stbbdev 
54751c0b2f7Stbbdev } /* namespace LimitingExecutionToPriorityTask */
54851c0b2f7Stbbdev 
54951c0b2f7Stbbdev namespace NestedCase {
55051c0b2f7Stbbdev 
55151c0b2f7Stbbdev using tbb::task_arena;
55251c0b2f7Stbbdev 
55351c0b2f7Stbbdev struct InnerBody {
operator ()NestedCase::InnerBody55451c0b2f7Stbbdev     continue_msg operator()( const continue_msg& ) const {
55551c0b2f7Stbbdev         return continue_msg();
55651c0b2f7Stbbdev     }
55751c0b2f7Stbbdev };
55851c0b2f7Stbbdev 
55951c0b2f7Stbbdev struct OuterBody {
56051c0b2f7Stbbdev     int my_max_threads;
561b15aabb3Stbbdev     task_arena** my_inner_arena;
OuterBodyNestedCase::OuterBody562b15aabb3Stbbdev     OuterBody( int max_threads, task_arena** inner_arena )
56351c0b2f7Stbbdev         : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
56451c0b2f7Stbbdev     // copy constructor to please some old compilers
OuterBodyNestedCase::OuterBody56551c0b2f7Stbbdev     OuterBody( const OuterBody& rhs )
56651c0b2f7Stbbdev         : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
operator ()NestedCase::OuterBody56751c0b2f7Stbbdev     int operator()( const int& ) {
56851c0b2f7Stbbdev         graph inner_graph;
56951c0b2f7Stbbdev         continue_node<continue_msg> start_node(inner_graph, InnerBody());
57051c0b2f7Stbbdev         continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
57151c0b2f7Stbbdev         continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
57251c0b2f7Stbbdev         continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
57351c0b2f7Stbbdev         make_edge( start_node, mid_node1 );
57451c0b2f7Stbbdev         make_edge( mid_node1, end_node );
57551c0b2f7Stbbdev         make_edge( start_node, mid_node2 );
57651c0b2f7Stbbdev         make_edge( mid_node2, end_node );
577b15aabb3Stbbdev         (*my_inner_arena)->execute( [&inner_graph]{ inner_graph.reset(); } );
57851c0b2f7Stbbdev         start_node.try_put( continue_msg() );
57951c0b2f7Stbbdev         inner_graph.wait_for_all();
58051c0b2f7Stbbdev         return 13;
58151c0b2f7Stbbdev     }
58251c0b2f7Stbbdev };
58351c0b2f7Stbbdev 
execute_outer_graph(bool same_arena,task_arena & inner_arena,int max_threads,graph & outer_graph,function_node<int,int> & start_node)58451c0b2f7Stbbdev void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
58551c0b2f7Stbbdev                           graph& outer_graph, function_node<int,int>& start_node ) {
58651c0b2f7Stbbdev     if( same_arena ) {
58751c0b2f7Stbbdev         start_node.try_put( 42 );
58851c0b2f7Stbbdev         outer_graph.wait_for_all();
58951c0b2f7Stbbdev         return;
59051c0b2f7Stbbdev     }
591b15aabb3Stbbdev 
592b15aabb3Stbbdev     auto threads_range = utils::concurrency_range(max_threads);
593b15aabb3Stbbdev     for( auto num_threads : threads_range ) {
59455f9b178SIvan Kochin         inner_arena.initialize( static_cast<int>(num_threads) );
59551c0b2f7Stbbdev         start_node.try_put( 42 );
59651c0b2f7Stbbdev         outer_graph.wait_for_all();
59751c0b2f7Stbbdev         inner_arena.terminate();
59851c0b2f7Stbbdev     }
59951c0b2f7Stbbdev }
60051c0b2f7Stbbdev 
test_in_arena(int max_threads,task_arena & outer_arena,task_arena & inner_arena,graph & outer_graph,function_node<int,int> & start_node)601b15aabb3Stbbdev void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena,
602b15aabb3Stbbdev                     graph& outer_graph, function_node<int, int>& start_node ) {
603b15aabb3Stbbdev     bool same_arena = &outer_arena == &inner_arena;
604b15aabb3Stbbdev     auto threads_range = utils::concurrency_range(max_threads);
605b15aabb3Stbbdev     for( auto num_threads : threads_range ) {
606b15aabb3Stbbdev         INFO( "Testing nested nodes with specified priority in " << (same_arena? "same" : "different")
607b15aabb3Stbbdev               << " arenas, num_threads=" << num_threads << ") - " );
60855f9b178SIvan Kochin         outer_arena.initialize( static_cast<int>(num_threads) );
609b15aabb3Stbbdev         outer_arena.execute( [&outer_graph]{ outer_graph.reset(); } );
610b15aabb3Stbbdev         execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, start_node );
611b15aabb3Stbbdev         outer_arena.terminate();
612b15aabb3Stbbdev         INFO( "done\n" );
613b15aabb3Stbbdev     }
614b15aabb3Stbbdev }
615b15aabb3Stbbdev 
test(int max_threads)616b15aabb3Stbbdev void test( int max_threads ) {
617b15aabb3Stbbdev     task_arena outer_arena; task_arena inner_arena;
618b15aabb3Stbbdev     task_arena* inner_arena_pointer = &outer_arena; // make it same as outer arena in the beginning
619b15aabb3Stbbdev 
62051c0b2f7Stbbdev     graph outer_graph;
62151c0b2f7Stbbdev     const unsigned num_outer_nodes = 10;
62251c0b2f7Stbbdev     const size_t concurrency = unlimited;
623b15aabb3Stbbdev     std::vector< std::unique_ptr<function_node<int,int>> > outer_nodes;
62451c0b2f7Stbbdev     for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
62551c0b2f7Stbbdev         node_priority_t priority = no_priority;
62651c0b2f7Stbbdev         if( node_index == num_outer_nodes / 2 )
62751c0b2f7Stbbdev             priority = 10;
62851c0b2f7Stbbdev 
62951c0b2f7Stbbdev         outer_nodes.push_back(
630b15aabb3Stbbdev             std::unique_ptr< function_node<int, int> >(
63151c0b2f7Stbbdev                 new function_node<int,int>(
632b15aabb3Stbbdev                     outer_graph, concurrency, OuterBody(max_threads, &inner_arena_pointer), priority
633b15aabb3Stbbdev                 )
63451c0b2f7Stbbdev             )
63551c0b2f7Stbbdev         );
63651c0b2f7Stbbdev     }
63751c0b2f7Stbbdev 
63851c0b2f7Stbbdev     for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
63951c0b2f7Stbbdev         for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
64051c0b2f7Stbbdev             make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );
64151c0b2f7Stbbdev 
642b15aabb3Stbbdev     test_in_arena( max_threads, outer_arena, outer_arena, outer_graph, *outer_nodes[0] );
64351c0b2f7Stbbdev 
644b15aabb3Stbbdev     inner_arena_pointer = &inner_arena;
64551c0b2f7Stbbdev 
646b15aabb3Stbbdev     test_in_arena( max_threads, outer_arena, inner_arena, outer_graph, *outer_nodes[0] );
64751c0b2f7Stbbdev }
64851c0b2f7Stbbdev } // namespace NestedCase
64951c0b2f7Stbbdev 
65051c0b2f7Stbbdev 
65151c0b2f7Stbbdev namespace BypassPrioritizedTask {
65251c0b2f7Stbbdev 
common_body(int priority)65351c0b2f7Stbbdev void common_body( int priority ) {
65451c0b2f7Stbbdev     int current_task_index = g_task_num++;
65551c0b2f7Stbbdev     g_task_info.push_back( TaskInfo( priority, current_task_index ) );
65651c0b2f7Stbbdev }
65751c0b2f7Stbbdev 
65851c0b2f7Stbbdev struct Body {
BodyBypassPrioritizedTask::Body65951c0b2f7Stbbdev     Body( int priority ) : my_priority( priority ) {}
operator ()BypassPrioritizedTask::Body66051c0b2f7Stbbdev     continue_msg operator()(const continue_msg&) {
66151c0b2f7Stbbdev         common_body( my_priority );
66251c0b2f7Stbbdev         return continue_msg();
66351c0b2f7Stbbdev     }
66451c0b2f7Stbbdev     int my_priority;
66551c0b2f7Stbbdev };
66651c0b2f7Stbbdev 
66751c0b2f7Stbbdev struct InputNodeBody {
operator ()BypassPrioritizedTask::InputNodeBody66851c0b2f7Stbbdev     continue_msg operator()( tbb::flow_control& fc ){
66951c0b2f7Stbbdev         static bool is_source_executed = false;
67051c0b2f7Stbbdev 
67151c0b2f7Stbbdev         if( is_source_executed ) {
67251c0b2f7Stbbdev             fc.stop();
67351c0b2f7Stbbdev             return continue_msg();
67451c0b2f7Stbbdev         }
67551c0b2f7Stbbdev 
67651c0b2f7Stbbdev         common_body( 0 );
67751c0b2f7Stbbdev         is_source_executed = true;
67851c0b2f7Stbbdev 
67951c0b2f7Stbbdev         return continue_msg();
68051c0b2f7Stbbdev     }
68151c0b2f7Stbbdev };
68251c0b2f7Stbbdev 
//! Creates the graph's starting node; by default a continue_node whose body
//! records priority 0 on execution.
template<typename StarterNodeType>
StarterNodeType create_starter_node(graph& g) {
    return continue_node<continue_msg>( g, Body(0) );
}
68751c0b2f7Stbbdev 
//! Specialization for input_node: uses the one-shot InputNodeBody instead.
template<>
input_node<continue_msg> create_starter_node<input_node<continue_msg>>(graph& g) {
    return input_node<continue_msg>( g, InputNodeBody() );
}
69251c0b2f7Stbbdev 
//! Triggers graph execution by sending a message to the starter node.
template<typename StarterNodeType>
void start_graph( StarterNodeType& starter ) {
    starter.try_put( continue_msg() );
}
69751c0b2f7Stbbdev 
//! Specialization for input_node, which is started via activation.
template<>
void start_graph<input_node<continue_msg>>( input_node<continue_msg>& starter ) {
    starter.activate();
}
70251c0b2f7Stbbdev 
70351c0b2f7Stbbdev template<typename StarterNodeType>
test_use_case()70451c0b2f7Stbbdev void test_use_case() {
70551c0b2f7Stbbdev     g_task_info.clear();
70651c0b2f7Stbbdev     g_task_num = 0;
70751c0b2f7Stbbdev     graph g;
70851c0b2f7Stbbdev     StarterNodeType starter = create_starter_node<StarterNodeType>(g);
70951c0b2f7Stbbdev     continue_node<continue_msg> spawn_successor( g, Body(1), node_priority_t(1) );
71051c0b2f7Stbbdev     continue_node<continue_msg> bypass_successor( g, Body(2), node_priority_t(2) );
71151c0b2f7Stbbdev 
71251c0b2f7Stbbdev     make_edge( starter, spawn_successor );
71351c0b2f7Stbbdev     make_edge( starter, bypass_successor );
71451c0b2f7Stbbdev 
71551c0b2f7Stbbdev     start_graph<StarterNodeType>( starter );
71651c0b2f7Stbbdev     g.wait_for_all();
71751c0b2f7Stbbdev 
71851c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info.size() == 3, "" );
71951c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info[0].my_task_index == 0, "" );
72051c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info[1].my_task_index == 1, "" );
72151c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info[2].my_task_index == 2, "" );
72251c0b2f7Stbbdev 
72351c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info[0].my_priority == 0, "" );
72451c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info[1].my_priority == 2, "Bypassed task with higher priority executed in wrong order." );
72551c0b2f7Stbbdev     CHECK_MESSAGE( g_task_info[2].my_priority == 1, "" );
72651c0b2f7Stbbdev }
72751c0b2f7Stbbdev 
72851c0b2f7Stbbdev //! The test checks that the task from the node with higher priority, which task gets bypassed, is
72951c0b2f7Stbbdev //! executed first than the one spawned with lower priority.
void test() {
    // Check the property for both supported kinds of starter node.
    test_use_case<continue_node<continue_msg>>();
    test_use_case<input_node<continue_msg>>();
}
73451c0b2f7Stbbdev 
73551c0b2f7Stbbdev } // namespace BypassPrioritizedTask
73651c0b2f7Stbbdev 
73751c0b2f7Stbbdev namespace ManySuccessors {
73851c0b2f7Stbbdev 
//! Body of a non-prioritized successor: by the time it runs, every prioritized
//! successor must already have decremented the shared barrier to zero.
//! NOTE: kept as an aggregate — it is brace-initialized at the use site.
struct no_priority_node_body {
    void operator()(continue_msg) {
        CHECK_MESSAGE(
            barrier == 0, "Non-priority successor has to be executed after all priority successors"
        );
    }
    // Shared countdown, decremented by priority_node_body instances.
    std::atomic<int>& barrier;
};
74751c0b2f7Stbbdev 
//! Body of a prioritized successor: decrements the shared barrier and then
//! spin-waits (yielding) until all prioritized successors have started, so
//! that they occupy the worker threads before any non-priority work can run.
//! NOTE: kept as an aggregate — it is brace-initialized at the use site.
struct priority_node_body {
    void operator()(continue_msg) {
        --barrier;
        while (barrier)
            tbb::detail::d0::yield();
    }
    // Shared countdown, initialized to the number of prioritized successors.
    std::atomic<int>& barrier;
};
75651c0b2f7Stbbdev 
test(int num_threads)75751c0b2f7Stbbdev void test(int num_threads) {
75851c0b2f7Stbbdev     tbb::task_arena arena( num_threads );
75951c0b2f7Stbbdev     arena.execute(
76051c0b2f7Stbbdev         [&]() {
76151c0b2f7Stbbdev             graph g;
76251c0b2f7Stbbdev             broadcast_node<continue_msg> bn(g);
763b15aabb3Stbbdev             std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes;
76451c0b2f7Stbbdev             std::atomic<int> barrier;
76551c0b2f7Stbbdev             for (int i = 0; i < 2 * num_threads; ++i)
766b15aabb3Stbbdev                 nodes.push_back(
767b15aabb3Stbbdev                     std::unique_ptr<continue_node<continue_msg>>(
768b15aabb3Stbbdev                         new continue_node<continue_msg>(g, no_priority_node_body{ barrier })
769b15aabb3Stbbdev                     )
770b15aabb3Stbbdev                 );
77151c0b2f7Stbbdev             for (int i = 0; i < num_threads; ++i)
772b15aabb3Stbbdev                 nodes.push_back(
773b15aabb3Stbbdev                     std::unique_ptr<continue_node<continue_msg>>(
774b15aabb3Stbbdev                         new continue_node<continue_msg>(g, priority_node_body{ barrier }, /*priority*/1)
775b15aabb3Stbbdev                     )
776b15aabb3Stbbdev                 );
77751c0b2f7Stbbdev 
77851c0b2f7Stbbdev             std::random_device rd;
77951c0b2f7Stbbdev             std::mt19937 gen(rd());
78051c0b2f7Stbbdev 
78151c0b2f7Stbbdev             for (int trial = 0; trial < 10; ++trial) {
78251c0b2f7Stbbdev                 barrier = num_threads;
78351c0b2f7Stbbdev                 std::shuffle(nodes.begin(), nodes.end(), gen);
784b15aabb3Stbbdev                 for (auto& n : nodes)
78551c0b2f7Stbbdev                     make_edge(bn, *n);
78651c0b2f7Stbbdev                 bn.try_put(continue_msg());
78751c0b2f7Stbbdev                 g.wait_for_all();
788b15aabb3Stbbdev                 for (auto& n : nodes)
78951c0b2f7Stbbdev                     remove_edge(bn, *n);
79051c0b2f7Stbbdev             }
79151c0b2f7Stbbdev         }
79251c0b2f7Stbbdev     );
79351c0b2f7Stbbdev }
79451c0b2f7Stbbdev 
79551c0b2f7Stbbdev } // namespace ManySuccessors
79651c0b2f7Stbbdev 
79751c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
79851c0b2f7Stbbdev namespace Exceptions {
test()79951c0b2f7Stbbdev     void test() {
80051c0b2f7Stbbdev         using namespace tbb::flow;
80151c0b2f7Stbbdev         graph g;
80251c0b2f7Stbbdev         std::srand(42);
80349e08aacStbbdev         const unsigned num_messages = 50;
80449e08aacStbbdev         std::vector<unsigned> throwing_msgs;
80549e08aacStbbdev         std::atomic<unsigned> msg_count(0);
80649e08aacStbbdev         continue_node<unsigned> c(g, [&msg_count](continue_msg) {
80749e08aacStbbdev             return ++msg_count;
80851c0b2f7Stbbdev         }, 2);
80949e08aacStbbdev         function_node<unsigned> f(g, unlimited, [&throwing_msgs](unsigned v) {
81049e08aacStbbdev             for( auto i : throwing_msgs ) {
81149e08aacStbbdev                 if( i == v )
81251c0b2f7Stbbdev                     throw std::runtime_error("Exception::test");
81351c0b2f7Stbbdev             }
81451c0b2f7Stbbdev         }, 1);
81551c0b2f7Stbbdev         make_edge(c, f);
81651c0b2f7Stbbdev         for (int i = 0; i < 10; ++i) {
81749e08aacStbbdev             msg_count = 0;
81849e08aacStbbdev             g.reset();
81949e08aacStbbdev             throwing_msgs.push_back(std::rand() % num_messages);
82051c0b2f7Stbbdev             try {
82149e08aacStbbdev                 for (unsigned j = 0; j < num_messages; ++j) {
82251c0b2f7Stbbdev                     c.try_put(continue_msg());
82351c0b2f7Stbbdev                 }
82451c0b2f7Stbbdev                 g.wait_for_all();
82551c0b2f7Stbbdev                 FAIL("Unreachable code. The exception is expected");
82651c0b2f7Stbbdev             } catch (std::runtime_error&) {
82751c0b2f7Stbbdev                 CHECK(g.is_cancelled());
82849e08aacStbbdev                 CHECK(g.exception_thrown());
82951c0b2f7Stbbdev             } catch (...) {
83051c0b2f7Stbbdev                 FAIL("Unexpected exception");
83151c0b2f7Stbbdev             }
83251c0b2f7Stbbdev         }
83351c0b2f7Stbbdev     }
83451c0b2f7Stbbdev } // namespace Exceptions
83551c0b2f7Stbbdev #endif
83651c0b2f7Stbbdev 
83751c0b2f7Stbbdev //! Test node prioritization
83851c0b2f7Stbbdev //! \brief \ref requirement
83951c0b2f7Stbbdev TEST_CASE("Priority nodes take precedence"){
8408dcbd5b1Stbbdev     for( auto p : utils::concurrency_range() ) {
84155f9b178SIvan Kochin         PriorityNodesTakePrecedence::test( static_cast<int>(p) );
84251c0b2f7Stbbdev     }
84351c0b2f7Stbbdev }
84451c0b2f7Stbbdev 
84551c0b2f7Stbbdev //! Test thread eager reaction
84651c0b2f7Stbbdev //! \brief \ref error_guessing
84751c0b2f7Stbbdev TEST_CASE("Thread eager reaction"){
8488dcbd5b1Stbbdev     for( auto p : utils::concurrency_range() ) {
84955f9b178SIvan Kochin         ThreadsEagerReaction::test( static_cast<int>(p) );
85051c0b2f7Stbbdev     }
85151c0b2f7Stbbdev }
85251c0b2f7Stbbdev 
85351c0b2f7Stbbdev //! Test prioritization under concurrency limits
85451c0b2f7Stbbdev //! \brief \ref error_guessing
85551c0b2f7Stbbdev TEST_CASE("Limiting execution to prioritized work") {
8568dcbd5b1Stbbdev     for( auto p : utils::concurrency_range() ) {
85755f9b178SIvan Kochin         LimitingExecutionToPriorityTask::test( static_cast<int>(p) );
85851c0b2f7Stbbdev     }
85951c0b2f7Stbbdev }
86051c0b2f7Stbbdev 
86151c0b2f7Stbbdev //! Test nested graphs
86251c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Nested test case") {
    // Run the nested-graphs scenario with the platform's maximum concurrency.
    std::size_t max_threads = utils::get_platform_max_threads();
    // The stepping for the threads is done inside.
    NestedCase::test( static_cast<int>(max_threads) );
}
86851c0b2f7Stbbdev 
86951c0b2f7Stbbdev //! Test bypassed task with higher priority
87051c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Bypass prioritized task"){
    // Single-threaded execution makes the task execution order deterministic.
    tbb::global_control gc( tbb::global_control::max_allowed_parallelism, 1 );
    BypassPrioritizedTask::test();
}
87551c0b2f7Stbbdev 
87651c0b2f7Stbbdev //! Test mixing prioritized and ordinary successors
87751c0b2f7Stbbdev //! \brief \ref error_guessing
87851c0b2f7Stbbdev TEST_CASE("Many successors") {
8798dcbd5b1Stbbdev     for( auto p : utils::concurrency_range() ) {
88055f9b178SIvan Kochin         ManySuccessors::test( static_cast<int>(p) );
88151c0b2f7Stbbdev     }
88251c0b2f7Stbbdev }
88351c0b2f7Stbbdev 
88451c0b2f7Stbbdev #if TBB_USE_EXCEPTIONS
88551c0b2f7Stbbdev //! Test for exceptions
88651c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Exceptions") {
    // Exception propagation and graph cancellation with prioritized nodes.
    Exceptions::test();
}
89051c0b2f7Stbbdev #endif
891