xref: /oneTBB/test/tbb/test_join_node.cpp (revision ce0d258e)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifdef TBB_TEST_LOW_WORKLOAD
18     #undef MAX_TUPLE_TEST_SIZE
19     #define MAX_TUPLE_TEST_SIZE 3
20 #endif
21 
22 #include "common/config.h"
23 
24 #include "test_join_node.h"
25 #include "common/test_join_node_multiple_predecessors.h"
26 
27 //! \file test_join_node.cpp
28 //! \brief Test for [flow_graph.join_node] specification
29 
// Running total of tuples seen by recirc_output_func_body; the recirculation
// test resets it to zero before each run and compares it against Recirc_count.
static std::atomic<int> output_count{0};
31 
32 // get the tag from the output tuple and emit it.
33 // the first tuple component is tag * 2 cast to the type
34 template<typename OutputTupleType>
35 class recirc_output_func_body {
36 public:
37     // we only need this to use input_node_helper
38     typedef typename tbb::flow::join_node<OutputTupleType, tbb::flow::tag_matching> join_node_type;
39     static const int N = std::tuple_size<OutputTupleType>::value;
40     int operator()(const OutputTupleType &v) {
41         int out = int(std::get<0>(v))/2;
42         input_node_helper<N, join_node_type>::only_check_value(out, v);
43         ++output_count;
44         return out;
45     }
46 };
47 
48 template<typename JType>
49 class tag_recirculation_test {
50 public:
51     typedef typename JType::output_type TType;
52     typedef typename std::tuple<int, tbb::flow::continue_msg> input_tuple_type;
53     typedef tbb::flow::join_node<input_tuple_type, tbb::flow::reserving> input_join_type;
54     static const int N = std::tuple_size<TType>::value;
55     static void test() {
56         input_node_helper<N, JType>::print_remark("Recirculation test of tag-matching join");
57         INFO(" >\n");
58         for(int maxTag = 1; maxTag <10; maxTag *= 3) {
59             for(int i = 0; i < N; ++i) all_input_nodes[i][0] = nullptr;
60 
61             tbb::flow::graph g;
62             // this is the tag-matching join we're testing
63             JType * my_join = makeJoin<N, JType, tbb::flow::tag_matching>::create(g);
64             // input_node for continue messages
65             tbb::flow::input_node<tbb::flow::continue_msg> snode(g, recirc_input_node_body());
66             // reserving join that matches recirculating tags with continue messages.
67             input_join_type * my_input_join = makeJoin<2, input_join_type, tbb::flow::reserving>::create(g);
68             // tbb::flow::make_edge(snode, tbb::flow::input_port<1>(*my_input_join));
69             tbb::flow::make_edge(snode, std::get<1>(my_input_join->input_ports()));
70             // queue to hold the tags
71             tbb::flow::queue_node<int> tag_queue(g);
72             tbb::flow::make_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join));
73             // add all the function_nodes that are inputs to the tag-matching join
74             input_node_helper<N, JType>::add_recirc_func_nodes(*my_join, *my_input_join, g);
75             // add the function_node that accepts the output of the join and emits the int tag it was based on
76             tbb::flow::function_node<TType, int> recreate_tag(g, tbb::flow::unlimited, recirc_output_func_body<TType>());
77             tbb::flow::make_edge(*my_join, recreate_tag);
78             // now the recirculating part (output back to the queue)
79             tbb::flow::make_edge(recreate_tag, tag_queue);
80 
81             // put the tags into the queue
82             for(int t = 1; t<=maxTag; ++t) tag_queue.try_put(t);
83 
84             input_count = Recirc_count;
85             output_count = 0;
86 
87             // start up the source node to get things going
88             snode.activate();
89 
90             // wait for everything to stop
91             g.wait_for_all();
92 
93             CHECK_MESSAGE( (output_count==Recirc_count), "not all instances were received");
94 
95             int j{};
96             // grab the tags from the queue, record them
97             std::vector<bool> out_tally(maxTag, false);
98             for(int i = 0; i < maxTag; ++i) {
99                 CHECK_MESSAGE( (tag_queue.try_get(j)), "not enough tags in queue");
100                 CHECK_MESSAGE( (!out_tally.at(j-1)), "duplicate tag from queue");
101                 out_tally[j-1] = true;
102             }
103             CHECK_MESSAGE( (!tag_queue.try_get(j)), "Extra tags in recirculation queue");
104 
105             // deconstruct graph
106             input_node_helper<N, JType>::remove_recirc_func_nodes(*my_join, *my_input_join);
107             tbb::flow::remove_edge(*my_join, recreate_tag);
108             makeJoin<N, JType, tbb::flow::tag_matching>::destroy(my_join);
109             tbb::flow::remove_edge(tag_queue, tbb::flow::input_port<0>(*my_input_join));
110             tbb::flow::remove_edge(snode, tbb::flow::input_port<1>(*my_input_join));
111             makeJoin<2, input_join_type, tbb::flow::reserving>::destroy(my_input_join);
112         }
113     }
114 };
115 
116 template<typename JType>
117 class generate_recirc_test {
118 public:
119     typedef tbb::flow::join_node<JType, tbb::flow::tag_matching> join_node_type;
120     static void do_test() {
121         tag_recirculation_test<join_node_type>::test();
122     }
123 };
124 
125 //! Test hash buffers behavior
126 //! \brief \ref error_guessing
127 TEST_CASE("Tagged buffers test"){
128     TestTaggedBuffers();
129 }
130 
131 //! Test with various policies and tuple sizes
132 //! \brief \ref error_guessing
133 TEST_CASE("Main test"){
134     test_main<tbb::flow::queueing>();
135     test_main<tbb::flow::reserving>();
136     test_main<tbb::flow::tag_matching>();
137 }
138 
139 //! Test with recirculating tags
140 //! \brief \ref error_guessing
141 TEST_CASE("Recirculation test"){
142     generate_recirc_test<std::tuple<int,float> >::do_test();
143 }
144 
145 // TODO: Look deeper into this test to see if it has the right name
146 // and if it actually tests some kind of regression. It is possible
147 // that `connect_join_via_follows` and `connect_join_via_precedes`
148 // functions are redundant.
149 
150 //! Test maintaining correct count of ports without input
151 //! \brief \ref error_guessing
152 TEST_CASE("Test removal of the predecessor while having none") {
153     using namespace multiple_predecessors;
154 
155     test(connect_join_via_make_edge);
156 }
157