1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/utils_assert.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 #include <atomic> 31 32 33 //! \file test_limiter_node.cpp 34 //! \brief Test for [flow_graph.limiter_node] specification 35 36 37 const int L = 10; 38 const int N = 1000; 39 40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 41 using tbb::detail::d1::graph_task; 42 43 template< typename T > 44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 45 T next_value; 46 tbb::flow::graph& my_graph; 47 48 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 49 50 graph_task* try_put_task( const T &v ) override { 51 CHECK_MESSAGE( next_value++ == v, "" ); 52 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 53 } 54 55 tbb::flow::graph& graph_reference() const override { 56 return my_graph; 57 } 58 }; 59 60 template< typename T > 61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 62 63 std::atomic<int> my_count; 64 tbb::flow::graph& my_graph; 65 66 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 67 68 graph_task* try_put_task( const T &/*v*/ ) override { 69 ++my_count; 70 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 71 } 72 73 tbb::flow::graph& graph_reference() const override { 74 return my_graph; 75 } 76 }; 77 78 template< typename T > 79 struct empty_sender : public tbb::flow::sender<T> { 80 typedef typename tbb::flow::sender<T>::successor_type successor_type; 81 82 bool register_successor( successor_type & ) override { return false; } 83 bool remove_successor( successor_type & ) override { return false; } 84 }; 85 86 87 template< typename T > 88 struct put_body : utils::NoAssign { 89 90 tbb::flow::limiter_node<T> &my_lim; 91 std::atomic<int> &my_accept_count; 92 93 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 94 my_lim(lim), my_accept_count(accept_count) {} 95 96 void operator()( int ) const { 97 for ( int i = 0; i < L; ++i ) { 98 bool msg = my_lim.try_put( T(i) ); 99 if ( msg == true ) 100 ++my_accept_count; 101 } 102 } 103 }; 104 105 template< typename T > 106 struct put_dec_body : utils::NoAssign { 107 108 tbb::flow::limiter_node<T> &my_lim; 109 std::atomic<int> &my_accept_count; 110 111 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 112 my_lim(lim), my_accept_count(accept_count) {} 113 114 void operator()( int ) const { 115 int local_accept_count = 0; 116 while ( local_accept_count < N ) { 117 bool msg = my_lim.try_put( T(local_accept_count) ); 118 if ( msg == true ) { 119 ++local_accept_count; 120 ++my_accept_count; 121 my_lim.decrementer().try_put( tbb::flow::continue_msg() ); 122 } 123 } 124 } 125 126 }; 127 128 template< typename Sender, typename Receiver > 129 void make_edge_impl(Sender& sender, Receiver& receiver){ 130 #if __GNUC__ < 12 && !TBB_USE_DEBUG 131 // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03) 132 // The function pointer to make_edge workarounds the issue for unknown reason 133 auto make_edge_ptr = tbb::flow::make_edge<int>; 134 make_edge_ptr(sender, receiver); 135 #else 136 tbb::flow::make_edge(sender, receiver); 137 #endif 138 } 139 140 template< typename T > 141 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 142 parallel_receiver<T> r(g); 143 empty_sender< tbb::flow::continue_msg > s; 144 std::atomic<int> accept_count; 145 accept_count = 0; 146 tbb::flow::make_edge( lim, r ); 147 tbb::flow::make_edge(s, lim.decrementer()); 148 149 // test puts with decrements 150 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 151 int c = accept_count; 152 CHECK_MESSAGE( c == N*num_threads, "" ); 153 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 154 } 155 156 // 157 // Tests 158 // 159 // limiter only forwards below the limit, multiple parallel senders / single receiver 160 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 161 // 162 // 163 template< typename T > 164 int test_parallel(int num_threads) { 165 166 // test puts with no decrements 167 for ( int i = 0; i < L; ++i ) { 168 tbb::flow::graph g; 169 tbb::flow::limiter_node< T > lim(g, i); 170 parallel_receiver<T> r(g); 171 std::atomic<int> accept_count; 172 accept_count = 0; 173 tbb::flow::make_edge( lim, r ); 174 // test puts with no decrements 175 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 176 g.wait_for_all(); 177 int c = accept_count; 178 CHECK_MESSAGE( c == i, "" ); 179 } 180 181 // test puts with decrements 182 for ( int i = 1; i < L; ++i ) { 183 tbb::flow::graph g; 184 tbb::flow::limiter_node< T > lim(g, i); 185 test_puts_with_decrements(num_threads, lim, g); 186 tbb::flow::limiter_node< T > lim_copy( lim ); 187 test_puts_with_decrements(num_threads, lim_copy, g); 188 } 189 190 return 0; 191 } 192 193 // 194 // Tests 195 // 196 // limiter only forwards below the limit, single sender / single receiver 197 // at reject, a put to decrement, will cause next message to be accepted 198 // 199 template< typename T > 200 int test_serial() { 201 202 // test puts with no decrements 203 for ( int i = 0; i < L; ++i ) { 204 tbb::flow::graph g; 205 tbb::flow::limiter_node< T > lim(g, i); 206 serial_receiver<T> r(g); 207 tbb::flow::make_edge( lim, r ); 208 for ( int j = 0; j < L; ++j ) { 209 bool msg = lim.try_put( T(j) ); 210 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 211 } 212 g.wait_for_all(); 213 } 214 215 // test puts with decrements 216 for ( int i = 1; i < L; ++i ) { 217 tbb::flow::graph g; 218 tbb::flow::limiter_node< T > lim(g, i); 219 serial_receiver<T> r(g); 220 empty_sender< tbb::flow::continue_msg > s; 221 tbb::flow::make_edge( lim, r ); 222 tbb::flow::make_edge(s, lim.decrementer()); 223 for ( int j = 0; j < N; ++j ) { 224 bool msg = lim.try_put( T(j) ); 225 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 226 if ( msg == false ) { 227 lim.decrementer().try_put( tbb::flow::continue_msg() ); 228 msg = lim.try_put( T(j) ); 229 CHECK_MESSAGE( msg == true, "" ); 230 } 231 } 232 } 233 return 0; 234 } 235 236 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 237 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 238 #define LIMITER_OUTPUT 0 // port number of the integer output 239 240 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 241 242 std::atomic<size_t> emit_count; 243 std::atomic<size_t> emit_sum; 244 std::atomic<size_t> receive_count; 245 std::atomic<size_t> receive_sum; 246 247 struct mfnode_body { 248 int max_cnt; 249 std::atomic<int>* my_cnt; 250 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 251 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 252 int lcnt = ++(*my_cnt); 253 if(lcnt > max_cnt) { 254 return; 255 } 256 // put one continue_msg to the decrement of the limiter. 257 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 258 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 259 } 260 { 261 // put messages to the input of the limiter_node until it rejects. 262 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 263 emit_sum += lcnt; 264 ++emit_count; 265 } 266 } 267 } 268 }; 269 270 struct fn_body { 271 int operator()(const int &in) { 272 receive_sum += in; 273 ++receive_count; 274 return in; 275 } 276 }; 277 278 // +------------+ 279 // +---------+ | v 280 // | mf_node |0---+ +----------+ +----------+ 281 // +->| |1---------->| lim_node |--------->| fn_node |--+ 282 // | +---------+ +----------+ +----------+ | 283 // | | 284 // | | 285 // +-------------------------------------------------------------+ 286 // 287 void 288 test_multifunction_to_limiter(int _max, int _nparallel) { 289 tbb::flow::graph g; 290 emit_count = 0; 291 emit_sum = 0; 292 receive_count = 0; 293 receive_sum = 0; 294 std::atomic<int> local_cnt; 295 local_cnt = 0; 296 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 297 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 298 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 299 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 300 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer()); 301 tbb::flow::make_edge(lim_node, fn_node); 302 tbb::flow::make_edge(fn_node, mf_node); 303 304 mf_node.try_put(1); 305 g.wait_for_all(); 306 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 307 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 308 309 // reset, test again 310 g.reset(); 311 emit_count = 0; 312 emit_sum = 0; 313 receive_count = 0; 314 receive_sum = 0; 315 local_cnt = 0;; 316 mf_node.try_put(1); 317 g.wait_for_all(); 318 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 319 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 320 } 321 322 323 void 324 test_continue_msg_reception() { 325 tbb::flow::graph g; 326 tbb::flow::limiter_node<int> ln(g,2); 327 tbb::flow::queue_node<int> qn(g); 328 tbb::flow::make_edge(ln, qn); 329 ln.decrementer().try_put(tbb::flow::continue_msg()); 330 ln.try_put(42); 331 g.wait_for_all(); 332 int outint; 333 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 334 } 335 336 337 // 338 // This test ascertains that if a message is not successfully put 339 // to a successor, the message is not dropped but released. 340 // 341 342 void test_reserve_release_messages() { 343 using namespace tbb::flow; 344 graph g; 345 346 //making two queue_nodes: one broadcast_node and one limiter_node 347 queue_node<int> input_queue(g); 348 queue_node<int> output_queue(g); 349 broadcast_node<int> broad(g); 350 limiter_node<int, int> limit(g,2); //threshold of 2 351 352 //edges 353 make_edge(input_queue, limit); 354 make_edge(limit, output_queue); 355 make_edge(broad,limit.decrementer()); 356 357 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 358 359 input_queue.try_put(list[0]); // succeeds 360 input_queue.try_put(list[1]); // succeeds 361 input_queue.try_put(list[2]); // fails, stored in upstream buffer 362 g.wait_for_all(); 363 364 remove_edge(limit, output_queue); //remove successor 365 366 //sending message to the decrement port of the limiter 367 broad.try_put(1); //failed message retrieved. 368 g.wait_for_all(); 369 370 make_edge_impl(limit, output_queue); //putting the successor back 371 372 broad.try_put(1); //drop the count 373 374 input_queue.try_put(list[3]); //success 375 g.wait_for_all(); 376 377 int var=0; 378 379 for (int i=0; i<4; i++) { 380 output_queue.try_get(var); 381 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 382 g.wait_for_all(); 383 } 384 } 385 386 void test_decrementer() { 387 const int threshold = 5; 388 tbb::flow::graph g; 389 tbb::flow::limiter_node<int, int> limit(g, threshold); 390 tbb::flow::queue_node<int> queue(g); 391 make_edge(limit, queue); 392 int m = 0; 393 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 394 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate 395 "Limiter node decrementer's port does not accept message." ); 396 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 397 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 398 "Limiter node decrementer's port does not accept message." ); 399 for( int i = 0; i < threshold; ++i ) 400 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 401 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 402 g.wait_for_all(); 403 int expected[] = {0, 2, 3, 4, 5, 6}; 404 int actual = -1; m = 0; 405 while( queue.try_get(actual) ) 406 CHECK_MESSAGE( actual == expected[m++], "" ); 407 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 408 g.wait_for_all(); 409 410 const size_t threshold2 = size_t(-1); 411 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 412 make_edge(limit2, queue); 413 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 414 long long decrement_value = (long long)( size_t(-1)/2 ); 415 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 416 "Limiter node decrementer's port does not accept message" ); 417 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 418 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 419 "Limiter node decrementer's port does not accept message" ); 420 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 421 int expected2[] = {1, 2}; 422 actual = -1; m = 0; 423 while( queue.try_get(actual) ) 424 CHECK_MESSAGE( actual == expected2[m++], "" ); 425 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 426 g.wait_for_all(); 427 428 const size_t threshold3 = 10; 429 tbb::flow::limiter_node<int, long long> limit3(g, threshold3); 430 make_edge(limit3, queue); 431 long long decrement_value3 = 3; 432 CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ), 433 "Limiter node decrementer's port does not accept message" ); 434 435 m = 0; 436 while( limit3.try_put( m ) ){ m++; }; 437 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." ); 438 439 actual = -1; m = 0; 440 while( queue.try_get(actual) ){ 441 CHECK_MESSAGE( actual == m++, "Not all messages have been processed." ); 442 } 443 444 g.wait_for_all(); 445 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." ); 446 } 447 448 void test_try_put_without_successors() { 449 tbb::flow::graph g; 450 int try_put_num{3}; 451 tbb::flow::buffer_node<int> bn(g); 452 tbb::flow::limiter_node<int> ln(g, try_put_num); 453 454 tbb::flow::make_edge(bn, ln); 455 456 int i = 1; 457 for (; i <= try_put_num; i++) 458 bn.try_put(i); 459 460 std::atomic<int> counter{0}; 461 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 462 [&](int input) { 463 counter += input; 464 return int{}; 465 } 466 ); 467 468 make_edge_impl(ln, fn); 469 470 g.wait_for_all(); 471 CHECK((counter == i * try_put_num / 2)); 472 473 // Check the lost message 474 tbb::flow::remove_edge(bn, ln); 475 ln.decrementer().try_put(tbb::flow::continue_msg()); 476 bn.try_put(try_put_num + 1); 477 g.wait_for_all(); 478 CHECK((counter == i * try_put_num / 2)); 479 480 } 481 482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 483 #include <array> 484 #include <vector> 485 void test_follows_and_precedes_api() { 486 using msg_t = tbb::flow::continue_msg; 487 488 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 489 std::vector<msg_t> messages_for_precedes = {msg_t()}; 490 491 follows_and_precedes_testing::test_follows 492 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 493 follows_and_precedes_testing::test_precedes 494 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 495 496 } 497 #endif 498 499 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 500 void test_deduction_guides() { 501 using namespace tbb::flow; 502 503 graph g; 504 broadcast_node<int> br(g); 505 limiter_node<int> l0(g, 100); 506 507 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 508 limiter_node l1(follows(br), 100); 509 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 510 511 limiter_node l2(precedes(br), 100); 512 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 513 #endif 514 515 limiter_node l3(l0); 516 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 517 } 518 #endif 519 520 //! Test puts on limiter_node with decrements and varying parallelism levels 521 //! \brief \ref error_guessing 522 TEST_CASE("Serial and parallel tests") { 523 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 524 tbb::task_arena arena(i); 525 arena.execute( 526 [i]() { 527 test_serial<int>(); 528 test_parallel<int>(i); 529 } 530 ); 531 } 532 } 533 534 //! Test initial put of continue_msg on decrementer port does not stop message flow 535 //! \brief \ref error_guessing 536 TEST_CASE("Test continue_msg reception") { 537 test_continue_msg_reception(); 538 } 539 540 //! Test multifunction_node connected to limiter_node 541 //! \brief \ref error_guessing 542 TEST_CASE("Multifunction connected to limiter") { 543 test_multifunction_to_limiter(30,3); 544 test_multifunction_to_limiter(300,13); 545 test_multifunction_to_limiter(3000,1); 546 } 547 548 //! Test message release if successor doesn't accept 549 //! \brief \ref requirement 550 TEST_CASE("Message is released if successor does not accept") { 551 test_reserve_release_messages(); 552 } 553 554 //! Test decrementer 555 //! \brief \ref requirement \ref error_guessing 556 TEST_CASE("Decrementer") { 557 test_decrementer(); 558 } 559 560 //! Test try_put() without successor 561 //! \brief \ref error_guessing 562 TEST_CASE("Test try_put() without successors") { 563 test_try_put_without_successors(); 564 } 565 566 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 567 //! Test follows and precedes API 568 //! \brief \ref error_guessing 569 TEST_CASE( "Support for follows and precedes API" ) { 570 test_follows_and_precedes_api(); 571 } 572 #endif 573 574 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 575 //! Test deduction guides 576 //! \brief \ref requirement 577 TEST_CASE( "Deduction guides" ) { 578 test_deduction_guides(); 579 } 580 #endif 581