/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/config.h"

// TODO revamp: move the parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having
// these parts in every test may mean we are testing a product different from the one actually
// released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_function_node.cpp
//! \brief Test for [flow_graph.function_node] specification


#define N 100
#define MAX_NODES 4
Performs test on function nodes with limited concurrency and buffering 41*51c0b2f7Stbbdev /** These tests check: 42*51c0b2f7Stbbdev 1) that the number of executing copies never exceed the concurrency limit 43*51c0b2f7Stbbdev 2) that the node never rejects 44*51c0b2f7Stbbdev 3) that no items are lost 45*51c0b2f7Stbbdev and 4) all of this happens even if there are multiple predecessors and successors 46*51c0b2f7Stbbdev */ 47*51c0b2f7Stbbdev 48*51c0b2f7Stbbdev template<typename IO> 49*51c0b2f7Stbbdev struct pass_through { 50*51c0b2f7Stbbdev IO operator()(const IO& i) { return i; } 51*51c0b2f7Stbbdev }; 52*51c0b2f7Stbbdev 53*51c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body > 54*51c0b2f7Stbbdev void buffered_levels( size_t concurrency, Body body ) { 55*51c0b2f7Stbbdev 56*51c0b2f7Stbbdev // Do for lc = 1 to concurrency level 57*51c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) { 58*51c0b2f7Stbbdev tbb::flow::graph g; 59*51c0b2f7Stbbdev 60*51c0b2f7Stbbdev // Set the execute_counter back to zero in the harness 61*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::execute_count = 0; 62*51c0b2f7Stbbdev // Set the number of current executors to zero. 63*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::current_executors = 0; 64*51c0b2f7Stbbdev // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 
65*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::max_executors = lc; 66*51c0b2f7Stbbdev 67*51c0b2f7Stbbdev // Create the function_node with the appropriate concurrency level, and use default buffering 68*51c0b2f7Stbbdev tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body ); 69*51c0b2f7Stbbdev tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>()); 70*51c0b2f7Stbbdev 71*51c0b2f7Stbbdev // Create a vector of identical exe_nodes and pass_thrus 72*51c0b2f7Stbbdev std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node); 73*51c0b2f7Stbbdev std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru); 74*51c0b2f7Stbbdev // Attach each pass_thru to its corresponding exe_node 75*51c0b2f7Stbbdev for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 76*51c0b2f7Stbbdev tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]); 77*51c0b2f7Stbbdev } 78*51c0b2f7Stbbdev 79*51c0b2f7Stbbdev // TODO: why the test is executed serially for the node pairs, not concurrently? 80*51c0b2f7Stbbdev for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 81*51c0b2f7Stbbdev // For num_receivers = 1 to MAX_NODES 82*51c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 83*51c0b2f7Stbbdev // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them. 
84*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 85*51c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; i++) { 86*51c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 87*51c0b2f7Stbbdev } 88*51c0b2f7Stbbdev 89*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 90*51c0b2f7Stbbdev tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] ); 91*51c0b2f7Stbbdev } 92*51c0b2f7Stbbdev 93*51c0b2f7Stbbdev // Do the test with varying numbers of senders 94*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 95*51c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 96*51c0b2f7Stbbdev // Create num_senders senders, set there message limit each to N, and connect them to 97*51c0b2f7Stbbdev // pass_thru_vec[node_idx] 98*51c0b2f7Stbbdev senders.clear(); 99*51c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 100*51c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 101*51c0b2f7Stbbdev senders.back()->my_limit = N; 102*51c0b2f7Stbbdev senders.back()->register_successor(pass_thru_vec[node_idx] ); 103*51c0b2f7Stbbdev } 104*51c0b2f7Stbbdev 105*51c0b2f7Stbbdev // Initialize the receivers so they know how many senders and messages to check for 106*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 107*51c0b2f7Stbbdev receivers[r]->initialize_map( N, num_senders ); 108*51c0b2f7Stbbdev } 109*51c0b2f7Stbbdev 110*51c0b2f7Stbbdev // Do the test 111*51c0b2f7Stbbdev utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 112*51c0b2f7Stbbdev g.wait_for_all(); 113*51c0b2f7Stbbdev 114*51c0b2f7Stbbdev // confirm that each sender was requested from N times 115*51c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 116*51c0b2f7Stbbdev size_t n = senders[s]->my_received; 117*51c0b2f7Stbbdev CHECK( n == N ); 118*51c0b2f7Stbbdev 
CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] ); 119*51c0b2f7Stbbdev } 120*51c0b2f7Stbbdev // validate the receivers 121*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 122*51c0b2f7Stbbdev receivers[r]->validate(); 123*51c0b2f7Stbbdev } 124*51c0b2f7Stbbdev } 125*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 126*51c0b2f7Stbbdev tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] ); 127*51c0b2f7Stbbdev } 128*51c0b2f7Stbbdev CHECK( exe_vec[node_idx].try_put( InputType() ) == true ); 129*51c0b2f7Stbbdev g.wait_for_all(); 130*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 131*51c0b2f7Stbbdev // since it's detached, nothing should have changed 132*51c0b2f7Stbbdev receivers[r]->validate(); 133*51c0b2f7Stbbdev } 134*51c0b2f7Stbbdev 135*51c0b2f7Stbbdev } // for num_receivers 136*51c0b2f7Stbbdev } // for node_idx 137*51c0b2f7Stbbdev } // for concurrency level lc 138*51c0b2f7Stbbdev } 139*51c0b2f7Stbbdev 140*51c0b2f7Stbbdev const size_t Offset = 123; 141*51c0b2f7Stbbdev std::atomic<size_t> global_execute_count; 142*51c0b2f7Stbbdev 143*51c0b2f7Stbbdev struct inc_functor { 144*51c0b2f7Stbbdev 145*51c0b2f7Stbbdev std::atomic<size_t> local_execute_count; 146*51c0b2f7Stbbdev inc_functor( ) { local_execute_count = 0; } 147*51c0b2f7Stbbdev inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 148*51c0b2f7Stbbdev void operator=( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 149*51c0b2f7Stbbdev 150*51c0b2f7Stbbdev int operator()( int i ) { 151*51c0b2f7Stbbdev ++global_execute_count; 152*51c0b2f7Stbbdev ++local_execute_count; 153*51c0b2f7Stbbdev return i; 154*51c0b2f7Stbbdev } 155*51c0b2f7Stbbdev 156*51c0b2f7Stbbdev }; 157*51c0b2f7Stbbdev 158*51c0b2f7Stbbdev template< typename InputType, typename OutputType > 159*51c0b2f7Stbbdev void buffered_levels_with_copy( size_t concurrency ) { 160*51c0b2f7Stbbdev 
161*51c0b2f7Stbbdev // Do for lc = 1 to concurrency level 162*51c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) { 163*51c0b2f7Stbbdev tbb::flow::graph g; 164*51c0b2f7Stbbdev 165*51c0b2f7Stbbdev inc_functor cf; 166*51c0b2f7Stbbdev cf.local_execute_count = Offset; 167*51c0b2f7Stbbdev global_execute_count = Offset; 168*51c0b2f7Stbbdev 169*51c0b2f7Stbbdev tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf ); 170*51c0b2f7Stbbdev 171*51c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 172*51c0b2f7Stbbdev 173*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 174*51c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; i++) { 175*51c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 176*51c0b2f7Stbbdev } 177*51c0b2f7Stbbdev 178*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 179*51c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 180*51c0b2f7Stbbdev } 181*51c0b2f7Stbbdev 182*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 183*51c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 184*51c0b2f7Stbbdev senders.clear(); 185*51c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 186*51c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 187*51c0b2f7Stbbdev senders.back()->my_limit = N; 188*51c0b2f7Stbbdev tbb::flow::make_edge( *senders.back(), exe_node ); 189*51c0b2f7Stbbdev } 190*51c0b2f7Stbbdev 191*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 192*51c0b2f7Stbbdev receivers[r]->initialize_map( N, num_senders ); 193*51c0b2f7Stbbdev } 194*51c0b2f7Stbbdev 195*51c0b2f7Stbbdev utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 196*51c0b2f7Stbbdev g.wait_for_all(); 197*51c0b2f7Stbbdev 198*51c0b2f7Stbbdev for (size_t s = 0; s 
< num_senders; ++s ) { 199*51c0b2f7Stbbdev size_t n = senders[s]->my_received; 200*51c0b2f7Stbbdev CHECK( n == N ); 201*51c0b2f7Stbbdev CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node ); 202*51c0b2f7Stbbdev } 203*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 204*51c0b2f7Stbbdev receivers[r]->validate(); 205*51c0b2f7Stbbdev } 206*51c0b2f7Stbbdev } 207*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 208*51c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 209*51c0b2f7Stbbdev } 210*51c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == true ); 211*51c0b2f7Stbbdev g.wait_for_all(); 212*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 213*51c0b2f7Stbbdev receivers[r]->validate(); 214*51c0b2f7Stbbdev } 215*51c0b2f7Stbbdev } 216*51c0b2f7Stbbdev 217*51c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct 218*51c0b2f7Stbbdev inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 219*51c0b2f7Stbbdev const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset; 220*51c0b2f7Stbbdev size_t global_count = global_execute_count; 221*51c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count; 222*51c0b2f7Stbbdev CHECK(global_count == expected_count); 223*51c0b2f7Stbbdev CHECK(global_count == inc_count ); 224*51c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); 225*51c0b2f7Stbbdev body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 226*51c0b2f7Stbbdev inc_count = body_copy.local_execute_count; 227*51c0b2f7Stbbdev CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" ); 228*51c0b2f7Stbbdev } 229*51c0b2f7Stbbdev } 230*51c0b2f7Stbbdev 231*51c0b2f7Stbbdev template< typename InputType, typename OutputType > 232*51c0b2f7Stbbdev void run_buffered_levels( int c ) { 233*51c0b2f7Stbbdev buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { 
return harness_graph_executor<InputType, OutputType>::func(i); } ); 234*51c0b2f7Stbbdev buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func ); 235*51c0b2f7Stbbdev buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() ); 236*51c0b2f7Stbbdev buffered_levels_with_copy<InputType,OutputType>( c ); 237*51c0b2f7Stbbdev } 238*51c0b2f7Stbbdev 239*51c0b2f7Stbbdev 240*51c0b2f7Stbbdev //! Performs test on executable nodes with limited concurrency 241*51c0b2f7Stbbdev /** These tests check: 242*51c0b2f7Stbbdev 1) that the nodes will accepts puts up to the concurrency limit, 243*51c0b2f7Stbbdev 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), 244*51c0b2f7Stbbdev 3) the nodes will receive puts from multiple successors simultaneously, 245*51c0b2f7Stbbdev and 4) the nodes will send to multiple predecessors. 246*51c0b2f7Stbbdev There is no checking of the contents of the messages for corruption. 247*51c0b2f7Stbbdev */ 248*51c0b2f7Stbbdev 249*51c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body > 250*51c0b2f7Stbbdev void concurrency_levels( size_t concurrency, Body body ) { 251*51c0b2f7Stbbdev 252*51c0b2f7Stbbdev for ( size_t lc = 1; lc <= concurrency; ++lc ) { 253*51c0b2f7Stbbdev tbb::flow::graph g; 254*51c0b2f7Stbbdev 255*51c0b2f7Stbbdev // Set the execute_counter back to zero in the harness 256*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::execute_count = 0; 257*51c0b2f7Stbbdev // Set the number of current executors to zero. 258*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::current_executors = 0; 259*51c0b2f7Stbbdev // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 
260*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::max_executors = lc; 261*51c0b2f7Stbbdev 262*51c0b2f7Stbbdev typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type; 263*51c0b2f7Stbbdev fnode_type exe_node( g, lc, body ); 264*51c0b2f7Stbbdev 265*51c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 266*51c0b2f7Stbbdev 267*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 268*51c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 269*51c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 270*51c0b2f7Stbbdev } 271*51c0b2f7Stbbdev 272*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 273*51c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 274*51c0b2f7Stbbdev } 275*51c0b2f7Stbbdev 276*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 277*51c0b2f7Stbbdev 278*51c0b2f7Stbbdev for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 279*51c0b2f7Stbbdev senders.clear(); 280*51c0b2f7Stbbdev { 281*51c0b2f7Stbbdev // Exclusively lock m to prevent exe_node from finishing 282*51c0b2f7Stbbdev tbb::spin_rw_mutex::scoped_lock l( 283*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex 284*51c0b2f7Stbbdev ); 285*51c0b2f7Stbbdev 286*51c0b2f7Stbbdev // put to lc level, it will accept and then block at m 287*51c0b2f7Stbbdev for ( size_t c = 0 ; c < lc ; ++c ) { 288*51c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == true ); 289*51c0b2f7Stbbdev } 290*51c0b2f7Stbbdev // it only accepts to lc level 291*51c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == false ); 292*51c0b2f7Stbbdev 293*51c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 294*51c0b2f7Stbbdev senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 
295*51c0b2f7Stbbdev // register a sender 296*51c0b2f7Stbbdev senders.back()->my_limit = N; 297*51c0b2f7Stbbdev exe_node.register_predecessor( *senders.back() ); 298*51c0b2f7Stbbdev } 299*51c0b2f7Stbbdev 300*51c0b2f7Stbbdev } // release lock at end of scope, setting the exe node free to continue 301*51c0b2f7Stbbdev // wait for graph to settle down 302*51c0b2f7Stbbdev g.wait_for_all(); 303*51c0b2f7Stbbdev 304*51c0b2f7Stbbdev // confirm that each sender was requested from N times 305*51c0b2f7Stbbdev for (size_t s = 0; s < num_senders; ++s ) { 306*51c0b2f7Stbbdev size_t n = senders[s]->my_received; 307*51c0b2f7Stbbdev CHECK( n == N ); 308*51c0b2f7Stbbdev CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node ); 309*51c0b2f7Stbbdev } 310*51c0b2f7Stbbdev // confirm that each receivers got N * num_senders + the initial lc puts 311*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 312*51c0b2f7Stbbdev size_t n = receivers[r]->my_count; 313*51c0b2f7Stbbdev CHECK( n == num_senders*N+lc ); 314*51c0b2f7Stbbdev receivers[r]->my_count = 0; 315*51c0b2f7Stbbdev } 316*51c0b2f7Stbbdev } 317*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 318*51c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 319*51c0b2f7Stbbdev } 320*51c0b2f7Stbbdev CHECK( exe_node.try_put( InputType() ) == true ); 321*51c0b2f7Stbbdev g.wait_for_all(); 322*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 323*51c0b2f7Stbbdev CHECK( int(receivers[r]->my_count) == 0 ); 324*51c0b2f7Stbbdev } 325*51c0b2f7Stbbdev } 326*51c0b2f7Stbbdev } 327*51c0b2f7Stbbdev } 328*51c0b2f7Stbbdev 329*51c0b2f7Stbbdev 330*51c0b2f7Stbbdev template< typename InputType, typename OutputType > 331*51c0b2f7Stbbdev void run_concurrency_levels( int c ) { 332*51c0b2f7Stbbdev concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } ); 333*51c0b2f7Stbbdev 
concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> ); 334*51c0b2f7Stbbdev concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() ); 335*51c0b2f7Stbbdev } 336*51c0b2f7Stbbdev 337*51c0b2f7Stbbdev 338*51c0b2f7Stbbdev struct empty_no_assign { 339*51c0b2f7Stbbdev empty_no_assign() {} 340*51c0b2f7Stbbdev empty_no_assign( int ) {} 341*51c0b2f7Stbbdev operator int() { return 0; } 342*51c0b2f7Stbbdev }; 343*51c0b2f7Stbbdev 344*51c0b2f7Stbbdev template< typename InputType > 345*51c0b2f7Stbbdev struct parallel_puts : private utils::NoAssign { 346*51c0b2f7Stbbdev 347*51c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node; 348*51c0b2f7Stbbdev 349*51c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 350*51c0b2f7Stbbdev 351*51c0b2f7Stbbdev void operator()( int ) const { 352*51c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) { 353*51c0b2f7Stbbdev // the nodes will accept all puts 354*51c0b2f7Stbbdev CHECK( my_exe_node->try_put( InputType() ) == true ); 355*51c0b2f7Stbbdev } 356*51c0b2f7Stbbdev } 357*51c0b2f7Stbbdev 358*51c0b2f7Stbbdev }; 359*51c0b2f7Stbbdev 360*51c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency 361*51c0b2f7Stbbdev /** These tests check: 362*51c0b2f7Stbbdev 1) that the nodes will accept all puts 363*51c0b2f7Stbbdev 2) the nodes will receive puts from multiple predecessors simultaneously, 364*51c0b2f7Stbbdev and 3) the nodes will send to multiple successors. 365*51c0b2f7Stbbdev There is no checking of the contents of the messages for corruption. 
366*51c0b2f7Stbbdev */ 367*51c0b2f7Stbbdev 368*51c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body > 369*51c0b2f7Stbbdev void unlimited_concurrency( Body body ) { 370*51c0b2f7Stbbdev 371*51c0b2f7Stbbdev for (unsigned p = 1; p < 2*utils::MaxThread; ++p) { 372*51c0b2f7Stbbdev tbb::flow::graph g; 373*51c0b2f7Stbbdev tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body ); 374*51c0b2f7Stbbdev 375*51c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 376*51c0b2f7Stbbdev 377*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 378*51c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 379*51c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 380*51c0b2f7Stbbdev } 381*51c0b2f7Stbbdev 382*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::execute_count = 0; 383*51c0b2f7Stbbdev 384*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 385*51c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 386*51c0b2f7Stbbdev } 387*51c0b2f7Stbbdev 388*51c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) ); 389*51c0b2f7Stbbdev g.wait_for_all(); 390*51c0b2f7Stbbdev 391*51c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 392*51c0b2f7Stbbdev size_t ec = harness_graph_executor<InputType, OutputType>::execute_count; 393*51c0b2f7Stbbdev CHECK( ec == p*N ); 394*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 395*51c0b2f7Stbbdev size_t c = receivers[r]->my_count; 396*51c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 
397*51c0b2f7Stbbdev CHECK( c == p*N ); 398*51c0b2f7Stbbdev } 399*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 400*51c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 401*51c0b2f7Stbbdev } 402*51c0b2f7Stbbdev } 403*51c0b2f7Stbbdev } 404*51c0b2f7Stbbdev } 405*51c0b2f7Stbbdev 406*51c0b2f7Stbbdev template< typename InputType, typename OutputType > 407*51c0b2f7Stbbdev void run_unlimited_concurrency() { 408*51c0b2f7Stbbdev harness_graph_executor<InputType, OutputType>::max_executors = 0; 409*51c0b2f7Stbbdev unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } ); 410*51c0b2f7Stbbdev unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func ); 411*51c0b2f7Stbbdev unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() ); 412*51c0b2f7Stbbdev } 413*51c0b2f7Stbbdev 414*51c0b2f7Stbbdev struct continue_msg_to_int { 415*51c0b2f7Stbbdev int my_int; 416*51c0b2f7Stbbdev continue_msg_to_int(int x) : my_int(x) {} 417*51c0b2f7Stbbdev int operator()(tbb::flow::continue_msg) { return my_int; } 418*51c0b2f7Stbbdev }; 419*51c0b2f7Stbbdev 420*51c0b2f7Stbbdev void test_function_node_with_continue_msg_as_input() { 421*51c0b2f7Stbbdev // If this function terminates, then this test is successful 422*51c0b2f7Stbbdev tbb::flow::graph g; 423*51c0b2f7Stbbdev 424*51c0b2f7Stbbdev tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g); 425*51c0b2f7Stbbdev 426*51c0b2f7Stbbdev tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42)); 427*51c0b2f7Stbbdev tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43)); 428*51c0b2f7Stbbdev 429*51c0b2f7Stbbdev tbb::flow::make_edge( Start, FN1 ); 430*51c0b2f7Stbbdev tbb::flow::make_edge( Start, FN2 ); 
431*51c0b2f7Stbbdev 432*51c0b2f7Stbbdev Start.try_put( tbb::flow::continue_msg() ); 433*51c0b2f7Stbbdev g.wait_for_all(); 434*51c0b2f7Stbbdev } 435*51c0b2f7Stbbdev 436*51c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages 437*51c0b2f7Stbbdev void test_concurrency(int num_threads) { 438*51c0b2f7Stbbdev tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads); 439*51c0b2f7Stbbdev run_concurrency_levels<int,int>(num_threads); 440*51c0b2f7Stbbdev run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads); 441*51c0b2f7Stbbdev run_buffered_levels<int, int>(num_threads); 442*51c0b2f7Stbbdev run_unlimited_concurrency<int,int>(); 443*51c0b2f7Stbbdev run_unlimited_concurrency<int,empty_no_assign>(); 444*51c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,int>(); 445*51c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,empty_no_assign>(); 446*51c0b2f7Stbbdev run_unlimited_concurrency<int,tbb::flow::continue_msg>(); 447*51c0b2f7Stbbdev run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>(); 448*51c0b2f7Stbbdev test_function_node_with_continue_msg_as_input(); 449*51c0b2f7Stbbdev } 450*51c0b2f7Stbbdev 451*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 452*51c0b2f7Stbbdev #include <array> 453*51c0b2f7Stbbdev #include <vector> 454*51c0b2f7Stbbdev void test_follows_and_precedes_api() { 455*51c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg; 456*51c0b2f7Stbbdev 457*51c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} }; 458*51c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = { msg_t() }; 459*51c0b2f7Stbbdev 460*51c0b2f7Stbbdev pass_through<msg_t> pass_msg; 461*51c0b2f7Stbbdev 462*51c0b2f7Stbbdev follows_and_precedes_testing::test_follows 463*51c0b2f7Stbbdev <msg_t, tbb::flow::function_node<msg_t, msg_t>> 464*51c0b2f7Stbbdev (messages_for_follows, tbb::flow::unlimited, pass_msg); 465*51c0b2f7Stbbdev 
follows_and_precedes_testing::test_precedes 466*51c0b2f7Stbbdev <msg_t, tbb::flow::function_node<msg_t, msg_t>> 467*51c0b2f7Stbbdev (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1)); 468*51c0b2f7Stbbdev } 469*51c0b2f7Stbbdev #endif 470*51c0b2f7Stbbdev 471*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 472*51c0b2f7Stbbdev 473*51c0b2f7Stbbdev int function_body_f(const int&) { return 1; } 474*51c0b2f7Stbbdev 475*51c0b2f7Stbbdev template <typename Body> 476*51c0b2f7Stbbdev void test_deduction_guides_common(Body body) { 477*51c0b2f7Stbbdev using namespace tbb::flow; 478*51c0b2f7Stbbdev graph g; 479*51c0b2f7Stbbdev 480*51c0b2f7Stbbdev function_node f1(g, unlimited, body); 481*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f1), function_node<int, int>>); 482*51c0b2f7Stbbdev 483*51c0b2f7Stbbdev function_node f2(g, unlimited, body, rejecting()); 484*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>); 485*51c0b2f7Stbbdev 486*51c0b2f7Stbbdev function_node f3(g, unlimited, body, node_priority_t(5)); 487*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f3), function_node<int, int>>); 488*51c0b2f7Stbbdev 489*51c0b2f7Stbbdev function_node f4(g, unlimited, body, rejecting(), node_priority_t(5)); 490*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>); 491*51c0b2f7Stbbdev 492*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 493*51c0b2f7Stbbdev function_node f5(follows(f2), unlimited, body); 494*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f5), function_node<int, int>>); 495*51c0b2f7Stbbdev 496*51c0b2f7Stbbdev function_node f6(follows(f5), unlimited, body, rejecting()); 497*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>); 498*51c0b2f7Stbbdev 499*51c0b2f7Stbbdev function_node f7(follows(f6), unlimited, body, node_priority_t(5)); 500*51c0b2f7Stbbdev 
static_assert(std::is_same_v<decltype(f7), function_node<int, int>>); 501*51c0b2f7Stbbdev 502*51c0b2f7Stbbdev function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5)); 503*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>); 504*51c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 505*51c0b2f7Stbbdev 506*51c0b2f7Stbbdev function_node f9(f1); 507*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(f9), function_node<int, int>>); 508*51c0b2f7Stbbdev } 509*51c0b2f7Stbbdev 510*51c0b2f7Stbbdev void test_deduction_guides() { 511*51c0b2f7Stbbdev test_deduction_guides_common([](const int&)->int { return 1; }); 512*51c0b2f7Stbbdev test_deduction_guides_common([](const int&) mutable ->int { return 1; }); 513*51c0b2f7Stbbdev test_deduction_guides_common(function_body_f); 514*51c0b2f7Stbbdev } 515*51c0b2f7Stbbdev 516*51c0b2f7Stbbdev #endif 517*51c0b2f7Stbbdev 518*51c0b2f7Stbbdev //! Test various node bodies with concurrency 519*51c0b2f7Stbbdev //! \brief \ref error_guessing 520*51c0b2f7Stbbdev TEST_CASE("Concurrency test") { 521*51c0b2f7Stbbdev for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) { 522*51c0b2f7Stbbdev test_concurrency(p); 523*51c0b2f7Stbbdev } 524*51c0b2f7Stbbdev } 525*51c0b2f7Stbbdev 526*51c0b2f7Stbbdev //! NativeParallelFor testing with various concurrency settings 527*51c0b2f7Stbbdev //! \brief \ref error_guessing 528*51c0b2f7Stbbdev TEST_CASE("Lightweight testing"){ 529*51c0b2f7Stbbdev lightweight_testing::test<tbb::flow::function_node>(10); 530*51c0b2f7Stbbdev } 531*51c0b2f7Stbbdev 532*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 533*51c0b2f7Stbbdev //! Test follows and precedes API 534*51c0b2f7Stbbdev //! 
\brief \ref error_guessing 535*51c0b2f7Stbbdev TEST_CASE("Flowgraph node set test"){ 536*51c0b2f7Stbbdev test_follows_and_precedes_api(); 537*51c0b2f7Stbbdev } 538*51c0b2f7Stbbdev #endif 539*51c0b2f7Stbbdev 540*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 541*51c0b2f7Stbbdev //! Test decution guides 542*51c0b2f7Stbbdev //! \brief \ref requirement 543*51c0b2f7Stbbdev TEST_CASE("Deduction guides test"){ 544*51c0b2f7Stbbdev test_deduction_guides(); 545*51c0b2f7Stbbdev } 546*51c0b2f7Stbbdev #endif 547*51c0b2f7Stbbdev 548*51c0b2f7Stbbdev //! try_release and try_consume test 549*51c0b2f7Stbbdev //! \brief \ref error_guessing 550*51c0b2f7Stbbdev TEST_CASE("try_release try_consume"){ 551*51c0b2f7Stbbdev tbb::flow::graph g; 552*51c0b2f7Stbbdev 553*51c0b2f7Stbbdev tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;}); 554*51c0b2f7Stbbdev 555*51c0b2f7Stbbdev CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node"); 556*51c0b2f7Stbbdev CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node"); 557*51c0b2f7Stbbdev } 558