1*51c0b2f7Stbbdev /* 2*51c0b2f7Stbbdev Copyright (c) 2005-2020 Intel Corporation 3*51c0b2f7Stbbdev 4*51c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 5*51c0b2f7Stbbdev you may not use this file except in compliance with the License. 6*51c0b2f7Stbbdev You may obtain a copy of the License at 7*51c0b2f7Stbbdev 8*51c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 9*51c0b2f7Stbbdev 10*51c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 11*51c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 12*51c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*51c0b2f7Stbbdev See the License for the specific language governing permissions and 14*51c0b2f7Stbbdev limitations under the License. 15*51c0b2f7Stbbdev */ 16*51c0b2f7Stbbdev 17*51c0b2f7Stbbdev #include "common/config.h" 18*51c0b2f7Stbbdev 19*51c0b2f7Stbbdev // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 20*51c0b2f7Stbbdev // parts in all of tests might make testing of the product, which is different from what is actually 21*51c0b2f7Stbbdev // released. 22*51c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1 23*51c0b2f7Stbbdev #include "tbb/flow_graph.h" 24*51c0b2f7Stbbdev 25*51c0b2f7Stbbdev #include "common/test.h" 26*51c0b2f7Stbbdev #include "common/utils.h" 27*51c0b2f7Stbbdev #include "common/graph_utils.h" 28*51c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h" 29*51c0b2f7Stbbdev 30*51c0b2f7Stbbdev 31*51c0b2f7Stbbdev //! \file test_continue_node.cpp 32*51c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification 33*51c0b2f7Stbbdev 34*51c0b2f7Stbbdev 35*51c0b2f7Stbbdev #define N 1000 36*51c0b2f7Stbbdev #define MAX_NODES 4 37*51c0b2f7Stbbdev #define C 8 38*51c0b2f7Stbbdev 39*51c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node 40*51c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> 41*51c0b2f7Stbbdev { 42*51c0b2f7Stbbdev typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; 43*51c0b2f7Stbbdev // Define implementations of virtual methods that are abstract in the base class 44*51c0b2f7Stbbdev bool register_successor( successor_type& ) override { return false; } 45*51c0b2f7Stbbdev bool remove_successor( successor_type& ) override { return false; } 46*51c0b2f7Stbbdev }; 47*51c0b2f7Stbbdev 48*51c0b2f7Stbbdev template< typename InputType > 49*51c0b2f7Stbbdev struct parallel_puts { 50*51c0b2f7Stbbdev 51*51c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node; 52*51c0b2f7Stbbdev 53*51c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 54*51c0b2f7Stbbdev parallel_puts& operator=(const parallel_puts&) = delete; 55*51c0b2f7Stbbdev 56*51c0b2f7Stbbdev void operator()( int ) const { 57*51c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) { 58*51c0b2f7Stbbdev // the nodes will accept all puts 59*51c0b2f7Stbbdev CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 60*51c0b2f7Stbbdev } 61*51c0b2f7Stbbdev } 62*51c0b2f7Stbbdev 63*51c0b2f7Stbbdev }; 64*51c0b2f7Stbbdev 65*51c0b2f7Stbbdev template< typename OutputType > 66*51c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { 67*51c0b2f7Stbbdev fake_continue_sender fake_sender; 68*51c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) { 69*51c0b2f7Stbbdev tbb::detail::d1::register_predecessor(n, fake_sender); 70*51c0b2f7Stbbdev } 71*51c0b2f7Stbbdev 72*51c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 73*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 74*51c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 75*51c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 76*51c0b2f7Stbbdev } 77*51c0b2f7Stbbdev harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; 78*51c0b2f7Stbbdev 79*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 80*51c0b2f7Stbbdev tbb::flow::make_edge( n, *receivers[r] ); 81*51c0b2f7Stbbdev } 82*51c0b2f7Stbbdev 83*51c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); 84*51c0b2f7Stbbdev g.wait_for_all(); 85*51c0b2f7Stbbdev 86*51c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 87*51c0b2f7Stbbdev size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; 88*51c0b2f7Stbbdev CHECK_MESSAGE( (int)ec == p, "" ); 89*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 90*51c0b2f7Stbbdev size_t c = receivers[r]->my_count; 91*51c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 92*51c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" ); 93*51c0b2f7Stbbdev } 94*51c0b2f7Stbbdev 95*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 96*51c0b2f7Stbbdev tbb::flow::remove_edge( n, *receivers[r] ); 97*51c0b2f7Stbbdev } 98*51c0b2f7Stbbdev } 99*51c0b2f7Stbbdev } 100*51c0b2f7Stbbdev 101*51c0b2f7Stbbdev template< typename OutputType, typename Body > 102*51c0b2f7Stbbdev void continue_nodes( Body body ) { 103*51c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 104*51c0b2f7Stbbdev tbb::flow::graph g; 105*51c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, body ); 106*51c0b2f7Stbbdev run_continue_nodes( p, g, exe_node); 107*51c0b2f7Stbbdev exe_node.try_put(tbb::flow::continue_msg()); 108*51c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); 109*51c0b2f7Stbbdev run_continue_nodes( p, g, exe_node_copy); 110*51c0b2f7Stbbdev } 111*51c0b2f7Stbbdev } 112*51c0b2f7Stbbdev 113*51c0b2f7Stbbdev const size_t Offset = 123; 114*51c0b2f7Stbbdev std::atomic<size_t> global_execute_count; 115*51c0b2f7Stbbdev 116*51c0b2f7Stbbdev template< typename OutputType > 117*51c0b2f7Stbbdev struct inc_functor { 118*51c0b2f7Stbbdev 119*51c0b2f7Stbbdev std::atomic<size_t> local_execute_count; 120*51c0b2f7Stbbdev inc_functor( ) { local_execute_count = 0; } 121*51c0b2f7Stbbdev inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 122*51c0b2f7Stbbdev void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); } 123*51c0b2f7Stbbdev 124*51c0b2f7Stbbdev OutputType operator()( tbb::flow::continue_msg ) { 125*51c0b2f7Stbbdev ++global_execute_count; 126*51c0b2f7Stbbdev ++local_execute_count; 127*51c0b2f7Stbbdev return OutputType(); 128*51c0b2f7Stbbdev } 129*51c0b2f7Stbbdev 130*51c0b2f7Stbbdev }; 131*51c0b2f7Stbbdev 132*51c0b2f7Stbbdev template< typename OutputType > 133*51c0b2f7Stbbdev void continue_nodes_with_copy( ) { 134*51c0b2f7Stbbdev 135*51c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 136*51c0b2f7Stbbdev tbb::flow::graph g; 137*51c0b2f7Stbbdev inc_functor<OutputType> cf; 138*51c0b2f7Stbbdev cf.local_execute_count = Offset; 139*51c0b2f7Stbbdev global_execute_count = Offset; 140*51c0b2f7Stbbdev 141*51c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, cf ); 142*51c0b2f7Stbbdev fake_continue_sender fake_sender; 143*51c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) { 144*51c0b2f7Stbbdev tbb::detail::d1::register_predecessor(exe_node, fake_sender); 145*51c0b2f7Stbbdev } 146*51c0b2f7Stbbdev 147*51c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 148*51c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 149*51c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 150*51c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 151*51c0b2f7Stbbdev } 152*51c0b2f7Stbbdev 153*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 154*51c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 155*51c0b2f7Stbbdev } 156*51c0b2f7Stbbdev 157*51c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); 158*51c0b2f7Stbbdev g.wait_for_all(); 159*51c0b2f7Stbbdev 160*51c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 161*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 162*51c0b2f7Stbbdev size_t c = receivers[r]->my_count; 163*51c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 164*51c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" ); 165*51c0b2f7Stbbdev } 166*51c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 167*51c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 168*51c0b2f7Stbbdev } 169*51c0b2f7Stbbdev } 170*51c0b2f7Stbbdev 171*51c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct 172*51c0b2f7Stbbdev inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 173*51c0b2f7Stbbdev const size_t expected_count = p*MAX_NODES + Offset; 174*51c0b2f7Stbbdev size_t global_count = global_execute_count; 175*51c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count; 176*51c0b2f7Stbbdev CHECK_MESSAGE( global_count == expected_count, "" ); 177*51c0b2f7Stbbdev CHECK_MESSAGE( global_count == inc_count, "" ); 178*51c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); 179*51c0b2f7Stbbdev body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 180*51c0b2f7Stbbdev inc_count = body_copy.local_execute_count; 181*51c0b2f7Stbbdev CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" ); 182*51c0b2f7Stbbdev 183*51c0b2f7Stbbdev } 184*51c0b2f7Stbbdev } 185*51c0b2f7Stbbdev 186*51c0b2f7Stbbdev template< typename OutputType > 187*51c0b2f7Stbbdev void run_continue_nodes() { 188*51c0b2f7Stbbdev harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; 189*51c0b2f7Stbbdev continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); 190*51c0b2f7Stbbdev continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); 191*51c0b2f7Stbbdev continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); 192*51c0b2f7Stbbdev continue_nodes_with_copy<OutputType>(); 193*51c0b2f7Stbbdev } 194*51c0b2f7Stbbdev 195*51c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages 196*51c0b2f7Stbbdev void test_concurrency(int num_threads) { 197*51c0b2f7Stbbdev tbb::task_arena arena(num_threads); 198*51c0b2f7Stbbdev arena.execute( 199*51c0b2f7Stbbdev [&] { 200*51c0b2f7Stbbdev run_continue_nodes<tbb::flow::continue_msg>(); 201*51c0b2f7Stbbdev run_continue_nodes<int>(); 202*51c0b2f7Stbbdev run_continue_nodes<utils::NoAssign>(); 203*51c0b2f7Stbbdev } 204*51c0b2f7Stbbdev ); 205*51c0b2f7Stbbdev } 206*51c0b2f7Stbbdev /* 207*51c0b2f7Stbbdev * Connection of two graphs is not currently supported, but works to some limited extent. 208*51c0b2f7Stbbdev * This test is included to check for backward compatibility. It checks that a continue_node 209*51c0b2f7Stbbdev * with predecessors in two different graphs receives the required 210*51c0b2f7Stbbdev * number of continue messages before it executes. 211*51c0b2f7Stbbdev */ 212*51c0b2f7Stbbdev using namespace tbb::flow; 213*51c0b2f7Stbbdev 214*51c0b2f7Stbbdev struct add_to_counter { 215*51c0b2f7Stbbdev int* counter; 216*51c0b2f7Stbbdev add_to_counter(int& var):counter(&var){} 217*51c0b2f7Stbbdev void operator()(continue_msg){*counter+=1;} 218*51c0b2f7Stbbdev }; 219*51c0b2f7Stbbdev 220*51c0b2f7Stbbdev void test_two_graphs(){ 221*51c0b2f7Stbbdev int count=0; 222*51c0b2f7Stbbdev 223*51c0b2f7Stbbdev //graph g with broadcast_node and continue_node 224*51c0b2f7Stbbdev graph g; 225*51c0b2f7Stbbdev broadcast_node<continue_msg> start_g(g); 226*51c0b2f7Stbbdev continue_node<continue_msg> first_g(g, add_to_counter(count)); 227*51c0b2f7Stbbdev 228*51c0b2f7Stbbdev //graph h with broadcast_node 229*51c0b2f7Stbbdev graph h; 230*51c0b2f7Stbbdev broadcast_node<continue_msg> start_h(h); 231*51c0b2f7Stbbdev 232*51c0b2f7Stbbdev //making two edges to first_g from the two graphs 233*51c0b2f7Stbbdev make_edge(start_g,first_g); 234*51c0b2f7Stbbdev make_edge(start_h, first_g); 235*51c0b2f7Stbbdev 236*51c0b2f7Stbbdev //two try_puts from the two graphs 237*51c0b2f7Stbbdev start_g.try_put(continue_msg()); 238*51c0b2f7Stbbdev start_h.try_put(continue_msg()); 239*51c0b2f7Stbbdev g.wait_for_all(); 240*51c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received"); 241*51c0b2f7Stbbdev 242*51c0b2f7Stbbdev //two try_puts from the graph that doesn't contain the node 243*51c0b2f7Stbbdev count=0; 244*51c0b2f7Stbbdev start_h.try_put(continue_msg()); 245*51c0b2f7Stbbdev start_h.try_put(continue_msg()); 246*51c0b2f7Stbbdev g.wait_for_all(); 247*51c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received -1"); 248*51c0b2f7Stbbdev 249*51c0b2f7Stbbdev //only one try_put 250*51c0b2f7Stbbdev count=0; 251*51c0b2f7Stbbdev start_g.try_put(continue_msg()); 252*51c0b2f7Stbbdev g.wait_for_all(); 253*51c0b2f7Stbbdev CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors"); 254*51c0b2f7Stbbdev } 255*51c0b2f7Stbbdev 256*51c0b2f7Stbbdev struct lightweight_policy_body { 257*51c0b2f7Stbbdev const std::thread::id my_thread_id; 258*51c0b2f7Stbbdev std::atomic<size_t>& my_count; 259*51c0b2f7Stbbdev 260*51c0b2f7Stbbdev lightweight_policy_body( std::atomic<size_t>& count ) 261*51c0b2f7Stbbdev : my_thread_id(std::this_thread::get_id()), my_count(count) 262*51c0b2f7Stbbdev { 263*51c0b2f7Stbbdev my_count = 0; 264*51c0b2f7Stbbdev } 265*51c0b2f7Stbbdev lightweight_policy_body& operator=(const lightweight_policy_body&) = delete; 266*51c0b2f7Stbbdev void operator()(tbb::flow::continue_msg) { 267*51c0b2f7Stbbdev ++my_count; 268*51c0b2f7Stbbdev std::thread::id body_thread_id = std::this_thread::get_id(); 269*51c0b2f7Stbbdev CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight"); 270*51c0b2f7Stbbdev } 271*51c0b2f7Stbbdev }; 272*51c0b2f7Stbbdev 273*51c0b2f7Stbbdev void test_lightweight_policy() { 274*51c0b2f7Stbbdev tbb::flow::graph g; 275*51c0b2f7Stbbdev std::atomic<size_t> count1; 276*51c0b2f7Stbbdev std::atomic<size_t> count2; 277*51c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 278*51c0b2f7Stbbdev node1(g, lightweight_policy_body(count1)); 279*51c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 280*51c0b2f7Stbbdev node2(g, lightweight_policy_body(count2)); 281*51c0b2f7Stbbdev 282*51c0b2f7Stbbdev tbb::flow::make_edge(node1, node2); 283*51c0b2f7Stbbdev const size_t n = 10; 284*51c0b2f7Stbbdev for(size_t i = 0; i < n; ++i) { 285*51c0b2f7Stbbdev node1.try_put(tbb::flow::continue_msg()); 286*51c0b2f7Stbbdev } 287*51c0b2f7Stbbdev g.wait_for_all(); 288*51c0b2f7Stbbdev 289*51c0b2f7Stbbdev lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); 290*51c0b2f7Stbbdev lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); 291*51c0b2f7Stbbdev CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times"); 292*51c0b2f7Stbbdev CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times"); 293*51c0b2f7Stbbdev } 294*51c0b2f7Stbbdev 295*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 296*51c0b2f7Stbbdev #include <array> 297*51c0b2f7Stbbdev #include <vector> 298*51c0b2f7Stbbdev void test_follows_and_precedes_api() { 299*51c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg; 300*51c0b2f7Stbbdev 301*51c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } }; 302*51c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = { msg_t() }; 303*51c0b2f7Stbbdev 304*51c0b2f7Stbbdev auto pass_through = [](const msg_t& msg) { return msg; }; 305*51c0b2f7Stbbdev 306*51c0b2f7Stbbdev follows_and_precedes_testing::test_follows 307*51c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>> 308*51c0b2f7Stbbdev (messages_for_follows, pass_through, node_priority_t(0)); 309*51c0b2f7Stbbdev 310*51c0b2f7Stbbdev follows_and_precedes_testing::test_precedes 311*51c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>> 312*51c0b2f7Stbbdev (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1)); 313*51c0b2f7Stbbdev } 314*51c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 315*51c0b2f7Stbbdev 316*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 317*51c0b2f7Stbbdev 318*51c0b2f7Stbbdev template <typename ExpectedType, typename Body> 319*51c0b2f7Stbbdev void test_deduction_guides_common(Body body) { 320*51c0b2f7Stbbdev using namespace tbb::flow; 321*51c0b2f7Stbbdev graph g; 322*51c0b2f7Stbbdev 323*51c0b2f7Stbbdev continue_node c1(g, body); 324*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>); 325*51c0b2f7Stbbdev 326*51c0b2f7Stbbdev continue_node c2(g, body, lightweight()); 327*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>); 328*51c0b2f7Stbbdev 329*51c0b2f7Stbbdev continue_node c3(g, 5, body); 330*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>); 331*51c0b2f7Stbbdev 332*51c0b2f7Stbbdev continue_node c4(g, 5, body, lightweight()); 333*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>); 334*51c0b2f7Stbbdev 335*51c0b2f7Stbbdev continue_node c5(g, body, node_priority_t(5)); 336*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>); 337*51c0b2f7Stbbdev 338*51c0b2f7Stbbdev continue_node c6(g, body, lightweight(), node_priority_t(5)); 339*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>); 340*51c0b2f7Stbbdev 341*51c0b2f7Stbbdev continue_node c7(g, 5, body, node_priority_t(5)); 342*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>); 343*51c0b2f7Stbbdev 344*51c0b2f7Stbbdev continue_node c8(g, 5, body, lightweight(), node_priority_t(5)); 345*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>); 346*51c0b2f7Stbbdev 347*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 348*51c0b2f7Stbbdev broadcast_node<continue_msg> b(g); 349*51c0b2f7Stbbdev 350*51c0b2f7Stbbdev continue_node c9(follows(b), body); 351*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>); 352*51c0b2f7Stbbdev 353*51c0b2f7Stbbdev continue_node c10(follows(b), body, lightweight()); 354*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>); 355*51c0b2f7Stbbdev 356*51c0b2f7Stbbdev continue_node c11(follows(b), 5, body); 357*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>); 358*51c0b2f7Stbbdev 359*51c0b2f7Stbbdev continue_node c12(follows(b), 5, body, lightweight()); 360*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>); 361*51c0b2f7Stbbdev 362*51c0b2f7Stbbdev continue_node c13(follows(b), body, node_priority_t(5)); 363*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>); 364*51c0b2f7Stbbdev 365*51c0b2f7Stbbdev continue_node c14(follows(b), body, lightweight(), node_priority_t(5)); 366*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>); 367*51c0b2f7Stbbdev 368*51c0b2f7Stbbdev continue_node c15(follows(b), 5, body, node_priority_t(5)); 369*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>); 370*51c0b2f7Stbbdev 371*51c0b2f7Stbbdev continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5)); 372*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>); 373*51c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 374*51c0b2f7Stbbdev 375*51c0b2f7Stbbdev continue_node c17(c1); 376*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>); 377*51c0b2f7Stbbdev } 378*51c0b2f7Stbbdev 379*51c0b2f7Stbbdev int continue_body_f(const tbb::flow::continue_msg&) { return 1; } 380*51c0b2f7Stbbdev void continue_void_body_f(const tbb::flow::continue_msg&) {} 381*51c0b2f7Stbbdev 382*51c0b2f7Stbbdev void test_deduction_guides() { 383*51c0b2f7Stbbdev using tbb::flow::continue_msg; 384*51c0b2f7Stbbdev test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } ); 385*51c0b2f7Stbbdev test_deduction_guides_common<continue_msg>([](const continue_msg&) {}); 386*51c0b2f7Stbbdev 387*51c0b2f7Stbbdev test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; }); 388*51c0b2f7Stbbdev test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {}); 389*51c0b2f7Stbbdev 390*51c0b2f7Stbbdev test_deduction_guides_common<int>(continue_body_f); 391*51c0b2f7Stbbdev test_deduction_guides_common<continue_msg>(continue_void_body_f); 392*51c0b2f7Stbbdev } 393*51c0b2f7Stbbdev 394*51c0b2f7Stbbdev #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 395*51c0b2f7Stbbdev 396*51c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead 397*51c0b2f7Stbbdev template<typename T> 398*51c0b2f7Stbbdev struct passing_body { 399*51c0b2f7Stbbdev T operator()(const T& val) { 400*51c0b2f7Stbbdev return val; 401*51c0b2f7Stbbdev } 402*51c0b2f7Stbbdev }; 403*51c0b2f7Stbbdev 404*51c0b2f7Stbbdev /* 405*51c0b2f7Stbbdev The test covers the case when a node with non-default mutex type is a predecessor for continue_node, 406*51c0b2f7Stbbdev because there used to be a bug when make_edge(node, continue_node) 407*51c0b2f7Stbbdev did not update continue_node's predecesosor threshold 408*51c0b2f7Stbbdev since the specialization of node's successor_cache for a continue_node was not chosen. 409*51c0b2f7Stbbdev */ 410*51c0b2f7Stbbdev void test_successor_cache_specialization() { 411*51c0b2f7Stbbdev using namespace tbb::flow; 412*51c0b2f7Stbbdev 413*51c0b2f7Stbbdev graph g; 414*51c0b2f7Stbbdev 415*51c0b2f7Stbbdev broadcast_node<continue_msg> node_with_default_mutex_type(g); 416*51c0b2f7Stbbdev buffer_node<continue_msg> node_with_non_default_mutex_type(g); 417*51c0b2f7Stbbdev 418*51c0b2f7Stbbdev continue_node<continue_msg> node(g, passing_body<continue_msg>()); 419*51c0b2f7Stbbdev 420*51c0b2f7Stbbdev make_edge(node_with_default_mutex_type, node); 421*51c0b2f7Stbbdev make_edge(node_with_non_default_mutex_type, node); 422*51c0b2f7Stbbdev 423*51c0b2f7Stbbdev buffer_node<continue_msg> buf(g); 424*51c0b2f7Stbbdev 425*51c0b2f7Stbbdev make_edge(node, buf); 426*51c0b2f7Stbbdev 427*51c0b2f7Stbbdev node_with_default_mutex_type.try_put(continue_msg()); 428*51c0b2f7Stbbdev node_with_non_default_mutex_type.try_put(continue_msg()); 429*51c0b2f7Stbbdev 430*51c0b2f7Stbbdev g.wait_for_all(); 431*51c0b2f7Stbbdev 432*51c0b2f7Stbbdev continue_msg storage; 433*51c0b2f7Stbbdev CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)), 434*51c0b2f7Stbbdev "Wrong number of messages is passed via continue_node"); 435*51c0b2f7Stbbdev } 436*51c0b2f7Stbbdev 437*51c0b2f7Stbbdev //! Test concurrent continue_node for correctness 438*51c0b2f7Stbbdev //! \brief \ref error_guessing 439*51c0b2f7Stbbdev TEST_CASE("Concurrency testing") { 440*51c0b2f7Stbbdev for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) { 441*51c0b2f7Stbbdev test_concurrency(p); 442*51c0b2f7Stbbdev } 443*51c0b2f7Stbbdev } 444*51c0b2f7Stbbdev 445*51c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs 446*51c0b2f7Stbbdev //! \brief \ref error_guessing 447*51c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); } 448*51c0b2f7Stbbdev 449*51c0b2f7Stbbdev //! Test basic behaviour with lightweight body 450*51c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 451*51c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); } 452*51c0b2f7Stbbdev 453*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 454*51c0b2f7Stbbdev //! Test deprecated follows and preceedes API 455*51c0b2f7Stbbdev //! \brief \ref error_guessing 456*51c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); } 457*51c0b2f7Stbbdev #endif 458*51c0b2f7Stbbdev 459*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 460*51c0b2f7Stbbdev //! Test deduction guides 461*51c0b2f7Stbbdev //! \brief requirement 462*51c0b2f7Stbbdev TEST_CASE( "Deduction guides" ) { test_deduction_guides(); } 463*51c0b2f7Stbbdev #endif 464*51c0b2f7Stbbdev 465*51c0b2f7Stbbdev //! Test for successor cache specialization 466*51c0b2f7Stbbdev //! \brief \ref regression 467*51c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) { 468*51c0b2f7Stbbdev test_successor_cache_specialization(); 469*51c0b2f7Stbbdev } 470