1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1 22 23 #include "common/config.h" 24 25 #include "tbb/flow_graph.h" 26 27 #include "common/test.h" 28 #include "common/utils.h" 29 #include "common/utils_assert.h" 30 #include "common/test_follows_and_precedes_api.h" 31 #include "tbb/global_control.h" 32 33 #include <atomic> 34 35 36 //! \file test_limiter_node.cpp 37 //! \brief Test for [flow_graph.limiter_node] specification 38 39 40 const int L = 10; 41 const int N = 1000; 42 43 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 44 using tbb::detail::d1::graph_task; 45 46 template< typename T > 47 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 48 T next_value; 49 tbb::flow::graph& my_graph; 50 51 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 52 53 graph_task* try_put_task( const T &v ) override { 54 CHECK_MESSAGE( next_value++ == v, "" ); 55 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 56 } 57 58 tbb::flow::graph& graph_reference() const override { 59 return my_graph; 60 } 61 }; 62 63 template< typename T > 64 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 65 66 std::atomic<int> my_count; 67 tbb::flow::graph& my_graph; 68 69 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 70 71 graph_task* try_put_task( const T &/*v*/ ) override { 72 ++my_count; 73 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 74 } 75 76 tbb::flow::graph& graph_reference() const override { 77 return my_graph; 78 } 79 }; 80 81 template< typename T > 82 struct empty_sender : public tbb::flow::sender<T> { 83 typedef typename tbb::flow::sender<T>::successor_type successor_type; 84 85 bool register_successor( successor_type & ) override { return false; } 86 bool remove_successor( successor_type & ) override { return false; } 87 }; 88 89 90 template< typename T > 91 struct put_body : utils::NoAssign { 92 93 tbb::flow::limiter_node<T> &my_lim; 94 std::atomic<int> &my_accept_count; 95 96 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 97 my_lim(lim), my_accept_count(accept_count) {} 98 99 void operator()( int ) const { 100 for ( int i = 0; i < L; ++i ) { 101 bool msg = my_lim.try_put( T(i) ); 102 if ( msg == true ) 103 ++my_accept_count; 104 } 105 } 106 }; 107 108 template< typename T > 109 struct put_dec_body : utils::NoAssign { 110 111 tbb::flow::limiter_node<T> &my_lim; 112 std::atomic<int> &my_accept_count; 113 114 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 115 my_lim(lim), my_accept_count(accept_count) {} 116 117 void operator()( int ) const { 118 int local_accept_count = 0; 119 while ( local_accept_count < N ) { 120 bool msg = my_lim.try_put( T(local_accept_count) ); 121 if ( msg == true ) { 122 ++local_accept_count; 123 ++my_accept_count; 124 my_lim.decrementer().try_put( tbb::flow::continue_msg() ); 125 } 126 } 127 } 128 129 }; 130 131 template< typename Sender, typename Receiver > 132 void make_edge_impl(Sender& sender, Receiver& receiver){ 133 #if __GNUC__ < 12 && !TBB_USE_DEBUG 134 // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03) 135 // The function pointer to make_edge workarounds the issue for unknown reason 136 auto make_edge_ptr = tbb::flow::make_edge<int>; 137 make_edge_ptr(sender, receiver); 138 #else 139 tbb::flow::make_edge(sender, receiver); 140 #endif 141 } 142 143 template< typename T > 144 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 145 parallel_receiver<T> r(g); 146 empty_sender< tbb::flow::continue_msg > s; 147 std::atomic<int> accept_count; 148 accept_count = 0; 149 tbb::flow::make_edge( lim, r ); 150 tbb::flow::make_edge(s, lim.decrementer()); 151 152 // test puts with decrements 153 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 154 int c = accept_count; 155 CHECK_MESSAGE( c == N*num_threads, "" ); 156 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 157 } 158 159 // 160 // Tests 161 // 162 // limiter only forwards below the limit, multiple parallel senders / single receiver 163 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 164 // 165 // 166 template< typename T > 167 int test_parallel(int num_threads) { 168 169 // test puts with no decrements 170 for ( int i = 0; i < L; ++i ) { 171 tbb::flow::graph g; 172 tbb::flow::limiter_node< T > lim(g, i); 173 parallel_receiver<T> r(g); 174 std::atomic<int> accept_count; 175 accept_count = 0; 176 tbb::flow::make_edge( lim, r ); 177 // test puts with no decrements 178 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 179 g.wait_for_all(); 180 int c = accept_count; 181 CHECK_MESSAGE( c == i, "" ); 182 } 183 184 // test puts with decrements 185 for ( int i = 1; i < L; ++i ) { 186 tbb::flow::graph g; 187 tbb::flow::limiter_node< T > lim(g, i); 188 test_puts_with_decrements(num_threads, lim, g); 189 tbb::flow::limiter_node< T > lim_copy( lim ); 190 test_puts_with_decrements(num_threads, lim_copy, g); 191 } 192 193 return 0; 194 } 195 196 // 197 // Tests 198 // 199 // limiter only forwards below the limit, single sender / single receiver 200 // at reject, a put to decrement, will cause next message to be accepted 201 // 202 template< typename T > 203 int test_serial() { 204 205 // test puts with no decrements 206 for ( int i = 0; i < L; ++i ) { 207 tbb::flow::graph g; 208 tbb::flow::limiter_node< T > lim(g, i); 209 serial_receiver<T> r(g); 210 tbb::flow::make_edge( lim, r ); 211 for ( int j = 0; j < L; ++j ) { 212 bool msg = lim.try_put( T(j) ); 213 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 214 } 215 g.wait_for_all(); 216 } 217 218 // test puts with decrements 219 for ( int i = 1; i < L; ++i ) { 220 tbb::flow::graph g; 221 tbb::flow::limiter_node< T > lim(g, i); 222 serial_receiver<T> r(g); 223 empty_sender< tbb::flow::continue_msg > s; 224 tbb::flow::make_edge( lim, r ); 225 tbb::flow::make_edge(s, lim.decrementer()); 226 for ( int j = 0; j < N; ++j ) { 227 bool msg = lim.try_put( T(j) ); 228 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 229 if ( msg == false ) { 230 lim.decrementer().try_put( tbb::flow::continue_msg() ); 231 msg = lim.try_put( T(j) ); 232 CHECK_MESSAGE( msg == true, "" ); 233 } 234 } 235 } 236 return 0; 237 } 238 239 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 240 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 241 #define LIMITER_OUTPUT 0 // port number of the integer output 242 243 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 244 245 std::atomic<size_t> emit_count; 246 std::atomic<size_t> emit_sum; 247 std::atomic<size_t> receive_count; 248 std::atomic<size_t> receive_sum; 249 250 struct mfnode_body { 251 int max_cnt; 252 std::atomic<int>* my_cnt; 253 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 254 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 255 int lcnt = ++(*my_cnt); 256 if(lcnt > max_cnt) { 257 return; 258 } 259 // put one continue_msg to the decrement of the limiter. 260 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 261 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 262 } 263 { 264 // put messages to the input of the limiter_node until it rejects. 265 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 266 emit_sum += lcnt; 267 ++emit_count; 268 } 269 } 270 } 271 }; 272 273 struct fn_body { 274 int operator()(const int &in) { 275 receive_sum += in; 276 ++receive_count; 277 return in; 278 } 279 }; 280 281 // +------------+ 282 // +---------+ | v 283 // | mf_node |0---+ +----------+ +----------+ 284 // +->| |1---------->| lim_node |--------->| fn_node |--+ 285 // | +---------+ +----------+ +----------+ | 286 // | | 287 // | | 288 // +-------------------------------------------------------------+ 289 // 290 void 291 test_multifunction_to_limiter(int _max, int _nparallel) { 292 tbb::flow::graph g; 293 emit_count = 0; 294 emit_sum = 0; 295 receive_count = 0; 296 receive_sum = 0; 297 std::atomic<int> local_cnt; 298 local_cnt = 0; 299 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 300 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 301 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 302 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 303 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer()); 304 tbb::flow::make_edge(lim_node, fn_node); 305 tbb::flow::make_edge(fn_node, mf_node); 306 307 mf_node.try_put(1); 308 g.wait_for_all(); 309 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 310 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 311 312 // reset, test again 313 g.reset(); 314 emit_count = 0; 315 emit_sum = 0; 316 receive_count = 0; 317 receive_sum = 0; 318 local_cnt = 0;; 319 mf_node.try_put(1); 320 g.wait_for_all(); 321 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 322 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 323 } 324 325 326 void 327 test_continue_msg_reception() { 328 tbb::flow::graph g; 329 tbb::flow::limiter_node<int> ln(g,2); 330 tbb::flow::queue_node<int> qn(g); 331 tbb::flow::make_edge(ln, qn); 332 ln.decrementer().try_put(tbb::flow::continue_msg()); 333 ln.try_put(42); 334 g.wait_for_all(); 335 int outint; 336 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 337 } 338 339 340 // 341 // This test ascertains that if a message is not successfully put 342 // to a successor, the message is not dropped but released. 343 // 344 345 void test_reserve_release_messages() { 346 using namespace tbb::flow; 347 graph g; 348 349 //making two queue_nodes: one broadcast_node and one limiter_node 350 queue_node<int> input_queue(g); 351 queue_node<int> output_queue(g); 352 broadcast_node<int> broad(g); 353 limiter_node<int, int> limit(g,2); //threshold of 2 354 355 //edges 356 make_edge(input_queue, limit); 357 make_edge(limit, output_queue); 358 make_edge(broad,limit.decrementer()); 359 360 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 361 362 input_queue.try_put(list[0]); // succeeds 363 input_queue.try_put(list[1]); // succeeds 364 input_queue.try_put(list[2]); // fails, stored in upstream buffer 365 g.wait_for_all(); 366 367 remove_edge(limit, output_queue); //remove successor 368 369 //sending message to the decrement port of the limiter 370 broad.try_put(1); //failed message retrieved. 371 g.wait_for_all(); 372 373 make_edge_impl(limit, output_queue); //putting the successor back 374 375 broad.try_put(1); //drop the count 376 377 input_queue.try_put(list[3]); //success 378 g.wait_for_all(); 379 380 int var=0; 381 382 for (int i=0; i<4; i++) { 383 output_queue.try_get(var); 384 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 385 g.wait_for_all(); 386 } 387 } 388 389 void test_decrementer() { 390 const int threshold = 5; 391 tbb::flow::graph g; 392 tbb::flow::limiter_node<int, int> limit(g, threshold); 393 tbb::flow::queue_node<int> queue(g); 394 make_edge(limit, queue); 395 int m = 0; 396 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 397 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate 398 "Limiter node decrementer's port does not accept message." ); 399 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 400 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 401 "Limiter node decrementer's port does not accept message." ); 402 for( int i = 0; i < threshold; ++i ) 403 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 404 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 405 g.wait_for_all(); 406 int expected[] = {0, 2, 3, 4, 5, 6}; 407 int actual = -1; m = 0; 408 while( queue.try_get(actual) ) 409 CHECK_MESSAGE( actual == expected[m++], "" ); 410 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 411 g.wait_for_all(); 412 413 const size_t threshold2 = size_t(-1); 414 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 415 make_edge(limit2, queue); 416 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 417 long long decrement_value = (long long)( size_t(-1)/2 ); 418 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 419 "Limiter node decrementer's port does not accept message" ); 420 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 421 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 422 "Limiter node decrementer's port does not accept message" ); 423 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 424 int expected2[] = {1, 2}; 425 actual = -1; m = 0; 426 while( queue.try_get(actual) ) 427 CHECK_MESSAGE( actual == expected2[m++], "" ); 428 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 429 g.wait_for_all(); 430 431 const size_t threshold3 = 10; 432 tbb::flow::limiter_node<int, long long> limit3(g, threshold3); 433 make_edge(limit3, queue); 434 long long decrement_value3 = 3; 435 CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ), 436 "Limiter node decrementer's port does not accept message" ); 437 438 m = 0; 439 while( limit3.try_put( m ) ){ m++; }; 440 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." ); 441 442 actual = -1; m = 0; 443 while( queue.try_get(actual) ){ 444 CHECK_MESSAGE( actual == m++, "Not all messages have been processed." ); 445 } 446 447 g.wait_for_all(); 448 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." ); 449 } 450 451 void test_try_put_without_successors() { 452 tbb::flow::graph g; 453 int try_put_num{3}; 454 tbb::flow::buffer_node<int> bn(g); 455 tbb::flow::limiter_node<int> ln(g, try_put_num); 456 457 tbb::flow::make_edge(bn, ln); 458 459 int i = 1; 460 for (; i <= try_put_num; i++) 461 bn.try_put(i); 462 463 std::atomic<int> counter{0}; 464 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 465 [&](int input) { 466 counter += input; 467 return int{}; 468 } 469 ); 470 471 make_edge_impl(ln, fn); 472 473 g.wait_for_all(); 474 CHECK((counter == i * try_put_num / 2)); 475 476 // Check the lost message 477 tbb::flow::remove_edge(bn, ln); 478 ln.decrementer().try_put(tbb::flow::continue_msg()); 479 bn.try_put(try_put_num + 1); 480 g.wait_for_all(); 481 CHECK((counter == i * try_put_num / 2)); 482 483 } 484 485 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 486 #include <array> 487 #include <vector> 488 void test_follows_and_precedes_api() { 489 using msg_t = tbb::flow::continue_msg; 490 491 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 492 std::vector<msg_t> messages_for_precedes = {msg_t()}; 493 494 follows_and_precedes_testing::test_follows 495 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 496 follows_and_precedes_testing::test_precedes 497 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 498 499 } 500 #endif 501 502 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 503 void test_deduction_guides() { 504 using namespace tbb::flow; 505 506 graph g; 507 broadcast_node<int> br(g); 508 limiter_node<int> l0(g, 100); 509 510 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 511 limiter_node l1(follows(br), 100); 512 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 513 514 limiter_node l2(precedes(br), 100); 515 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 516 #endif 517 518 limiter_node l3(l0); 519 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 520 } 521 #endif 522 523 //! Test puts on limiter_node with decrements and varying parallelism levels 524 //! \brief \ref error_guessing 525 TEST_CASE("Serial and parallel tests") { 526 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 527 tbb::task_arena arena(i); 528 arena.execute( 529 [i]() { 530 test_serial<int>(); 531 test_parallel<int>(i); 532 } 533 ); 534 } 535 } 536 537 //! Test initial put of continue_msg on decrementer port does not stop message flow 538 //! \brief \ref error_guessing 539 TEST_CASE("Test continue_msg reception") { 540 test_continue_msg_reception(); 541 } 542 543 //! Test multifunction_node connected to limiter_node 544 //! \brief \ref error_guessing 545 TEST_CASE("Multifunction connected to limiter") { 546 test_multifunction_to_limiter(30,3); 547 test_multifunction_to_limiter(300,13); 548 test_multifunction_to_limiter(3000,1); 549 } 550 551 //! Test message release if successor doesn't accept 552 //! \brief \ref requirement 553 TEST_CASE("Message is released if successor does not accept") { 554 test_reserve_release_messages(); 555 } 556 557 //! Test decrementer 558 //! \brief \ref requirement \ref error_guessing 559 TEST_CASE("Decrementer") { 560 test_decrementer(); 561 } 562 563 //! Test try_put() without successor 564 //! \brief \ref error_guessing 565 TEST_CASE("Test try_put() without successors") { 566 test_try_put_without_successors(); 567 } 568 569 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 570 //! Test follows and precedes API 571 //! \brief \ref error_guessing 572 TEST_CASE( "Support for follows and precedes API" ) { 573 test_follows_and_precedes_api(); 574 } 575 #endif 576 577 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 578 //! Test deduction guides 579 //! \brief \ref requirement 580 TEST_CASE( "Deduction guides" ) { 581 test_deduction_guides(); 582 } 583 #endif 584 585 //! Test correct node deallocation while using small_object_pool. 586 //! (see https://github.com/oneapi-src/oneTBB/issues/639) 587 //! \brief \ref error_guessing 588 TEST_CASE("Test correct node deallocation while using small_object_pool") { 589 struct TestLargeStruct { 590 char bytes[512]{ 0 }; 591 }; 592 593 tbb::flow::graph graph; 594 tbb::flow::queue_node<TestLargeStruct> input_node{ graph }; 595 tbb::flow::function_node<TestLargeStruct> func{ graph, tbb::flow::serial, 596 [](const TestLargeStruct& input) { return input; } }; 597 598 tbb::flow::make_edge( input_node, func ); 599 CHECK( input_node.try_put( TestLargeStruct{} ) ); 600 graph.wait_for_all(); 601 602 tbb::task_scheduler_handle handle = tbb::task_scheduler_handle::get(); 603 REQUIRE_NOTHROW( tbb::finalize( handle, std::nothrow ) ); 604 } 605