1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 24 // parts in all of tests might make testing of the product, which is different from what is actually 25 // released. 26 #define __TBB_EXTRA_DEBUG 1 27 #include "tbb/flow_graph.h" 28 #include "tbb/spin_rw_mutex.h" 29 30 #include "common/test.h" 31 #include "common/utils.h" 32 #include "common/graph_utils.h" 33 #include "common/test_follows_and_precedes_api.h" 34 35 36 //! \file test_multifunction_node.cpp 37 //! \brief Test for [flow_graph.multifunction_node] specification 38 39 40 #if TBB_USE_DEBUG 41 #define N 16 42 #else 43 #define N 100 44 #endif 45 #define MAX_NODES 4 46 47 //! Performs test on function nodes with limited concurrency and buffering 48 /** These tests check: 49 1) that the number of executing copies never exceed the concurrency limit 50 2) that the node never rejects 51 3) that no items are lost 52 and 4) all of this happens even if there are multiple predecessors and successors 53 */ 54 55 //! exercise buffered multifunction_node. 56 template< typename InputType, typename OutputTuple, typename Body > 57 void buffered_levels( size_t concurrency, Body body ) { 58 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 59 // Do for lc = 1 to concurrency level 60 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 61 tbb::flow::graph g; 62 63 // Set the execute_counter back to zero in the harness 64 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 65 // Set the number of current executors to zero. 66 harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0; 67 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 68 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc; 69 70 // Create the function_node with the appropriate concurrency level, and use default buffering 71 tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body ); 72 73 //Create a vector of identical exe_nodes 74 std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node); 75 76 // exercise each of the copied nodes 77 for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 78 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 79 // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them. 80 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 81 for (size_t i = 0; i < num_receivers; i++) { 82 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 83 } 84 85 for (size_t r = 0; r < num_receivers; ++r ) { 86 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] ); 87 } 88 89 // Do the test with varying numbers of senders 90 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 91 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 92 // Create num_senders senders, set their message limit each to N, and connect 93 // them to the exe_vec[node_idx] 94 senders.clear(); 95 for (size_t s = 0; s < num_senders; ++s ) { 96 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 97 senders.back()->my_limit = N; 98 tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] ); 99 } 100 101 // Initialize the receivers so they know how many senders and messages to check for 102 for (size_t r = 0; r < num_receivers; ++r ) { 103 receivers[r]->initialize_map( N, num_senders ); 104 } 105 106 // Do the test 107 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 108 g.wait_for_all(); 109 110 // confirm that each sender was requested from N times 111 for (size_t s = 0; s < num_senders; ++s ) { 112 size_t n = senders[s]->my_received; 113 CHECK_MESSAGE( n == N, "" ); 114 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" ); 115 } 116 // validate the receivers 117 for (size_t r = 0; r < num_receivers; ++r ) { 118 receivers[r]->validate(); 119 } 120 } 121 for (size_t r = 0; r < num_receivers; ++r ) { 122 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] ); 123 } 124 CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" ); 125 g.wait_for_all(); 126 for (size_t r = 0; r < num_receivers; ++r ) { 127 // since it's detached, nothing should have changed 128 receivers[r]->validate(); 129 } 130 } 131 } 132 } 133 } 134 135 const size_t Offset = 123; 136 std::atomic<size_t> global_execute_count; 137 138 struct inc_functor { 139 140 std::atomic<size_t> local_execute_count; 141 inc_functor( ) { local_execute_count = 0; } 142 inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 143 144 template<typename output_ports_type> 145 void operator()( int i, output_ports_type &p ) { 146 ++global_execute_count; 147 ++local_execute_count; 148 (void)std::get<0>(p).try_put(i); 149 } 150 151 }; 152 153 template< typename InputType, typename OutputTuple > 154 void buffered_levels_with_copy( size_t concurrency ) { 155 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 156 // Do for lc = 1 to concurrency level 157 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 158 tbb::flow::graph g; 159 160 inc_functor cf; 161 cf.local_execute_count = Offset; 162 global_execute_count = Offset; 163 164 tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf ); 165 166 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 167 168 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 169 for (size_t i = 0; i < num_receivers; i++) { 170 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 171 } 172 173 for (size_t r = 0; r < num_receivers; ++r ) { 174 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 175 } 176 177 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 178 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 179 senders.clear(); 180 for (size_t s = 0; s < num_senders; ++s ) { 181 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 182 senders.back()->my_limit = N; 183 tbb::flow::make_edge( *senders.back(), exe_node ); 184 } 185 186 for (size_t r = 0; r < num_receivers; ++r ) { 187 receivers[r]->initialize_map( N, num_senders ); 188 } 189 190 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 191 g.wait_for_all(); 192 193 for (size_t s = 0; s < num_senders; ++s ) { 194 size_t n = senders[s]->my_received; 195 CHECK_MESSAGE( n == N, "" ); 196 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" ); 197 } 198 for (size_t r = 0; r < num_receivers; ++r ) { 199 receivers[r]->validate(); 200 } 201 } 202 for (size_t r = 0; r < num_receivers; ++r ) { 203 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 204 } 205 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 206 g.wait_for_all(); 207 for (size_t r = 0; r < num_receivers; ++r ) { 208 receivers[r]->validate(); 209 } 210 } 211 212 // validate that the local body matches the global execute_count and both are correct 213 inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 214 const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset; 215 size_t global_count = global_execute_count; 216 size_t inc_count = body_copy.local_execute_count; 217 CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" ); 218 } 219 } 220 221 template< typename InputType, typename OutputTuple > 222 void run_buffered_levels( int c ) { 223 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 224 buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } ); 225 buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func ); 226 buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() ); 227 buffered_levels_with_copy<InputType,OutputTuple>( c ); 228 } 229 230 231 //! Performs test on executable nodes with limited concurrency 232 /** These tests check: 233 1) that the nodes will accepts puts up to the concurrency limit, 234 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), 235 3) the nodes will receive puts from multiple successors simultaneously, 236 and 4) the nodes will send to multiple predecessors. 237 There is no checking of the contents of the messages for corruption. 238 */ 239 240 template< typename InputType, typename OutputTuple, typename Body > 241 void concurrency_levels( size_t concurrency, Body body ) { 242 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 243 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 244 tbb::flow::graph g; 245 246 // Set the execute_counter back to zero in the harness 247 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 248 // Set the number of current executors to zero. 249 harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0; 250 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 251 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc; 252 253 254 tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body ); 255 256 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 257 258 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 259 for (size_t i = 0; i < num_receivers; ++i) { 260 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 261 } 262 263 for (size_t r = 0; r < num_receivers; ++r ) { 264 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 265 } 266 267 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 268 269 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 270 { 271 // Exclusively lock m to prevent exe_node from finishing 272 tbb::spin_rw_mutex::scoped_lock l( 273 harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex 274 ); 275 276 // put to lc level, it will accept and then block at m 277 for ( size_t c = 0 ; c < lc ; ++c ) { 278 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 279 } 280 // it only accepts to lc level 281 CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" ); 282 283 senders.clear(); 284 for (size_t s = 0; s < num_senders; ++s ) { 285 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 286 senders.back()->my_limit = N; 287 exe_node.register_predecessor( *senders.back() ); 288 } 289 290 } // release lock at end of scope, setting the exe node free to continue 291 // wait for graph to settle down 292 g.wait_for_all(); 293 294 // confirm that each sender was requested from N times 295 for (size_t s = 0; s < num_senders; ++s ) { 296 size_t n = senders[s]->my_received; 297 CHECK_MESSAGE( n == N, "" ); 298 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" ); 299 } 300 // confirm that each receivers got N * num_senders + the initial lc puts 301 for (size_t r = 0; r < num_receivers; ++r ) { 302 size_t n = receivers[r]->my_count; 303 CHECK_MESSAGE( n == num_senders*N+lc, "" ); 304 receivers[r]->my_count = 0; 305 } 306 } 307 for (size_t r = 0; r < num_receivers; ++r ) { 308 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 309 } 310 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 311 g.wait_for_all(); 312 for (size_t r = 0; r < num_receivers; ++r ) { 313 CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" ); 314 } 315 } 316 } 317 } 318 319 template< typename InputType, typename OutputTuple > 320 void run_concurrency_levels( int c ) { 321 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 322 concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } ); 323 concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> ); 324 concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() ); 325 } 326 327 328 struct empty_no_assign { 329 empty_no_assign() {} 330 empty_no_assign( int ) {} 331 operator int() { return 0; } 332 operator int() const { return 0; } 333 }; 334 335 template< typename InputType > 336 struct parallel_puts : private utils::NoAssign { 337 338 tbb::flow::receiver< InputType > * const my_exe_node; 339 340 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 341 342 void operator()( int ) const { 343 for ( int i = 0; i < N; ++i ) { 344 // the nodes will accept all puts 345 CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 346 } 347 } 348 349 }; 350 351 //! Performs test on executable nodes with unlimited concurrency 352 /** These tests check: 353 1) that the nodes will accept all puts 354 2) the nodes will receive puts from multiple predecessors simultaneously, 355 and 3) the nodes will send to multiple successors. 356 There is no checking of the contents of the messages for corruption. 357 */ 358 359 template< typename InputType, typename OutputTuple, typename Body > 360 void unlimited_concurrency( Body body ) { 361 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 362 363 for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) { 364 tbb::flow::graph g; 365 tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body ); 366 367 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 368 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 369 for (size_t i = 0; i < num_receivers; ++i) { 370 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 371 } 372 373 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 374 375 for (size_t r = 0; r < num_receivers; ++r ) { 376 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 377 } 378 379 utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) ); 380 g.wait_for_all(); 381 382 // 2) the nodes will receive puts from multiple predecessors simultaneously, 383 size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count; 384 CHECK_MESSAGE( (unsigned int)ec == p*N, "" ); 385 for (size_t r = 0; r < num_receivers; ++r ) { 386 size_t c = receivers[r]->my_count; 387 // 3) the nodes will send to multiple successors. 388 CHECK_MESSAGE( (unsigned int)c == p*N, "" ); 389 } 390 for (size_t r = 0; r < num_receivers; ++r ) { 391 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 392 } 393 } 394 } 395 } 396 397 template< typename InputType, typename OutputTuple > 398 void run_unlimited_concurrency() { 399 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0; 400 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 401 unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } ); 402 unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func ); 403 unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() ); 404 } 405 406 template<typename InputType, typename OutputTuple> 407 struct oddEvenBody { 408 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 409 typedef typename std::tuple_element<0,OutputTuple>::type EvenType; 410 typedef typename std::tuple_element<1,OutputTuple>::type OddType; 411 void operator() (const InputType &i, output_ports_type &p) { 412 if((int)i % 2) { 413 (void)std::get<1>(p).try_put(OddType(i)); 414 } 415 else { 416 (void)std::get<0>(p).try_put(EvenType(i)); 417 } 418 } 419 }; 420 421 template<typename InputType, typename OutputTuple > 422 void run_multiport_test(int num_threads) { 423 typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type; 424 typedef typename std::tuple_element<0,OutputTuple>::type EvenType; 425 typedef typename std::tuple_element<1,OutputTuple>::type OddType; 426 tbb::task_arena arena(num_threads); 427 arena.execute( 428 [&] () { 429 tbb::flow::graph g; 430 mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() ); 431 432 tbb::flow::queue_node<EvenType> q0(g); 433 tbb::flow::queue_node<OddType> q1(g); 434 435 tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0); 436 tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1); 437 438 for(InputType i = 0; i < N; ++i) { 439 mo_node.try_put(i); 440 } 441 442 g.wait_for_all(); 443 for(int i = 0; i < N/2; ++i) { 444 EvenType e{}; 445 OddType o{}; 446 CHECK_MESSAGE( q0.try_get(e), "" ); 447 CHECK_MESSAGE( (int)e % 2 == 0, "" ); 448 CHECK_MESSAGE( q1.try_get(o), "" ); 449 CHECK_MESSAGE( (int)o % 2 == 1, "" ); 450 } 451 } 452 ); 453 } 454 455 //! Tests limited concurrency cases for nodes that accept data messages 456 void test_concurrency(int num_threads) { 457 tbb::task_arena arena(num_threads); 458 arena.execute( 459 [&] () { 460 run_concurrency_levels<int,std::tuple<int> >(num_threads); 461 run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads); 462 run_buffered_levels<int, std::tuple<int> >(num_threads); 463 run_unlimited_concurrency<int, std::tuple<int> >(); 464 run_unlimited_concurrency<int,std::tuple<empty_no_assign> >(); 465 run_unlimited_concurrency<empty_no_assign,std::tuple<int> >(); 466 run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >(); 467 run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >(); 468 run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >(); 469 run_multiport_test<int, std::tuple<int, int> >(num_threads); 470 run_multiport_test<float, std::tuple<int, double> >(num_threads); 471 } 472 ); 473 } 474 475 template<typename Policy> 476 void test_ports_return_references() { 477 tbb::flow::graph g; 478 typedef int InputType; 479 typedef std::tuple<int> OutputTuple; 480 tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node( 481 g, tbb::flow::unlimited, 482 &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func ); 483 test_output_ports_return_ref(mf_node); 484 } 485 486 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 487 #include <array> 488 #include <vector> 489 490 void test_precedes() { 491 using namespace tbb::flow; 492 493 using multinode = multifunction_node<int, std::tuple<int, int>>; 494 495 graph g; 496 497 buffer_node<int> b1(g); 498 buffer_node<int> b2(g); 499 500 multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 501 if (i % 2) 502 std::get<0>(op).try_put(i); 503 else 504 std::get<1>(op).try_put(i); 505 } 506 ); 507 508 node.try_put(0); 509 node.try_put(1); 510 g.wait_for_all(); 511 512 int storage; 513 CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)), 514 "Not exact edge quantity was made"); 515 } 516 517 void test_follows_and_precedes_api() { 518 using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>; 519 520 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 521 522 follows_and_precedes_testing::test_follows 523 <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>> 524 (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 525 std::get<0>(op).try_put(i); 526 }); 527 528 test_precedes(); 529 } 530 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 531 532 //! Test various node bodies with concurrency 533 //! \brief \ref error_guessing 534 TEST_CASE("Concurrency test"){ 535 for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 536 test_concurrency(p); 537 } 538 } 539 540 //! Test return types of ports 541 //! \brief \ref error_guessing 542 TEST_CASE("Test ports retrurn references"){ 543 test_ports_return_references<tbb::flow::queueing>(); 544 test_ports_return_references<tbb::flow::rejecting>(); 545 } 546 547 //! NativeParallelFor testing with various concurrency settings 548 //! \brief \ref error_guessing 549 TEST_CASE("Lightweight testing"){ 550 lightweight_testing::test<tbb::flow::multifunction_node>(10); 551 } 552 553 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 554 //! Test follows and precedes API 555 //! \brief \ref error_guessing 556 TEST_CASE("Test follows-precedes API"){ 557 test_follows_and_precedes_api(); 558 } 559 //! Test priority constructor with follows and precedes API 560 //! \brief \ref error_guessing 561 TEST_CASE("Test priority with follows and precedes"){ 562 using namespace tbb::flow; 563 564 using multinode = multifunction_node<int, std::tuple<int, int>>; 565 566 graph g; 567 568 buffer_node<int> b1(g); 569 buffer_node<int> b2(g); 570 571 multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 572 if (i % 2) 573 std::get<0>(op).try_put(i); 574 else 575 std::get<1>(op).try_put(i); 576 } 577 , node_priority_t(0)); 578 579 node.try_put(0); 580 node.try_put(1); 581 g.wait_for_all(); 582 583 int storage; 584 CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)), 585 "Not exact edge quantity was made"); 586 } 587 588 #endif 589 590