151c0b2f7Stbbdev /* 2b15aabb3Stbbdev Copyright (c) 2020-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER 18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19b15aabb3Stbbdev #endif 20b15aabb3Stbbdev 2151c0b2f7Stbbdev #include "conformance_flowgraph.h" 2251c0b2f7Stbbdev 2351c0b2f7Stbbdev //! \file conformance_limiter_node.cpp 2451c0b2f7Stbbdev //! \brief Test for [flow_graph.limiter_node] specification 2551c0b2f7Stbbdev 26*de0109beSIlya Mishin using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true/*enable for queue_node successor*/, /*copy_assign*/true/*enable for queue_node successor*/>; 2751c0b2f7Stbbdev 2851c0b2f7Stbbdev //! Test limiter_node limiting 2951c0b2f7Stbbdev //! \brief \ref requirement 3051c0b2f7Stbbdev TEST_CASE("limiter_node limiting"){ 31*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 32*de0109beSIlya Mishin 33*de0109beSIlya Mishin constexpr int limit = 5; 34*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<input_msg> node1(g, limit); 35*de0109beSIlya Mishin conformance::test_push_receiver<input_msg> node2(g); 36*de0109beSIlya Mishin 37*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node1, node2); 38*de0109beSIlya Mishin 39*de0109beSIlya Mishin for(int i = 0; i < limit * 2; ++i) 40*de0109beSIlya Mishin node1.try_put(input_msg(1)); 41*de0109beSIlya Mishin g.wait_for_all(); 42*de0109beSIlya Mishin 43*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == limit), "Descendant of the node needs be receive limited number of messages"); 4451c0b2f7Stbbdev } 4551c0b2f7Stbbdev 46*de0109beSIlya Mishin //! Test node broadcast messages to successors 4751c0b2f7Stbbdev //! \brief \ref requirement 4851c0b2f7Stbbdev TEST_CASE("limiter_node broadcast"){ 49*de0109beSIlya Mishin conformance::test_forwarding<oneapi::tbb::flow::limiter_node<int>, int>(1, 5); 50*de0109beSIlya Mishin conformance::test_forwarding<oneapi::tbb::flow::limiter_node<input_msg>, input_msg>(1, 5); 5151c0b2f7Stbbdev } 5251c0b2f7Stbbdev 53*de0109beSIlya Mishin //! Test node not buffered unsuccessful message, and try_get after rejection should not succeed. 5451c0b2f7Stbbdev //! \brief \ref requirement 5551c0b2f7Stbbdev TEST_CASE("limiter_node buffering"){ 56*de0109beSIlya Mishin conformance::test_buffering<oneapi::tbb::flow::limiter_node<int>, int>(5); 57*de0109beSIlya Mishin conformance::test_buffering<oneapi::tbb::flow::limiter_node<int, int>, int>(5); 5851c0b2f7Stbbdev } 5951c0b2f7Stbbdev 60*de0109beSIlya Mishin //! The node that is constructed has a reference to the same graph object as src, has the same threshold. 61*de0109beSIlya Mishin //! The predecessors and successors of src are not copied. 6251c0b2f7Stbbdev //! \brief \ref interface 6351c0b2f7Stbbdev TEST_CASE("limiter_node copy constructor"){ 64*de0109beSIlya Mishin using namespace oneapi::tbb::flow; 65*de0109beSIlya Mishin graph g; 66*de0109beSIlya Mishin 67*de0109beSIlya Mishin limiter_node<int> node0(g, 1); 68*de0109beSIlya Mishin limiter_node<int> node1(g, 1); 69*de0109beSIlya Mishin conformance::test_push_receiver<int> node2(g); 70*de0109beSIlya Mishin conformance::test_push_receiver<int> node3(g); 71*de0109beSIlya Mishin 72*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node0, node1); 73*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node1, node2); 74*de0109beSIlya Mishin 75*de0109beSIlya Mishin limiter_node<int> node_copy(node1); 76*de0109beSIlya Mishin 77*de0109beSIlya Mishin oneapi::tbb::flow::make_edge(node_copy, node3); 78*de0109beSIlya Mishin 79*de0109beSIlya Mishin node_copy.try_put(1); 80*de0109beSIlya Mishin g.wait_for_all(); 81*de0109beSIlya Mishin 82*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 1), "Copied node doesn`t copy successor"); 83*de0109beSIlya Mishin 84*de0109beSIlya Mishin node_copy.try_put(1); 85*de0109beSIlya Mishin g.wait_for_all(); 86*de0109beSIlya Mishin 87*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 0), "Copied node copy threshold"); 88*de0109beSIlya Mishin 89*de0109beSIlya Mishin node0.try_put(1); 90*de0109beSIlya Mishin g.wait_for_all(); 91*de0109beSIlya Mishin 92*de0109beSIlya Mishin CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor"); 9351c0b2f7Stbbdev } 9451c0b2f7Stbbdev 9551c0b2f7Stbbdev //! Test inheritance relations 9651c0b2f7Stbbdev //! \brief \ref interface 9751c0b2f7Stbbdev TEST_CASE("limiter_node superclasses"){ 98*de0109beSIlya Mishin conformance::test_inheritance<oneapi::tbb::flow::limiter_node<int>, int, int>(); 99*de0109beSIlya Mishin conformance::test_inheritance<oneapi::tbb::flow::limiter_node<float>, float, float>(); 100*de0109beSIlya Mishin conformance::test_inheritance<oneapi::tbb::flow::limiter_node<input_msg>, input_msg, input_msg>(); 10151c0b2f7Stbbdev } 10251c0b2f7Stbbdev 103*de0109beSIlya Mishin //! Test limiter_node decrementer 104*de0109beSIlya Mishin //! \brief \ref interface 105*de0109beSIlya Mishin TEST_CASE("limiter_node decrementer"){ 106*de0109beSIlya Mishin const int threshold = 5; 107*de0109beSIlya Mishin oneapi::tbb::flow::graph g; 108*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int, int> limit(g, threshold); 109*de0109beSIlya Mishin oneapi::tbb::flow::queue_node<int> queue(g); 110*de0109beSIlya Mishin make_edge(limit, queue); 111*de0109beSIlya Mishin int m = 0; 112*de0109beSIlya Mishin CHECK_MESSAGE(( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 113*de0109beSIlya Mishin CHECK_MESSAGE(limit.decrementer().try_put( -threshold ), // close limiter's gate 114*de0109beSIlya Mishin "Limiter node decrementer's port does not accept message." ); 115*de0109beSIlya Mishin CHECK_MESSAGE(( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 116*de0109beSIlya Mishin CHECK_MESSAGE(limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 117*de0109beSIlya Mishin "Limiter node decrementer's port does not accept message." ); 118*de0109beSIlya Mishin for( int i = 0; i < threshold; ++i ) 119*de0109beSIlya Mishin CHECK_MESSAGE(( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 120*de0109beSIlya Mishin CHECK_MESSAGE(( !limit.try_put( m )), "Limiter node's gate is not closed." ); 121*de0109beSIlya Mishin g.wait_for_all(); 122*de0109beSIlya Mishin int expected[] = {0, 2, 3, 4, 5, 6}; 123*de0109beSIlya Mishin int actual = -1; m = 0; 124*de0109beSIlya Mishin while( queue.try_get(actual) ) 125*de0109beSIlya Mishin CHECK_MESSAGE(actual == expected[m++], "" ); 126*de0109beSIlya Mishin CHECK_MESSAGE(( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 127*de0109beSIlya Mishin g.wait_for_all(); 128*de0109beSIlya Mishin 129*de0109beSIlya Mishin const size_t threshold2 = size_t(-1); 130*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 131*de0109beSIlya Mishin make_edge(limit2, queue); 132*de0109beSIlya Mishin CHECK_MESSAGE(( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 133*de0109beSIlya Mishin long long decrement_value = (long long)( size_t(-1)/2 ); 134*de0109beSIlya Mishin CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ), 135*de0109beSIlya Mishin "Limiter node decrementer's port does not accept message" ); 136*de0109beSIlya Mishin CHECK_MESSAGE(( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 137*de0109beSIlya Mishin CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ), 138*de0109beSIlya Mishin "Limiter node decrementer's port does not accept message" ); 139*de0109beSIlya Mishin CHECK_MESSAGE(( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 140*de0109beSIlya Mishin int expected2[] = {1, 2}; 141*de0109beSIlya Mishin actual = -1; m = 0; 142*de0109beSIlya Mishin while( queue.try_get(actual) ) 143*de0109beSIlya Mishin CHECK_MESSAGE(actual == expected2[m++], "" ); 144*de0109beSIlya Mishin CHECK_MESSAGE(( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 145*de0109beSIlya Mishin g.wait_for_all(); 146*de0109beSIlya Mishin 147*de0109beSIlya Mishin const size_t threshold3 = 10; 148*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int, long long> limit3(g, threshold3); 149*de0109beSIlya Mishin make_edge(limit3, queue); 150*de0109beSIlya Mishin long long decrement_value3 = 3; 151*de0109beSIlya Mishin CHECK_MESSAGE(limit3.decrementer().try_put( -decrement_value3 ), 152*de0109beSIlya Mishin "Limiter node decrementer's port does not accept message" ); 153*de0109beSIlya Mishin 154*de0109beSIlya Mishin m = 0; 155*de0109beSIlya Mishin while( limit3.try_put( m ) ){ m++; }; 156*de0109beSIlya Mishin CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been accepted." ); 157*de0109beSIlya Mishin 158*de0109beSIlya Mishin actual = -1; m = 0; 159*de0109beSIlya Mishin while( queue.try_get(actual) ){ 160*de0109beSIlya Mishin CHECK_MESSAGE(actual == m++, "Not all messages have been processed." ); 161*de0109beSIlya Mishin } 162*de0109beSIlya Mishin 163*de0109beSIlya Mishin g.wait_for_all(); 164*de0109beSIlya Mishin CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been processed." ); 165*de0109beSIlya Mishin 166*de0109beSIlya Mishin const size_t threshold4 = 10; 167*de0109beSIlya Mishin oneapi::tbb::flow::limiter_node<int> limit4(g, threshold4); 168*de0109beSIlya Mishin make_edge(limit4, queue); 169*de0109beSIlya Mishin 170*de0109beSIlya Mishin limit4.try_put(-1); 171*de0109beSIlya Mishin CHECK_MESSAGE(limit4.decrementer().try_put(oneapi::tbb::flow::continue_msg()), 172*de0109beSIlya Mishin "Limiter node decrementer's port does not accept continue_msg" ); 173*de0109beSIlya Mishin 174*de0109beSIlya Mishin m = 0; 175*de0109beSIlya Mishin while( limit4.try_put( m ) ){ m++; }; 176*de0109beSIlya Mishin CHECK_MESSAGE(m == threshold4, "Not all messages have been accepted." ); 177*de0109beSIlya Mishin 178*de0109beSIlya Mishin actual = -1; m = -1; 179*de0109beSIlya Mishin while( queue.try_get(actual) ){ 180*de0109beSIlya Mishin CHECK_MESSAGE(actual == m++, "Not all messages have been processed." ); 181*de0109beSIlya Mishin } 182*de0109beSIlya Mishin 183*de0109beSIlya Mishin g.wait_for_all(); 184*de0109beSIlya Mishin } 185