1*51c0b2f7Stbbdev /* 2*51c0b2f7Stbbdev Copyright (c) 2005-2020 Intel Corporation 3*51c0b2f7Stbbdev 4*51c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 5*51c0b2f7Stbbdev you may not use this file except in compliance with the License. 6*51c0b2f7Stbbdev You may obtain a copy of the License at 7*51c0b2f7Stbbdev 8*51c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 9*51c0b2f7Stbbdev 10*51c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 11*51c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 12*51c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13*51c0b2f7Stbbdev See the License for the specific language governing permissions and 14*51c0b2f7Stbbdev limitations under the License. 15*51c0b2f7Stbbdev */ 16*51c0b2f7Stbbdev 17*51c0b2f7Stbbdev // have to expose the reset_node method to be able to reset a function_body 18*51c0b2f7Stbbdev 19*51c0b2f7Stbbdev #include "common/config.h" 20*51c0b2f7Stbbdev 21*51c0b2f7Stbbdev #include "tbb/flow_graph.h" 22*51c0b2f7Stbbdev 23*51c0b2f7Stbbdev #include "common/test.h" 24*51c0b2f7Stbbdev #include "common/utils.h" 25*51c0b2f7Stbbdev #include "common/utils_assert.h" 26*51c0b2f7Stbbdev 27*51c0b2f7Stbbdev 28*51c0b2f7Stbbdev //! \file test_input_node.cpp 29*51c0b2f7Stbbdev //! \brief Test for [flow_graph.input_node] specification 30*51c0b2f7Stbbdev 31*51c0b2f7Stbbdev 32*51c0b2f7Stbbdev using tbb::detail::d1::graph_task; 33*51c0b2f7Stbbdev using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 34*51c0b2f7Stbbdev 35*51c0b2f7Stbbdev const int N = 1000; 36*51c0b2f7Stbbdev 37*51c0b2f7Stbbdev template< typename T > 38*51c0b2f7Stbbdev class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 39*51c0b2f7Stbbdev 40*51c0b2f7Stbbdev std::atomic<int> my_counters[N]; 41*51c0b2f7Stbbdev tbb::flow::graph& my_graph; 42*51c0b2f7Stbbdev 43*51c0b2f7Stbbdev public: 44*51c0b2f7Stbbdev 45*51c0b2f7Stbbdev test_push_receiver(tbb::flow::graph& g) : my_graph(g) { 46*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) 47*51c0b2f7Stbbdev my_counters[i] = 0; 48*51c0b2f7Stbbdev } 49*51c0b2f7Stbbdev 50*51c0b2f7Stbbdev int get_count( int i ) { 51*51c0b2f7Stbbdev int v = my_counters[i]; 52*51c0b2f7Stbbdev return v; 53*51c0b2f7Stbbdev } 54*51c0b2f7Stbbdev 55*51c0b2f7Stbbdev typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; 56*51c0b2f7Stbbdev 57*51c0b2f7Stbbdev graph_task* try_put_task( const T &v ) override { 58*51c0b2f7Stbbdev int i = (int)v; 59*51c0b2f7Stbbdev ++my_counters[i]; 60*51c0b2f7Stbbdev return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 61*51c0b2f7Stbbdev } 62*51c0b2f7Stbbdev 63*51c0b2f7Stbbdev tbb::flow::graph& graph_reference() const override { 64*51c0b2f7Stbbdev return my_graph; 65*51c0b2f7Stbbdev } 66*51c0b2f7Stbbdev }; 67*51c0b2f7Stbbdev 68*51c0b2f7Stbbdev template< typename T > 69*51c0b2f7Stbbdev class my_input_body { 70*51c0b2f7Stbbdev 71*51c0b2f7Stbbdev unsigned my_count; 72*51c0b2f7Stbbdev int *ninvocations; 73*51c0b2f7Stbbdev 74*51c0b2f7Stbbdev public: 75*51c0b2f7Stbbdev 76*51c0b2f7Stbbdev my_input_body() : ninvocations(NULL) { my_count = 0; } 77*51c0b2f7Stbbdev my_input_body(int &_inv) : ninvocations(&_inv) { my_count = 0; } 78*51c0b2f7Stbbdev 79*51c0b2f7Stbbdev T operator()( tbb::flow_control& fc ) { 80*51c0b2f7Stbbdev T v = (T)my_count++; 81*51c0b2f7Stbbdev if(ninvocations) ++(*ninvocations); 82*51c0b2f7Stbbdev if ( (int)v < N ){ 83*51c0b2f7Stbbdev return v; 84*51c0b2f7Stbbdev }else{ 85*51c0b2f7Stbbdev fc.stop(); 86*51c0b2f7Stbbdev return T(); 87*51c0b2f7Stbbdev } 88*51c0b2f7Stbbdev } 89*51c0b2f7Stbbdev 90*51c0b2f7Stbbdev }; 91*51c0b2f7Stbbdev 92*51c0b2f7Stbbdev template< typename T > 93*51c0b2f7Stbbdev class function_body { 94*51c0b2f7Stbbdev 95*51c0b2f7Stbbdev std::atomic<int> *my_counters; 96*51c0b2f7Stbbdev 97*51c0b2f7Stbbdev public: 98*51c0b2f7Stbbdev 99*51c0b2f7Stbbdev function_body( std::atomic<int> *counters ) : my_counters(counters) { 100*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) 101*51c0b2f7Stbbdev my_counters[i] = 0; 102*51c0b2f7Stbbdev } 103*51c0b2f7Stbbdev 104*51c0b2f7Stbbdev bool operator()( T v ) { 105*51c0b2f7Stbbdev ++my_counters[(int)v]; 106*51c0b2f7Stbbdev return true; 107*51c0b2f7Stbbdev } 108*51c0b2f7Stbbdev 109*51c0b2f7Stbbdev }; 110*51c0b2f7Stbbdev 111*51c0b2f7Stbbdev template< typename T > 112*51c0b2f7Stbbdev void test_single_dest() { 113*51c0b2f7Stbbdev // push only 114*51c0b2f7Stbbdev tbb::flow::graph g; 115*51c0b2f7Stbbdev tbb::flow::input_node<T> src(g, my_input_body<T>() ); 116*51c0b2f7Stbbdev test_push_receiver<T> dest(g); 117*51c0b2f7Stbbdev tbb::flow::make_edge( src, dest ); 118*51c0b2f7Stbbdev src.activate(); 119*51c0b2f7Stbbdev g.wait_for_all(); 120*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 121*51c0b2f7Stbbdev CHECK_MESSAGE( dest.get_count(i) == 1, "" ); 122*51c0b2f7Stbbdev } 123*51c0b2f7Stbbdev 124*51c0b2f7Stbbdev // push only 125*51c0b2f7Stbbdev std::atomic<int> counters3[N]; 126*51c0b2f7Stbbdev tbb::flow::input_node<T> src3(g, my_input_body<T>() ); 127*51c0b2f7Stbbdev src3.activate(); 128*51c0b2f7Stbbdev 129*51c0b2f7Stbbdev function_body<T> b3( counters3 ); 130*51c0b2f7Stbbdev tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 ); 131*51c0b2f7Stbbdev tbb::flow::make_edge( src3, dest3 ); 132*51c0b2f7Stbbdev g.wait_for_all(); 133*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 134*51c0b2f7Stbbdev int v = counters3[i]; 135*51c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 136*51c0b2f7Stbbdev } 137*51c0b2f7Stbbdev 138*51c0b2f7Stbbdev // push & pull 139*51c0b2f7Stbbdev tbb::flow::input_node<T> src2(g, my_input_body<T>() ); 140*51c0b2f7Stbbdev src2.activate(); 141*51c0b2f7Stbbdev std::atomic<int> counters2[N]; 142*51c0b2f7Stbbdev 143*51c0b2f7Stbbdev function_body<T> b2( counters2 ); 144*51c0b2f7Stbbdev tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 ); 145*51c0b2f7Stbbdev tbb::flow::make_edge( src2, dest2 ); 146*51c0b2f7Stbbdev g.wait_for_all(); 147*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 148*51c0b2f7Stbbdev int v = counters2[i]; 149*51c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 150*51c0b2f7Stbbdev } 151*51c0b2f7Stbbdev 152*51c0b2f7Stbbdev // test copy constructor 153*51c0b2f7Stbbdev tbb::flow::input_node<T> src_copy(src); 154*51c0b2f7Stbbdev src_copy.activate(); 155*51c0b2f7Stbbdev test_push_receiver<T> dest_c(g); 156*51c0b2f7Stbbdev CHECK_MESSAGE( src_copy.register_successor(dest_c), "" ); 157*51c0b2f7Stbbdev g.wait_for_all(); 158*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 159*51c0b2f7Stbbdev CHECK_MESSAGE( dest_c.get_count(i) == 1, "" ); 160*51c0b2f7Stbbdev } 161*51c0b2f7Stbbdev } 162*51c0b2f7Stbbdev 163*51c0b2f7Stbbdev void test_reset() { 164*51c0b2f7Stbbdev // input_node -> function_node 165*51c0b2f7Stbbdev tbb::flow::graph g; 166*51c0b2f7Stbbdev std::atomic<int> counters3[N]; 167*51c0b2f7Stbbdev tbb::flow::input_node<int> src3(g, my_input_body<int>()); 168*51c0b2f7Stbbdev src3.activate(); 169*51c0b2f7Stbbdev tbb::flow::input_node<int> src_inactive(g, my_input_body<int>()); 170*51c0b2f7Stbbdev function_body<int> b3( counters3 ); 171*51c0b2f7Stbbdev tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3); 172*51c0b2f7Stbbdev tbb::flow::make_edge( src3, dest3 ); 173*51c0b2f7Stbbdev // source_node already in active state. Let the graph run, 174*51c0b2f7Stbbdev g.wait_for_all(); 175*51c0b2f7Stbbdev // check the array for each value. 176*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 177*51c0b2f7Stbbdev int v = counters3[i]; 178*51c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 179*51c0b2f7Stbbdev counters3[i] = 0; 180*51c0b2f7Stbbdev } 181*51c0b2f7Stbbdev 182*51c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts. 183*51c0b2f7Stbbdev // and spawns task to run input 184*51c0b2f7Stbbdev src3.activate(); 185*51c0b2f7Stbbdev 186*51c0b2f7Stbbdev g.wait_for_all(); 187*51c0b2f7Stbbdev // check output queue again. Should be the same contents. 188*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 189*51c0b2f7Stbbdev int v = counters3[i]; 190*51c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 191*51c0b2f7Stbbdev counters3[i] = 0; 192*51c0b2f7Stbbdev } 193*51c0b2f7Stbbdev g.reset(); // doesn't reset the input_node_body to initial state, but does spawn a task 194*51c0b2f7Stbbdev // to run the input_node. 195*51c0b2f7Stbbdev 196*51c0b2f7Stbbdev g.wait_for_all(); 197*51c0b2f7Stbbdev // array should be all zero 198*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 199*51c0b2f7Stbbdev int v = counters3[i]; 200*51c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 201*51c0b2f7Stbbdev } 202*51c0b2f7Stbbdev 203*51c0b2f7Stbbdev remove_edge(src3, dest3); 204*51c0b2f7Stbbdev make_edge(src_inactive, dest3); 205*51c0b2f7Stbbdev 206*51c0b2f7Stbbdev // src_inactive doesn't run 207*51c0b2f7Stbbdev g.wait_for_all(); 208*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 209*51c0b2f7Stbbdev int v = counters3[i]; 210*51c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 211*51c0b2f7Stbbdev } 212*51c0b2f7Stbbdev 213*51c0b2f7Stbbdev // run graph 214*51c0b2f7Stbbdev src_inactive.activate(); 215*51c0b2f7Stbbdev g.wait_for_all(); 216*51c0b2f7Stbbdev // check output 217*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 218*51c0b2f7Stbbdev int v = counters3[i]; 219*51c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 220*51c0b2f7Stbbdev counters3[i] = 0; 221*51c0b2f7Stbbdev } 222*51c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts 223*51c0b2f7Stbbdev // src_inactive doesn't run 224*51c0b2f7Stbbdev g.wait_for_all(); 225*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 226*51c0b2f7Stbbdev int v = counters3[i]; 227*51c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 228*51c0b2f7Stbbdev } 229*51c0b2f7Stbbdev 230*51c0b2f7Stbbdev // start it up 231*51c0b2f7Stbbdev src_inactive.activate(); 232*51c0b2f7Stbbdev g.wait_for_all(); 233*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 234*51c0b2f7Stbbdev int v = counters3[i]; 235*51c0b2f7Stbbdev CHECK_MESSAGE( v == 1, "" ); 236*51c0b2f7Stbbdev counters3[i] = 0; 237*51c0b2f7Stbbdev } 238*51c0b2f7Stbbdev g.reset(); // doesn't reset the input_node_body to initial state, and doesn't 239*51c0b2f7Stbbdev // spawn a task to run the input_node. 240*51c0b2f7Stbbdev 241*51c0b2f7Stbbdev g.wait_for_all(); 242*51c0b2f7Stbbdev // array should be all zero 243*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 244*51c0b2f7Stbbdev int v = counters3[i]; 245*51c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 246*51c0b2f7Stbbdev } 247*51c0b2f7Stbbdev src_inactive.activate(); 248*51c0b2f7Stbbdev // input_node_body is already in final state, so input_node will not forward a message. 249*51c0b2f7Stbbdev g.wait_for_all(); 250*51c0b2f7Stbbdev for (int i = 0; i < N; ++i ) { 251*51c0b2f7Stbbdev int v = counters3[i]; 252*51c0b2f7Stbbdev CHECK_MESSAGE( v == 0, "" ); 253*51c0b2f7Stbbdev } 254*51c0b2f7Stbbdev } 255*51c0b2f7Stbbdev 256*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 257*51c0b2f7Stbbdev int input_body_f(tbb::flow_control&) { return 42; } 258*51c0b2f7Stbbdev 259*51c0b2f7Stbbdev void test_deduction_guides() { 260*51c0b2f7Stbbdev using namespace tbb::flow; 261*51c0b2f7Stbbdev graph g; 262*51c0b2f7Stbbdev 263*51c0b2f7Stbbdev auto lambda = [](tbb::flow_control&) { return 42; }; 264*51c0b2f7Stbbdev auto non_const_lambda = [](tbb::flow_control&) mutable { return 42; }; 265*51c0b2f7Stbbdev 266*51c0b2f7Stbbdev // Tests for input_node(graph&, Body) 267*51c0b2f7Stbbdev input_node s1(g, lambda); 268*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s1), input_node<int>>); 269*51c0b2f7Stbbdev 270*51c0b2f7Stbbdev input_node s2(g, non_const_lambda); 271*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s2), input_node<int>>); 272*51c0b2f7Stbbdev 273*51c0b2f7Stbbdev input_node s3(g, input_body_f); 274*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s3), input_node<int>>); 275*51c0b2f7Stbbdev 276*51c0b2f7Stbbdev input_node s4(s3); 277*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s4), input_node<int>>); 278*51c0b2f7Stbbdev 279*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 280*51c0b2f7Stbbdev broadcast_node<int> bc(g); 281*51c0b2f7Stbbdev 282*51c0b2f7Stbbdev // Tests for input_node(const node_set<Args...>&, Body) 283*51c0b2f7Stbbdev input_node s5(precedes(bc), lambda); 284*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s5), input_node<int>>); 285*51c0b2f7Stbbdev 286*51c0b2f7Stbbdev input_node s6(precedes(bc), non_const_lambda); 287*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s6), input_node<int>>); 288*51c0b2f7Stbbdev 289*51c0b2f7Stbbdev input_node s7(precedes(bc), input_body_f); 290*51c0b2f7Stbbdev static_assert(std::is_same_v<decltype(s7), input_node<int>>); 291*51c0b2f7Stbbdev #endif 292*51c0b2f7Stbbdev g.wait_for_all(); 293*51c0b2f7Stbbdev } 294*51c0b2f7Stbbdev 295*51c0b2f7Stbbdev #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 296*51c0b2f7Stbbdev 297*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 298*51c0b2f7Stbbdev #include <array> 299*51c0b2f7Stbbdev void test_follows_and_precedes_api() { 300*51c0b2f7Stbbdev using namespace tbb::flow; 301*51c0b2f7Stbbdev 302*51c0b2f7Stbbdev graph g; 303*51c0b2f7Stbbdev 304*51c0b2f7Stbbdev std::array<buffer_node<bool>, 3> successors {{ 305*51c0b2f7Stbbdev buffer_node<bool>(g), 306*51c0b2f7Stbbdev buffer_node<bool>(g), 307*51c0b2f7Stbbdev buffer_node<bool>(g) 308*51c0b2f7Stbbdev }}; 309*51c0b2f7Stbbdev 310*51c0b2f7Stbbdev bool do_try_put = true; 311*51c0b2f7Stbbdev input_node<bool> src( 312*51c0b2f7Stbbdev precedes(successors[0], successors[1], successors[2]), 313*51c0b2f7Stbbdev [&](tbb::flow_control& fc) -> bool { 314*51c0b2f7Stbbdev if(!do_try_put) 315*51c0b2f7Stbbdev fc.stop(); 316*51c0b2f7Stbbdev do_try_put = !do_try_put; 317*51c0b2f7Stbbdev return true; 318*51c0b2f7Stbbdev } 319*51c0b2f7Stbbdev ); 320*51c0b2f7Stbbdev 321*51c0b2f7Stbbdev src.activate(); 322*51c0b2f7Stbbdev g.wait_for_all(); 323*51c0b2f7Stbbdev 324*51c0b2f7Stbbdev bool storage; 325*51c0b2f7Stbbdev for(auto& successor: successors) { 326*51c0b2f7Stbbdev CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 327*51c0b2f7Stbbdev "Not exact edge quantity was made"); 328*51c0b2f7Stbbdev } 329*51c0b2f7Stbbdev } 330*51c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 331*51c0b2f7Stbbdev 332*51c0b2f7Stbbdev //! Test push, push-pull behavior and copy constructor 333*51c0b2f7Stbbdev //! \brief \ref error_guessing \ref requirement 334*51c0b2f7Stbbdev TEST_CASE("Single destination tests"){ 335*51c0b2f7Stbbdev for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) { 336*51c0b2f7Stbbdev tbb::task_arena arena(p); 337*51c0b2f7Stbbdev arena.execute( 338*51c0b2f7Stbbdev [&]() { 339*51c0b2f7Stbbdev test_single_dest<int>(); 340*51c0b2f7Stbbdev test_single_dest<float>(); 341*51c0b2f7Stbbdev } 342*51c0b2f7Stbbdev ); 343*51c0b2f7Stbbdev } 344*51c0b2f7Stbbdev } 345*51c0b2f7Stbbdev 346*51c0b2f7Stbbdev //! Test reset variants 347*51c0b2f7Stbbdev //! \brief \ref error_guessing 348*51c0b2f7Stbbdev TEST_CASE("Reset test"){ 349*51c0b2f7Stbbdev test_reset(); 350*51c0b2f7Stbbdev } 351*51c0b2f7Stbbdev 352*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 353*51c0b2f7Stbbdev //! Test follows and precedes API 354*51c0b2f7Stbbdev //! \brief \ref error_guessing 355*51c0b2f7Stbbdev TEST_CASE("Follows and precedes API"){ 356*51c0b2f7Stbbdev test_follows_and_precedes_api(); 357*51c0b2f7Stbbdev } 358*51c0b2f7Stbbdev #endif 359*51c0b2f7Stbbdev 360*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 361*51c0b2f7Stbbdev //! Test deduction guides 362*51c0b2f7Stbbdev //! \brief \ref requirement 363*51c0b2f7Stbbdev TEST_CASE("Deduction guides"){ 364*51c0b2f7Stbbdev test_deduction_guides(); 365*51c0b2f7Stbbdev } 366*51c0b2f7Stbbdev #endif 367*51c0b2f7Stbbdev 368*51c0b2f7Stbbdev //! Test try_get before activation 369*51c0b2f7Stbbdev //! \brief \ref error_guessing 370*51c0b2f7Stbbdev TEST_CASE("try_get before activation"){ 371*51c0b2f7Stbbdev tbb::flow::graph g; 372*51c0b2f7Stbbdev tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) -> bool { fc.stop(); return 0;}); 373*51c0b2f7Stbbdev 374*51c0b2f7Stbbdev int tmp = -1; 375*51c0b2f7Stbbdev CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed"); 376*51c0b2f7Stbbdev } 377