1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1 22 23 #include "common/config.h" 24 25 #include "tbb/flow_graph.h" 26 27 #include "common/test.h" 28 #include "common/utils.h" 29 #include "common/utils_assert.h" 30 #include "common/test_follows_and_precedes_api.h" 31 #include "tbb/global_control.h" 32 33 #include <atomic> 34 35 36 //! \file test_limiter_node.cpp 37 //! \brief Test for [flow_graph.limiter_node] specification 38 39 40 const int L = 10; 41 const int N = 1000; 42 43 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 44 using tbb::detail::d1::graph_task; 45 46 template< typename T > 47 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 48 T next_value; 49 tbb::flow::graph& my_graph; 50 51 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 52 53 graph_task* try_put_task( const T &v ) override { 54 CHECK_MESSAGE( next_value++ == v, "" ); 55 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 56 } 57 58 tbb::flow::graph& graph_reference() const override { 59 return my_graph; 60 } 61 }; 62 63 template< typename T > 64 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 65 66 std::atomic<int> my_count; 67 tbb::flow::graph& my_graph; 68 69 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 70 71 graph_task* try_put_task( const T &/*v*/ ) override { 72 ++my_count; 73 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 74 } 75 76 tbb::flow::graph& graph_reference() const override { 77 return my_graph; 78 } 79 }; 80 81 template< typename T > 82 struct empty_sender : public tbb::flow::sender<T> { 83 typedef typename tbb::flow::sender<T>::successor_type successor_type; 84 85 bool register_successor( successor_type & ) override { return false; } 86 bool remove_successor( successor_type & ) override { return false; } 87 }; 88 89 90 template< typename T > 91 struct put_body : utils::NoAssign { 92 93 tbb::flow::limiter_node<T> &my_lim; 94 std::atomic<int> &my_accept_count; 95 96 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 97 my_lim(lim), my_accept_count(accept_count) {} 98 99 void operator()( int ) const { 100 for ( int i = 0; i < L; ++i ) { 101 bool msg = my_lim.try_put( T(i) ); 102 if ( msg == true ) 103 ++my_accept_count; 104 } 105 } 106 }; 107 108 template< typename T > 109 struct put_dec_body : utils::NoAssign { 110 111 tbb::flow::limiter_node<T> &my_lim; 112 std::atomic<int> &my_accept_count; 113 114 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 115 my_lim(lim), my_accept_count(accept_count) {} 116 117 void operator()( int ) const { 118 int local_accept_count = 0; 119 while ( local_accept_count < N ) { 120 bool msg = my_lim.try_put( T(local_accept_count) ); 121 if ( msg == true ) { 122 ++local_accept_count; 123 ++my_accept_count; 124 my_lim.decrementer().try_put( tbb::flow::continue_msg() ); 125 } 126 } 127 } 128 129 }; 130 131 template< typename T > 132 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 133 parallel_receiver<T> r(g); 134 empty_sender< tbb::flow::continue_msg > s; 135 std::atomic<int> accept_count; 136 accept_count = 0; 137 tbb::flow::make_edge( lim, r ); 138 tbb::flow::make_edge(s, lim.decrementer()); 139 140 // test puts with decrements 141 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 142 int c = accept_count; 143 CHECK_MESSAGE( c == N*num_threads, "" ); 144 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 145 } 146 147 // 148 // Tests 149 // 150 // limiter only forwards below the limit, multiple parallel senders / single receiver 151 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 152 // 153 // 154 template< typename T > 155 int test_parallel(int num_threads) { 156 157 // test puts with no decrements 158 for ( int i = 0; i < L; ++i ) { 159 tbb::flow::graph g; 160 tbb::flow::limiter_node< T > lim(g, i); 161 parallel_receiver<T> r(g); 162 std::atomic<int> accept_count; 163 accept_count = 0; 164 tbb::flow::make_edge( lim, r ); 165 // test puts with no decrements 166 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 167 g.wait_for_all(); 168 int c = accept_count; 169 CHECK_MESSAGE( c == i, "" ); 170 } 171 172 // test puts with decrements 173 for ( int i = 1; i < L; ++i ) { 174 tbb::flow::graph g; 175 tbb::flow::limiter_node< T > lim(g, i); 176 test_puts_with_decrements(num_threads, lim, g); 177 tbb::flow::limiter_node< T > lim_copy( lim ); 178 test_puts_with_decrements(num_threads, lim_copy, g); 179 } 180 181 return 0; 182 } 183 184 // 185 // Tests 186 // 187 // limiter only forwards below the limit, single sender / single receiver 188 // at reject, a put to decrement, will cause next message to be accepted 189 // 190 template< typename T > 191 int test_serial() { 192 193 // test puts with no decrements 194 for ( int i = 0; i < L; ++i ) { 195 tbb::flow::graph g; 196 tbb::flow::limiter_node< T > lim(g, i); 197 serial_receiver<T> r(g); 198 tbb::flow::make_edge( lim, r ); 199 for ( int j = 0; j < L; ++j ) { 200 bool msg = lim.try_put( T(j) ); 201 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 202 } 203 g.wait_for_all(); 204 } 205 206 // test puts with decrements 207 for ( int i = 1; i < L; ++i ) { 208 tbb::flow::graph g; 209 tbb::flow::limiter_node< T > lim(g, i); 210 serial_receiver<T> r(g); 211 empty_sender< tbb::flow::continue_msg > s; 212 tbb::flow::make_edge( lim, r ); 213 tbb::flow::make_edge(s, lim.decrementer()); 214 for ( int j = 0; j < N; ++j ) { 215 bool msg = lim.try_put( T(j) ); 216 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 217 if ( msg == false ) { 218 lim.decrementer().try_put( tbb::flow::continue_msg() ); 219 msg = lim.try_put( T(j) ); 220 CHECK_MESSAGE( msg == true, "" ); 221 } 222 } 223 } 224 return 0; 225 } 226 227 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 228 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 229 #define LIMITER_OUTPUT 0 // port number of the integer output 230 231 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 232 233 std::atomic<size_t> emit_count; 234 std::atomic<size_t> emit_sum; 235 std::atomic<size_t> receive_count; 236 std::atomic<size_t> receive_sum; 237 238 struct mfnode_body { 239 int max_cnt; 240 std::atomic<int>* my_cnt; 241 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 242 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 243 int lcnt = ++(*my_cnt); 244 if(lcnt > max_cnt) { 245 return; 246 } 247 // put one continue_msg to the decrement of the limiter. 248 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 249 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 250 } 251 { 252 // put messages to the input of the limiter_node until it rejects. 253 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 254 emit_sum += lcnt; 255 ++emit_count; 256 } 257 } 258 } 259 }; 260 261 struct fn_body { 262 int operator()(const int &in) { 263 receive_sum += in; 264 ++receive_count; 265 return in; 266 } 267 }; 268 269 // +------------+ 270 // +---------+ | v 271 // | mf_node |0---+ +----------+ +----------+ 272 // +->| |1---------->| lim_node |--------->| fn_node |--+ 273 // | +---------+ +----------+ +----------+ | 274 // | | 275 // | | 276 // +-------------------------------------------------------------+ 277 // 278 void 279 test_multifunction_to_limiter(int _max, int _nparallel) { 280 tbb::flow::graph g; 281 emit_count = 0; 282 emit_sum = 0; 283 receive_count = 0; 284 receive_sum = 0; 285 std::atomic<int> local_cnt; 286 local_cnt = 0; 287 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 288 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 289 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 290 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 291 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer()); 292 tbb::flow::make_edge(lim_node, fn_node); 293 tbb::flow::make_edge(fn_node, mf_node); 294 295 mf_node.try_put(1); 296 g.wait_for_all(); 297 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 298 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 299 300 // reset, test again 301 g.reset(); 302 emit_count = 0; 303 emit_sum = 0; 304 receive_count = 0; 305 receive_sum = 0; 306 local_cnt = 0;; 307 mf_node.try_put(1); 308 g.wait_for_all(); 309 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 310 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 311 } 312 313 314 void 315 test_continue_msg_reception() { 316 tbb::flow::graph g; 317 tbb::flow::limiter_node<int> ln(g,2); 318 tbb::flow::queue_node<int> qn(g); 319 tbb::flow::make_edge(ln, qn); 320 ln.decrementer().try_put(tbb::flow::continue_msg()); 321 ln.try_put(42); 322 g.wait_for_all(); 323 int outint; 324 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 325 } 326 327 328 // 329 // This test ascertains that if a message is not successfully put 330 // to a successor, the message is not dropped but released. 331 // 332 333 void test_reserve_release_messages() { 334 using namespace tbb::flow; 335 graph g; 336 337 //making two queue_nodes: one broadcast_node and one limiter_node 338 queue_node<int> input_queue(g); 339 queue_node<int> output_queue(g); 340 broadcast_node<int> broad(g); 341 limiter_node<int, int> limit(g,2); //threshold of 2 342 343 //edges 344 make_edge(input_queue, limit); 345 make_edge(limit, output_queue); 346 make_edge(broad,limit.decrementer()); 347 348 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 349 350 input_queue.try_put(list[0]); // succeeds 351 input_queue.try_put(list[1]); // succeeds 352 input_queue.try_put(list[2]); // fails, stored in upstream buffer 353 g.wait_for_all(); 354 355 remove_edge(limit, output_queue); //remove successor 356 357 //sending message to the decrement port of the limiter 358 broad.try_put(1); //failed message retrieved. 359 g.wait_for_all(); 360 361 tbb::flow::make_edge(limit, output_queue); //putting the successor back 362 363 broad.try_put(1); //drop the count 364 365 input_queue.try_put(list[3]); //success 366 g.wait_for_all(); 367 368 int var=0; 369 370 for (int i=0; i<4; i++) { 371 output_queue.try_get(var); 372 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 373 g.wait_for_all(); 374 } 375 } 376 377 void test_decrementer() { 378 const int threshold = 5; 379 tbb::flow::graph g; 380 tbb::flow::limiter_node<int, int> limit(g, threshold); 381 tbb::flow::queue_node<int> queue(g); 382 make_edge(limit, queue); 383 int m = 0; 384 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 385 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate 386 "Limiter node decrementer's port does not accept message." ); 387 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 388 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 389 "Limiter node decrementer's port does not accept message." ); 390 for( int i = 0; i < threshold; ++i ) 391 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 392 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 393 g.wait_for_all(); 394 int expected[] = {0, 2, 3, 4, 5, 6}; 395 int actual = -1; m = 0; 396 while( queue.try_get(actual) ) 397 CHECK_MESSAGE( actual == expected[m++], "" ); 398 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 399 g.wait_for_all(); 400 401 const size_t threshold2 = size_t(-1); 402 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 403 make_edge(limit2, queue); 404 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 405 long long decrement_value = (long long)( size_t(-1)/2 ); 406 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 407 "Limiter node decrementer's port does not accept message" ); 408 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 409 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 410 "Limiter node decrementer's port does not accept message" ); 411 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 412 int expected2[] = {1, 2}; 413 actual = -1; m = 0; 414 while( queue.try_get(actual) ) 415 CHECK_MESSAGE( actual == expected2[m++], "" ); 416 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 417 g.wait_for_all(); 418 419 const size_t threshold3 = 10; 420 tbb::flow::limiter_node<int, long long> limit3(g, threshold3); 421 make_edge(limit3, queue); 422 long long decrement_value3 = 3; 423 CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ), 424 "Limiter node decrementer's port does not accept message" ); 425 426 m = 0; 427 while( limit3.try_put( m ) ){ m++; }; 428 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." ); 429 430 actual = -1; m = 0; 431 while( queue.try_get(actual) ){ 432 CHECK_MESSAGE( actual == m++, "Not all messages have been processed." ); 433 } 434 435 g.wait_for_all(); 436 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." ); 437 } 438 439 void test_try_put_without_successors() { 440 tbb::flow::graph g; 441 int try_put_num{3}; 442 tbb::flow::buffer_node<int> bn(g); 443 tbb::flow::limiter_node<int> ln(g, try_put_num); 444 445 tbb::flow::make_edge(bn, ln); 446 447 int i = 1; 448 for (; i <= try_put_num; i++) 449 bn.try_put(i); 450 451 std::atomic<int> counter{0}; 452 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 453 [&](int input) { 454 counter += input; 455 return int{}; 456 } 457 ); 458 459 tbb::flow::make_edge(ln, fn); 460 461 g.wait_for_all(); 462 CHECK((counter == i * try_put_num / 2)); 463 464 // Check the lost message 465 tbb::flow::remove_edge(bn, ln); 466 ln.decrementer().try_put(tbb::flow::continue_msg()); 467 bn.try_put(try_put_num + 1); 468 g.wait_for_all(); 469 CHECK((counter == i * try_put_num / 2)); 470 471 } 472 473 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 474 #include <array> 475 #include <vector> 476 void test_follows_and_precedes_api() { 477 using msg_t = tbb::flow::continue_msg; 478 479 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 480 std::vector<msg_t> messages_for_precedes = {msg_t()}; 481 482 follows_and_precedes_testing::test_follows 483 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 484 follows_and_precedes_testing::test_precedes 485 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 486 487 } 488 #endif 489 490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 491 void test_deduction_guides() { 492 using namespace tbb::flow; 493 494 graph g; 495 broadcast_node<int> br(g); 496 limiter_node<int> l0(g, 100); 497 498 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 499 limiter_node l1(follows(br), 100); 500 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 501 502 limiter_node l2(precedes(br), 100); 503 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 504 #endif 505 506 limiter_node l3(l0); 507 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 508 } 509 #endif 510 511 void test_decrement_while_try_put_task() { 512 constexpr int threshold = 50000; 513 514 tbb::flow::graph graph{}; 515 std::atomic<int> processed; 516 tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int { 517 static int i = {}; 518 if (i++ >= threshold) fc.stop(); 519 return i; 520 }}; 521 tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 }; 522 tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial, 523 [&](const int & value, typename decltype(processing)::output_ports_type & out) { 524 if (value != threshold) 525 std::get<0>(out).try_put(1); 526 processed.store(value); 527 }}; 528 529 tbb::flow::make_edge(input, blockingNode); 530 tbb::flow::make_edge(blockingNode, processing); 531 tbb::flow::make_edge(processing, blockingNode.decrementer()); 532 533 input.activate(); 534 535 graph.wait_for_all(); 536 CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work"); 537 } 538 539 540 //! Test puts on limiter_node with decrements and varying parallelism levels 541 //! \brief \ref error_guessing 542 TEST_CASE("Serial and parallel tests") { 543 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 544 tbb::task_arena arena(i); 545 arena.execute( 546 [i]() { 547 test_serial<int>(); 548 test_parallel<int>(i); 549 } 550 ); 551 } 552 } 553 554 //! Test initial put of continue_msg on decrementer port does not stop message flow 555 //! \brief \ref error_guessing 556 TEST_CASE("Test continue_msg reception") { 557 test_continue_msg_reception(); 558 } 559 560 //! Test put message on decrementer port does not stop message flow 561 //! \brief \ref error_guessing 562 TEST_CASE("Test try_put to decrementer while try_put to limiter_node") { 563 test_decrement_while_try_put_task(); 564 } 565 566 //! Test multifunction_node connected to limiter_node 567 //! \brief \ref error_guessing 568 TEST_CASE("Multifunction connected to limiter") { 569 test_multifunction_to_limiter(30,3); 570 test_multifunction_to_limiter(300,13); 571 test_multifunction_to_limiter(3000,1); 572 } 573 574 //! Test message release if successor doesn't accept 575 //! \brief \ref requirement 576 TEST_CASE("Message is released if successor does not accept") { 577 test_reserve_release_messages(); 578 } 579 580 //! Test decrementer 581 //! \brief \ref requirement \ref error_guessing 582 TEST_CASE("Decrementer") { 583 test_decrementer(); 584 } 585 586 //! Test try_put() without successor 587 //! \brief \ref error_guessing 588 TEST_CASE("Test try_put() without successors") { 589 test_try_put_without_successors(); 590 } 591 592 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 593 //! Test follows and precedes API 594 //! \brief \ref error_guessing 595 TEST_CASE( "Support for follows and precedes API" ) { 596 test_follows_and_precedes_api(); 597 } 598 #endif 599 600 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 601 //! Test deduction guides 602 //! \brief \ref requirement 603 TEST_CASE( "Deduction guides" ) { 604 test_deduction_guides(); 605 } 606 #endif 607 608 //! Test correct node deallocation while using small_object_pool. 609 //! (see https://github.com/oneapi-src/oneTBB/issues/639) 610 //! \brief \ref error_guessing 611 TEST_CASE("Test correct node deallocation while using small_object_pool") { 612 struct TestLargeStruct { 613 char bytes[512]{ 0 }; 614 }; 615 616 tbb::flow::graph graph; 617 tbb::flow::queue_node<TestLargeStruct> input_node{ graph }; 618 tbb::flow::function_node<TestLargeStruct> func{ graph, tbb::flow::serial, 619 [](const TestLargeStruct& input) { return input; } }; 620 621 tbb::flow::make_edge( input_node, func ); 622 CHECK( input_node.try_put( TestLargeStruct{} ) ); 623 graph.wait_for_all(); 624 625 tbb::task_scheduler_handle handle = tbb::task_scheduler_handle::get(); 626 REQUIRE_NOTHROW( tbb::finalize( handle, std::nothrow ) ); 627 } 628