xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/utils_assert.h"
24 #include "common/test_follows_and_precedes_api.h"
25 
26 #include <atomic>
27 
28 
29 //! \file test_limiter_node.cpp
30 //! \brief Test for [flow_graph.limiter_node] specification
31 
32 
33 const int L = 10;
34 const int N = 1000;
35 
36 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
37 using tbb::detail::d1::graph_task;
38 
39 template< typename T >
40 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
41    T next_value;
42    tbb::flow::graph& my_graph;
43 
44    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
45 
46    graph_task* try_put_task( const T &v ) override {
47        CHECK_MESSAGE( next_value++  == v, "" );
48        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
49    }
50 
51     tbb::flow::graph& graph_reference() const override {
52         return my_graph;
53     }
54 };
55 
56 template< typename T >
57 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
58 
59     std::atomic<int> my_count;
60     tbb::flow::graph& my_graph;
61 
62     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
63 
64     graph_task* try_put_task( const T &/*v*/ ) override {
65        ++my_count;
66        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
67     }
68 
69     tbb::flow::graph& graph_reference() const override {
70         return my_graph;
71     }
72 };
73 
74 template< typename T >
75 struct empty_sender : public tbb::flow::sender<T> {
76         typedef typename tbb::flow::sender<T>::successor_type successor_type;
77 
78         bool register_successor( successor_type & ) override { return false; }
79         bool remove_successor( successor_type & ) override { return false; }
80 };
81 
82 
83 template< typename T >
84 struct put_body : utils::NoAssign {
85 
86     tbb::flow::limiter_node<T> &my_lim;
87     std::atomic<int> &my_accept_count;
88 
89     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
90         my_lim(lim), my_accept_count(accept_count) {}
91 
92     void operator()( int ) const {
93         for ( int i = 0; i < L; ++i ) {
94             bool msg = my_lim.try_put( T(i) );
95             if ( msg == true )
96                ++my_accept_count;
97         }
98     }
99 };
100 
101 template< typename T >
102 struct put_dec_body : utils::NoAssign {
103 
104     tbb::flow::limiter_node<T> &my_lim;
105     std::atomic<int> &my_accept_count;
106 
107     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
108         my_lim(lim), my_accept_count(accept_count) {}
109 
110     void operator()( int ) const {
111         int local_accept_count = 0;
112         while ( local_accept_count < N ) {
113             bool msg = my_lim.try_put( T(local_accept_count) );
114             if ( msg == true ) {
115                 ++local_accept_count;
116                 ++my_accept_count;
117                 my_lim.decrement.try_put( tbb::flow::continue_msg() );
118             }
119         }
120     }
121 
122 };
123 
124 template< typename T >
125 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
126     parallel_receiver<T> r(g);
127     empty_sender< tbb::flow::continue_msg > s;
128     std::atomic<int> accept_count;
129     accept_count = 0;
130     tbb::flow::make_edge( lim, r );
131     tbb::flow::make_edge(s, lim.decrement);
132 
133     // test puts with decrements
134     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
135     int c = accept_count;
136     CHECK_MESSAGE( c == N*num_threads, "" );
137     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
138 }
139 
140 //
141 // Tests
142 //
143 // limiter only forwards below the limit, multiple parallel senders / single receiver
144 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
145 //
146 //
147 template< typename T >
148 int test_parallel(int num_threads) {
149 
150    // test puts with no decrements
151    for ( int i = 0; i < L; ++i ) {
152        tbb::flow::graph g;
153        tbb::flow::limiter_node< T > lim(g, i);
154        parallel_receiver<T> r(g);
155        std::atomic<int> accept_count;
156        accept_count = 0;
157        tbb::flow::make_edge( lim, r );
158        // test puts with no decrements
159        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
160        g.wait_for_all();
161        int c = accept_count;
162        CHECK_MESSAGE( c == i, "" );
163    }
164 
165    // test puts with decrements
166    for ( int i = 1; i < L; ++i ) {
167        tbb::flow::graph g;
168        tbb::flow::limiter_node< T > lim(g, i);
169        test_puts_with_decrements(num_threads, lim, g);
170        tbb::flow::limiter_node< T > lim_copy( lim );
171        test_puts_with_decrements(num_threads, lim_copy, g);
172    }
173 
174    return 0;
175 }
176 
177 //
178 // Tests
179 //
180 // limiter only forwards below the limit, single sender / single receiver
181 // at reject, a put to decrement, will cause next message to be accepted
182 //
183 template< typename T >
184 int test_serial() {
185 
186    // test puts with no decrements
187    for ( int i = 0; i < L; ++i ) {
188        tbb::flow::graph g;
189        tbb::flow::limiter_node< T > lim(g, i);
190        serial_receiver<T> r(g);
191        tbb::flow::make_edge( lim, r );
192        for ( int j = 0; j < L; ++j ) {
193            bool msg = lim.try_put( T(j) );
194            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
195        }
196        g.wait_for_all();
197    }
198 
199    // test puts with decrements
200    for ( int i = 1; i < L; ++i ) {
201        tbb::flow::graph g;
202        tbb::flow::limiter_node< T > lim(g, i);
203        serial_receiver<T> r(g);
204        empty_sender< tbb::flow::continue_msg > s;
205        tbb::flow::make_edge( lim, r );
206        tbb::flow::make_edge(s, lim.decrement);
207        for ( int j = 0; j < N; ++j ) {
208            bool msg = lim.try_put( T(j) );
209            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
210            if ( msg == false ) {
211                lim.decrement.try_put( tbb::flow::continue_msg() );
212                msg = lim.try_put( T(j) );
213                CHECK_MESSAGE( msg == true, "" );
214            }
215        }
216    }
217    return 0;
218 }
219 
220 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
221 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
222 #define LIMITER_OUTPUT 0    // port number of the integer output
223 
224 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
225 
226 std::atomic<size_t> emit_count;
227 std::atomic<size_t> emit_sum;
228 std::atomic<size_t> receive_count;
229 std::atomic<size_t> receive_sum;
230 
231 struct mfnode_body {
232     int max_cnt;
233     std::atomic<int>* my_cnt;
234     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
235     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
236         int lcnt = ++(*my_cnt);
237         if(lcnt > max_cnt) {
238             return;
239         }
240         // put one continue_msg to the decrement of the limiter.
241         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
242             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
243         }
244         {
245             // put messages to the input of the limiter_node until it rejects.
246             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
247                 emit_sum += lcnt;
248                 ++emit_count;
249             }
250         }
251     }
252 };
253 
254 struct fn_body {
255     int operator()(const int &in) {
256         receive_sum += in;
257         ++receive_count;
258         return in;
259     }
260 };
261 
262 //                   +------------+
263 //    +---------+    |            v
264 //    | mf_node |0---+       +----------+          +----------+
265 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
266 // |  +---------+            +----------+          +----------+  |
267 // |                                                             |
268 // |                                                             |
269 // +-------------------------------------------------------------+
270 //
271 void
272 test_multifunction_to_limiter(int _max, int _nparallel) {
273     tbb::flow::graph g;
274     emit_count = 0;
275     emit_sum = 0;
276     receive_count = 0;
277     receive_sum = 0;
278     std::atomic<int> local_cnt;
279     local_cnt = 0;
280     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
281     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
282     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
283     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
284     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrement);
285     tbb::flow::make_edge(lim_node, fn_node);
286     tbb::flow::make_edge(fn_node, mf_node);
287 
288     mf_node.try_put(1);
289     g.wait_for_all();
290     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
291     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
292 
293     // reset, test again
294     g.reset();
295     emit_count = 0;
296     emit_sum = 0;
297     receive_count = 0;
298     receive_sum = 0;
299     local_cnt = 0;;
300     mf_node.try_put(1);
301     g.wait_for_all();
302     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
303     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
304 }
305 
306 
307 void
308 test_continue_msg_reception() {
309     tbb::flow::graph g;
310     tbb::flow::limiter_node<int> ln(g,2);
311     tbb::flow::queue_node<int>   qn(g);
312     tbb::flow::make_edge(ln, qn);
313     ln.decrement.try_put(tbb::flow::continue_msg());
314     ln.try_put(42);
315     g.wait_for_all();
316     int outint;
317     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
318 }
319 
320 
321 //
322 // This test ascertains that if a message is not successfully put
323 // to a successor, the message is not dropped but released.
324 //
325 
326 void test_reserve_release_messages() {
327     using namespace tbb::flow;
328     graph g;
329 
330     //making two queue_nodes: one broadcast_node and one limiter_node
331     queue_node<int> input_queue(g);
332     queue_node<int> output_queue(g);
333     broadcast_node<int> broad(g);
334     limiter_node<int, int> limit(g,2); //threshold of 2
335 
336     //edges
337     make_edge(input_queue, limit);
338     make_edge(limit, output_queue);
339     make_edge(broad,limit.decrement);
340 
341     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
342 
343     input_queue.try_put(list[0]); // succeeds
344     input_queue.try_put(list[1]); // succeeds
345     input_queue.try_put(list[2]); // fails, stored in upstream buffer
346     g.wait_for_all();
347 
348     remove_edge(limit, output_queue); //remove successor
349 
350     //sending message to the decrement port of the limiter
351     broad.try_put(1); //failed message retrieved.
352     g.wait_for_all();
353 
354     make_edge(limit, output_queue); //putting the successor back
355 
356     broad.try_put(1);  //drop the count
357 
358     input_queue.try_put(list[3]);  //success
359     g.wait_for_all();
360 
361     int var=0;
362 
363     for (int i=0; i<4; i++) {
364         output_queue.try_get(var);
365         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
366         g.wait_for_all();
367     }
368 }
369 
370 void test_decrementer() {
371     const int threshold = 5;
372     tbb::flow::graph g;
373     tbb::flow::limiter_node<int, int> limit(g, threshold);
374     tbb::flow::queue_node<int> queue(g);
375     make_edge(limit, queue);
376     int m = 0;
377     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
378     CHECK_MESSAGE( limit.decrement.try_put( -threshold ), // close limiter's gate
379                    "Limiter node decrementer's port does not accept message." );
380     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
381     CHECK_MESSAGE( limit.decrement.try_put( threshold + 5 ),  // open limiter's gate
382                    "Limiter node decrementer's port does not accept message." );
383     for( int i = 0; i < threshold; ++i )
384         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
385     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
386     g.wait_for_all();
387     int expected[] = {0, 2, 3, 4, 5, 6};
388     int actual = -1; m = 0;
389     while( queue.try_get(actual) )
390         CHECK_MESSAGE( actual == expected[m++], "" );
391     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
392     g.wait_for_all();
393 
394     const size_t threshold2 = size_t(-1);
395     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
396     make_edge(limit2, queue);
397     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
398     long long decrement_value = (long long)( size_t(-1)/2 );
399     CHECK_MESSAGE( limit2.decrement.try_put( -decrement_value ),
400                    "Limiter node decrementer's port does not accept message" );
401     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
402     CHECK_MESSAGE( limit2.decrement.try_put( -decrement_value ),
403                    "Limiter node decrementer's port does not accept message" );
404     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
405     int expected2[] = {1, 2};
406     actual = -1; m = 0;
407     while( queue.try_get(actual) )
408         CHECK_MESSAGE( actual == expected2[m++], "" );
409     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
410     g.wait_for_all();
411 }
412 
413 void test_try_put_without_successors() {
414     tbb::flow::graph g;
415     std::size_t try_put_num{3};
416     tbb::flow::buffer_node<int> bn(g);
417     tbb::flow::limiter_node<int> ln(g, try_put_num);
418     tbb::flow::make_edge(bn, ln);
419     std::size_t i = 1;
420     for (; i <= try_put_num; i++)
421         bn.try_put(i);
422 
423     std::atomic<std::size_t> counter{0};
424     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
425         [&](int input) {
426             counter += input;
427             return int{};
428         }
429     );
430     tbb::flow::make_edge(ln, fn);
431     g.wait_for_all();
432     CHECK((counter == i * try_put_num / 2));
433 
434     // Check the lost message
435     tbb::flow::remove_edge(bn, ln);
436     ln.decrement.try_put(tbb::flow::continue_msg());
437     bn.try_put(try_put_num + 1);
438     g.wait_for_all();
439     CHECK((counter == i * try_put_num / 2));
440 
441 }
442 
443 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
444 #include <array>
445 #include <vector>
446 void test_follows_and_precedes_api() {
447     using msg_t = tbb::flow::continue_msg;
448 
449     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
450     std::vector<msg_t> messages_for_precedes = {msg_t()};
451 
452     follows_and_precedes_testing::test_follows
453         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
454     follows_and_precedes_testing::test_precedes
455         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
456 
457 }
458 #endif
459 
460 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
461 void test_deduction_guides() {
462     using namespace tbb::flow;
463 
464     graph g;
465     broadcast_node<int> br(g);
466     limiter_node<int> l0(g, 100);
467 
468 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
469     limiter_node l1(follows(br), 100);
470     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
471 
472     limiter_node l2(precedes(br), 100);
473     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
474 #endif
475 
476     limiter_node l3(l0);
477     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
478 }
479 #endif
480 
481 //! Test puts on limiter_node with decrements and varying parallelism levels
482 //! \brief \ref error_guessing
483 TEST_CASE("Serial and parallel tests") {
484     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
485         tbb::task_arena arena(i);
486         arena.execute(
487             [i]() {
488                 test_serial<int>();
489                 test_parallel<int>(i);
490             }
491         );
492     }
493 }
494 
495 //! Test initial put of continue_msg on decrementer port does not stop message flow
496 //! \brief \ref error_guessing
497 TEST_CASE("Test continue_msg reception") {
498     test_continue_msg_reception();
499 }
500 
501 //! Test multifunction_node connected to limiter_node
502 //! \brief \ref error_guessing
503 TEST_CASE("Multifunction connected to limiter") {
504     test_multifunction_to_limiter(30,3);
505     test_multifunction_to_limiter(300,13);
506     test_multifunction_to_limiter(3000,1);
507 }
508 
509 //! Test message release if successor doesn't accept
510 //! \brief \ref requirement
511 TEST_CASE("Message is released if successor does not accept") {
512     test_reserve_release_messages();
513 }
514 
515 //! Test decrementer
516 //! \brief \ref requirement \ref error_guessing
517 TEST_CASE("Decrementer") {
518     test_decrementer();
519 }
520 
521 //! Test try_put() without successor
522 //! \brief \ref error_guessing
523 TEST_CASE("Test try_put() without successors") {
524     test_try_put_without_successors();
525 }
526 
527 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
528 //! Test follows and precedes API
529 //! \brief \ref error_guessing
530 TEST_CASE( "Support for follows and precedes API" ) {
531     test_follows_and_precedes_api();
532 }
533 #endif
534 
535 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
536 //! Test deduction guides
537 //! \brief \ref requirement
538 TEST_CASE( "Deduction guides" ) {
539     test_deduction_guides();
540 }
541 #endif
542