1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // have to expose the reset_node method to be able to reset a function_body 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 23 #include "common/test.h" 24 #include "common/utils.h" 25 #include "common/utils_assert.h" 26 27 28 //! \file test_input_node.cpp 29 //! \brief Test for [flow_graph.input_node] specification 30 31 32 using tbb::detail::d1::graph_task; 33 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 34 35 const int N = 1000; 36 37 template< typename T > 38 class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 39 40 std::atomic<int> my_counters[N]; 41 tbb::flow::graph& my_graph; 42 43 public: 44 45 test_push_receiver(tbb::flow::graph& g) : my_graph(g) { 46 for (int i = 0; i < N; ++i ) 47 my_counters[i] = 0; 48 } 49 50 int get_count( int i ) { 51 int v = my_counters[i]; 52 return v; 53 } 54 55 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; 56 57 graph_task* try_put_task( const T &v ) override { 58 int i = (int)v; 59 ++my_counters[i]; 60 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 61 } 62 63 tbb::flow::graph& graph_reference() const override { 64 return my_graph; 65 } 66 }; 67 68 template< typename T > 69 class my_input_body { 70 71 unsigned my_count; 72 int *ninvocations; 73 74 public: 75 76 my_input_body() : ninvocations(NULL) { my_count = 0; } 77 my_input_body(int &_inv) : ninvocations(&_inv) { my_count = 0; } 78 79 T operator()( tbb::flow_control& fc ) { 80 T v = (T)my_count++; 81 if(ninvocations) ++(*ninvocations); 82 if ( (int)v < N ){ 83 return v; 84 }else{ 85 fc.stop(); 86 return T(); 87 } 88 } 89 90 }; 91 92 template< typename T > 93 class function_body { 94 95 std::atomic<int> *my_counters; 96 97 public: 98 99 function_body( std::atomic<int> *counters ) : my_counters(counters) { 100 for (int i = 0; i < N; ++i ) 101 my_counters[i] = 0; 102 } 103 104 bool operator()( T v ) { 105 ++my_counters[(int)v]; 106 return true; 107 } 108 109 }; 110 111 template< typename T > 112 void test_single_dest() { 113 // push only 114 tbb::flow::graph g; 115 tbb::flow::input_node<T> src(g, my_input_body<T>() ); 116 test_push_receiver<T> dest(g); 117 tbb::flow::make_edge( src, dest ); 118 src.activate(); 119 g.wait_for_all(); 120 for (int i = 0; i < N; ++i ) { 121 CHECK_MESSAGE( dest.get_count(i) == 1, "" ); 122 } 123 124 // push only 125 std::atomic<int> counters3[N]; 126 tbb::flow::input_node<T> src3(g, my_input_body<T>() ); 127 src3.activate(); 128 129 function_body<T> b3( counters3 ); 130 tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 ); 131 tbb::flow::make_edge( src3, dest3 ); 132 g.wait_for_all(); 133 for (int i = 0; i < N; ++i ) { 134 int v = counters3[i]; 135 CHECK_MESSAGE( v == 1, "" ); 136 } 137 138 // push & pull 139 tbb::flow::input_node<T> src2(g, my_input_body<T>() ); 140 src2.activate(); 141 std::atomic<int> counters2[N]; 142 143 function_body<T> b2( counters2 ); 144 tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 ); 145 tbb::flow::make_edge( src2, dest2 ); 146 g.wait_for_all(); 147 for (int i = 0; i < N; ++i ) { 148 int v = counters2[i]; 149 CHECK_MESSAGE( v == 1, "" ); 150 } 151 152 // test copy constructor 153 tbb::flow::input_node<T> src_copy(src); 154 src_copy.activate(); 155 test_push_receiver<T> dest_c(g); 156 CHECK_MESSAGE( src_copy.register_successor(dest_c), "" ); 157 g.wait_for_all(); 158 for (int i = 0; i < N; ++i ) { 159 CHECK_MESSAGE( dest_c.get_count(i) == 1, "" ); 160 } 161 } 162 163 void test_reset() { 164 // input_node -> function_node 165 tbb::flow::graph g; 166 std::atomic<int> counters3[N]; 167 tbb::flow::input_node<int> src3(g, my_input_body<int>()); 168 src3.activate(); 169 tbb::flow::input_node<int> src_inactive(g, my_input_body<int>()); 170 function_body<int> b3( counters3 ); 171 tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3); 172 tbb::flow::make_edge( src3, dest3 ); 173 // source_node already in active state. Let the graph run, 174 g.wait_for_all(); 175 // check the array for each value. 176 for (int i = 0; i < N; ++i ) { 177 int v = counters3[i]; 178 CHECK_MESSAGE( v == 1, "" ); 179 counters3[i] = 0; 180 } 181 182 g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts. 183 // and spawns task to run input 184 src3.activate(); 185 186 g.wait_for_all(); 187 // check output queue again. Should be the same contents. 188 for (int i = 0; i < N; ++i ) { 189 int v = counters3[i]; 190 CHECK_MESSAGE( v == 1, "" ); 191 counters3[i] = 0; 192 } 193 g.reset(); // doesn't reset the input_node_body to initial state, but does spawn a task 194 // to run the input_node. 195 196 g.wait_for_all(); 197 // array should be all zero 198 for (int i = 0; i < N; ++i ) { 199 int v = counters3[i]; 200 CHECK_MESSAGE( v == 0, "" ); 201 } 202 203 remove_edge(src3, dest3); 204 make_edge(src_inactive, dest3); 205 206 // src_inactive doesn't run 207 g.wait_for_all(); 208 for (int i = 0; i < N; ++i ) { 209 int v = counters3[i]; 210 CHECK_MESSAGE( v == 0, "" ); 211 } 212 213 // run graph 214 src_inactive.activate(); 215 g.wait_for_all(); 216 // check output 217 for (int i = 0; i < N; ++i ) { 218 int v = counters3[i]; 219 CHECK_MESSAGE( v == 1, "" ); 220 counters3[i] = 0; 221 } 222 g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts 223 // src_inactive doesn't run 224 g.wait_for_all(); 225 for (int i = 0; i < N; ++i ) { 226 int v = counters3[i]; 227 CHECK_MESSAGE( v == 0, "" ); 228 } 229 230 // start it up 231 src_inactive.activate(); 232 g.wait_for_all(); 233 for (int i = 0; i < N; ++i ) { 234 int v = counters3[i]; 235 CHECK_MESSAGE( v == 1, "" ); 236 counters3[i] = 0; 237 } 238 g.reset(); // doesn't reset the input_node_body to initial state, and doesn't 239 // spawn a task to run the input_node. 240 241 g.wait_for_all(); 242 // array should be all zero 243 for (int i = 0; i < N; ++i ) { 244 int v = counters3[i]; 245 CHECK_MESSAGE( v == 0, "" ); 246 } 247 src_inactive.activate(); 248 // input_node_body is already in final state, so input_node will not forward a message. 249 g.wait_for_all(); 250 for (int i = 0; i < N; ++i ) { 251 int v = counters3[i]; 252 CHECK_MESSAGE( v == 0, "" ); 253 } 254 } 255 256 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 257 int input_body_f(tbb::flow_control&) { return 42; } 258 259 void test_deduction_guides() { 260 using namespace tbb::flow; 261 graph g; 262 263 auto lambda = [](tbb::flow_control&) { return 42; }; 264 auto non_const_lambda = [](tbb::flow_control&) mutable { return 42; }; 265 266 // Tests for input_node(graph&, Body) 267 input_node s1(g, lambda); 268 static_assert(std::is_same_v<decltype(s1), input_node<int>>); 269 270 input_node s2(g, non_const_lambda); 271 static_assert(std::is_same_v<decltype(s2), input_node<int>>); 272 273 input_node s3(g, input_body_f); 274 static_assert(std::is_same_v<decltype(s3), input_node<int>>); 275 276 input_node s4(s3); 277 static_assert(std::is_same_v<decltype(s4), input_node<int>>); 278 279 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 280 broadcast_node<int> bc(g); 281 282 // Tests for input_node(const node_set<Args...>&, Body) 283 input_node s5(precedes(bc), lambda); 284 static_assert(std::is_same_v<decltype(s5), input_node<int>>); 285 286 input_node s6(precedes(bc), non_const_lambda); 287 static_assert(std::is_same_v<decltype(s6), input_node<int>>); 288 289 input_node s7(precedes(bc), input_body_f); 290 static_assert(std::is_same_v<decltype(s7), input_node<int>>); 291 #endif 292 g.wait_for_all(); 293 } 294 295 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 296 297 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 298 #include <array> 299 void test_follows_and_precedes_api() { 300 using namespace tbb::flow; 301 302 graph g; 303 304 std::array<buffer_node<bool>, 3> successors {{ 305 buffer_node<bool>(g), 306 buffer_node<bool>(g), 307 buffer_node<bool>(g) 308 }}; 309 310 bool do_try_put = true; 311 input_node<bool> src( 312 precedes(successors[0], successors[1], successors[2]), 313 [&](tbb::flow_control& fc) -> bool { 314 if(!do_try_put) 315 fc.stop(); 316 do_try_put = !do_try_put; 317 return true; 318 } 319 ); 320 321 src.activate(); 322 g.wait_for_all(); 323 324 bool storage; 325 for(auto& successor: successors) { 326 CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 327 "Not exact edge quantity was made"); 328 } 329 } 330 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 331 332 //! Test push, push-pull behavior and copy constructor 333 //! \brief \ref error_guessing \ref requirement 334 TEST_CASE("Single destination tests"){ 335 for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) { 336 tbb::task_arena arena(p); 337 arena.execute( 338 [&]() { 339 test_single_dest<int>(); 340 test_single_dest<float>(); 341 } 342 ); 343 } 344 } 345 346 //! Test reset variants 347 //! \brief \ref error_guessing 348 TEST_CASE("Reset test"){ 349 test_reset(); 350 } 351 352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 353 //! Test follows and precedes API 354 //! \brief \ref error_guessing 355 TEST_CASE("Follows and precedes API"){ 356 test_follows_and_precedes_api(); 357 } 358 #endif 359 360 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 361 //! Test deduction guides 362 //! \brief \ref requirement 363 TEST_CASE("Deduction guides"){ 364 test_deduction_guides(); 365 } 366 #endif 367 368 //! Test try_get before activation 369 //! \brief \ref error_guessing 370 TEST_CASE("try_get before activation"){ 371 tbb::flow::graph g; 372 tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) -> bool { fc.stop(); return 0;}); 373 374 int tmp = -1; 375 CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed"); 376 } 377