1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifdef TBB_TEST_LOW_WORKLOAD 18 #undef MAX_TUPLE_TEST_SIZE 19 #define MAX_TUPLE_TEST_SIZE 3 20 #endif 21 22 #include "common/config.h" 23 24 #include "test_join_node.h" 25 26 //! \file test_join_node.cpp 27 //! \brief Test for [flow_graph.join_node] specification 28 29 30 static std::atomic<int> output_count; 31 32 // get the tag from the output tuple and emit it. 33 // the first tuple component is tag * 2 cast to the type 34 template<typename OutputTupleType> 35 class recirc_output_func_body { 36 public: 37 // we only need this to use input_node_helper 38 typedef typename tbb::flow::join_node<OutputTupleType, tbb::flow::tag_matching> join_node_type; 39 static const int N = std::tuple_size<OutputTupleType>::value; 40 int operator()(const OutputTupleType &v) { 41 int out = int(std::get<0>(v))/2; 42 input_node_helper<N, join_node_type>::only_check_value(out, v); 43 ++output_count; 44 return out; 45 } 46 }; 47 48 template<typename JType> 49 class tag_recirculation_test { 50 public: 51 typedef typename JType::output_type TType; 52 typedef typename std::tuple<int, tbb::flow::continue_msg> input_tuple_type; 53 typedef tbb::flow::join_node<input_tuple_type, tbb::flow::reserving> input_join_type; 54 static const int N = std::tuple_size<TType>::value; 55 static void test() { 56 input_node_helper<N, JType>::print_remark("Recirculation test of tag-matching join"); 57 INFO(" >\n"); 58 for(int maxTag = 1; maxTag <10; maxTag *= 3) { 59 for(int i = 0; i < N; ++i) all_input_nodes[i][0] = NULL; 60 61 tbb::flow::graph g; 62 // this is the tag-matching join we're testing 63 JType * my_join = makeJoin<N, JType, tbb::flow::tag_matching>::create(g); 64 // input_node for continue messages 65 tbb::flow::input_node<tbb::flow::continue_msg> snode(g, recirc_input_node_body()); 66 // reserving join that matches recirculating tags with continue messages. 67 input_join_type * my_input_join = makeJoin<2, input_join_type, tbb::flow::reserving>::create(g); 68 // tbb::flow::make_edge(snode, tbb::flow::input_port<1>(*my_input_join)); 69 tbb::flow::make_edge(snode, std::get<1>(my_input_join->input_ports())); 70 // queue to hold the tags 71 tbb::flow::queue_node<int> tag_queue(g); 72 tbb::flow::make_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join)); 73 // add all the function_nodes that are inputs to the tag-matching join 74 input_node_helper<N, JType>::add_recirc_func_nodes(*my_join, *my_input_join, g); 75 // add the function_node that accepts the output of the join and emits the int tag it was based on 76 tbb::flow::function_node<TType, int> recreate_tag(g, tbb::flow::unlimited, recirc_output_func_body<TType>()); 77 tbb::flow::make_edge(*my_join, recreate_tag); 78 // now the recirculating part (output back to the queue) 79 tbb::flow::make_edge(recreate_tag, tag_queue); 80 81 // put the tags into the queue 82 for(int t = 1; t<=maxTag; ++t) tag_queue.try_put(t); 83 84 input_count = Recirc_count; 85 output_count = 0; 86 87 // start up the source node to get things going 88 snode.activate(); 89 90 // wait for everything to stop 91 g.wait_for_all(); 92 93 CHECK_MESSAGE( (output_count==Recirc_count), "not all instances were received"); 94 95 int j{}; 96 // grab the tags from the queue, record them 97 std::vector<bool> out_tally(maxTag, false); 98 for(int i = 0; i < maxTag; ++i) { 99 CHECK_MESSAGE( (tag_queue.try_get(j)), "not enough tags in queue"); 100 CHECK_MESSAGE( (!out_tally.at(j-1)), "duplicate tag from queue"); 101 out_tally[j-1] = true; 102 } 103 CHECK_MESSAGE( (!tag_queue.try_get(j)), "Extra tags in recirculation queue"); 104 105 // deconstruct graph 106 input_node_helper<N, JType>::remove_recirc_func_nodes(*my_join, *my_input_join); 107 tbb::flow::remove_edge(*my_join, recreate_tag); 108 makeJoin<N, JType, tbb::flow::tag_matching>::destroy(my_join); 109 tbb::flow::remove_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join)); 110 tbb::flow::remove_edge(snode, tbb::flow::input_port<1>(*my_input_join)); 111 makeJoin<2, input_join_type, tbb::flow::reserving>::destroy(my_input_join); 112 } 113 } 114 }; 115 116 template<typename JType> 117 class generate_recirc_test { 118 public: 119 typedef tbb::flow::join_node<JType, tbb::flow::tag_matching> join_node_type; 120 static void do_test() { 121 tag_recirculation_test<join_node_type>::test(); 122 } 123 }; 124 125 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 126 #include <array> 127 #include <vector> 128 void test_follows_and_precedes_api() { 129 using msg_t = tbb::flow::continue_msg; 130 using JoinOutputType = std::tuple<msg_t, msg_t, msg_t>; 131 132 std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} }; 133 std::vector<msg_t> messages_for_precedes = {msg_t(), msg_t(), msg_t()}; 134 135 follows_and_precedes_testing::test_follows 136 <msg_t, tbb::flow::join_node<JoinOutputType>, tbb::flow::buffer_node<msg_t>>(messages_for_follows); 137 follows_and_precedes_testing::test_follows 138 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::queueing>>(messages_for_follows); 139 follows_and_precedes_testing::test_follows 140 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::reserving>, tbb::flow::buffer_node<msg_t>>(messages_for_follows); 141 auto b = [](msg_t) { return msg_t(); }; 142 class hash_compare { 143 public: 144 std::size_t hash(msg_t) const { return 0; } 145 bool equal(msg_t, msg_t) const { return true; } 146 }; 147 follows_and_precedes_testing::test_follows 148 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::key_matching<msg_t, hash_compare>>, tbb::flow::buffer_node<msg_t>> 149 (messages_for_follows, b, b, b); 150 151 follows_and_precedes_testing::test_precedes 152 <msg_t, tbb::flow::join_node<JoinOutputType>>(messages_for_precedes); 153 follows_and_precedes_testing::test_precedes 154 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::queueing>>(messages_for_precedes); 155 follows_and_precedes_testing::test_precedes 156 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::reserving>>(messages_for_precedes); 157 follows_and_precedes_testing::test_precedes 158 <msg_t, tbb::flow::join_node<JoinOutputType, tbb::flow::key_matching<msg_t, hash_compare>>> 159 (messages_for_precedes, b, b, b); 160 } 161 #endif 162 163 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 164 void test_deduction_guides() { 165 using namespace tbb::flow; 166 167 graph g; 168 using tuple_type = std::tuple<int, int, int>; 169 broadcast_node<int> b1(g), b2(g), b3(g); 170 broadcast_node<tuple_type> b4(g); 171 join_node<tuple_type> j0(g); 172 173 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 174 join_node j1(follows(b1, b2, b3)); 175 static_assert(std::is_same_v<decltype(j1), join_node<tuple_type>>); 176 177 join_node j2(follows(b1, b2, b3), reserving()); 178 static_assert(std::is_same_v<decltype(j2), join_node<tuple_type, reserving>>); 179 180 join_node j3(precedes(b4)); 181 static_assert(std::is_same_v<decltype(j3), join_node<tuple_type>>); 182 183 join_node j4(precedes(b4), reserving()); 184 static_assert(std::is_same_v<decltype(j4), join_node<tuple_type, reserving>>); 185 #endif 186 187 join_node j5(j0); 188 static_assert(std::is_same_v<decltype(j5), join_node<tuple_type>>); 189 } 190 191 #endif 192 193 namespace multiple_predecessors { 194 195 using namespace tbb::flow; 196 197 using join_node_t = join_node<std::tuple<continue_msg, continue_msg, continue_msg>, reserving>; 198 using queue_node_t = queue_node<std::tuple<continue_msg, continue_msg, continue_msg>>; 199 200 void twist_join_connections( 201 buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2, buffer_node<continue_msg>& bn3, 202 join_node_t& jn) 203 { 204 // order, in which edges are created/destroyed, is important 205 make_edge(bn1, input_port<0>(jn)); 206 make_edge(bn2, input_port<0>(jn)); 207 make_edge(bn3, input_port<0>(jn)); 208 209 remove_edge(bn3, input_port<0>(jn)); 210 make_edge (bn3, input_port<2>(jn)); 211 212 remove_edge(bn2, input_port<0>(jn)); 213 make_edge (bn2, input_port<1>(jn)); 214 } 215 216 std::unique_ptr<join_node_t> connect_join_via_make_edge( 217 graph& g, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2, 218 buffer_node<continue_msg>& bn3, queue_node_t& qn) 219 { 220 std::unique_ptr<join_node_t> jn( new join_node_t(g) ); 221 twist_join_connections( bn1, bn2, bn3, *jn ); 222 make_edge(*jn, qn); 223 return jn; 224 } 225 226 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 227 std::unique_ptr<join_node_t> connect_join_via_follows( 228 graph&, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2, 229 buffer_node<continue_msg>& bn3, queue_node_t& qn) 230 { 231 auto bn_set = make_node_set(bn1, bn2, bn3); 232 std::unique_ptr<join_node_t> jn( new join_node_t(follows(bn_set)) ); 233 make_edge(*jn, qn); 234 return jn; 235 } 236 237 std::unique_ptr<join_node_t> connect_join_via_precedes( 238 graph&, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2, 239 buffer_node<continue_msg>& bn3, queue_node_t& qn) 240 { 241 auto qn_set = make_node_set(qn); 242 auto qn_copy_set = qn_set; 243 std::unique_ptr<join_node_t> jn( new join_node_t(precedes(qn_copy_set)) ); 244 twist_join_connections( bn1, bn2, bn3, *jn ); 245 return jn; 246 } 247 #endif // TBB_PREVIEW_FLOW_GRAPH_FEATURES 248 249 void run_and_check( 250 graph& g, buffer_node<continue_msg>& bn1, buffer_node<continue_msg>& bn2, 251 buffer_node<continue_msg>& bn3, queue_node_t& qn, bool expected) 252 { 253 std::tuple<continue_msg, continue_msg, continue_msg> msg; 254 255 bn1.try_put(continue_msg()); 256 bn2.try_put(continue_msg()); 257 bn3.try_put(continue_msg()); 258 g.wait_for_all(); 259 260 CHECK_MESSAGE( 261 (qn.try_get(msg) == expected), 262 "Unexpected message absence/existence at the end of the graph." 263 ); 264 } 265 266 template<typename ConnectJoinNodeFunc> 267 void test(ConnectJoinNodeFunc&& connect_join_node) { 268 graph g; 269 buffer_node<continue_msg> bn1(g); 270 buffer_node<continue_msg> bn2(g); 271 buffer_node<continue_msg> bn3(g); 272 queue_node_t qn(g); 273 274 auto jn = connect_join_node(g, bn1, bn2, bn3, qn); 275 276 run_and_check(g, bn1, bn2, bn3, qn, /*expected=*/true); 277 278 remove_edge(bn3, input_port<2>(*jn)); 279 remove_edge(bn2, input_port<1>(*jn)); 280 remove_edge(bn1, input_port<0>(*jn)); 281 remove_edge(*jn, qn); 282 283 run_and_check(g, bn1, bn2, bn3, qn, /*expected=*/false); 284 } 285 } // namespace multiple_predecessors 286 287 288 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 289 //! Test follows and precedes API 290 //! \brief \ref error_guessing 291 TEST_CASE("Test follows and preceedes API"){ 292 test_follows_and_precedes_api(); 293 } 294 #endif 295 296 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 297 //! Test deduction guides 298 //! \brief \ref requirement 299 TEST_CASE("Deduction guides test"){ 300 test_deduction_guides(); 301 } 302 #endif 303 304 //! Test hash buffers behavior 305 //! \brief \ref error_guessing 306 TEST_CASE("Tagged buffers test"){ 307 TestTaggedBuffers(); 308 } 309 310 //! Test with various policies and tuple sizes 311 //! \brief \ref error_guessing 312 TEST_CASE("Main test"){ 313 test_main<tbb::flow::queueing>(); 314 test_main<tbb::flow::reserving>(); 315 test_main<tbb::flow::tag_matching>(); 316 } 317 318 //! Test with recirculating tags 319 //! \brief \ref error_guessing 320 TEST_CASE("Recirculation test"){ 321 generate_recirc_test<std::tuple<int,float> >::do_test(); 322 } 323 324 //! Test maintaining correct count of ports without input 325 //! \brief \ref error_guessing 326 TEST_CASE("Test removal of the predecessor while having none") { 327 using namespace multiple_predecessors; 328 329 test(connect_join_via_make_edge); 330 #if TBB_PREVIEW_FLOW_GRAPH_FEATURES 331 test(connect_join_via_follows); 332 test(connect_join_via_precedes); 333 #endif 334 } 335