/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// TODO revamp: move the parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having
// them in all tests may mean testing a product that differs from what is actually released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4
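
// N is the number of messages each sender puts per round; MAX_NODES bounds the
// number of predecessors and successors exercised in each test.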

//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this holds even with multiple predecessors and successors.
*/
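
// For orientation, a minimal usage sketch of the node under test (illustrative
// only, not invoked by the tests below): the body receives an input and may
// try_put to any subset of its output ports.
//
//     tbb::flow::graph g;
//     using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
//     mf_node n( g, tbb::flow::unlimited,
//                []( const int& v, mf_node::output_ports_type& ports ) {
//                    (void)std::get<0>(ports).try_put(v + 1); // forward to port 0
//                } );
//     tbb::flow::queue_node<int> q(g);
//     tbb::flow::make_edge( tbb::flow::output_port<0>(n), q );
//     n.try_put(41);
//     g.wait_for_all(); // q now holds 42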

//! exercise buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect
                    // them to the exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

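// Flow graph nodes copy their body objects, and tbb::flow::copy_body() returns
// the copy held inside the node; since std::atomic is not copyable, inc_functor
// provides its own copy constructor that snapshots the count.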
struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
       ++global_execute_count;
       ++local_execute_count;
       (void)std::get<0>(p).try_put(i);
    }

};

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
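        // Each of the MAX_NODES receiver counts runs the sender loop, which
        // contributes N*(1 + 2 + ... + MAX_NODES) = N/2 * MAX_NODES*(MAX_NODES+1)
        // executions, plus one extra try_put after the edges are removed; all of
        // this is on top of the initial Offset.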
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}


//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes receive puts from multiple predecessors simultaneously,
    and 4) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
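
// Note on the rejecting policy used below: with tbb::flow::rejecting and no
// internal buffer, try_put() returns false as soon as `lc` bodies are in
// flight instead of queueing the message, e.g.
//
//     tbb::flow::multifunction_node<int, std::tuple<int>, tbb::flow::rejecting>
//         n( g, /*concurrency*/ 1, body );
//     // while one body is still executing, n.try_put(x) returns false
//
// concurrency_levels relies on this to verify the limit exactly.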

template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // it only accepts to lc level
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}


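// A minimal user-defined message type, convertible to and from int, used to
// check that the nodes work with types other than built-ins.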
struct empty_no_assign {
   empty_no_assign() {}
   empty_no_assign( int ) {}
   operator int() { return 0; }
   operator int() const { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const  {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes accept all puts,
    2) that the nodes receive puts from multiple predecessors simultaneously,
    and 3) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
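
// With tbb::flow::unlimited the node starts a task for every incoming message,
// so try_put() always succeeds even under the rejecting policy; that is what
// unlimited_concurrency verifies below.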

template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}

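// Routes each input to one of two typed output ports: even values to port 0,
// odd values to port 1. This per-message port selection is what distinguishes
// multifunction_node from function_node.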
template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>

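// precedes(b1, b2) hands the successors to the node constructor, so the edges
// to b1 and b2 are in place as soon as the node is built; no explicit
// make_edge calls are needed.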
void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
    );

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "Each buffer should have received exactly one message");
}

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
       test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}
//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
        , node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "Each buffer should have received exactly one message");
}

#endif
