1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 #include "tbb/spin_rw_mutex.h" 25 26 #include "common/test.h" 27 #include "common/utils.h" 28 #include "common/graph_utils.h" 29 #include "common/test_follows_and_precedes_api.h" 30 #include "common/concepts_common.h" 31 32 33 //! \file test_multifunction_node.cpp 34 //! \brief Test for [flow_graph.multifunction_node] specification 35 36 37 #if TBB_USE_DEBUG 38 #define N 16 39 #else 40 #define N 100 41 #endif 42 #define MAX_NODES 4 43 44 //! Performs test on function nodes with limited concurrency and buffering 45 /** These tests check: 46 1) that the number of executing copies never exceed the concurrency limit 47 2) that the node never rejects 48 3) that no items are lost 49 and 4) all of this happens even if there are multiple predecessors and successors 50 */ 51 52 //! exercise buffered multifunction_node. 53 template< typename InputType, typename OutputTuple, typename Body > 54 void buffered_levels( size_t concurrency, Body body ) { 55 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 56 // Do for lc = 1 to concurrency level 57 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 58 tbb::flow::graph g; 59 60 // Set the execute_counter back to zero in the harness 61 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 62 // Set the number of current executors to zero. 63 harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0; 64 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 65 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc; 66 67 // Create the function_node with the appropriate concurrency level, and use default buffering 68 tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body ); 69 70 //Create a vector of identical exe_nodes 71 std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node); 72 73 // exercise each of the copied nodes 74 for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) { 75 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 76 // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them. 77 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 78 for (size_t i = 0; i < num_receivers; i++) { 79 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 80 } 81 82 for (size_t r = 0; r < num_receivers; ++r ) { 83 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] ); 84 } 85 86 // Do the test with varying numbers of senders 87 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 88 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 89 // Create num_senders senders, set their message limit each to N, and connect 90 // them to the exe_vec[node_idx] 91 senders.clear(); 92 for (size_t s = 0; s < num_senders; ++s ) { 93 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 94 senders.back()->my_limit = N; 95 tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] ); 96 } 97 98 // Initialize the receivers so they know how many senders and messages to check for 99 for (size_t r = 0; r < num_receivers; ++r ) { 100 receivers[r]->initialize_map( N, num_senders ); 101 } 102 103 // Do the test 104 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 105 g.wait_for_all(); 106 107 // confirm that each sender was requested from N times 108 for (size_t s = 0; s < num_senders; ++s ) { 109 size_t n = senders[s]->my_received; 110 CHECK_MESSAGE( n == N, "" ); 111 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" ); 112 } 113 // validate the receivers 114 for (size_t r = 0; r < num_receivers; ++r ) { 115 receivers[r]->validate(); 116 } 117 } 118 for (size_t r = 0; r < num_receivers; ++r ) { 119 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] ); 120 } 121 CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" ); 122 g.wait_for_all(); 123 for (size_t r = 0; r < num_receivers; ++r ) { 124 // since it's detached, nothing should have changed 125 receivers[r]->validate(); 126 } 127 } 128 } 129 } 130 } 131 132 const size_t Offset = 123; 133 std::atomic<size_t> global_execute_count; 134 135 struct inc_functor { 136 137 std::atomic<size_t> local_execute_count; 138 inc_functor( ) { local_execute_count = 0; } 139 inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 140 141 template<typename output_ports_type> 142 void operator()( int i, output_ports_type &p ) { 143 ++global_execute_count; 144 ++local_execute_count; 145 (void)std::get<0>(p).try_put(i); 146 } 147 148 }; 149 150 template< typename InputType, typename OutputTuple > 151 void buffered_levels_with_copy( size_t concurrency ) { 152 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 153 // Do for lc = 1 to concurrency level 154 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 155 tbb::flow::graph g; 156 157 inc_functor cf; 158 cf.local_execute_count = Offset; 159 global_execute_count = Offset; 160 161 tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf ); 162 163 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 164 165 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers; 166 for (size_t i = 0; i < num_receivers; i++) { 167 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) ); 168 } 169 170 for (size_t r = 0; r < num_receivers; ++r ) { 171 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 172 } 173 174 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 175 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 176 senders.clear(); 177 for (size_t s = 0; s < num_senders; ++s ) { 178 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 179 senders.back()->my_limit = N; 180 tbb::flow::make_edge( *senders.back(), exe_node ); 181 } 182 183 for (size_t r = 0; r < num_receivers; ++r ) { 184 receivers[r]->initialize_map( N, num_senders ); 185 } 186 187 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) ); 188 g.wait_for_all(); 189 190 for (size_t s = 0; s < num_senders; ++s ) { 191 size_t n = senders[s]->my_received; 192 CHECK_MESSAGE( n == N, "" ); 193 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" ); 194 } 195 for (size_t r = 0; r < num_receivers; ++r ) { 196 receivers[r]->validate(); 197 } 198 } 199 for (size_t r = 0; r < num_receivers; ++r ) { 200 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 201 } 202 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 203 g.wait_for_all(); 204 for (size_t r = 0; r < num_receivers; ++r ) { 205 receivers[r]->validate(); 206 } 207 } 208 209 // validate that the local body matches the global execute_count and both are correct 210 inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node ); 211 const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset; 212 size_t global_count = global_execute_count; 213 size_t inc_count = body_copy.local_execute_count; 214 CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" ); 215 } 216 } 217 218 template< typename InputType, typename OutputTuple > 219 void run_buffered_levels( int c ) { 220 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 221 buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } ); 222 buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func ); 223 buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() ); 224 buffered_levels_with_copy<InputType,OutputTuple>( c ); 225 } 226 227 228 //! Performs test on executable nodes with limited concurrency 229 /** These tests check: 230 1) that the nodes will accepts puts up to the concurrency limit, 231 2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor), 232 3) the nodes will receive puts from multiple successors simultaneously, 233 and 4) the nodes will send to multiple predecessors. 234 There is no checking of the contents of the messages for corruption. 235 */ 236 237 template< typename InputType, typename OutputTuple, typename Body > 238 void concurrency_levels( size_t concurrency, Body body ) { 239 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 240 for ( size_t lc = 1; lc <= concurrency; ++lc ) { 241 tbb::flow::graph g; 242 243 // Set the execute_counter back to zero in the harness 244 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 245 // Set the number of current executors to zero. 246 harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0; 247 // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded. 248 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc; 249 250 251 tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body ); 252 253 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 254 255 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 256 for (size_t i = 0; i < num_receivers; ++i) { 257 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 258 } 259 260 for (size_t r = 0; r < num_receivers; ++r ) { 261 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 262 } 263 264 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders; 265 266 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) { 267 { 268 // Exclusively lock m to prevent exe_node from finishing 269 tbb::spin_rw_mutex::scoped_lock l( 270 harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex 271 ); 272 273 // put to lc level, it will accept and then block at m 274 for ( size_t c = 0 ; c < lc ; ++c ) { 275 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 276 } 277 // it only accepts to lc level 278 CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" ); 279 280 senders.clear(); 281 for (size_t s = 0; s < num_senders; ++s ) { 282 senders.push_back( std::make_shared<harness_counting_sender<InputType>>() ); 283 senders.back()->my_limit = N; 284 exe_node.register_predecessor( *senders.back() ); 285 } 286 287 } // release lock at end of scope, setting the exe node free to continue 288 // wait for graph to settle down 289 g.wait_for_all(); 290 291 // confirm that each sender was requested from N times 292 for (size_t s = 0; s < num_senders; ++s ) { 293 size_t n = senders[s]->my_received; 294 CHECK_MESSAGE( n == N, "" ); 295 CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" ); 296 } 297 // confirm that each receivers got N * num_senders + the initial lc puts 298 for (size_t r = 0; r < num_receivers; ++r ) { 299 size_t n = receivers[r]->my_count; 300 CHECK_MESSAGE( n == num_senders*N+lc, "" ); 301 receivers[r]->my_count = 0; 302 } 303 } 304 for (size_t r = 0; r < num_receivers; ++r ) { 305 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 306 } 307 CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" ); 308 g.wait_for_all(); 309 for (size_t r = 0; r < num_receivers; ++r ) { 310 CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" ); 311 } 312 } 313 } 314 } 315 316 template< typename InputType, typename OutputTuple > 317 void run_concurrency_levels( int c ) { 318 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 319 concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } ); 320 concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> ); 321 concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() ); 322 } 323 324 325 struct empty_no_assign { 326 empty_no_assign() {} 327 empty_no_assign( int ) {} 328 operator int() { return 0; } 329 operator int() const { return 0; } 330 }; 331 332 template< typename InputType > 333 struct parallel_puts : private utils::NoAssign { 334 335 tbb::flow::receiver< InputType > * const my_exe_node; 336 337 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 338 339 void operator()( int ) const { 340 for ( int i = 0; i < N; ++i ) { 341 // the nodes will accept all puts 342 CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 343 } 344 } 345 346 }; 347 348 //! Performs test on executable nodes with unlimited concurrency 349 /** These tests check: 350 1) that the nodes will accept all puts 351 2) the nodes will receive puts from multiple predecessors simultaneously, 352 and 3) the nodes will send to multiple successors. 353 There is no checking of the contents of the messages for corruption. 354 */ 355 356 template< typename InputType, typename OutputTuple, typename Body > 357 void unlimited_concurrency( Body body ) { 358 typedef typename std::tuple_element<0,OutputTuple>::type OutputType; 359 360 for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) { 361 tbb::flow::graph g; 362 tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body ); 363 364 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 365 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 366 for (size_t i = 0; i < num_receivers; ++i) { 367 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 368 } 369 370 harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0; 371 372 for (size_t r = 0; r < num_receivers; ++r ) { 373 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 374 } 375 376 utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) ); 377 g.wait_for_all(); 378 379 // 2) the nodes will receive puts from multiple predecessors simultaneously, 380 size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count; 381 CHECK_MESSAGE( (unsigned int)ec == p*N, "" ); 382 for (size_t r = 0; r < num_receivers; ++r ) { 383 size_t c = receivers[r]->my_count; 384 // 3) the nodes will send to multiple successors. 385 CHECK_MESSAGE( (unsigned int)c == p*N, "" ); 386 } 387 for (size_t r = 0; r < num_receivers; ++r ) { 388 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] ); 389 } 390 } 391 } 392 } 393 394 template< typename InputType, typename OutputTuple > 395 void run_unlimited_concurrency() { 396 harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0; 397 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 398 unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } ); 399 unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func ); 400 unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() ); 401 } 402 403 template<typename InputType, typename OutputTuple> 404 struct oddEvenBody { 405 typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type; 406 typedef typename std::tuple_element<0,OutputTuple>::type EvenType; 407 typedef typename std::tuple_element<1,OutputTuple>::type OddType; 408 void operator() (const InputType &i, output_ports_type &p) { 409 if((int)i % 2) { 410 (void)std::get<1>(p).try_put(OddType(i)); 411 } 412 else { 413 (void)std::get<0>(p).try_put(EvenType(i)); 414 } 415 } 416 }; 417 418 template<typename InputType, typename OutputTuple > 419 void run_multiport_test(int num_threads) { 420 typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type; 421 typedef typename std::tuple_element<0,OutputTuple>::type EvenType; 422 typedef typename std::tuple_element<1,OutputTuple>::type OddType; 423 tbb::task_arena arena(num_threads); 424 arena.execute( 425 [&] () { 426 tbb::flow::graph g; 427 mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() ); 428 429 tbb::flow::queue_node<EvenType> q0(g); 430 tbb::flow::queue_node<OddType> q1(g); 431 432 tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0); 433 tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1); 434 435 for(InputType i = 0; i < N; ++i) { 436 mo_node.try_put(i); 437 } 438 439 g.wait_for_all(); 440 for(int i = 0; i < N/2; ++i) { 441 EvenType e{}; 442 OddType o{}; 443 CHECK_MESSAGE( q0.try_get(e), "" ); 444 CHECK_MESSAGE( (int)e % 2 == 0, "" ); 445 CHECK_MESSAGE( q1.try_get(o), "" ); 446 CHECK_MESSAGE( (int)o % 2 == 1, "" ); 447 } 448 } 449 ); 450 } 451 452 //! Tests limited concurrency cases for nodes that accept data messages 453 void test_concurrency(int num_threads) { 454 tbb::task_arena arena(num_threads); 455 arena.execute( 456 [&] () { 457 run_concurrency_levels<int,std::tuple<int> >(num_threads); 458 run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads); 459 run_buffered_levels<int, std::tuple<int> >(num_threads); 460 run_unlimited_concurrency<int, std::tuple<int> >(); 461 run_unlimited_concurrency<int,std::tuple<empty_no_assign> >(); 462 run_unlimited_concurrency<empty_no_assign,std::tuple<int> >(); 463 run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >(); 464 run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >(); 465 run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >(); 466 run_multiport_test<int, std::tuple<int, int> >(num_threads); 467 run_multiport_test<float, std::tuple<int, double> >(num_threads); 468 } 469 ); 470 } 471 472 template<typename Policy> 473 void test_ports_return_references() { 474 tbb::flow::graph g; 475 typedef int InputType; 476 typedef std::tuple<int> OutputTuple; 477 tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node( 478 g, tbb::flow::unlimited, 479 &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func ); 480 test_output_ports_return_ref(mf_node); 481 } 482 483 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 484 #include <array> 485 #include <vector> 486 487 void test_precedes() { 488 using namespace tbb::flow; 489 490 using multinode = multifunction_node<int, std::tuple<int, int>>; 491 492 graph g; 493 494 buffer_node<int> b1(g); 495 buffer_node<int> b2(g); 496 497 multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 498 if (i % 2) 499 std::get<0>(op).try_put(i); 500 else 501 std::get<1>(op).try_put(i); 502 } 503 ); 504 505 node.try_put(0); 506 node.try_put(1); 507 g.wait_for_all(); 508 509 int storage; 510 CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)), 511 "Not exact edge quantity was made"); 512 } 513 514 void test_follows_and_precedes_api() { 515 using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>; 516 517 std::array<int, 3> messages_for_follows = { {0, 1, 2} }; 518 519 follows_and_precedes_testing::test_follows 520 <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>> 521 (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 522 std::get<0>(op).try_put(i); 523 }); 524 525 test_precedes(); 526 } 527 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 528 529 //! Test various node bodies with concurrency 530 //! \brief \ref error_guessing 531 TEST_CASE("Concurrency test"){ 532 for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) { 533 test_concurrency(p); 534 } 535 } 536 537 //! Test return types of ports 538 //! \brief \ref error_guessing 539 TEST_CASE("Test ports retrurn references"){ 540 test_ports_return_references<tbb::flow::queueing>(); 541 test_ports_return_references<tbb::flow::rejecting>(); 542 } 543 544 //! NativeParallelFor testing with various concurrency settings 545 //! \brief \ref error_guessing 546 TEST_CASE("Lightweight testing"){ 547 lightweight_testing::test<tbb::flow::multifunction_node>(10); 548 } 549 550 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 551 //! Test follows and precedes API 552 //! \brief \ref error_guessing 553 TEST_CASE("Test follows-precedes API"){ 554 test_follows_and_precedes_api(); 555 } 556 //! Test priority constructor with follows and precedes API 557 //! \brief \ref error_guessing 558 TEST_CASE("Test priority with follows and precedes"){ 559 using namespace tbb::flow; 560 561 using multinode = multifunction_node<int, std::tuple<int, int>>; 562 563 graph g; 564 565 buffer_node<int> b1(g); 566 buffer_node<int> b2(g); 567 568 multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void { 569 if (i % 2) 570 std::get<0>(op).try_put(i); 571 else 572 std::get<1>(op).try_put(i); 573 } 574 , node_priority_t(0)); 575 576 node.try_put(0); 577 node.try_put(1); 578 g.wait_for_all(); 579 580 int storage; 581 CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)), 582 "Not exact edge quantity was made"); 583 } 584 585 #endif 586 587 #if __TBB_CPP20_CONCEPTS_PRESENT 588 //! \brief \ref error_guessing 589 TEST_CASE("constraints for multifunction_node input") { 590 struct InputObject { 591 InputObject() = default; 592 InputObject( const InputObject& ) = default; 593 }; 594 595 static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, InputObject, int>); 596 static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, int, int>); 597 static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonCopyable, int>); 598 static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonDefaultInitializable, int>); 599 } 600 601 template <typename Input, typename Output, typename Body> 602 concept can_call_multifunction_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body, 603 tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) { 604 tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body); 605 tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body, priority); 606 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 607 tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body); 608 tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority); 609 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 610 }; 611 612 //! \brief \ref error_guessing 613 TEST_CASE("constraints for multifunction_node body") { 614 using input_type = int; 615 using output_type = std::tuple<int>; 616 using namespace test_concepts::multifunction_node_body; 617 618 static_assert(can_call_multifunction_node_ctor<input_type, output_type, Correct<input_type, output_type>>); 619 static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>); 620 static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>); 621 static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>); 622 static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>); 623 static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>); 624 } 625 #endif // __TBB_CPP20_CONCEPTS_PRESENT 626