1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 20 // parts in all of tests might make testing of the product, which is different from what is actually 21 // released. 22 #define __TBB_EXTRA_DEBUG 1 23 #include "tbb/flow_graph.h" 24 #include "tbb/spin_rw_mutex.h" 25 26 #include "common/test.h" 27 #include "common/utils.h" 28 #include "common/graph_utils.h" 29 #include "common/test_follows_and_precedes_api.h" 30 31 32 //! \file test_multifunction_node.cpp 33 //! \brief Test for [flow_graph.multifunction_node] specification 34 35 36 #if TBB_USE_DEBUG 37 #define N 16 38 #else 39 #define N 100 40 #endif 41 #define MAX_NODES 4 42 43 //! Performs test on function nodes with limited concurrency and buffering 44 /** These tests check: 45 1) that the number of executing copies never exceed the concurrency limit 46 2) that the node never rejects 47 3) that no items are lost 48 and 4) all of this happens even if there are multiple predecessors and successors 49 */ 50 51 //! exercise buffered multifunction_node. 52 template< typename InputType, typename OutputTuple, typename Body > 53 void buffered_levels( size_t concurrency, Body body ) { 54 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 55 // Do for lc = 1 to concurrency level 56 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 57 tbb::flow::graph g; 58 59 // Set the execute_counter back to zero in the harness 60 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 61 // Set the number of current executors to zero. 62 harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0; 63 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 64 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc; 65 66 // Create the function_node with the appropriate concurrency level, and use default buffering 67 tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body ); 68 69 //Create a vector of identical exe_nodes 70 std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node); 71 72 // exercise each of the copied nodes 73 for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 74 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 75 // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them. 76 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 77 for (size_t i = 0; i < num_receivers; i++) { 78 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 79 } 80 81 for (size_t r = 0; r < num_receivers; ++r ) { 82 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] ); 83 } 84 85 // Do the test with varying numbers of senders 86 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 87 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 88 // Create num_senders senders, set their message limit each to N, and connect 89 // them to the exe_vec[node_idx] 90 senders.clear(); 91 for (size_t s = 0; s < num_senders; ++s ) { 92 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 93 senders.back()->my_limit = N; 94 tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] ); 95 } 96 97 // Initialize the receivers so they know how many senders and messages to check for 98 for (size_t r = 0; r < num_receivers; ++r ) { 99 receivers[r]->initialize_map( N, num_senders ); 100 } 101 102 // Do the test 103 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 104 g.wait_for_all(); 105 106 // confirm that each sender was requested from N times 107 for (size_t s = 0; s < num_senders; ++s ) { 108 size_t n = senders[s]->my_received; 109 CHECK_MESSAGE( n == N, "" ); 110 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" ); 111 } 112 // validate the receivers 113 for (size_t r = 0; r < num_receivers; ++r ) { 114 receivers[r]->validate(); 115 } 116 } 117 for (size_t r = 0; r < num_receivers; ++r ) { 118 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] ); 119 } 120 CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" ); 121 g.wait_for_all(); 122 for (size_t r = 0; r < num_receivers; ++r ) { 123 // since it's detached, nothing should have changed 124 receivers[r]->validate(); 125 } 126 } 127 } 128 } 129 } 130 131 const size_t Offset = 123; 132 std::atomic<size_t> global_execute_count; 133 134 struct inc_functor { 135 136 std::atomic<size_t> local_execute_count; 137 inc_functor( ) { local_execute_count = 0; } 138 inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 139 140 template<typename output_ports_type> 141 void operator()( int i, output_ports_type &p ) { 142 ++global_execute_count; 143 ++local_execute_count; 144 (void)std::get<0>(p).try_put(i); 145 } 146 147 }; 148 149 template< typename InputType, typename OutputTuple > 150 void buffered_levels_with_copy( size_t concurrency ) { 151 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 152 // Do for lc = 1 to concurrency level 153 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 154 tbb::flow::graph g; 155 156 inc_functor cf; 157 cf.local_execute_count = Offset; 158 global_execute_count = Offset; 159 160 tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf ); 161 162 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 163 164 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 165 for (size_t i = 0; i < num_receivers; i++) { 166 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 167 } 168 169 for (size_t r = 0; r < num_receivers; ++r ) { 170 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 171 } 172 173 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 174 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 175 senders.clear(); 176 for (size_t s = 0; s < num_senders; ++s ) { 177 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 178 senders.back()->my_limit = N; 179 tbb::flow::make_edge( *senders.back(), exe_node ); 180 } 181 182 for (size_t r = 0; r < num_receivers; ++r ) { 183 receivers[r]->initialize_map( N, num_senders ); 184 } 185 186 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 187 g.wait_for_all(); 188 189 for (size_t s = 0; s < num_senders; ++s ) { 190 size_t n = senders[s]->my_received; 191 CHECK_MESSAGE( n == N, "" ); 192 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" ); 193 } 194 for (size_t r = 0; r < num_receivers; ++r ) { 195 receivers[r]->validate(); 196 } 197 } 198 for (size_t r = 0; r < num_receivers; ++r ) { 199 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 200 } 201 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 202 g.wait_for_all(); 203 for (size_t r = 0; r < num_receivers; ++r ) { 204 receivers[r]->validate(); 205 } 206 } 207 208 // validate that the local body matches the global execute_count and both are correct 209 inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 210 const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset; 211 size_t global_count = global_execute_count; 212 size_t inc_count = body_copy.local_execute_count; 213 CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" ); 214 } 215 } 216 217 template< typename InputType, typename OutputTuple > 218 void run_buffered_levels( int c ) { 219 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 220 buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } ); 221 buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func ); 222 buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() ); 223 buffered_levels_with_copy<InputType,OutputTuple>( c ); 224 } 225 226 227 //! Performs test on executable nodes with limited concurrency 228 /** These tests check: 229 1) that the nodes will accepts puts up to the concurrency limit, 230 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), 231 3) the nodes will receive puts from multiple successors simultaneously, 232 and 4) the nodes will send to multiple predecessors. 233 There is no checking of the contents of the messages for corruption. 234 */ 235 236 template< typename InputType, typename OutputTuple, typename Body > 237 void concurrency_levels( size_t concurrency, Body body ) { 238 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 239 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 240 tbb::flow::graph g; 241 242 // Set the execute_counter back to zero in the harness 243 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 244 // Set the number of current executors to zero. 245 harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0; 246 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 247 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc; 248 249 250 tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body ); 251 252 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 253 254 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 255 for (size_t i = 0; i < num_receivers; ++i) { 256 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 257 } 258 259 for (size_t r = 0; r < num_receivers; ++r ) { 260 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 261 } 262 263 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 264 265 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 266 { 267 // Exclusively lock m to prevent exe_node from finishing 268 tbb::spin_rw_mutex::scoped_lock l( 269 harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex 270 ); 271 272 // put to lc level, it will accept and then block at m 273 for ( size_t c = 0 ; c < lc ; ++c ) { 274 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 275 } 276 // it only accepts to lc level 277 CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" ); 278 279 senders.clear(); 280 for (size_t s = 0; s < num_senders; ++s ) { 281 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 282 senders.back()->my_limit = N; 283 exe_node.register_predecessor( *senders.back() ); 284 } 285 286 } // release lock at end of scope, setting the exe node free to continue 287 // wait for graph to settle down 288 g.wait_for_all(); 289 290 // confirm that each sender was requested from N times 291 for (size_t s = 0; s < num_senders; ++s ) { 292 size_t n = senders[s]->my_received; 293 CHECK_MESSAGE( n == N, "" ); 294 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" ); 295 } 296 // confirm that each receivers got N * num_senders + the initial lc puts 297 for (size_t r = 0; r < num_receivers; ++r ) { 298 size_t n = receivers[r]->my_count; 299 CHECK_MESSAGE( n == num_senders*N+lc, "" ); 300 receivers[r]->my_count = 0; 301 } 302 } 303 for (size_t r = 0; r < num_receivers; ++r ) { 304 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 305 } 306 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 307 g.wait_for_all(); 308 for (size_t r = 0; r < num_receivers; ++r ) { 309 CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" ); 310 } 311 } 312 } 313 } 314 315 template< typename InputType, typename OutputTuple > 316 void run_concurrency_levels( int c ) { 317 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 318 concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } ); 319 concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> ); 320 concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() ); 321 } 322 323 324 struct empty_no_assign { 325 empty_no_assign() {} 326 empty_no_assign( int ) {} 327 operator int() { return 0; } 328 operator int() const { return 0; } 329 }; 330 331 template< typename InputType > 332 struct parallel_puts : private utils::NoAssign { 333 334 tbb::flow::receiver< InputType > * const my_exe_node; 335 336 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 337 338 void operator()( int ) const { 339 for ( int i = 0; i < N; ++i ) { 340 // the nodes will accept all puts 341 CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 342 } 343 } 344 345 }; 346 347 //! Performs test on executable nodes with unlimited concurrency 348 /** These tests check: 349 1) that the nodes will accept all puts 350 2) the nodes will receive puts from multiple predecessors simultaneously, 351 and 3) the nodes will send to multiple successors. 352 There is no checking of the contents of the messages for corruption. 353 */ 354 355 template< typename InputType, typename OutputTuple, typename Body > 356 void unlimited_concurrency( Body body ) { 357 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 358 359 for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) { 360 tbb::flow::graph g; 361 tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body ); 362 363 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 364 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 365 for (size_t i = 0; i < num_receivers; ++i) { 366 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 367 } 368 369 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 370 371 for (size_t r = 0; r < num_receivers; ++r ) { 372 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 373 } 374 375 utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) ); 376 g.wait_for_all(); 377 378 // 2) the nodes will receive puts from multiple predecessors simultaneously, 379 size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count; 380 CHECK_MESSAGE( (unsigned int)ec == p*N, "" ); 381 for (size_t r = 0; r < num_receivers; ++r ) { 382 size_t c = receivers[r]->my_count; 383 // 3) the nodes will send to multiple successors. 384 CHECK_MESSAGE( (unsigned int)c == p*N, "" ); 385 } 386 for (size_t r = 0; r < num_receivers; ++r ) { 387 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 388 } 389 } 390 } 391 } 392 393 template< typename InputType, typename OutputTuple > 394 void run_unlimited_concurrency() { 395 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0; 396 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 397 unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } ); 398 unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func ); 399 unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() ); 400 } 401 402 template<typename InputType, typename OutputTuple> 403 struct oddEvenBody { 404 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 405 typedef typename std::tuple_element<0,OutputTuple>::type EvenType; 406 typedef typename std::tuple_element<1,OutputTuple>::type OddType; 407 void operator() (const InputType &i, output_ports_type &p) { 408 if((int)i % 2) { 409 (void)std::get<1>(p).try_put(OddType(i)); 410 } 411 else { 412 (void)std::get<0>(p).try_put(EvenType(i)); 413 } 414 } 415 }; 416 417 template<typename InputType, typename OutputTuple > 418 void run_multiport_test(int num_threads) { 419 typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type; 420 typedef typename std::tuple_element<0,OutputTuple>::type EvenType; 421 typedef typename std::tuple_element<1,OutputTuple>::type OddType; 422 tbb::task_arena arena(num_threads); 423 arena.execute( 424 [&] () { 425 tbb::flow::graph g; 426 mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() ); 427 428 tbb::flow::queue_node<EvenType> q0(g); 429 tbb::flow::queue_node<OddType> q1(g); 430 431 tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0); 432 tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1); 433 434 for(InputType i = 0; i < N; ++i) { 435 mo_node.try_put(i); 436 } 437 438 g.wait_for_all(); 439 for(int i = 0; i < N/2; ++i) { 440 EvenType e{}; 441 OddType o{}; 442 CHECK_MESSAGE( q0.try_get(e), "" ); 443 CHECK_MESSAGE( (int)e % 2 == 0, "" ); 444 CHECK_MESSAGE( q1.try_get(o), "" ); 445 CHECK_MESSAGE( (int)o % 2 == 1, "" ); 446 } 447 } 448 ); 449 } 450 451 //! Tests limited concurrency cases for nodes that accept data messages 452 void test_concurrency(int num_threads) { 453 tbb::task_arena arena(num_threads); 454 arena.execute( 455 [&] () { 456 run_concurrency_levels<int,std::tuple<int> >(num_threads); 457 run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads); 458 run_buffered_levels<int, std::tuple<int> >(num_threads); 459 run_unlimited_concurrency<int, std::tuple<int> >(); 460 run_unlimited_concurrency<int,std::tuple<empty_no_assign> >(); 461 run_unlimited_concurrency<empty_no_assign,std::tuple<int> >(); 462 run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >(); 463 run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >(); 464 run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >(); 465 run_multiport_test<int, std::tuple<int, int> >(num_threads); 466 run_multiport_test<float, std::tuple<int, double> >(num_threads); 467 } 468 ); 469 } 470 471 template<typename Policy> 472 void test_ports_return_references() { 473 tbb::flow::graph g; 474 typedef int InputType; 475 typedef std::tuple<int> OutputTuple; 476 tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node( 477 g, tbb::flow::unlimited, 478 &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func ); 479 test_output_ports_return_ref(mf_node); 480 } 481 482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 483 #include <array> 484 #include <vector> 485 486 void test_precedes() { 487 using namespace tbb::flow; 488 489 using multinode = multifunction_node<int, std::tuple<int, int>>; 490 491 graph g; 492 493 buffer_node<int> b1(g); 494 buffer_node<int> b2(g); 495 496 multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 497 if (i % 2) 498 std::get<0>(op).try_put(i); 499 else 500 std::get<1>(op).try_put(i); 501 } 502 ); 503 504 node.try_put(0); 505 node.try_put(1); 506 g.wait_for_all(); 507 508 int storage; 509 CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)), 510 "Not exact edge quantity was made"); 511 } 512 513 void test_follows_and_precedes_api() { 514 using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>; 515 516 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 517 518 follows_and_precedes_testing::test_follows 519 <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>> 520 (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 521 std::get<0>(op).try_put(i); 522 }); 523 524 test_precedes(); 525 } 526 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 527 528 //! Test various node bodies with concurrency 529 //! \brief \ref error_guessing 530 TEST_CASE("Concurrency test"){ 531 for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 532 test_concurrency(p); 533 } 534 } 535 536 //! Test return types of ports 537 //! \brief \ref error_guessing 538 TEST_CASE("Test ports retrurn references"){ 539 test_ports_return_references<tbb::flow::queueing>(); 540 test_ports_return_references<tbb::flow::rejecting>(); 541 } 542 543 //! NativeParallelFor testing with various concurrency settings 544 //! \brief \ref error_guessing 545 TEST_CASE("Lightweight testing"){ 546 lightweight_testing::test<tbb::flow::multifunction_node>(10); 547 } 548 549 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 550 //! Test follows and precedes API 551 //! \brief \ref error_guessing 552 TEST_CASE("Test follows-precedes API"){ 553 test_follows_and_precedes_api(); 554 } 555 //! Test priority constructor with follows and precedes API 556 //! \brief \ref error_guessing 557 TEST_CASE("Test priority with follows and precedes"){ 558 using namespace tbb::flow; 559 560 using multinode = multifunction_node<int, std::tuple<int, int>>; 561 562 graph g; 563 564 buffer_node<int> b1(g); 565 buffer_node<int> b2(g); 566 567 multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 568 if (i % 2) 569 std::get<0>(op).try_put(i); 570 else 571 std::get<1>(op).try_put(i); 572 } 573 , node_priority_t(0)); 574 575 node.try_put(0); 576 node.try_put(1); 577 g.wait_for_all(); 578 579 int storage; 580 CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)), 581 "Not exact edge quantity was made"); 582 } 583 584 #endif 585 586