1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #include "common/config.h" 18 19 #include "tbb/flow_graph.h" 20 21 #include "common/test.h" 22 #include "common/utils.h" 23 #include "common/utils_assert.h" 24 #include "common/test_follows_and_precedes_api.h" 25 26 #include <atomic> 27 28 29 //! \file test_limiter_node.cpp 30 //! \brief Test for [flow_graph.limiter_node] specification 31 32 33 const int L = 10; 34 const int N = 1000; 35 36 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED; 37 using tbb::detail::d1::graph_task; 38 39 template< typename T > 40 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 41 T next_value; 42 tbb::flow::graph& my_graph; 43 44 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {} 45 46 graph_task* try_put_task( const T &v ) override { 47 CHECK_MESSAGE( next_value++ == v, "" ); 48 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 49 } 50 51 tbb::flow::graph& graph_reference() const override { 52 return my_graph; 53 } 54 }; 55 56 template< typename T > 57 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign { 58 59 std::atomic<int> my_count; 60 tbb::flow::graph& my_graph; 61 62 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; } 63 64 graph_task* try_put_task( const T &/*v*/ ) override { 65 ++my_count; 66 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED); 67 } 68 69 tbb::flow::graph& graph_reference() const override { 70 return my_graph; 71 } 72 }; 73 74 template< typename T > 75 struct empty_sender : public tbb::flow::sender<T> { 76 typedef typename tbb::flow::sender<T>::successor_type successor_type; 77 78 bool register_successor( successor_type & ) override { return false; } 79 bool remove_successor( successor_type & ) override { return false; } 80 }; 81 82 83 template< typename T > 84 struct put_body : utils::NoAssign { 85 86 tbb::flow::limiter_node<T> &my_lim; 87 std::atomic<int> &my_accept_count; 88 89 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 90 my_lim(lim), my_accept_count(accept_count) {} 91 92 void operator()( int ) const { 93 for ( int i = 0; i < L; ++i ) { 94 bool msg = my_lim.try_put( T(i) ); 95 if ( msg == true ) 96 ++my_accept_count; 97 } 98 } 99 }; 100 101 template< typename T > 102 struct put_dec_body : utils::NoAssign { 103 104 tbb::flow::limiter_node<T> &my_lim; 105 std::atomic<int> &my_accept_count; 106 107 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) : 108 my_lim(lim), my_accept_count(accept_count) {} 109 110 void operator()( int ) const { 111 int local_accept_count = 0; 112 while ( local_accept_count < N ) { 113 bool msg = my_lim.try_put( T(local_accept_count) ); 114 if ( msg == true ) { 115 ++local_accept_count; 116 ++my_accept_count; 117 my_lim.decrement.try_put( tbb::flow::continue_msg() ); 118 } 119 } 120 } 121 122 }; 123 124 template< typename T > 125 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) { 126 parallel_receiver<T> r(g); 127 empty_sender< tbb::flow::continue_msg > s; 128 std::atomic<int> accept_count; 129 accept_count = 0; 130 tbb::flow::make_edge( lim, r ); 131 tbb::flow::make_edge(s, lim.decrement); 132 133 // test puts with decrements 134 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) ); 135 int c = accept_count; 136 CHECK_MESSAGE( c == N*num_threads, "" ); 137 CHECK_MESSAGE( r.my_count == N*num_threads, "" ); 138 } 139 140 // 141 // Tests 142 // 143 // limiter only forwards below the limit, multiple parallel senders / single receiver 144 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages 145 // 146 // 147 template< typename T > 148 int test_parallel(int num_threads) { 149 150 // test puts with no decrements 151 for ( int i = 0; i < L; ++i ) { 152 tbb::flow::graph g; 153 tbb::flow::limiter_node< T > lim(g, i); 154 parallel_receiver<T> r(g); 155 std::atomic<int> accept_count; 156 accept_count = 0; 157 tbb::flow::make_edge( lim, r ); 158 // test puts with no decrements 159 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) ); 160 g.wait_for_all(); 161 int c = accept_count; 162 CHECK_MESSAGE( c == i, "" ); 163 } 164 165 // test puts with decrements 166 for ( int i = 1; i < L; ++i ) { 167 tbb::flow::graph g; 168 tbb::flow::limiter_node< T > lim(g, i); 169 test_puts_with_decrements(num_threads, lim, g); 170 tbb::flow::limiter_node< T > lim_copy( lim ); 171 test_puts_with_decrements(num_threads, lim_copy, g); 172 } 173 174 return 0; 175 } 176 177 // 178 // Tests 179 // 180 // limiter only forwards below the limit, single sender / single receiver 181 // at reject, a put to decrement, will cause next message to be accepted 182 // 183 template< typename T > 184 int test_serial() { 185 186 // test puts with no decrements 187 for ( int i = 0; i < L; ++i ) { 188 tbb::flow::graph g; 189 tbb::flow::limiter_node< T > lim(g, i); 190 serial_receiver<T> r(g); 191 tbb::flow::make_edge( lim, r ); 192 for ( int j = 0; j < L; ++j ) { 193 bool msg = lim.try_put( T(j) ); 194 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 195 } 196 g.wait_for_all(); 197 } 198 199 // test puts with decrements 200 for ( int i = 1; i < L; ++i ) { 201 tbb::flow::graph g; 202 tbb::flow::limiter_node< T > lim(g, i); 203 serial_receiver<T> r(g); 204 empty_sender< tbb::flow::continue_msg > s; 205 tbb::flow::make_edge( lim, r ); 206 tbb::flow::make_edge(s, lim.decrement); 207 for ( int j = 0; j < N; ++j ) { 208 bool msg = lim.try_put( T(j) ); 209 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" ); 210 if ( msg == false ) { 211 lim.decrement.try_put( tbb::flow::continue_msg() ); 212 msg = lim.try_put( T(j) ); 213 CHECK_MESSAGE( msg == true, "" ); 214 } 215 } 216 } 217 return 0; 218 } 219 220 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355) 221 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node 222 #define LIMITER_OUTPUT 0 // port number of the integer output 223 224 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type; 225 226 std::atomic<size_t> emit_count; 227 std::atomic<size_t> emit_sum; 228 std::atomic<size_t> receive_count; 229 std::atomic<size_t> receive_sum; 230 231 struct mfnode_body { 232 int max_cnt; 233 std::atomic<int>* my_cnt; 234 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { } 235 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) { 236 int lcnt = ++(*my_cnt); 237 if(lcnt > max_cnt) { 238 return; 239 } 240 // put one continue_msg to the decrement of the limiter. 241 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) { 242 CHECK_MESSAGE( (false),"Unexpected rejection of decrement"); 243 } 244 { 245 // put messages to the input of the limiter_node until it rejects. 246 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) { 247 emit_sum += lcnt; 248 ++emit_count; 249 } 250 } 251 } 252 }; 253 254 struct fn_body { 255 int operator()(const int &in) { 256 receive_sum += in; 257 ++receive_count; 258 return in; 259 } 260 }; 261 262 // +------------+ 263 // +---------+ | v 264 // | mf_node |0---+ +----------+ +----------+ 265 // +->| |1---------->| lim_node |--------->| fn_node |--+ 266 // | +---------+ +----------+ +----------+ | 267 // | | 268 // | | 269 // +-------------------------------------------------------------+ 270 // 271 void 272 test_multifunction_to_limiter(int _max, int _nparallel) { 273 tbb::flow::graph g; 274 emit_count = 0; 275 emit_sum = 0; 276 receive_count = 0; 277 receive_sum = 0; 278 std::atomic<int> local_cnt; 279 local_cnt = 0; 280 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt)); 281 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body()); 282 tbb::flow::limiter_node<int> lim_node(g, _nparallel); 283 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node); 284 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrement); 285 tbb::flow::make_edge(lim_node, fn_node); 286 tbb::flow::make_edge(fn_node, mf_node); 287 288 mf_node.try_put(1); 289 g.wait_for_all(); 290 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 291 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 292 293 // reset, test again 294 g.reset(); 295 emit_count = 0; 296 emit_sum = 0; 297 receive_count = 0; 298 receive_sum = 0; 299 local_cnt = 0;; 300 mf_node.try_put(1); 301 g.wait_for_all(); 302 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match"); 303 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match"); 304 } 305 306 307 void 308 test_continue_msg_reception() { 309 tbb::flow::graph g; 310 tbb::flow::limiter_node<int> ln(g,2); 311 tbb::flow::queue_node<int> qn(g); 312 tbb::flow::make_edge(ln, qn); 313 ln.decrement.try_put(tbb::flow::continue_msg()); 314 ln.try_put(42); 315 g.wait_for_all(); 316 int outint; 317 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node"); 318 } 319 320 321 // 322 // This test ascertains that if a message is not successfully put 323 // to a successor, the message is not dropped but released. 324 // 325 326 void test_reserve_release_messages() { 327 using namespace tbb::flow; 328 graph g; 329 330 //making two queue_nodes: one broadcast_node and one limiter_node 331 queue_node<int> input_queue(g); 332 queue_node<int> output_queue(g); 333 broadcast_node<int> broad(g); 334 limiter_node<int, int> limit(g,2); //threshold of 2 335 336 //edges 337 make_edge(input_queue, limit); 338 make_edge(limit, output_queue); 339 make_edge(broad,limit.decrement); 340 341 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue 342 343 input_queue.try_put(list[0]); // succeeds 344 input_queue.try_put(list[1]); // succeeds 345 input_queue.try_put(list[2]); // fails, stored in upstream buffer 346 g.wait_for_all(); 347 348 remove_edge(limit, output_queue); //remove successor 349 350 //sending message to the decrement port of the limiter 351 broad.try_put(1); //failed message retrieved. 352 g.wait_for_all(); 353 354 make_edge(limit, output_queue); //putting the successor back 355 356 broad.try_put(1); //drop the count 357 358 input_queue.try_put(list[3]); //success 359 g.wait_for_all(); 360 361 int var=0; 362 363 for (int i=0; i<4; i++) { 364 output_queue.try_get(var); 365 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output"); 366 g.wait_for_all(); 367 } 368 } 369 370 void test_decrementer() { 371 const int threshold = 5; 372 tbb::flow::graph g; 373 tbb::flow::limiter_node<int, int> limit(g, threshold); 374 tbb::flow::queue_node<int> queue(g); 375 make_edge(limit, queue); 376 int m = 0; 377 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 378 CHECK_MESSAGE( limit.decrement.try_put( -threshold ), // close limiter's gate 379 "Limiter node decrementer's port does not accept message." ); 380 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 381 CHECK_MESSAGE( limit.decrement.try_put( threshold + 5 ), // open limiter's gate 382 "Limiter node decrementer's port does not accept message." ); 383 for( int i = 0; i < threshold; ++i ) 384 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 385 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." ); 386 g.wait_for_all(); 387 int expected[] = {0, 2, 3, 4, 5, 6}; 388 int actual = -1; m = 0; 389 while( queue.try_get(actual) ) 390 CHECK_MESSAGE( actual == expected[m++], "" ); 391 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 392 g.wait_for_all(); 393 394 const size_t threshold2 = size_t(-1); 395 tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 396 make_edge(limit2, queue); 397 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 398 long long decrement_value = (long long)( size_t(-1)/2 ); 399 CHECK_MESSAGE( limit2.decrement.try_put( -decrement_value ), 400 "Limiter node decrementer's port does not accept message" ); 401 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 402 CHECK_MESSAGE( limit2.decrement.try_put( -decrement_value ), 403 "Limiter node decrementer's port does not accept message" ); 404 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 405 int expected2[] = {1, 2}; 406 actual = -1; m = 0; 407 while( queue.try_get(actual) ) 408 CHECK_MESSAGE( actual == expected2[m++], "" ); 409 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 410 g.wait_for_all(); 411 } 412 413 void test_try_put_without_successors() { 414 tbb::flow::graph g; 415 std::size_t try_put_num{3}; 416 tbb::flow::buffer_node<int> bn(g); 417 tbb::flow::limiter_node<int> ln(g, try_put_num); 418 tbb::flow::make_edge(bn, ln); 419 std::size_t i = 1; 420 for (; i <= try_put_num; i++) 421 bn.try_put(i); 422 423 std::atomic<std::size_t> counter{0}; 424 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, 425 [&](int input) { 426 counter += input; 427 return int{}; 428 } 429 ); 430 tbb::flow::make_edge(ln, fn); 431 g.wait_for_all(); 432 CHECK((counter == i * try_put_num / 2)); 433 434 // Check the lost message 435 tbb::flow::remove_edge(bn, ln); 436 ln.decrement.try_put(tbb::flow::continue_msg()); 437 bn.try_put(try_put_num + 1); 438 g.wait_for_all(); 439 CHECK((counter == i * try_put_num / 2)); 440 441 } 442 443 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 444 #include <array> 445 #include <vector> 446 void test_follows_and_precedes_api() { 447 using msg_t = tbb::flow::continue_msg; 448 449 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} }; 450 std::vector<msg_t> messages_for_precedes = {msg_t()}; 451 452 follows_and_precedes_testing::test_follows 453 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000); 454 follows_and_precedes_testing::test_precedes 455 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000); 456 457 } 458 #endif 459 460 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 461 void test_deduction_guides() { 462 using namespace tbb::flow; 463 464 graph g; 465 broadcast_node<int> br(g); 466 limiter_node<int> l0(g, 100); 467 468 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 469 limiter_node l1(follows(br), 100); 470 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>); 471 472 limiter_node l2(precedes(br), 100); 473 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>); 474 #endif 475 476 limiter_node l3(l0); 477 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>); 478 } 479 #endif 480 481 //! Test puts on limiter_node with decrements and varying parallelism levels 482 //! \brief \ref error_guessing 483 TEST_CASE("Serial and parallel tests") { 484 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) { 485 tbb::task_arena arena(i); 486 arena.execute( 487 [i]() { 488 test_serial<int>(); 489 test_parallel<int>(i); 490 } 491 ); 492 } 493 } 494 495 //! Test initial put of continue_msg on decrementer port does not stop message flow 496 //! \brief \ref error_guessing 497 TEST_CASE("Test continue_msg reception") { 498 test_continue_msg_reception(); 499 } 500 501 //! Test multifunction_node connected to limiter_node 502 //! \brief \ref error_guessing 503 TEST_CASE("Multifunction connected to limiter") { 504 test_multifunction_to_limiter(30,3); 505 test_multifunction_to_limiter(300,13); 506 test_multifunction_to_limiter(3000,1); 507 } 508 509 //! Test message release if successor doesn't accept 510 //! \brief \ref requirement 511 TEST_CASE("Message is released if successor does not accept") { 512 test_reserve_release_messages(); 513 } 514 515 //! Test decrementer 516 //! \brief \ref requirement \ref error_guessing 517 TEST_CASE("Decrementer") { 518 test_decrementer(); 519 } 520 521 //! Test try_put() without successor 522 //! \brief \ref error_guessing 523 TEST_CASE("Test try_put() without successors") { 524 test_try_put_without_successors(); 525 } 526 527 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 528 //! Test follows and precedes API 529 //! \brief \ref error_guessing 530 TEST_CASE( "Support for follows and precedes API" ) { 531 test_follows_and_precedes_api(); 532 } 533 #endif 534 535 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 536 //! Test deduction guides 537 //! \brief \ref requirement 538 TEST_CASE( "Deduction guides" ) { 539 test_deduction_guides(); 540 } 541 #endif 542