xref: /oneTBB/test/tbb/test_split_node.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
20 // parts in all of tests might make testing of the product, which is different from what is actually
21 // released.
22 #define __TBB_EXTRA_DEBUG 1
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/utils_assert.h"
28 #include "common/graph_utils.h"
29 
30 
31 //! \file test_split_node.cpp
32 //! \brief Test for [flow_graph.split_node] specification
33 
34 
35 #if defined(_MSC_VER) && _MSC_VER < 1600
36     #pragma warning (disable : 4503) //disabling the "decorated name length exceeded" warning for VS2008 and earlier
37 #endif
38 
39 //
40 // Tests
41 //
42 
43 const int Count = 300;
44 const int MaxPorts = 10;
45 const int MaxNInputs = 5; // max # of input_nodes to register for each split_node input in parallel test
46 
47 std::vector<bool> flags;   // for checking output
48 
49 template<typename T>
50 class name_of {
51 public:
52     static const char* name() { return  "Unknown"; }
53 };
54 template<>
55 class name_of<int> {
56 public:
57     static const char* name() { return  "int"; }
58 };
59 template<>
60 class name_of<float> {
61 public:
62     static const char* name() { return  "float"; }
63 };
64 template<>
65 class name_of<double> {
66 public:
67     static const char* name() { return  "double"; }
68 };
69 template<>
70 class name_of<long> {
71 public:
72     static const char* name() { return  "long"; }
73 };
74 template<>
75 class name_of<short> {
76 public:
77     static const char* name() { return  "short"; }
78 };
79 
80 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
81 // so the max number generated right now is 1500 or so.)  Input will generate a series of TT with value
82 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body.  We are attaching addend
83 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting
84 // to receive.  If there is only one input node, the series order will be maintained; if more than one,
85 // this is not guaranteed.
86 
87 template<int N>
88 struct tuple_helper {
89     template<typename TupleType>
90     static void set_element( TupleType &t, int i) {
91         std::get<N-1>(t) = (typename std::tuple_element<N-1,TupleType>::type)(i * (N+1));
92         tuple_helper<N-1>::set_element(t, i);
93     }
94 };
95 
96 template<>
97 struct tuple_helper<1> {
98     template<typename TupleType>
99     static void set_element(TupleType &t, int i) {
100         std::get<0>(t) = (typename std::tuple_element<0,TupleType>::type)(i * 2);
101     }
102 };
103 
104 // if we start N input_bodys they will all have the addend N, and my_count should be initialized to 0 .. N-1.
105 // the output tuples should have all the sequence, but the order will in general vary.
106 template<typename TupleType>
107 class my_input_body {
108     typedef TupleType TT;
109     static const int N = std::tuple_size<TT>::value;
110     int my_count;
111     int addend;
112 public:
113     my_input_body(int init_val, int addto) : my_count(init_val), addend(addto) { }
114     TT operator()( tbb::flow_control &fc) {
115         if(my_count >= Count){
116             fc.stop();
117             return TT();
118         }
119         TT v;
120         tuple_helper<N>::set_element(v, my_count);
121         my_count += addend;
122         return v;
123     }
124 };
125 
126 // allocator for split_node.
127 
128 template<int N, typename SType>
129 class makeSplit {
130 public:
131     static SType *create(tbb::flow::graph& g) {
132         SType *temp = new SType(g);
133         return temp;
134     }
135     static void destroy(SType *p) { delete p; }
136 };
137 
138 // holder for sink_node pointers for eventual deletion
139 
140 static void* all_sink_nodes[MaxPorts];
141 
142 
143 template<int ELEM, typename SType>
144 class sink_node_helper {
145 public:
146     typedef typename SType::input_type TT;
147     typedef typename std::tuple_element<ELEM-1,TT>::type IT;
148     typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
149     static void print_parallel_remark() {
150         sink_node_helper<ELEM-1,SType>::print_parallel_remark();
151         INFO(", " << name_of<IT>::name());
152     }
153     static void print_serial_remark() {
154         sink_node_helper<ELEM-1,SType>::print_serial_remark();
155         INFO(", " << name_of<IT>::name());
156     }
157     static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
158         my_sink_node_type *new_node = new my_sink_node_type(g);
159         tbb::flow::make_edge( tbb::flow::output_port<ELEM-1>(my_split) , *new_node);
160         all_sink_nodes[ELEM-1] = (void *)new_node;
161         sink_node_helper<ELEM-1, SType>::add_sink_nodes(my_split, g);
162     }
163 
164     static void check_sink_values() {
165         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
166         for(int i = 0; i < Count; ++i) {
167             IT v{};
168             CHECK_MESSAGE(dp->try_get(v), "");
169             flags[((int)v) / (ELEM+1)] = true;
170         }
171         for(int i = 0; i < Count; ++i) {
172             CHECK_MESSAGE(flags[i], "");
173             flags[i] = false;  // reset for next test
174         }
175         sink_node_helper<ELEM-1,SType>::check_sink_values();
176     }
177     static void remove_sink_nodes(SType& my_split) {
178         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
179         tbb::flow::remove_edge( tbb::flow::output_port<ELEM-1>(my_split) , *dp);
180         delete dp;
181         sink_node_helper<ELEM-1, SType>::remove_sink_nodes(my_split);
182     }
183 };
184 
185 template<typename SType>
186 class sink_node_helper<1, SType> {
187     typedef typename SType::input_type TT;
188     typedef typename std::tuple_element<0,TT>::type IT;
189     typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
190 public:
191     static void print_parallel_remark() {
192         INFO("Parallel test of split_node< " << name_of<IT>::name());
193     }
194     static void print_serial_remark() {
195         INFO("Serial test of split_node< " << name_of<IT>::name());
196     }
197     static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
198         my_sink_node_type *new_node = new my_sink_node_type(g);
199         tbb::flow::make_edge( tbb::flow::output_port<0>(my_split) , *new_node);
200         all_sink_nodes[0] = (void *)new_node;
201     }
202     static void check_sink_values() {
203         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
204         for(int i = 0; i < Count; ++i) {
205             IT v{};
206             CHECK_MESSAGE(dp->try_get(v), "");
207             flags[((int)v) / 2] = true;
208         }
209         for(int i = 0; i < Count; ++i) {
210             CHECK_MESSAGE(flags[i], "");
211             flags[i] = false;  // reset for next test
212         }
213     }
214     static void remove_sink_nodes(SType& my_split) {
215         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
216         tbb::flow::remove_edge( tbb::flow::output_port<0>(my_split) , *dp);
217         delete dp;
218     }
219 };
220 
221 // parallel_test: create input_nodes that feed tuples into the split node
222 //    and queue_nodes that receive the output.
223 template<typename SType>
224 class parallel_test {
225 public:
226     typedef typename SType::input_type TType;
227     typedef tbb::flow::input_node<TType> input_type;
228     static const int N = std::tuple_size<TType>::value;
229 
230     static void test() {
231         input_type* all_input_nodes[MaxNInputs];
232         sink_node_helper<N,SType>::print_parallel_remark();
233         INFO(" >\n");
234         for(int i=0; i < MaxPorts; ++i) {
235             all_sink_nodes[i] = NULL;
236         }
237         // try test for # inputs 1 .. MaxNInputs
238         for(int nInputs = 1; nInputs <= MaxNInputs; ++nInputs) {
239             tbb::flow::graph g;
240             SType* my_split = makeSplit<N,SType>::create(g);
241 
242             // add sinks first so when inputs start spitting out values they are there to catch them
243             sink_node_helper<N, SType>::add_sink_nodes((*my_split), g);
244 
245             // now create nInputs input_nodes, each spitting out i, i+nInputs, i+2*nInputs ...
246             // each element of the tuple is i*(n+1), where n is the tuple element index (1-N)
247             for(int i = 0; i < nInputs; ++i) {
248                 // create input node
249                 input_type *s = new input_type(g, my_input_body<TType>(i, nInputs) );
250                 tbb::flow::make_edge(*s, *my_split);
251                 all_input_nodes[i] = s;
252                 s->activate();
253             }
254 
255             g.wait_for_all();
256 
257             // check that we got Count values in each output queue, and all the index values
258             // are there.
259             sink_node_helper<N, SType>::check_sink_values();
260 
261             sink_node_helper<N, SType>::remove_sink_nodes(*my_split);
262             for(int i = 0; i < nInputs; ++i) {
263                 delete all_input_nodes[i];
264             }
265             makeSplit<N,SType>::destroy(my_split);
266         }
267     }
268 };
269 
270 //
271 // Single predecessor, single accepting successor at each port
272 
273 template<typename SType>
274 void test_one_serial( SType &my_split, tbb::flow::graph &g) {
275     typedef typename SType::input_type TType;
276     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
277     sink_node_helper<TUPLE_SIZE, SType>::add_sink_nodes(my_split,g);
278     typedef TType q3_input_type;
279     tbb::flow::queue_node< q3_input_type >  q3(g);
280 
281     tbb::flow::make_edge( q3, my_split );
282 
283     // fill the  queue with its value one-at-a-time
284     flags.clear();
285     for (int i = 0; i < Count; ++i ) {
286         TType v;
287         tuple_helper<TUPLE_SIZE>::set_element(v, i);
288         CHECK_MESSAGE(my_split.try_put(v), "");
289         flags.push_back(false);
290     }
291 
292     g.wait_for_all();
293 
294     sink_node_helper<TUPLE_SIZE,SType>::check_sink_values();
295 
296     sink_node_helper<TUPLE_SIZE, SType>::remove_sink_nodes(my_split);
297 
298 }
299 
300 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
301 void test_follows_and_precedes_api() {
302     using namespace tbb::flow;
303     using msg_t = std::tuple<int, float, double>;
304 
305     graph g;
306 
307     function_node<msg_t, msg_t> f1(g, unlimited, [](msg_t msg) { return msg; } );
308     auto f2(f1);
309     auto f3(f1);
310 
311     std::atomic<int> body_calls;
312     body_calls = 0;
313 
314     function_node<int, int> f4(g, unlimited, [&](int val) { ++body_calls; return val; } );
315     function_node<float, float> f5(g, unlimited, [&](float val) { ++body_calls; return val; } );
316     function_node<double, double> f6(g, unlimited, [&](double val) { ++body_calls; return val; } );
317 
318     split_node<msg_t> following_node(follows(f1, f2, f3));
319     make_edge(output_port<0>(following_node), f4);
320     make_edge(output_port<1>(following_node), f5);
321     make_edge(output_port<2>(following_node), f6);
322 
323     split_node<msg_t> preceding_node(precedes(f4, f5, f6));
324     make_edge(f1, preceding_node);
325     make_edge(f2, preceding_node);
326     make_edge(f3, preceding_node);
327 
328     msg_t msg(1, 2.2f, 3.3);
329     f1.try_put(msg);
330     f2.try_put(msg);
331     f3.try_put(msg);
332 
333     g.wait_for_all();
334 
335     // <number of try puts> * <number of splits by a input node> * <number of input nodes>
336     CHECK_MESSAGE( ((body_calls == 3*3*2)), "Not exact edge quantity was made");
337 }
338 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
339 
340 template<typename SType>
341 class serial_test {
342     typedef typename SType::input_type TType;
343     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
344     static const int ELEMS = 3;
345 public:
346 static void test() {
347     tbb::flow::graph g;
348     flags.reserve(Count);
349     SType* my_split = makeSplit<TUPLE_SIZE,SType>::create(g);
350     sink_node_helper<TUPLE_SIZE, SType>::print_serial_remark(); INFO(" >\n");
351 
352     test_output_ports_return_ref(*my_split);
353 
354     test_one_serial<SType>(*my_split, g);
355     // build the vector with copy construction from the used split node.
356     std::vector<SType>split_vector(ELEMS, *my_split);
357     // destroy the tired old split_node in case we're accidentally reusing pieces of it.
358     makeSplit<TUPLE_SIZE,SType>::destroy(my_split);
359 
360 
361     for(int e = 0; e < ELEMS; ++e) {  // exercise each of the vector elements
362         test_one_serial<SType>(split_vector[e], g);
363     }
364 }
365 
366 }; // serial_test
367 
368 template<
369       template<typename> class TestType,  // serial_test or parallel_test
370       typename TupleType >                               // type of the input of the split
371 struct generate_test {
372     typedef tbb::flow::split_node<TupleType> split_node_type;
373     static void do_test() {
374         TestType<split_node_type>::test();
375     }
376 }; // generate_test
377 
378 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
379 
380 void test_deduction_guides() {
381     using namespace tbb::flow;
382     using tuple_type = std::tuple<int, int>;
383 
384     graph g;
385     split_node<tuple_type> s0(g);
386 
387     split_node s1(s0);
388     static_assert(std::is_same_v<decltype(s1), split_node<tuple_type>>);
389 
390 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
391     broadcast_node<tuple_type> b1(g), b2(g);
392     broadcast_node<int> b3(g), b4(g);
393 
394     split_node s2(follows(b1, b2));
395     static_assert(std::is_same_v<decltype(s2), split_node<tuple_type>>);
396 
397     split_node s3(precedes(b3, b4));
398     static_assert(std::is_same_v<decltype(s3), split_node<tuple_type>>);
399 #endif
400 }
401 
402 #endif
403 
404 //! Test output ports and message passing with different input tuples
405 //! \brief \ref requirement \ref error_guessing
406 TEST_CASE("Tuple tests"){
407     for (int p = 0; p < 2; ++p) {
408         generate_test<serial_test, std::tuple<float, double> >::do_test();
409 #if MAX_TUPLE_TEST_SIZE >= 4
410         generate_test<serial_test, std::tuple<float, double, int, long> >::do_test();
411 #endif
412 #if MAX_TUPLE_TEST_SIZE >= 6
413         generate_test<serial_test, std::tuple<double, double, int, long, int, short> >::do_test();
414 #endif
415 #if MAX_TUPLE_TEST_SIZE >= 8
416         generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long> >::do_test();
417 #endif
418 #if MAX_TUPLE_TEST_SIZE >= 10
419         generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long> >::do_test();
420 #endif
421         generate_test<parallel_test, std::tuple<float, double> >::do_test();
422 #if MAX_TUPLE_TEST_SIZE >= 3
423         generate_test<parallel_test, std::tuple<float, int, long> >::do_test();
424 #endif
425 #if MAX_TUPLE_TEST_SIZE >= 5
426         generate_test<parallel_test, std::tuple<double, double, int, int, short> >::do_test();
427 #endif
428 #if MAX_TUPLE_TEST_SIZE >= 7
429         generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long> >::do_test();
430 #endif
431 #if MAX_TUPLE_TEST_SIZE >= 9
432         generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long> >::do_test();
433 #endif
434     }
435 }
436 
437 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
438 //! Test decution guides
439 //! \brief \ref requirement
440 TEST_CASE("Test follows and precedes API"){
441     test_follows_and_precedes_api();
442 }
443 #endif
444 
445 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
446 //! Test decution guides
447 //! \brief \ref requirement
448 TEST_CASE("Deduction guides"){
449     test_deduction_guides();
450 }
451 #endif
452 
453