1 /*
2     Copyright (c) 2020-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "conformance_flowgraph.h"
22 
23 //! \file conformance_limiter_node.cpp
24 //! \brief Test for [flow_graph.limiter_node] specification
25 
26 using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true/*enable for queue_node successor*/, /*copy_assign*/true/*enable for queue_node successor*/>;
27 
28 //! Test limiter_node limiting
29 //! \brief \ref requirement
30 TEST_CASE("limiter_node limiting"){
31     oneapi::tbb::flow::graph g;
32 
33     constexpr int limit = 5;
34     oneapi::tbb::flow::limiter_node<input_msg> node1(g, limit);
35     conformance::test_push_receiver<input_msg> node2(g);
36 
37     oneapi::tbb::flow::make_edge(node1, node2);
38 
39     for(int i = 0; i < limit * 2; ++i)
40         node1.try_put(input_msg(1));
41     g.wait_for_all();
42 
43     CHECK_MESSAGE((conformance::get_values(node2).size() == limit), "Descendant of the node needs be receive limited number of messages");
44 }
45 
46 //! Test node broadcast messages to successors
47 //! \brief \ref requirement
48 TEST_CASE("limiter_node broadcast"){
49     conformance::test_forwarding<oneapi::tbb::flow::limiter_node<int>, int>(1, 5);
50     conformance::test_forwarding<oneapi::tbb::flow::limiter_node<input_msg>, input_msg>(1, 5);
51 }
52 
53 //! Test node not buffered unsuccessful message, and try_get after rejection should not succeed.
54 //! \brief \ref requirement
55 TEST_CASE("limiter_node buffering"){
56     conformance::test_buffering<oneapi::tbb::flow::limiter_node<int>, int>(5);
57     conformance::test_buffering<oneapi::tbb::flow::limiter_node<int, int>, int>(5);
58 }
59 
60 //! The node that is constructed has a reference to the same graph object as src, has the same threshold.
61 //! The predecessors and successors of src are not copied.
62 //! \brief \ref interface
63 TEST_CASE("limiter_node copy constructor"){
64     using namespace oneapi::tbb::flow;
65     graph g;
66 
67     limiter_node<int> node0(g, 1);
68     limiter_node<int> node1(g, 1);
69     conformance::test_push_receiver<int> node2(g);
70     conformance::test_push_receiver<int> node3(g);
71 
72     oneapi::tbb::flow::make_edge(node0, node1);
73     oneapi::tbb::flow::make_edge(node1, node2);
74 
75     limiter_node<int> node_copy(node1);
76 
77     oneapi::tbb::flow::make_edge(node_copy, node3);
78 
79     node_copy.try_put(1);
80     g.wait_for_all();
81 
82     CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 1), "Copied node doesn`t copy successor");
83 
84     node_copy.try_put(1);
85     g.wait_for_all();
86 
87     CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 0), "Copied node copy threshold");
88 
89     node0.try_put(1);
90     g.wait_for_all();
91 
92     CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor");
93 }
94 
95 //! Test inheritance relations
96 //! \brief \ref interface
97 TEST_CASE("limiter_node superclasses"){
98     conformance::test_inheritance<oneapi::tbb::flow::limiter_node<int>, int, int>();
99     conformance::test_inheritance<oneapi::tbb::flow::limiter_node<float>, float, float>();
100     conformance::test_inheritance<oneapi::tbb::flow::limiter_node<input_msg>, input_msg, input_msg>();
101 }
102 
103 //! Test limiter_node decrementer
104 //! \brief \ref interface
105 TEST_CASE("limiter_node decrementer"){
106     const int threshold = 5;
107     oneapi::tbb::flow::graph g;
108     oneapi::tbb::flow::limiter_node<int, int> limit(g, threshold);
109     oneapi::tbb::flow::queue_node<int> queue(g);
110     make_edge(limit, queue);
111     int m = 0;
112     CHECK_MESSAGE(( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
113     CHECK_MESSAGE(limit.decrementer().try_put( -threshold ), // close limiter's gate
114                    "Limiter node decrementer's port does not accept message." );
115     CHECK_MESSAGE(( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
116     CHECK_MESSAGE(limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
117                    "Limiter node decrementer's port does not accept message." );
118     for( int i = 0; i < threshold; ++i )
119         CHECK_MESSAGE(( limit.try_put( m++ )), "Limiter node does not accept message while open." );
120     CHECK_MESSAGE(( !limit.try_put( m )), "Limiter node's gate is not closed." );
121     g.wait_for_all();
122     int expected[] = {0, 2, 3, 4, 5, 6};
123     int actual = -1; m = 0;
124     while( queue.try_get(actual) )
125         CHECK_MESSAGE(actual == expected[m++], "" );
126     CHECK_MESSAGE(( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
127     g.wait_for_all();
128 
129     const size_t threshold2 = size_t(-1);
130     oneapi::tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
131     make_edge(limit2, queue);
132     CHECK_MESSAGE(( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
133     long long decrement_value = (long long)( size_t(-1)/2 );
134     CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ),
135                    "Limiter node decrementer's port does not accept message" );
136     CHECK_MESSAGE(( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
137     CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ),
138                    "Limiter node decrementer's port does not accept message" );
139     CHECK_MESSAGE(( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
140     int expected2[] = {1, 2};
141     actual = -1; m = 0;
142     while( queue.try_get(actual) )
143         CHECK_MESSAGE(actual == expected2[m++], "" );
144     CHECK_MESSAGE(( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
145     g.wait_for_all();
146 
147     const size_t threshold3 = 10;
148     oneapi::tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
149     make_edge(limit3, queue);
150     long long decrement_value3 = 3;
151     CHECK_MESSAGE(limit3.decrementer().try_put( -decrement_value3 ),
152                    "Limiter node decrementer's port does not accept message" );
153 
154     m = 0;
155     while( limit3.try_put( m ) ){ m++; };
156     CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been accepted." );
157 
158     actual = -1; m = 0;
159     while( queue.try_get(actual) ){
160         CHECK_MESSAGE(actual == m++, "Not all messages have been processed." );
161     }
162 
163     g.wait_for_all();
164     CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been processed." );
165 
166     const size_t threshold4 = 10;
167     oneapi::tbb::flow::limiter_node<int> limit4(g, threshold4);
168     make_edge(limit4, queue);
169 
170     limit4.try_put(-1);
171     CHECK_MESSAGE(limit4.decrementer().try_put(oneapi::tbb::flow::continue_msg()),
172                    "Limiter node decrementer's port does not accept continue_msg" );
173 
174     m = 0;
175     while( limit4.try_put( m ) ){ m++; };
176     CHECK_MESSAGE(m == threshold4, "Not all messages have been accepted." );
177 
178     actual = -1; m = -1;
179     while( queue.try_get(actual) ){
180         CHECK_MESSAGE(actual == m++, "Not all messages have been processed." );
181     }
182 
183     g.wait_for_all();
184 }
185