1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 20 // parts in all of tests might make testing of the product, which is different from what is actually 21 // released. 22 #define __TBB_EXTRA_DEBUG 1 23 #include "tbb/flow_graph.h" 24 #include "tbb/spin_rw_mutex.h" 25 #include "tbb/global_control.h" 26 27 #include "common/test.h" 28 #include "common/utils.h" 29 #include "common/graph_utils.h" 30 #include "common/test_follows_and_precedes_api.h" 31 32 33 //! \file test_function_node.cpp 34 //! \brief Test for [flow_graph.function_node] specification 35 36 37 #define N 100 38 #define MAX_NODES 4 39 40 //! 
//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceed the concurrency limit
    2) that the node never rejects
    3) that no items are lost
    and 4) all of this happens even if there are multiple predecessors and successors
*/

// Identity body: forwards its input unchanged. Used wherever a trivial
// function_node body is needed.
template<typename IO>
struct pass_through {
    IO operator()(const IO& i) { return i; }
};

template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: why the test is executed serially for the node pairs, not concurrently?
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect them to
                    // pass_thru_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        senders.back()->register_successor(pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK( n == N );
                        CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

// Body functor that counts executions both globally and per-copy.
// The per-copy counter lets copy_body() results be checked against the
// global count, and lets reset(rf_reset_bodies) be verified.
struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    void operator=( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    int operator()( int i ) {
        ++global_execute_count;
        ++local_execute_count;
        return i;
    }

};

// Same buffered-levels scenario as above, but with a stateful body so that
// copy_body() and reset(rf_reset_bodies) semantics can be validated.
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Both counters start at Offset so a body that was never reset is
        // distinguishable from one reset back to its initial state.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}

// Runs the buffered-levels tests with each supported body kind:
// lambda, function pointer, functor object, and a stateful (copyable) body.
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}


//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple successors simultaneously,
    and 4) the nodes will send to multiple predecessors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}


// Runs the limited-concurrency tests with each supported body kind:
// lambda, function pointer, and functor object (all gated on a spin_rw_mutex).
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}


// Message type that is int-convertible but deliberately not assignable,
// to exercise nodes with minimal type requirements.
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
};

// NativeParallelFor body: each thread pushes N default-constructed items
// into the given receiver and expects every put to be accepted.
template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK( my_exe_node->try_put( InputType() ) == true );
        }
    }

};

//! Performs test on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts
    2) the nodes will receive puts from multiple predecessors simultaneously,
    and 3) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
365 There is no checking of the contents of the messages for corruption. 366 */ 367 368 template< typename InputType, typename OutputType, typename Body > 369 void unlimited_concurrency( Body body ) { 370 371 for (unsigned p = 1; p < 2*utils::MaxThread; ++p) { 372 tbb::flow::graph g; 373 tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body ); 374 375 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 376 377 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 378 for (size_t i = 0; i < num_receivers; ++i) { 379 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 380 } 381 382 harness_graph_executor<InputType, OutputType>::execute_count = 0; 383 384 for (size_t r = 0; r < num_receivers; ++r ) { 385 tbb::flow::make_edge( exe_node, *receivers[r] ); 386 } 387 388 utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) ); 389 g.wait_for_all(); 390 391 // 2) the nodes will receive puts from multiple predecessors simultaneously, 392 size_t ec = harness_graph_executor<InputType, OutputType>::execute_count; 393 CHECK( ec == p*N ); 394 for (size_t r = 0; r < num_receivers; ++r ) { 395 size_t c = receivers[r]->my_count; 396 // 3) the nodes will send to multiple successors. 
397 CHECK( c == p*N ); 398 } 399 for (size_t r = 0; r < num_receivers; ++r ) { 400 tbb::flow::remove_edge( exe_node, *receivers[r] ); 401 } 402 } 403 } 404 } 405 406 template< typename InputType, typename OutputType > 407 void run_unlimited_concurrency() { 408 harness_graph_executor<InputType, OutputType>::max_executors = 0; 409 unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } ); 410 unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func ); 411 unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() ); 412 } 413 414 struct continue_msg_to_int { 415 int my_int; 416 continue_msg_to_int(int x) : my_int(x) {} 417 int operator()(tbb::flow::continue_msg) { return my_int; } 418 }; 419 420 void test_function_node_with_continue_msg_as_input() { 421 // If this function terminates, then this test is successful 422 tbb::flow::graph g; 423 424 tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g); 425 426 tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42)); 427 tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43)); 428 429 tbb::flow::make_edge( Start, FN1 ); 430 tbb::flow::make_edge( Start, FN2 ); 431 432 Start.try_put( tbb::flow::continue_msg() ); 433 g.wait_for_all(); 434 } 435 436 //! 
//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    // Pin the scheduler's parallelism to num_threads for the whole run.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
// Checks that function_node can be constructed via the preview follows()/precedes()
// helpers, using a trivial pass-through body.
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
#endif

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

int function_body_f(const int&) { return 1; }

// Verifies at compile time that class template argument deduction picks the
// expected function_node specialization for each constructor form.
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy construction must deduce the same specialization as the source node.
    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}

void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}

#endif

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::function_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
    test_follows_and_precedes_api();
}
#endif

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Deduction guides test"){
    test_deduction_guides();
}
#endif

//! try_release and try_consume test
//! \brief \ref error_guessing
TEST_CASE("try_release try_consume"){
    tbb::flow::graph g;

    tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});

    // A freshly constructed node holds no reservation, so both must fail.
    CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
    CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
}