/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
// parts in all of tests might make testing of the product, which is different from what is actually
// released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_function_node.cpp
//! \brief Test for [flow_graph.function_node] specification


#define N 100
#define MAX_NODES 4

//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceed the concurrency limit
    2) that the node never rejects
    3) that no items are lost
    and 4) all of this happens even if there are multiple predecessors and successors
*/

// Identity body: forwards its input unchanged. Used where only message routing matters.
template<typename IO>
struct pass_through {
    IO operator()(const IO& i) { return i; }
};

template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: why the test is executed serially for the node pairs, not concurrently?
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to
                    // pass_thru_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        senders.back()->register_successor(pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK( n == N );
                        CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

// Body functor that counts its invocations both in a per-copy counter and in the
// file-global global_execute_count. Copy/assign copy the current count so that
// tbb::flow::copy_body can be used to observe the node body's invocation count.
struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    void operator=( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    int operator()( int i ) {
        ++global_execute_count;
        ++local_execute_count;
        return i;
    }

};

template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters at a non-zero Offset so a reset back to zero would be detected.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}

// Runs buffered_levels with the three supported body kinds (lambda, function pointer,
// functor object), then the body-copy variant.
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}


//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple successors simultaneously,
    and 4) the nodes will send to multiple predecessors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}


// Runs concurrency_levels with the three supported body kinds (lambda, function pointer,
// functor object), all using the spin_rw_mutex-gated harness body.
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}


// Minimal message type: default- and int-constructible, convertible to int, no assignment.
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
};

// NativeParallelFor body: each invocation puts N default-constructed items into the
// given receiver and requires every put to be accepted.
template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK( my_exe_node->try_put( InputType() ) == true );
        }
    }

};

//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts
    2) the nodes will receive puts from multiple predecessors simultaneously,
    and 3) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            CHECK( ec == p*N );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK( c == p*N );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
        }
    }
}

// Runs unlimited_concurrency with the three supported body kinds; max_executors = 0
// disables the harness's concurrency-limit check.
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}

// Body mapping a continue_msg input to a fixed int supplied at construction.
struct continue_msg_to_int {
    int my_int;
    continue_msg_to_int(int x) : my_int(x) {}
    int operator()(tbb::flow::continue_msg) { return my_int; }
};

void test_function_node_with_continue_msg_as_input() {
    // If this function terminates, then this test is successful
    tbb::flow::graph g;

    tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);

    tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
    tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));

    tbb::flow::make_edge( Start, FN1 );
    tbb::flow::make_edge( Start, FN2 );

    Start.try_put( tbb::flow::continue_msg() );
    g.wait_for_all();
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
#endif

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

int function_body_f(const int&) { return 1; }

// Checks that class template argument deduction picks the documented function_node
// specialization for each constructor form (policy, priority, follows, copy).
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}

void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}

#endif

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::function_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
    test_follows_and_precedes_api();
}
#endif

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Deduction guides test"){
    test_deduction_guides();
}
#endif

//! try_release and try_consume test
//! \brief \ref error_guessing
TEST_CASE("try_release try_consume"){
    tbb::flow::graph g;

    tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});

    CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
    CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
}