/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having
// these parts in all tests may mean we test a product that differs from what is actually
// released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4

//! Performs test on multifunction nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this happens even if there are multiple predecessors and successors.
*/

//! exercise buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure
        // this is never exceeded.
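        // A sketch of the guard the harness functor presumably implements (the real check
        // lives in the test harness in common/graph_utils.h; only the statics named above
        // come from this file, the rest is illustrative):
        //
        //     size_t count = ++current_executors;          // atomically enter the body
        //     CHECK_MESSAGE( count <= max_executors, "" ); // concurrency limit respected
        //     /* ...do the work, forward the input to output port 0... */
        //     --current_executors;                         // leave the body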
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect
                    // them to the exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
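                // The put is still accepted even though every edge was just removed: the
                // default-buffered node runs its body anyway, and the output is dropped
                // because no successors remain.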
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
        ++global_execute_count;
        ++local_execute_count;
        (void)std::get<0>(p).try_put(i);
    }

};

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
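        // Where expected_count comes from: each of the MAX_NODES receiver configurations
        // drives num_senders = 1..MAX_NODES rounds of num_senders * N executions, i.e.
        // N * MAX_NODES * (MAX_NODES+1)/2 per configuration, or
        // N/2 * MAX_NODES * MAX_NODES * (MAX_NODES+1) overall; the single try_put after
        // each remove_edge adds MAX_NODES more, and both counters start at Offset.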
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}


//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes will receive puts from multiple predecessors simultaneously,
    and 4) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
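// Mechanically, the tests below hold mutex_holder<tbb::spin_rw_mutex>::mutex exclusively
// while feeding the node; the harness body (tfunc) blocks on that same mutex, so up to lc
// inputs are accepted and pinned inside the body until the scoped_lock is released.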
template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure
        // this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // it only accepts to lc level
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
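            // With no successors attached, the message from the final try_put is simply
            // discarded, so every receiver count must still be zero.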
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}


struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
    operator int() const { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts
    2) the nodes will receive puts from multiple predecessors simultaneously,
    and 3) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
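// Note: unlimited_concurrency() below constructs its node with tbb::flow::unlimited, so a
// task is spawned for every incoming message and the rejecting policy never actually has
// to reject; that is why parallel_puts can assert that every try_put succeeds.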

template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();
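            // execute_count was reset at the top of this receiver configuration, so the
            // checks below are per configuration: p threads each put N messages, giving
            // p*N body executions and p*N messages at every connected receiver.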
            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}

template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}
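// In the body above, std::get<K>(p) and tbb::flow::output_port<K>(node) address the same
// port: element K of OutputTuple. Each port can feed a differently typed successor, which
// is what the <float, std::tuple<int, double>> instantiation below exercises.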

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>

void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
        if (i % 2)
            std::get<0>(op).try_put(i);
        else
            std::get<1>(op).try_put(i);
    }
    );

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
                  "Unexpected number of edges was created");
}
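// Note on test_precedes(): precedes(b1, b2) is expected to connect output port 0 to b1
// and output port 1 to b2, in order. Putting 0 and 1 routes one message to each port,
// so each buffer should yield exactly one item, which the final CHECK_MESSAGE verifies.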

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}
//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
        if (i % 2)
            std::get<0>(op).try_put(i);
        else
            std::get<1>(op).try_put(i);
    }
    , node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
                  "Unexpected number of edges was created");
}

#endif