1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/utils_assert.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 #include <atomic> 31 32 33 //! \file test_limiter_node.cpp 34 //! \brief Test for [flow_graph.limiter_node] specification 35 36 37 const int L = 10; 38 const int N = 1000; 39 40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 41 using tbb::detail::d1::graph_task; 42 43 template< typename T > 44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 45 T next_value; 46 tbb::flow::graph& my_graph; 47 48 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 49 50 graph_task* try_put_task( const T &v ) override { 51 CHECK_MESSAGE( next_value++ == v, "" ); 52 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 53 } 54 55 tbb::flow::graph& graph_reference() const override { 56 return my_graph; 57 } 58 }; 59 60 template< typename T > 61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 62 63 std::atomic<int> my_count; 64 tbb::flow::graph& my_graph; 65 66 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 67 68 graph_task* try_put_task( const T &/*v*/ ) override { 69 ++my_count; 70 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 71 } 72 73 tbb::flow::graph& graph_reference() const override { 74 return my_graph; 75 } 76 }; 77 78 template< typename T > 79 struct empty_sender : public tbb::flow::sender<T> { 80 typedef typename tbb::flow::sender<T>::successor_type successor_type; 81 82 bool register_successor( successor_type & ) override { return false; } 83 bool remove_successor( successor_type & ) override { return false; } 84 }; 85 86 87 template< typename T > 88 struct put_body : utils::NoAssign { 89 90 tbb::flow::limiter_node<T> &my_lim; 91 std::atomic<int> &my_accept_count; 92 93 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 94 my_lim(lim), my_accept_count(accept_count) {} 95 96 void operator()( int ) const { 97 for ( int i = 0; i < L; ++i ) { 98 bool msg = my_lim.try_put( T(i) ); 99 if ( msg == true ) 100 ++my_accept_count; 101 } 102 } 103 }; 104 105 template< typename T > 106 struct put_dec_body : utils::NoAssign { 107 108 tbb::flow::limiter_node<T> &my_lim; 109 std::atomic<int> &my_accept_count; 110 111 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 112 my_lim(lim), my_accept_count(accept_count) {} 113 114 void operator()( int ) const { 115 int local_accept_count = 0; 116 while ( local_accept_count < N ) { 117 bool msg = my_lim.try_put( T(local_accept_count) ); 118 if ( msg == true ) { 119 ++local_accept_count; 120 ++my_accept_count; 121 my_lim.decrementer().try_put( tbb::flow::continue_msg() ); 122 } 123 } 124 } 125 126 }; 127 128 template< typename T > 129 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 130 parallel_receiver<T> r(g); 131 empty_sender< tbb::flow::continue_msg > s; 132 std::atomic<int> accept_count; 133 accept_count = 0; 134 tbb::flow::make_edge( lim, r ); 135 tbb::flow::make_edge(s, lim.decrementer()); 136 137 // test puts with decrements 138 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 139 int c = accept_count; 140 CHECK_MESSAGE( c == N*num_threads, "" ); 141 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 142 } 143 144 // 145 // Tests 146 // 147 // limiter only forwards below the limit, multiple parallel senders / single receiver 148 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 149 // 150 // 151 template< typename T > 152 int test_parallel(int num_threads) { 153 154 // test puts with no decrements 155 for ( int i = 0; i < L; ++i ) { 156 tbb::flow::graph g; 157 tbb::flow::limiter_node< T > lim(g, i); 158 parallel_receiver<T> r(g); 159 std::atomic<int> accept_count; 160 accept_count = 0; 161 tbb::flow::make_edge( lim, r ); 162 // test puts with no decrements 163 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 164 g.wait_for_all(); 165 int c = accept_count; 166 CHECK_MESSAGE( c == i, "" ); 167 } 168 169 // test puts with decrements 170 for ( int i = 1; i < L; ++i ) { 171 tbb::flow::graph g; 172 tbb::flow::limiter_node< T > lim(g, i); 173 test_puts_with_decrements(num_threads, lim, g); 174 tbb::flow::limiter_node< T > lim_copy( lim ); 175 test_puts_with_decrements(num_threads, lim_copy, g); 176 } 177 178 return 0; 179 } 180 181 // 182 // Tests 183 // 184 // limiter only forwards below the limit, single sender / single receiver 185 // at reject, a put to decrement, will cause next message to be accepted 186 // 187 template< typename T > 188 int test_serial() { 189 190 // test puts with no decrements 191 for ( int i = 0; i < L; ++i ) { 192 tbb::flow::graph g; 193 tbb::flow::limiter_node< T > lim(g, i); 194 serial_receiver<T> r(g); 195 tbb::flow::make_edge( lim, r ); 196 for ( int j = 0; j < L; ++j ) { 197 bool msg = lim.try_put( T(j) ); 198 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 199 } 200 g.wait_for_all(); 201 } 202 203 // test puts with decrements 204 for ( int i = 1; i < L; ++i ) { 205 tbb::flow::graph g; 206 tbb::flow::limiter_node< T > lim(g, i); 207 serial_receiver<T> r(g); 208 empty_sender< tbb::flow::continue_msg > s; 209 tbb::flow::make_edge( lim, r ); 210 tbb::flow::make_edge(s, lim.decrementer()); 211 for ( int j = 0; j < N; ++j ) { 212 bool msg = lim.try_put( T(j) ); 213 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 214 if ( msg == false ) { 215 lim.decrementer().try_put( tbb::flow::continue_msg() ); 216 msg = lim.try_put( T(j) ); 217 CHECK_MESSAGE( msg == true, "" ); 218 } 219 } 220 } 221 return 0; 222 } 223 224 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 225 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 226 #define LIMITER_OUTPUT 0 // port number of the integer output 227 228 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 229 230 std::atomic<size_t> emit_count; 231 std::atomic<size_t> emit_sum; 232 std::atomic<size_t> receive_count; 233 std::atomic<size_t> receive_sum; 234 235 struct mfnode_body { 236 int max_cnt; 237 std::atomic<int>* my_cnt; 238 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 239 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 240 int lcnt = ++(*my_cnt); 241 if(lcnt > max_cnt) { 242 return; 243 } 244 // put one continue_msg to the decrement of the limiter. 245 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 246 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 247 } 248 { 249 // put messages to the input of the limiter_node until it rejects. 250 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 251 emit_sum += lcnt; 252 ++emit_count; 253 } 254 } 255 } 256 }; 257 258 struct fn_body { 259 int operator()(const int &in) { 260 receive_sum += in; 261 ++receive_count; 262 return in; 263 } 264 }; 265 266 // +------------+ 267 // +---------+ | v 268 // | mf_node |0---+ +----------+ +----------+ 269 // +->| |1---------->| lim_node |--------->| fn_node |--+ 270 // | +---------+ +----------+ +----------+ | 271 // | | 272 // | | 273 // +-------------------------------------------------------------+ 274 // 275 void 276 test_multifunction_to_limiter(int _max, int _nparallel) { 277 tbb::flow::graph g; 278 emit_count = 0; 279 emit_sum = 0; 280 receive_count = 0; 281 receive_sum = 0; 282 std::atomic<int> local_cnt; 283 local_cnt = 0; 284 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 285 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 286 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 287 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 288 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer()); 289 tbb::flow::make_edge(lim_node, fn_node); 290 tbb::flow::make_edge(fn_node, mf_node); 291 292 mf_node.try_put(1); 293 g.wait_for_all(); 294 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 295 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 296 297 // reset, test again 298 g.reset(); 299 emit_count = 0; 300 emit_sum = 0; 301 receive_count = 0; 302 receive_sum = 0; 303 local_cnt = 0;; 304 mf_node.try_put(1); 305 g.wait_for_all(); 306 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 307 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 308 } 309 310 311 void 312 test_continue_msg_reception() { 313 tbb::flow::graph g; 314 tbb::flow::limiter_node<int> ln(g,2); 315 tbb::flow::queue_node<int> qn(g); 316 tbb::flow::make_edge(ln, qn); 317 ln.decrementer().try_put(tbb::flow::continue_msg()); 318 ln.try_put(42); 319 g.wait_for_all(); 320 int outint; 321 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 322 } 323 324 325 // 326 // This test ascertains that if a message is not successfully put 327 // to a successor, the message is not dropped but released. 328 // 329 330 void test_reserve_release_messages() { 331 using namespace tbb::flow; 332 graph g; 333 334 //making two queue_nodes: one broadcast_node and one limiter_node 335 queue_node<int> input_queue(g); 336 queue_node<int> output_queue(g); 337 broadcast_node<int> broad(g); 338 limiter_node<int, int> limit(g,2); //threshold of 2 339 340 //edges 341 make_edge(input_queue, limit); 342 make_edge(limit, output_queue); 343 make_edge(broad,limit.decrementer()); 344 345 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 346 347 input_queue.try_put(list[0]); // succeeds 348 input_queue.try_put(list[1]); // succeeds 349 input_queue.try_put(list[2]); // fails, stored in upstream buffer 350 g.wait_for_all(); 351 352 remove_edge(limit, output_queue); //remove successor 353 354 //sending message to the decrement port of the limiter 355 broad.try_put(1); //failed message retrieved. 356 g.wait_for_all(); 357 358 #if __GNUC__ && __GNUC__ < 10 && !TBB_USE_DEBUG 359 // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03) 360 // The function pointer to make_edge workarounds the issue for unknown reason 361 auto make_edge_ptr = make_edge<int>; 362 make_edge_ptr(limit, output_queue); //putting the successor back 363 #else 364 make_edge(limit, output_queue); //putting the successor back 365 #endif 366 367 broad.try_put(1); //drop the count 368 369 input_queue.try_put(list[3]); //success 370 g.wait_for_all(); 371 372 int var=0; 373 374 for (int i=0; i<4; i++) { 375 output_queue.try_get(var); 376 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 377 g.wait_for_all(); 378 } 379 } 380 381 void test_decrementer() { 382 const int threshold = 5; 383 tbb::flow::graph g; 384 tbb::flow::limiter_node<int, int> limit(g, threshold); 385 tbb::flow::queue_node<int> queue(g); 386 make_edge(limit, queue); 387 int m = 0; 388 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 389 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate 390 "Limiter node decrementer's port does not accept message." ); 391 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 392 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 393 "Limiter node decrementer's port does not accept message." ); 394 for( int i = 0; i < threshold; ++i ) 395 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 396 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 397 g.wait_for_all(); 398 int expected[] = {0, 2, 3, 4, 5, 6}; 399 int actual = -1; m = 0; 400 while( queue.try_get(actual) ) 401 CHECK_MESSAGE( actual == expected[m++], "" ); 402 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 403 g.wait_for_all(); 404 405 const size_t threshold2 = size_t(-1); 406 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 407 make_edge(limit2, queue); 408 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 409 long long decrement_value = (long long)( size_t(-1)/2 ); 410 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 411 "Limiter node decrementer's port does not accept message" ); 412 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 413 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 414 "Limiter node decrementer's port does not accept message" ); 415 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 416 int expected2[] = {1, 2}; 417 actual = -1; m = 0; 418 while( queue.try_get(actual) ) 419 CHECK_MESSAGE( actual == expected2[m++], "" ); 420 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 421 g.wait_for_all(); 422 } 423 424 void test_try_put_without_successors() { 425 tbb::flow::graph g; 426 std::size_t try_put_num{3}; 427 tbb::flow::buffer_node<int> bn(g); 428 tbb::flow::limiter_node<int> ln(g, try_put_num); 429 tbb::flow::make_edge(bn, ln); 430 std::size_t i = 1; 431 for (; i <= try_put_num; i++) 432 bn.try_put(i); 433 434 std::atomic<std::size_t> counter{0}; 435 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 436 [&](int input) { 437 counter += input; 438 return int{}; 439 } 440 ); 441 tbb::flow::make_edge(ln, fn); 442 g.wait_for_all(); 443 CHECK((counter == i * try_put_num / 2)); 444 445 // Check the lost message 446 tbb::flow::remove_edge(bn, ln); 447 ln.decrementer().try_put(tbb::flow::continue_msg()); 448 bn.try_put(try_put_num + 1); 449 g.wait_for_all(); 450 CHECK((counter == i * try_put_num / 2)); 451 452 } 453 454 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 455 #include <array> 456 #include <vector> 457 void test_follows_and_precedes_api() { 458 using msg_t = tbb::flow::continue_msg; 459 460 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 461 std::vector<msg_t> messages_for_precedes = {msg_t()}; 462 463 follows_and_precedes_testing::test_follows 464 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 465 follows_and_precedes_testing::test_precedes 466 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 467 468 } 469 #endif 470 471 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 472 void test_deduction_guides() { 473 using namespace tbb::flow; 474 475 graph g; 476 broadcast_node<int> br(g); 477 limiter_node<int> l0(g, 100); 478 479 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 480 limiter_node l1(follows(br), 100); 481 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 482 483 limiter_node l2(precedes(br), 100); 484 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 485 #endif 486 487 limiter_node l3(l0); 488 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 489 } 490 #endif 491 492 //! Test puts on limiter_node with decrements and varying parallelism levels 493 //! \brief \ref error_guessing 494 TEST_CASE("Serial and parallel tests") { 495 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 496 tbb::task_arena arena(i); 497 arena.execute( 498 [i]() { 499 test_serial<int>(); 500 test_parallel<int>(i); 501 } 502 ); 503 } 504 } 505 506 //! Test initial put of continue_msg on decrementer port does not stop message flow 507 //! \brief \ref error_guessing 508 TEST_CASE("Test continue_msg reception") { 509 test_continue_msg_reception(); 510 } 511 512 //! Test multifunction_node connected to limiter_node 513 //! \brief \ref error_guessing 514 TEST_CASE("Multifunction connected to limiter") { 515 test_multifunction_to_limiter(30,3); 516 test_multifunction_to_limiter(300,13); 517 test_multifunction_to_limiter(3000,1); 518 } 519 520 //! Test message release if successor doesn't accept 521 //! \brief \ref requirement 522 TEST_CASE("Message is released if successor does not accept") { 523 test_reserve_release_messages(); 524 } 525 526 //! Test decrementer 527 //! \brief \ref requirement \ref error_guessing 528 TEST_CASE("Decrementer") { 529 test_decrementer(); 530 } 531 532 //! Test try_put() without successor 533 //! \brief \ref error_guessing 534 TEST_CASE("Test try_put() without successors") { 535 test_try_put_without_successors(); 536 } 537 538 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 539 //! Test follows and precedes API 540 //! \brief \ref error_guessing 541 TEST_CASE( "Support for follows and precedes API" ) { 542 test_follows_and_precedes_api(); 543 } 544 #endif 545 546 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 547 //! Test deduction guides 548 //! \brief \ref requirement 549 TEST_CASE( "Deduction guides" ) { 550 test_deduction_guides(); 551 } 552 #endif 553