1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // have to expose the reset_node method to be able to reset a function_body 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 23 #include "common/test.h" 24 #include "common/utils.h" 25 #include "common/utils_assert.h" 26 #include "common/concepts_common.h" 27 28 29 //! \file test_input_node.cpp 30 //! \brief Test for [flow_graph.input_node] specification 31 32 33 using tbb::detail::d1::graph_task; 34 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 35 36 const int N = 1000; 37 38 template< typename T > 39 class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 40 41 std::atomic<int> my_counters[N]; 42 tbb::flow::graph& my_graph; 43 44 public: 45 46 test_push_receiver(tbb::flow::graph& g) : my_graph(g) { 47 for (int i = 0; i < N; ++i ) 48 my_counters[i] = 0; 49 } 50 51 int get_count( int i ) { 52 int v = my_counters[i]; 53 return v; 54 } 55 56 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; 57 58 graph_task* try_put_task( const T &v ) override { 59 int i = (int)v; 60 ++my_counters[i]; 61 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 62 } 63 64 tbb::flow::graph& graph_reference() const override { 65 return my_graph; 66 } 67 }; 68 69 template< typename T > 70 class my_input_body { 71 72 unsigned my_count; 73 int *ninvocations; 74 75 public: 76 77 my_input_body() : ninvocations(NULL) { my_count = 0; } 78 my_input_body(int &_inv) : ninvocations(&_inv) { my_count = 0; } 79 80 T operator()( tbb::flow_control& fc ) { 81 T v = (T)my_count++; 82 if(ninvocations) ++(*ninvocations); 83 if ( (int)v < N ){ 84 return v; 85 }else{ 86 fc.stop(); 87 return T(); 88 } 89 } 90 91 }; 92 93 template< typename T > 94 class function_body { 95 96 std::atomic<int> *my_counters; 97 98 public: 99 100 function_body( std::atomic<int> *counters ) : my_counters(counters) { 101 for (int i = 0; i < N; ++i ) 102 my_counters[i] = 0; 103 } 104 105 bool operator()( T v ) { 106 ++my_counters[(int)v]; 107 return true; 108 } 109 110 }; 111 112 template< typename T > 113 void test_single_dest() { 114 // push only 115 tbb::flow::graph g; 116 tbb::flow::input_node<T> src(g, my_input_body<T>() ); 117 test_push_receiver<T> dest(g); 118 tbb::flow::make_edge( src, dest ); 119 src.activate(); 120 g.wait_for_all(); 121 for (int i = 0; i < N; ++i ) { 122 CHECK_MESSAGE( dest.get_count(i) == 1, "" ); 123 } 124 125 // push only 126 std::atomic<int> counters3[N]; 127 tbb::flow::input_node<T> src3(g, my_input_body<T>() ); 128 src3.activate(); 129 130 function_body<T> b3( counters3 ); 131 tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 ); 132 tbb::flow::make_edge( src3, dest3 ); 133 g.wait_for_all(); 134 for (int i = 0; i < N; ++i ) { 135 int v = counters3[i]; 136 CHECK_MESSAGE( v == 1, "" ); 137 } 138 139 // push & pull 140 tbb::flow::input_node<T> src2(g, my_input_body<T>() ); 141 src2.activate(); 142 std::atomic<int> counters2[N]; 143 144 function_body<T> b2( counters2 ); 145 tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 ); 146 tbb::flow::make_edge( src2, dest2 ); 147 g.wait_for_all(); 148 for (int i = 0; i < N; ++i ) { 149 int v = counters2[i]; 150 CHECK_MESSAGE( v == 1, "" ); 151 } 152 153 // test copy constructor 154 tbb::flow::input_node<T> src_copy(src); 155 src_copy.activate(); 156 test_push_receiver<T> dest_c(g); 157 CHECK_MESSAGE( src_copy.register_successor(dest_c), "" ); 158 g.wait_for_all(); 159 for (int i = 0; i < N; ++i ) { 160 CHECK_MESSAGE( dest_c.get_count(i) == 1, "" ); 161 } 162 } 163 164 void test_reset() { 165 // input_node -> function_node 166 tbb::flow::graph g; 167 std::atomic<int> counters3[N]; 168 tbb::flow::input_node<int> src3(g, my_input_body<int>()); 169 src3.activate(); 170 tbb::flow::input_node<int> src_inactive(g, my_input_body<int>()); 171 function_body<int> b3( counters3 ); 172 tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3); 173 tbb::flow::make_edge( src3, dest3 ); 174 // source_node already in active state. Let the graph run, 175 g.wait_for_all(); 176 // check the array for each value. 177 for (int i = 0; i < N; ++i ) { 178 int v = counters3[i]; 179 CHECK_MESSAGE( v == 1, "" ); 180 counters3[i] = 0; 181 } 182 183 g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts. 184 // and spawns task to run input 185 src3.activate(); 186 187 g.wait_for_all(); 188 // check output queue again. Should be the same contents. 189 for (int i = 0; i < N; ++i ) { 190 int v = counters3[i]; 191 CHECK_MESSAGE( v == 1, "" ); 192 counters3[i] = 0; 193 } 194 g.reset(); // doesn't reset the input_node_body to initial state, but does spawn a task 195 // to run the input_node. 196 197 g.wait_for_all(); 198 // array should be all zero 199 for (int i = 0; i < N; ++i ) { 200 int v = counters3[i]; 201 CHECK_MESSAGE( v == 0, "" ); 202 } 203 204 remove_edge(src3, dest3); 205 make_edge(src_inactive, dest3); 206 207 // src_inactive doesn't run 208 g.wait_for_all(); 209 for (int i = 0; i < N; ++i ) { 210 int v = counters3[i]; 211 CHECK_MESSAGE( v == 0, "" ); 212 } 213 214 // run graph 215 src_inactive.activate(); 216 g.wait_for_all(); 217 // check output 218 for (int i = 0; i < N; ++i ) { 219 int v = counters3[i]; 220 CHECK_MESSAGE( v == 1, "" ); 221 counters3[i] = 0; 222 } 223 g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts 224 // src_inactive doesn't run 225 g.wait_for_all(); 226 for (int i = 0; i < N; ++i ) { 227 int v = counters3[i]; 228 CHECK_MESSAGE( v == 0, "" ); 229 } 230 231 // start it up 232 src_inactive.activate(); 233 g.wait_for_all(); 234 for (int i = 0; i < N; ++i ) { 235 int v = counters3[i]; 236 CHECK_MESSAGE( v == 1, "" ); 237 counters3[i] = 0; 238 } 239 g.reset(); // doesn't reset the input_node_body to initial state, and doesn't 240 // spawn a task to run the input_node. 241 242 g.wait_for_all(); 243 // array should be all zero 244 for (int i = 0; i < N; ++i ) { 245 int v = counters3[i]; 246 CHECK_MESSAGE( v == 0, "" ); 247 } 248 src_inactive.activate(); 249 // input_node_body is already in final state, so input_node will not forward a message. 250 g.wait_for_all(); 251 for (int i = 0; i < N; ++i ) { 252 int v = counters3[i]; 253 CHECK_MESSAGE( v == 0, "" ); 254 } 255 } 256 257 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 258 int input_body_f(tbb::flow_control&) { return 42; } 259 260 void test_deduction_guides() { 261 using namespace tbb::flow; 262 graph g; 263 264 auto lambda = [](tbb::flow_control&) { return 42; }; 265 auto non_const_lambda = [](tbb::flow_control&) mutable { return 42; }; 266 267 // Tests for input_node(graph&, Body) 268 input_node s1(g, lambda); 269 static_assert(std::is_same_v<decltype(s1), input_node<int>>); 270 271 input_node s2(g, non_const_lambda); 272 static_assert(std::is_same_v<decltype(s2), input_node<int>>); 273 274 input_node s3(g, input_body_f); 275 static_assert(std::is_same_v<decltype(s3), input_node<int>>); 276 277 input_node s4(s3); 278 static_assert(std::is_same_v<decltype(s4), input_node<int>>); 279 280 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 281 broadcast_node<int> bc(g); 282 283 // Tests for input_node(const node_set<Args...>&, Body) 284 input_node s5(precedes(bc), lambda); 285 static_assert(std::is_same_v<decltype(s5), input_node<int>>); 286 287 input_node s6(precedes(bc), non_const_lambda); 288 static_assert(std::is_same_v<decltype(s6), input_node<int>>); 289 290 input_node s7(precedes(bc), input_body_f); 291 static_assert(std::is_same_v<decltype(s7), input_node<int>>); 292 #endif 293 g.wait_for_all(); 294 } 295 296 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 297 298 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 299 #include <array> 300 void test_follows_and_precedes_api() { 301 using namespace tbb::flow; 302 303 graph g; 304 305 std::array<buffer_node<bool>, 3> successors {{ 306 buffer_node<bool>(g), 307 buffer_node<bool>(g), 308 buffer_node<bool>(g) 309 }}; 310 311 bool do_try_put = true; 312 input_node<bool> src( 313 precedes(successors[0], successors[1], successors[2]), 314 [&](tbb::flow_control& fc) -> bool { 315 if(!do_try_put) 316 fc.stop(); 317 do_try_put = !do_try_put; 318 return true; 319 } 320 ); 321 322 src.activate(); 323 g.wait_for_all(); 324 325 bool storage; 326 for(auto& successor: successors) { 327 CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 328 "Not exact edge quantity was made"); 329 } 330 } 331 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 332 333 //! Test push, push-pull behavior and copy constructor 334 //! \brief \ref error_guessing \ref requirement 335 TEST_CASE("Single destination tests"){ 336 for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) { 337 tbb::task_arena arena(p); 338 arena.execute( 339 [&]() { 340 test_single_dest<int>(); 341 test_single_dest<float>(); 342 } 343 ); 344 } 345 } 346 347 //! Test reset variants 348 //! \brief \ref error_guessing 349 TEST_CASE("Reset test"){ 350 test_reset(); 351 } 352 353 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 354 //! Test follows and precedes API 355 //! \brief \ref error_guessing 356 TEST_CASE("Follows and precedes API"){ 357 test_follows_and_precedes_api(); 358 } 359 #endif 360 361 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 362 //! Test deduction guides 363 //! \brief \ref requirement 364 TEST_CASE("Deduction guides"){ 365 test_deduction_guides(); 366 } 367 #endif 368 369 //! Test try_get before activation 370 //! \brief \ref error_guessing 371 TEST_CASE("try_get before activation"){ 372 tbb::flow::graph g; 373 tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;}); 374 375 int tmp = -1; 376 CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed"); 377 } 378 379 #if __TBB_CPP20_CONCEPTS_PRESENT 380 //! \brief \ref error_guessing 381 TEST_CASE("constraints for input_node output") { 382 struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {}; 383 384 static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>); 385 static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>); 386 static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>); 387 static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>); 388 } 389 390 template <typename Output, typename Body> 391 concept can_call_input_node_ctor = requires( tbb::flow::graph& graph, Body body, tbb::flow::buffer_node<int> f ) { 392 tbb::flow::input_node<Output>(graph, body); 393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 394 tbb::flow::input_node<Output>(tbb::flow::precedes(f), body); 395 #endif 396 }; 397 398 //! \brief \ref error_guessing 399 TEST_CASE("constraints for input_node body") { 400 using output_type = int; 401 using namespace test_concepts::input_node_body; 402 403 static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>); 404 static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>); 405 static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>); 406 static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>); 407 static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>); 408 static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>); 409 } 410 #endif // __TBB_CPP20_CONCEPTS_PRESENT 411