xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision adf44fbf)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 #include <atomic>
31 
32 
33 //! \file test_limiter_node.cpp
34 //! \brief Test for [flow_graph.limiter_node] specification
35 
36 
37 const int L = 10;
38 const int N = 1000;
39 
40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
41 using tbb::detail::d1::graph_task;
42 
43 template< typename T >
44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
45    T next_value;
46    tbb::flow::graph& my_graph;
47 
48    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
49 
50    graph_task* try_put_task( const T &v ) override {
51        CHECK_MESSAGE( next_value++  == v, "" );
52        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
53    }
54 
55     tbb::flow::graph& graph_reference() const override {
56         return my_graph;
57     }
58 };
59 
60 template< typename T >
61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
62 
63     std::atomic<int> my_count;
64     tbb::flow::graph& my_graph;
65 
66     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
67 
68     graph_task* try_put_task( const T &/*v*/ ) override {
69        ++my_count;
70        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
71     }
72 
73     tbb::flow::graph& graph_reference() const override {
74         return my_graph;
75     }
76 };
77 
78 template< typename T >
79 struct empty_sender : public tbb::flow::sender<T> {
80         typedef typename tbb::flow::sender<T>::successor_type successor_type;
81 
82         bool register_successor( successor_type & ) override { return false; }
83         bool remove_successor( successor_type & ) override { return false; }
84 };
85 
86 
87 template< typename T >
88 struct put_body : utils::NoAssign {
89 
90     tbb::flow::limiter_node<T> &my_lim;
91     std::atomic<int> &my_accept_count;
92 
93     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
94         my_lim(lim), my_accept_count(accept_count) {}
95 
96     void operator()( int ) const {
97         for ( int i = 0; i < L; ++i ) {
98             bool msg = my_lim.try_put( T(i) );
99             if ( msg == true )
100                ++my_accept_count;
101         }
102     }
103 };
104 
105 template< typename T >
106 struct put_dec_body : utils::NoAssign {
107 
108     tbb::flow::limiter_node<T> &my_lim;
109     std::atomic<int> &my_accept_count;
110 
111     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
112         my_lim(lim), my_accept_count(accept_count) {}
113 
114     void operator()( int ) const {
115         int local_accept_count = 0;
116         while ( local_accept_count < N ) {
117             bool msg = my_lim.try_put( T(local_accept_count) );
118             if ( msg == true ) {
119                 ++local_accept_count;
120                 ++my_accept_count;
121                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
122             }
123         }
124     }
125 
126 };
127 
128 template< typename Sender, typename Receiver >
129 void make_edge_impl(Sender& sender, Receiver& receiver){
130 #if __GNUC__ < 12 && !TBB_USE_DEBUG
131     // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03)
132     // The function pointer to make_edge workarounds the issue for unknown reason
133     auto make_edge_ptr = tbb::flow::make_edge<int>;
134     make_edge_ptr(sender, receiver);
135 #else
136     tbb::flow::make_edge(sender, receiver);
137 #endif
138 }
139 
140 template< typename T >
141 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
142     parallel_receiver<T> r(g);
143     empty_sender< tbb::flow::continue_msg > s;
144     std::atomic<int> accept_count;
145     accept_count = 0;
146     tbb::flow::make_edge( lim, r );
147     tbb::flow::make_edge(s, lim.decrementer());
148 
149     // test puts with decrements
150     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
151     int c = accept_count;
152     CHECK_MESSAGE( c == N*num_threads, "" );
153     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
154 }
155 
156 //
157 // Tests
158 //
159 // limiter only forwards below the limit, multiple parallel senders / single receiver
160 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
161 //
162 //
163 template< typename T >
164 int test_parallel(int num_threads) {
165 
166    // test puts with no decrements
167    for ( int i = 0; i < L; ++i ) {
168        tbb::flow::graph g;
169        tbb::flow::limiter_node< T > lim(g, i);
170        parallel_receiver<T> r(g);
171        std::atomic<int> accept_count;
172        accept_count = 0;
173        tbb::flow::make_edge( lim, r );
174        // test puts with no decrements
175        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
176        g.wait_for_all();
177        int c = accept_count;
178        CHECK_MESSAGE( c == i, "" );
179    }
180 
181    // test puts with decrements
182    for ( int i = 1; i < L; ++i ) {
183        tbb::flow::graph g;
184        tbb::flow::limiter_node< T > lim(g, i);
185        test_puts_with_decrements(num_threads, lim, g);
186        tbb::flow::limiter_node< T > lim_copy( lim );
187        test_puts_with_decrements(num_threads, lim_copy, g);
188    }
189 
190    return 0;
191 }
192 
193 //
194 // Tests
195 //
196 // limiter only forwards below the limit, single sender / single receiver
197 // at reject, a put to decrement, will cause next message to be accepted
198 //
199 template< typename T >
200 int test_serial() {
201 
202    // test puts with no decrements
203    for ( int i = 0; i < L; ++i ) {
204        tbb::flow::graph g;
205        tbb::flow::limiter_node< T > lim(g, i);
206        serial_receiver<T> r(g);
207        tbb::flow::make_edge( lim, r );
208        for ( int j = 0; j < L; ++j ) {
209            bool msg = lim.try_put( T(j) );
210            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
211        }
212        g.wait_for_all();
213    }
214 
215    // test puts with decrements
216    for ( int i = 1; i < L; ++i ) {
217        tbb::flow::graph g;
218        tbb::flow::limiter_node< T > lim(g, i);
219        serial_receiver<T> r(g);
220        empty_sender< tbb::flow::continue_msg > s;
221        tbb::flow::make_edge( lim, r );
222        tbb::flow::make_edge(s, lim.decrementer());
223        for ( int j = 0; j < N; ++j ) {
224            bool msg = lim.try_put( T(j) );
225            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
226            if ( msg == false ) {
227                lim.decrementer().try_put( tbb::flow::continue_msg() );
228                msg = lim.try_put( T(j) );
229                CHECK_MESSAGE( msg == true, "" );
230            }
231        }
232    }
233    return 0;
234 }
235 
236 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
237 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
238 #define LIMITER_OUTPUT 0    // port number of the integer output
239 
240 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
241 
242 std::atomic<size_t> emit_count;
243 std::atomic<size_t> emit_sum;
244 std::atomic<size_t> receive_count;
245 std::atomic<size_t> receive_sum;
246 
247 struct mfnode_body {
248     int max_cnt;
249     std::atomic<int>* my_cnt;
250     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
251     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
252         int lcnt = ++(*my_cnt);
253         if(lcnt > max_cnt) {
254             return;
255         }
256         // put one continue_msg to the decrement of the limiter.
257         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
258             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
259         }
260         {
261             // put messages to the input of the limiter_node until it rejects.
262             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
263                 emit_sum += lcnt;
264                 ++emit_count;
265             }
266         }
267     }
268 };
269 
270 struct fn_body {
271     int operator()(const int &in) {
272         receive_sum += in;
273         ++receive_count;
274         return in;
275     }
276 };
277 
278 //                   +------------+
279 //    +---------+    |            v
280 //    | mf_node |0---+       +----------+          +----------+
281 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
282 // |  +---------+            +----------+          +----------+  |
283 // |                                                             |
284 // |                                                             |
285 // +-------------------------------------------------------------+
286 //
287 void
288 test_multifunction_to_limiter(int _max, int _nparallel) {
289     tbb::flow::graph g;
290     emit_count = 0;
291     emit_sum = 0;
292     receive_count = 0;
293     receive_sum = 0;
294     std::atomic<int> local_cnt;
295     local_cnt = 0;
296     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
297     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
298     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
299     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
300     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
301     tbb::flow::make_edge(lim_node, fn_node);
302     tbb::flow::make_edge(fn_node, mf_node);
303 
304     mf_node.try_put(1);
305     g.wait_for_all();
306     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
307     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
308 
309     // reset, test again
310     g.reset();
311     emit_count = 0;
312     emit_sum = 0;
313     receive_count = 0;
314     receive_sum = 0;
315     local_cnt = 0;;
316     mf_node.try_put(1);
317     g.wait_for_all();
318     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
319     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
320 }
321 
322 
323 void
324 test_continue_msg_reception() {
325     tbb::flow::graph g;
326     tbb::flow::limiter_node<int> ln(g,2);
327     tbb::flow::queue_node<int>   qn(g);
328     tbb::flow::make_edge(ln, qn);
329     ln.decrementer().try_put(tbb::flow::continue_msg());
330     ln.try_put(42);
331     g.wait_for_all();
332     int outint;
333     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
334 }
335 
336 
337 //
338 // This test ascertains that if a message is not successfully put
339 // to a successor, the message is not dropped but released.
340 //
341 
342 void test_reserve_release_messages() {
343     using namespace tbb::flow;
344     graph g;
345 
346     //making two queue_nodes: one broadcast_node and one limiter_node
347     queue_node<int> input_queue(g);
348     queue_node<int> output_queue(g);
349     broadcast_node<int> broad(g);
350     limiter_node<int, int> limit(g,2); //threshold of 2
351 
352     //edges
353     make_edge(input_queue, limit);
354     make_edge(limit, output_queue);
355     make_edge(broad,limit.decrementer());
356 
357     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
358 
359     input_queue.try_put(list[0]); // succeeds
360     input_queue.try_put(list[1]); // succeeds
361     input_queue.try_put(list[2]); // fails, stored in upstream buffer
362     g.wait_for_all();
363 
364     remove_edge(limit, output_queue); //remove successor
365 
366     //sending message to the decrement port of the limiter
367     broad.try_put(1); //failed message retrieved.
368     g.wait_for_all();
369 
370     make_edge_impl(limit, output_queue); //putting the successor back
371 
372     broad.try_put(1);  //drop the count
373 
374     input_queue.try_put(list[3]);  //success
375     g.wait_for_all();
376 
377     int var=0;
378 
379     for (int i=0; i<4; i++) {
380         output_queue.try_get(var);
381         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
382         g.wait_for_all();
383     }
384 }
385 
386 void test_decrementer() {
387     const int threshold = 5;
388     tbb::flow::graph g;
389     tbb::flow::limiter_node<int, int> limit(g, threshold);
390     tbb::flow::queue_node<int> queue(g);
391     make_edge(limit, queue);
392     int m = 0;
393     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
394     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
395                    "Limiter node decrementer's port does not accept message." );
396     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
397     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
398                    "Limiter node decrementer's port does not accept message." );
399     for( int i = 0; i < threshold; ++i )
400         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
401     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
402     g.wait_for_all();
403     int expected[] = {0, 2, 3, 4, 5, 6};
404     int actual = -1; m = 0;
405     while( queue.try_get(actual) )
406         CHECK_MESSAGE( actual == expected[m++], "" );
407     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
408     g.wait_for_all();
409 
410     const size_t threshold2 = size_t(-1);
411     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
412     make_edge(limit2, queue);
413     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
414     long long decrement_value = (long long)( size_t(-1)/2 );
415     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
416                    "Limiter node decrementer's port does not accept message" );
417     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
418     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
419                    "Limiter node decrementer's port does not accept message" );
420     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
421     int expected2[] = {1, 2};
422     actual = -1; m = 0;
423     while( queue.try_get(actual) )
424         CHECK_MESSAGE( actual == expected2[m++], "" );
425     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
426     g.wait_for_all();
427 
428     const size_t threshold3 = 10;
429     tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
430     make_edge(limit3, queue);
431     long long decrement_value3 = 3;
432     CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
433                    "Limiter node decrementer's port does not accept message" );
434 
435     m = 0;
436     while( limit3.try_put( m ) ){ m++; };
437     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
438 
439     actual = -1; m = 0;
440     while( queue.try_get(actual) ){
441         CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
442     }
443 
444     g.wait_for_all();
445     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
446 }
447 
448 void test_try_put_without_successors() {
449     tbb::flow::graph g;
450     int try_put_num{3};
451     tbb::flow::buffer_node<int> bn(g);
452     tbb::flow::limiter_node<int> ln(g, try_put_num);
453 
454     tbb::flow::make_edge(bn, ln);
455 
456     int i = 1;
457     for (; i <= try_put_num; i++)
458         bn.try_put(i);
459 
460     std::atomic<int> counter{0};
461     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
462         [&](int input) {
463             counter += input;
464             return int{};
465         }
466     );
467 
468     make_edge_impl(ln, fn);
469 
470     g.wait_for_all();
471     CHECK((counter == i * try_put_num / 2));
472 
473     // Check the lost message
474     tbb::flow::remove_edge(bn, ln);
475     ln.decrementer().try_put(tbb::flow::continue_msg());
476     bn.try_put(try_put_num + 1);
477     g.wait_for_all();
478     CHECK((counter == i * try_put_num / 2));
479 
480 }
481 
482 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
483 #include <array>
484 #include <vector>
485 void test_follows_and_precedes_api() {
486     using msg_t = tbb::flow::continue_msg;
487 
488     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
489     std::vector<msg_t> messages_for_precedes = {msg_t()};
490 
491     follows_and_precedes_testing::test_follows
492         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
493     follows_and_precedes_testing::test_precedes
494         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
495 
496 }
497 #endif
498 
499 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
500 void test_deduction_guides() {
501     using namespace tbb::flow;
502 
503     graph g;
504     broadcast_node<int> br(g);
505     limiter_node<int> l0(g, 100);
506 
507 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
508     limiter_node l1(follows(br), 100);
509     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
510 
511     limiter_node l2(precedes(br), 100);
512     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
513 #endif
514 
515     limiter_node l3(l0);
516     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
517 }
518 #endif
519 
520 //! Test puts on limiter_node with decrements and varying parallelism levels
521 //! \brief \ref error_guessing
522 TEST_CASE("Serial and parallel tests") {
523     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
524         tbb::task_arena arena(i);
525         arena.execute(
526             [i]() {
527                 test_serial<int>();
528                 test_parallel<int>(i);
529             }
530         );
531     }
532 }
533 
534 //! Test initial put of continue_msg on decrementer port does not stop message flow
535 //! \brief \ref error_guessing
536 TEST_CASE("Test continue_msg reception") {
537     test_continue_msg_reception();
538 }
539 
540 //! Test multifunction_node connected to limiter_node
541 //! \brief \ref error_guessing
542 TEST_CASE("Multifunction connected to limiter") {
543     test_multifunction_to_limiter(30,3);
544     test_multifunction_to_limiter(300,13);
545     test_multifunction_to_limiter(3000,1);
546 }
547 
548 //! Test message release if successor doesn't accept
549 //! \brief \ref requirement
550 TEST_CASE("Message is released if successor does not accept") {
551     test_reserve_release_messages();
552 }
553 
554 //! Test decrementer
555 //! \brief \ref requirement \ref error_guessing
556 TEST_CASE("Decrementer") {
557     test_decrementer();
558 }
559 
560 //! Test try_put() without successor
561 //! \brief \ref error_guessing
562 TEST_CASE("Test try_put() without successors") {
563     test_try_put_without_successors();
564 }
565 
566 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
567 //! Test follows and precedes API
568 //! \brief \ref error_guessing
569 TEST_CASE( "Support for follows and precedes API" ) {
570     test_follows_and_precedes_api();
571 }
572 #endif
573 
574 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
575 //! Test deduction guides
576 //! \brief \ref requirement
577 TEST_CASE( "Deduction guides" ) {
578     test_deduction_guides();
579 }
580 #endif
581