1 /*
2 Copyright (c) 2005-2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/config.h"
18
19 #include "tbb/flow_graph.h"
20
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/utils_assert.h"
24 #include "common/graph_utils.h"
25
26
27 //! \file test_split_node.cpp
28 //! \brief Test for [flow_graph.split_node] specification
29
30
31 #if defined(_MSC_VER) && _MSC_VER < 1600
32 #pragma warning (disable : 4503) //disabling the "decorated name length exceeded" warning for VS2008 and earlier
33 #endif
34
35 //
36 // Tests
37 //
38
39 const int Count = 300;
40 const int MaxPorts = 10;
41 const int MaxNInputs = 5; // max # of input_nodes to register for each split_node input in parallel test
42
43 std::vector<bool> flags; // for checking output
44
45 template<typename T>
46 class name_of {
47 public:
name()48 static const char* name() { return "Unknown"; }
49 };
50 template<>
51 class name_of<int> {
52 public:
name()53 static const char* name() { return "int"; }
54 };
55 template<>
56 class name_of<float> {
57 public:
name()58 static const char* name() { return "float"; }
59 };
60 template<>
61 class name_of<double> {
62 public:
name()63 static const char* name() { return "double"; }
64 };
65 template<>
66 class name_of<long> {
67 public:
name()68 static const char* name() { return "long"; }
69 };
70 template<>
71 class name_of<short> {
72 public:
name()73 static const char* name() { return "short"; }
74 };
75
76 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
77 // so the max number generated right now is 1500 or so.) Input will generate a series of TT with value
78 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body. We are attaching addend
79 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting
80 // to receive. If there is only one input node, the series order will be maintained; if more than one,
81 // this is not guaranteed.
82
83 template<int N>
84 struct tuple_helper {
85 template<typename TupleType>
set_elementtuple_helper86 static void set_element( TupleType &t, int i) {
87 std::get<N-1>(t) = (typename std::tuple_element<N-1,TupleType>::type)(i * (N+1));
88 tuple_helper<N-1>::set_element(t, i);
89 }
90 };
91
92 template<>
93 struct tuple_helper<1> {
94 template<typename TupleType>
set_elementtuple_helper95 static void set_element(TupleType &t, int i) {
96 std::get<0>(t) = (typename std::tuple_element<0,TupleType>::type)(i * 2);
97 }
98 };
99
100 // if we start N input_bodys they will all have the addend N, and my_count should be initialized to 0 .. N-1.
101 // the output tuples should have all the sequence, but the order will in general vary.
102 template<typename TupleType>
103 class my_input_body {
104 typedef TupleType TT;
105 static const int N = std::tuple_size<TT>::value;
106 int my_count;
107 int addend;
108 public:
my_input_body(int init_val,int addto)109 my_input_body(int init_val, int addto) : my_count(init_val), addend(addto) { }
operator ()(tbb::flow_control & fc)110 TT operator()( tbb::flow_control &fc) {
111 if(my_count >= Count){
112 fc.stop();
113 return TT();
114 }
115 TT v;
116 tuple_helper<N>::set_element(v, my_count);
117 my_count += addend;
118 return v;
119 }
120 };
121
122 // allocator for split_node.
123
124 template<int N, typename SType>
125 class makeSplit {
126 public:
create(tbb::flow::graph & g)127 static SType *create(tbb::flow::graph& g) {
128 SType *temp = new SType(g);
129 return temp;
130 }
destroy(SType * p)131 static void destroy(SType *p) { delete p; }
132 };
133
134 // holder for sink_node pointers for eventual deletion
135
136 static void* all_sink_nodes[MaxPorts];
137
138
139 template<int ELEM, typename SType>
140 class sink_node_helper {
141 public:
142 typedef typename SType::input_type TT;
143 typedef typename std::tuple_element<ELEM-1,TT>::type IT;
144 typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
print_parallel_remark()145 static void print_parallel_remark() {
146 sink_node_helper<ELEM-1,SType>::print_parallel_remark();
147 INFO(", " << name_of<IT>::name());
148 }
print_serial_remark()149 static void print_serial_remark() {
150 sink_node_helper<ELEM-1,SType>::print_serial_remark();
151 INFO(", " << name_of<IT>::name());
152 }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)153 static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
154 my_sink_node_type *new_node = new my_sink_node_type(g);
155 tbb::flow::make_edge( tbb::flow::output_port<ELEM-1>(my_split) , *new_node);
156 all_sink_nodes[ELEM-1] = (void *)new_node;
157 sink_node_helper<ELEM-1, SType>::add_sink_nodes(my_split, g);
158 }
159
check_sink_values()160 static void check_sink_values() {
161 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
162 for(int i = 0; i < Count; ++i) {
163 IT v{};
164 CHECK_MESSAGE(dp->try_get(v), "");
165 flags[((int)v) / (ELEM+1)] = true;
166 }
167 for(int i = 0; i < Count; ++i) {
168 CHECK_MESSAGE(flags[i], "");
169 flags[i] = false; // reset for next test
170 }
171 sink_node_helper<ELEM-1,SType>::check_sink_values();
172 }
remove_sink_nodes(SType & my_split)173 static void remove_sink_nodes(SType& my_split) {
174 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
175 tbb::flow::remove_edge( tbb::flow::output_port<ELEM-1>(my_split) , *dp);
176 delete dp;
177 sink_node_helper<ELEM-1, SType>::remove_sink_nodes(my_split);
178 }
179 };
180
181 template<typename SType>
182 class sink_node_helper<1, SType> {
183 typedef typename SType::input_type TT;
184 typedef typename std::tuple_element<0,TT>::type IT;
185 typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
186 public:
print_parallel_remark()187 static void print_parallel_remark() {
188 INFO("Parallel test of split_node< " << name_of<IT>::name());
189 }
print_serial_remark()190 static void print_serial_remark() {
191 INFO("Serial test of split_node< " << name_of<IT>::name());
192 }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)193 static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
194 my_sink_node_type *new_node = new my_sink_node_type(g);
195 tbb::flow::make_edge( tbb::flow::output_port<0>(my_split) , *new_node);
196 all_sink_nodes[0] = (void *)new_node;
197 }
check_sink_values()198 static void check_sink_values() {
199 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
200 for(int i = 0; i < Count; ++i) {
201 IT v{};
202 CHECK_MESSAGE(dp->try_get(v), "");
203 flags[((int)v) / 2] = true;
204 }
205 for(int i = 0; i < Count; ++i) {
206 CHECK_MESSAGE(flags[i], "");
207 flags[i] = false; // reset for next test
208 }
209 }
remove_sink_nodes(SType & my_split)210 static void remove_sink_nodes(SType& my_split) {
211 my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
212 tbb::flow::remove_edge( tbb::flow::output_port<0>(my_split) , *dp);
213 delete dp;
214 }
215 };
216
217 // parallel_test: create input_nodes that feed tuples into the split node
218 // and queue_nodes that receive the output.
219 template<typename SType>
220 class parallel_test {
221 public:
222 typedef typename SType::input_type TType;
223 typedef tbb::flow::input_node<TType> input_type;
224 static const int N = std::tuple_size<TType>::value;
225
test()226 static void test() {
227 input_type* all_input_nodes[MaxNInputs];
228 sink_node_helper<N,SType>::print_parallel_remark();
229 INFO(" >\n");
230 for(int i=0; i < MaxPorts; ++i) {
231 all_sink_nodes[i] = nullptr;
232 }
233 // try test for # inputs 1 .. MaxNInputs
234 for(int nInputs = 1; nInputs <= MaxNInputs; ++nInputs) {
235 tbb::flow::graph g;
236 SType* my_split = makeSplit<N,SType>::create(g);
237
238 // add sinks first so when inputs start spitting out values they are there to catch them
239 sink_node_helper<N, SType>::add_sink_nodes((*my_split), g);
240
241 // now create nInputs input_nodes, each spitting out i, i+nInputs, i+2*nInputs ...
242 // each element of the tuple is i*(n+1), where n is the tuple element index (1-N)
243 for(int i = 0; i < nInputs; ++i) {
244 // create input node
245 input_type *s = new input_type(g, my_input_body<TType>(i, nInputs) );
246 tbb::flow::make_edge(*s, *my_split);
247 all_input_nodes[i] = s;
248 s->activate();
249 }
250
251 g.wait_for_all();
252
253 // check that we got Count values in each output queue, and all the index values
254 // are there.
255 sink_node_helper<N, SType>::check_sink_values();
256
257 sink_node_helper<N, SType>::remove_sink_nodes(*my_split);
258 for(int i = 0; i < nInputs; ++i) {
259 delete all_input_nodes[i];
260 }
261 makeSplit<N,SType>::destroy(my_split);
262 }
263 }
264 };
265
266 //
267 // Single predecessor, single accepting successor at each port
268
269 template<typename SType>
test_one_serial(SType & my_split,tbb::flow::graph & g)270 void test_one_serial( SType &my_split, tbb::flow::graph &g) {
271 typedef typename SType::input_type TType;
272 static const int TUPLE_SIZE = std::tuple_size<TType>::value;
273 sink_node_helper<TUPLE_SIZE, SType>::add_sink_nodes(my_split,g);
274 typedef TType q3_input_type;
275 tbb::flow::queue_node< q3_input_type > q3(g);
276
277 tbb::flow::make_edge( q3, my_split );
278
279 // fill the queue with its value one-at-a-time
280 flags.clear();
281 for (int i = 0; i < Count; ++i ) {
282 TType v;
283 tuple_helper<TUPLE_SIZE>::set_element(v, i);
284 CHECK_MESSAGE(my_split.try_put(v), "");
285 flags.push_back(false);
286 }
287
288 g.wait_for_all();
289
290 sink_node_helper<TUPLE_SIZE,SType>::check_sink_values();
291
292 sink_node_helper<TUPLE_SIZE, SType>::remove_sink_nodes(my_split);
293
294 }
295
296 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
test_follows_and_precedes_api()297 void test_follows_and_precedes_api() {
298 using namespace tbb::flow;
299 using msg_t = std::tuple<int, float, double>;
300
301 graph g;
302
303 function_node<msg_t, msg_t> f1(g, unlimited, [](msg_t msg) { return msg; } );
304 auto f2(f1);
305 auto f3(f1);
306
307 std::atomic<int> body_calls;
308 body_calls = 0;
309
310 function_node<int, int> f4(g, unlimited, [&](int val) { ++body_calls; return val; } );
311 function_node<float, float> f5(g, unlimited, [&](float val) { ++body_calls; return val; } );
312 function_node<double, double> f6(g, unlimited, [&](double val) { ++body_calls; return val; } );
313
314 split_node<msg_t> following_node(follows(f1, f2, f3));
315 make_edge(output_port<0>(following_node), f4);
316 make_edge(output_port<1>(following_node), f5);
317 make_edge(output_port<2>(following_node), f6);
318
319 split_node<msg_t> preceding_node(precedes(f4, f5, f6));
320 make_edge(f1, preceding_node);
321 make_edge(f2, preceding_node);
322 make_edge(f3, preceding_node);
323
324 msg_t msg(1, 2.2f, 3.3);
325 f1.try_put(msg);
326 f2.try_put(msg);
327 f3.try_put(msg);
328
329 g.wait_for_all();
330
331 // <number of try puts> * <number of splits by a input node> * <number of input nodes>
332 CHECK_MESSAGE( ((body_calls == 3*3*2)), "Not exact edge quantity was made");
333 }
334 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
335
336 template<typename SType>
337 class serial_test {
338 typedef typename SType::input_type TType;
339 static const int TUPLE_SIZE = std::tuple_size<TType>::value;
340 static const int ELEMS = 3;
341 public:
test()342 static void test() {
343 tbb::flow::graph g;
344 flags.reserve(Count);
345 SType* my_split = makeSplit<TUPLE_SIZE,SType>::create(g);
346 sink_node_helper<TUPLE_SIZE, SType>::print_serial_remark(); INFO(" >\n");
347
348 test_output_ports_return_ref(*my_split);
349
350 test_one_serial<SType>(*my_split, g);
351 // build the vector with copy construction from the used split node.
352 std::vector<SType>split_vector(ELEMS, *my_split);
353 // destroy the tired old split_node in case we're accidentally reusing pieces of it.
354 makeSplit<TUPLE_SIZE,SType>::destroy(my_split);
355
356
357 for(int e = 0; e < ELEMS; ++e) { // exercise each of the vector elements
358 test_one_serial<SType>(split_vector[e], g);
359 }
360 }
361
362 }; // serial_test
363
364 template<
365 template<typename> class TestType, // serial_test or parallel_test
366 typename TupleType > // type of the input of the split
367 struct generate_test {
368 typedef tbb::flow::split_node<TupleType> split_node_type;
do_testgenerate_test369 static void do_test() {
370 TestType<split_node_type>::test();
371 }
372 }; // generate_test
373
374 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
375
test_deduction_guides()376 void test_deduction_guides() {
377 using namespace tbb::flow;
378 using tuple_type = std::tuple<int, int>;
379
380 graph g;
381 split_node<tuple_type> s0(g);
382
383 split_node s1(s0);
384 static_assert(std::is_same_v<decltype(s1), split_node<tuple_type>>);
385
386 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
387 broadcast_node<tuple_type> b1(g), b2(g);
388 broadcast_node<int> b3(g), b4(g);
389
390 split_node s2(follows(b1, b2));
391 static_assert(std::is_same_v<decltype(s2), split_node<tuple_type>>);
392
393 split_node s3(precedes(b3, b4));
394 static_assert(std::is_same_v<decltype(s3), split_node<tuple_type>>);
395 #endif
396 }
397
398 #endif
399
400 //! Test output ports and message passing with different input tuples
401 //! \brief \ref requirement \ref error_guessing
402 TEST_CASE("Tuple tests"){
403 for (int p = 0; p < 2; ++p) {
404 generate_test<serial_test, std::tuple<float, double> >::do_test();
405 #if MAX_TUPLE_TEST_SIZE >= 4
406 generate_test<serial_test, std::tuple<float, double, int, long> >::do_test();
407 #endif
408 #if MAX_TUPLE_TEST_SIZE >= 6
409 generate_test<serial_test, std::tuple<double, double, int, long, int, short> >::do_test();
410 #endif
411 #if MAX_TUPLE_TEST_SIZE >= 8
412 generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long> >::do_test();
413 #endif
414 #if MAX_TUPLE_TEST_SIZE >= 10
415 generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long> >::do_test();
416 #endif
417 generate_test<parallel_test, std::tuple<float, double> >::do_test();
418 #if MAX_TUPLE_TEST_SIZE >= 3
419 generate_test<parallel_test, std::tuple<float, int, long> >::do_test();
420 #endif
421 #if MAX_TUPLE_TEST_SIZE >= 5
422 generate_test<parallel_test, std::tuple<double, double, int, int, short> >::do_test();
423 #endif
424 #if MAX_TUPLE_TEST_SIZE >= 7
425 generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long> >::do_test();
426 #endif
427 #if MAX_TUPLE_TEST_SIZE >= 9
428 generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long> >::do_test();
429 #endif
430 }
431 }
432
433 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
434 //! Test decution guides
435 //! \brief \ref requirement
436 TEST_CASE("Test follows and precedes API"){
437 test_follows_and_precedes_api();
438 }
439 #endif
440
441 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
442 //! Test decution guides
443 //! \brief \ref requirement
444 TEST_CASE("Deduction guides"){
445 test_deduction_guides();
446 }
447 #endif
448
449