1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20
21 #include "common/config.h"
22
23 #include "tbb/flow_graph.h"
24
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/test_follows_and_precedes_api.h"
29 #include "tbb/global_control.h"
30
31 #include <atomic>
32
33
34 //! \file test_limiter_node.cpp
35 //! \brief Test for [flow_graph.limiter_node] specification
36
37
38 const int L = 10;
39 const int N = 1000;
40
41 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
42 using tbb::detail::d1::graph_task;
43
44 template< typename T >
45 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
46 T next_value;
47 tbb::flow::graph& my_graph;
48
serial_receiverserial_receiver49 serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
50
try_put_taskserial_receiver51 graph_task* try_put_task( const T &v ) override {
52 CHECK_MESSAGE( next_value++ == v, "" );
53 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
54 }
55
graph_referenceserial_receiver56 tbb::flow::graph& graph_reference() const override {
57 return my_graph;
58 }
59 };
60
61 template< typename T >
62 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
63
64 std::atomic<int> my_count;
65 tbb::flow::graph& my_graph;
66
parallel_receiverparallel_receiver67 parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
68
try_put_taskparallel_receiver69 graph_task* try_put_task( const T &/*v*/ ) override {
70 ++my_count;
71 return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
72 }
73
graph_referenceparallel_receiver74 tbb::flow::graph& graph_reference() const override {
75 return my_graph;
76 }
77 };
78
79 template< typename T >
80 struct empty_sender : public tbb::flow::sender<T> {
81 typedef typename tbb::flow::sender<T>::successor_type successor_type;
82
register_successorempty_sender83 bool register_successor( successor_type & ) override { return false; }
remove_successorempty_sender84 bool remove_successor( successor_type & ) override { return false; }
85 };
86
87
88 template< typename T >
89 struct put_body : utils::NoAssign {
90
91 tbb::flow::limiter_node<T> &my_lim;
92 std::atomic<int> &my_accept_count;
93
put_bodyput_body94 put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
95 my_lim(lim), my_accept_count(accept_count) {}
96
operator ()put_body97 void operator()( int ) const {
98 for ( int i = 0; i < L; ++i ) {
99 bool msg = my_lim.try_put( T(i) );
100 if ( msg == true )
101 ++my_accept_count;
102 }
103 }
104 };
105
106 template< typename T >
107 struct put_dec_body : utils::NoAssign {
108
109 tbb::flow::limiter_node<T> &my_lim;
110 std::atomic<int> &my_accept_count;
111
put_dec_bodyput_dec_body112 put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
113 my_lim(lim), my_accept_count(accept_count) {}
114
operator ()put_dec_body115 void operator()( int ) const {
116 int local_accept_count = 0;
117 while ( local_accept_count < N ) {
118 bool msg = my_lim.try_put( T(local_accept_count) );
119 if ( msg == true ) {
120 ++local_accept_count;
121 ++my_accept_count;
122 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
123 }
124 }
125 }
126
127 };
128
129 template< typename T >
test_puts_with_decrements(int num_threads,tbb::flow::limiter_node<T> & lim,tbb::flow::graph & g)130 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
131 parallel_receiver<T> r(g);
132 empty_sender< tbb::flow::continue_msg > s;
133 std::atomic<int> accept_count;
134 accept_count = 0;
135 tbb::flow::make_edge( lim, r );
136 tbb::flow::make_edge(s, lim.decrementer());
137
138 // test puts with decrements
139 utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
140 int c = accept_count;
141 CHECK_MESSAGE( c == N*num_threads, "" );
142 CHECK_MESSAGE( r.my_count == N*num_threads, "" );
143 }
144
145 //
146 // Tests
147 //
148 // limiter only forwards below the limit, multiple parallel senders / single receiver
149 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
150 //
151 //
152 template< typename T >
test_parallel(int num_threads)153 int test_parallel(int num_threads) {
154
155 // test puts with no decrements
156 for ( int i = 0; i < L; ++i ) {
157 tbb::flow::graph g;
158 tbb::flow::limiter_node< T > lim(g, i);
159 parallel_receiver<T> r(g);
160 std::atomic<int> accept_count;
161 accept_count = 0;
162 tbb::flow::make_edge( lim, r );
163 // test puts with no decrements
164 utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
165 g.wait_for_all();
166 int c = accept_count;
167 CHECK_MESSAGE( c == i, "" );
168 }
169
170 // test puts with decrements
171 for ( int i = 1; i < L; ++i ) {
172 tbb::flow::graph g;
173 tbb::flow::limiter_node< T > lim(g, i);
174 test_puts_with_decrements(num_threads, lim, g);
175 tbb::flow::limiter_node< T > lim_copy( lim );
176 test_puts_with_decrements(num_threads, lim_copy, g);
177 }
178
179 return 0;
180 }
181
182 //
183 // Tests
184 //
185 // limiter only forwards below the limit, single sender / single receiver
186 // at reject, a put to decrement, will cause next message to be accepted
187 //
188 template< typename T >
test_serial()189 int test_serial() {
190
191 // test puts with no decrements
192 for ( int i = 0; i < L; ++i ) {
193 tbb::flow::graph g;
194 tbb::flow::limiter_node< T > lim(g, i);
195 serial_receiver<T> r(g);
196 tbb::flow::make_edge( lim, r );
197 for ( int j = 0; j < L; ++j ) {
198 bool msg = lim.try_put( T(j) );
199 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
200 }
201 g.wait_for_all();
202 }
203
204 // test puts with decrements
205 for ( int i = 1; i < L; ++i ) {
206 tbb::flow::graph g;
207 tbb::flow::limiter_node< T > lim(g, i);
208 serial_receiver<T> r(g);
209 empty_sender< tbb::flow::continue_msg > s;
210 tbb::flow::make_edge( lim, r );
211 tbb::flow::make_edge(s, lim.decrementer());
212 for ( int j = 0; j < N; ++j ) {
213 bool msg = lim.try_put( T(j) );
214 CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
215 if ( msg == false ) {
216 lim.decrementer().try_put( tbb::flow::continue_msg() );
217 msg = lim.try_put( T(j) );
218 CHECK_MESSAGE( msg == true, "" );
219 }
220 }
221 }
222 return 0;
223 }
224
225 // reported bug in limiter (https://community.intel.com/t5/Intel-oneAPI-Threading-Building/multifun-node-try-put-several-messages-to-one-successor-crashes/m-p/922844)
226 #define DECREMENT_OUTPUT 1 // the port number of the decrement output of the multifunction_node
227 #define LIMITER_OUTPUT 0 // port number of the integer output
228
229 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
230
231 std::atomic<size_t> emit_count;
232 std::atomic<size_t> emit_sum;
233 std::atomic<size_t> receive_count;
234 std::atomic<size_t> receive_sum;
235
236 struct mfnode_body {
237 int max_cnt;
238 std::atomic<int>* my_cnt;
mfnode_bodymfnode_body239 mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my) { }
operator ()mfnode_body240 void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
241 int lcnt = ++(*my_cnt);
242 if(lcnt > max_cnt) {
243 return;
244 }
245 // put one continue_msg to the decrement of the limiter.
246 if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
247 CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
248 }
249 {
250 // put messages to the input of the limiter_node until it rejects.
251 while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
252 emit_sum += lcnt;
253 ++emit_count;
254 }
255 }
256 }
257 };
258
259 struct fn_body {
operator ()fn_body260 int operator()(const int &in) {
261 receive_sum += in;
262 ++receive_count;
263 return in;
264 }
265 };
266
267 // +------------+
268 // +---------+ | v
269 // | mf_node |0---+ +----------+ +----------+
270 // +->| |1---------->| lim_node |--------->| fn_node |--+
271 // | +---------+ +----------+ +----------+ |
272 // | |
273 // | |
274 // +-------------------------------------------------------------+
275 //
276 void
test_multifunction_to_limiter(int _max,int _nparallel)277 test_multifunction_to_limiter(int _max, int _nparallel) {
278 tbb::flow::graph g;
279 emit_count = 0;
280 emit_sum = 0;
281 receive_count = 0;
282 receive_sum = 0;
283 std::atomic<int> local_cnt;
284 local_cnt = 0;
285 mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
286 tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
287 tbb::flow::limiter_node<int> lim_node(g, _nparallel);
288 tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
289 tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
290 tbb::flow::make_edge(lim_node, fn_node);
291 tbb::flow::make_edge(fn_node, mf_node);
292
293 mf_node.try_put(1);
294 g.wait_for_all();
295 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
296 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
297
298 // reset, test again
299 g.reset();
300 emit_count = 0;
301 emit_sum = 0;
302 receive_count = 0;
303 receive_sum = 0;
304 local_cnt = 0;
305 mf_node.try_put(1);
306 g.wait_for_all();
307 CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
308 CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
309 }
310
311
312 void
test_continue_msg_reception()313 test_continue_msg_reception() {
314 tbb::flow::graph g;
315 tbb::flow::limiter_node<int> ln(g,2);
316 tbb::flow::queue_node<int> qn(g);
317 tbb::flow::make_edge(ln, qn);
318 ln.decrementer().try_put(tbb::flow::continue_msg());
319 ln.try_put(42);
320 g.wait_for_all();
321 int outint;
322 CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
323 }
324
325
326 //
327 // This test ascertains that if a message is not successfully put
328 // to a successor, the message is not dropped but released.
329 //
330
test_reserve_release_messages()331 void test_reserve_release_messages() {
332 using namespace tbb::flow;
333 graph g;
334
335 //making two queue_nodes: one broadcast_node and one limiter_node
336 queue_node<int> input_queue(g);
337 queue_node<int> output_queue(g);
338 broadcast_node<int> broad(g);
339 limiter_node<int, int> limit(g,2); //threshold of 2
340
341 //edges
342 make_edge(input_queue, limit);
343 make_edge(limit, output_queue);
344 make_edge(broad,limit.decrementer());
345
346 int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
347
348 input_queue.try_put(list[0]); // succeeds
349 input_queue.try_put(list[1]); // succeeds
350 input_queue.try_put(list[2]); // fails, stored in upstream buffer
351 g.wait_for_all();
352
353 remove_edge(limit, output_queue); //remove successor
354
355 //sending message to the decrement port of the limiter
356 broad.try_put(1); //failed message retrieved.
357 g.wait_for_all();
358
359 tbb::flow::make_edge(limit, output_queue); //putting the successor back
360
361 broad.try_put(1); //drop the count
362
363 input_queue.try_put(list[3]); //success
364 g.wait_for_all();
365
366 int var=0;
367
368 for (int i=0; i<4; i++) {
369 output_queue.try_get(var);
370 CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
371 g.wait_for_all();
372 }
373 }
374
test_decrementer()375 void test_decrementer() {
376 const int threshold = 5;
377 tbb::flow::graph g;
378 tbb::flow::limiter_node<int, int> limit(g, threshold);
379 tbb::flow::queue_node<int> queue(g);
380 make_edge(limit, queue);
381 int m = 0;
382 CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
383 CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
384 "Limiter node decrementer's port does not accept message." );
385 CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
386 CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ), // open limiter's gate
387 "Limiter node decrementer's port does not accept message." );
388 for( int i = 0; i < threshold; ++i )
389 CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
390 CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
391 g.wait_for_all();
392 int expected[] = {0, 2, 3, 4, 5, 6};
393 int actual = -1; m = 0;
394 while( queue.try_get(actual) )
395 CHECK_MESSAGE( actual == expected[m++], "" );
396 CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
397 g.wait_for_all();
398
399 const size_t threshold2 = size_t(-1);
400 tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
401 make_edge(limit2, queue);
402 CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
403 long long decrement_value = (long long)( size_t(-1)/2 );
404 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
405 "Limiter node decrementer's port does not accept message" );
406 CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
407 CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
408 "Limiter node decrementer's port does not accept message" );
409 CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
410 int expected2[] = {1, 2};
411 actual = -1; m = 0;
412 while( queue.try_get(actual) )
413 CHECK_MESSAGE( actual == expected2[m++], "" );
414 CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
415 g.wait_for_all();
416
417 const size_t threshold3 = 10;
418 tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
419 make_edge(limit3, queue);
420 long long decrement_value3 = 3;
421 CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
422 "Limiter node decrementer's port does not accept message" );
423
424 m = 0;
425 while( limit3.try_put( m ) ){ m++; };
426 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
427
428 actual = -1; m = 0;
429 while( queue.try_get(actual) ){
430 CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
431 }
432
433 g.wait_for_all();
434 CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
435 }
436
test_try_put_without_successors()437 void test_try_put_without_successors() {
438 tbb::flow::graph g;
439 int try_put_num{3};
440 tbb::flow::buffer_node<int> bn(g);
441 tbb::flow::limiter_node<int> ln(g, try_put_num);
442
443 tbb::flow::make_edge(bn, ln);
444
445 int i = 1;
446 for (; i <= try_put_num; i++)
447 bn.try_put(i);
448
449 std::atomic<int> counter{0};
450 tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
451 [&](int input) {
452 counter += input;
453 return int{};
454 }
455 );
456
457 tbb::flow::make_edge(ln, fn);
458
459 g.wait_for_all();
460 CHECK((counter == i * try_put_num / 2));
461
462 // Check the lost message
463 tbb::flow::remove_edge(bn, ln);
464 ln.decrementer().try_put(tbb::flow::continue_msg());
465 bn.try_put(try_put_num + 1);
466 g.wait_for_all();
467 CHECK((counter == i * try_put_num / 2));
468
469 }
470
471 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
472 #include <array>
473 #include <vector>
test_follows_and_precedes_api()474 void test_follows_and_precedes_api() {
475 using msg_t = tbb::flow::continue_msg;
476
477 std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
478 std::vector<msg_t> messages_for_precedes = {msg_t()};
479
480 follows_and_precedes_testing::test_follows
481 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
482 follows_and_precedes_testing::test_precedes
483 <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
484
485 }
486 #endif
487
488 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
test_deduction_guides()489 void test_deduction_guides() {
490 using namespace tbb::flow;
491
492 graph g;
493 broadcast_node<int> br(g);
494 limiter_node<int> l0(g, 100);
495
496 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
497 limiter_node l1(follows(br), 100);
498 static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
499
500 limiter_node l2(precedes(br), 100);
501 static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
502 #endif
503
504 limiter_node l3(l0);
505 static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
506 }
507 #endif
508
test_decrement_while_try_put_task()509 void test_decrement_while_try_put_task() {
510 constexpr int threshold = 50000;
511
512 tbb::flow::graph graph{};
513 std::atomic<int> processed;
514 tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int {
515 static int i = {};
516 if (i++ >= threshold) fc.stop();
517 return i;
518 }};
519 tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 };
520 tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial,
521 [&](const int & value, typename decltype(processing)::output_ports_type & out) {
522 if (value != threshold)
523 std::get<0>(out).try_put(1);
524 processed.store(value);
525 }};
526
527 tbb::flow::make_edge(input, blockingNode);
528 tbb::flow::make_edge(blockingNode, processing);
529 tbb::flow::make_edge(processing, blockingNode.decrementer());
530
531 input.activate();
532
533 graph.wait_for_all();
534 CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work");
535 }
536
537
538 //! Test puts on limiter_node with decrements and varying parallelism levels
539 //! \brief \ref error_guessing
540 TEST_CASE("Serial and parallel tests") {
541 for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
542 tbb::task_arena arena(i);
543 arena.execute(
__anon70523bc50402() 544 [i]() {
545 test_serial<int>();
546 test_parallel<int>(i);
547 }
548 );
549 }
550 }
551
552 //! Test initial put of continue_msg on decrementer port does not stop message flow
553 //! \brief \ref error_guessing
554 TEST_CASE("Test continue_msg reception") {
555 test_continue_msg_reception();
556 }
557
558 //! Test put message on decrementer port does not stop message flow
559 //! \brief \ref error_guessing
560 TEST_CASE("Test try_put to decrementer while try_put to limiter_node") {
561 test_decrement_while_try_put_task();
562 }
563
564 //! Test multifunction_node connected to limiter_node
565 //! \brief \ref error_guessing
566 TEST_CASE("Multifunction connected to limiter") {
567 test_multifunction_to_limiter(30,3);
568 test_multifunction_to_limiter(300,13);
569 test_multifunction_to_limiter(3000,1);
570 }
571
572 //! Test message release if successor doesn't accept
573 //! \brief \ref requirement
574 TEST_CASE("Message is released if successor does not accept") {
575 test_reserve_release_messages();
576 }
577
578 //! Test decrementer
579 //! \brief \ref requirement \ref error_guessing
580 TEST_CASE("Decrementer") {
581 test_decrementer();
582 }
583
584 //! Test try_put() without successor
585 //! \brief \ref error_guessing
586 TEST_CASE("Test try_put() without successors") {
587 test_try_put_without_successors();
588 }
589
590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
591 //! Test follows and precedes API
592 //! \brief \ref error_guessing
593 TEST_CASE( "Support for follows and precedes API" ) {
594 test_follows_and_precedes_api();
595 }
596 #endif
597
598 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
599 //! Test deduction guides
600 //! \brief \ref requirement
601 TEST_CASE( "Deduction guides" ) {
602 test_deduction_guides();
603 }
604 #endif
605
// Message payload of 512 zero-initialized bytes, used by the node deallocation test.
struct TestLargeStruct {
    char bytes[512] = {};
};
609
610 //! Test correct node deallocation while using small_object_pool.
611 //! (see https://github.com/oneapi-src/oneTBB/issues/639)
612 //! \brief \ref error_guessing
613 TEST_CASE("Test correct node deallocation while using small_object_pool") {
614 tbb::flow::graph graph;
615 tbb::flow::queue_node<TestLargeStruct> input_node( graph );
616 tbb::flow::function_node<TestLargeStruct> func( graph, tbb::flow::serial,
__anon70523bc50502(const TestLargeStruct& input) 617 [](const TestLargeStruct& input) { return input; } );
618
619 tbb::flow::make_edge( input_node, func );
620 CHECK( input_node.try_put( TestLargeStruct{} ) );
621 graph.wait_for_all();
622
623 tbb::task_scheduler_handle handle{ tbb::attach{} };
624 tbb::finalize( handle, std::nothrow );
625 }
626