xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision 6edb5c3a)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
22 
23 #include "common/config.h"
24 
25 #include "tbb/flow_graph.h"
26 
27 #include "common/test.h"
28 #include "common/utils.h"
29 #include "common/utils_assert.h"
30 #include "common/test_follows_and_precedes_api.h"
31 #include "tbb/global_control.h"
32 
33 #include <atomic>
34 
35 
36 //! \file test_limiter_node.cpp
37 //! \brief Test for [flow_graph.limiter_node] specification
38 
39 
//! Number of values each put_body invocation offers; also the exclusive upper bound of tested thresholds.
constexpr int L = 10;
//! Number of messages each put_dec_body invocation must get accepted; also the put count of the serial decrement test.
constexpr int N = 1000;
42 
43 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
44 using tbb::detail::d1::graph_task;
45 
46 template< typename T >
47 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
48    T next_value;
49    tbb::flow::graph& my_graph;
50 
51    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
52 
53    graph_task* try_put_task( const T &v ) override {
54        CHECK_MESSAGE( next_value++  == v, "" );
55        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
56    }
57 
58     tbb::flow::graph& graph_reference() const override {
59         return my_graph;
60     }
61 };
62 
63 template< typename T >
64 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
65 
66     std::atomic<int> my_count;
67     tbb::flow::graph& my_graph;
68 
69     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
70 
71     graph_task* try_put_task( const T &/*v*/ ) override {
72        ++my_count;
73        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
74     }
75 
76     tbb::flow::graph& graph_reference() const override {
77         return my_graph;
78     }
79 };
80 
81 template< typename T >
82 struct empty_sender : public tbb::flow::sender<T> {
83         typedef typename tbb::flow::sender<T>::successor_type successor_type;
84 
85         bool register_successor( successor_type & ) override { return false; }
86         bool remove_successor( successor_type & ) override { return false; }
87 };
88 
89 
90 template< typename T >
91 struct put_body : utils::NoAssign {
92 
93     tbb::flow::limiter_node<T> &my_lim;
94     std::atomic<int> &my_accept_count;
95 
96     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
97         my_lim(lim), my_accept_count(accept_count) {}
98 
99     void operator()( int ) const {
100         for ( int i = 0; i < L; ++i ) {
101             bool msg = my_lim.try_put( T(i) );
102             if ( msg == true )
103                ++my_accept_count;
104         }
105     }
106 };
107 
108 template< typename T >
109 struct put_dec_body : utils::NoAssign {
110 
111     tbb::flow::limiter_node<T> &my_lim;
112     std::atomic<int> &my_accept_count;
113 
114     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
115         my_lim(lim), my_accept_count(accept_count) {}
116 
117     void operator()( int ) const {
118         int local_accept_count = 0;
119         while ( local_accept_count < N ) {
120             bool msg = my_lim.try_put( T(local_accept_count) );
121             if ( msg == true ) {
122                 ++local_accept_count;
123                 ++my_accept_count;
124                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
125             }
126         }
127     }
128 
129 };
130 
131 template< typename Sender, typename Receiver >
132 void make_edge_impl(Sender& sender, Receiver& receiver){
133 #if __GNUC__ < 12 && !TBB_USE_DEBUG
134     // Seemingly, GNU compiler generates incorrect code for the call of limiter.register_successor in release (-03)
135     // The function pointer to make_edge workarounds the issue for unknown reason
136     auto make_edge_ptr = tbb::flow::make_edge<int>;
137     make_edge_ptr(sender, receiver);
138 #else
139     tbb::flow::make_edge(sender, receiver);
140 #endif
141 }
142 
143 template< typename T >
144 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
145     parallel_receiver<T> r(g);
146     empty_sender< tbb::flow::continue_msg > s;
147     std::atomic<int> accept_count;
148     accept_count = 0;
149     tbb::flow::make_edge( lim, r );
150     tbb::flow::make_edge(s, lim.decrementer());
151 
152     // test puts with decrements
153     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
154     int c = accept_count;
155     CHECK_MESSAGE( c == N*num_threads, "" );
156     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
157 }
158 
159 //
160 // Tests
161 //
162 // limiter only forwards below the limit, multiple parallel senders / single receiver
163 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
164 //
165 //
166 template< typename T >
167 int test_parallel(int num_threads) {
168 
169    // test puts with no decrements
170    for ( int i = 0; i < L; ++i ) {
171        tbb::flow::graph g;
172        tbb::flow::limiter_node< T > lim(g, i);
173        parallel_receiver<T> r(g);
174        std::atomic<int> accept_count;
175        accept_count = 0;
176        tbb::flow::make_edge( lim, r );
177        // test puts with no decrements
178        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
179        g.wait_for_all();
180        int c = accept_count;
181        CHECK_MESSAGE( c == i, "" );
182    }
183 
184    // test puts with decrements
185    for ( int i = 1; i < L; ++i ) {
186        tbb::flow::graph g;
187        tbb::flow::limiter_node< T > lim(g, i);
188        test_puts_with_decrements(num_threads, lim, g);
189        tbb::flow::limiter_node< T > lim_copy( lim );
190        test_puts_with_decrements(num_threads, lim_copy, g);
191    }
192 
193    return 0;
194 }
195 
196 //
197 // Tests
198 //
199 // limiter only forwards below the limit, single sender / single receiver
200 // at reject, a put to decrement, will cause next message to be accepted
201 //
202 template< typename T >
203 int test_serial() {
204 
205    // test puts with no decrements
206    for ( int i = 0; i < L; ++i ) {
207        tbb::flow::graph g;
208        tbb::flow::limiter_node< T > lim(g, i);
209        serial_receiver<T> r(g);
210        tbb::flow::make_edge( lim, r );
211        for ( int j = 0; j < L; ++j ) {
212            bool msg = lim.try_put( T(j) );
213            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
214        }
215        g.wait_for_all();
216    }
217 
218    // test puts with decrements
219    for ( int i = 1; i < L; ++i ) {
220        tbb::flow::graph g;
221        tbb::flow::limiter_node< T > lim(g, i);
222        serial_receiver<T> r(g);
223        empty_sender< tbb::flow::continue_msg > s;
224        tbb::flow::make_edge( lim, r );
225        tbb::flow::make_edge(s, lim.decrementer());
226        for ( int j = 0; j < N; ++j ) {
227            bool msg = lim.try_put( T(j) );
228            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
229            if ( msg == false ) {
230                lim.decrementer().try_put( tbb::flow::continue_msg() );
231                msg = lim.try_put( T(j) );
232                CHECK_MESSAGE( msg == true, "" );
233            }
234        }
235    }
236    return 0;
237 }
238 
239 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
240 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
241 #define LIMITER_OUTPUT 0    // port number of the integer output
242 
243 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
244 
245 std::atomic<size_t> emit_count;
246 std::atomic<size_t> emit_sum;
247 std::atomic<size_t> receive_count;
248 std::atomic<size_t> receive_sum;
249 
250 struct mfnode_body {
251     int max_cnt;
252     std::atomic<int>* my_cnt;
253     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
254     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
255         int lcnt = ++(*my_cnt);
256         if(lcnt > max_cnt) {
257             return;
258         }
259         // put one continue_msg to the decrement of the limiter.
260         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
261             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
262         }
263         {
264             // put messages to the input of the limiter_node until it rejects.
265             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
266                 emit_sum += lcnt;
267                 ++emit_count;
268             }
269         }
270     }
271 };
272 
273 struct fn_body {
274     int operator()(const int &in) {
275         receive_sum += in;
276         ++receive_count;
277         return in;
278     }
279 };
280 
281 //                   +------------+
282 //    +---------+    |            v
283 //    | mf_node |0---+       +----------+          +----------+
284 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
285 // |  +---------+            +----------+          +----------+  |
286 // |                                                             |
287 // |                                                             |
288 // +-------------------------------------------------------------+
289 //
290 void
291 test_multifunction_to_limiter(int _max, int _nparallel) {
292     tbb::flow::graph g;
293     emit_count = 0;
294     emit_sum = 0;
295     receive_count = 0;
296     receive_sum = 0;
297     std::atomic<int> local_cnt;
298     local_cnt = 0;
299     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
300     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
301     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
302     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
303     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
304     tbb::flow::make_edge(lim_node, fn_node);
305     tbb::flow::make_edge(fn_node, mf_node);
306 
307     mf_node.try_put(1);
308     g.wait_for_all();
309     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
310     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
311 
312     // reset, test again
313     g.reset();
314     emit_count = 0;
315     emit_sum = 0;
316     receive_count = 0;
317     receive_sum = 0;
318     local_cnt = 0;;
319     mf_node.try_put(1);
320     g.wait_for_all();
321     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
322     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
323 }
324 
325 
326 void
327 test_continue_msg_reception() {
328     tbb::flow::graph g;
329     tbb::flow::limiter_node<int> ln(g,2);
330     tbb::flow::queue_node<int>   qn(g);
331     tbb::flow::make_edge(ln, qn);
332     ln.decrementer().try_put(tbb::flow::continue_msg());
333     ln.try_put(42);
334     g.wait_for_all();
335     int outint;
336     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
337 }
338 
339 
340 //
341 // This test ascertains that if a message is not successfully put
342 // to a successor, the message is not dropped but released.
343 //
344 
345 void test_reserve_release_messages() {
346     using namespace tbb::flow;
347     graph g;
348 
349     //making two queue_nodes: one broadcast_node and one limiter_node
350     queue_node<int> input_queue(g);
351     queue_node<int> output_queue(g);
352     broadcast_node<int> broad(g);
353     limiter_node<int, int> limit(g,2); //threshold of 2
354 
355     //edges
356     make_edge(input_queue, limit);
357     make_edge(limit, output_queue);
358     make_edge(broad,limit.decrementer());
359 
360     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
361 
362     input_queue.try_put(list[0]); // succeeds
363     input_queue.try_put(list[1]); // succeeds
364     input_queue.try_put(list[2]); // fails, stored in upstream buffer
365     g.wait_for_all();
366 
367     remove_edge(limit, output_queue); //remove successor
368 
369     //sending message to the decrement port of the limiter
370     broad.try_put(1); //failed message retrieved.
371     g.wait_for_all();
372 
373     make_edge_impl(limit, output_queue); //putting the successor back
374 
375     broad.try_put(1);  //drop the count
376 
377     input_queue.try_put(list[3]);  //success
378     g.wait_for_all();
379 
380     int var=0;
381 
382     for (int i=0; i<4; i++) {
383         output_queue.try_get(var);
384         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
385         g.wait_for_all();
386     }
387 }
388 
389 void test_decrementer() {
390     const int threshold = 5;
391     tbb::flow::graph g;
392     tbb::flow::limiter_node<int, int> limit(g, threshold);
393     tbb::flow::queue_node<int> queue(g);
394     make_edge(limit, queue);
395     int m = 0;
396     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
397     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
398                    "Limiter node decrementer's port does not accept message." );
399     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
400     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
401                    "Limiter node decrementer's port does not accept message." );
402     for( int i = 0; i < threshold; ++i )
403         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
404     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
405     g.wait_for_all();
406     int expected[] = {0, 2, 3, 4, 5, 6};
407     int actual = -1; m = 0;
408     while( queue.try_get(actual) )
409         CHECK_MESSAGE( actual == expected[m++], "" );
410     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
411     g.wait_for_all();
412 
413     const size_t threshold2 = size_t(-1);
414     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
415     make_edge(limit2, queue);
416     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
417     long long decrement_value = (long long)( size_t(-1)/2 );
418     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
419                    "Limiter node decrementer's port does not accept message" );
420     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
421     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
422                    "Limiter node decrementer's port does not accept message" );
423     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
424     int expected2[] = {1, 2};
425     actual = -1; m = 0;
426     while( queue.try_get(actual) )
427         CHECK_MESSAGE( actual == expected2[m++], "" );
428     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
429     g.wait_for_all();
430 
431     const size_t threshold3 = 10;
432     tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
433     make_edge(limit3, queue);
434     long long decrement_value3 = 3;
435     CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
436                    "Limiter node decrementer's port does not accept message" );
437 
438     m = 0;
439     while( limit3.try_put( m ) ){ m++; };
440     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
441 
442     actual = -1; m = 0;
443     while( queue.try_get(actual) ){
444         CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
445     }
446 
447     g.wait_for_all();
448     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
449 }
450 
451 void test_try_put_without_successors() {
452     tbb::flow::graph g;
453     int try_put_num{3};
454     tbb::flow::buffer_node<int> bn(g);
455     tbb::flow::limiter_node<int> ln(g, try_put_num);
456 
457     tbb::flow::make_edge(bn, ln);
458 
459     int i = 1;
460     for (; i <= try_put_num; i++)
461         bn.try_put(i);
462 
463     std::atomic<int> counter{0};
464     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
465         [&](int input) {
466             counter += input;
467             return int{};
468         }
469     );
470 
471     make_edge_impl(ln, fn);
472 
473     g.wait_for_all();
474     CHECK((counter == i * try_put_num / 2));
475 
476     // Check the lost message
477     tbb::flow::remove_edge(bn, ln);
478     ln.decrementer().try_put(tbb::flow::continue_msg());
479     bn.try_put(try_put_num + 1);
480     g.wait_for_all();
481     CHECK((counter == i * try_put_num / 2));
482 
483 }
484 
485 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
486 #include <array>
487 #include <vector>
488 void test_follows_and_precedes_api() {
489     using msg_t = tbb::flow::continue_msg;
490 
491     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
492     std::vector<msg_t> messages_for_precedes = {msg_t()};
493 
494     follows_and_precedes_testing::test_follows
495         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
496     follows_and_precedes_testing::test_precedes
497         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
498 
499 }
500 #endif
501 
502 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
503 void test_deduction_guides() {
504     using namespace tbb::flow;
505 
506     graph g;
507     broadcast_node<int> br(g);
508     limiter_node<int> l0(g, 100);
509 
510 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
511     limiter_node l1(follows(br), 100);
512     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
513 
514     limiter_node l2(precedes(br), 100);
515     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
516 #endif
517 
518     limiter_node l3(l0);
519     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
520 }
521 #endif
522 
523 //! Test puts on limiter_node with decrements and varying parallelism levels
524 //! \brief \ref error_guessing
525 TEST_CASE("Serial and parallel tests") {
526     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
527         tbb::task_arena arena(i);
528         arena.execute(
529             [i]() {
530                 test_serial<int>();
531                 test_parallel<int>(i);
532             }
533         );
534     }
535 }
536 
537 //! Test initial put of continue_msg on decrementer port does not stop message flow
538 //! \brief \ref error_guessing
TEST_CASE("Test continue_msg reception") {
    // All checks live in test_continue_msg_reception().
    test_continue_msg_reception();
}
542 
543 //! Test multifunction_node connected to limiter_node
544 //! \brief \ref error_guessing
545 TEST_CASE("Multifunction connected to limiter") {
546     test_multifunction_to_limiter(30,3);
547     test_multifunction_to_limiter(300,13);
548     test_multifunction_to_limiter(3000,1);
549 }
550 
551 //! Test message release if successor doesn't accept
552 //! \brief \ref requirement
TEST_CASE("Message is released if successor does not accept") {
    // All checks live in test_reserve_release_messages().
    test_reserve_release_messages();
}
556 
557 //! Test decrementer
558 //! \brief \ref requirement \ref error_guessing
TEST_CASE("Decrementer") {
    // All checks live in test_decrementer().
    test_decrementer();
}
562 
563 //! Test try_put() without successor
564 //! \brief \ref error_guessing
TEST_CASE("Test try_put() without successors") {
    // All checks live in test_try_put_without_successors().
    test_try_put_without_successors();
}
568 
569 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
570 //! Test follows and precedes API
571 //! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    // All checks live in test_follows_and_precedes_api().
    test_follows_and_precedes_api();
}
575 #endif
576 
577 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
578 //! Test deduction guides
579 //! \brief \ref requirement
TEST_CASE( "Deduction guides" ) {
    // The checks are static_asserts inside test_deduction_guides().
    test_deduction_guides();
}
583 #endif
584 
585 //! Test correct node deallocation while using small_object_pool.
586 //! (see https://github.com/oneapi-src/oneTBB/issues/639)
587 //! \brief \ref error_guessing
588 TEST_CASE("Test correct node deallocation while using small_object_pool") {
589     struct TestLargeStruct {
590         char bytes[512]{ 0 };
591     };
592 
593     tbb::flow::graph graph;
594     tbb::flow::queue_node<TestLargeStruct> input_node{ graph };
595     tbb::flow::function_node<TestLargeStruct> func{ graph, tbb::flow::serial,
596         [](const TestLargeStruct& input) { return input; } };
597 
598     tbb::flow::make_edge( input_node, func );
599     CHECK( input_node.try_put( TestLargeStruct{} ) );
600     graph.wait_for_all();
601 
602     tbb::task_scheduler_handle handle = tbb::task_scheduler_handle::get();
603     REQUIRE_NOTHROW( tbb::finalize( handle, std::nothrow ) );
604 }
605