1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 25 #include "common/test.h" 26 #include "common/utils.h" 27 #include "common/utils_assert.h" 28 #include "common/test_follows_and_precedes_api.h" 29 30 #include <atomic> 31 32 33 //! \file test_limiter_node.cpp 34 //! \brief Test for [flow_graph.limiter_node] specification 35 36 37 const int L = 10; 38 const int N = 1000; 39 40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 41 using tbb::detail::d1::graph_task; 42 43 template< typename T > 44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 45 T next_value; 46 tbb::flow::graph& my_graph; 47 48 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 49 50 graph_task* try_put_task( const T &v ) override { 51 CHECK_MESSAGE( next_value++ == v, "" ); 52 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 53 } 54 55 tbb::flow::graph& graph_reference() const override { 56 return my_graph; 57 } 58 }; 59 60 template< typename T > 61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 62 63 std::atomic<int> my_count; 64 tbb::flow::graph& my_graph; 65 66 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 67 68 graph_task* try_put_task( const T &/*v*/ ) override { 69 ++my_count; 70 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 71 } 72 73 tbb::flow::graph& graph_reference() const override { 74 return my_graph; 75 } 76 }; 77 78 template< typename T > 79 struct empty_sender : public tbb::flow::sender<T> { 80 typedef typename tbb::flow::sender<T>::successor_type successor_type; 81 82 bool register_successor( successor_type & ) override { return false; } 83 bool remove_successor( successor_type & ) override { return false; } 84 }; 85 86 87 template< typename T > 88 struct put_body : utils::NoAssign { 89 90 tbb::flow::limiter_node<T> &my_lim; 91 std::atomic<int> &my_accept_count; 92 93 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 94 my_lim(lim), my_accept_count(accept_count) {} 95 96 void operator()( int ) const { 97 for ( int i = 0; i < L; ++i ) { 98 bool msg = my_lim.try_put( T(i) ); 99 if ( msg == true ) 100 ++my_accept_count; 101 } 102 } 103 }; 104 105 template< typename T > 106 struct put_dec_body : utils::NoAssign { 107 108 tbb::flow::limiter_node<T> &my_lim; 109 std::atomic<int> &my_accept_count; 110 111 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 112 my_lim(lim), my_accept_count(accept_count) {} 113 114 void operator()( int ) const { 115 int local_accept_count = 0; 116 while ( local_accept_count < N ) { 117 bool msg = my_lim.try_put( T(local_accept_count) ); 118 if ( msg == true ) { 119 ++local_accept_count; 120 ++my_accept_count; 121 my_lim.decrementer().try_put( tbb::flow::continue_msg() ); 122 } 123 } 124 } 125 126 }; 127 128 template< typename T > 129 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 130 parallel_receiver<T> r(g); 131 empty_sender< tbb::flow::continue_msg > s; 132 std::atomic<int> accept_count; 133 accept_count = 0; 134 tbb::flow::make_edge( lim, r ); 135 tbb::flow::make_edge(s, lim.decrementer()); 136 137 // test puts with decrements 138 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 139 int c = accept_count; 140 CHECK_MESSAGE( c == N*num_threads, "" ); 141 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 142 } 143 144 // 145 // Tests 146 // 147 // limiter only forwards below the limit, multiple parallel senders / single receiver 148 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 149 // 150 // 151 template< typename T > 152 int test_parallel(int num_threads) { 153 154 // test puts with no decrements 155 for ( int i = 0; i < L; ++i ) { 156 tbb::flow::graph g; 157 tbb::flow::limiter_node< T > lim(g, i); 158 parallel_receiver<T> r(g); 159 std::atomic<int> accept_count; 160 accept_count = 0; 161 tbb::flow::make_edge( lim, r ); 162 // test puts with no decrements 163 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 164 g.wait_for_all(); 165 int c = accept_count; 166 CHECK_MESSAGE( c == i, "" ); 167 } 168 169 // test puts with decrements 170 for ( int i = 1; i < L; ++i ) { 171 tbb::flow::graph g; 172 tbb::flow::limiter_node< T > lim(g, i); 173 test_puts_with_decrements(num_threads, lim, g); 174 tbb::flow::limiter_node< T > lim_copy( lim ); 175 test_puts_with_decrements(num_threads, lim_copy, g); 176 } 177 178 return 0; 179 } 180 181 // 182 // Tests 183 // 184 // limiter only forwards below the limit, single sender / single receiver 185 // at reject, a put to decrement, will cause next message to be accepted 186 // 187 template< typename T > 188 int test_serial() { 189 190 // test puts with no decrements 191 for ( int i = 0; i < L; ++i ) { 192 tbb::flow::graph g; 193 tbb::flow::limiter_node< T > lim(g, i); 194 serial_receiver<T> r(g); 195 tbb::flow::make_edge( lim, r ); 196 for ( int j = 0; j < L; ++j ) { 197 bool msg = lim.try_put( T(j) ); 198 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 199 } 200 g.wait_for_all(); 201 } 202 203 // test puts with decrements 204 for ( int i = 1; i < L; ++i ) { 205 tbb::flow::graph g; 206 tbb::flow::limiter_node< T > lim(g, i); 207 serial_receiver<T> r(g); 208 empty_sender< tbb::flow::continue_msg > s; 209 tbb::flow::make_edge( lim, r ); 210 tbb::flow::make_edge(s, lim.decrementer()); 211 for ( int j = 0; j < N; ++j ) { 212 bool msg = lim.try_put( T(j) ); 213 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 214 if ( msg == false ) { 215 lim.decrementer().try_put( tbb::flow::continue_msg() ); 216 msg = lim.try_put( T(j) ); 217 CHECK_MESSAGE( msg == true, "" ); 218 } 219 } 220 } 221 return 0; 222 } 223 224 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 225 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 226 #define LIMITER_OUTPUT 0 // port number of the integer output 227 228 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 229 230 std::atomic<size_t> emit_count; 231 std::atomic<size_t> emit_sum; 232 std::atomic<size_t> receive_count; 233 std::atomic<size_t> receive_sum; 234 235 struct mfnode_body { 236 int max_cnt; 237 std::atomic<int>* my_cnt; 238 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 239 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 240 int lcnt = ++(*my_cnt); 241 if(lcnt > max_cnt) { 242 return; 243 } 244 // put one continue_msg to the decrement of the limiter. 245 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 246 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 247 } 248 { 249 // put messages to the input of the limiter_node until it rejects. 250 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 251 emit_sum += lcnt; 252 ++emit_count; 253 } 254 } 255 } 256 }; 257 258 struct fn_body { 259 int operator()(const int &in) { 260 receive_sum += in; 261 ++receive_count; 262 return in; 263 } 264 }; 265 266 // +------------+ 267 // +---------+ | v 268 // | mf_node |0---+ +----------+ +----------+ 269 // +->| |1---------->| lim_node |--------->| fn_node |--+ 270 // | +---------+ +----------+ +----------+ | 271 // | | 272 // | | 273 // +-------------------------------------------------------------+ 274 // 275 void 276 test_multifunction_to_limiter(int _max, int _nparallel) { 277 tbb::flow::graph g; 278 emit_count = 0; 279 emit_sum = 0; 280 receive_count = 0; 281 receive_sum = 0; 282 std::atomic<int> local_cnt; 283 local_cnt = 0; 284 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 285 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 286 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 287 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 288 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer()); 289 tbb::flow::make_edge(lim_node, fn_node); 290 tbb::flow::make_edge(fn_node, mf_node); 291 292 mf_node.try_put(1); 293 g.wait_for_all(); 294 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 295 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 296 297 // reset, test again 298 g.reset(); 299 emit_count = 0; 300 emit_sum = 0; 301 receive_count = 0; 302 receive_sum = 0; 303 local_cnt = 0;; 304 mf_node.try_put(1); 305 g.wait_for_all(); 306 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 307 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 308 } 309 310 311 void 312 test_continue_msg_reception() { 313 tbb::flow::graph g; 314 tbb::flow::limiter_node<int> ln(g,2); 315 tbb::flow::queue_node<int> qn(g); 316 tbb::flow::make_edge(ln, qn); 317 ln.decrementer().try_put(tbb::flow::continue_msg()); 318 ln.try_put(42); 319 g.wait_for_all(); 320 int outint; 321 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 322 } 323 324 325 // 326 // This test ascertains that if a message is not successfully put 327 // to a successor, the message is not dropped but released. 328 // 329 330 void test_reserve_release_messages() { 331 using namespace tbb::flow; 332 graph g; 333 334 //making two queue_nodes: one broadcast_node and one limiter_node 335 queue_node<int> input_queue(g); 336 queue_node<int> output_queue(g); 337 broadcast_node<int> broad(g); 338 limiter_node<int, int> limit(g,2); //threshold of 2 339 340 //edges 341 make_edge(input_queue, limit); 342 make_edge(limit, output_queue); 343 make_edge(broad,limit.decrementer()); 344 345 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 346 347 input_queue.try_put(list[0]); // succeeds 348 input_queue.try_put(list[1]); // succeeds 349 input_queue.try_put(list[2]); // fails, stored in upstream buffer 350 g.wait_for_all(); 351 352 remove_edge(limit, output_queue); //remove successor 353 354 //sending message to the decrement port of the limiter 355 broad.try_put(1); //failed message retrieved. 356 g.wait_for_all(); 357 358 make_edge(limit, output_queue); //putting the successor back 359 360 broad.try_put(1); //drop the count 361 362 input_queue.try_put(list[3]); //success 363 g.wait_for_all(); 364 365 int var=0; 366 367 for (int i=0; i<4; i++) { 368 output_queue.try_get(var); 369 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 370 g.wait_for_all(); 371 } 372 } 373 374 void test_decrementer() { 375 const int threshold = 5; 376 tbb::flow::graph g; 377 tbb::flow::limiter_node<int, int> limit(g, threshold); 378 tbb::flow::queue_node<int> queue(g); 379 make_edge(limit, queue); 380 int m = 0; 381 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 382 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate 383 "Limiter node decrementer's port does not accept message." ); 384 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 385 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 386 "Limiter node decrementer's port does not accept message." ); 387 for( int i = 0; i < threshold; ++i ) 388 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 389 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 390 g.wait_for_all(); 391 int expected[] = {0, 2, 3, 4, 5, 6}; 392 int actual = -1; m = 0; 393 while( queue.try_get(actual) ) 394 CHECK_MESSAGE( actual == expected[m++], "" ); 395 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 396 g.wait_for_all(); 397 398 const size_t threshold2 = size_t(-1); 399 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 400 make_edge(limit2, queue); 401 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 402 long long decrement_value = (long long)( size_t(-1)/2 ); 403 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 404 "Limiter node decrementer's port does not accept message" ); 405 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 406 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ), 407 "Limiter node decrementer's port does not accept message" ); 408 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 409 int expected2[] = {1, 2}; 410 actual = -1; m = 0; 411 while( queue.try_get(actual) ) 412 CHECK_MESSAGE( actual == expected2[m++], "" ); 413 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 414 g.wait_for_all(); 415 } 416 417 void test_try_put_without_successors() { 418 tbb::flow::graph g; 419 std::size_t try_put_num{3}; 420 tbb::flow::buffer_node<int> bn(g); 421 tbb::flow::limiter_node<int> ln(g, try_put_num); 422 tbb::flow::make_edge(bn, ln); 423 std::size_t i = 1; 424 for (; i <= try_put_num; i++) 425 bn.try_put(i); 426 427 std::atomic<std::size_t> counter{0}; 428 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 429 [&](int input) { 430 counter += input; 431 return int{}; 432 } 433 ); 434 tbb::flow::make_edge(ln, fn); 435 g.wait_for_all(); 436 CHECK((counter == i * try_put_num / 2)); 437 438 // Check the lost message 439 tbb::flow::remove_edge(bn, ln); 440 ln.decrementer().try_put(tbb::flow::continue_msg()); 441 bn.try_put(try_put_num + 1); 442 g.wait_for_all(); 443 CHECK((counter == i * try_put_num / 2)); 444 445 } 446 447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 448 #include <array> 449 #include <vector> 450 void test_follows_and_precedes_api() { 451 using msg_t = tbb::flow::continue_msg; 452 453 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 454 std::vector<msg_t> messages_for_precedes = {msg_t()}; 455 456 follows_and_precedes_testing::test_follows 457 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 458 follows_and_precedes_testing::test_precedes 459 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 460 461 } 462 #endif 463 464 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 465 void test_deduction_guides() { 466 using namespace tbb::flow; 467 468 graph g; 469 broadcast_node<int> br(g); 470 limiter_node<int> l0(g, 100); 471 472 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 473 limiter_node l1(follows(br), 100); 474 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 475 476 limiter_node l2(precedes(br), 100); 477 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 478 #endif 479 480 limiter_node l3(l0); 481 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 482 } 483 #endif 484 485 //! Test puts on limiter_node with decrements and varying parallelism levels 486 //! \brief \ref error_guessing 487 TEST_CASE("Serial and parallel tests") { 488 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 489 tbb::task_arena arena(i); 490 arena.execute( 491 [i]() { 492 test_serial<int>(); 493 test_parallel<int>(i); 494 } 495 ); 496 } 497 } 498 499 //! Test initial put of continue_msg on decrementer port does not stop message flow 500 //! \brief \ref error_guessing 501 TEST_CASE("Test continue_msg reception") { 502 test_continue_msg_reception(); 503 } 504 505 //! Test multifunction_node connected to limiter_node 506 //! \brief \ref error_guessing 507 TEST_CASE("Multifunction connected to limiter") { 508 test_multifunction_to_limiter(30,3); 509 test_multifunction_to_limiter(300,13); 510 test_multifunction_to_limiter(3000,1); 511 } 512 513 //! Test message release if successor doesn't accept 514 //! \brief \ref requirement 515 TEST_CASE("Message is released if successor does not accept") { 516 test_reserve_release_messages(); 517 } 518 519 //! Test decrementer 520 //! \brief \ref requirement \ref error_guessing 521 TEST_CASE("Decrementer") { 522 test_decrementer(); 523 } 524 525 //! Test try_put() without successor 526 //! \brief \ref error_guessing 527 TEST_CASE("Test try_put() without successors") { 528 test_try_put_without_successors(); 529 } 530 531 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 532 //! Test follows and precedes API 533 //! \brief \ref error_guessing 534 TEST_CASE( "Support for follows and precedes API" ) { 535 test_follows_and_precedes_api(); 536 } 537 #endif 538 539 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 540 //! Test deduction guides 541 //! \brief \ref requirement 542 TEST_CASE( "Deduction guides" ) { 543 test_deduction_guides(); 544 } 545 #endif 546