151c0b2f7Stbbdev /*
2b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
2551c0b2f7Stbbdev
2651c0b2f7Stbbdev #include "common/test.h"
2751c0b2f7Stbbdev #include "common/utils.h"
2851c0b2f7Stbbdev #include "common/graph_utils.h"
2951c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
30*478de5b1Stbbdev #include "common/concepts_common.h"
3151c0b2f7Stbbdev
3251c0b2f7Stbbdev
3351c0b2f7Stbbdev //! \file test_multifunction_node.cpp
3451c0b2f7Stbbdev //! \brief Test for [flow_graph.multifunction_node] specification
3551c0b2f7Stbbdev
3651c0b2f7Stbbdev
//! Number of messages each sender emits / each putter thread submits.
//! A smaller value is selected when TBB_USE_DEBUG is set.
#if TBB_USE_DEBUG
    #define N 16
#else
    #define N 100
#endif

//! Upper bound on the number of senders and receivers attached to a node under test.
#define MAX_NODES 4
4351c0b2f7Stbbdev
4451c0b2f7Stbbdev //! Performs test on function nodes with limited concurrency and buffering
4551c0b2f7Stbbdev /** These tests check:
4651c0b2f7Stbbdev 1) that the number of executing copies never exceed the concurrency limit
4751c0b2f7Stbbdev 2) that the node never rejects
4851c0b2f7Stbbdev 3) that no items are lost
4951c0b2f7Stbbdev and 4) all of this happens even if there are multiple predecessors and successors
5051c0b2f7Stbbdev */
5151c0b2f7Stbbdev
5251c0b2f7Stbbdev //! exercise buffered multifunction_node.
5351c0b2f7Stbbdev template< typename InputType, typename OutputTuple, typename Body >
buffered_levels(size_t concurrency,Body body)5451c0b2f7Stbbdev void buffered_levels( size_t concurrency, Body body ) {
5551c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
5651c0b2f7Stbbdev // Do for lc = 1 to concurrency level
5751c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) {
5851c0b2f7Stbbdev tbb::flow::graph g;
5951c0b2f7Stbbdev
6051c0b2f7Stbbdev // Set the execute_counter back to zero in the harness
6151c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
6251c0b2f7Stbbdev // Set the number of current executors to zero.
6351c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
6451c0b2f7Stbbdev // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
6551c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;
6651c0b2f7Stbbdev
6751c0b2f7Stbbdev // Create the function_node with the appropriate concurrency level, and use default buffering
6851c0b2f7Stbbdev tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );
6951c0b2f7Stbbdev
7051c0b2f7Stbbdev //Create a vector of identical exe_nodes
7151c0b2f7Stbbdev std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);
7251c0b2f7Stbbdev
7351c0b2f7Stbbdev // exercise each of the copied nodes
7451c0b2f7Stbbdev for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
7551c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
7651c0b2f7Stbbdev // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
7751c0b2f7Stbbdev std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
7851c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; i++) {
7951c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
8051c0b2f7Stbbdev }
8151c0b2f7Stbbdev
8251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
8351c0b2f7Stbbdev tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
8451c0b2f7Stbbdev }
8551c0b2f7Stbbdev
8651c0b2f7Stbbdev // Do the test with varying numbers of senders
8751c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
8851c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
8951c0b2f7Stbbdev // Create num_senders senders, set their message limit each to N, and connect
9051c0b2f7Stbbdev // them to the exe_vec[node_idx]
9151c0b2f7Stbbdev senders.clear();
9251c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) {
9351c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
9451c0b2f7Stbbdev senders.back()->my_limit = N;
9551c0b2f7Stbbdev tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
9651c0b2f7Stbbdev }
9751c0b2f7Stbbdev
9851c0b2f7Stbbdev // Initialize the receivers so they know how many senders and messages to check for
9951c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
10051c0b2f7Stbbdev receivers[r]->initialize_map( N, num_senders );
10151c0b2f7Stbbdev }
10251c0b2f7Stbbdev
10351c0b2f7Stbbdev // Do the test
10451c0b2f7Stbbdev utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
10551c0b2f7Stbbdev g.wait_for_all();
10651c0b2f7Stbbdev
10751c0b2f7Stbbdev // confirm that each sender was requested from N times
10851c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) {
10951c0b2f7Stbbdev size_t n = senders[s]->my_received;
11051c0b2f7Stbbdev CHECK_MESSAGE( n == N, "" );
11151c0b2f7Stbbdev CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
11251c0b2f7Stbbdev }
11351c0b2f7Stbbdev // validate the receivers
11451c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
11551c0b2f7Stbbdev receivers[r]->validate();
11651c0b2f7Stbbdev }
11751c0b2f7Stbbdev }
11851c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
11951c0b2f7Stbbdev tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
12051c0b2f7Stbbdev }
12151c0b2f7Stbbdev CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
12251c0b2f7Stbbdev g.wait_for_all();
12351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
12451c0b2f7Stbbdev // since it's detached, nothing should have changed
12551c0b2f7Stbbdev receivers[r]->validate();
12651c0b2f7Stbbdev }
12751c0b2f7Stbbdev }
12851c0b2f7Stbbdev }
12951c0b2f7Stbbdev }
13051c0b2f7Stbbdev }
13151c0b2f7Stbbdev
//! Value both execution counters are primed with before a copy-body run.
const size_t Offset = 123;
//! Counts inc_functor invocations across all instances.
std::atomic<size_t> global_execute_count;

//! Node body that counts its invocations per instance and globally, then forwards the input to port 0.
struct inc_functor {

    //! Invocation count of this particular instance (carried over on copy).
    std::atomic<size_t> local_execute_count;

    inc_functor() : local_execute_count(0) {}

    // std::atomic itself is not copyable, so the current value is read and re-stored.
    inc_functor( const inc_functor& other ) : local_execute_count( size_t(other.local_execute_count) ) {}

    template<typename output_ports_type>
    void operator()( int i, output_ports_type& ports ) {
        ++global_execute_count;
        ++local_execute_count;
        // The result of the put is deliberately ignored.
        (void)std::get<0>(ports).try_put(i);
    }

};
14951c0b2f7Stbbdev
15051c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
buffered_levels_with_copy(size_t concurrency)15151c0b2f7Stbbdev void buffered_levels_with_copy( size_t concurrency ) {
15251c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
15351c0b2f7Stbbdev // Do for lc = 1 to concurrency level
15451c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) {
15551c0b2f7Stbbdev tbb::flow::graph g;
15651c0b2f7Stbbdev
15751c0b2f7Stbbdev inc_functor cf;
15851c0b2f7Stbbdev cf.local_execute_count = Offset;
15951c0b2f7Stbbdev global_execute_count = Offset;
16051c0b2f7Stbbdev
16151c0b2f7Stbbdev tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );
16251c0b2f7Stbbdev
16351c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
16451c0b2f7Stbbdev
16551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
16651c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; i++) {
16751c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
16851c0b2f7Stbbdev }
16951c0b2f7Stbbdev
17051c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
17151c0b2f7Stbbdev tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
17251c0b2f7Stbbdev }
17351c0b2f7Stbbdev
17451c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
17551c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
17651c0b2f7Stbbdev senders.clear();
17751c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) {
17851c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
17951c0b2f7Stbbdev senders.back()->my_limit = N;
18051c0b2f7Stbbdev tbb::flow::make_edge( *senders.back(), exe_node );
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev
18351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
18451c0b2f7Stbbdev receivers[r]->initialize_map( N, num_senders );
18551c0b2f7Stbbdev }
18651c0b2f7Stbbdev
18751c0b2f7Stbbdev utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
18851c0b2f7Stbbdev g.wait_for_all();
18951c0b2f7Stbbdev
19051c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) {
19151c0b2f7Stbbdev size_t n = senders[s]->my_received;
19251c0b2f7Stbbdev CHECK_MESSAGE( n == N, "" );
19351c0b2f7Stbbdev CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
19451c0b2f7Stbbdev }
19551c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
19651c0b2f7Stbbdev receivers[r]->validate();
19751c0b2f7Stbbdev }
19851c0b2f7Stbbdev }
19951c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
20051c0b2f7Stbbdev tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
20151c0b2f7Stbbdev }
20251c0b2f7Stbbdev CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
20351c0b2f7Stbbdev g.wait_for_all();
20451c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
20551c0b2f7Stbbdev receivers[r]->validate();
20651c0b2f7Stbbdev }
20751c0b2f7Stbbdev }
20851c0b2f7Stbbdev
20951c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct
21051c0b2f7Stbbdev inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
21151c0b2f7Stbbdev const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
21251c0b2f7Stbbdev size_t global_count = global_execute_count;
21351c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count;
21451c0b2f7Stbbdev CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
21551c0b2f7Stbbdev }
21651c0b2f7Stbbdev }
21751c0b2f7Stbbdev
21851c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
run_buffered_levels(int c)21951c0b2f7Stbbdev void run_buffered_levels( int c ) {
22051c0b2f7Stbbdev typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
22151c0b2f7Stbbdev buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
22251c0b2f7Stbbdev buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
22351c0b2f7Stbbdev buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
22451c0b2f7Stbbdev buffered_levels_with_copy<InputType,OutputTuple>( c );
22551c0b2f7Stbbdev }
22651c0b2f7Stbbdev
22751c0b2f7Stbbdev
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple predecessors simultaneously,
    and 4) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
23651c0b2f7Stbbdev
23751c0b2f7Stbbdev template< typename InputType, typename OutputTuple, typename Body >
concurrency_levels(size_t concurrency,Body body)23851c0b2f7Stbbdev void concurrency_levels( size_t concurrency, Body body ) {
23951c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
24051c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) {
24151c0b2f7Stbbdev tbb::flow::graph g;
24251c0b2f7Stbbdev
24351c0b2f7Stbbdev // Set the execute_counter back to zero in the harness
24451c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
24551c0b2f7Stbbdev // Set the number of current executors to zero.
24651c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
24751c0b2f7Stbbdev // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
24851c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;
24951c0b2f7Stbbdev
25051c0b2f7Stbbdev
25151c0b2f7Stbbdev tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );
25251c0b2f7Stbbdev
25351c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
25451c0b2f7Stbbdev
25551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
25651c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) {
25751c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
25851c0b2f7Stbbdev }
25951c0b2f7Stbbdev
26051c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
26151c0b2f7Stbbdev tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
26251c0b2f7Stbbdev }
26351c0b2f7Stbbdev
26451c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
26551c0b2f7Stbbdev
26651c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
26751c0b2f7Stbbdev {
26851c0b2f7Stbbdev // Exclusively lock m to prevent exe_node from finishing
26951c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock l(
27051c0b2f7Stbbdev harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
27151c0b2f7Stbbdev );
27251c0b2f7Stbbdev
27351c0b2f7Stbbdev // put to lc level, it will accept and then block at m
27451c0b2f7Stbbdev for ( size_t c = 0 ; c < lc ; ++c ) {
27551c0b2f7Stbbdev CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
27651c0b2f7Stbbdev }
27751c0b2f7Stbbdev // it only accepts to lc level
27851c0b2f7Stbbdev CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );
27951c0b2f7Stbbdev
28051c0b2f7Stbbdev senders.clear();
28151c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) {
28251c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
28351c0b2f7Stbbdev senders.back()->my_limit = N;
28451c0b2f7Stbbdev exe_node.register_predecessor( *senders.back() );
28551c0b2f7Stbbdev }
28651c0b2f7Stbbdev
28751c0b2f7Stbbdev } // release lock at end of scope, setting the exe node free to continue
28851c0b2f7Stbbdev // wait for graph to settle down
28951c0b2f7Stbbdev g.wait_for_all();
29051c0b2f7Stbbdev
29151c0b2f7Stbbdev // confirm that each sender was requested from N times
29251c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) {
29351c0b2f7Stbbdev size_t n = senders[s]->my_received;
29451c0b2f7Stbbdev CHECK_MESSAGE( n == N, "" );
29551c0b2f7Stbbdev CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
29651c0b2f7Stbbdev }
29751c0b2f7Stbbdev // confirm that each receivers got N * num_senders + the initial lc puts
29851c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
29951c0b2f7Stbbdev size_t n = receivers[r]->my_count;
30051c0b2f7Stbbdev CHECK_MESSAGE( n == num_senders*N+lc, "" );
30151c0b2f7Stbbdev receivers[r]->my_count = 0;
30251c0b2f7Stbbdev }
30351c0b2f7Stbbdev }
30451c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
30551c0b2f7Stbbdev tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
30651c0b2f7Stbbdev }
30751c0b2f7Stbbdev CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
30851c0b2f7Stbbdev g.wait_for_all();
30951c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
31051c0b2f7Stbbdev CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
31151c0b2f7Stbbdev }
31251c0b2f7Stbbdev }
31351c0b2f7Stbbdev }
31451c0b2f7Stbbdev }
31551c0b2f7Stbbdev
31651c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
run_concurrency_levels(int c)31751c0b2f7Stbbdev void run_concurrency_levels( int c ) {
31851c0b2f7Stbbdev typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
31951c0b2f7Stbbdev concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
32051c0b2f7Stbbdev concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
32151c0b2f7Stbbdev concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
32251c0b2f7Stbbdev }
32351c0b2f7Stbbdev
32451c0b2f7Stbbdev
//! Stateless message type: constructible from nothing or from an int, and always converting to 0.
struct empty_no_assign {
    empty_no_assign() {}
    // The int argument is accepted and discarded.
    empty_no_assign( int ) {}

    // Conversion is provided for both const and non-const objects.
    operator int() const { return 0; }
    operator int() { return 0; }
};
33151c0b2f7Stbbdev
33251c0b2f7Stbbdev template< typename InputType >
33351c0b2f7Stbbdev struct parallel_puts : private utils::NoAssign {
33451c0b2f7Stbbdev
33551c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node;
33651c0b2f7Stbbdev
parallel_putsparallel_puts33751c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
33851c0b2f7Stbbdev
operator ()parallel_puts33951c0b2f7Stbbdev void operator()( int ) const {
34051c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) {
34151c0b2f7Stbbdev // the nodes will accept all puts
34251c0b2f7Stbbdev CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
34351c0b2f7Stbbdev }
34451c0b2f7Stbbdev }
34551c0b2f7Stbbdev
34651c0b2f7Stbbdev };
34751c0b2f7Stbbdev
34851c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency
34951c0b2f7Stbbdev /** These tests check:
35051c0b2f7Stbbdev 1) that the nodes will accept all puts
35151c0b2f7Stbbdev 2) the nodes will receive puts from multiple predecessors simultaneously,
35251c0b2f7Stbbdev and 3) the nodes will send to multiple successors.
35351c0b2f7Stbbdev There is no checking of the contents of the messages for corruption.
35451c0b2f7Stbbdev */
35551c0b2f7Stbbdev
35651c0b2f7Stbbdev template< typename InputType, typename OutputTuple, typename Body >
unlimited_concurrency(Body body)35751c0b2f7Stbbdev void unlimited_concurrency( Body body ) {
35851c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
35951c0b2f7Stbbdev
36051c0b2f7Stbbdev for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
36151c0b2f7Stbbdev tbb::flow::graph g;
36251c0b2f7Stbbdev tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );
36351c0b2f7Stbbdev
36451c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
36551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
36651c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) {
36751c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
36851c0b2f7Stbbdev }
36951c0b2f7Stbbdev
37051c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
37151c0b2f7Stbbdev
37251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
37351c0b2f7Stbbdev tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
37451c0b2f7Stbbdev }
37551c0b2f7Stbbdev
37651c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
37751c0b2f7Stbbdev g.wait_for_all();
37851c0b2f7Stbbdev
37951c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously,
38051c0b2f7Stbbdev size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
38151c0b2f7Stbbdev CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
38251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
38351c0b2f7Stbbdev size_t c = receivers[r]->my_count;
38451c0b2f7Stbbdev // 3) the nodes will send to multiple successors.
38551c0b2f7Stbbdev CHECK_MESSAGE( (unsigned int)c == p*N, "" );
38651c0b2f7Stbbdev }
38751c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
38851c0b2f7Stbbdev tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
38951c0b2f7Stbbdev }
39051c0b2f7Stbbdev }
39151c0b2f7Stbbdev }
39251c0b2f7Stbbdev }
39351c0b2f7Stbbdev
39451c0b2f7Stbbdev template< typename InputType, typename OutputTuple >
run_unlimited_concurrency()39551c0b2f7Stbbdev void run_unlimited_concurrency() {
39651c0b2f7Stbbdev harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
39751c0b2f7Stbbdev typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
39851c0b2f7Stbbdev unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
39951c0b2f7Stbbdev unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
40051c0b2f7Stbbdev unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
40151c0b2f7Stbbdev }
40251c0b2f7Stbbdev
40351c0b2f7Stbbdev template<typename InputType, typename OutputTuple>
40451c0b2f7Stbbdev struct oddEvenBody {
40551c0b2f7Stbbdev typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
40651c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
40751c0b2f7Stbbdev typedef typename std::tuple_element<1,OutputTuple>::type OddType;
operator ()oddEvenBody40851c0b2f7Stbbdev void operator() (const InputType &i, output_ports_type &p) {
40951c0b2f7Stbbdev if((int)i % 2) {
41051c0b2f7Stbbdev (void)std::get<1>(p).try_put(OddType(i));
41151c0b2f7Stbbdev }
41251c0b2f7Stbbdev else {
41351c0b2f7Stbbdev (void)std::get<0>(p).try_put(EvenType(i));
41451c0b2f7Stbbdev }
41551c0b2f7Stbbdev }
41651c0b2f7Stbbdev };
41751c0b2f7Stbbdev
41851c0b2f7Stbbdev template<typename InputType, typename OutputTuple >
run_multiport_test(int num_threads)41951c0b2f7Stbbdev void run_multiport_test(int num_threads) {
42051c0b2f7Stbbdev typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
42151c0b2f7Stbbdev typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
42251c0b2f7Stbbdev typedef typename std::tuple_element<1,OutputTuple>::type OddType;
42351c0b2f7Stbbdev tbb::task_arena arena(num_threads);
42451c0b2f7Stbbdev arena.execute(
42551c0b2f7Stbbdev [&] () {
42651c0b2f7Stbbdev tbb::flow::graph g;
42751c0b2f7Stbbdev mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );
42851c0b2f7Stbbdev
42951c0b2f7Stbbdev tbb::flow::queue_node<EvenType> q0(g);
43051c0b2f7Stbbdev tbb::flow::queue_node<OddType> q1(g);
43151c0b2f7Stbbdev
43251c0b2f7Stbbdev tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
43351c0b2f7Stbbdev tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);
43451c0b2f7Stbbdev
43551c0b2f7Stbbdev for(InputType i = 0; i < N; ++i) {
43651c0b2f7Stbbdev mo_node.try_put(i);
43751c0b2f7Stbbdev }
43851c0b2f7Stbbdev
43951c0b2f7Stbbdev g.wait_for_all();
44051c0b2f7Stbbdev for(int i = 0; i < N/2; ++i) {
44151c0b2f7Stbbdev EvenType e{};
44251c0b2f7Stbbdev OddType o{};
44351c0b2f7Stbbdev CHECK_MESSAGE( q0.try_get(e), "" );
44451c0b2f7Stbbdev CHECK_MESSAGE( (int)e % 2 == 0, "" );
44551c0b2f7Stbbdev CHECK_MESSAGE( q1.try_get(o), "" );
44651c0b2f7Stbbdev CHECK_MESSAGE( (int)o % 2 == 1, "" );
44751c0b2f7Stbbdev }
44851c0b2f7Stbbdev }
44951c0b2f7Stbbdev );
45051c0b2f7Stbbdev }
45151c0b2f7Stbbdev
45251c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
test_concurrency(int num_threads)45351c0b2f7Stbbdev void test_concurrency(int num_threads) {
45451c0b2f7Stbbdev tbb::task_arena arena(num_threads);
45551c0b2f7Stbbdev arena.execute(
45651c0b2f7Stbbdev [&] () {
45751c0b2f7Stbbdev run_concurrency_levels<int,std::tuple<int> >(num_threads);
45851c0b2f7Stbbdev run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
45951c0b2f7Stbbdev run_buffered_levels<int, std::tuple<int> >(num_threads);
46051c0b2f7Stbbdev run_unlimited_concurrency<int, std::tuple<int> >();
46151c0b2f7Stbbdev run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
46251c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
46351c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
46451c0b2f7Stbbdev run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
46551c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
46651c0b2f7Stbbdev run_multiport_test<int, std::tuple<int, int> >(num_threads);
46751c0b2f7Stbbdev run_multiport_test<float, std::tuple<int, double> >(num_threads);
46851c0b2f7Stbbdev }
46951c0b2f7Stbbdev );
47051c0b2f7Stbbdev }
47151c0b2f7Stbbdev
47251c0b2f7Stbbdev template<typename Policy>
test_ports_return_references()47351c0b2f7Stbbdev void test_ports_return_references() {
47451c0b2f7Stbbdev tbb::flow::graph g;
47551c0b2f7Stbbdev typedef int InputType;
47651c0b2f7Stbbdev typedef std::tuple<int> OutputTuple;
47751c0b2f7Stbbdev tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
47851c0b2f7Stbbdev g, tbb::flow::unlimited,
47951c0b2f7Stbbdev &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
48051c0b2f7Stbbdev test_output_ports_return_ref(mf_node);
48151c0b2f7Stbbdev }
48251c0b2f7Stbbdev
48351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
48451c0b2f7Stbbdev #include <array>
48551c0b2f7Stbbdev #include <vector>
48651c0b2f7Stbbdev
test_precedes()48751c0b2f7Stbbdev void test_precedes() {
48851c0b2f7Stbbdev using namespace tbb::flow;
48951c0b2f7Stbbdev
49051c0b2f7Stbbdev using multinode = multifunction_node<int, std::tuple<int, int>>;
49151c0b2f7Stbbdev
49251c0b2f7Stbbdev graph g;
49351c0b2f7Stbbdev
49451c0b2f7Stbbdev buffer_node<int> b1(g);
49551c0b2f7Stbbdev buffer_node<int> b2(g);
49651c0b2f7Stbbdev
49751c0b2f7Stbbdev multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
49851c0b2f7Stbbdev if (i % 2)
49951c0b2f7Stbbdev std::get<0>(op).try_put(i);
50051c0b2f7Stbbdev else
50151c0b2f7Stbbdev std::get<1>(op).try_put(i);
50251c0b2f7Stbbdev }
50351c0b2f7Stbbdev );
50451c0b2f7Stbbdev
50551c0b2f7Stbbdev node.try_put(0);
50651c0b2f7Stbbdev node.try_put(1);
50751c0b2f7Stbbdev g.wait_for_all();
50851c0b2f7Stbbdev
50951c0b2f7Stbbdev int storage;
51051c0b2f7Stbbdev CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
51151c0b2f7Stbbdev "Not exact edge quantity was made");
51251c0b2f7Stbbdev }
51351c0b2f7Stbbdev
test_follows_and_precedes_api()51451c0b2f7Stbbdev void test_follows_and_precedes_api() {
51551c0b2f7Stbbdev using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;
51651c0b2f7Stbbdev
51751c0b2f7Stbbdev std::array<int, 3> messages_for_follows = { {0, 1, 2} };
51851c0b2f7Stbbdev
51951c0b2f7Stbbdev follows_and_precedes_testing::test_follows
52051c0b2f7Stbbdev <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
52151c0b2f7Stbbdev (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
52251c0b2f7Stbbdev std::get<0>(op).try_put(i);
52351c0b2f7Stbbdev });
52451c0b2f7Stbbdev
52551c0b2f7Stbbdev test_precedes();
52651c0b2f7Stbbdev }
52751c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
52851c0b2f7Stbbdev
52951c0b2f7Stbbdev //! Test various node bodies with concurrency
53051c0b2f7Stbbdev //! \brief \ref error_guessing
53151c0b2f7Stbbdev TEST_CASE("Concurrency test"){
53251c0b2f7Stbbdev for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
53351c0b2f7Stbbdev test_concurrency(p);
53451c0b2f7Stbbdev }
53551c0b2f7Stbbdev }
53651c0b2f7Stbbdev
53751c0b2f7Stbbdev //! Test return types of ports
53851c0b2f7Stbbdev //! \brief \ref error_guessing
53951c0b2f7Stbbdev TEST_CASE("Test ports retrurn references"){
54051c0b2f7Stbbdev test_ports_return_references<tbb::flow::queueing>();
54151c0b2f7Stbbdev test_ports_return_references<tbb::flow::rejecting>();
54251c0b2f7Stbbdev }
54351c0b2f7Stbbdev
54451c0b2f7Stbbdev //! NativeParallelFor testing with various concurrency settings
54551c0b2f7Stbbdev //! \brief \ref error_guessing
54651c0b2f7Stbbdev TEST_CASE("Lightweight testing"){
54751c0b2f7Stbbdev lightweight_testing::test<tbb::flow::multifunction_node>(10);
54851c0b2f7Stbbdev }
54951c0b2f7Stbbdev
55051c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
55151c0b2f7Stbbdev //! Test follows and precedes API
55251c0b2f7Stbbdev //! \brief \ref error_guessing
55351c0b2f7Stbbdev TEST_CASE("Test follows-precedes API"){
55451c0b2f7Stbbdev test_follows_and_precedes_api();
55551c0b2f7Stbbdev }
55651c0b2f7Stbbdev //! Test priority constructor with follows and precedes API
55751c0b2f7Stbbdev //! \brief \ref error_guessing
55851c0b2f7Stbbdev TEST_CASE("Test priority with follows and precedes"){
55951c0b2f7Stbbdev using namespace tbb::flow;
56051c0b2f7Stbbdev
56151c0b2f7Stbbdev using multinode = multifunction_node<int, std::tuple<int, int>>;
56251c0b2f7Stbbdev
56351c0b2f7Stbbdev graph g;
56451c0b2f7Stbbdev
56551c0b2f7Stbbdev buffer_node<int> b1(g);
56651c0b2f7Stbbdev buffer_node<int> b2(g);
56751c0b2f7Stbbdev
__anon0fcb77200802(const int& i, multinode::output_ports_type& op) 56851c0b2f7Stbbdev multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
56951c0b2f7Stbbdev if (i % 2)
57051c0b2f7Stbbdev std::get<0>(op).try_put(i);
57151c0b2f7Stbbdev else
57251c0b2f7Stbbdev std::get<1>(op).try_put(i);
57351c0b2f7Stbbdev }
57451c0b2f7Stbbdev , node_priority_t(0));
57551c0b2f7Stbbdev
57651c0b2f7Stbbdev node.try_put(0);
57751c0b2f7Stbbdev node.try_put(1);
57851c0b2f7Stbbdev g.wait_for_all();
57951c0b2f7Stbbdev
58051c0b2f7Stbbdev int storage;
58151c0b2f7Stbbdev CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
58251c0b2f7Stbbdev "Not exact edge quantity was made");
58351c0b2f7Stbbdev }
58451c0b2f7Stbbdev
58551c0b2f7Stbbdev #endif
58651c0b2f7Stbbdev
587*478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
588*478de5b1Stbbdev //! \brief \ref error_guessing
589*478de5b1Stbbdev TEST_CASE("constraints for multifunction_node input") {
590*478de5b1Stbbdev struct InputObject {
591*478de5b1Stbbdev InputObject() = default;
592*478de5b1Stbbdev InputObject( const InputObject& ) = default;
593*478de5b1Stbbdev };
594*478de5b1Stbbdev
595*478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, InputObject, int>);
596*478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, int, int>);
597*478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonCopyable, int>);
598*478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonDefaultInitializable, int>);
599*478de5b1Stbbdev }
600*478de5b1Stbbdev
601*478de5b1Stbbdev template <typename Input, typename Output, typename Body>
602*478de5b1Stbbdev concept can_call_multifunction_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body,
603*478de5b1Stbbdev tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
604*478de5b1Stbbdev tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body);
605*478de5b1Stbbdev tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body, priority);
606*478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
607*478de5b1Stbbdev tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
608*478de5b1Stbbdev tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
609*478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
610*478de5b1Stbbdev };
611*478de5b1Stbbdev
612*478de5b1Stbbdev //! \brief \ref error_guessing
613*478de5b1Stbbdev TEST_CASE("constraints for multifunction_node body") {
614*478de5b1Stbbdev using input_type = int;
615*478de5b1Stbbdev using output_type = std::tuple<int>;
616*478de5b1Stbbdev using namespace test_concepts::multifunction_node_body;
617*478de5b1Stbbdev
618*478de5b1Stbbdev static_assert(can_call_multifunction_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
619*478de5b1Stbbdev static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
620*478de5b1Stbbdev static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
621*478de5b1Stbbdev static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
622*478de5b1Stbbdev static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
623*478de5b1Stbbdev static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
624*478de5b1Stbbdev }
625*478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
626