/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/config.h"

// TODO revamp: move the parts dependent on __TBB_EXTRA_DEBUG into separate test(s); keeping them
// in every test can mean testing a build of the product that differs from what is actually
// released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4

//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this holds even with multiple predecessors and successors.
*/

//! exercise buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the multifunction_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );
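
        // Note: copying a flow-graph node copies its body and concurrency level but not its
        // edges, so each copy made below starts out unconnected and is wired up independently.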
        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set each of their message limits to N, and
                    // connect them to exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that N items were pulled from each sender
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}
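
// For reference, the multifunction_node API exercised above reduces to the following minimal
// sketch (illustrative only, kept as a comment rather than compiled into the test):
//
//     tbb::flow::graph g;
//     using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
//     mf_node n( g, tbb::flow::unlimited,
//                []( const int& v, mf_node::output_ports_type& ports ) {
//                    std::get<0>(ports).try_put(v);   // forward v through output port 0
//                } );
//     n.try_put(42);
//     g.wait_for_all();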

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
        ++global_execute_count;
        ++local_execute_count;
        (void)std::get<0>(p).try_put(i);
    }

};

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
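        // Expected executions: for each of the MAX_NODES receiver counts, the sender loop puts
        // N*num_senders items with num_senders = 1..MAX_NODES (summing to N*MAX_NODES*(MAX_NODES+1)/2),
        // plus one extra try_put after the edges are removed; both counters start at Offset:
        //   expected = N/2 * MAX_NODES * MAX_NODES * (MAX_NODES+1) + MAX_NODES + Offset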
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}
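
// Both run_buffered_levels above and run_concurrency_levels below deliberately pass the same
// body in three forms -- a lambda, a pointer to function, and a functor object -- so that body
// storage and copying are exercised for every callable kind the node accepts.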

//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes receive puts from multiple predecessors simultaneously,
    and 4) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

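            // The scheme below parks the node's workers: the harness body takes the shared
            // spin_rw_mutex as a reader, so while this thread holds the write lock every
            // accepted input sits blocked inside the body. That pins the number of in-flight
            // executions at lc, so the rejecting node must refuse the (lc+1)-th put.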
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // it only accepts up to lc items
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that N items were pulled from each sender
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}
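

// Minimal user-defined message type: default- and int-constructible and convertible to int.
// It exercises the node templates with a message type that is not a built-in arithmetic type.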
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
    operator int() const { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts,
    2) the nodes will receive puts from multiple predecessors simultaneously,
    and 3) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}
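
// Routes each input by parity: even values go out port 0, odd values go out port 1.
// Used below to verify that a multifunction_node delivers messages to the correct output port.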
template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}
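
// Note: the N/2 drain loop above relies on N being even (16 in debug builds, 100 otherwise):
// the inputs 0..N-1 split evenly by parity, so each queue yields exactly N/2 items.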

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
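
// precedes(b1, b2) wires the node at construction time: output port k is connected to the k-th
// successor in the list, so odd values (sent to port 0) land in b1 and even values (port 1)
// land in b2, as the checks below confirm.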
void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
        if (i % 2)
            std::get<0>(op).try_put(i);
        else
            std::get<1>(op).try_put(i);
        }
    );

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
                  "Each buffer should have received exactly one message");
}

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}
//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
        if (i % 2)
            std::get<0>(op).try_put(i);
        else
            std::get<1>(op).try_put(i);
        }
    , node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
                  "Each buffer should have received exactly one message");
}

#endif