151c0b2f7Stbbdev /* 2b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER 18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19b15aabb3Stbbdev #endif 20b15aabb3Stbbdev 2151c0b2f7Stbbdev #include "common/config.h" 2251c0b2f7Stbbdev 2351c0b2f7Stbbdev #include "tbb/flow_graph.h" 2451c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h" 2551c0b2f7Stbbdev #include "tbb/global_control.h" 2651c0b2f7Stbbdev 2751c0b2f7Stbbdev #include "common/test.h" 2851c0b2f7Stbbdev #include "common/utils.h" 2951c0b2f7Stbbdev #include "common/graph_utils.h" 3051c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h" 31*478de5b1Stbbdev #include "common/concepts_common.h" 3251c0b2f7Stbbdev 3351c0b2f7Stbbdev 3451c0b2f7Stbbdev //! \file test_function_node.cpp 3551c0b2f7Stbbdev //! \brief Test for [flow_graph.function_node] specification 3651c0b2f7Stbbdev 3751c0b2f7Stbbdev 3851c0b2f7Stbbdev #define N 100 3951c0b2f7Stbbdev #define MAX_NODES 4 4051c0b2f7Stbbdev 4151c0b2f7Stbbdev //! Performs test on function nodes with limited concurrency and buffering 4251c0b2f7Stbbdev /** These tests check: 4351c0b2f7Stbbdev 1) that the number of executing copies never exceed the concurrency limit 4451c0b2f7Stbbdev 2) that the node never rejects 4551c0b2f7Stbbdev 3) that no items are lost 4651c0b2f7Stbbdev and 4) all of this happens even if there are multiple predecessors and successors 4751c0b2f7Stbbdev */ 4851c0b2f7Stbbdev 4951c0b2f7Stbbdev template<typename IO> 5051c0b2f7Stbbdev struct pass_through { 5151c0b2f7Stbbdev IO operator()(const IO& i) { return i; } 5251c0b2f7Stbbdev }; 5351c0b2f7Stbbdev 5451c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body > 5551c0b2f7Stbbdev void buffered_levels( size_t concurrency, Body body ) { 5651c0b2f7Stbbdev 5751c0b2f7Stbbdev // Do for lc = 1 to concurrency level 5851c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) { 5951c0b2f7Stbbdev tbb::flow::graph g; 6051c0b2f7Stbbdev 6151c0b2f7Stbbdev // Set the execute_counter back to zero in the harness 6251c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::execute_count = 0; 6351c0b2f7Stbbdev // Set the number of current executors to zero. 6451c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::current_executors = 0; 6551c0b2f7Stbbdev // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 6651c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::max_executors = lc; 6751c0b2f7Stbbdev 6851c0b2f7Stbbdev // Create the function_node with the appropriate concurrency level, and use default buffering 6951c0b2f7Stbbdev tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body ); 7051c0b2f7Stbbdev tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>()); 7151c0b2f7Stbbdev 7251c0b2f7Stbbdev // Create a vector of identical exe_nodes and pass_thrus 7351c0b2f7Stbbdev std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node); 7451c0b2f7Stbbdev std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru); 7551c0b2f7Stbbdev // Attach each pass_thru to its corresponding exe_node 7651c0b2f7Stbbdev for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 7751c0b2f7Stbbdev tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]); 7851c0b2f7Stbbdev } 7951c0b2f7Stbbdev 8051c0b2f7Stbbdev // TODO: why the test is executed serially for the node pairs, not concurrently? 8151c0b2f7Stbbdev for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 8251c0b2f7Stbbdev // For num_receivers = 1 to MAX_NODES 8351c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 8451c0b2f7Stbbdev // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them. 8551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 8651c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; i++) { 8751c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 8851c0b2f7Stbbdev } 8951c0b2f7Stbbdev 9051c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 9151c0b2f7Stbbdev tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] ); 9251c0b2f7Stbbdev } 9351c0b2f7Stbbdev 9451c0b2f7Stbbdev // Do the test with varying numbers of senders 9551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 9651c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 9751c0b2f7Stbbdev // Create num_senders senders, set there message limit each to N, and connect them to 9851c0b2f7Stbbdev // pass_thru_vec[node_idx] 9951c0b2f7Stbbdev senders.clear(); 10051c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 10151c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 10251c0b2f7Stbbdev senders.back()->my_limit = N; 10351c0b2f7Stbbdev senders.back()->register_successor(pass_thru_vec[node_idx] ); 10451c0b2f7Stbbdev } 10551c0b2f7Stbbdev 10651c0b2f7Stbbdev // Initialize the receivers so they know how many senders and messages to check for 10751c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 10851c0b2f7Stbbdev receivers[r]->initialize_map( N, num_senders ); 10951c0b2f7Stbbdev } 11051c0b2f7Stbbdev 11151c0b2f7Stbbdev // Do the test 11251c0b2f7Stbbdev utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 11351c0b2f7Stbbdev g.wait_for_all(); 11451c0b2f7Stbbdev 11551c0b2f7Stbbdev // confirm that each sender was requested from N times 11651c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 11751c0b2f7Stbbdev size_t n = senders[s]->my_received; 11851c0b2f7Stbbdev CHECK( n == N ); 11951c0b2f7Stbbdev CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] ); 12051c0b2f7Stbbdev } 12151c0b2f7Stbbdev // validate the receivers 12251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 12351c0b2f7Stbbdev receivers[r]->validate(); 12451c0b2f7Stbbdev } 12551c0b2f7Stbbdev } 12651c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 12751c0b2f7Stbbdev tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] ); 12851c0b2f7Stbbdev } 12951c0b2f7Stbbdev CHECK( exe_vec[node_idx].try_put( InputType() ) == true ); 13051c0b2f7Stbbdev g.wait_for_all(); 13151c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 13251c0b2f7Stbbdev // since it's detached, nothing should have changed 13351c0b2f7Stbbdev receivers[r]->validate(); 13451c0b2f7Stbbdev } 13551c0b2f7Stbbdev 13651c0b2f7Stbbdev } // for num_receivers 13751c0b2f7Stbbdev } // for node_idx 13851c0b2f7Stbbdev } // for concurrency level lc 13951c0b2f7Stbbdev } 14051c0b2f7Stbbdev 14151c0b2f7Stbbdev const size_t Offset = 123; 14251c0b2f7Stbbdev std::atomic<size_t> global_execute_count; 14351c0b2f7Stbbdev 14451c0b2f7Stbbdev struct inc_functor { 14551c0b2f7Stbbdev 14651c0b2f7Stbbdev std::atomic<size_t> local_execute_count; 14751c0b2f7Stbbdev inc_functor( ) { local_execute_count = 0; } 14851c0b2f7Stbbdev inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 14951c0b2f7Stbbdev void operator=( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 15051c0b2f7Stbbdev 15151c0b2f7Stbbdev int operator()( int i ) { 15251c0b2f7Stbbdev ++global_execute_count; 15351c0b2f7Stbbdev ++local_execute_count; 15451c0b2f7Stbbdev return i; 15551c0b2f7Stbbdev } 15651c0b2f7Stbbdev 15751c0b2f7Stbbdev }; 15851c0b2f7Stbbdev 15951c0b2f7Stbbdev template< typename InputType, typename OutputType > 16051c0b2f7Stbbdev void buffered_levels_with_copy( size_t concurrency ) { 16151c0b2f7Stbbdev 16251c0b2f7Stbbdev // Do for lc = 1 to concurrency level 16351c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) { 16451c0b2f7Stbbdev tbb::flow::graph g; 16551c0b2f7Stbbdev 16651c0b2f7Stbbdev inc_functor cf; 16751c0b2f7Stbbdev cf.local_execute_count = Offset; 16851c0b2f7Stbbdev global_execute_count = Offset; 16951c0b2f7Stbbdev 17051c0b2f7Stbbdev tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf ); 17151c0b2f7Stbbdev 17251c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 17351c0b2f7Stbbdev 17451c0b2f7Stbbdev std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 17551c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; i++) { 17651c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 17751c0b2f7Stbbdev } 17851c0b2f7Stbbdev 17951c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 18051c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 18151c0b2f7Stbbdev } 18251c0b2f7Stbbdev 18351c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 18451c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 18551c0b2f7Stbbdev senders.clear(); 18651c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 18751c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 18851c0b2f7Stbbdev senders.back()->my_limit = N; 18951c0b2f7Stbbdev tbb::flow::make_edge( *senders.back(), exe_node ); 19051c0b2f7Stbbdev } 19151c0b2f7Stbbdev 19251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 19351c0b2f7Stbbdev receivers[r]->initialize_map( N, num_senders ); 19451c0b2f7Stbbdev } 19551c0b2f7Stbbdev 19651c0b2f7Stbbdev utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 19751c0b2f7Stbbdev g.wait_for_all(); 19851c0b2f7Stbbdev 19951c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 20051c0b2f7Stbbdev size_t n = senders[s]->my_received; 20151c0b2f7Stbbdev CHECK( n == N ); 20251c0b2f7Stbbdev CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node ); 20351c0b2f7Stbbdev } 20451c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 20551c0b2f7Stbbdev receivers[r]->validate(); 20651c0b2f7Stbbdev } 20751c0b2f7Stbbdev } 20851c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 20951c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 21051c0b2f7Stbbdev } 21151c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == true ); 21251c0b2f7Stbbdev g.wait_for_all(); 21351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 21451c0b2f7Stbbdev receivers[r]->validate(); 21551c0b2f7Stbbdev } 21651c0b2f7Stbbdev } 21751c0b2f7Stbbdev 21851c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct 21951c0b2f7Stbbdev inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 22051c0b2f7Stbbdev const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset; 22151c0b2f7Stbbdev size_t global_count = global_execute_count; 22251c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count; 22351c0b2f7Stbbdev CHECK(global_count == expected_count); 22451c0b2f7Stbbdev CHECK(global_count == inc_count ); 22551c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); 22651c0b2f7Stbbdev body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 22751c0b2f7Stbbdev inc_count = body_copy.local_execute_count; 22851c0b2f7Stbbdev CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" ); 22951c0b2f7Stbbdev } 23051c0b2f7Stbbdev } 23151c0b2f7Stbbdev 23251c0b2f7Stbbdev template< typename InputType, typename OutputType > 23351c0b2f7Stbbdev void run_buffered_levels( int c ) { 23451c0b2f7Stbbdev buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } ); 23551c0b2f7Stbbdev buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func ); 23651c0b2f7Stbbdev buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() ); 23751c0b2f7Stbbdev buffered_levels_with_copy<InputType,OutputType>( c ); 23851c0b2f7Stbbdev } 23951c0b2f7Stbbdev 24051c0b2f7Stbbdev 24151c0b2f7Stbbdev //! Performs test on executable nodes with limited concurrency 24251c0b2f7Stbbdev /** These tests check: 24351c0b2f7Stbbdev 1) that the nodes will accepts puts up to the concurrency limit, 24451c0b2f7Stbbdev 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), 24551c0b2f7Stbbdev 3) the nodes will receive puts from multiple successors simultaneously, 24651c0b2f7Stbbdev and 4) the nodes will send to multiple predecessors. 24751c0b2f7Stbbdev There is no checking of the contents of the messages for corruption. 24851c0b2f7Stbbdev */ 24951c0b2f7Stbbdev 25051c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body > 25151c0b2f7Stbbdev void concurrency_levels( size_t concurrency, Body body ) { 25251c0b2f7Stbbdev 25351c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) { 25451c0b2f7Stbbdev tbb::flow::graph g; 25551c0b2f7Stbbdev 25651c0b2f7Stbbdev // Set the execute_counter back to zero in the harness 25751c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::execute_count = 0; 25851c0b2f7Stbbdev // Set the number of current executors to zero. 25951c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::current_executors = 0; 26051c0b2f7Stbbdev // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 26151c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::max_executors = lc; 26251c0b2f7Stbbdev 26351c0b2f7Stbbdev typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type; 26451c0b2f7Stbbdev fnode_type exe_node( g, lc, body ); 26551c0b2f7Stbbdev 26651c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 26751c0b2f7Stbbdev 26851c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 26951c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 27051c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 27151c0b2f7Stbbdev } 27251c0b2f7Stbbdev 27351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 27451c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 27551c0b2f7Stbbdev } 27651c0b2f7Stbbdev 27751c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 27851c0b2f7Stbbdev 27951c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 28051c0b2f7Stbbdev senders.clear(); 28151c0b2f7Stbbdev { 28251c0b2f7Stbbdev // Exclusively lock m to prevent exe_node from finishing 28351c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock l( 28451c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex 28551c0b2f7Stbbdev ); 28651c0b2f7Stbbdev 28751c0b2f7Stbbdev // put to lc level, it will accept and then block at m 28851c0b2f7Stbbdev for ( size_t c = 0 ; c < lc ; ++c ) { 28951c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == true ); 29051c0b2f7Stbbdev } 29151c0b2f7Stbbdev // it only accepts to lc level 29251c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == false ); 29351c0b2f7Stbbdev 29451c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 29551c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 29651c0b2f7Stbbdev // register a sender 29751c0b2f7Stbbdev senders.back()->my_limit = N; 29851c0b2f7Stbbdev exe_node.register_predecessor( *senders.back() ); 29951c0b2f7Stbbdev } 30051c0b2f7Stbbdev 30151c0b2f7Stbbdev } // release lock at end of scope, setting the exe node free to continue 30251c0b2f7Stbbdev // wait for graph to settle down 30351c0b2f7Stbbdev g.wait_for_all(); 30451c0b2f7Stbbdev 30551c0b2f7Stbbdev // confirm that each sender was requested from N times 30651c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 30751c0b2f7Stbbdev size_t n = senders[s]->my_received; 30851c0b2f7Stbbdev CHECK( n == N ); 30951c0b2f7Stbbdev CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node ); 31051c0b2f7Stbbdev } 31151c0b2f7Stbbdev // confirm that each receivers got N * num_senders + the initial lc puts 31251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 31351c0b2f7Stbbdev size_t n = receivers[r]->my_count; 31451c0b2f7Stbbdev CHECK( n == num_senders*N+lc ); 31551c0b2f7Stbbdev receivers[r]->my_count = 0; 31651c0b2f7Stbbdev } 31751c0b2f7Stbbdev } 31851c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 31951c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 32051c0b2f7Stbbdev } 32151c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == true ); 32251c0b2f7Stbbdev g.wait_for_all(); 32351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 32451c0b2f7Stbbdev CHECK( int(receivers[r]->my_count) == 0 ); 32551c0b2f7Stbbdev } 32651c0b2f7Stbbdev } 32751c0b2f7Stbbdev } 32851c0b2f7Stbbdev } 32951c0b2f7Stbbdev 33051c0b2f7Stbbdev 33151c0b2f7Stbbdev template< typename InputType, typename OutputType > 33251c0b2f7Stbbdev void run_concurrency_levels( int c ) { 33351c0b2f7Stbbdev concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } ); 33451c0b2f7Stbbdev concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> ); 33551c0b2f7Stbbdev concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() ); 33651c0b2f7Stbbdev } 33751c0b2f7Stbbdev 33851c0b2f7Stbbdev 33951c0b2f7Stbbdev struct empty_no_assign { 34051c0b2f7Stbbdev empty_no_assign() {} 34151c0b2f7Stbbdev empty_no_assign( int ) {} 34251c0b2f7Stbbdev operator int() { return 0; } 34351c0b2f7Stbbdev }; 34451c0b2f7Stbbdev 34551c0b2f7Stbbdev template< typename InputType > 34651c0b2f7Stbbdev struct parallel_puts : private utils::NoAssign { 34751c0b2f7Stbbdev 34851c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node; 34951c0b2f7Stbbdev 35051c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 35151c0b2f7Stbbdev 35251c0b2f7Stbbdev void operator()( int ) const { 35351c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) { 35451c0b2f7Stbbdev // the nodes will accept all puts 35551c0b2f7Stbbdev CHECK( my_exe_node->try_put( InputType() ) == true ); 35651c0b2f7Stbbdev } 35751c0b2f7Stbbdev } 35851c0b2f7Stbbdev 35951c0b2f7Stbbdev }; 36051c0b2f7Stbbdev 36151c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency 36251c0b2f7Stbbdev /** These tests check: 36351c0b2f7Stbbdev 1) that the nodes will accept all puts 36451c0b2f7Stbbdev 2) the nodes will receive puts from multiple predecessors simultaneously, 36551c0b2f7Stbbdev and 3) the nodes will send to multiple successors. 36651c0b2f7Stbbdev There is no checking of the contents of the messages for corruption. 36751c0b2f7Stbbdev */ 36851c0b2f7Stbbdev 36951c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body > 37051c0b2f7Stbbdev void unlimited_concurrency( Body body ) { 37151c0b2f7Stbbdev 37251c0b2f7Stbbdev for (unsigned p = 1; p < 2*utils::MaxThread; ++p) { 37351c0b2f7Stbbdev tbb::flow::graph g; 37451c0b2f7Stbbdev tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body ); 37551c0b2f7Stbbdev 37651c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 37751c0b2f7Stbbdev 37851c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 37951c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 38051c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 38151c0b2f7Stbbdev } 38251c0b2f7Stbbdev 38351c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::execute_count = 0; 38451c0b2f7Stbbdev 38551c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 38651c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 38751c0b2f7Stbbdev } 38851c0b2f7Stbbdev 38951c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) ); 39051c0b2f7Stbbdev g.wait_for_all(); 39151c0b2f7Stbbdev 39251c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 39351c0b2f7Stbbdev size_t ec = harness_graph_executor<InputType, OutputType>::execute_count; 39451c0b2f7Stbbdev CHECK( ec == p*N ); 39551c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 39651c0b2f7Stbbdev size_t c = receivers[r]->my_count; 39751c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 39851c0b2f7Stbbdev CHECK( c == p*N ); 39951c0b2f7Stbbdev } 40051c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 40151c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 40251c0b2f7Stbbdev } 40351c0b2f7Stbbdev } 40451c0b2f7Stbbdev } 40551c0b2f7Stbbdev } 40651c0b2f7Stbbdev 40751c0b2f7Stbbdev template< typename InputType, typename OutputType > 40851c0b2f7Stbbdev void run_unlimited_concurrency() { 40951c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::max_executors = 0; 41051c0b2f7Stbbdev unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } ); 41151c0b2f7Stbbdev unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func ); 41251c0b2f7Stbbdev unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() ); 41351c0b2f7Stbbdev } 41451c0b2f7Stbbdev 41551c0b2f7Stbbdev struct continue_msg_to_int { 41651c0b2f7Stbbdev int my_int; 41751c0b2f7Stbbdev continue_msg_to_int(int x) : my_int(x) {} 41851c0b2f7Stbbdev int operator()(tbb::flow::continue_msg) { return my_int; } 41951c0b2f7Stbbdev }; 42051c0b2f7Stbbdev 42151c0b2f7Stbbdev void test_function_node_with_continue_msg_as_input() { 42251c0b2f7Stbbdev // If this function terminates, then this test is successful 42351c0b2f7Stbbdev tbb::flow::graph g; 42451c0b2f7Stbbdev 42551c0b2f7Stbbdev tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g); 42651c0b2f7Stbbdev 42751c0b2f7Stbbdev tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42)); 42851c0b2f7Stbbdev tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43)); 42951c0b2f7Stbbdev 43051c0b2f7Stbbdev tbb::flow::make_edge( Start, FN1 ); 43151c0b2f7Stbbdev tbb::flow::make_edge( Start, FN2 ); 43251c0b2f7Stbbdev 43351c0b2f7Stbbdev Start.try_put( tbb::flow::continue_msg() ); 43451c0b2f7Stbbdev g.wait_for_all(); 43551c0b2f7Stbbdev } 43651c0b2f7Stbbdev 43751c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages 43851c0b2f7Stbbdev void test_concurrency(int num_threads) { 43951c0b2f7Stbbdev tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads); 44051c0b2f7Stbbdev run_concurrency_levels<int,int>(num_threads); 44151c0b2f7Stbbdev run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads); 44251c0b2f7Stbbdev run_buffered_levels<int, int>(num_threads); 44351c0b2f7Stbbdev run_unlimited_concurrency<int,int>(); 44451c0b2f7Stbbdev run_unlimited_concurrency<int,empty_no_assign>(); 44551c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,int>(); 44651c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,empty_no_assign>(); 44751c0b2f7Stbbdev run_unlimited_concurrency<int,tbb::flow::continue_msg>(); 44851c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>(); 44951c0b2f7Stbbdev test_function_node_with_continue_msg_as_input(); 45051c0b2f7Stbbdev } 45151c0b2f7Stbbdev 45251c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 45351c0b2f7Stbbdev #include <array> 45451c0b2f7Stbbdev #include <vector> 45551c0b2f7Stbbdev void test_follows_and_precedes_api() { 45651c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg; 45751c0b2f7Stbbdev 45851c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} }; 45951c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = { msg_t() }; 46051c0b2f7Stbbdev 46151c0b2f7Stbbdev pass_through<msg_t> pass_msg; 46251c0b2f7Stbbdev 46351c0b2f7Stbbdev follows_and_precedes_testing::test_follows 46451c0b2f7Stbbdev <msg_t, tbb::flow::function_node<msg_t, msg_t>> 46551c0b2f7Stbbdev (messages_for_follows, tbb::flow::unlimited, pass_msg); 46651c0b2f7Stbbdev follows_and_precedes_testing::test_precedes 46751c0b2f7Stbbdev <msg_t, tbb::flow::function_node<msg_t, msg_t>> 46851c0b2f7Stbbdev (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1)); 46951c0b2f7Stbbdev } 47051c0b2f7Stbbdev #endif 47151c0b2f7Stbbdev 47251c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 47351c0b2f7Stbbdev 47451c0b2f7Stbbdev int function_body_f(const int&) { return 1; } 47551c0b2f7Stbbdev 47651c0b2f7Stbbdev template <typename Body> 47751c0b2f7Stbbdev void test_deduction_guides_common(Body body) { 47851c0b2f7Stbbdev using namespace tbb::flow; 47951c0b2f7Stbbdev graph g; 48051c0b2f7Stbbdev 48151c0b2f7Stbbdev function_node f1(g, unlimited, body); 48251c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f1), function_node<int, int>>); 48351c0b2f7Stbbdev 48451c0b2f7Stbbdev function_node f2(g, unlimited, body, rejecting()); 48551c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>); 48651c0b2f7Stbbdev 48751c0b2f7Stbbdev function_node f3(g, unlimited, body, node_priority_t(5)); 48851c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f3), function_node<int, int>>); 48951c0b2f7Stbbdev 49051c0b2f7Stbbdev function_node f4(g, unlimited, body, rejecting(), node_priority_t(5)); 49151c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>); 49251c0b2f7Stbbdev 49351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 49451c0b2f7Stbbdev function_node f5(follows(f2), unlimited, body); 49551c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f5), function_node<int, int>>); 49651c0b2f7Stbbdev 49751c0b2f7Stbbdev function_node f6(follows(f5), unlimited, body, rejecting()); 49851c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>); 49951c0b2f7Stbbdev 50051c0b2f7Stbbdev function_node f7(follows(f6), unlimited, body, node_priority_t(5)); 50151c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f7), function_node<int, int>>); 50251c0b2f7Stbbdev 50351c0b2f7Stbbdev function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5)); 50451c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>); 50551c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 50651c0b2f7Stbbdev 50751c0b2f7Stbbdev function_node f9(f1); 50851c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f9), function_node<int, int>>); 50951c0b2f7Stbbdev } 51051c0b2f7Stbbdev 51151c0b2f7Stbbdev void test_deduction_guides() { 51251c0b2f7Stbbdev test_deduction_guides_common([](const int&)->int { return 1; }); 51351c0b2f7Stbbdev test_deduction_guides_common([](const int&) mutable ->int { return 1; }); 51451c0b2f7Stbbdev test_deduction_guides_common(function_body_f); 51551c0b2f7Stbbdev } 51651c0b2f7Stbbdev 51751c0b2f7Stbbdev #endif 51851c0b2f7Stbbdev 51951c0b2f7Stbbdev //! Test various node bodies with concurrency 52051c0b2f7Stbbdev //! \brief \ref error_guessing 52151c0b2f7Stbbdev TEST_CASE("Concurrency test") { 52251c0b2f7Stbbdev for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) { 52351c0b2f7Stbbdev test_concurrency(p); 52451c0b2f7Stbbdev } 52551c0b2f7Stbbdev } 52651c0b2f7Stbbdev 52751c0b2f7Stbbdev //! NativeParallelFor testing with various concurrency settings 52851c0b2f7Stbbdev //! \brief \ref error_guessing 52951c0b2f7Stbbdev TEST_CASE("Lightweight testing"){ 53051c0b2f7Stbbdev lightweight_testing::test<tbb::flow::function_node>(10); 53151c0b2f7Stbbdev } 53251c0b2f7Stbbdev 53351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 53451c0b2f7Stbbdev //! Test follows and precedes API 53551c0b2f7Stbbdev //! \brief \ref error_guessing 53651c0b2f7Stbbdev TEST_CASE("Flowgraph node set test"){ 53751c0b2f7Stbbdev test_follows_and_precedes_api(); 53851c0b2f7Stbbdev } 53951c0b2f7Stbbdev #endif 54051c0b2f7Stbbdev 54151c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 54251c0b2f7Stbbdev //! Test decution guides 54351c0b2f7Stbbdev //! \brief \ref requirement 54451c0b2f7Stbbdev TEST_CASE("Deduction guides test"){ 54551c0b2f7Stbbdev test_deduction_guides(); 54651c0b2f7Stbbdev } 54751c0b2f7Stbbdev #endif 54851c0b2f7Stbbdev 54951c0b2f7Stbbdev //! try_release and try_consume test 55051c0b2f7Stbbdev //! \brief \ref error_guessing 55151c0b2f7Stbbdev TEST_CASE("try_release try_consume"){ 55251c0b2f7Stbbdev tbb::flow::graph g; 55351c0b2f7Stbbdev 55451c0b2f7Stbbdev tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;}); 55551c0b2f7Stbbdev 55651c0b2f7Stbbdev CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node"); 55751c0b2f7Stbbdev CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node"); 55851c0b2f7Stbbdev } 559*478de5b1Stbbdev 560*478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT 561*478de5b1Stbbdev //! \brief \ref error_guessing 562*478de5b1Stbbdev TEST_CASE("constraints for function_node input and output") { 563*478de5b1Stbbdev struct InputObject { 564*478de5b1Stbbdev InputObject() = default; 565*478de5b1Stbbdev InputObject( const InputObject& ) = default; 566*478de5b1Stbbdev }; 567*478de5b1Stbbdev struct OutputObject : test_concepts::Copyable {}; 568*478de5b1Stbbdev 569*478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::function_node, InputObject, OutputObject>); 570*478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::function_node, int, int>); 571*478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, test_concepts::NonCopyable, OutputObject>); 572*478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, test_concepts::NonDefaultInitializable, OutputObject>); 573*478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, InputObject, test_concepts::NonCopyable>); 574*478de5b1Stbbdev } 575*478de5b1Stbbdev 576*478de5b1Stbbdev template <typename Input, typename Output, typename Body> 577*478de5b1Stbbdev concept can_call_function_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body, 578*478de5b1Stbbdev tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) { 579*478de5b1Stbbdev tbb::flow::function_node<Input, Output>(graph, concurrency, body); 580*478de5b1Stbbdev tbb::flow::function_node<Input, Output>(graph, concurrency, body, priority); 581*478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 582*478de5b1Stbbdev tbb::flow::function_node<Input, Output>(tbb::flow::follows(f), concurrency, body); 583*478de5b1Stbbdev tbb::flow::function_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority); 584*478de5b1Stbbdev #endif 585*478de5b1Stbbdev }; 586*478de5b1Stbbdev 587*478de5b1Stbbdev //! \brief \ref error_guessing 588*478de5b1Stbbdev TEST_CASE("constraints for function_node body") { 589*478de5b1Stbbdev using input_type = int; 590*478de5b1Stbbdev using output_type = int; 591*478de5b1Stbbdev using namespace test_concepts::function_node_body; 592*478de5b1Stbbdev 593*478de5b1Stbbdev static_assert(can_call_function_node_ctor<input_type, output_type, Correct<input_type, output_type>>); 594*478de5b1Stbbdev static_assert(!can_call_function_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>); 595*478de5b1Stbbdev static_assert(!can_call_function_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>); 596*478de5b1Stbbdev static_assert(!can_call_function_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>); 597*478de5b1Stbbdev static_assert(!can_call_function_node_ctor<input_type, output_type, WrongInputRoundBrackets<input_type, output_type>>); 598*478de5b1Stbbdev static_assert(!can_call_function_node_ctor<input_type, output_type, WrongReturnRoundBrackets<input_type, output_type>>); 599*478de5b1Stbbdev } 600*478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT 601