1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/utils_assert.h" 28 #include "common/test_follows_and_precedes_api.h" 29 #include "tbb/global_control.h" 30 31 #include <atomic> 32 33 34 //! \file test_limiter_node.cpp 35 //! \brief Test for [flow_graph.limiter_node] specification 36 37 38 const int L = 10; 39 const int N = 1000; 40 41 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 42 using tbb::detail::d1::graph_task; 43 44 template< typename T > 45 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 46 T next_value; 47 tbb::flow::graph& my_graph; 48 49 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 50 51 graph_task* try_put_task( const T &v ) override { 52 CHECK_MESSAGE( next_value++ == v, "" ); 53 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 54 } 55 56 tbb::flow::graph& graph_reference() const override { 57 return my_graph; 58 } 59 }; 60 61 template< typename T > 62 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 63 64 std::atomic<int> my_count; 65 tbb::flow::graph& my_graph; 66 67 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 68 69 graph_task* try_put_task( const T &/*v*/ ) override { 70 ++my_count; 71 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 72 } 73 74 tbb::flow::graph& graph_reference() const override { 75 return my_graph; 76 } 77 }; 78 79 template< typename T > 80 struct empty_sender : public tbb::flow::sender<T> { 81 typedef typename tbb::flow::sender<T>::successor_type successor_type; 82 83 bool register_successor( successor_type & ) override { return false; } 84 bool remove_successor( successor_type & ) override { return false; } 85 }; 86 87 88 template< typename T > 89 struct put_body : utils::NoAssign { 90 91 tbb::flow::limiter_node<T> &my_lim; 92 std::atomic<int> &my_accept_count; 93 94 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 95 my_lim(lim), my_accept_count(accept_count) {} 96 97 void operator()( int ) const { 98 for ( int i = 0; i < L; ++i ) { 99 bool msg = my_lim.try_put( T(i) ); 100 if ( msg == true ) 101 ++my_accept_count; 102 } 103 } 104 }; 105 106 template< typename T > 107 struct put_dec_body : utils::NoAssign { 108 109 tbb::flow::limiter_node<T> &my_lim; 110 std::atomic<int> &my_accept_count; 111 112 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 113 my_lim(lim), my_accept_count(accept_count) {} 114 115 void operator()( int ) const { 116 int local_accept_count = 0; 117 while ( local_accept_count < N ) { 118 bool msg = my_lim.try_put( T(local_accept_count) ); 119 if ( msg == true ) { 120 ++local_accept_count; 121 ++my_accept_count; 122 my_lim.decrementer().try_put( tbb::flow::continue_msg() ); 123 } 124 } 125 } 126 127 }; 128 129 template< typename T > 130 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 131 parallel_receiver<T> r(g); 132 empty_sender< tbb::flow::continue_msg > s; 133 std::atomic<int> accept_count; 134 accept_count = 0; 135 tbb::flow::make_edge( lim, r ); 136 tbb::flow::make_edge(s, lim.decrementer()); 137 138 // test puts with decrements 139 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 140 int c = accept_count; 141 CHECK_MESSAGE( c == N*num_threads, "" ); 142 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 143 } 144 145 // 146 // Tests 147 // 148 // limiter only forwards below the limit, multiple parallel senders / single receiver 149 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 150 // 151 // 152 template< typename T > 153 int test_parallel(int num_threads) { 154 155 // test puts with no decrements 156 for ( int i = 0; i < L; ++i ) { 157 tbb::flow::graph g; 158 tbb::flow::limiter_node< T > lim(g, i); 159 parallel_receiver<T> r(g); 160 std::atomic<int> accept_count; 161 accept_count = 0; 162 tbb::flow::make_edge( lim, r ); 163 // test puts with no decrements 164 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 165 g.wait_for_all(); 166 int c = accept_count; 167 CHECK_MESSAGE( c == i, "" ); 168 } 169 170 // test puts with decrements 171 for ( int i = 1; i < L; ++i ) { 172 tbb::flow::graph g; 173 tbb::flow::limiter_node< T > lim(g, i); 174 test_puts_with_decrements(num_threads, lim, g); 175 tbb::flow::limiter_node< T > lim_copy( lim ); 176 test_puts_with_decrements(num_threads, lim_copy, g); 177 } 178 179 return 0; 180 } 181 182 // 183 // Tests 184 // 185 // limiter only forwards below the limit, single sender / single receiver 186 // at reject, a put to decrement, will cause next message to be accepted 187 // 188 template< typename T > 189 int test_serial() { 190 191 // test puts with no decrements 192 for ( int i = 0; i < L; ++i ) { 193 tbb::flow::graph g; 194 tbb::flow::limiter_node< T > lim(g, i); 195 serial_receiver<T> r(g); 196 tbb::flow::make_edge( lim, r ); 197 for ( int j = 0; j < L; ++j ) { 198 bool msg = lim.try_put( T(j) ); 199 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 200 } 201 g.wait_for_all(); 202 } 203 204 // test puts with decrements 205 for ( int i = 1; i < L; ++i ) { 206 tbb::flow::graph g; 207 tbb::flow::limiter_node< T > lim(g, i); 208 serial_receiver<T> r(g); 209 empty_sender< tbb::flow::continue_msg > s; 210 tbb::flow::make_edge( lim, r ); 211 tbb::flow::make_edge(s, lim.decrementer()); 212 for ( int j = 0; j < N; ++j ) { 213 bool msg = lim.try_put( T(j) ); 214 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 215 if ( msg == false ) { 216 lim.decrementer().try_put( tbb::flow::continue_msg() ); 217 msg = lim.try_put( T(j) ); 218 CHECK_MESSAGE( msg == true, "" ); 219 } 220 } 221 } 222 return 0; 223 } 224 225 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 226 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 227 #define LIMITER_OUTPUT 0 // port number of the integer output 228 229 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 230 231 std::atomic<size_t> emit_count; 232 std::atomic<size_t> emit_sum; 233 std::atomic<size_t> receive_count; 234 std::atomic<size_t> receive_sum; 235 236 struct mfnode_body { 237 int max_cnt; 238 std::atomic<int>* my_cnt; 239 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 240 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 241 int lcnt = ++(*my_cnt); 242 if(lcnt > max_cnt) { 243 return; 244 } 245 // put one continue_msg to the decrement of the limiter. 246 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 247 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 248 } 249 { 250 // put messages to the input of the limiter_node until it rejects. 251 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 252 emit_sum += lcnt; 253 ++emit_count; 254 } 255 } 256 } 257 }; 258 259 struct fn_body { 260 int operator()(const int &in) { 261 receive_sum += in; 262 ++receive_count; 263 return in; 264 } 265 }; 266 267 // +------------+ 268 // +---------+ | v 269 // | mf_node |0---+ +----------+ +----------+ 270 // +->| |1---------->| lim_node |--------->| fn_node |--+ 271 // | +---------+ +----------+ +----------+ | 272 // | | 273 // | | 274 // +-------------------------------------------------------------+ 275 // 276 void 277 test_multifunction_to_limiter(int _max, int _nparallel) { 278 tbb::flow::graph g; 279 emit_count = 0; 280 emit_sum = 0; 281 receive_count = 0; 282 receive_sum = 0; 283 std::atomic<int> local_cnt; 284 local_cnt = 0; 285 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 286 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 287 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 288 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 289 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer()); 290 tbb::flow::make_edge(lim_node, fn_node); 291 tbb::flow::make_edge(fn_node, mf_node); 292 293 mf_node.try_put(1); 294 g.wait_for_all(); 295 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 296 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 297 298 // reset, test again 299 g.reset(); 300 emit_count = 0; 301 emit_sum = 0; 302 receive_count = 0; 303 receive_sum = 0; 304 local_cnt = 0;; 305 mf_node.try_put(1); 306 g.wait_for_all(); 307 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 308 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 309 } 310 311 312 void 313 test_continue_msg_reception() { 314 tbb::flow::graph g; 315 tbb::flow::limiter_node<int> ln(g,2); 316 tbb::flow::queue_node<int> qn(g); 317 tbb::flow::make_edge(ln, qn); 318 ln.decrementer().try_put(tbb::flow::continue_msg()); 319 ln.try_put(42); 320 g.wait_for_all(); 321 int outint; 322 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 323 } 324 325 326 // 327 // This test ascertains that if a message is not successfully put 328 // to a successor, the message is not dropped but released. 329 // 330 331 void test_reserve_release_messages() { 332 using namespace tbb::flow; 333 graph g; 334 335 //making two queue_nodes: one broadcast_node and one limiter_node 336 queue_node<int> input_queue(g); 337 queue_node<int> output_queue(g); 338 broadcast_node<int> broad(g); 339 limiter_node<int, int> limit(g,2); //threshold of 2 340 341 //edges 342 make_edge(input_queue, limit); 343 make_edge(limit, output_queue); 344 make_edge(broad,limit.decrementer()); 345 346 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 347 348 input_queue.try_put(list[0]); // succeeds 349 input_queue.try_put(list[1]); // succeeds 350 input_queue.try_put(list[2]); // fails, stored in upstream buffer 351 g.wait_for_all(); 352 353 remove_edge(limit, output_queue); //remove successor 354 355 //sending message to the decrement port of the limiter 356 broad.try_put(1); //failed message retrieved. 357 g.wait_for_all(); 358 359 tbb::flow::make_edge(limit, output_queue); //putting the successor back 360 361 broad.try_put(1); //drop the count 362 363 input_queue.try_put(list[3]); //success 364 g.wait_for_all(); 365 366 int var=0; 367 368 for (int i=0; i<4; i++) { 369 output_queue.try_get(var); 370 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 371 g.wait_for_all(); 372 } 373 } 374 375 void test_decrementer() { 376 const int threshold = 5; 377 tbb::flow::graph g; 378 tbb::flow::limiter_node<int, int> limit(g, threshold); 379 tbb::flow::queue_node<int> queue(g); 380 make_edge(limit, queue); 381 int m = 0; 382 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 383 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate 384 "Limiter node decrementer's port does not accept message." ); 385 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 386 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 387 "Limiter node decrementer's port does not accept message." ); 388 for( int i = 0; i < threshold; ++i ) 389 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 390 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 391 g.wait_for_all(); 392 int expected[] = {0, 2, 3, 4, 5, 6}; 393 int actual = -1; m = 0; 394 while( queue.try_get(actual) ) 395 CHECK_MESSAGE( actual == expected[m++], "" ); 396 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 397 g.wait_for_all(); 398 399 const size_t threshold2 = size_t(-1); 400 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 401 make_edge(limit2, queue); 402 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 403 long long decrement_value = (long long)( size_t(-1)/2 ); 404 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 405 "Limiter node decrementer's port does not accept message" ); 406 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 407 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 408 "Limiter node decrementer's port does not accept message" ); 409 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 410 int expected2[] = {1, 2}; 411 actual = -1; m = 0; 412 while( queue.try_get(actual) ) 413 CHECK_MESSAGE( actual == expected2[m++], "" ); 414 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 415 g.wait_for_all(); 416 417 const size_t threshold3 = 10; 418 tbb::flow::limiter_node<int, long long> limit3(g, threshold3); 419 make_edge(limit3, queue); 420 long long decrement_value3 = 3; 421 CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ), 422 "Limiter node decrementer's port does not accept message" ); 423 424 m = 0; 425 while( limit3.try_put( m ) ){ m++; }; 426 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." ); 427 428 actual = -1; m = 0; 429 while( queue.try_get(actual) ){ 430 CHECK_MESSAGE( actual == m++, "Not all messages have been processed." ); 431 } 432 433 g.wait_for_all(); 434 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." ); 435 } 436 437 void test_try_put_without_successors() { 438 tbb::flow::graph g; 439 int try_put_num{3}; 440 tbb::flow::buffer_node<int> bn(g); 441 tbb::flow::limiter_node<int> ln(g, try_put_num); 442 443 tbb::flow::make_edge(bn, ln); 444 445 int i = 1; 446 for (; i <= try_put_num; i++) 447 bn.try_put(i); 448 449 std::atomic<int> counter{0}; 450 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 451 [&](int input) { 452 counter += input; 453 return int{}; 454 } 455 ); 456 457 tbb::flow::make_edge(ln, fn); 458 459 g.wait_for_all(); 460 CHECK((counter == i * try_put_num / 2)); 461 462 // Check the lost message 463 tbb::flow::remove_edge(bn, ln); 464 ln.decrementer().try_put(tbb::flow::continue_msg()); 465 bn.try_put(try_put_num + 1); 466 g.wait_for_all(); 467 CHECK((counter == i * try_put_num / 2)); 468 469 } 470 471 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 472 #include <array> 473 #include <vector> 474 void test_follows_and_precedes_api() { 475 using msg_t = tbb::flow::continue_msg; 476 477 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 478 std::vector<msg_t> messages_for_precedes = {msg_t()}; 479 480 follows_and_precedes_testing::test_follows 481 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 482 follows_and_precedes_testing::test_precedes 483 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 484 485 } 486 #endif 487 488 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 489 void test_deduction_guides() { 490 using namespace tbb::flow; 491 492 graph g; 493 broadcast_node<int> br(g); 494 limiter_node<int> l0(g, 100); 495 496 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 497 limiter_node l1(follows(br), 100); 498 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 499 500 limiter_node l2(precedes(br), 100); 501 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 502 #endif 503 504 limiter_node l3(l0); 505 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 506 } 507 #endif 508 509 void test_decrement_while_try_put_task() { 510 constexpr int threshold = 50000; 511 512 tbb::flow::graph graph{}; 513 std::atomic<int> processed; 514 tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int { 515 static int i = {}; 516 if (i++ >= threshold) fc.stop(); 517 return i; 518 }}; 519 tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 }; 520 tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial, 521 [&](const int & value, typename decltype(processing)::output_ports_type & out) { 522 if (value != threshold) 523 std::get<0>(out).try_put(1); 524 processed.store(value); 525 }}; 526 527 tbb::flow::make_edge(input, blockingNode); 528 tbb::flow::make_edge(blockingNode, processing); 529 tbb::flow::make_edge(processing, blockingNode.decrementer()); 530 531 input.activate(); 532 533 graph.wait_for_all(); 534 CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work"); 535 } 536 537 538 //! Test puts on limiter_node with decrements and varying parallelism levels 539 //! \brief \ref error_guessing 540 TEST_CASE("Serial and parallel tests") { 541 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 542 tbb::task_arena arena(i); 543 arena.execute( 544 [i]() { 545 test_serial<int>(); 546 test_parallel<int>(i); 547 } 548 ); 549 } 550 } 551 552 //! Test initial put of continue_msg on decrementer port does not stop message flow 553 //! \brief \ref error_guessing 554 TEST_CASE("Test continue_msg reception") { 555 test_continue_msg_reception(); 556 } 557 558 //! Test put message on decrementer port does not stop message flow 559 //! \brief \ref error_guessing 560 TEST_CASE("Test try_put to decrementer while try_put to limiter_node") { 561 test_decrement_while_try_put_task(); 562 } 563 564 //! Test multifunction_node connected to limiter_node 565 //! \brief \ref error_guessing 566 TEST_CASE("Multifunction connected to limiter") { 567 test_multifunction_to_limiter(30,3); 568 test_multifunction_to_limiter(300,13); 569 test_multifunction_to_limiter(3000,1); 570 } 571 572 //! Test message release if successor doesn't accept 573 //! \brief \ref requirement 574 TEST_CASE("Message is released if successor does not accept") { 575 test_reserve_release_messages(); 576 } 577 578 //! Test decrementer 579 //! \brief \ref requirement \ref error_guessing 580 TEST_CASE("Decrementer") { 581 test_decrementer(); 582 } 583 584 //! Test try_put() without successor 585 //! \brief \ref error_guessing 586 TEST_CASE("Test try_put() without successors") { 587 test_try_put_without_successors(); 588 } 589 590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 591 //! Test follows and precedes API 592 //! \brief \ref error_guessing 593 TEST_CASE( "Support for follows and precedes API" ) { 594 test_follows_and_precedes_api(); 595 } 596 #endif 597 598 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 599 //! Test deduction guides 600 //! \brief \ref requirement 601 TEST_CASE( "Deduction guides" ) { 602 test_deduction_guides(); 603 } 604 #endif 605 606 struct TestLargeStruct { 607 char bytes[512]{ 0 }; 608 }; 609 610 //! Test correct node deallocation while using small_object_pool. 611 //! (see https://github.com/oneapi-src/oneTBB/issues/639) 612 //! \brief \ref error_guessing 613 TEST_CASE("Test correct node deallocation while using small_object_pool") { 614 tbb::flow::graph graph; 615 tbb::flow::queue_node<TestLargeStruct> input_node( graph ); 616 tbb::flow::function_node<TestLargeStruct> func( graph, tbb::flow::serial, 617 [](const TestLargeStruct& input) { return input; } ); 618 619 tbb::flow::make_edge( input_node, func ); 620 CHECK( input_node.try_put( TestLargeStruct{} ) ); 621 graph.wait_for_all(); 622 623 tbb::task_scheduler_handle handle{ tbb::attach{} }; 624 tbb::finalize( handle, std::nothrow ); 625 } 626