xref: /oneTBB/test/tbb/test_limiter_node.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 #include <atomic>
31 
32 
33 //! \file test_limiter_node.cpp
34 //! \brief Test for [flow_graph.limiter_node] specification
35 
36 
// Exclusive upper bound on the limiter thresholds exercised by the serial and
// parallel tests; also the number of puts each put_body invocation attempts.
constexpr int L = 10;
// Number of accepted puts each put_dec_body invocation must reach, and the
// number of puts issued by the serial test that uses decrements.
constexpr int N = 1000;
39 
40 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
41 using tbb::detail::d1::graph_task;
42 
43 template< typename T >
44 struct serial_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
45    T next_value;
46    tbb::flow::graph& my_graph;
47 
48    serial_receiver(tbb::flow::graph& g) : next_value(T(0)), my_graph(g) {}
49 
50    graph_task* try_put_task( const T &v ) override {
51        CHECK_MESSAGE( next_value++  == v, "" );
52        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
53    }
54 
55     tbb::flow::graph& graph_reference() const override {
56         return my_graph;
57     }
58 };
59 
60 template< typename T >
61 struct parallel_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
62 
63     std::atomic<int> my_count;
64     tbb::flow::graph& my_graph;
65 
66     parallel_receiver(tbb::flow::graph& g) : my_graph(g) { my_count = 0; }
67 
68     graph_task* try_put_task( const T &/*v*/ ) override {
69        ++my_count;
70        return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
71     }
72 
73     tbb::flow::graph& graph_reference() const override {
74         return my_graph;
75     }
76 };
77 
78 template< typename T >
79 struct empty_sender : public tbb::flow::sender<T> {
80         typedef typename tbb::flow::sender<T>::successor_type successor_type;
81 
82         bool register_successor( successor_type & ) override { return false; }
83         bool remove_successor( successor_type & ) override { return false; }
84 };
85 
86 
87 template< typename T >
88 struct put_body : utils::NoAssign {
89 
90     tbb::flow::limiter_node<T> &my_lim;
91     std::atomic<int> &my_accept_count;
92 
93     put_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
94         my_lim(lim), my_accept_count(accept_count) {}
95 
96     void operator()( int ) const {
97         for ( int i = 0; i < L; ++i ) {
98             bool msg = my_lim.try_put( T(i) );
99             if ( msg == true )
100                ++my_accept_count;
101         }
102     }
103 };
104 
105 template< typename T >
106 struct put_dec_body : utils::NoAssign {
107 
108     tbb::flow::limiter_node<T> &my_lim;
109     std::atomic<int> &my_accept_count;
110 
111     put_dec_body( tbb::flow::limiter_node<T> &lim, std::atomic<int> &accept_count ) :
112         my_lim(lim), my_accept_count(accept_count) {}
113 
114     void operator()( int ) const {
115         int local_accept_count = 0;
116         while ( local_accept_count < N ) {
117             bool msg = my_lim.try_put( T(local_accept_count) );
118             if ( msg == true ) {
119                 ++local_accept_count;
120                 ++my_accept_count;
121                 my_lim.decrementer().try_put( tbb::flow::continue_msg() );
122             }
123         }
124     }
125 
126 };
127 
128 template< typename T >
129 void test_puts_with_decrements( int num_threads, tbb::flow::limiter_node< T >& lim , tbb::flow::graph& g) {
130     parallel_receiver<T> r(g);
131     empty_sender< tbb::flow::continue_msg > s;
132     std::atomic<int> accept_count;
133     accept_count = 0;
134     tbb::flow::make_edge( lim, r );
135     tbb::flow::make_edge(s, lim.decrementer());
136 
137     // test puts with decrements
138     utils::NativeParallelFor( num_threads, put_dec_body<T>(lim, accept_count) );
139     int c = accept_count;
140     CHECK_MESSAGE( c == N*num_threads, "" );
141     CHECK_MESSAGE( r.my_count == N*num_threads, "" );
142 }
143 
144 //
145 // Tests
146 //
147 // limiter only forwards below the limit, multiple parallel senders / single receiver
148 // multiple parallel senders that put to decrement at each accept, limiter accepts new messages
149 //
150 //
151 template< typename T >
152 int test_parallel(int num_threads) {
153 
154    // test puts with no decrements
155    for ( int i = 0; i < L; ++i ) {
156        tbb::flow::graph g;
157        tbb::flow::limiter_node< T > lim(g, i);
158        parallel_receiver<T> r(g);
159        std::atomic<int> accept_count;
160        accept_count = 0;
161        tbb::flow::make_edge( lim, r );
162        // test puts with no decrements
163        utils::NativeParallelFor( num_threads, put_body<T>(lim, accept_count) );
164        g.wait_for_all();
165        int c = accept_count;
166        CHECK_MESSAGE( c == i, "" );
167    }
168 
169    // test puts with decrements
170    for ( int i = 1; i < L; ++i ) {
171        tbb::flow::graph g;
172        tbb::flow::limiter_node< T > lim(g, i);
173        test_puts_with_decrements(num_threads, lim, g);
174        tbb::flow::limiter_node< T > lim_copy( lim );
175        test_puts_with_decrements(num_threads, lim_copy, g);
176    }
177 
178    return 0;
179 }
180 
181 //
182 // Tests
183 //
184 // limiter only forwards below the limit, single sender / single receiver
185 // at reject, a put to decrement, will cause next message to be accepted
186 //
187 template< typename T >
188 int test_serial() {
189 
190    // test puts with no decrements
191    for ( int i = 0; i < L; ++i ) {
192        tbb::flow::graph g;
193        tbb::flow::limiter_node< T > lim(g, i);
194        serial_receiver<T> r(g);
195        tbb::flow::make_edge( lim, r );
196        for ( int j = 0; j < L; ++j ) {
197            bool msg = lim.try_put( T(j) );
198            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
199        }
200        g.wait_for_all();
201    }
202 
203    // test puts with decrements
204    for ( int i = 1; i < L; ++i ) {
205        tbb::flow::graph g;
206        tbb::flow::limiter_node< T > lim(g, i);
207        serial_receiver<T> r(g);
208        empty_sender< tbb::flow::continue_msg > s;
209        tbb::flow::make_edge( lim, r );
210        tbb::flow::make_edge(s, lim.decrementer());
211        for ( int j = 0; j < N; ++j ) {
212            bool msg = lim.try_put( T(j) );
213            CHECK_MESSAGE( (( j < i && msg == true ) || ( j >= i && msg == false )), "" );
214            if ( msg == false ) {
215                lim.decrementer().try_put( tbb::flow::continue_msg() );
216                msg = lim.try_put( T(j) );
217                CHECK_MESSAGE( msg == true, "" );
218            }
219        }
220    }
221    return 0;
222 }
223 
224 // reported bug in limiter (http://software.intel.com/en-us/comment/1752355)
225 #define DECREMENT_OUTPUT 1  // the port number of the decrement output of the multifunction_node
226 #define LIMITER_OUTPUT 0    // port number of the integer output
227 
228 typedef tbb::flow::multifunction_node<int, std::tuple<int,tbb::flow::continue_msg> > mfnode_type;
229 
230 std::atomic<size_t> emit_count;
231 std::atomic<size_t> emit_sum;
232 std::atomic<size_t> receive_count;
233 std::atomic<size_t> receive_sum;
234 
235 struct mfnode_body {
236     int max_cnt;
237     std::atomic<int>* my_cnt;
238     mfnode_body(const int& _max, std::atomic<int> &_my) : max_cnt(_max), my_cnt(&_my)  { }
239     void operator()(const int &/*in*/, mfnode_type::output_ports_type &out) {
240         int lcnt = ++(*my_cnt);
241         if(lcnt > max_cnt) {
242             return;
243         }
244         // put one continue_msg to the decrement of the limiter.
245         if(!std::get<DECREMENT_OUTPUT>(out).try_put(tbb::flow::continue_msg())) {
246             CHECK_MESSAGE( (false),"Unexpected rejection of decrement");
247         }
248         {
249             // put messages to the input of the limiter_node until it rejects.
250             while( std::get<LIMITER_OUTPUT>(out).try_put(lcnt) ) {
251                 emit_sum += lcnt;
252                 ++emit_count;
253             }
254         }
255     }
256 };
257 
258 struct fn_body {
259     int operator()(const int &in) {
260         receive_sum += in;
261         ++receive_count;
262         return in;
263     }
264 };
265 
266 //                   +------------+
267 //    +---------+    |            v
268 //    | mf_node |0---+       +----------+          +----------+
269 // +->|         |1---------->| lim_node |--------->| fn_node  |--+
270 // |  +---------+            +----------+          +----------+  |
271 // |                                                             |
272 // |                                                             |
273 // +-------------------------------------------------------------+
274 //
275 void
276 test_multifunction_to_limiter(int _max, int _nparallel) {
277     tbb::flow::graph g;
278     emit_count = 0;
279     emit_sum = 0;
280     receive_count = 0;
281     receive_sum = 0;
282     std::atomic<int> local_cnt;
283     local_cnt = 0;
284     mfnode_type mf_node(g, tbb::flow::unlimited, mfnode_body(_max, local_cnt));
285     tbb::flow::function_node<int, int> fn_node(g, tbb::flow::unlimited, fn_body());
286     tbb::flow::limiter_node<int> lim_node(g, _nparallel);
287     tbb::flow::make_edge(tbb::flow::output_port<LIMITER_OUTPUT>(mf_node), lim_node);
288     tbb::flow::make_edge(tbb::flow::output_port<DECREMENT_OUTPUT>(mf_node), lim_node.decrementer());
289     tbb::flow::make_edge(lim_node, fn_node);
290     tbb::flow::make_edge(fn_node, mf_node);
291 
292     mf_node.try_put(1);
293     g.wait_for_all();
294     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
295     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
296 
297     // reset, test again
298     g.reset();
299     emit_count = 0;
300     emit_sum = 0;
301     receive_count = 0;
302     receive_sum = 0;
303     local_cnt = 0;;
304     mf_node.try_put(1);
305     g.wait_for_all();
306     CHECK_MESSAGE( (emit_count == receive_count), "counts do not match");
307     CHECK_MESSAGE( (emit_sum == receive_sum), "sums do not match");
308 }
309 
310 
311 void
312 test_continue_msg_reception() {
313     tbb::flow::graph g;
314     tbb::flow::limiter_node<int> ln(g,2);
315     tbb::flow::queue_node<int>   qn(g);
316     tbb::flow::make_edge(ln, qn);
317     ln.decrementer().try_put(tbb::flow::continue_msg());
318     ln.try_put(42);
319     g.wait_for_all();
320     int outint;
321     CHECK_MESSAGE( (qn.try_get(outint) && outint == 42), "initial put to decrement stops node");
322 }
323 
324 
325 //
326 // This test ascertains that if a message is not successfully put
327 // to a successor, the message is not dropped but released.
328 //
329 
330 void test_reserve_release_messages() {
331     using namespace tbb::flow;
332     graph g;
333 
334     //making two queue_nodes: one broadcast_node and one limiter_node
335     queue_node<int> input_queue(g);
336     queue_node<int> output_queue(g);
337     broadcast_node<int> broad(g);
338     limiter_node<int, int> limit(g,2); //threshold of 2
339 
340     //edges
341     make_edge(input_queue, limit);
342     make_edge(limit, output_queue);
343     make_edge(broad,limit.decrementer());
344 
345     int list[4] = {19, 33, 72, 98}; //list to be put to the input queue
346 
347     input_queue.try_put(list[0]); // succeeds
348     input_queue.try_put(list[1]); // succeeds
349     input_queue.try_put(list[2]); // fails, stored in upstream buffer
350     g.wait_for_all();
351 
352     remove_edge(limit, output_queue); //remove successor
353 
354     //sending message to the decrement port of the limiter
355     broad.try_put(1); //failed message retrieved.
356     g.wait_for_all();
357 
358     make_edge(limit, output_queue); //putting the successor back
359 
360     broad.try_put(1);  //drop the count
361 
362     input_queue.try_put(list[3]);  //success
363     g.wait_for_all();
364 
365     int var=0;
366 
367     for (int i=0; i<4; i++) {
368         output_queue.try_get(var);
369         CHECK_MESSAGE( (var==list[i]), "some data dropped, input does not match output");
370         g.wait_for_all();
371     }
372 }
373 
374 void test_decrementer() {
375     const int threshold = 5;
376     tbb::flow::graph g;
377     tbb::flow::limiter_node<int, int> limit(g, threshold);
378     tbb::flow::queue_node<int> queue(g);
379     make_edge(limit, queue);
380     int m = 0;
381     CHECK_MESSAGE( ( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
382     CHECK_MESSAGE( limit.decrementer().try_put( -threshold ), // close limiter's gate
383                    "Limiter node decrementer's port does not accept message." );
384     CHECK_MESSAGE( ( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
385     CHECK_MESSAGE( limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
386                    "Limiter node decrementer's port does not accept message." );
387     for( int i = 0; i < threshold; ++i )
388         CHECK_MESSAGE( ( limit.try_put( m++ )), "Limiter node does not accept message while open." );
389     CHECK_MESSAGE( ( !limit.try_put( m )), "Limiter node's gate is not closed." );
390     g.wait_for_all();
391     int expected[] = {0, 2, 3, 4, 5, 6};
392     int actual = -1; m = 0;
393     while( queue.try_get(actual) )
394         CHECK_MESSAGE( actual == expected[m++], "" );
395     CHECK_MESSAGE( ( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
396     g.wait_for_all();
397 
398     const size_t threshold2 = size_t(-1);
399     tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
400     make_edge(limit2, queue);
401     CHECK_MESSAGE( ( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
402     long long decrement_value = (long long)( size_t(-1)/2 );
403     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
404                    "Limiter node decrementer's port does not accept message" );
405     CHECK_MESSAGE( ( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
406     CHECK_MESSAGE( limit2.decrementer().try_put( -decrement_value ),
407                    "Limiter node decrementer's port does not accept message" );
408     CHECK_MESSAGE( ( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
409     int expected2[] = {1, 2};
410     actual = -1; m = 0;
411     while( queue.try_get(actual) )
412         CHECK_MESSAGE( actual == expected2[m++], "" );
413     CHECK_MESSAGE( ( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
414     g.wait_for_all();
415 }
416 
417 void test_try_put_without_successors() {
418     tbb::flow::graph g;
419     std::size_t try_put_num{3};
420     tbb::flow::buffer_node<int> bn(g);
421     tbb::flow::limiter_node<int> ln(g, try_put_num);
422     tbb::flow::make_edge(bn, ln);
423     std::size_t i = 1;
424     for (; i <= try_put_num; i++)
425         bn.try_put(i);
426 
427     std::atomic<std::size_t> counter{0};
428     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited,
429         [&](int input) {
430             counter += input;
431             return int{};
432         }
433     );
434     tbb::flow::make_edge(ln, fn);
435     g.wait_for_all();
436     CHECK((counter == i * try_put_num / 2));
437 
438     // Check the lost message
439     tbb::flow::remove_edge(bn, ln);
440     ln.decrementer().try_put(tbb::flow::continue_msg());
441     bn.try_put(try_put_num + 1);
442     g.wait_for_all();
443     CHECK((counter == i * try_put_num / 2));
444 
445 }
446 
447 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
448 #include <array>
449 #include <vector>
450 void test_follows_and_precedes_api() {
451     using msg_t = tbb::flow::continue_msg;
452 
453     std::array<msg_t, 3> messages_for_follows= { {msg_t(), msg_t(), msg_t()} };
454     std::vector<msg_t> messages_for_precedes = {msg_t()};
455 
456     follows_and_precedes_testing::test_follows
457         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_follows, 1000);
458     follows_and_precedes_testing::test_precedes
459         <msg_t, tbb::flow::limiter_node<msg_t, msg_t>>(messages_for_precedes, 1000);
460 
461 }
462 #endif
463 
464 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
465 void test_deduction_guides() {
466     using namespace tbb::flow;
467 
468     graph g;
469     broadcast_node<int> br(g);
470     limiter_node<int> l0(g, 100);
471 
472 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
473     limiter_node l1(follows(br), 100);
474     static_assert(std::is_same_v<decltype(l1), limiter_node<int>>);
475 
476     limiter_node l2(precedes(br), 100);
477     static_assert(std::is_same_v<decltype(l2), limiter_node<int>>);
478 #endif
479 
480     limiter_node l3(l0);
481     static_assert(std::is_same_v<decltype(l3), limiter_node<int>>);
482 }
483 #endif
484 
485 //! Test puts on limiter_node with decrements and varying parallelism levels
486 //! \brief \ref error_guessing
487 TEST_CASE("Serial and parallel tests") {
488     for (unsigned i = 1; i <= 2 * utils::MaxThread; ++i) {
489         tbb::task_arena arena(i);
490         arena.execute(
491             [i]() {
492                 test_serial<int>();
493                 test_parallel<int>(i);
494             }
495         );
496     }
497 }
498 
//! Test initial put of continue_msg on decrementer port does not stop message flow
//! \brief \ref error_guessing
TEST_CASE("Test continue_msg reception") {
    // The helper decrements a fresh limiter_node before any regular put and then
    // checks that the next regular message still reaches the attached queue_node.
    test_continue_msg_reception();
}
504 
505 //! Test multifunction_node connected to limiter_node
506 //! \brief \ref error_guessing
507 TEST_CASE("Multifunction connected to limiter") {
508     test_multifunction_to_limiter(30,3);
509     test_multifunction_to_limiter(300,13);
510     test_multifunction_to_limiter(3000,1);
511 }
512 
//! Test message release if successor doesn't accept
//! \brief \ref requirement
TEST_CASE("Message is released if successor does not accept") {
    // The helper temporarily removes the limiter's successor, decrements, restores
    // the successor and then checks that all four input values arrive in order.
    test_reserve_release_messages();
}
518 
//! Test decrementer
//! \brief \ref requirement \ref error_guessing
TEST_CASE("Decrementer") {
    // The helper drives the decrementer port with negative and positive int values
    // and with large long long values against a size_t(-1) threshold.
    test_decrementer();
}
524 
//! Test try_put() without successor
//! \brief \ref error_guessing
TEST_CASE("Test try_put() without successors") {
    // The helper buffers messages upstream of a limiter_node that has no successor
    // yet, then attaches one and checks the sum of the delivered values.
    test_try_put_without_successors();
}
530 
531 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    // Only built when __TBB_PREVIEW_FLOW_GRAPH_NODE_SET is enabled.
    test_follows_and_precedes_api();
}
537 #endif
538 
539 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE( "Deduction guides" ) {
    // Only built when __TBB_CPP17_DEDUCTION_GUIDES_PRESENT is set; the checks in the
    // helper are static_asserts, so a successful build is the actual test.
    test_deduction_guides();
}
545 #endif
546