151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
2551c0b2f7Stbbdev 
2651c0b2f7Stbbdev #include "common/test.h"
2751c0b2f7Stbbdev #include "common/utils.h"
2851c0b2f7Stbbdev #include "common/graph_utils.h"
2951c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
30*478de5b1Stbbdev #include "common/concepts_common.h"
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev //! \file test_multifunction_node.cpp
3451c0b2f7Stbbdev //! \brief Test for [flow_graph.multifunction_node] specification
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev 
3751c0b2f7Stbbdev #if TBB_USE_DEBUG
3851c0b2f7Stbbdev #define N 16
3951c0b2f7Stbbdev #else
4051c0b2f7Stbbdev #define N 100
4151c0b2f7Stbbdev #endif
4251c0b2f7Stbbdev #define MAX_NODES 4
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev //! Performs test on function nodes with limited concurrency and buffering
4551c0b2f7Stbbdev /** These tests check:
4651c0b2f7Stbbdev     1) that the number of executing copies never exceed the concurrency limit
4751c0b2f7Stbbdev     2) that the node never rejects
4851c0b2f7Stbbdev     3) that no items are lost
4951c0b2f7Stbbdev     and 4) all of this happens even if there are multiple predecessors and successors
5051c0b2f7Stbbdev */
5151c0b2f7Stbbdev 
5251c0b2f7Stbbdev //! exercise buffered multifunction_node.
5351c0b2f7Stbbdev template< typename InputType, typename OutputTuple, typename Body >
buffered_levels(size_t concurrency,Body body)5451c0b2f7Stbbdev void buffered_levels( size_t concurrency, Body body ) {
5551c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
5651c0b2f7Stbbdev     // Do for lc = 1 to concurrency level
5751c0b2f7Stbbdev     for ( size_t lc = 1; lc <= concurrency; ++lc ) {
5851c0b2f7Stbbdev         tbb::flow::graph g;
5951c0b2f7Stbbdev 
6051c0b2f7Stbbdev         // Set the execute_counter back to zero in the harness
6151c0b2f7Stbbdev         harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
6251c0b2f7Stbbdev         // Set the number of current executors to zero.
6351c0b2f7Stbbdev         harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
6451c0b2f7Stbbdev         // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
6551c0b2f7Stbbdev         harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;
6651c0b2f7Stbbdev 
6751c0b2f7Stbbdev         // Create the function_node with the appropriate concurrency level, and use default buffering
6851c0b2f7Stbbdev         tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );
6951c0b2f7Stbbdev 
7051c0b2f7Stbbdev         //Create a vector of identical exe_nodes
7151c0b2f7Stbbdev         std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);
7251c0b2f7Stbbdev 
7351c0b2f7Stbbdev         // exercise each of the copied nodes
7451c0b2f7Stbbdev         for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
7551c0b2f7Stbbdev             for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
7651c0b2f7Stbbdev                 // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
7751c0b2f7Stbbdev                 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
7851c0b2f7Stbbdev                 for (size_t i = 0; i < num_receivers; i++) {
7951c0b2f7Stbbdev                     receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
8051c0b2f7Stbbdev                 }
8151c0b2f7Stbbdev 
8251c0b2f7Stbbdev                 for (size_t r = 0; r < num_receivers; ++r ) {
8351c0b2f7Stbbdev                     tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
8451c0b2f7Stbbdev                 }
8551c0b2f7Stbbdev 
8651c0b2f7Stbbdev                 // Do the test with varying numbers of senders
8751c0b2f7Stbbdev                 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
8851c0b2f7Stbbdev                 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
8951c0b2f7Stbbdev                     // Create num_senders senders, set their message limit each to N, and connect
9051c0b2f7Stbbdev                     // them to the exe_vec[node_idx]
9151c0b2f7Stbbdev                     senders.clear();
9251c0b2f7Stbbdev                     for (size_t s = 0; s < num_senders; ++s ) {
9351c0b2f7Stbbdev                         senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
9451c0b2f7Stbbdev                         senders.back()->my_limit = N;
9551c0b2f7Stbbdev                         tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
9651c0b2f7Stbbdev                     }
9751c0b2f7Stbbdev 
9851c0b2f7Stbbdev                     // Initialize the receivers so they know how many senders and messages to check for
9951c0b2f7Stbbdev                     for (size_t r = 0; r < num_receivers; ++r ) {
10051c0b2f7Stbbdev                         receivers[r]->initialize_map( N, num_senders );
10151c0b2f7Stbbdev                     }
10251c0b2f7Stbbdev 
10351c0b2f7Stbbdev                     // Do the test
10451c0b2f7Stbbdev                     utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
10551c0b2f7Stbbdev                     g.wait_for_all();
10651c0b2f7Stbbdev 
10751c0b2f7Stbbdev                     // confirm that each sender was requested from N times
10851c0b2f7Stbbdev                     for (size_t s = 0; s < num_senders; ++s ) {
10951c0b2f7Stbbdev                         size_t n = senders[s]->my_received;
11051c0b2f7Stbbdev                         CHECK_MESSAGE( n == N, "" );
11151c0b2f7Stbbdev                         CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
11251c0b2f7Stbbdev                     }
11351c0b2f7Stbbdev                     // validate the receivers
11451c0b2f7Stbbdev                     for (size_t r = 0; r < num_receivers; ++r ) {
11551c0b2f7Stbbdev                         receivers[r]->validate();
11651c0b2f7Stbbdev                     }
11751c0b2f7Stbbdev                 }
11851c0b2f7Stbbdev                 for (size_t r = 0; r < num_receivers; ++r ) {
11951c0b2f7Stbbdev                     tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
12051c0b2f7Stbbdev                 }
12151c0b2f7Stbbdev                 CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
12251c0b2f7Stbbdev                 g.wait_for_all();
12351c0b2f7Stbbdev                 for (size_t r = 0; r < num_receivers; ++r ) {
12451c0b2f7Stbbdev                     // since it's detached, nothing should have changed
12551c0b2f7Stbbdev                     receivers[r]->validate();
12651c0b2f7Stbbdev                 }
12751c0b2f7Stbbdev             }
12851c0b2f7Stbbdev         }
12951c0b2f7Stbbdev     }
13051c0b2f7Stbbdev }
13151c0b2f7Stbbdev 
13251c0b2f7Stbbdev const size_t Offset = 123;
13351c0b2f7Stbbdev std::atomic<size_t> global_execute_count;
13451c0b2f7Stbbdev 
13551c0b2f7Stbbdev struct inc_functor {
13651c0b2f7Stbbdev 
13751c0b2f7Stbbdev     std::atomic<size_t> local_execute_count;
inc_functorinc_functor13851c0b2f7Stbbdev     inc_functor( ) { local_execute_count = 0; }
inc_functorinc_functor13951c0b2f7Stbbdev     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
14051c0b2f7Stbbdev 
14151c0b2f7Stbbdev     template<typename output_ports_type>
operator ()inc_functor14251c0b2f7Stbbdev     void operator()( int i, output_ports_type &p ) {
14351c0b2f7Stbbdev        ++global_execute_count;
14451c0b2f7Stbbdev        ++local_execute_count;
14551c0b2f7Stbbdev        (void)std::get<0>(p).try_put(i);
14651c0b2f7Stbbdev     }
14751c0b2f7Stbbdev 
14851c0b2f7Stbbdev };
14951c0b2f7Stbbdev 
15051c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
buffered_levels_with_copy(size_t concurrency)15151c0b2f7Stbbdev void buffered_levels_with_copy( size_t concurrency ) {
15251c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
15351c0b2f7Stbbdev     // Do for lc = 1 to concurrency level
15451c0b2f7Stbbdev     for ( size_t lc = 1; lc <= concurrency; ++lc ) {
15551c0b2f7Stbbdev         tbb::flow::graph g;
15651c0b2f7Stbbdev 
15751c0b2f7Stbbdev         inc_functor cf;
15851c0b2f7Stbbdev         cf.local_execute_count = Offset;
15951c0b2f7Stbbdev         global_execute_count = Offset;
16051c0b2f7Stbbdev 
16151c0b2f7Stbbdev         tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );
16251c0b2f7Stbbdev 
16351c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
16451c0b2f7Stbbdev 
16551c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
16651c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; i++) {
16751c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
16851c0b2f7Stbbdev             }
16951c0b2f7Stbbdev 
17051c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
17151c0b2f7Stbbdev                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
17251c0b2f7Stbbdev             }
17351c0b2f7Stbbdev 
17451c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
17551c0b2f7Stbbdev             for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
17651c0b2f7Stbbdev                 senders.clear();
17751c0b2f7Stbbdev                 for (size_t s = 0; s < num_senders; ++s ) {
17851c0b2f7Stbbdev                     senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
17951c0b2f7Stbbdev                     senders.back()->my_limit = N;
18051c0b2f7Stbbdev                     tbb::flow::make_edge( *senders.back(), exe_node );
18151c0b2f7Stbbdev                 }
18251c0b2f7Stbbdev 
18351c0b2f7Stbbdev                 for (size_t r = 0; r < num_receivers; ++r ) {
18451c0b2f7Stbbdev                     receivers[r]->initialize_map( N, num_senders );
18551c0b2f7Stbbdev                 }
18651c0b2f7Stbbdev 
18751c0b2f7Stbbdev                 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
18851c0b2f7Stbbdev                 g.wait_for_all();
18951c0b2f7Stbbdev 
19051c0b2f7Stbbdev                 for (size_t s = 0; s < num_senders; ++s ) {
19151c0b2f7Stbbdev                     size_t n = senders[s]->my_received;
19251c0b2f7Stbbdev                     CHECK_MESSAGE( n == N, "" );
19351c0b2f7Stbbdev                     CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
19451c0b2f7Stbbdev                 }
19551c0b2f7Stbbdev                 for (size_t r = 0; r < num_receivers; ++r ) {
19651c0b2f7Stbbdev                     receivers[r]->validate();
19751c0b2f7Stbbdev                 }
19851c0b2f7Stbbdev             }
19951c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
20051c0b2f7Stbbdev                 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
20151c0b2f7Stbbdev             }
20251c0b2f7Stbbdev             CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
20351c0b2f7Stbbdev             g.wait_for_all();
20451c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
20551c0b2f7Stbbdev                 receivers[r]->validate();
20651c0b2f7Stbbdev             }
20751c0b2f7Stbbdev         }
20851c0b2f7Stbbdev 
20951c0b2f7Stbbdev         // validate that the local body matches the global execute_count and both are correct
21051c0b2f7Stbbdev         inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
21151c0b2f7Stbbdev         const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
21251c0b2f7Stbbdev         size_t global_count = global_execute_count;
21351c0b2f7Stbbdev         size_t inc_count = body_copy.local_execute_count;
21451c0b2f7Stbbdev         CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
21551c0b2f7Stbbdev     }
21651c0b2f7Stbbdev }
21751c0b2f7Stbbdev 
21851c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
run_buffered_levels(int c)21951c0b2f7Stbbdev void run_buffered_levels( int c ) {
22051c0b2f7Stbbdev     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
22151c0b2f7Stbbdev     buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
22251c0b2f7Stbbdev     buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
22351c0b2f7Stbbdev     buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
22451c0b2f7Stbbdev     buffered_levels_with_copy<InputType,OutputTuple>( c );
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev 
22751c0b2f7Stbbdev 
22851c0b2f7Stbbdev //! Performs test on executable nodes with limited concurrency
22951c0b2f7Stbbdev /** These tests check:
23051c0b2f7Stbbdev     1) that the nodes will accepts puts up to the concurrency limit,
23151c0b2f7Stbbdev     2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
23251c0b2f7Stbbdev     3) the nodes will receive puts from multiple successors simultaneously,
23351c0b2f7Stbbdev     and 4) the nodes will send to multiple predecessors.
23451c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
23551c0b2f7Stbbdev */
23651c0b2f7Stbbdev 
23751c0b2f7Stbbdev template< typename InputType, typename OutputTuple, typename Body >
concurrency_levels(size_t concurrency,Body body)23851c0b2f7Stbbdev void concurrency_levels( size_t concurrency, Body body ) {
23951c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
24051c0b2f7Stbbdev     for ( size_t lc = 1; lc <= concurrency; ++lc ) {
24151c0b2f7Stbbdev         tbb::flow::graph g;
24251c0b2f7Stbbdev 
24351c0b2f7Stbbdev         // Set the execute_counter back to zero in the harness
24451c0b2f7Stbbdev         harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
24551c0b2f7Stbbdev         // Set the number of current executors to zero.
24651c0b2f7Stbbdev         harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
24751c0b2f7Stbbdev         // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
24851c0b2f7Stbbdev         harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;
24951c0b2f7Stbbdev 
25051c0b2f7Stbbdev 
25151c0b2f7Stbbdev         tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );
25251c0b2f7Stbbdev 
25351c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
25451c0b2f7Stbbdev 
25551c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
25651c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; ++i) {
25751c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
25851c0b2f7Stbbdev             }
25951c0b2f7Stbbdev 
26051c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
26151c0b2f7Stbbdev                 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
26251c0b2f7Stbbdev             }
26351c0b2f7Stbbdev 
26451c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
26551c0b2f7Stbbdev 
26651c0b2f7Stbbdev             for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
26751c0b2f7Stbbdev                 {
26851c0b2f7Stbbdev                     // Exclusively lock m to prevent exe_node from finishing
26951c0b2f7Stbbdev                     tbb::spin_rw_mutex::scoped_lock l(
27051c0b2f7Stbbdev                         harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
27151c0b2f7Stbbdev                     );
27251c0b2f7Stbbdev 
27351c0b2f7Stbbdev                     // put to lc level, it will accept and then block at m
27451c0b2f7Stbbdev                     for ( size_t c = 0 ; c < lc ; ++c ) {
27551c0b2f7Stbbdev                         CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
27651c0b2f7Stbbdev                     }
27751c0b2f7Stbbdev                     // it only accepts to lc level
27851c0b2f7Stbbdev                     CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );
27951c0b2f7Stbbdev 
28051c0b2f7Stbbdev                     senders.clear();
28151c0b2f7Stbbdev                     for (size_t s = 0; s < num_senders; ++s ) {
28251c0b2f7Stbbdev                         senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
28351c0b2f7Stbbdev                         senders.back()->my_limit = N;
28451c0b2f7Stbbdev                         exe_node.register_predecessor( *senders.back() );
28551c0b2f7Stbbdev                     }
28651c0b2f7Stbbdev 
28751c0b2f7Stbbdev                 } // release lock at end of scope, setting the exe node free to continue
28851c0b2f7Stbbdev                 // wait for graph to settle down
28951c0b2f7Stbbdev                 g.wait_for_all();
29051c0b2f7Stbbdev 
29151c0b2f7Stbbdev                 // confirm that each sender was requested from N times
29251c0b2f7Stbbdev                 for (size_t s = 0; s < num_senders; ++s ) {
29351c0b2f7Stbbdev                     size_t n = senders[s]->my_received;
29451c0b2f7Stbbdev                     CHECK_MESSAGE( n == N, "" );
29551c0b2f7Stbbdev                     CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
29651c0b2f7Stbbdev                 }
29751c0b2f7Stbbdev                 // confirm that each receivers got N * num_senders + the initial lc puts
29851c0b2f7Stbbdev                 for (size_t r = 0; r < num_receivers; ++r ) {
29951c0b2f7Stbbdev                     size_t n = receivers[r]->my_count;
30051c0b2f7Stbbdev                     CHECK_MESSAGE( n == num_senders*N+lc, "" );
30151c0b2f7Stbbdev                     receivers[r]->my_count = 0;
30251c0b2f7Stbbdev                 }
30351c0b2f7Stbbdev             }
30451c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
30551c0b2f7Stbbdev                 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
30651c0b2f7Stbbdev             }
30751c0b2f7Stbbdev             CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
30851c0b2f7Stbbdev             g.wait_for_all();
30951c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
31051c0b2f7Stbbdev                 CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
31151c0b2f7Stbbdev             }
31251c0b2f7Stbbdev         }
31351c0b2f7Stbbdev     }
31451c0b2f7Stbbdev }
31551c0b2f7Stbbdev 
31651c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
run_concurrency_levels(int c)31751c0b2f7Stbbdev void run_concurrency_levels( int c ) {
31851c0b2f7Stbbdev     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
31951c0b2f7Stbbdev     concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
32051c0b2f7Stbbdev     concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
32151c0b2f7Stbbdev     concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
32251c0b2f7Stbbdev }
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev 
32551c0b2f7Stbbdev struct empty_no_assign {
empty_no_assignempty_no_assign32651c0b2f7Stbbdev    empty_no_assign() {}
empty_no_assignempty_no_assign32751c0b2f7Stbbdev    empty_no_assign( int ) {}
operator intempty_no_assign32851c0b2f7Stbbdev    operator int() { return 0; }
operator intempty_no_assign32951c0b2f7Stbbdev    operator int() const { return 0; }
33051c0b2f7Stbbdev };
33151c0b2f7Stbbdev 
33251c0b2f7Stbbdev template< typename InputType >
33351c0b2f7Stbbdev struct parallel_puts : private utils::NoAssign {
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev     tbb::flow::receiver< InputType > * const my_exe_node;
33651c0b2f7Stbbdev 
parallel_putsparallel_puts33751c0b2f7Stbbdev     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
33851c0b2f7Stbbdev 
operator ()parallel_puts33951c0b2f7Stbbdev     void operator()( int ) const  {
34051c0b2f7Stbbdev         for ( int i = 0; i < N; ++i ) {
34151c0b2f7Stbbdev             // the nodes will accept all puts
34251c0b2f7Stbbdev             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
34351c0b2f7Stbbdev         }
34451c0b2f7Stbbdev     }
34551c0b2f7Stbbdev 
34651c0b2f7Stbbdev };
34751c0b2f7Stbbdev 
34851c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency
34951c0b2f7Stbbdev /** These tests check:
35051c0b2f7Stbbdev     1) that the nodes will accept all puts
35151c0b2f7Stbbdev     2) the nodes will receive puts from multiple predecessors simultaneously,
35251c0b2f7Stbbdev     and 3) the nodes will send to multiple successors.
35351c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
35451c0b2f7Stbbdev */
35551c0b2f7Stbbdev 
35651c0b2f7Stbbdev template< typename InputType, typename OutputTuple, typename Body >
unlimited_concurrency(Body body)35751c0b2f7Stbbdev void unlimited_concurrency( Body body ) {
35851c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
35951c0b2f7Stbbdev 
36051c0b2f7Stbbdev     for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
36151c0b2f7Stbbdev         tbb::flow::graph g;
36251c0b2f7Stbbdev         tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );
36351c0b2f7Stbbdev 
36451c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
36551c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
36651c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; ++i) {
36751c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
36851c0b2f7Stbbdev             }
36951c0b2f7Stbbdev 
37051c0b2f7Stbbdev             harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
37151c0b2f7Stbbdev 
37251c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
37351c0b2f7Stbbdev                 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
37451c0b2f7Stbbdev             }
37551c0b2f7Stbbdev 
37651c0b2f7Stbbdev             utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
37751c0b2f7Stbbdev             g.wait_for_all();
37851c0b2f7Stbbdev 
37951c0b2f7Stbbdev             // 2) the nodes will receive puts from multiple predecessors simultaneously,
38051c0b2f7Stbbdev             size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
38151c0b2f7Stbbdev             CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
38251c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
38351c0b2f7Stbbdev                 size_t c = receivers[r]->my_count;
38451c0b2f7Stbbdev                 // 3) the nodes will send to multiple successors.
38551c0b2f7Stbbdev                 CHECK_MESSAGE( (unsigned int)c == p*N, "" );
38651c0b2f7Stbbdev             }
38751c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
38851c0b2f7Stbbdev                 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
38951c0b2f7Stbbdev             }
39051c0b2f7Stbbdev         }
39151c0b2f7Stbbdev     }
39251c0b2f7Stbbdev }
39351c0b2f7Stbbdev 
39451c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
run_unlimited_concurrency()39551c0b2f7Stbbdev void run_unlimited_concurrency() {
39651c0b2f7Stbbdev     harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
39751c0b2f7Stbbdev     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
39851c0b2f7Stbbdev     unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
39951c0b2f7Stbbdev     unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
40051c0b2f7Stbbdev     unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
40151c0b2f7Stbbdev }
40251c0b2f7Stbbdev 
40351c0b2f7Stbbdev template<typename InputType, typename OutputTuple>
40451c0b2f7Stbbdev struct oddEvenBody {
40551c0b2f7Stbbdev     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
40651c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
40751c0b2f7Stbbdev     typedef typename std::tuple_element<1,OutputTuple>::type OddType;
operator ()oddEvenBody40851c0b2f7Stbbdev     void operator() (const InputType &i, output_ports_type &p) {
40951c0b2f7Stbbdev         if((int)i % 2) {
41051c0b2f7Stbbdev             (void)std::get<1>(p).try_put(OddType(i));
41151c0b2f7Stbbdev         }
41251c0b2f7Stbbdev         else {
41351c0b2f7Stbbdev             (void)std::get<0>(p).try_put(EvenType(i));
41451c0b2f7Stbbdev         }
41551c0b2f7Stbbdev     }
41651c0b2f7Stbbdev };
41751c0b2f7Stbbdev 
41851c0b2f7Stbbdev template<typename InputType, typename OutputTuple >
run_multiport_test(int num_threads)41951c0b2f7Stbbdev void run_multiport_test(int num_threads) {
42051c0b2f7Stbbdev     typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
42151c0b2f7Stbbdev     typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
42251c0b2f7Stbbdev     typedef typename std::tuple_element<1,OutputTuple>::type OddType;
42351c0b2f7Stbbdev     tbb::task_arena arena(num_threads);
42451c0b2f7Stbbdev     arena.execute(
42551c0b2f7Stbbdev         [&] () {
42651c0b2f7Stbbdev             tbb::flow::graph g;
42751c0b2f7Stbbdev             mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );
42851c0b2f7Stbbdev 
42951c0b2f7Stbbdev             tbb::flow::queue_node<EvenType> q0(g);
43051c0b2f7Stbbdev             tbb::flow::queue_node<OddType> q1(g);
43151c0b2f7Stbbdev 
43251c0b2f7Stbbdev             tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
43351c0b2f7Stbbdev             tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);
43451c0b2f7Stbbdev 
43551c0b2f7Stbbdev             for(InputType i = 0; i < N; ++i) {
43651c0b2f7Stbbdev                 mo_node.try_put(i);
43751c0b2f7Stbbdev             }
43851c0b2f7Stbbdev 
43951c0b2f7Stbbdev             g.wait_for_all();
44051c0b2f7Stbbdev             for(int i = 0; i < N/2; ++i) {
44151c0b2f7Stbbdev                 EvenType e{};
44251c0b2f7Stbbdev                 OddType o{};
44351c0b2f7Stbbdev                 CHECK_MESSAGE( q0.try_get(e), "" );
44451c0b2f7Stbbdev                 CHECK_MESSAGE( (int)e % 2 == 0, "" );
44551c0b2f7Stbbdev                 CHECK_MESSAGE( q1.try_get(o), "" );
44651c0b2f7Stbbdev                 CHECK_MESSAGE( (int)o % 2 == 1, "" );
44751c0b2f7Stbbdev             }
44851c0b2f7Stbbdev         }
44951c0b2f7Stbbdev     );
45051c0b2f7Stbbdev }
45151c0b2f7Stbbdev 
45251c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
test_concurrency(int num_threads)45351c0b2f7Stbbdev void test_concurrency(int num_threads) {
45451c0b2f7Stbbdev     tbb::task_arena arena(num_threads);
45551c0b2f7Stbbdev     arena.execute(
45651c0b2f7Stbbdev         [&] () {
45751c0b2f7Stbbdev             run_concurrency_levels<int,std::tuple<int> >(num_threads);
45851c0b2f7Stbbdev             run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
45951c0b2f7Stbbdev             run_buffered_levels<int, std::tuple<int> >(num_threads);
46051c0b2f7Stbbdev             run_unlimited_concurrency<int, std::tuple<int> >();
46151c0b2f7Stbbdev             run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
46251c0b2f7Stbbdev             run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
46351c0b2f7Stbbdev             run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
46451c0b2f7Stbbdev             run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
46551c0b2f7Stbbdev             run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
46651c0b2f7Stbbdev             run_multiport_test<int, std::tuple<int, int> >(num_threads);
46751c0b2f7Stbbdev             run_multiport_test<float, std::tuple<int, double> >(num_threads);
46851c0b2f7Stbbdev         }
46951c0b2f7Stbbdev     );
47051c0b2f7Stbbdev }
47151c0b2f7Stbbdev 
47251c0b2f7Stbbdev template<typename Policy>
test_ports_return_references()47351c0b2f7Stbbdev void test_ports_return_references() {
47451c0b2f7Stbbdev     tbb::flow::graph g;
47551c0b2f7Stbbdev     typedef int InputType;
47651c0b2f7Stbbdev     typedef std::tuple<int> OutputTuple;
47751c0b2f7Stbbdev     tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
47851c0b2f7Stbbdev         g, tbb::flow::unlimited,
47951c0b2f7Stbbdev         &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
48051c0b2f7Stbbdev     test_output_ports_return_ref(mf_node);
48151c0b2f7Stbbdev }
48251c0b2f7Stbbdev 
48351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
48451c0b2f7Stbbdev #include <array>
48551c0b2f7Stbbdev #include <vector>
48651c0b2f7Stbbdev 
test_precedes()48751c0b2f7Stbbdev void test_precedes() {
48851c0b2f7Stbbdev     using namespace tbb::flow;
48951c0b2f7Stbbdev 
49051c0b2f7Stbbdev     using multinode = multifunction_node<int, std::tuple<int, int>>;
49151c0b2f7Stbbdev 
49251c0b2f7Stbbdev     graph g;
49351c0b2f7Stbbdev 
49451c0b2f7Stbbdev     buffer_node<int> b1(g);
49551c0b2f7Stbbdev     buffer_node<int> b2(g);
49651c0b2f7Stbbdev 
49751c0b2f7Stbbdev     multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
49851c0b2f7Stbbdev             if (i % 2)
49951c0b2f7Stbbdev                 std::get<0>(op).try_put(i);
50051c0b2f7Stbbdev             else
50151c0b2f7Stbbdev                 std::get<1>(op).try_put(i);
50251c0b2f7Stbbdev         }
50351c0b2f7Stbbdev     );
50451c0b2f7Stbbdev 
50551c0b2f7Stbbdev     node.try_put(0);
50651c0b2f7Stbbdev     node.try_put(1);
50751c0b2f7Stbbdev     g.wait_for_all();
50851c0b2f7Stbbdev 
50951c0b2f7Stbbdev     int storage;
51051c0b2f7Stbbdev     CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
51151c0b2f7Stbbdev             "Not exact edge quantity was made");
51251c0b2f7Stbbdev }
51351c0b2f7Stbbdev 
test_follows_and_precedes_api()51451c0b2f7Stbbdev void test_follows_and_precedes_api() {
51551c0b2f7Stbbdev     using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;
51651c0b2f7Stbbdev 
51751c0b2f7Stbbdev     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
51851c0b2f7Stbbdev 
51951c0b2f7Stbbdev     follows_and_precedes_testing::test_follows
52051c0b2f7Stbbdev         <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
52151c0b2f7Stbbdev         (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
52251c0b2f7Stbbdev             std::get<0>(op).try_put(i);
52351c0b2f7Stbbdev         });
52451c0b2f7Stbbdev 
52551c0b2f7Stbbdev     test_precedes();
52651c0b2f7Stbbdev }
52751c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
52851c0b2f7Stbbdev 
52951c0b2f7Stbbdev //! Test various node bodies with concurrency
53051c0b2f7Stbbdev //! \brief \ref error_guessing
53151c0b2f7Stbbdev TEST_CASE("Concurrency test"){
53251c0b2f7Stbbdev     for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
53351c0b2f7Stbbdev        test_concurrency(p);
53451c0b2f7Stbbdev     }
53551c0b2f7Stbbdev }
53651c0b2f7Stbbdev 
53751c0b2f7Stbbdev //! Test return types of ports
53851c0b2f7Stbbdev //! \brief \ref error_guessing
53951c0b2f7Stbbdev TEST_CASE("Test ports retrurn references"){
54051c0b2f7Stbbdev     test_ports_return_references<tbb::flow::queueing>();
54151c0b2f7Stbbdev     test_ports_return_references<tbb::flow::rejecting>();
54251c0b2f7Stbbdev }
54351c0b2f7Stbbdev 
54451c0b2f7Stbbdev //! NativeParallelFor testing with various concurrency settings
54551c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    // Run the shared lightweight-body test harness for multifunction_node
    // with 10 messages per iteration (harness comes from common/graph_utils.h
    // -- presumably it spawns native threads; see the comment above).
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}
54951c0b2f7Stbbdev 
55051c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
55151c0b2f7Stbbdev //! Test follows and precedes API
55251c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    // Delegates to the preview-API scenarios defined above (follows() via the
    // shared harness, then the precedes() buffer-routing check).
    test_follows_and_precedes_api();
}
55651c0b2f7Stbbdev //! Test priority constructor with follows and precedes API
55751c0b2f7Stbbdev //! \brief \ref error_guessing
55851c0b2f7Stbbdev TEST_CASE("Test priority with follows and precedes"){
55951c0b2f7Stbbdev     using namespace tbb::flow;
56051c0b2f7Stbbdev 
56151c0b2f7Stbbdev     using multinode = multifunction_node<int, std::tuple<int, int>>;
56251c0b2f7Stbbdev 
56351c0b2f7Stbbdev     graph g;
56451c0b2f7Stbbdev 
56551c0b2f7Stbbdev     buffer_node<int> b1(g);
56651c0b2f7Stbbdev     buffer_node<int> b2(g);
56751c0b2f7Stbbdev 
__anon0fcb77200802(const int& i, multinode::output_ports_type& op) 56851c0b2f7Stbbdev     multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
56951c0b2f7Stbbdev             if (i % 2)
57051c0b2f7Stbbdev                 std::get<0>(op).try_put(i);
57151c0b2f7Stbbdev             else
57251c0b2f7Stbbdev                 std::get<1>(op).try_put(i);
57351c0b2f7Stbbdev         }
57451c0b2f7Stbbdev         , node_priority_t(0));
57551c0b2f7Stbbdev 
57651c0b2f7Stbbdev     node.try_put(0);
57751c0b2f7Stbbdev     node.try_put(1);
57851c0b2f7Stbbdev     g.wait_for_all();
57951c0b2f7Stbbdev 
58051c0b2f7Stbbdev     int storage;
58151c0b2f7Stbbdev     CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
58251c0b2f7Stbbdev             "Not exact edge quantity was made");
58351c0b2f7Stbbdev }
58451c0b2f7Stbbdev 
58551c0b2f7Stbbdev #endif
58651c0b2f7Stbbdev 
587*478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
588*478de5b1Stbbdev //! \brief \ref error_guessing
589*478de5b1Stbbdev TEST_CASE("constraints for multifunction_node input") {
590*478de5b1Stbbdev     struct InputObject {
591*478de5b1Stbbdev         InputObject() = default;
592*478de5b1Stbbdev         InputObject( const InputObject& ) = default;
593*478de5b1Stbbdev     };
594*478de5b1Stbbdev 
595*478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, InputObject, int>);
596*478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, int, int>);
597*478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonCopyable, int>);
598*478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonDefaultInitializable, int>);
599*478de5b1Stbbdev }
600*478de5b1Stbbdev 
// Satisfied when multifunction_node<Input, Output> can be constructed with the
// given Body through each public constructor overload: (graph, concurrency,
// body), the same with a priority, and -- when the preview node-set API is
// enabled -- the follows(...) overloads.
template <typename Input, typename Output, typename Body>
concept can_call_multifunction_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body,
                                                     tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
    tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body);
    tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
    tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
};
611*478de5b1Stbbdev 
612*478de5b1Stbbdev //! \brief \ref error_guessing
TEST_CASE("constraints for multifunction_node body") {
    using input_type = int;
    using output_type = std::tuple<int>;
    using namespace test_concepts::multifunction_node_body;

    // A well-formed body is accepted by every constructor overload.
    static_assert(can_call_multifunction_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
    // Each of the following bodies violates one requirement (per its name:
    // copyability, destructibility, presence/signature of operator()) and
    // must make every constructor overload ill-formed.
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
}
625*478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
626