1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 20 // parts in all of tests might make testing of the product, which is different from what is actually 21 // released. 22 #define __TBB_EXTRA_DEBUG 1 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/graph_utils.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 31 //! \file test_continue_node.cpp 32 //! \brief Test for [flow_graph.continue_node] specification 33 34 35 #define N 1000 36 #define MAX_NODES 4 37 #define C 8 38 39 // A class to use as a fake predecessor of continue_node 40 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> 41 { 42 typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; 43 // Define implementations of virtual methods that are abstract in the base class 44 bool register_successor( successor_type& ) override { return false; } 45 bool remove_successor( successor_type& ) override { return false; } 46 }; 47 48 template< typename InputType > 49 struct parallel_puts { 50 51 tbb::flow::receiver< InputType > * const my_exe_node; 52 53 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 54 parallel_puts& operator=(const parallel_puts&) = delete; 55 56 void operator()( int ) const { 57 for ( int i = 0; i < N; ++i ) { 58 // the nodes will accept all puts 59 CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 60 } 61 } 62 63 }; 64 65 template< typename OutputType > 66 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { 67 fake_continue_sender fake_sender; 68 for (size_t i = 0; i < N; ++i) { 69 tbb::detail::d1::register_predecessor(n, fake_sender); 70 } 71 72 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 73 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 74 for (size_t i = 0; i < num_receivers; ++i) { 75 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 76 } 77 harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; 78 79 for (size_t r = 0; r < num_receivers; ++r ) { 80 tbb::flow::make_edge( n, *receivers[r] ); 81 } 82 83 utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); 84 g.wait_for_all(); 85 86 // 2) the nodes will receive puts from multiple predecessors simultaneously, 87 size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; 88 CHECK_MESSAGE( (int)ec == p, "" ); 89 for (size_t r = 0; r < num_receivers; ++r ) { 90 size_t c = receivers[r]->my_count; 91 // 3) the nodes will send to multiple successors. 92 CHECK_MESSAGE( (int)c == p, "" ); 93 } 94 95 for (size_t r = 0; r < num_receivers; ++r ) { 96 tbb::flow::remove_edge( n, *receivers[r] ); 97 } 98 } 99 } 100 101 template< typename OutputType, typename Body > 102 void continue_nodes( Body body ) { 103 for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 104 tbb::flow::graph g; 105 tbb::flow::continue_node< OutputType > exe_node( g, body ); 106 run_continue_nodes( p, g, exe_node); 107 exe_node.try_put(tbb::flow::continue_msg()); 108 tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); 109 run_continue_nodes( p, g, exe_node_copy); 110 } 111 } 112 113 const size_t Offset = 123; 114 std::atomic<size_t> global_execute_count; 115 116 template< typename OutputType > 117 struct inc_functor { 118 119 std::atomic<size_t> local_execute_count; 120 inc_functor( ) { local_execute_count = 0; } 121 inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 122 void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); } 123 124 OutputType operator()( tbb::flow::continue_msg ) { 125 ++global_execute_count; 126 ++local_execute_count; 127 return OutputType(); 128 } 129 130 }; 131 132 template< typename OutputType > 133 void continue_nodes_with_copy( ) { 134 135 for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 136 tbb::flow::graph g; 137 inc_functor<OutputType> cf; 138 cf.local_execute_count = Offset; 139 global_execute_count = Offset; 140 141 tbb::flow::continue_node< OutputType > exe_node( g, cf ); 142 fake_continue_sender fake_sender; 143 for (size_t i = 0; i < N; ++i) { 144 tbb::detail::d1::register_predecessor(exe_node, fake_sender); 145 } 146 147 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 148 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 149 for (size_t i = 0; i < num_receivers; ++i) { 150 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 151 } 152 153 for (size_t r = 0; r < num_receivers; ++r ) { 154 tbb::flow::make_edge( exe_node, *receivers[r] ); 155 } 156 157 utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); 158 g.wait_for_all(); 159 160 // 2) the nodes will receive puts from multiple predecessors simultaneously, 161 for (size_t r = 0; r < num_receivers; ++r ) { 162 size_t c = receivers[r]->my_count; 163 // 3) the nodes will send to multiple successors. 164 CHECK_MESSAGE( (int)c == p, "" ); 165 } 166 for (size_t r = 0; r < num_receivers; ++r ) { 167 tbb::flow::remove_edge( exe_node, *receivers[r] ); 168 } 169 } 170 171 // validate that the local body matches the global execute_count and both are correct 172 inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 173 const size_t expected_count = p*MAX_NODES + Offset; 174 size_t global_count = global_execute_count; 175 size_t inc_count = body_copy.local_execute_count; 176 CHECK_MESSAGE( global_count == expected_count, "" ); 177 CHECK_MESSAGE( global_count == inc_count, "" ); 178 g.reset(tbb::flow::rf_reset_bodies); 179 body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 180 inc_count = body_copy.local_execute_count; 181 CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" ); 182 183 } 184 } 185 186 template< typename OutputType > 187 void run_continue_nodes() { 188 harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; 189 continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); 190 continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); 191 continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); 192 continue_nodes_with_copy<OutputType>(); 193 } 194 195 //! Tests limited concurrency cases for nodes that accept data messages 196 void test_concurrency(int num_threads) { 197 tbb::task_arena arena(num_threads); 198 arena.execute( 199 [&] { 200 run_continue_nodes<tbb::flow::continue_msg>(); 201 run_continue_nodes<int>(); 202 run_continue_nodes<utils::NoAssign>(); 203 } 204 ); 205 } 206 /* 207 * Connection of two graphs is not currently supported, but works to some limited extent. 208 * This test is included to check for backward compatibility. It checks that a continue_node 209 * with predecessors in two different graphs receives the required 210 * number of continue messages before it executes. 211 */ 212 using namespace tbb::flow; 213 214 struct add_to_counter { 215 int* counter; 216 add_to_counter(int& var):counter(&var){} 217 void operator()(continue_msg){*counter+=1;} 218 }; 219 220 void test_two_graphs(){ 221 int count=0; 222 223 //graph g with broadcast_node and continue_node 224 graph g; 225 broadcast_node<continue_msg> start_g(g); 226 continue_node<continue_msg> first_g(g, add_to_counter(count)); 227 228 //graph h with broadcast_node 229 graph h; 230 broadcast_node<continue_msg> start_h(h); 231 232 //making two edges to first_g from the two graphs 233 make_edge(start_g,first_g); 234 make_edge(start_h, first_g); 235 236 //two try_puts from the two graphs 237 start_g.try_put(continue_msg()); 238 start_h.try_put(continue_msg()); 239 g.wait_for_all(); 240 CHECK_MESSAGE( (count==1), "Not all continue messages received"); 241 242 //two try_puts from the graph that doesn't contain the node 243 count=0; 244 start_h.try_put(continue_msg()); 245 start_h.try_put(continue_msg()); 246 g.wait_for_all(); 247 CHECK_MESSAGE( (count==1), "Not all continue messages received -1"); 248 249 //only one try_put 250 count=0; 251 start_g.try_put(continue_msg()); 252 g.wait_for_all(); 253 CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors"); 254 } 255 256 struct lightweight_policy_body { 257 const std::thread::id my_thread_id; 258 std::atomic<size_t>& my_count; 259 260 lightweight_policy_body( std::atomic<size_t>& count ) 261 : my_thread_id(std::this_thread::get_id()), my_count(count) 262 { 263 my_count = 0; 264 } 265 lightweight_policy_body& operator=(const lightweight_policy_body&) = delete; 266 void operator()(tbb::flow::continue_msg) { 267 ++my_count; 268 std::thread::id body_thread_id = std::this_thread::get_id(); 269 CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight"); 270 } 271 }; 272 273 void test_lightweight_policy() { 274 tbb::flow::graph g; 275 std::atomic<size_t> count1; 276 std::atomic<size_t> count2; 277 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 278 node1(g, lightweight_policy_body(count1)); 279 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 280 node2(g, lightweight_policy_body(count2)); 281 282 tbb::flow::make_edge(node1, node2); 283 const size_t n = 10; 284 for(size_t i = 0; i < n; ++i) { 285 node1.try_put(tbb::flow::continue_msg()); 286 } 287 g.wait_for_all(); 288 289 lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); 290 lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); 291 CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times"); 292 CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times"); 293 } 294 295 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 296 #include <array> 297 #include <vector> 298 void test_follows_and_precedes_api() { 299 using msg_t = tbb::flow::continue_msg; 300 301 std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } }; 302 std::vector<msg_t> messages_for_precedes = { msg_t() }; 303 304 auto pass_through = [](const msg_t& msg) { return msg; }; 305 306 follows_and_precedes_testing::test_follows 307 <msg_t, tbb::flow::continue_node<msg_t>> 308 (messages_for_follows, pass_through, node_priority_t(0)); 309 310 follows_and_precedes_testing::test_precedes 311 <msg_t, tbb::flow::continue_node<msg_t>> 312 (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1)); 313 } 314 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 315 316 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 317 318 template <typename ExpectedType, typename Body> 319 void test_deduction_guides_common(Body body) { 320 using namespace tbb::flow; 321 graph g; 322 323 continue_node c1(g, body); 324 static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>); 325 326 continue_node c2(g, body, lightweight()); 327 static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>); 328 329 continue_node c3(g, 5, body); 330 static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>); 331 332 continue_node c4(g, 5, body, lightweight()); 333 static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>); 334 335 continue_node c5(g, body, node_priority_t(5)); 336 static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>); 337 338 continue_node c6(g, body, lightweight(), node_priority_t(5)); 339 static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>); 340 341 continue_node c7(g, 5, body, node_priority_t(5)); 342 static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>); 343 344 continue_node c8(g, 5, body, lightweight(), node_priority_t(5)); 345 static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>); 346 347 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 348 broadcast_node<continue_msg> b(g); 349 350 continue_node c9(follows(b), body); 351 static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>); 352 353 continue_node c10(follows(b), body, lightweight()); 354 static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>); 355 356 continue_node c11(follows(b), 5, body); 357 static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>); 358 359 continue_node c12(follows(b), 5, body, lightweight()); 360 static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>); 361 362 continue_node c13(follows(b), body, node_priority_t(5)); 363 static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>); 364 365 continue_node c14(follows(b), body, lightweight(), node_priority_t(5)); 366 static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>); 367 368 continue_node c15(follows(b), 5, body, node_priority_t(5)); 369 static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>); 370 371 continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5)); 372 static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>); 373 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 374 375 continue_node c17(c1); 376 static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>); 377 } 378 379 int continue_body_f(const tbb::flow::continue_msg&) { return 1; } 380 void continue_void_body_f(const tbb::flow::continue_msg&) {} 381 382 void test_deduction_guides() { 383 using tbb::flow::continue_msg; 384 test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } ); 385 test_deduction_guides_common<continue_msg>([](const continue_msg&) {}); 386 387 test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; }); 388 test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {}); 389 390 test_deduction_guides_common<int>(continue_body_f); 391 test_deduction_guides_common<continue_msg>(continue_void_body_f); 392 } 393 394 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 395 396 // TODO: use pass_through from test_function_node instead 397 template<typename T> 398 struct passing_body { 399 T operator()(const T& val) { 400 return val; 401 } 402 }; 403 404 /* 405 The test covers the case when a node with non-default mutex type is a predecessor for continue_node, 406 because there used to be a bug when make_edge(node, continue_node) 407 did not update continue_node's predecesosor threshold 408 since the specialization of node's successor_cache for a continue_node was not chosen. 409 */ 410 void test_successor_cache_specialization() { 411 using namespace tbb::flow; 412 413 graph g; 414 415 broadcast_node<continue_msg> node_with_default_mutex_type(g); 416 buffer_node<continue_msg> node_with_non_default_mutex_type(g); 417 418 continue_node<continue_msg> node(g, passing_body<continue_msg>()); 419 420 make_edge(node_with_default_mutex_type, node); 421 make_edge(node_with_non_default_mutex_type, node); 422 423 buffer_node<continue_msg> buf(g); 424 425 make_edge(node, buf); 426 427 node_with_default_mutex_type.try_put(continue_msg()); 428 node_with_non_default_mutex_type.try_put(continue_msg()); 429 430 g.wait_for_all(); 431 432 continue_msg storage; 433 CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)), 434 "Wrong number of messages is passed via continue_node"); 435 } 436 437 //! Test concurrent continue_node for correctness 438 //! \brief \ref error_guessing 439 TEST_CASE("Concurrency testing") { 440 for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) { 441 test_concurrency(p); 442 } 443 } 444 445 //! Test concurrent continue_node in separate graphs 446 //! \brief \ref error_guessing 447 TEST_CASE("Two graphs") { test_two_graphs(); } 448 449 //! Test basic behaviour with lightweight body 450 //! \brief \ref requirement \ref error_guessing 451 TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); } 452 453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 454 //! Test deprecated follows and preceedes API 455 //! \brief \ref error_guessing 456 TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); } 457 #endif 458 459 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 460 //! Test deduction guides 461 //! \brief requirement 462 TEST_CASE( "Deduction guides" ) { test_deduction_guides(); } 463 #endif 464 465 //! Test for successor cache specialization 466 //! \brief \ref regression 467 TEST_CASE( "Regression for successor cache specialization" ) { 468 test_successor_cache_specialization(); 469 } 470