xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision 74b7fc74)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #define TBB_PREVIEW_WAITING_FOR_WORKERS 1
22 
23 #include "common/config.h"
24 
25 #include "tbb/flow_graph.h"
26 
27 #include "common/test.h"
28 #include "common/utils.h"
29 #include "common/utils_assert.h"
30 #include "common/test_follows_and_precedes_api.h"
31 #include "tbb/global_control.h"
32 
33 #include <atomic>
34 
35 
36 //! \file test_limiter_node.cpp
37 //! \brief Test for [flow_graph.limiter_node] specification
38 
39 
//! Exclusive upper bound for the limiter thresholds under test; also the number
//! of puts each sender issues in the scenarios without decrements.
constexpr int L = 10;
//! Number of messages each sender pushes through in the scenarios with decrements.
constexpr int N = 1000;
42 
43 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
44 using tbb::detail::d1::graph_task;
45 
46 template< typename T >
47 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
48    T next_value;
49    tbb::flow::graph& my_graph;
50 
51    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
52 
53    graph_task* try_put_task( const T &v ) override {
54        CHECK_MESSAGE( next_value++  == v, "" );
55        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
56    }
57 
58     tbb::flow::graph& graph_reference() const override {
59         return my_graph;
60     }
61 };
62 
63 template< typename T >
64 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
65 
66     std::atomic<int> my_count;
67     tbb::flow::graph& my_graph;
68 
69     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
70 
71     graph_task* try_put_task( const T &/*v*/ ) override {
72        ++my_count;
73        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
74     }
75 
76     tbb::flow::graph& graph_reference() const override {
77         return my_graph;
78     }
79 };
80 
81 template< typename T >
82 struct empty_sender : public tbb::flow::sender<T> {
83         typedef typename tbb::flow::sender<T>::successor_type successor_type;
84 
85         bool register_successor( successor_type & ) override { return false; }
86         bool remove_successor( successor_type & ) override { return false; }
87 };
88 
89 
90 template< typename T >
91 struct put_body : utils::NoAssign {
92 
93     tbb::flow::limiter_node<T> &my_lim;
94     std::atomic<int> &my_accept_count;
95 
96     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
97         my_lim(lim), my_accept_count(accept_count) {}
98 
99     void operator()( int ) const {
100         for ( int i = 0; i < L; ++i ) {
101             bool msg = my_lim.try_put( T(i) );
102             if ( msg == true )
103                ++my_accept_count;
104         }
105     }
106 };
107 
108 template< typename T >
109 struct put_dec_body : utils::NoAssign {
110 
111     tbb::flow::limiter_node<T> &my_lim;
112     std::atomic<int> &my_accept_count;
113 
114     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
115         my_lim(lim), my_accept_count(accept_count) {}
116 
117     void operator()( int ) const {
118         int local_accept_count = 0;
119         while ( local_accept_count < N ) {
120             bool msg = my_lim.try_put( T(local_accept_count) );
121             if ( msg == true ) {
122                 ++local_accept_count;
123                 ++my_accept_count;
124                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
125             }
126         }
127     }
128 
129 };
130 
131 template< typename T >
132 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
133     parallel_receiver<T> r(g);
134     empty_sender< tbb::flow::continue_msg > s;
135     std::atomic<int> accept_count;
136     accept_count = 0;
137     tbb::flow::make_edge( lim, r );
138     tbb::flow::make_edge(s, lim.decrementer());
139 
140     // test puts with decrements
141     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
142     int c = accept_count;
143     CHECK_MESSAGE( c == N*num_threads, "" );
144     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
145 }
146 
147 //
148 // Tests
149 //
150 // limiter only forwards below the limit, multiple parallel senders / single receiver
151 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
152 //
153 //
154 template< typename T >
155 int test_parallel(int num_threads) {
156 
157    // test puts with no decrements
158    for ( int i = 0; i < L; ++i ) {
159        tbb::flow::graph g;
160        tbb::flow::limiter_node< T > lim(g, i);
161        parallel_receiver<T> r(g);
162        std::atomic<int> accept_count;
163        accept_count = 0;
164        tbb::flow::make_edge( lim, r );
165        // test puts with no decrements
166        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
167        g.wait_for_all();
168        int c = accept_count;
169        CHECK_MESSAGE( c == i, "" );
170    }
171 
172    // test puts with decrements
173    for ( int i = 1; i < L; ++i ) {
174        tbb::flow::graph g;
175        tbb::flow::limiter_node< T > lim(g, i);
176        test_puts_with_decrements(num_threads, lim, g);
177        tbb::flow::limiter_node< T > lim_copy( lim );
178        test_puts_with_decrements(num_threads, lim_copy, g);
179    }
180 
181    return 0;
182 }
183 
184 //
185 // Tests
186 //
187 // limiter only forwards below the limit, single sender / single receiver
188 // at reject, a put to decrement, will cause next message to be accepted
189 //
190 template< typename T >
191 int test_serial() {
192 
193    // test puts with no decrements
194    for ( int i = 0; i < L; ++i ) {
195        tbb::flow::graph g;
196        tbb::flow::limiter_node< T > lim(g, i);
197        serial_receiver<T> r(g);
198        tbb::flow::make_edge( lim, r );
199        for ( int j = 0; j < L; ++j ) {
200            bool msg = lim.try_put( T(j) );
201            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
202        }
203        g.wait_for_all();
204    }
205 
206    // test puts with decrements
207    for ( int i = 1; i < L; ++i ) {
208        tbb::flow::graph g;
209        tbb::flow::limiter_node< T > lim(g, i);
210        serial_receiver<T> r(g);
211        empty_sender< tbb::flow::continue_msg > s;
212        tbb::flow::make_edge( lim, r );
213        tbb::flow::make_edge(s, lim.decrementer());
214        for ( int j = 0; j < N; ++j ) {
215            bool msg = lim.try_put( T(j) );
216            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
217            if ( msg == false ) {
218                lim.decrementer().try_put( tbb::flow::continue_msg() );
219                msg = lim.try_put( T(j) );
220                CHECK_MESSAGE( msg == true, "" );
221            }
222        }
223    }
224    return 0;
225 }
226 
227 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
228 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
229 #define LIMITER_OUTPUT 0    // port number of the integer output
230 
231 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
232 
233 std::atomic<size_t> emit_count;
234 std::atomic<size_t> emit_sum;
235 std::atomic<size_t> receive_count;
236 std::atomic<size_t> receive_sum;
237 
238 struct mfnode_body {
239     int max_cnt;
240     std::atomic<int>* my_cnt;
241     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
242     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
243         int lcnt = ++(*my_cnt);
244         if(lcnt > max_cnt) {
245             return;
246         }
247         // put one continue_msg to the decrement of the limiter.
248         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
249             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
250         }
251         {
252             // put messages to the input of the limiter_node until it rejects.
253             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
254                 emit_sum += lcnt;
255                 ++emit_count;
256             }
257         }
258     }
259 };
260 
261 struct fn_body {
262     int operator()(const int &in) {
263         receive_sum += in;
264         ++receive_count;
265         return in;
266     }
267 };
268 
// Port 0 (LIMITER_OUTPUT) of mf_node feeds the input of lim_node; port 1
// (DECREMENT_OUTPUT) feeds its decrementer. fn_node loops back into mf_node.
//
//                   +------------+
//    +---------+    |            v (decrementer)
//    | mf_node |1---+       +----------+          +----------+
// +->|         |0---------->| lim_node |--------->| fn_node  |--+
// |  +---------+            +----------+          +----------+  |
// |                                                             |
// |                                                             |
// +-------------------------------------------------------------+
//
278 void
279 test_multifunction_to_limiter(int _max, int _nparallel) {
280     tbb::flow::graph g;
281     emit_count = 0;
282     emit_sum = 0;
283     receive_count = 0;
284     receive_sum = 0;
285     std::atomic<int> local_cnt;
286     local_cnt = 0;
287     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
288     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
289     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
290     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
291     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
292     tbb::flow::make_edge(lim_node, fn_node);
293     tbb::flow::make_edge(fn_node, mf_node);
294 
295     mf_node.try_put(1);
296     g.wait_for_all();
297     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
298     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
299 
300     // reset, test again
301     g.reset();
302     emit_count = 0;
303     emit_sum = 0;
304     receive_count = 0;
305     receive_sum = 0;
306     local_cnt = 0;;
307     mf_node.try_put(1);
308     g.wait_for_all();
309     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
310     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
311 }
312 
313 
314 void
315 test_continue_msg_reception() {
316     tbb::flow::graph g;
317     tbb::flow::limiter_node<int> ln(g,2);
318     tbb::flow::queue_node<int>   qn(g);
319     tbb::flow::make_edge(ln, qn);
320     ln.decrementer().try_put(tbb::flow::continue_msg());
321     ln.try_put(42);
322     g.wait_for_all();
323     int outint;
324     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
325 }
326 
327 
328 //
329 // This test ascertains that if a message is not successfully put
330 // to a successor, the message is not dropped but released.
331 //
332 
333 void test_reserve_release_messages() {
334     using namespace tbb::flow;
335     graph g;
336 
337     //making two queue_nodes: one broadcast_node and one limiter_node
338     queue_node<int> input_queue(g);
339     queue_node<int> output_queue(g);
340     broadcast_node<int> broad(g);
341     limiter_node<int, int> limit(g,2); //threshold of 2
342 
343     //edges
344     make_edge(input_queue, limit);
345     make_edge(limit, output_queue);
346     make_edge(broad,limit.decrementer());
347 
348     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
349 
350     input_queue.try_put(list[0]); // succeeds
351     input_queue.try_put(list[1]); // succeeds
352     input_queue.try_put(list[2]); // fails, stored in upstream buffer
353     g.wait_for_all();
354 
355     remove_edge(limit, output_queue); //remove successor
356 
357     //sending message to the decrement port of the limiter
358     broad.try_put(1); //failed message retrieved.
359     g.wait_for_all();
360 
361     tbb::flow::make_edge(limit, output_queue); //putting the successor back
362 
363     broad.try_put(1);  //drop the count
364 
365     input_queue.try_put(list[3]);  //success
366     g.wait_for_all();
367 
368     int var=0;
369 
370     for (int i=0; i<4; i++) {
371         output_queue.try_get(var);
372         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
373         g.wait_for_all();
374     }
375 }
376 
377 void test_decrementer() {
378     const int threshold = 5;
379     tbb::flow::graph g;
380     tbb::flow::limiter_node<int, int> limit(g, threshold);
381     tbb::flow::queue_node<int> queue(g);
382     make_edge(limit, queue);
383     int m = 0;
384     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
385     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
386                    "Limiter node decrementer's port does not accept message." );
387     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
388     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
389                    "Limiter node decrementer's port does not accept message." );
390     for( int i = 0; i < threshold; ++i )
391         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
392     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
393     g.wait_for_all();
394     int expected[] = {0, 2, 3, 4, 5, 6};
395     int actual = -1; m = 0;
396     while( queue.try_get(actual) )
397         CHECK_MESSAGE( actual == expected[m++], "" );
398     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
399     g.wait_for_all();
400 
401     const size_t threshold2 = size_t(-1);
402     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
403     make_edge(limit2, queue);
404     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
405     long long decrement_value = (long long)( size_t(-1)/2 );
406     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
407                    "Limiter node decrementer's port does not accept message" );
408     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
409     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
410                    "Limiter node decrementer's port does not accept message" );
411     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
412     int expected2[] = {1, 2};
413     actual = -1; m = 0;
414     while( queue.try_get(actual) )
415         CHECK_MESSAGE( actual == expected2[m++], "" );
416     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
417     g.wait_for_all();
418 
419     const size_t threshold3 = 10;
420     tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
421     make_edge(limit3, queue);
422     long long decrement_value3 = 3;
423     CHECK_MESSAGE( limit3.decrementer().try_put( -decrement_value3 ),
424                    "Limiter node decrementer's port does not accept message" );
425 
426     m = 0;
427     while( limit3.try_put( m ) ){ m++; };
428     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been accepted." );
429 
430     actual = -1; m = 0;
431     while( queue.try_get(actual) ){
432         CHECK_MESSAGE( actual == m++, "Not all messages have been processed." );
433     }
434 
435     g.wait_for_all();
436     CHECK_MESSAGE( m == threshold3 - decrement_value3, "Not all messages have been processed." );
437 }
438 
439 void test_try_put_without_successors() {
440     tbb::flow::graph g;
441     int try_put_num{3};
442     tbb::flow::buffer_node<int> bn(g);
443     tbb::flow::limiter_node<int> ln(g, try_put_num);
444 
445     tbb::flow::make_edge(bn, ln);
446 
447     int i = 1;
448     for (; i <= try_put_num; i++)
449         bn.try_put(i);
450 
451     std::atomic<int> counter{0};
452     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
453         [&](int input) {
454             counter += input;
455             return int{};
456         }
457     );
458 
459     tbb::flow::make_edge(ln, fn);
460 
461     g.wait_for_all();
462     CHECK((counter == i * try_put_num / 2));
463 
464     // Check the lost message
465     tbb::flow::remove_edge(bn, ln);
466     ln.decrementer().try_put(tbb::flow::continue_msg());
467     bn.try_put(try_put_num + 1);
468     g.wait_for_all();
469     CHECK((counter == i * try_put_num / 2));
470 
471 }
472 
473 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
474 #include <array>
475 #include <vector>
476 void test_follows_and_precedes_api() {
477     using msg_t = tbb::flow::continue_msg;
478 
479     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
480     std::vector<msg_t> messages_for_precedes = {msg_t()};
481 
482     follows_and_precedes_testing::test_follows
483         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
484     follows_and_precedes_testing::test_precedes
485         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
486 
487 }
488 #endif
489 
490 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
491 void test_deduction_guides() {
492     using namespace tbb::flow;
493 
494     graph g;
495     broadcast_node<int> br(g);
496     limiter_node<int> l0(g, 100);
497 
498 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
499     limiter_node l1(follows(br), 100);
500     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
501 
502     limiter_node l2(precedes(br), 100);
503     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
504 #endif
505 
506     limiter_node l3(l0);
507     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
508 }
509 #endif
510 
511 void test_decrement_while_try_put_task() {
512     constexpr int threshold = 50000;
513 
514     tbb::flow::graph graph{};
515     std::atomic<int> processed;
516     tbb::flow::input_node<int> input{ graph, [&](tbb::flow_control & fc) -> int {
517         static int i = {};
518         if (i++ >= threshold) fc.stop();
519         return i;
520     }};
521     tbb::flow::limiter_node<int, int> blockingNode{ graph, 1 };
522     tbb::flow::multifunction_node<int, std::tuple<int>> processing{ graph, tbb::flow::serial,
523         [&](const int & value, typename decltype(processing)::output_ports_type & out) {
524             if (value != threshold)
525                 std::get<0>(out).try_put(1);
526             processed.store(value);
527         }};
528 
529     tbb::flow::make_edge(input, blockingNode);
530     tbb::flow::make_edge(blockingNode, processing);
531     tbb::flow::make_edge(processing, blockingNode.decrementer());
532 
533     input.activate();
534 
535     graph.wait_for_all();
536     CHECK_MESSAGE(processed.load() == threshold, "decrementer terminate flow graph work");
537 }
538 
539 
540 //! Test puts on limiter_node with decrements and varying parallelism levels
541 //! \brief \ref error_guessing
542 TEST_CASE("Serial and parallel tests") {
543     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
544         tbb::task_arena arena(i);
545         arena.execute(
546             [i]() {
547                 test_serial<int>();
548                 test_parallel<int>(i);
549             }
550         );
551     }
552 }
553 
554 //! Test initial put of continue_msg on decrementer port does not stop message flow
555 //! \brief \ref error_guessing
556 TEST_CASE("Test continue_msg reception") {
557     test_continue_msg_reception();
558 }
559 
560 //! Test put message on decrementer port does not stop message flow
561 //! \brief \ref error_guessing
562 TEST_CASE("Test try_put to decrementer while try_put to limiter_node") {
563     test_decrement_while_try_put_task();
564 }
565 
566 //! Test multifunction_node connected to limiter_node
567 //! \brief \ref error_guessing
568 TEST_CASE("Multifunction connected to limiter") {
569     test_multifunction_to_limiter(30,3);
570     test_multifunction_to_limiter(300,13);
571     test_multifunction_to_limiter(3000,1);
572 }
573 
574 //! Test message release if successor doesn't accept
575 //! \brief \ref requirement
576 TEST_CASE("Message is released if successor does not accept") {
577     test_reserve_release_messages();
578 }
579 
580 //! Test decrementer
581 //! \brief \ref requirement \ref error_guessing
582 TEST_CASE("Decrementer") {
583     test_decrementer();
584 }
585 
586 //! Test try_put() without successor
587 //! \brief \ref error_guessing
588 TEST_CASE("Test try_put() without successors") {
589     test_try_put_without_successors();
590 }
591 
592 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
593 //! Test follows and precedes API
594 //! \brief \ref error_guessing
595 TEST_CASE( "Support for follows and precedes API" ) {
596     test_follows_and_precedes_api();
597 }
598 #endif
599 
600 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
601 //! Test deduction guides
602 //! \brief \ref requirement
603 TEST_CASE( "Deduction guides" ) {
604     test_deduction_guides();
605 }
606 #endif
607 
608 //! Test correct node deallocation while using small_object_pool.
609 //! (see https://github.com/oneapi-src/oneTBB/issues/639)
610 //! \brief \ref error_guessing
611 TEST_CASE("Test correct node deallocation while using small_object_pool") {
612     struct TestLargeStruct {
613         char bytes[512]{ 0 };
614     };
615 
616     tbb::flow::graph graph;
617     tbb::flow::queue_node<TestLargeStruct> input_node{ graph };
618     tbb::flow::function_node<TestLargeStruct> func{ graph, tbb::flow::serial,
619         [](const TestLargeStruct& input) { return input; } };
620 
621     tbb::flow::make_edge( input_node, func );
622     CHECK( input_node.try_put( TestLargeStruct{} ) );
623     graph.wait_for_all();
624 
625     tbb::task_scheduler_handle handle = tbb::task_scheduler_handle::get();
626     REQUIRE_NOTHROW( tbb::finalize( handle, std::nothrow ) );
627 }
628