151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2020-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev 
2151c0b2f7Stbbdev #include "conformance_flowgraph.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev //! \file conformance_limiter_node.cpp
2451c0b2f7Stbbdev //! \brief Test for [flow_graph.limiter_node] specification
2551c0b2f7Stbbdev 
26*de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true/*enable for queue_node successor*/, /*copy_assign*/true/*enable for queue_node successor*/>;
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev //! Test limiter_node limiting
2951c0b2f7Stbbdev //! \brief \ref requirement
3051c0b2f7Stbbdev TEST_CASE("limiter_node limiting"){
31*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
32*de0109beSIlya Mishin 
33*de0109beSIlya Mishin     constexpr int limit = 5;
34*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<input_msg> node1(g, limit);
35*de0109beSIlya Mishin     conformance::test_push_receiver<input_msg> node2(g);
36*de0109beSIlya Mishin 
37*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node1, node2);
38*de0109beSIlya Mishin 
39*de0109beSIlya Mishin     for(int i = 0; i < limit * 2; ++i)
40*de0109beSIlya Mishin         node1.try_put(input_msg(1));
41*de0109beSIlya Mishin     g.wait_for_all();
42*de0109beSIlya Mishin 
43*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == limit), "Descendant of the node needs be receive limited number of messages");
4451c0b2f7Stbbdev }
4551c0b2f7Stbbdev 
46*de0109beSIlya Mishin //! Test node broadcast messages to successors
4751c0b2f7Stbbdev //! \brief \ref requirement
4851c0b2f7Stbbdev TEST_CASE("limiter_node broadcast"){
49*de0109beSIlya Mishin     conformance::test_forwarding<oneapi::tbb::flow::limiter_node<int>, int>(1, 5);
50*de0109beSIlya Mishin     conformance::test_forwarding<oneapi::tbb::flow::limiter_node<input_msg>, input_msg>(1, 5);
5151c0b2f7Stbbdev }
5251c0b2f7Stbbdev 
53*de0109beSIlya Mishin //! Test node not buffered unsuccessful message, and try_get after rejection should not succeed.
5451c0b2f7Stbbdev //! \brief \ref requirement
5551c0b2f7Stbbdev TEST_CASE("limiter_node buffering"){
56*de0109beSIlya Mishin     conformance::test_buffering<oneapi::tbb::flow::limiter_node<int>, int>(5);
57*de0109beSIlya Mishin     conformance::test_buffering<oneapi::tbb::flow::limiter_node<int, int>, int>(5);
5851c0b2f7Stbbdev }
5951c0b2f7Stbbdev 
60*de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src, has the same threshold.
61*de0109beSIlya Mishin //! The predecessors and successors of src are not copied.
6251c0b2f7Stbbdev //! \brief \ref interface
6351c0b2f7Stbbdev TEST_CASE("limiter_node copy constructor"){
64*de0109beSIlya Mishin     using namespace oneapi::tbb::flow;
65*de0109beSIlya Mishin     graph g;
66*de0109beSIlya Mishin 
67*de0109beSIlya Mishin     limiter_node<int> node0(g, 1);
68*de0109beSIlya Mishin     limiter_node<int> node1(g, 1);
69*de0109beSIlya Mishin     conformance::test_push_receiver<int> node2(g);
70*de0109beSIlya Mishin     conformance::test_push_receiver<int> node3(g);
71*de0109beSIlya Mishin 
72*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node0, node1);
73*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node1, node2);
74*de0109beSIlya Mishin 
75*de0109beSIlya Mishin     limiter_node<int> node_copy(node1);
76*de0109beSIlya Mishin 
77*de0109beSIlya Mishin     oneapi::tbb::flow::make_edge(node_copy, node3);
78*de0109beSIlya Mishin 
79*de0109beSIlya Mishin     node_copy.try_put(1);
80*de0109beSIlya Mishin     g.wait_for_all();
81*de0109beSIlya Mishin 
82*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 1), "Copied node doesn`t copy successor");
83*de0109beSIlya Mishin 
84*de0109beSIlya Mishin     node_copy.try_put(1);
85*de0109beSIlya Mishin     g.wait_for_all();
86*de0109beSIlya Mishin 
87*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 0), "Copied node copy threshold");
88*de0109beSIlya Mishin 
89*de0109beSIlya Mishin     node0.try_put(1);
90*de0109beSIlya Mishin     g.wait_for_all();
91*de0109beSIlya Mishin 
92*de0109beSIlya Mishin     CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor");
9351c0b2f7Stbbdev }
9451c0b2f7Stbbdev 
9551c0b2f7Stbbdev //! Test inheritance relations
9651c0b2f7Stbbdev //! \brief \ref interface
9751c0b2f7Stbbdev TEST_CASE("limiter_node superclasses"){
98*de0109beSIlya Mishin     conformance::test_inheritance<oneapi::tbb::flow::limiter_node<int>, int, int>();
99*de0109beSIlya Mishin     conformance::test_inheritance<oneapi::tbb::flow::limiter_node<float>, float, float>();
100*de0109beSIlya Mishin     conformance::test_inheritance<oneapi::tbb::flow::limiter_node<input_msg>, input_msg, input_msg>();
10151c0b2f7Stbbdev }
10251c0b2f7Stbbdev 
103*de0109beSIlya Mishin //! Test limiter_node decrementer
104*de0109beSIlya Mishin //! \brief \ref interface
105*de0109beSIlya Mishin TEST_CASE("limiter_node decrementer"){
106*de0109beSIlya Mishin     const int threshold = 5;
107*de0109beSIlya Mishin     oneapi::tbb::flow::graph g;
108*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int, int> limit(g, threshold);
109*de0109beSIlya Mishin     oneapi::tbb::flow::queue_node<int> queue(g);
110*de0109beSIlya Mishin     make_edge(limit, queue);
111*de0109beSIlya Mishin     int m = 0;
112*de0109beSIlya Mishin     CHECK_MESSAGE(( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." );
113*de0109beSIlya Mishin     CHECK_MESSAGE(limit.decrementer().try_put( -threshold ), // close limiter's gate
114*de0109beSIlya Mishin                    "Limiter node decrementer's port does not accept message." );
115*de0109beSIlya Mishin     CHECK_MESSAGE(( !limit.try_put( m++ )), "Closed limiter node's accepts message." );
116*de0109beSIlya Mishin     CHECK_MESSAGE(limit.decrementer().try_put( threshold + 5 ),  // open limiter's gate
117*de0109beSIlya Mishin                    "Limiter node decrementer's port does not accept message." );
118*de0109beSIlya Mishin     for( int i = 0; i < threshold; ++i )
119*de0109beSIlya Mishin         CHECK_MESSAGE(( limit.try_put( m++ )), "Limiter node does not accept message while open." );
120*de0109beSIlya Mishin     CHECK_MESSAGE(( !limit.try_put( m )), "Limiter node's gate is not closed." );
121*de0109beSIlya Mishin     g.wait_for_all();
122*de0109beSIlya Mishin     int expected[] = {0, 2, 3, 4, 5, 6};
123*de0109beSIlya Mishin     int actual = -1; m = 0;
124*de0109beSIlya Mishin     while( queue.try_get(actual) )
125*de0109beSIlya Mishin         CHECK_MESSAGE(actual == expected[m++], "" );
126*de0109beSIlya Mishin     CHECK_MESSAGE(( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." );
127*de0109beSIlya Mishin     g.wait_for_all();
128*de0109beSIlya Mishin 
129*de0109beSIlya Mishin     const size_t threshold2 = size_t(-1);
130*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int, long long> limit2(g, threshold2);
131*de0109beSIlya Mishin     make_edge(limit2, queue);
132*de0109beSIlya Mishin     CHECK_MESSAGE(( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." );
133*de0109beSIlya Mishin     long long decrement_value = (long long)( size_t(-1)/2 );
134*de0109beSIlya Mishin     CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ),
135*de0109beSIlya Mishin                    "Limiter node decrementer's port does not accept message" );
136*de0109beSIlya Mishin     CHECK_MESSAGE(( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." );
137*de0109beSIlya Mishin     CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ),
138*de0109beSIlya Mishin                    "Limiter node decrementer's port does not accept message" );
139*de0109beSIlya Mishin     CHECK_MESSAGE(( !limit2.try_put( 3 )), "Overflow happened for internal counter." );
140*de0109beSIlya Mishin     int expected2[] = {1, 2};
141*de0109beSIlya Mishin     actual = -1; m = 0;
142*de0109beSIlya Mishin     while( queue.try_get(actual) )
143*de0109beSIlya Mishin         CHECK_MESSAGE(actual == expected2[m++], "" );
144*de0109beSIlya Mishin     CHECK_MESSAGE(( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." );
145*de0109beSIlya Mishin     g.wait_for_all();
146*de0109beSIlya Mishin 
147*de0109beSIlya Mishin     const size_t threshold3 = 10;
148*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int, long long> limit3(g, threshold3);
149*de0109beSIlya Mishin     make_edge(limit3, queue);
150*de0109beSIlya Mishin     long long decrement_value3 = 3;
151*de0109beSIlya Mishin     CHECK_MESSAGE(limit3.decrementer().try_put( -decrement_value3 ),
152*de0109beSIlya Mishin                    "Limiter node decrementer's port does not accept message" );
153*de0109beSIlya Mishin 
154*de0109beSIlya Mishin     m = 0;
155*de0109beSIlya Mishin     while( limit3.try_put( m ) ){ m++; };
156*de0109beSIlya Mishin     CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been accepted." );
157*de0109beSIlya Mishin 
158*de0109beSIlya Mishin     actual = -1; m = 0;
159*de0109beSIlya Mishin     while( queue.try_get(actual) ){
160*de0109beSIlya Mishin         CHECK_MESSAGE(actual == m++, "Not all messages have been processed." );
161*de0109beSIlya Mishin     }
162*de0109beSIlya Mishin 
163*de0109beSIlya Mishin     g.wait_for_all();
164*de0109beSIlya Mishin     CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been processed." );
165*de0109beSIlya Mishin 
166*de0109beSIlya Mishin     const size_t threshold4 = 10;
167*de0109beSIlya Mishin     oneapi::tbb::flow::limiter_node<int> limit4(g, threshold4);
168*de0109beSIlya Mishin     make_edge(limit4, queue);
169*de0109beSIlya Mishin 
170*de0109beSIlya Mishin     limit4.try_put(-1);
171*de0109beSIlya Mishin     CHECK_MESSAGE(limit4.decrementer().try_put(oneapi::tbb::flow::continue_msg()),
172*de0109beSIlya Mishin                    "Limiter node decrementer's port does not accept continue_msg" );
173*de0109beSIlya Mishin 
174*de0109beSIlya Mishin     m = 0;
175*de0109beSIlya Mishin     while( limit4.try_put( m ) ){ m++; };
176*de0109beSIlya Mishin     CHECK_MESSAGE(m == threshold4, "Not all messages have been accepted." );
177*de0109beSIlya Mishin 
178*de0109beSIlya Mishin     actual = -1; m = -1;
179*de0109beSIlya Mishin     while( queue.try_get(actual) ){
180*de0109beSIlya Mishin         CHECK_MESSAGE(actual == m++, "Not all messages have been processed." );
181*de0109beSIlya Mishin     }
182*de0109beSIlya Mishin 
183*de0109beSIlya Mishin     g.wait_for_all();
184*de0109beSIlya Mishin }
185