1 /* 2 Copyright (c) 2020-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "conformance_flowgraph.h" 22 23 //! \file conformance_limiter_node.cpp 24 //! \brief Test for [flow_graph.limiter_node] specification 25 26 using input_msg = conformance::message</*default_ctor*/true, /*copy_ctor*/true/*enable for queue_node successor*/, /*copy_assign*/true/*enable for queue_node successor*/>; 27 28 //! Test limiter_node limiting 29 //! \brief \ref requirement 30 TEST_CASE("limiter_node limiting"){ 31 oneapi::tbb::flow::graph g; 32 33 constexpr int limit = 5; 34 oneapi::tbb::flow::limiter_node<input_msg> node1(g, limit); 35 conformance::test_push_receiver<input_msg> node2(g); 36 37 oneapi::tbb::flow::make_edge(node1, node2); 38 39 for(int i = 0; i < limit * 2; ++i) 40 node1.try_put(input_msg(1)); 41 g.wait_for_all(); 42 43 CHECK_MESSAGE((conformance::get_values(node2).size() == limit), "Descendant of the node needs be receive limited number of messages"); 44 } 45 46 //! Test node broadcast messages to successors 47 //! \brief \ref requirement 48 TEST_CASE("limiter_node broadcast"){ 49 conformance::test_forwarding<oneapi::tbb::flow::limiter_node<int>, int>(1, 5); 50 conformance::test_forwarding<oneapi::tbb::flow::limiter_node<input_msg>, input_msg>(1, 5); 51 } 52 53 //! Test node not buffered unsuccessful message, and try_get after rejection should not succeed. 54 //! \brief \ref requirement 55 TEST_CASE("limiter_node buffering"){ 56 conformance::test_buffering<oneapi::tbb::flow::limiter_node<int>, int>(5); 57 conformance::test_buffering<oneapi::tbb::flow::limiter_node<int, int>, int>(5); 58 } 59 60 //! The node that is constructed has a reference to the same graph object as src, has the same threshold. 61 //! The predecessors and successors of src are not copied. 62 //! \brief \ref interface 63 TEST_CASE("limiter_node copy constructor"){ 64 using namespace oneapi::tbb::flow; 65 graph g; 66 67 limiter_node<int> node0(g, 1); 68 limiter_node<int> node1(g, 1); 69 conformance::test_push_receiver<int> node2(g); 70 conformance::test_push_receiver<int> node3(g); 71 72 oneapi::tbb::flow::make_edge(node0, node1); 73 oneapi::tbb::flow::make_edge(node1, node2); 74 75 limiter_node<int> node_copy(node1); 76 77 oneapi::tbb::flow::make_edge(node_copy, node3); 78 79 node_copy.try_put(1); 80 g.wait_for_all(); 81 82 CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 1), "Copied node doesn`t copy successor"); 83 84 node_copy.try_put(1); 85 g.wait_for_all(); 86 87 CHECK_MESSAGE((conformance::get_values(node2).size() == 0 && conformance::get_values(node3).size() == 0), "Copied node copy threshold"); 88 89 node0.try_put(1); 90 g.wait_for_all(); 91 92 CHECK_MESSAGE((conformance::get_values(node2).size() == 1 && conformance::get_values(node3).size() == 0), "Copied node doesn`t copy predecessor"); 93 } 94 95 //! Test inheritance relations 96 //! \brief \ref interface 97 TEST_CASE("limiter_node superclasses"){ 98 conformance::test_inheritance<oneapi::tbb::flow::limiter_node<int>, int, int>(); 99 conformance::test_inheritance<oneapi::tbb::flow::limiter_node<float>, float, float>(); 100 conformance::test_inheritance<oneapi::tbb::flow::limiter_node<input_msg>, input_msg, input_msg>(); 101 } 102 103 //! Test limiter_node decrementer 104 //! \brief \ref interface 105 TEST_CASE("limiter_node decrementer"){ 106 const int threshold = 5; 107 oneapi::tbb::flow::graph g; 108 oneapi::tbb::flow::limiter_node<int, int> limit(g, threshold); 109 oneapi::tbb::flow::queue_node<int> queue(g); 110 make_edge(limit, queue); 111 int m = 0; 112 CHECK_MESSAGE(( limit.try_put( m++ )), "Newly constructed limiter node does not accept message." ); 113 CHECK_MESSAGE(limit.decrementer().try_put( -threshold ), // close limiter's gate 114 "Limiter node decrementer's port does not accept message." ); 115 CHECK_MESSAGE(( !limit.try_put( m++ )), "Closed limiter node's accepts message." ); 116 CHECK_MESSAGE(limit.decrementer().try_put( threshold + 5 ), // open limiter's gate 117 "Limiter node decrementer's port does not accept message." ); 118 for( int i = 0; i < threshold; ++i ) 119 CHECK_MESSAGE(( limit.try_put( m++ )), "Limiter node does not accept message while open." ); 120 CHECK_MESSAGE(( !limit.try_put( m )), "Limiter node's gate is not closed." ); 121 g.wait_for_all(); 122 int expected[] = {0, 2, 3, 4, 5, 6}; 123 int actual = -1; m = 0; 124 while( queue.try_get(actual) ) 125 CHECK_MESSAGE(actual == expected[m++], "" ); 126 CHECK_MESSAGE(( sizeof(expected) / sizeof(expected[0]) == m), "Not all messages have been processed." ); 127 g.wait_for_all(); 128 129 const size_t threshold2 = size_t(-1); 130 oneapi::tbb::flow::limiter_node<int, long long> limit2(g, threshold2); 131 make_edge(limit2, queue); 132 CHECK_MESSAGE(( limit2.try_put( 1 )), "Newly constructed limiter node does not accept message." ); 133 long long decrement_value = (long long)( size_t(-1)/2 ); 134 CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ), 135 "Limiter node decrementer's port does not accept message" ); 136 CHECK_MESSAGE(( limit2.try_put( 2 )), "Limiter's gate should not be closed yet." ); 137 CHECK_MESSAGE(limit2.decrementer().try_put( -decrement_value ), 138 "Limiter node decrementer's port does not accept message" ); 139 CHECK_MESSAGE(( !limit2.try_put( 3 )), "Overflow happened for internal counter." ); 140 int expected2[] = {1, 2}; 141 actual = -1; m = 0; 142 while( queue.try_get(actual) ) 143 CHECK_MESSAGE(actual == expected2[m++], "" ); 144 CHECK_MESSAGE(( sizeof(expected2) / sizeof(expected2[0]) == m), "Not all messages have been processed." ); 145 g.wait_for_all(); 146 147 const size_t threshold3 = 10; 148 oneapi::tbb::flow::limiter_node<int, long long> limit3(g, threshold3); 149 make_edge(limit3, queue); 150 long long decrement_value3 = 3; 151 CHECK_MESSAGE(limit3.decrementer().try_put( -decrement_value3 ), 152 "Limiter node decrementer's port does not accept message" ); 153 154 m = 0; 155 while( limit3.try_put( m ) ){ m++; }; 156 CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been accepted." ); 157 158 actual = -1; m = 0; 159 while( queue.try_get(actual) ){ 160 CHECK_MESSAGE(actual == m++, "Not all messages have been processed." ); 161 } 162 163 g.wait_for_all(); 164 CHECK_MESSAGE(m == threshold3 - decrement_value3, "Not all messages have been processed." ); 165 166 const size_t threshold4 = 10; 167 oneapi::tbb::flow::limiter_node<int> limit4(g, threshold4); 168 make_edge(limit4, queue); 169 170 limit4.try_put(-1); 171 CHECK_MESSAGE(limit4.decrementer().try_put(oneapi::tbb::flow::continue_msg()), 172 "Limiter node decrementer's port does not accept continue_msg" ); 173 174 m = 0; 175 while( limit4.try_put( m ) ){ m++; }; 176 CHECK_MESSAGE(m == threshold4, "Not all messages have been accepted." ); 177 178 actual = -1; m = -1; 179 while( queue.try_get(actual) ){ 180 CHECK_MESSAGE(actual == m++, "Not all messages have been processed." ); 181 } 182 183 g.wait_for_all(); 184 } 185