1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifdef TBB_TEST_LOW_WORKLOAD 18 #undef MAX_TUPLE_TEST_SIZE 19 #define MAX_TUPLE_TEST_SIZE 3 20 #endif 21 22 #include "common/config.h" 23 24 #include "test_join_node.h" 25 #include "common/test_join_node_multiple_predecessors.h" 26 27 //! \file test_join_node.cpp 28 //! \brief Test for [flow_graph.join_node] specification 29 30 static std::atomic<int> output_count; 31 32 // get the tag from the output tuple and emit it. 33 // the first tuple component is tag * 2 cast to the type 34 template<typename OutputTupleType> 35 class recirc_output_func_body { 36 public: 37 // we only need this to use input_node_helper 38 typedef typename tbb::flow::join_node<OutputTupleType, tbb::flow::tag_matching> join_node_type; 39 static const int N = std::tuple_size<OutputTupleType>::value; operator ()(const OutputTupleType & v)40 int operator()(const OutputTupleType &v) { 41 int out = int(std::get<0>(v))/2; 42 input_node_helper<N, join_node_type>::only_check_value(out, v); 43 ++output_count; 44 return out; 45 } 46 }; 47 48 template<typename JType> 49 class tag_recirculation_test { 50 public: 51 typedef typename JType::output_type TType; 52 typedef typename std::tuple<int, tbb::flow::continue_msg> input_tuple_type; 53 typedef tbb::flow::join_node<input_tuple_type, tbb::flow::reserving> input_join_type; 54 static const int N = std::tuple_size<TType>::value; test()55 static void test() { 56 input_node_helper<N, JType>::print_remark("Recirculation test of tag-matching join"); 57 INFO(" >\n"); 58 for(int maxTag = 1; maxTag <10; maxTag *= 3) { 59 for(int i = 0; i < N; ++i) all_input_nodes[i][0] = nullptr; 60 61 tbb::flow::graph g; 62 // this is the tag-matching join we're testing 63 JType * my_join = makeJoin<N, JType, tbb::flow::tag_matching>::create(g); 64 // input_node for continue messages 65 tbb::flow::input_node<tbb::flow::continue_msg> snode(g, recirc_input_node_body()); 66 // reserving join that matches recirculating tags with continue messages. 67 input_join_type * my_input_join = makeJoin<2, input_join_type, tbb::flow::reserving>::create(g); 68 // tbb::flow::make_edge(snode, tbb::flow::input_port<1>(*my_input_join)); 69 tbb::flow::make_edge(snode, std::get<1>(my_input_join->input_ports())); 70 // queue to hold the tags 71 tbb::flow::queue_node<int> tag_queue(g); 72 tbb::flow::make_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join)); 73 // add all the function_nodes that are inputs to the tag-matching join 74 input_node_helper<N, JType>::add_recirc_func_nodes(*my_join, *my_input_join, g); 75 // add the function_node that accepts the output of the join and emits the int tag it was based on 76 tbb::flow::function_node<TType, int> recreate_tag(g, tbb::flow::unlimited, recirc_output_func_body<TType>()); 77 tbb::flow::make_edge(*my_join, recreate_tag); 78 // now the recirculating part (output back to the queue) 79 tbb::flow::make_edge(recreate_tag, tag_queue); 80 81 // put the tags into the queue 82 for(int t = 1; t<=maxTag; ++t) tag_queue.try_put(t); 83 84 input_count = Recirc_count; 85 output_count = 0; 86 87 // start up the source node to get things going 88 snode.activate(); 89 90 // wait for everything to stop 91 g.wait_for_all(); 92 93 CHECK_MESSAGE( (output_count==Recirc_count), "not all instances were received"); 94 95 int j{}; 96 // grab the tags from the queue, record them 97 std::vector<bool> out_tally(maxTag, false); 98 for(int i = 0; i < maxTag; ++i) { 99 CHECK_MESSAGE( (tag_queue.try_get(j)), "not enough tags in queue"); 100 CHECK_MESSAGE( (!out_tally.at(j-1)), "duplicate tag from queue"); 101 out_tally[j-1] = true; 102 } 103 CHECK_MESSAGE( (!tag_queue.try_get(j)), "Extra tags in recirculation queue"); 104 105 // deconstruct graph 106 input_node_helper<N, JType>::remove_recirc_func_nodes(*my_join, *my_input_join); 107 tbb::flow::remove_edge(*my_join, recreate_tag); 108 makeJoin<N, JType, tbb::flow::tag_matching>::destroy(my_join); 109 tbb::flow::remove_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join)); 110 tbb::flow::remove_edge(snode, tbb::flow::input_port<1>(*my_input_join)); 111 makeJoin<2, input_join_type, tbb::flow::reserving>::destroy(my_input_join); 112 } 113 } 114 }; 115 116 template<typename JType> 117 class generate_recirc_test { 118 public: 119 typedef tbb::flow::join_node<JType, tbb::flow::tag_matching> join_node_type; do_test()120 static void do_test() { 121 tag_recirculation_test<join_node_type>::test(); 122 } 123 }; 124 125 //! Test hash buffers behavior 126 //! \brief \ref error_guessing 127 TEST_CASE("Tagged buffers test"){ 128 TestTaggedBuffers(); 129 } 130 131 //! Test with various policies and tuple sizes 132 //! \brief \ref error_guessing 133 TEST_CASE("Main test"){ 134 test_main<tbb::flow::queueing>(); 135 test_main<tbb::flow::reserving>(); 136 test_main<tbb::flow::tag_matching>(); 137 } 138 139 //! Test with recirculating tags 140 //! \brief \ref error_guessing 141 TEST_CASE("Recirculation test"){ 142 generate_recirc_test<std::tuple<int,float> >::do_test(); 143 } 144 145 // TODO: Look deeper into this test to see if it has the right name 146 // and if it actually tests some kind of regression. It is possible 147 // that `connect_join_via_follows` and `connect_join_via_precedes` 148 // functions are redundant. 149 150 //! Test maintaining correct count of ports without input 151 //! \brief \ref error_guessing 152 TEST_CASE("Test removal of the predecessor while having none") { 153 using namespace multiple_predecessors; 154 155 test(connect_join_via_make_edge); 156 } 157