1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // have to expose the reset_node method to be able to reset a function_body 18 19 #include "common/config.h" 20 21 #include "tbb/flow_graph.h" 22 23 #include "common/test.h" 24 #include "common/utils.h" 25 #include "common/utils_assert.h" 26 #include "common/concepts_common.h" 27 28 29 //! \file test_input_node.cpp 30 //! \brief Test for [flow_graph.input_node] specification 31 32 33 using tbb::detail::d1::graph_task; 34 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 35 36 const int N = 1000; 37 38 template< typename T > 39 class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 40 41 std::atomic<int> my_counters[N]; 42 tbb::flow::graph& my_graph; 43 44 public: 45 46 test_push_receiver(tbb::flow::graph& g) : my_graph(g) { 47 for (int i = 0; i < N; ++i ) 48 my_counters[i] = 0; 49 } 50 51 int get_count( int i ) { 52 int v = my_counters[i]; 53 return v; 54 } 55 56 typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type; 57 58 graph_task* try_put_task( const T &v ) override { 59 int i = (int)v; 60 ++my_counters[i]; 61 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 62 } 63 64 tbb::flow::graph& graph_reference() const override { 65 return my_graph; 66 } 67 }; 68 69 template< typename T > 70 class my_input_body { 71 72 unsigned my_count; 73 int *ninvocations; 74 75 public: 76 77 my_input_body() : ninvocations(nullptr) { my_count = 0; } 78 my_input_body(int &_inv) : ninvocations(&_inv) { my_count = 0; } 79 80 T operator()( tbb::flow_control& fc ) { 81 T v = (T)my_count++; 82 if(ninvocations) ++(*ninvocations); 83 if ( (int)v < N ){ 84 return v; 85 }else{ 86 fc.stop(); 87 return T(); 88 } 89 } 90 91 }; 92 93 template< typename T > 94 class function_body { 95 96 std::atomic<int> *my_counters; 97 98 public: 99 100 function_body( std::atomic<int> *counters ) : my_counters(counters) { 101 for (int i = 0; i < N; ++i ) 102 my_counters[i] = 0; 103 } 104 105 bool operator()( T v ) { 106 ++my_counters[(int)v]; 107 return true; 108 } 109 110 }; 111 112 template< typename T > 113 void test_single_dest() { 114 // push only 115 tbb::flow::graph g; 116 tbb::flow::input_node<T> src(g, my_input_body<T>() ); 117 test_push_receiver<T> dest(g); 118 tbb::flow::make_edge( src, dest ); 119 src.activate(); 120 g.wait_for_all(); 121 for (int i = 0; i < N; ++i ) { 122 CHECK_MESSAGE( dest.get_count(i) == 1, "" ); 123 } 124 125 // push only 126 std::atomic<int> counters3[N]; 127 tbb::flow::input_node<T> src3(g, my_input_body<T>() ); 128 src3.activate(); 129 130 function_body<T> b3( counters3 ); 131 tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 ); 132 tbb::flow::make_edge( src3, dest3 ); 133 g.wait_for_all(); 134 for (int i = 0; i < N; ++i ) { 135 int v = counters3[i]; 136 CHECK_MESSAGE( v == 1, "" ); 137 } 138 139 // push & pull 140 tbb::flow::input_node<T> src2(g, my_input_body<T>() ); 141 src2.activate(); 142 std::atomic<int> counters2[N]; 143 144 function_body<T> b2( counters2 ); 145 tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 ); 146 tbb::flow::make_edge( src2, dest2 ); 147 g.wait_for_all(); 148 for (int i = 0; i < N; ++i ) { 149 int v = counters2[i]; 150 CHECK_MESSAGE( v == 1, "" ); 151 } 152 153 // test copy constructor 154 tbb::flow::input_node<T> src_copy(src); 155 src_copy.activate(); 156 test_push_receiver<T> dest_c(g); 157 CHECK_MESSAGE( src_copy.register_successor(dest_c), "" ); 158 g.wait_for_all(); 159 for (int i = 0; i < N; ++i ) { 160 CHECK_MESSAGE( dest_c.get_count(i) == 1, "" ); 161 } 162 } 163 164 void test_reset() { 165 // input_node -> function_node 166 tbb::flow::graph g; 167 std::atomic<int> counters3[N]; 168 tbb::flow::input_node<int> src3(g, my_input_body<int>()); 169 src3.activate(); 170 tbb::flow::input_node<int> src_inactive(g, my_input_body<int>()); 171 function_body<int> b3( counters3 ); 172 tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3); 173 tbb::flow::make_edge( src3, dest3 ); 174 // source_node already in active state. Let the graph run, 175 g.wait_for_all(); 176 // check the array for each value. 177 for (int i = 0; i < N; ++i ) { 178 int v = counters3[i]; 179 CHECK_MESSAGE( v == 1, "" ); 180 counters3[i] = 0; 181 } 182 183 g.reset(tbb::flow::rf_reset_bodies); // <-- re-initializes the counts. 184 // and spawns task to run input 185 src3.activate(); 186 187 g.wait_for_all(); 188 // check output queue again. Should be the same contents. 189 for (int i = 0; i < N; ++i ) { 190 int v = counters3[i]; 191 CHECK_MESSAGE( v == 1, "" ); 192 counters3[i] = 0; 193 } 194 g.reset(); // doesn't reset the input_node_body to initial state, but does spawn a task 195 // to run the input_node. 196 197 g.wait_for_all(); 198 // array should be all zero 199 for (int i = 0; i < N; ++i ) { 200 int v = counters3[i]; 201 CHECK_MESSAGE( v == 0, "" ); 202 } 203 204 remove_edge(src3, dest3); 205 make_edge(src_inactive, dest3); 206 207 // src_inactive doesn't run 208 g.wait_for_all(); 209 for (int i = 0; i < N; ++i ) { 210 int v = counters3[i]; 211 CHECK_MESSAGE( v == 0, "" ); 212 } 213 214 // run graph 215 src_inactive.activate(); 216 g.wait_for_all(); 217 // check output 218 for (int i = 0; i < N; ++i ) { 219 int v = counters3[i]; 220 CHECK_MESSAGE( v == 1, "" ); 221 counters3[i] = 0; 222 } 223 g.reset(tbb::flow::rf_reset_bodies); // <-- reinitializes the counts 224 // src_inactive doesn't run 225 g.wait_for_all(); 226 for (int i = 0; i < N; ++i ) { 227 int v = counters3[i]; 228 CHECK_MESSAGE( v == 0, "" ); 229 } 230 231 // start it up 232 src_inactive.activate(); 233 g.wait_for_all(); 234 for (int i = 0; i < N; ++i ) { 235 int v = counters3[i]; 236 CHECK_MESSAGE( v == 1, "" ); 237 counters3[i] = 0; 238 } 239 g.reset(); // doesn't reset the input_node_body to initial state, and doesn't 240 // spawn a task to run the input_node. 241 242 g.wait_for_all(); 243 // array should be all zero 244 for (int i = 0; i < N; ++i ) { 245 int v = counters3[i]; 246 CHECK_MESSAGE( v == 0, "" ); 247 } 248 src_inactive.activate(); 249 // input_node_body is already in final state, so input_node will not forward a message. 250 g.wait_for_all(); 251 for (int i = 0; i < N; ++i ) { 252 int v = counters3[i]; 253 CHECK_MESSAGE( v == 0, "" ); 254 } 255 } 256 257 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 258 #include <array> 259 void test_follows_and_precedes_api() { 260 using namespace tbb::flow; 261 262 graph g; 263 264 std::array<buffer_node<bool>, 3> successors {{ 265 buffer_node<bool>(g), 266 buffer_node<bool>(g), 267 buffer_node<bool>(g) 268 }}; 269 270 bool do_try_put = true; 271 input_node<bool> src( 272 precedes(successors[0], successors[1], successors[2]), 273 [&](tbb::flow_control& fc) -> bool { 274 if(!do_try_put) 275 fc.stop(); 276 do_try_put = !do_try_put; 277 return true; 278 } 279 ); 280 281 src.activate(); 282 g.wait_for_all(); 283 284 bool storage; 285 for(auto& successor: successors) { 286 CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 287 "Not exact edge quantity was made"); 288 } 289 } 290 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 291 292 //! Test push, push-pull behavior and copy constructor 293 //! \brief \ref error_guessing \ref requirement 294 TEST_CASE("Single destination tests"){ 295 for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) { 296 tbb::task_arena arena(p); 297 arena.execute( 298 [&]() { 299 test_single_dest<int>(); 300 test_single_dest<float>(); 301 } 302 ); 303 } 304 } 305 306 //! Test reset variants 307 //! \brief \ref error_guessing 308 TEST_CASE("Reset test"){ 309 test_reset(); 310 } 311 312 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 313 //! Test follows and precedes API 314 //! \brief \ref error_guessing 315 TEST_CASE("Follows and precedes API"){ 316 test_follows_and_precedes_api(); 317 } 318 #endif 319 320 //! Test try_get before activation 321 //! \brief \ref error_guessing 322 TEST_CASE("try_get before activation"){ 323 tbb::flow::graph g; 324 tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;}); 325 326 int tmp = -1; 327 CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed"); 328 } 329 330 #if __TBB_CPP20_CONCEPTS_PRESENT 331 //! \brief \ref error_guessing 332 TEST_CASE("constraints for input_node output") { 333 struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {}; 334 335 static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>); 336 static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>); 337 static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>); 338 static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>); 339 } 340 341 template <typename Output, typename Body> 342 concept can_call_input_node_ctor = requires( tbb::flow::graph& graph, Body body, tbb::flow::buffer_node<int> f ) { 343 tbb::flow::input_node<Output>(graph, body); 344 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 345 tbb::flow::input_node<Output>(tbb::flow::precedes(f), body); 346 #endif 347 }; 348 349 //! \brief \ref error_guessing 350 TEST_CASE("constraints for input_node body") { 351 using output_type = int; 352 using namespace test_concepts::input_node_body; 353 354 static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>); 355 static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>); 356 static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>); 357 static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>); 358 static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>); 359 static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>); 360 } 361 #endif // __TBB_CPP20_CONCEPTS_PRESENT 362