xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision 20514c0e)
1 /*
2     Copyright (c) 2005-2023 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/test_follows_and_precedes_api.h"
29 #include "tbb/global_control.h"
30 
31 #include <atomic>
32 
33 
34 //! \file test_limiter_node.cpp
35 //! \brief Test for [flow_graph.limiter_node] specification
36 
37 
38 const int L = 10;
39 const int N = 1000;
40 
41 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
42 using tbb::detail::d1::graph_task;
43 
44 template< typename T >
45 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
46    T next_value;
47    tbb::flow::graph& my_graph;
48 
49    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
50 
51    graph_task* try_put_task( const T &v ) override {
52        CHECK_MESSAGE( next_value++  == v, "" );
53        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
54    }
55 
56     tbb::flow::graph& graph_reference() const override {
57         return my_graph;
58     }
59 };
60 
61 template< typename T >
62 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
63 
64     std::atomic<int> my_count;
65     tbb::flow::graph& my_graph;
66 
67     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
68 
69     graph_task* try_put_task( const T &/*v*/ ) override {
70        ++my_count;
71        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
72     }
73 
74     tbb::flow::graph& graph_reference() const override {
75         return my_graph;
76     }
77 };
78 
79 template< typename T >
80 struct empty_sender : public tbb::flow::sender<T> {
81         typedef typename tbb::flow::sender<T>::successor_type successor_type;
82 
83         bool register_successor( successor_type & ) override { return false; }
84         bool remove_successor( successor_type & ) override { return false; }
85 };
86 
87 
88 template< typename T >
89 struct put_body : utils::NoAssign {
90 
91     tbb::flow::limiter_node<T> &my_lim;
92     std::atomic<int> &my_accept_count;
93 
94     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
95         my_lim(lim), my_accept_count(accept_count) {}
96 
97     void operator()( int ) const {
98         for ( int i = 0; i < L; ++i ) {
99             bool msg = my_lim.try_put( T(i) );
100             if ( msg == true )
101                ++my_accept_count;
102         }
103     }
104 };
105 
106 template< typename T >
107 struct put_dec_body : utils::NoAssign {
108 
109     tbb::flow::limiter_node<T> &my_lim;
110     std::atomic<int> &my_accept_count;
111 
112     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
113         my_lim(lim), my_accept_count(accept_count) {}
114 
115     void operator()( int ) const {
116         int local_accept_count = 0;
117         while ( local_accept_count < N ) {
118             bool msg = my_lim.try_put( T(local_accept_count) );
119             if ( msg == true ) {
120                 ++local_accept_count;
121                 ++my_accept_count;
122                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
123             }
124         }
125     }
126 
127 };
128 
129 template< typename T >
130 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
131     parallel_receiver<T> r(g);
132     empty_sender< tbb::flow::continue_msg > s;
133     std::atomic<int> accept_count;
134     accept_count = 0;
135     tbb::flow::make_edge( lim, r );
136     tbb::flow::make_edge(s, lim.decrementer());
137 
138     // test puts with decrements
139     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
140     int c = accept_count;
141     CHECK_MESSAGE( c == N*num_threads, "" );
142     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
143 }
144 
145 //
146 // Tests
147 //
148 // limiter only forwards below the limit, multiple parallel senders / single receiver
149 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
150 //
151 //
152 template< typename T >
153 int test_parallel(int num_threads) {
154 
155    // test puts with no decrements
156    for ( int i = 0; i < L; ++i ) {
157        tbb::flow::graph g;
158        tbb::flow::limiter_node< T > lim(g, i);
159        parallel_receiver<T> r(g);
160        std::atomic<int> accept_count;
161        accept_count = 0;
162        tbb::flow::make_edge( lim, r );
163        // test puts with no decrements
164        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
165        g.wait_for_all();
166        int c = accept_count;
167        CHECK_MESSAGE( c == i, "" );
168    }
169 
170    // test puts with decrements
171    for ( int i = 1; i < L; ++i ) {
172        tbb::flow::graph g;
173        tbb::flow::limiter_node< T > lim(g, i);
174        test_puts_with_decrements(num_threads, lim, g);
175        tbb::flow::limiter_node< T > lim_copy( lim );
176        test_puts_with_decrements(num_threads, lim_copy, g);
177    }
178 
179    return 0;
180 }
181 
182 //
183 // Tests
184 //
// limiter only forwards below the limit, single sender / single receiver
// after a rejection, a put to the decrementer causes the next message to be accepted
187 //
188 template< typename T >
189 int test_serial() {
190 
191    // test puts with no decrements
192    for ( int i = 0; i < L; ++i ) {
193        tbb::flow::graph g;
194        tbb::flow::limiter_node< T > lim(g, i);
195        serial_receiver<T> r(g);
196        tbb::flow::make_edge( lim, r );
197        for ( int j = 0; j < L; ++j ) {
198            bool msg = lim.try_put( T(j) );
199            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
200        }
201        g.wait_for_all();
202    }
203 
204    // test puts with decrements
205    for ( int i = 1; i < L; ++i ) {
206        tbb::flow::graph g;
207        tbb::flow::limiter_node< T > lim(g, i);
208        serial_receiver<T> r(g);
209        empty_sender< tbb::flow::continue_msg > s;
210        tbb::flow::make_edge( lim, r );
211        tbb::flow::make_edge(s, lim.decrementer());
212        for ( int j = 0; j < N; ++j ) {
213            bool msg = lim.try_put( T(j) );
214            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
215            if ( msg == false ) {
216                lim.decrementer().try_put( tbb::flow::continue_msg() );
217                msg = lim.try_put( T(j) );
218                CHECK_MESSAGE( msg == true, "" );
219            }
220        }
221    }
222    return 0;
223 }
224 
225 // reported bug in limiter (https://community.intel.com/t5/Intel-oneAPI-Threading-Building/multifun-node-try-put-several-messages-to-one-successor-crashes/m-p/922844)
226 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
227 #define LIMITER_OUTPUT 0    // port number of the integer output
228 
229 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
230 
231 std::atomic<size_t> emit_count;
232 std::atomic<size_t> emit_sum;
233 std::atomic<size_t> receive_count;
234 std::atomic<size_t> receive_sum;
235 
236 struct mfnode_body {
237     int max_cnt;
238     std::atomic<int>* my_cnt;
239     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
240     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
241         int lcnt = ++(*my_cnt);
242         if(lcnt > max_cnt) {
243             return;
244         }
245         // put one continue_msg to the decrement of the limiter.
246         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
247             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
248         }
249         {
250             // put messages to the input of the limiter_node until it rejects.
251             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
252                 emit_sum += lcnt;
253                 ++emit_count;
254             }
255         }
256     }
257 };
258 
259 struct fn_body {
260     int operator()(const int &in) {
261         receive_sum += in;
262         ++receive_count;
263         return in;
264     }
265 };
266 
267 //                   +------------+
268 //    +---------+    |            v
269 //    | mf_node |0---+       +----------+          +----------+
270 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
271 // |  +---------+            +----------+          +----------+  |
272 // |                                                             |
273 // |                                                             |
274 // +-------------------------------------------------------------+
275 //
276 void
277 test_multifunction_to_limiter(int _max, int _nparallel) {
278     tbb::flow::graph g;
279     emit_count = 0;
280     emit_sum = 0;
281     receive_count = 0;
282     receive_sum = 0;
283     std::atomic<int> local_cnt;
284     local_cnt = 0;
285     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
286     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
287     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
288     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
289     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
290     tbb::flow::make_edge(lim_node, fn_node);
291     tbb::flow::make_edge(fn_node, mf_node);
292 
293     mf_node.try_put(1);
294     g.wait_for_all();
295     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
296     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
297 
298     // reset, test again
299     g.reset();
300     emit_count = 0;
301     emit_sum = 0;
302     receive_count = 0;
303     receive_sum = 0;
304     local_cnt = 0;
305     mf_node.try_put(1);
306     g.wait_for_all();
307     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
308     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
309 }
310 
311 
312 void
313 test_continue_msg_reception() {
314     tbb::flow::graph g;
315     tbb::flow::limiter_node<int> ln(g,2);
316     tbb::flow::queue_node<int>   qn(g);
317     tbb::flow::make_edge(ln, qn);
318     ln.decrementer().try_put(tbb::flow::continue_msg());
319     ln.try_put(42);
320     g.wait_for_all();
321     int outint;
322     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
323 }
324 
325 
326 //
327 // This test ascertains that if a message is not successfully put
328 // to a successor, the message is not dropped but released.
329 //
330 
331 void test_reserve_release_messages() {
332     using namespace tbb::flow;
333     graph g;
334 
335     //making two queue_nodes: one broadcast_node and one limiter_node
336     queue_node<int> input_queue(g);
337     queue_node<int> output_queue(g);
338     broadcast_node<int> broad(g);
339     limiter_node<int, int> limit(g,2); //threshold of 2
340 
341     //edges
342     make_edge(input_queue, limit);
343     make_edge(limit, output_queue);
344     make_edge(broad,limit.decrementer());
345 
346     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
347 
348     input_queue.try_put(list[0]); // succeeds
349     input_queue.try_put(list[1]); // succeeds
350     input_queue.try_put(list[2]); // fails, stored in upstream buffer
351     g.wait_for_all();
352 
353     remove_edge(limit, output_queue); //remove successor
354 
355     //sending message to the decrement port of the limiter
356     broad.try_put(1); //failed message retrieved.
357     g.wait_for_all();
358 
359     tbb::flow::make_edge(limit, output_queue); //putting the successor back
360 
361     broad.try_put(1);  //drop the count
362 
363     input_queue.try_put(list[3]);  //success
364     g.wait_for_all();
365 
366     int var=0;
367 
368     for (int i=0; i<4; i++) {
369         output_queue.try_get(var);
370         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
371         g.wait_for_all();
372     }
373 }
374 
375 void test_decrementer() {
376     const int threshold = 5;
377     tbb::flow::graph g;
378     tbb::flow::limiter_node<int, int> limit(g, threshold);
379     tbb::flow::queue_node<int> queue(g);
380     make_edge(limit, queue);
381     int m = 0;
382     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
383     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
384                    "Limiter node decrementer's port does not accept message." );
385     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
386     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
387                    "Limiter node decrementer's port does not accept message." );
388     for( int i = 0; i < threshold; ++i )
389         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
390     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
391     g.wait_for_all();
392     int expected[] = {0, 2, 3, 4, 5, 6};
393     int actual = -1; m = 0;
394     while( queue.try_get(actual) )
395         CHECK_MESSAGE( actual == expected[m++], "" );
396     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
397     g.wait_for_all();
398 
399     const size_t threshold2 = size_t(-1);
400     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
401     make_edge(limit2, queue);
402     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
403     long long decrement_value = (long long)( size_t(-1)/2 );
404     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
405                    "Limiter node decrementer's port does not accept message" );
406     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
407     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
408                    "Limiter node decrementer's port does not accept message" );
409     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
410     int expected2[] = {1, 2};
411     actual = -1; m = 0;
412     while( queue.try_get(actual) )
413         CHECK_MESSAGE( actual == expected2[m++], "" );
414     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
415     g.wait_for_all();
416 
417     const size_t threshold3 = 10;
418     tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
419     make_edge(limit3, queue);
420     long long decrement_value3 = 3;
421     CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
422                    "Limiter node decrementer's port does not accept message" );
423 
424     m = 0;
425     while( limit3.try_put( m ) ){ m++; };
426     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
427 
428     actual = -1; m = 0;
429     while( queue.try_get(actual) ){
430         CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
431     }
432 
433     g.wait_for_all();
434     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
435 }
436 
437 void test_try_put_without_successors() {
438     tbb::flow::graph g;
439     int try_put_num{3};
440     tbb::flow::buffer_node<int> bn(g);
441     tbb::flow::limiter_node<int> ln(g, try_put_num);
442 
443     tbb::flow::make_edge(bn, ln);
444 
445     int i = 1;
446     for (; i <= try_put_num; i++)
447         bn.try_put(i);
448 
449     std::atomic<int> counter{0};
450     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
451         [&](int input) {
452             counter += input;
453             return int{};
454         }
455     );
456 
457     tbb::flow::make_edge(ln, fn);
458 
459     g.wait_for_all();
460     CHECK((counter == i * try_put_num / 2));
461 
462     // Check the lost message
463     tbb::flow::remove_edge(bn, ln);
464     ln.decrementer().try_put(tbb::flow::continue_msg());
465     bn.try_put(try_put_num + 1);
466     g.wait_for_all();
467     CHECK((counter == i * try_put_num / 2));
468 
469 }
470 
471 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
472 #include <array>
473 #include <vector>
474 void test_follows_and_precedes_api() {
475     using msg_t = tbb::flow::continue_msg;
476 
477     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
478     std::vector<msg_t> messages_for_precedes = {msg_t()};
479 
480     follows_and_precedes_testing::test_follows
481         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
482     follows_and_precedes_testing::test_precedes
483         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
484 
485 }
486 #endif
487 
488 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
489 void test_deduction_guides() {
490     using namespace tbb::flow;
491 
492     graph g;
493     broadcast_node<int> br(g);
494     limiter_node<int> l0(g, 100);
495 
496 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
497     limiter_node l1(follows(br), 100);
498     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
499 
500     limiter_node l2(precedes(br), 100);
501     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
502 #endif
503 
504     limiter_node l3(l0);
505     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
506 }
507 #endif
508 
509 void test_decrement_while_try_put_task() {
510     constexpr int threshold = 50000;
511 
512     tbb::flow::graph graph{};
513     std::atomic<int> processed;
514     tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int {
515         static int i = {};
516         if (i++ >= threshold) fc.stop();
517         return i;
518     }};
519     tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 };
520     tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial,
521         [&](const int & value, typename decltype(processing)::output_ports_type & out) {
522             if (value != threshold)
523                 std::get<0>(out).try_put(1);
524             processed.store(value);
525         }};
526 
527     tbb::flow::make_edge(input, blockingNode);
528     tbb::flow::make_edge(blockingNode, processing);
529     tbb::flow::make_edge(processing, blockingNode.decrementer());
530 
531     input.activate();
532 
533     graph.wait_for_all();
534     CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work");
535 }
536 
537 
538 //! Test puts on limiter_node with decrements and varying parallelism levels
539 //! \brief \ref error_guessing
540 TEST_CASE("Serial and parallel tests") {
541     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
542         tbb::task_arena arena(i);
543         arena.execute(
544             [i]() {
545                 test_serial<int>();
546                 test_parallel<int>(i);
547             }
548         );
549     }
550 }
551 
552 //! Test initial put of continue_msg on decrementer port does not stop message flow
553 //! \brief \ref error_guessing
TEST_CASE("Test continue_msg reception") {
    // Thin wrapper: the scenario and its checks live in test_continue_msg_reception().
    test_continue_msg_reception();
}
557 
558 //! Test put message on decrementer port does not stop message flow
559 //! \brief \ref error_guessing
TEST_CASE("Test try_put to decrementer while try_put to limiter_node") {
    // Thin wrapper: the scenario and its checks live in test_decrement_while_try_put_task().
    test_decrement_while_try_put_task();
}
563 
564 //! Test multifunction_node connected to limiter_node
565 //! \brief \ref error_guessing
566 TEST_CASE("Multifunction connected to limiter") {
567     test_multifunction_to_limiter(30,3);
568     test_multifunction_to_limiter(300,13);
569     test_multifunction_to_limiter(3000,1);
570 }
571 
572 //! Test message release if successor doesn't accept
573 //! \brief \ref requirement
TEST_CASE("Message is released if successor does not accept") {
    // Thin wrapper: the scenario and its checks live in test_reserve_release_messages().
    test_reserve_release_messages();
}
577 
578 //! Test decrementer
579 //! \brief \ref requirement \ref error_guessing
TEST_CASE("Decrementer") {
    // Thin wrapper: the scenario and its checks live in test_decrementer().
    test_decrementer();
}
583 
584 //! Test try_put() without successor
585 //! \brief \ref error_guessing
TEST_CASE("Test try_put() without successors") {
    // Thin wrapper: the scenario and its checks live in test_try_put_without_successors().
    test_try_put_without_successors();
}
589 
590 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
591 //! Test follows and precedes API
592 //! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    // Thin wrapper: the checks live in test_follows_and_precedes_api().
    test_follows_and_precedes_api();
}
596 #endif
597 
598 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
599 //! Test deduction guides
600 //! \brief \ref requirement
TEST_CASE( "Deduction guides" ) {
    // Thin wrapper: the checks are static_asserts inside test_deduction_guides().
    test_deduction_guides();
}
604 #endif
605 
// 512-byte, zero-initialised payload used as the message type in the
// small_object_pool deallocation test.
struct TestLargeStruct {
    char bytes[512] = {};
};
609 
610 //! Test correct node deallocation while using small_object_pool.
611 //! (see https://github.com/oneapi-src/oneTBB/issues/639)
612 //! \brief \ref error_guessing
613 TEST_CASE("Test correct node deallocation while using small_object_pool") {
614     tbb::flow::graph graph;
615     tbb::flow::queue_node<TestLargeStruct> input_node( graph );
616     tbb::flow::function_node<TestLargeStruct> func( graph, tbb::flow::serial,
617         [](const TestLargeStruct& input) { return input; } );
618 
619     tbb::flow::make_edge( input_node, func );
620     CHECK( input_node.try_put( TestLargeStruct{} ) );
621     graph.wait_for_all();
622 
623     tbb::task_scheduler_handle handle{ tbb::attach{} };
624     tbb::finalize( handle, std::nothrow );
625 }
626