1 /* 2 Copyright (c) 2020-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 22 #include "common/test.h" 23 24 #include "common/utils.h" 25 #include "common/graph_utils.h" 26 27 #include "oneapi/tbb/flow_graph.h" 28 #include "oneapi/tbb/task_arena.h" 29 #include "oneapi/tbb/global_control.h" 30 31 #include "conformance_flowgraph.h" 32 33 //! \file conformance_multifunction_node.cpp 34 //! \brief Test for [flow_graph.function_node] specification 35 36 /* 37 TODO: implement missing conformance tests for multifunction_node: 38 - [ ] Implement test_forwarding that checks messages are broadcast to all the successors connected 39 to the output port the message is being sent to. And check that the value passed is the 40 actual one received. 41 - [ ] Explicit test for copy constructor of the node. 42 - [ ] Constructor with explicitly passed Policy parameter: `template<typename Body> 43 multifunction_node( graph &g, size_t concurrency, Body body, Policy(), node_priority_t priority = no_priority )'. 44 - [ ] Concurrency testing of the node: make a loop over possible concurrency levels. It is 45 important to test at least on five values: 1, oneapi::tbb::flow::serial, `max_allowed_parallelism' 46 obtained from `oneapi::tbb::global_control', `oneapi::tbb::flow::unlimited', and, if `max allowed 47 parallelism' is > 2, use something in the middle of the [1, max_allowed_parallelism] 48 interval. Use `utils::ExactConcurrencyLevel' entity (extending it if necessary). 49 - [ ] make `test_rejecting' deterministic, i.e. avoid dependency on OS scheduling of the threads; 50 add check that `try_put()' returns `false' 51 - [ ] The `copy_body' function copies altered body (e.g. after successful `try_put()' call). 52 - [ ] `output_ports_type' is defined and accessible by the user. 53 - [ ] Explicit test on `mfn::output_ports()' method. 54 - [ ] The copy constructor and copy assignment are called for the node's input and output types. 55 - [ ] Add CTAD test. 56 */ 57 58 template< typename OutputType > 59 struct mf_functor { 60 61 std::atomic<std::size_t>& local_execute_count; 62 63 mf_functor(std::atomic<std::size_t>& execute_count ) : 64 local_execute_count (execute_count) 65 { } 66 67 mf_functor( const mf_functor &f ) : local_execute_count(f.local_execute_count) { } 68 void operator=(const mf_functor &f) { local_execute_count = std::size_t(f.local_execute_count); } 69 70 void operator()( const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) { 71 ++local_execute_count; 72 std::get<0>(op).try_put(argument); 73 } 74 75 }; 76 77 template<typename I, typename O> 78 void test_inheritance(){ 79 using namespace oneapi::tbb::flow; 80 81 CHECK_MESSAGE( (std::is_base_of<graph_node, multifunction_node<I, O>>::value), "multifunction_node should be derived from graph_node"); 82 CHECK_MESSAGE( (std::is_base_of<receiver<I>, multifunction_node<I, O>>::value), "multifunction_node should be derived from receiver<Input>"); 83 } 84 85 void test_multifunc_body(){ 86 oneapi::tbb::flow::graph g; 87 std::atomic<size_t> local_count(0); 88 mf_functor<std::tuple<int>> fun(local_count); 89 90 oneapi::tbb::flow::multifunction_node<int, std::tuple<int>, oneapi::tbb::flow::rejecting> node1(g, oneapi::tbb::flow::unlimited, fun); 91 92 const size_t n = 10; 93 for(size_t i = 0; i < n; ++i) { 94 CHECK_MESSAGE((node1.try_put(1) == true), "try_put needs to return true"); 95 } 96 g.wait_for_all(); 97 98 CHECK_MESSAGE( (local_count == n), "Body of the node needs to be executed N times"); 99 } 100 101 template<typename I, typename O> 102 struct CopyCounterBody{ 103 size_t copy_count; 104 105 CopyCounterBody(): 106 copy_count(0) {} 107 108 CopyCounterBody(const CopyCounterBody<I, O>& other): 109 copy_count(other.copy_count + 1) {} 110 111 CopyCounterBody& operator=(const CopyCounterBody<I, O>& other) 112 { copy_count = other.copy_count + 1; return *this;} 113 114 void operator()( const I& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) { 115 std::get<0>(op).try_put(argument); 116 } 117 }; 118 119 void test_copies(){ 120 using namespace oneapi::tbb::flow; 121 122 CopyCounterBody<int, std::tuple<int>> b; 123 124 graph g; 125 multifunction_node<int, std::tuple<int>> fn(g, unlimited, b); 126 127 CopyCounterBody<int, std::tuple<int>> b2 = copy_body<CopyCounterBody<int, std::tuple<int>>, 128 multifunction_node<int, std::tuple<int>>>(fn); 129 130 CHECK_MESSAGE( (b.copy_count + 2 <= b2.copy_count), "copy_body and constructor should copy bodies"); 131 } 132 133 template< typename OutputType > 134 struct id_functor { 135 void operator()( const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) { 136 std::get<0>(op).try_put(argument); 137 } 138 }; 139 140 void test_forwarding(){ 141 oneapi::tbb::flow::graph g; 142 id_functor<int> fun; 143 144 oneapi::tbb::flow::multifunction_node<int, std::tuple<int>> node1(g, oneapi::tbb::flow::unlimited, fun); 145 test_push_receiver<int> node2(g); 146 test_push_receiver<int> node3(g); 147 148 oneapi::tbb::flow::make_edge(node1, node2); 149 oneapi::tbb::flow::make_edge(node1, node3); 150 151 node1.try_put(1); 152 g.wait_for_all(); 153 154 CHECK_MESSAGE( (get_count(node3) == 1), "Descendant of the node must receive one message."); 155 CHECK_MESSAGE( (get_count(node2) == 1), "Descendant of the node must receive one message."); 156 } 157 158 void test_rejecting_buffering(){ 159 oneapi::tbb::flow::graph g; 160 id_functor<int> fun; 161 162 oneapi::tbb::flow::multifunction_node<int, std::tuple<int>, oneapi::tbb::flow::rejecting> node(g, oneapi::tbb::flow::unlimited, fun); 163 oneapi::tbb::flow::limiter_node<int> rejecter(g, 0); 164 165 oneapi::tbb::flow::make_edge(node, rejecter); 166 node.try_put(1); 167 168 int tmp = -1; 169 CHECK_MESSAGE( (std::get<0>(node.output_ports()).try_get(tmp) == false), "try_get after rejection should not succeed"); 170 CHECK_MESSAGE( (tmp == -1), "try_get after rejection should alter passed value"); 171 g.wait_for_all(); 172 } 173 174 void test_policy_ctors(){ 175 using namespace oneapi::tbb::flow; 176 graph g; 177 178 id_functor<int> fun; 179 180 multifunction_node<int, std::tuple<int>, lightweight> lw_node(g, oneapi::tbb::flow::serial, fun); 181 multifunction_node<int, std::tuple<int>, queueing_lightweight> qlw_node(g, oneapi::tbb::flow::serial, fun); 182 multifunction_node<int, std::tuple<int>, rejecting_lightweight> rlw_node(g, oneapi::tbb::flow::serial, fun); 183 184 } 185 186 std::atomic<size_t> my_concurrency; 187 std::atomic<size_t> my_max_concurrency; 188 189 struct concurrency_functor { 190 void operator()( const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ) { 191 ++my_concurrency; 192 193 size_t old_value = my_max_concurrency; 194 while(my_max_concurrency < my_concurrency && 195 !my_max_concurrency.compare_exchange_weak(old_value, my_concurrency)) 196 ; 197 198 size_t ms = 1000; 199 std::chrono::milliseconds sleep_time( ms ); 200 std::this_thread::sleep_for( sleep_time ); 201 202 --my_concurrency; 203 std::get<0>(op).try_put(argument); 204 } 205 206 }; 207 208 void test_node_concurrency(){ 209 my_concurrency = 0; 210 my_max_concurrency = 0; 211 212 oneapi::tbb::flow::graph g; 213 214 concurrency_functor counter; 215 oneapi::tbb::flow::multifunction_node <int, std::tuple<int>> fnode(g, oneapi::tbb::flow::serial, counter); 216 217 test_push_receiver<int> sink(g); 218 219 make_edge(std::get<0>(fnode.output_ports()), sink); 220 221 for(int i = 0; i < 10; ++i){ 222 fnode.try_put(i); 223 } 224 225 g.wait_for_all(); 226 CHECK_MESSAGE( ( my_max_concurrency.load() == 1), "Measured parallelism over limit"); 227 } 228 229 230 void test_priority(){ 231 size_t concurrency_limit = 1; 232 oneapi::tbb::global_control control(oneapi::tbb::global_control::max_allowed_parallelism, concurrency_limit); 233 234 oneapi::tbb::flow::graph g; 235 236 oneapi::tbb::flow::continue_node<int> source(g, 237 [](oneapi::tbb::flow::continue_msg){ return 1;}); 238 source.try_put(oneapi::tbb::flow::continue_msg()); 239 240 first_functor<int>::first_id = -1; 241 first_functor<int> low_functor(1); 242 first_functor<int> high_functor(2); 243 244 oneapi::tbb::flow::multifunction_node<int, std::tuple<int>> high(g, oneapi::tbb::flow::unlimited, high_functor, oneapi::tbb::flow::node_priority_t(1)); 245 oneapi::tbb::flow::multifunction_node<int, std::tuple<int>> low(g, oneapi::tbb::flow::unlimited, low_functor); 246 247 make_edge(source, low); 248 make_edge(source, high); 249 250 g.wait_for_all(); 251 252 CHECK_MESSAGE( (first_functor<int>::first_id == 2), "High priority node should execute first"); 253 } 254 255 void test_rejecting(){ 256 oneapi::tbb::flow::graph g; 257 oneapi::tbb::flow::multifunction_node <int, std::tuple<int>, oneapi::tbb::flow::rejecting> fnode(g, oneapi::tbb::flow::serial, 258 [&](const int& argument, oneapi::tbb::flow::multifunction_node<int, std::tuple<int>>::output_ports_type &op ){ 259 size_t ms = 50; 260 std::chrono::milliseconds sleep_time( ms ); 261 std::this_thread::sleep_for( sleep_time ); 262 std::get<0>(op).try_put(argument); 263 }); 264 265 test_push_receiver<int> sink(g); 266 267 make_edge(std::get<0>(fnode.output_ports()), sink); 268 269 for(int i = 0; i < 10; ++i){ 270 fnode.try_put(i); 271 } 272 273 g.wait_for_all(); 274 CHECK_MESSAGE( (get_count(sink) == 1), "Messages should be rejected while the first is being processed"); 275 } 276 277 //! Test multifunction_node with rejecting policy 278 //! \brief \ref interface 279 TEST_CASE("multifunction_node with rejecting policy"){ 280 test_rejecting(); 281 } 282 283 //! Test priorities 284 //! \brief \ref interface 285 TEST_CASE("multifunction_node priority"){ 286 test_priority(); 287 } 288 289 //! Test concurrency 290 //! \brief \ref interface 291 TEST_CASE("multifunction_node concurrency"){ 292 test_node_concurrency(); 293 } 294 295 //! Test constructors 296 //! \brief \ref interface 297 TEST_CASE("multifunction_node constructors"){ 298 test_policy_ctors(); 299 } 300 301 //! Test function_node buffering 302 //! \brief \ref requirement 303 TEST_CASE("multifunction_node buffering"){ 304 test_rejecting_buffering(); 305 } 306 307 //! Test function_node broadcasting 308 //! \brief \ref requirement 309 TEST_CASE("multifunction_node broadcast"){ 310 test_forwarding(); 311 } 312 313 //! Test body copying and copy_body logic 314 //! \brief \ref interface 315 TEST_CASE("multifunction_node constructors"){ 316 test_copies(); 317 } 318 319 //! Test calling function body 320 //! \brief \ref interface \ref requirement 321 TEST_CASE("multifunction_node body") { 322 test_multifunc_body(); 323 } 324 325 //! Test inheritance relations 326 //! \brief \ref interface 327 TEST_CASE("multifunction_node superclasses"){ 328 test_inheritance<int, std::tuple<int>>(); 329 test_inheritance<void*, std::tuple<float>>(); 330 } 331