1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "common/graph_utils.h" 24 #include "common/test_follows_and_precedes_api.h" 25 #include "common/concepts_common.h" 26 27 28 //! \file test_continue_node.cpp 29 //! \brief Test for [flow_graph.continue_node] specification 30 31 32 #define N 1000 33 #define MAX_NODES 4 34 #define C 8 35 36 // A class to use as a fake predecessor of continue_node 37 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> 38 { 39 typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; 40 // Define implementations of virtual methods that are abstract in the base class 41 bool register_successor( successor_type& ) override { return false; } 42 bool remove_successor( successor_type& ) override { return false; } 43 }; 44 45 template< typename InputType > 46 struct parallel_puts { 47 48 tbb::flow::receiver< InputType > * const my_exe_node; 49 50 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 51 parallel_puts& operator=(const parallel_puts&) = delete; 52 53 void operator()( int ) const { 54 for ( int i = 0; i < N; ++i ) { 55 // the nodes will accept all puts 56 CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 57 } 58 } 59 60 }; 61 62 template< typename OutputType > 63 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { 64 fake_continue_sender fake_sender; 65 for (size_t i = 0; i < N; ++i) { 66 tbb::detail::d1::register_predecessor(n, fake_sender); 67 } 68 69 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 70 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 71 for (size_t i = 0; i < num_receivers; ++i) { 72 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 73 } 74 harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; 75 76 for (size_t r = 0; r < num_receivers; ++r ) { 77 tbb::flow::make_edge( n, *receivers[r] ); 78 } 79 80 utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); 81 g.wait_for_all(); 82 83 // 2) the nodes will receive puts from multiple predecessors simultaneously, 84 size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; 85 CHECK_MESSAGE( (int)ec == p, "" ); 86 for (size_t r = 0; r < num_receivers; ++r ) { 87 size_t c = receivers[r]->my_count; 88 // 3) the nodes will send to multiple successors. 89 CHECK_MESSAGE( (int)c == p, "" ); 90 } 91 92 for (size_t r = 0; r < num_receivers; ++r ) { 93 tbb::flow::remove_edge( n, *receivers[r] ); 94 } 95 } 96 } 97 98 template< typename OutputType, typename Body > 99 void continue_nodes( Body body ) { 100 for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 101 tbb::flow::graph g; 102 tbb::flow::continue_node< OutputType > exe_node( g, body ); 103 run_continue_nodes( p, g, exe_node); 104 exe_node.try_put(tbb::flow::continue_msg()); 105 tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); 106 run_continue_nodes( p, g, exe_node_copy); 107 } 108 } 109 110 const size_t Offset = 123; 111 std::atomic<size_t> global_execute_count; 112 113 template< typename OutputType > 114 struct inc_functor { 115 116 std::atomic<size_t> local_execute_count; 117 inc_functor( ) { local_execute_count = 0; } 118 inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 119 void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); } 120 121 OutputType operator()( tbb::flow::continue_msg ) { 122 ++global_execute_count; 123 ++local_execute_count; 124 return OutputType(); 125 } 126 127 }; 128 129 template< typename OutputType > 130 void continue_nodes_with_copy( ) { 131 132 for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 133 tbb::flow::graph g; 134 inc_functor<OutputType> cf; 135 cf.local_execute_count = Offset; 136 global_execute_count = Offset; 137 138 tbb::flow::continue_node< OutputType > exe_node( g, cf ); 139 fake_continue_sender fake_sender; 140 for (size_t i = 0; i < N; ++i) { 141 tbb::detail::d1::register_predecessor(exe_node, fake_sender); 142 } 143 144 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 145 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 146 for (size_t i = 0; i < num_receivers; ++i) { 147 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 148 } 149 150 for (size_t r = 0; r < num_receivers; ++r ) { 151 tbb::flow::make_edge( exe_node, *receivers[r] ); 152 } 153 154 utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); 155 g.wait_for_all(); 156 157 // 2) the nodes will receive puts from multiple predecessors simultaneously, 158 for (size_t r = 0; r < num_receivers; ++r ) { 159 size_t c = receivers[r]->my_count; 160 // 3) the nodes will send to multiple successors. 161 CHECK_MESSAGE( (int)c == p, "" ); 162 } 163 for (size_t r = 0; r < num_receivers; ++r ) { 164 tbb::flow::remove_edge( exe_node, *receivers[r] ); 165 } 166 } 167 168 // validate that the local body matches the global execute_count and both are correct 169 inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 170 const size_t expected_count = p*MAX_NODES + Offset; 171 size_t global_count = global_execute_count; 172 size_t inc_count = body_copy.local_execute_count; 173 CHECK_MESSAGE( global_count == expected_count, "" ); 174 CHECK_MESSAGE( global_count == inc_count, "" ); 175 g.reset(tbb::flow::rf_reset_bodies); 176 body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 177 inc_count = body_copy.local_execute_count; 178 CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" ); 179 180 } 181 } 182 183 template< typename OutputType > 184 void run_continue_nodes() { 185 harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; 186 continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); 187 continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); 188 continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); 189 continue_nodes_with_copy<OutputType>(); 190 } 191 192 //! Tests limited concurrency cases for nodes that accept data messages 193 void test_concurrency(int num_threads) { 194 tbb::task_arena arena(num_threads); 195 arena.execute( 196 [&] { 197 run_continue_nodes<tbb::flow::continue_msg>(); 198 run_continue_nodes<int>(); 199 run_continue_nodes<utils::NoAssign>(); 200 } 201 ); 202 } 203 /* 204 * Connection of two graphs is not currently supported, but works to some limited extent. 205 * This test is included to check for backward compatibility. It checks that a continue_node 206 * with predecessors in two different graphs receives the required 207 * number of continue messages before it executes. 208 */ 209 using namespace tbb::flow; 210 211 struct add_to_counter { 212 int* counter; 213 add_to_counter(int& var):counter(&var){} 214 void operator()(continue_msg){*counter+=1;} 215 }; 216 217 void test_two_graphs(){ 218 int count=0; 219 220 //graph g with broadcast_node and continue_node 221 graph g; 222 broadcast_node<continue_msg> start_g(g); 223 continue_node<continue_msg> first_g(g, add_to_counter(count)); 224 225 //graph h with broadcast_node 226 graph h; 227 broadcast_node<continue_msg> start_h(h); 228 229 //making two edges to first_g from the two graphs 230 make_edge(start_g,first_g); 231 make_edge(start_h, first_g); 232 233 //two try_puts from the two graphs 234 start_g.try_put(continue_msg()); 235 start_h.try_put(continue_msg()); 236 g.wait_for_all(); 237 CHECK_MESSAGE( (count==1), "Not all continue messages received"); 238 239 //two try_puts from the graph that doesn't contain the node 240 count=0; 241 start_h.try_put(continue_msg()); 242 start_h.try_put(continue_msg()); 243 g.wait_for_all(); 244 CHECK_MESSAGE( (count==1), "Not all continue messages received -1"); 245 246 //only one try_put 247 count=0; 248 start_g.try_put(continue_msg()); 249 g.wait_for_all(); 250 CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors"); 251 } 252 253 struct lightweight_policy_body { 254 const std::thread::id my_thread_id; 255 std::atomic<size_t>& my_count; 256 257 lightweight_policy_body( std::atomic<size_t>& count ) 258 : my_thread_id(std::this_thread::get_id()), my_count(count) 259 { 260 my_count = 0; 261 } 262 263 lightweight_policy_body( const lightweight_policy_body& ) = default; 264 lightweight_policy_body& operator=( const lightweight_policy_body& ) = delete; 265 266 void operator()( tbb::flow::continue_msg ) { 267 ++my_count; 268 std::thread::id body_thread_id = std::this_thread::get_id(); 269 CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight"); 270 } 271 }; 272 273 void test_lightweight_policy() { 274 tbb::flow::graph g; 275 std::atomic<size_t> count1; 276 std::atomic<size_t> count2; 277 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 278 node1(g, lightweight_policy_body(count1)); 279 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 280 node2(g, lightweight_policy_body(count2)); 281 282 tbb::flow::make_edge(node1, node2); 283 const size_t n = 10; 284 for(size_t i = 0; i < n; ++i) { 285 node1.try_put(tbb::flow::continue_msg()); 286 } 287 g.wait_for_all(); 288 289 lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); 290 lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); 291 CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times"); 292 CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times"); 293 } 294 295 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 296 #include <array> 297 #include <vector> 298 void test_follows_and_precedes_api() { 299 using msg_t = tbb::flow::continue_msg; 300 301 std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } }; 302 std::vector<msg_t> messages_for_precedes = { msg_t() }; 303 304 auto pass_through = [](const msg_t& msg) { return msg; }; 305 306 follows_and_precedes_testing::test_follows 307 <msg_t, tbb::flow::continue_node<msg_t>> 308 (messages_for_follows, pass_through, node_priority_t(0)); 309 310 follows_and_precedes_testing::test_precedes 311 <msg_t, tbb::flow::continue_node<msg_t>> 312 (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1)); 313 } 314 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 315 316 // TODO: use pass_through from test_function_node instead 317 template<typename T> 318 struct passing_body { 319 T operator()(const T& val) { 320 return val; 321 } 322 }; 323 324 /* 325 The test covers the case when a node with non-default mutex type is a predecessor for continue_node, 326 because there used to be a bug when make_edge(node, continue_node) 327 did not update continue_node's predecesosor threshold 328 since the specialization of node's successor_cache for a continue_node was not chosen. 329 */ 330 void test_successor_cache_specialization() { 331 using namespace tbb::flow; 332 333 graph g; 334 335 broadcast_node<continue_msg> node_with_default_mutex_type(g); 336 buffer_node<continue_msg> node_with_non_default_mutex_type(g); 337 338 continue_node<continue_msg> node(g, passing_body<continue_msg>()); 339 340 make_edge(node_with_default_mutex_type, node); 341 make_edge(node_with_non_default_mutex_type, node); 342 343 buffer_node<continue_msg> buf(g); 344 345 make_edge(node, buf); 346 347 node_with_default_mutex_type.try_put(continue_msg()); 348 node_with_non_default_mutex_type.try_put(continue_msg()); 349 350 g.wait_for_all(); 351 352 continue_msg storage; 353 CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)), 354 "Wrong number of messages is passed via continue_node"); 355 } 356 357 //! Test concurrent continue_node for correctness 358 //! \brief \ref error_guessing 359 TEST_CASE("Concurrency testing") { 360 for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) { 361 test_concurrency(p); 362 } 363 } 364 365 //! Test concurrent continue_node in separate graphs 366 //! \brief \ref error_guessing 367 TEST_CASE("Two graphs") { test_two_graphs(); } 368 369 //! Test basic behaviour with lightweight body 370 //! \brief \ref requirement \ref error_guessing 371 TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); } 372 373 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 374 //! Test deprecated follows and precedes API 375 //! \brief \ref error_guessing 376 TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); } 377 #endif 378 379 //! Test for successor cache specialization 380 //! \brief \ref regression 381 TEST_CASE( "Regression for successor cache specialization" ) { 382 test_successor_cache_specialization(); 383 } 384 385 #if __TBB_CPP20_CONCEPTS_PRESENT 386 //! \brief \ref error_guessing 387 TEST_CASE("constraints for continue_node input") { 388 static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>); 389 static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>); 390 } 391 392 template <typename Input, typename Body> 393 concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body, 394 tbb::flow::buffer_node<int>& f, std::size_t num, 395 tbb::flow::node_priority_t priority ) { 396 tbb::flow::continue_node<Input>(graph, body); 397 tbb::flow::continue_node<Input>(graph, body, priority); 398 tbb::flow::continue_node<Input>(graph, num, body); 399 tbb::flow::continue_node<Input>(graph, num, body, priority); 400 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 401 tbb::flow::continue_node<Input>(tbb::flow::follows(f), body); 402 tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority); 403 tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body); 404 tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority); 405 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 406 }; 407 408 //! \brief \ref error_guessing 409 TEST_CASE("constraints for continue_node body") { 410 using output_type = int; 411 using namespace test_concepts::continue_node_body; 412 413 static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>); 414 static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>); 415 static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>); 416 static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>); 417 static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>); 418 static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>); 419 } 420 #endif // __TBB_CPP20_CONCEPTS_PRESENT 421