xref: /oneTBB/test/tbb/test_split_node.cpp (revision c21e688a)
1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/utils_assert.h"
24 #include "common/graph_utils.h"
25 
26 
27 //! \file test_split_node.cpp
28 //! \brief Test for [flow_graph.split_node] specification
29 
30 
31 #if defined(_MSC_VER) && _MSC_VER < 1600
32     #pragma warning (disable : 4503) //disabling the "decorated name length exceeded" warning for VS2008 and earlier
33 #endif
34 
35 //
36 // Tests
37 //
38 
39 const int Count = 300;
40 const int MaxPorts = 10;
41 const int MaxNInputs = 5; // max # of input_nodes to register for each split_node input in parallel test
42 
43 std::vector<bool> flags;   // for checking output
44 
45 template<typename T>
46 class name_of {
47 public:
name()48     static const char* name() { return  "Unknown"; }
49 };
50 template<>
51 class name_of<int> {
52 public:
name()53     static const char* name() { return  "int"; }
54 };
55 template<>
56 class name_of<float> {
57 public:
name()58     static const char* name() { return  "float"; }
59 };
60 template<>
61 class name_of<double> {
62 public:
name()63     static const char* name() { return  "double"; }
64 };
65 template<>
66 class name_of<long> {
67 public:
name()68     static const char* name() { return  "long"; }
69 };
70 template<>
71 class name_of<short> {
72 public:
name()73     static const char* name() { return  "short"; }
74 };
75 
76 // T must be arithmetic, and shouldn't wrap around for reasonable sizes of Count (which is now 150, and maxPorts is 10,
77 // so the max number generated right now is 1500 or so.)  Input will generate a series of TT with value
78 // (init_val + (i-1)*addend) * my_mult, where i is the i-th invocation of the body.  We are attaching addend
79 // input nodes to a join_port, and each will generate part of the numerical series the port is expecting
80 // to receive.  If there is only one input node, the series order will be maintained; if more than one,
81 // this is not guaranteed.
82 
83 template<int N>
84 struct tuple_helper {
85     template<typename TupleType>
set_elementtuple_helper86     static void set_element( TupleType &t, int i) {
87         std::get<N-1>(t) = (typename std::tuple_element<N-1,TupleType>::type)(i * (N+1));
88         tuple_helper<N-1>::set_element(t, i);
89     }
90 };
91 
92 template<>
93 struct tuple_helper<1> {
94     template<typename TupleType>
set_elementtuple_helper95     static void set_element(TupleType &t, int i) {
96         std::get<0>(t) = (typename std::tuple_element<0,TupleType>::type)(i * 2);
97     }
98 };
99 
100 // if we start N input_bodys they will all have the addend N, and my_count should be initialized to 0 .. N-1.
101 // the output tuples should have all the sequence, but the order will in general vary.
102 template<typename TupleType>
103 class my_input_body {
104     typedef TupleType TT;
105     static const int N = std::tuple_size<TT>::value;
106     int my_count;
107     int addend;
108 public:
my_input_body(int init_val,int addto)109     my_input_body(int init_val, int addto) : my_count(init_val), addend(addto) { }
operator ()(tbb::flow_control & fc)110     TT operator()( tbb::flow_control &fc) {
111         if(my_count >= Count){
112             fc.stop();
113             return TT();
114         }
115         TT v;
116         tuple_helper<N>::set_element(v, my_count);
117         my_count += addend;
118         return v;
119     }
120 };
121 
122 // allocator for split_node.
123 
124 template<int N, typename SType>
125 class makeSplit {
126 public:
create(tbb::flow::graph & g)127     static SType *create(tbb::flow::graph& g) {
128         SType *temp = new SType(g);
129         return temp;
130     }
destroy(SType * p)131     static void destroy(SType *p) { delete p; }
132 };
133 
134 // holder for sink_node pointers for eventual deletion
135 
136 static void* all_sink_nodes[MaxPorts];
137 
138 
139 template<int ELEM, typename SType>
140 class sink_node_helper {
141 public:
142     typedef typename SType::input_type TT;
143     typedef typename std::tuple_element<ELEM-1,TT>::type IT;
144     typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
print_parallel_remark()145     static void print_parallel_remark() {
146         sink_node_helper<ELEM-1,SType>::print_parallel_remark();
147         INFO(", " << name_of<IT>::name());
148     }
print_serial_remark()149     static void print_serial_remark() {
150         sink_node_helper<ELEM-1,SType>::print_serial_remark();
151         INFO(", " << name_of<IT>::name());
152     }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)153     static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
154         my_sink_node_type *new_node = new my_sink_node_type(g);
155         tbb::flow::make_edge( tbb::flow::output_port<ELEM-1>(my_split) , *new_node);
156         all_sink_nodes[ELEM-1] = (void *)new_node;
157         sink_node_helper<ELEM-1, SType>::add_sink_nodes(my_split, g);
158     }
159 
check_sink_values()160     static void check_sink_values() {
161         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
162         for(int i = 0; i < Count; ++i) {
163             IT v{};
164             CHECK_MESSAGE(dp->try_get(v), "");
165             flags[((int)v) / (ELEM+1)] = true;
166         }
167         for(int i = 0; i < Count; ++i) {
168             CHECK_MESSAGE(flags[i], "");
169             flags[i] = false;  // reset for next test
170         }
171         sink_node_helper<ELEM-1,SType>::check_sink_values();
172     }
remove_sink_nodes(SType & my_split)173     static void remove_sink_nodes(SType& my_split) {
174         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[ELEM-1]);
175         tbb::flow::remove_edge( tbb::flow::output_port<ELEM-1>(my_split) , *dp);
176         delete dp;
177         sink_node_helper<ELEM-1, SType>::remove_sink_nodes(my_split);
178     }
179 };
180 
181 template<typename SType>
182 class sink_node_helper<1, SType> {
183     typedef typename SType::input_type TT;
184     typedef typename std::tuple_element<0,TT>::type IT;
185     typedef typename tbb::flow::queue_node<IT> my_sink_node_type;
186 public:
print_parallel_remark()187     static void print_parallel_remark() {
188         INFO("Parallel test of split_node< " << name_of<IT>::name());
189     }
print_serial_remark()190     static void print_serial_remark() {
191         INFO("Serial test of split_node< " << name_of<IT>::name());
192     }
add_sink_nodes(SType & my_split,tbb::flow::graph & g)193     static void add_sink_nodes(SType &my_split, tbb::flow::graph &g) {
194         my_sink_node_type *new_node = new my_sink_node_type(g);
195         tbb::flow::make_edge( tbb::flow::output_port<0>(my_split) , *new_node);
196         all_sink_nodes[0] = (void *)new_node;
197     }
check_sink_values()198     static void check_sink_values() {
199         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
200         for(int i = 0; i < Count; ++i) {
201             IT v{};
202             CHECK_MESSAGE(dp->try_get(v), "");
203             flags[((int)v) / 2] = true;
204         }
205         for(int i = 0; i < Count; ++i) {
206             CHECK_MESSAGE(flags[i], "");
207             flags[i] = false;  // reset for next test
208         }
209     }
remove_sink_nodes(SType & my_split)210     static void remove_sink_nodes(SType& my_split) {
211         my_sink_node_type *dp = reinterpret_cast<my_sink_node_type *>(all_sink_nodes[0]);
212         tbb::flow::remove_edge( tbb::flow::output_port<0>(my_split) , *dp);
213         delete dp;
214     }
215 };
216 
217 // parallel_test: create input_nodes that feed tuples into the split node
218 //    and queue_nodes that receive the output.
219 template<typename SType>
220 class parallel_test {
221 public:
222     typedef typename SType::input_type TType;
223     typedef tbb::flow::input_node<TType> input_type;
224     static const int N = std::tuple_size<TType>::value;
225 
test()226     static void test() {
227         input_type* all_input_nodes[MaxNInputs];
228         sink_node_helper<N,SType>::print_parallel_remark();
229         INFO(" >\n");
230         for(int i=0; i < MaxPorts; ++i) {
231             all_sink_nodes[i] = nullptr;
232         }
233         // try test for # inputs 1 .. MaxNInputs
234         for(int nInputs = 1; nInputs <= MaxNInputs; ++nInputs) {
235             tbb::flow::graph g;
236             SType* my_split = makeSplit<N,SType>::create(g);
237 
238             // add sinks first so when inputs start spitting out values they are there to catch them
239             sink_node_helper<N, SType>::add_sink_nodes((*my_split), g);
240 
241             // now create nInputs input_nodes, each spitting out i, i+nInputs, i+2*nInputs ...
242             // each element of the tuple is i*(n+1), where n is the tuple element index (1-N)
243             for(int i = 0; i < nInputs; ++i) {
244                 // create input node
245                 input_type *s = new input_type(g, my_input_body<TType>(i, nInputs) );
246                 tbb::flow::make_edge(*s, *my_split);
247                 all_input_nodes[i] = s;
248                 s->activate();
249             }
250 
251             g.wait_for_all();
252 
253             // check that we got Count values in each output queue, and all the index values
254             // are there.
255             sink_node_helper<N, SType>::check_sink_values();
256 
257             sink_node_helper<N, SType>::remove_sink_nodes(*my_split);
258             for(int i = 0; i < nInputs; ++i) {
259                 delete all_input_nodes[i];
260             }
261             makeSplit<N,SType>::destroy(my_split);
262         }
263     }
264 };
265 
266 //
267 // Single predecessor, single accepting successor at each port
268 
269 template<typename SType>
test_one_serial(SType & my_split,tbb::flow::graph & g)270 void test_one_serial( SType &my_split, tbb::flow::graph &g) {
271     typedef typename SType::input_type TType;
272     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
273     sink_node_helper<TUPLE_SIZE, SType>::add_sink_nodes(my_split,g);
274     typedef TType q3_input_type;
275     tbb::flow::queue_node< q3_input_type >  q3(g);
276 
277     tbb::flow::make_edge( q3, my_split );
278 
279     // fill the  queue with its value one-at-a-time
280     flags.clear();
281     for (int i = 0; i < Count; ++i ) {
282         TType v;
283         tuple_helper<TUPLE_SIZE>::set_element(v, i);
284         CHECK_MESSAGE(my_split.try_put(v), "");
285         flags.push_back(false);
286     }
287 
288     g.wait_for_all();
289 
290     sink_node_helper<TUPLE_SIZE,SType>::check_sink_values();
291 
292     sink_node_helper<TUPLE_SIZE, SType>::remove_sink_nodes(my_split);
293 
294 }
295 
296 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
test_follows_and_precedes_api()297 void test_follows_and_precedes_api() {
298     using namespace tbb::flow;
299     using msg_t = std::tuple<int, float, double>;
300 
301     graph g;
302 
303     function_node<msg_t, msg_t> f1(g, unlimited, [](msg_t msg) { return msg; } );
304     auto f2(f1);
305     auto f3(f1);
306 
307     std::atomic<int> body_calls;
308     body_calls = 0;
309 
310     function_node<int, int> f4(g, unlimited, [&](int val) { ++body_calls; return val; } );
311     function_node<float, float> f5(g, unlimited, [&](float val) { ++body_calls; return val; } );
312     function_node<double, double> f6(g, unlimited, [&](double val) { ++body_calls; return val; } );
313 
314     split_node<msg_t> following_node(follows(f1, f2, f3));
315     make_edge(output_port<0>(following_node), f4);
316     make_edge(output_port<1>(following_node), f5);
317     make_edge(output_port<2>(following_node), f6);
318 
319     split_node<msg_t> preceding_node(precedes(f4, f5, f6));
320     make_edge(f1, preceding_node);
321     make_edge(f2, preceding_node);
322     make_edge(f3, preceding_node);
323 
324     msg_t msg(1, 2.2f, 3.3);
325     f1.try_put(msg);
326     f2.try_put(msg);
327     f3.try_put(msg);
328 
329     g.wait_for_all();
330 
331     // <number of try puts> * <number of splits by a input node> * <number of input nodes>
332     CHECK_MESSAGE( ((body_calls == 3*3*2)), "Not exact edge quantity was made");
333 }
334 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
335 
336 template<typename SType>
337 class serial_test {
338     typedef typename SType::input_type TType;
339     static const int TUPLE_SIZE = std::tuple_size<TType>::value;
340     static const int ELEMS = 3;
341 public:
test()342 static void test() {
343     tbb::flow::graph g;
344     flags.reserve(Count);
345     SType* my_split = makeSplit<TUPLE_SIZE,SType>::create(g);
346     sink_node_helper<TUPLE_SIZE, SType>::print_serial_remark(); INFO(" >\n");
347 
348     test_output_ports_return_ref(*my_split);
349 
350     test_one_serial<SType>(*my_split, g);
351     // build the vector with copy construction from the used split node.
352     std::vector<SType>split_vector(ELEMS, *my_split);
353     // destroy the tired old split_node in case we're accidentally reusing pieces of it.
354     makeSplit<TUPLE_SIZE,SType>::destroy(my_split);
355 
356 
357     for(int e = 0; e < ELEMS; ++e) {  // exercise each of the vector elements
358         test_one_serial<SType>(split_vector[e], g);
359     }
360 }
361 
362 }; // serial_test
363 
364 template<
365       template<typename> class TestType,  // serial_test or parallel_test
366       typename TupleType >                               // type of the input of the split
367 struct generate_test {
368     typedef tbb::flow::split_node<TupleType> split_node_type;
do_testgenerate_test369     static void do_test() {
370         TestType<split_node_type>::test();
371     }
372 }; // generate_test
373 
374 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
375 
test_deduction_guides()376 void test_deduction_guides() {
377     using namespace tbb::flow;
378     using tuple_type = std::tuple<int, int>;
379 
380     graph g;
381     split_node<tuple_type> s0(g);
382 
383     split_node s1(s0);
384     static_assert(std::is_same_v<decltype(s1), split_node<tuple_type>>);
385 
386 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
387     broadcast_node<tuple_type> b1(g), b2(g);
388     broadcast_node<int> b3(g), b4(g);
389 
390     split_node s2(follows(b1, b2));
391     static_assert(std::is_same_v<decltype(s2), split_node<tuple_type>>);
392 
393     split_node s3(precedes(b3, b4));
394     static_assert(std::is_same_v<decltype(s3), split_node<tuple_type>>);
395 #endif
396 }
397 
398 #endif
399 
400 //! Test output ports and message passing with different input tuples
401 //! \brief \ref requirement \ref error_guessing
402 TEST_CASE("Tuple tests"){
403     for (int p = 0; p < 2; ++p) {
404         generate_test<serial_test, std::tuple<float, double> >::do_test();
405 #if MAX_TUPLE_TEST_SIZE >= 4
406         generate_test<serial_test, std::tuple<float, double, int, long> >::do_test();
407 #endif
408 #if MAX_TUPLE_TEST_SIZE >= 6
409         generate_test<serial_test, std::tuple<double, double, int, long, int, short> >::do_test();
410 #endif
411 #if MAX_TUPLE_TEST_SIZE >= 8
412         generate_test<serial_test, std::tuple<float, double, double, double, float, int, float, long> >::do_test();
413 #endif
414 #if MAX_TUPLE_TEST_SIZE >= 10
415         generate_test<serial_test, std::tuple<float, double, int, double, double, float, long, int, float, long> >::do_test();
416 #endif
417         generate_test<parallel_test, std::tuple<float, double> >::do_test();
418 #if MAX_TUPLE_TEST_SIZE >= 3
419         generate_test<parallel_test, std::tuple<float, int, long> >::do_test();
420 #endif
421 #if MAX_TUPLE_TEST_SIZE >= 5
422         generate_test<parallel_test, std::tuple<double, double, int, int, short> >::do_test();
423 #endif
424 #if MAX_TUPLE_TEST_SIZE >= 7
425         generate_test<parallel_test, std::tuple<float, int, double, float, long, float, long> >::do_test();
426 #endif
427 #if MAX_TUPLE_TEST_SIZE >= 9
428         generate_test<parallel_test, std::tuple<float, double, int, double, double, long, int, float, long> >::do_test();
429 #endif
430     }
431 }
432 
433 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
434 //! Test decution guides
435 //! \brief \ref requirement
436 TEST_CASE("Test follows and precedes API"){
437     test_follows_and_precedes_api();
438 }
439 #endif
440 
441 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
442 //! Test decution guides
443 //! \brief \ref requirement
444 TEST_CASE("Deduction guides"){
445     test_deduction_guides();
446 }
447 #endif
448 
449