/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// TODO revamp: move the parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having
// them in all of the tests means testing a product that differs from what is actually released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_function_node.cpp
//! \brief Test for [flow_graph.function_node] specification


#define N 100
#define MAX_NODES 4

//! Performs tests on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this happens even if there are multiple predecessors and successors.
*/
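
// A minimal sketch of the buffering behavior the tests in this file rely on. The helper below
// is illustrative only and is not called by the test driver: with the default (queueing)
// policy, a serial function_node accepts inputs past its concurrency limit and buffers them
// internally, so try_put() never returns false.
void queueing_policy_sketch() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int> demo( g, tbb::flow::serial, [](int v) { return v; } );
    for ( int i = 0; i < 10; ++i ) {
        CHECK( demo.try_put(i) == true ); // buffered by default: puts are never rejected
    }
    g.wait_for_all();
}
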
template<typename IO>
struct pass_through {
    IO operator()(const IO& i) { return i; }
};

template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_count back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        tbb::flow::function_node< InputType, InputType > pass_thru( g, tbb::flow::unlimited, pass_through<InputType>() );

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx = 0; node_idx < exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: why is the test executed serially for the node pairs rather than concurrently?
        for (size_t node_idx = 0; node_idx < exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers) {
                // Create num_receivers counting receivers and connect exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders) {
                    // Create num_senders senders, set their message limit to N each, and connect
                    // them to pass_thru_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        senders.back()->register_successor( pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // Confirm that each sender was asked for exactly N messages
                    for (size_t s = 0; s < num_senders; ++s) {
                        size_t n = senders[s]->my_received;
                        CHECK( n == N );
                        CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
                    }
                    // Validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r) {
                    // since the receivers are detached, nothing should have changed
                    receivers[r]->validate();
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor() { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    void operator=( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    int operator()( int i ) {
        ++global_execute_count;
        ++local_execute_count;
        return i;
    }

};
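
// Illustrative sketch (not called by the test driver): a function_node stores its own copy of
// the body it is given, and tbb::flow::copy_body returns a copy of that internal instance. This
// is the mechanism buffered_levels_with_copy below relies on to observe local_execute_count.
void copy_body_sketch() {
    tbb::flow::graph g;
    inc_functor f;
    f.local_execute_count = 0;
    tbb::flow::function_node<int, int> n( g, tbb::flow::serial, f );
    n.try_put(1);
    g.wait_for_all();
    inc_functor inside = tbb::flow::copy_body<inc_functor>(n);
    CHECK( inside.local_execute_count == 1 ); // the node's internal copy ran...
    CHECK( f.local_execute_count == 0 );      // ...the original functor did not
}
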
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
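        // Derivation of expected_count: each of the MAX_NODES receiver-count iterations runs
        // num_senders from 1 to MAX_NODES with N messages per sender, contributing
        // N * (1 + 2 + ... + MAX_NODES) = N/2 * MAX_NODES * (MAX_NODES + 1) executions per
        // iteration; the single extra try_put() per iteration adds MAX_NODES in total, and the
        // functor's counter starts from Offset.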
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK( global_count == expected_count );
        CHECK( global_count == inc_count );
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}

template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    buffered_levels<InputType,OutputType>(
        c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}


//! Performs tests on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads
       (this is checked in the harness_graph_executor),
    3) that the nodes will receive puts from multiple predecessors simultaneously,
    and 4) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
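
// Illustrative sketch of the rejecting policy exercised below (the helper is not called by the
// test driver): a rejecting function_node at its concurrency limit refuses further inputs
// instead of buffering them, so try_put() can return false. The real tests block the body on a
// mutex in the harness so that the rejection is guaranteed to be observable.
void rejecting_policy_sketch() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int, tbb::flow::rejecting> demo(
        g, tbb::flow::serial, [](int v) { return v; } );
    demo.try_put(1); // accepted: the node is idle
    demo.try_put(2); // may return false if the first item is still being processed
    g.wait_for_all();
}
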
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_count back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders) {
                senders.clear();
                {
                    // Exclusively lock the mutex to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // Put up to the lc level; the node accepts the items and then blocks on the mutex
                    for ( size_t c = 0; c < lc; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // The node accepts only up to lc items
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release the lock at the end of the scope, setting the exe_node free to continue
                // wait for the graph to settle down
                g.wait_for_all();

                // Confirm that each sender was asked for exactly N messages
                for (size_t s = 0; s < num_senders; ++s) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // Confirm that each receiver got N * num_senders messages plus the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N + lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}


template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    concurrency_levels<InputType,OutputType>(
        c, []( InputType i ) -> OutputType {
            return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i);
        } );
    concurrency_levels<InputType,OutputType>(
        c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>(
        c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}


struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK( my_exe_node->try_put( InputType() ) == true );
        }
    }

};

//! Performs tests on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts,
    2) that the nodes will receive puts from multiple predecessors simultaneously,
    and 3) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
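
// Illustrative sketch (not called by the test driver): with tbb::flow::unlimited a
// function_node spawns a task for every input immediately, so every put is accepted even
// under the rejecting policy, because the concurrency limit can never be reached.
void unlimited_concurrency_sketch() {
    tbb::flow::graph g;
    std::atomic<int> hits{0};
    tbb::flow::function_node<int, int, tbb::flow::rejecting> demo(
        g, tbb::flow::unlimited, [&hits](int v) { ++hits; return v; } );
    for ( int i = 0; i < 100; ++i ) {
        CHECK( demo.try_put(i) == true ); // never rejects at unlimited concurrency
    }
    g.wait_for_all();
    CHECK( hits == 100 );
}
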
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            CHECK( ec == p*N );
            for (size_t r = 0; r < num_receivers; ++r) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK( c == p*N );
            }
            for (size_t r = 0; r < num_receivers; ++r) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    unlimited_concurrency<InputType,OutputType>(
        []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}

struct continue_msg_to_int {
    int my_int;
    continue_msg_to_int(int x) : my_int(x) {}
    int operator()(tbb::flow::continue_msg) { return my_int; }
};

void test_function_node_with_continue_msg_as_input() {
    // If this function terminates, then this test is successful
    tbb::flow::graph g;

    tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);

    tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42) );
    tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43) );

    tbb::flow::make_edge( Start, FN1 );
    tbb::flow::make_edge( Start, FN2 );

    Start.try_put( tbb::flow::continue_msg() );
    g.wait_for_all();
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
#endif
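
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Illustrative sketch (not called by the test driver): follows() wires up the edges at node
// construction time, equivalent to calling make_edge afterwards. The node names below are
// illustrative only.
void follows_sketch() {
    tbb::flow::graph g;
    tbb::flow::broadcast_node<int> b(g);
    tbb::flow::function_node<int, int> f( tbb::flow::follows(b), tbb::flow::unlimited,
                                          [](int v) { return v; } );
    b.try_put(1); // delivered to f through the edge created by follows()
    g.wait_for_all();
}
#endif
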
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

int function_body_f(const int&) { return 1; }

template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}

void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}

#endif

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    for ( unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! Test function_node with the lightweight policy
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing") {
    lightweight_testing::test<tbb::flow::function_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test") {
    test_follows_and_precedes_api();
}
#endif

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Deduction guides test") {
    test_deduction_guides();
}
#endif

//! try_release and try_consume test
//! \brief \ref error_guessing
TEST_CASE("try_release try_consume") {
    tbb::flow::graph g;

    tbb::flow::function_node<int, int> fn( g, tbb::flow::unlimited, [](const int& v) { return v; } );

    CHECK_MESSAGE( (fn.try_release() == false), "try_release should initially return false on a node" );
    CHECK_MESSAGE( (fn.try_consume() == false), "try_consume should initially return false on a node" );
}