xref: /oneTBB/test/tbb/test_composite_node.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
20 // parts in all of tests might make testing of the product, which is different from what is actually
21 // released.
22 #define __TBB_EXTRA_DEBUG 1
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/graph_utils.h"
28 
29 #include <tuple>
30 #include <cmath>
31 #include <vector>
32 
33 
34 //! \file test_composite_node.cpp
35 //! \brief Test for [flow_graph.composite_node] specification
36 
37 
38 struct passthru_body {
39     int operator()( int i ) {
40         return i;
41     }
42 };
43 
44 class my_input_body{
45     int start;
46     int finish;
47     int step;
48 public:
49     my_input_body(int f, int s) : start(1), finish(f), step(s) {}
50     int operator()(tbb::flow_control& fc) {
51        int a = start;
52        if (start <= finish) {
53            a = start;
54            start+=step;
55            return a;
56        }
57        else {
58            fc.stop();
59            return int();
60        };
61    }
62 };
63 
64 struct m_fxn_body{
65     void operator()(int, tbb::flow::multifunction_node<int, std::tuple<int,int> >::output_ports_type ) {}
66 };
67 
68 struct ct_body {
69 ct_body(){}
70     void operator()(tbb::flow::continue_msg){}
71 };
72 
73 struct seq_body {
74 int operator()(int i){return i;}
75 };
76 
77 template<int N, typename T1, typename T2>
78 struct compare {
79     static void compare_refs(T1 tuple1, T2 tuple2) {
80     CHECK_MESSAGE( ( &std::get<N>(tuple1) == &std::get<N>(tuple2)), "ports not set correctly");
81     compare<N-1, T1, T2>::compare_refs(tuple1, tuple2);
82     }
83 };
84 
85 template<typename T1, typename T2>
86 struct compare<1, T1, T2> {
87     static void compare_refs(T1 tuple1, T2 tuple2) {
88     CHECK_MESSAGE( (&std::get<0>(tuple1) == &std::get<0>(tuple2)), "port 0 not correctly set");
89     }
90 };
91 
92 void add_all_nodes (){
93     tbb::flow::graph g;
94 
95     typedef std::tuple<tbb::flow::continue_msg, std::tuple<int, int>, int, int, int, int,
96                              int, int, int, int, int, int, int, int > InputTupleType;
97 
98     typedef std::tuple<tbb::flow::continue_msg, std::tuple<int, int>, tbb::flow::tagged_msg<size_t, int, float>,
99                              int, int, int, int, int, int, int, int, int, int, int, int >  OutputTupleType;
100 
101     typedef std::tuple< > EmptyTupleType;
102 
103     typedef tbb::flow::composite_node<InputTupleType, OutputTupleType > input_output_type;
104     typedef tbb::flow::composite_node<InputTupleType, EmptyTupleType > input_only_type;
105     typedef tbb::flow::composite_node<EmptyTupleType, OutputTupleType > output_only_type;
106 
107     const size_t NUM_INPUTS = std::tuple_size<InputTupleType>::value;
108     const size_t NUM_OUTPUTS = std::tuple_size<OutputTupleType>::value;
109 
110     //node types
111     tbb::flow::continue_node<tbb::flow::continue_msg> ct(g, ct_body());
112     tbb::flow::split_node< std::tuple<int, int> > s(g);
113     tbb::flow::input_node<int> src(g, my_input_body(20,5));
114     tbb::flow::function_node<int, int> fxn(g, tbb::flow::unlimited, passthru_body());
115     tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, tbb::flow::unlimited, m_fxn_body());
116     tbb::flow::broadcast_node<int> bc(g);
117     tbb::flow::limiter_node<int> lim(g, 2);
118     tbb::flow::indexer_node<int, float> ind(g);
119     tbb::flow::join_node< std::tuple< int, int >, tbb::flow::queueing > j(g);
120     tbb::flow::queue_node<int> q(g);
121     tbb::flow::buffer_node<int> bf(g);
122     tbb::flow::priority_queue_node<int> pq(g);
123     tbb::flow::write_once_node<int> wo(g);
124     tbb::flow::overwrite_node<int> ovw(g);
125     tbb::flow::sequencer_node<int> seq(g, seq_body());
126 
127     auto input_tuple = std::tie(ct, s, m_fxn, fxn, bc, tbb::flow::input_port<0>(j), lim, q, tbb::flow::input_port<0>(ind),
128                                 pq, ovw, wo, bf, seq);
129     auto output_tuple = std::tie(ct,j, ind, fxn, src, bc, tbb::flow::output_port<0>(s), lim, tbb::flow::output_port<0>(m_fxn),
130                                  q, pq, ovw, wo, bf, seq );
131 
132     //composite_node with both input_ports and output_ports
133     input_output_type a_node(g);
134     a_node.set_external_ports(input_tuple, output_tuple);
135 
136     a_node.add_visible_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
137     a_node.add_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
138 
139     auto a_node_input_ports_ptr = a_node.input_ports();
140     compare<NUM_INPUTS-1, decltype(a_node_input_ports_ptr), decltype(input_tuple)>::compare_refs(a_node_input_ports_ptr, input_tuple);
141     CHECK_MESSAGE(NUM_INPUTS == std::tuple_size<decltype(a_node_input_ports_ptr)>::value, "not all declared input ports were bound to nodes");
142 
143     auto a_node_output_ports_ptr = a_node.output_ports();
144     compare<NUM_OUTPUTS-1, decltype(a_node_output_ports_ptr), decltype(output_tuple)>::compare_refs(a_node_output_ports_ptr, output_tuple);
145     CHECK_MESSAGE( (NUM_OUTPUTS == std::tuple_size<decltype(a_node_output_ports_ptr)>::value), "not all declared output ports were bound to nodes");
146 
147     //composite_node with only input_ports
148     input_only_type b_node(g);
149     b_node.set_external_ports(input_tuple);
150 
151     b_node.add_visible_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
152     b_node.add_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
153 
154     auto b_node_input_ports_ptr = b_node.input_ports();
155     compare<NUM_INPUTS-1, decltype(b_node_input_ports_ptr), decltype(input_tuple)>::compare_refs(b_node_input_ports_ptr, input_tuple);
156     CHECK_MESSAGE(NUM_INPUTS == std::tuple_size<decltype(b_node_input_ports_ptr)>::value, "not all declared input ports were bound to nodes");
157 
158     //composite_node with only output_ports
159     output_only_type c_node(g);
160     c_node.set_external_ports(output_tuple);
161 
162     // Reset is not suppose to do anything. Check that it can be called.
163     g.reset();
164 
165     c_node.add_visible_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
166 
167     c_node.add_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
168 
169     auto c_node_output_ports_ptr = c_node.output_ports();
170     compare<NUM_OUTPUTS-1, decltype(c_node_output_ports_ptr), decltype(output_tuple)>::compare_refs(c_node_output_ports_ptr, output_tuple);
171     CHECK_MESSAGE(NUM_OUTPUTS == std::tuple_size<decltype(c_node_output_ports_ptr)>::value, "not all declared input ports were bound to nodes");
172 }
173 
174 struct tiny_node : public tbb::flow::composite_node< std::tuple< int >, std::tuple< int > > {
175     tbb::flow::function_node< int, int > f1;
176     tbb::flow::function_node< int, int > f2;
177     typedef tbb::flow::composite_node< std::tuple< int >, std::tuple< int > > base_type;
178 
179 public:
180     tiny_node(tbb::flow::graph &g, bool hidden = false) : base_type(g), f1(g, tbb::flow::unlimited, passthru_body() ), f2(g, tbb::flow::unlimited, passthru_body() ) {
181         tbb::flow::make_edge( f1, f2 );
182 
183         std::tuple<tbb::flow::function_node< int, int >& > input_tuple(f1);
184         std::tuple<tbb::flow::function_node< int, int >& > output_tuple(f2);
185         base_type::set_external_ports( input_tuple, output_tuple );
186 
187         if(hidden)
188             base_type::add_nodes(f1, f2);
189         else
190             base_type::add_visible_nodes(f1, f2);
191 
192     }
193 };
194 
195 int test_tiny(bool hidden = false) {
196     tbb::flow::graph g;
197     tbb::flow::function_node< int, int > f0( g, tbb::flow::unlimited, passthru_body() );
198     tiny_node t(g, hidden);
199     CHECK_MESSAGE( (&tbb::flow::input_port<0>(t) == &t.f1), "f1 not bound to input port 0 in composite_node t");
200     CHECK_MESSAGE( (&tbb::flow::output_port<0>(t) == &t.f2), "f2 not bound to output port 0 in composite_node t");
201 
202     tiny_node t1(g, hidden);
203     CHECK_MESSAGE( (&std::get<0>(t1.input_ports()) == &t1.f1), "f1 not bound to input port 0 in composite_node t1");
204     CHECK_MESSAGE( (&std::get<0>(t1.output_ports()) == &t1.f2), "f2 not bound to output port 0 in composite_node t1");
205 
206     test_input_ports_return_ref(t1);
207     test_output_ports_return_ref(t1);
208 
209     tiny_node t2(g, hidden);
210     CHECK_MESSAGE( (&tbb::flow::input_port<0>(t2) == &t2.f1), "f1 not bound to input port 0 in composite_node t2");
211     CHECK_MESSAGE( (&tbb::flow::output_port<0>(t2) == &t2.f2), "f2 not bound to output port 0 in composite_node t2");
212 
213     tbb::flow::function_node< int, int > f3( g, tbb::flow::unlimited, passthru_body() );
214     tbb::flow::make_edge( f0, t );
215     tbb::flow::make_edge( t, t1 );
216     tbb::flow::make_edge( t1, t2 );
217     tbb::flow::make_edge( t2 , f3 );
218     tbb::flow::queue_node<int> q(g);
219     tbb::flow::make_edge(f3, q);
220     f0.try_put(1);
221     g.wait_for_all();
222 
223     int i, j =0;
224     q.try_get(i);
225     CHECK_MESSAGE( ( i == 1), "item did not go through graph");
226     q.try_get(j);
227     CHECK_MESSAGE( ( !j), "unexpected item in graph");
228     g.wait_for_all();
229 
230     tbb::flow::remove_edge(f3, q);
231     tbb::flow::remove_edge(t2, f3);
232     tbb::flow::remove_edge(t1, t2);
233 
234     tbb::flow::make_edge( t1 , f3 );
235     tbb::flow::make_edge(f3, q);
236 
237     f0.try_put(2);
238     g.wait_for_all();
239 
240     q.try_get(i);
241     CHECK_MESSAGE( ( i == 2), "item did not go through graph after removal of edge");
242     q.try_get(j);
243     CHECK_MESSAGE( ( !j), "unexpected item in graph after removal of edge");
244 
245     return 0;
246 }
247 
248 class adder_node : public tbb::flow::composite_node< std::tuple< int, int >, std::tuple< int > > {
249 public:
250     tbb::flow::join_node< std::tuple< int, int >, tbb::flow::queueing > j;
251     tbb::flow::function_node< std::tuple< int, int >, int > f;
252 private:
253     typedef tbb::flow::composite_node< std::tuple< int, int >, std::tuple< int > > base_type;
254 
255     struct f_body {
256         int operator()( const std::tuple< int, int > &t ) {
257             return std::get<0>(t) + std::get<1>(t);
258         }
259     };
260 
261 public:
262     adder_node(tbb::flow::graph &g, bool hidden = false) : base_type(g), j(g), f(g, tbb::flow::unlimited, f_body() ) {
263         tbb::flow::make_edge( j, f );
264 
265         base_type::set_external_ports(base_type::input_ports_type(tbb::flow::input_port<0>(j), tbb::flow::input_port<1>(j)), base_type::output_ports_type(f));
266 
267         if (hidden)
268             base_type::add_nodes(j, f);
269         else
270             base_type::add_visible_nodes(j, f);
271 
272     }
273 };
274 
275 struct square_body { int operator()(int v) { return v*v; } };
276 struct cube_body { int operator()(int v) { return v*v*v; } };
277 int adder_sum(int i) {
278     return (int)(pow(3*pow(i,3) + pow(i, 2),2));
279 }
280 int test_adder(bool hidden = false) {
281     tbb::flow::graph g;
282     tbb::flow::function_node<int,int> s(g, tbb::flow::unlimited, square_body());
283     tbb::flow::function_node<int,int> c(g, tbb::flow::unlimited, cube_body());
284     tbb::flow::function_node<int,int> p(g, tbb::flow::unlimited, passthru_body());
285 
286     adder_node a0(g, hidden);
287     CHECK_MESSAGE( (&tbb::flow::input_port<0>(a0) == &tbb::flow::input_port<0>(a0.j)), "input_port 0 of j not bound to input port 0 in composite_node a0");
288     CHECK_MESSAGE( (&tbb::flow::input_port<1>(a0) == &tbb::flow::input_port<1>(a0.j)), "input_port 1 of j not bound to input port 1 in composite_node a0");
289     CHECK_MESSAGE( (&tbb::flow::output_port<0>(a0) == &a0.f), "f not bound to output port 0 in composite_node a0");
290 
291     adder_node a1(g, hidden);
292     CHECK_MESSAGE( (&std::get<0>(a0.input_ports()) == &tbb::flow::input_port<0>(a0.j)), "input_port 0 of j not bound to input port 0 in composite_node a1");
293     CHECK_MESSAGE( (&std::get<1>(a0.input_ports()) == &tbb::flow::input_port<1>(a0.j)), "input_port1 of j not bound to input port 1 in composite_node a1");
294     CHECK_MESSAGE( (&std::get<0>(a0.output_ports()) == &a0.f), "f not bound to output port 0 in composite_node a1");
295 
296     adder_node a2(g, hidden);
297     CHECK_MESSAGE( (&tbb::flow::input_port<0>(a2) == &tbb::flow::input_port<0>(a2.j)), "input_port 0 of j not bound to input port 0 in composite_node a2");
298     CHECK_MESSAGE( (&tbb::flow::input_port<1>(a2) == &tbb::flow::input_port<1>(a2.j)), "input_port 1 of j not bound to input port 1 in composite_node a2");
299     CHECK_MESSAGE( (&tbb::flow::output_port<0>(a2) == &a2.f), "f not bound to output port 0 in composite_node a2");
300 
301     adder_node a3(g, hidden);
302     CHECK_MESSAGE( (&std::get<0>(a3.input_ports()) == &tbb::flow::input_port<0>(a3.j)), "input_port 0 of j not bound to input port 0 in composite_node a3");
303     CHECK_MESSAGE( (&std::get<1>(a3.input_ports()) == &tbb::flow::input_port<1>(a3.j)), "input_port1 of j not bound to input port 1 in composite_node a3");
304     CHECK_MESSAGE( (&std::get<0>(a3.output_ports()) == &a3.f), "f not bound to output port 0 in composite_node a3");
305 
306     tbb::flow::function_node<int,int> s2(g, tbb::flow::unlimited, square_body());
307     tbb::flow::queue_node<int> q(g);
308 
309     tbb::flow::make_edge( s, tbb::flow::input_port<0>(a0) );
310     tbb::flow::make_edge( c, tbb::flow::input_port<1>(a0) );
311 
312     tbb::flow::make_edge( c, tbb::flow::input_port<0>(a1) );
313     tbb::flow::make_edge( c, tbb::flow::input_port<1>(a1) );
314 
315     tbb::flow::make_edge( tbb::flow::output_port<0>(a0), tbb::flow::input_port<0>(a2) );
316     tbb::flow::make_edge( tbb::flow::output_port<0>(a1), tbb::flow::input_port<1>(a2) );
317 
318     tbb::flow::make_edge( tbb::flow::output_port<0>(a2), s2 );
319     tbb::flow::make_edge( s2, q );
320 
321     int sum_total=0;
322     int result=0;
323     for ( int i = 1; i < 4; ++i ) {
324         s.try_put(i);
325         c.try_put(i);
326         sum_total += adder_sum(i);
327         g.wait_for_all();
328     }
329 
330     int j;
331     for ( int i = 1; i < 4; ++i ) {
332         q.try_get(j);
333         result += j;
334     }
335     g.wait_for_all();
336     CHECK_MESSAGE( (result == sum_total), "the sum from the graph does not match the calculated value");
337 
338     tbb::flow::remove_edge(s2, q);
339     tbb::flow::remove_edge( a2, s2 );
340     tbb::flow::make_edge( a0, a3 );
341     tbb::flow::make_edge( a1, tbb::flow::input_port<1>(a3) );
342     tbb::flow::make_edge( a3, s2 );
343     tbb::flow::make_edge( s2, q );
344 
345     sum_total=0;
346     result=0;
347     for ( int i = 10; i < 20; ++i ) {
348         s.try_put(i);
349         c.try_put(i);
350         sum_total += adder_sum(i);
351         g.wait_for_all();
352     }
353 
354     for ( int i = 10; i < 20; ++i ) {
355         q.try_get(j);
356         result += j;
357     }
358     g.wait_for_all();
359     CHECK_MESSAGE( (result == sum_total), "the new sum after the replacement of the nodes does not match the calculated value");
360 
361     return 0;
362 }
363 
364 /*
365                                               outer composite node (outer_node)
366                                      |-------------------------------------------------------------------|
367                                      |                                                                   |
368                                      |  |------------------|  |------------------|  |------------------| |
369              |---------------------| |--| inner composite  | /| inner composite  | /| inner composite  | | |-------------------|
370              |broadcast node(input)|/|  | node             |/ | node             |/ | node             |-+-| queue node(output)|
371              |---------------------|\|  |(inner_node1)     |\ | (inner_node2)    |\ | (inner_node3)    | | |-------------------|
372                                      |--|                  | \|                  | \|                  | |
373                                      |  |------------------|  |------------------|  |------------------| |
374                                      |                                                                   |
375                                      |-------------------------------------------------------------------|
376 
377 */
378 int test_nested_adder(bool hidden=false) {
379     tbb::flow::graph g;
380     tbb::flow::composite_node<std::tuple<int, int>, std::tuple<int> > outer_node(g);
381     typedef tbb::flow::composite_node<std::tuple<int, int>, std::tuple<int> > base_type;
382     tbb::flow::broadcast_node<int> input(g);
383     tbb::flow::queue_node<int> output(g);
384 
385     adder_node inner_node1(g, hidden);
386     adder_node inner_node2(g, hidden);
387     adder_node inner_node3(g, hidden);
388 
389     outer_node.set_external_ports(base_type::input_ports_type(tbb::flow::input_port<0>(inner_node1), tbb::flow::input_port<1>(inner_node1)), base_type::output_ports_type(tbb::flow::output_port<0>(inner_node3)));
390 
391     CHECK_MESSAGE( (&tbb::flow::input_port<0>(outer_node) == &tbb::flow::input_port<0>(inner_node1)), "input port 0 of inner_node1 not bound to input port 0 in outer_node");
392     CHECK_MESSAGE( (&tbb::flow::input_port<1>(outer_node) == &tbb::flow::input_port<1>(inner_node1)), "input port 1 of inner_node1 not bound to input port 1 in outer_node");
393     CHECK_MESSAGE( (&tbb::flow::output_port<0>(outer_node) == &tbb::flow::output_port<0>(inner_node3)), "output port 0 of inner_node3 not bound to output port 0 in outer_node");
394 
395     tbb::flow::make_edge(input, tbb::flow::input_port<0>(outer_node)/*inner_node1*/);
396     tbb::flow::make_edge(input, tbb::flow::input_port<1>(outer_node)/*inner_node1*/);
397 
398     tbb::flow::make_edge(inner_node1, tbb::flow::input_port<0>(inner_node2));
399     tbb::flow::make_edge(inner_node1, tbb::flow::input_port<1>(inner_node2));
400 
401     tbb::flow::make_edge(inner_node2, tbb::flow::input_port<0>(inner_node3));
402     tbb::flow::make_edge(inner_node2, tbb::flow::input_port<1>(inner_node3));
403 
404     tbb::flow::make_edge(outer_node/*inner_node3*/, output);
405 
406     if(hidden)
407         outer_node.add_nodes(inner_node1, inner_node2, inner_node3);
408     else
409         outer_node.add_visible_nodes(inner_node1, inner_node2, inner_node3);
410 
411     int out;
412     for (int i = 1; i < 200000; ++i) {
413         input.try_put(i);
414         g.wait_for_all();
415         output.try_get(out);
416         CHECK_MESSAGE( (tbb::flow::output_port<0>(outer_node).try_get(out) == output.try_get(out)), "output from outer_node does not match output from graph");
417         CHECK_MESSAGE( (out == 8*i), "output from outer_node not correct");
418     }
419     g.wait_for_all();
420 
421     return 0;
422 }
423 
424 template< typename T >
425 class prefix_node : public tbb::flow::composite_node< std::tuple< T, T, T, T, T >, std::tuple< T, T, T, T, T > > {
426     typedef std::tuple< T, T, T, T, T > my_tuple_t;
427 public:
428     tbb::flow::join_node< my_tuple_t, tbb::flow::queueing > j;
429     tbb::flow::split_node< my_tuple_t > s;
430 private:
431     tbb::flow::function_node< my_tuple_t, my_tuple_t > f;
432     typedef tbb::flow::composite_node< my_tuple_t, my_tuple_t > base_type;
433 
434     struct f_body {
435         my_tuple_t operator()( const my_tuple_t &t ) {
436             return my_tuple_t( std::get<0>(t),
437                                std::get<0>(t) + std::get<1>(t),
438                                std::get<0>(t) + std::get<1>(t) + std::get<2>(t),
439                                std::get<0>(t) + std::get<1>(t) + std::get<2>(t) + std::get<3>(t),
440                                std::get<0>(t) + std::get<1>(t) + std::get<2>(t) + std::get<3>(t) + std::get<4>(t) );
441         }
442     };
443 
444 public:
445     prefix_node(tbb::flow::graph &g, bool hidden = false ) : base_type(g), j(g), s(g), f(g, tbb::flow::serial, f_body() ) {
446         tbb::flow::make_edge( j, f );
447         tbb::flow::make_edge( f, s );
448 
449     typename base_type::input_ports_type input_tuple(tbb::flow::input_port<0>(j), tbb::flow::input_port<1>(j), tbb::flow::input_port<2>(j), tbb::flow::input_port<3>(j), tbb::flow::input_port<4>(j));
450 
451     typename base_type::output_ports_type output_tuple(tbb::flow::output_port<0>(s), tbb::flow::output_port<1>(s), tbb::flow::output_port<2>(s), tbb::flow::output_port<3>(s), tbb::flow::output_port<4>(s));
452 
453     base_type::set_external_ports(input_tuple, output_tuple);
454 
455         if(hidden)
456             base_type::add_nodes(j,s,f);
457         else
458             base_type::add_visible_nodes(j,s,f);
459 
460     }
461 };
462 
463 int test_prefix(bool hidden = false) {
464     tbb::flow::graph g;
465     prefix_node<double> p(g, hidden);
466 
467     CHECK_MESSAGE( (&std::get<0>(p.input_ports()) == &tbb::flow::input_port<0>(p.j)), "input port 0 of j is not bound to input port 0 of composite node p");
468     CHECK_MESSAGE( (&tbb::flow::input_port<1>(p.j) == &tbb::flow::input_port<1>(p.j)), "input port 1 of j is not bound to input port 1 of composite node p");
469     CHECK_MESSAGE( (&std::get<2>(p.input_ports()) == &tbb::flow::input_port<2>(p.j)), "input port 2 of j is not bound to input port 2 of composite node p");
470     CHECK_MESSAGE( (&tbb::flow::input_port<3>(p.j) == &tbb::flow::input_port<3>(p.j)), "input port 3 of j is not bound to input port 3 of composite node p");
471     CHECK_MESSAGE( (&std::get<4>(p.input_ports()) == &tbb::flow::input_port<4>(p.j)), "input port 4 of j is not bound to input port 4 of composite node p");
472 
473 
474     CHECK_MESSAGE( (&std::get<0>(p.output_ports()) == &tbb::flow::output_port<0>(p.s)), "output port 0 of s is not bound to output port 0 of composite node p");
475     CHECK_MESSAGE( (&tbb::flow::output_port<1>(p.s) == &tbb::flow::output_port<1>(p.s)), "output port 1 of s is not bound to output port 1 of composite node p");
476     CHECK_MESSAGE( (&std::get<2>(p.output_ports()) == &tbb::flow::output_port<2>(p.s)), "output port 2 of s is not bound to output port 2 of composite node p");
477     CHECK_MESSAGE( (&tbb::flow::output_port<3>(p.s) == &tbb::flow::output_port<3>(p.s)), "output port 3 of s is not bound to output port 3 of composite node p");
478     CHECK_MESSAGE( (&std::get<4>(p.output_ports()) == &tbb::flow::output_port<4>(p.s)), "output port 4 of s is not bound to output port 4 of composite node p");
479 
480     std::vector< tbb::flow::queue_node<double> > v( 5, tbb::flow::queue_node<double>(g) );
481     tbb::flow::make_edge( tbb::flow::output_port<0>(p), v[0] );
482     tbb::flow::make_edge( tbb::flow::output_port<1>(p), v[1] );
483     tbb::flow::make_edge( tbb::flow::output_port<2>(p), v[2] );
484     tbb::flow::make_edge( tbb::flow::output_port<3>(p), v[3] );
485     tbb::flow::make_edge( tbb::flow::output_port<4>(p), v[4] );
486 
487     for(  double offset = 1; offset < 10000; offset *= 10 ) {
488         tbb::flow::input_port<0>(p).try_put( offset );
489         tbb::flow::input_port<1>(p).try_put( offset + 1 );
490         tbb::flow::input_port<2>(p).try_put( offset + 2 );
491         tbb::flow::input_port<3>(p).try_put( offset + 3 );
492         tbb::flow::input_port<4>(p).try_put( offset + 4 );
493     }
494     g.wait_for_all();
495 
496     double x;
497     while ( v[0].try_get(x) ) {
498         g.wait_for_all();
499         for ( int i = 1; i < 5; ++i ) {
500             v[i].try_get(x);
501             g.wait_for_all();
502         }
503     }
504     return 0;
505 }
506 
507 struct input_only_output_only_seq {
508     int operator()(int i){ return (i + 3) / 4 - 1;}
509 };
510 
511 void input_only_output_only_composite(bool hidden) {
512     tbb::flow::graph g;
513 
514     tbb::flow::composite_node<std::tuple<int>, std::tuple<int> > input_output(g);
515 
516     typedef tbb::flow::composite_node<std::tuple<int>, std::tuple<> > input_only_composite;
517     typedef tbb::flow::composite_node<std::tuple<>, std::tuple<int> > output_only_composite;
518 
519     typedef tbb::flow::input_node<int> src_type;
520     typedef tbb::flow::queue_node<int> q_type;
521     typedef tbb::flow::function_node<int, int> f_type;
522     typedef tbb::flow::sequencer_node<int> sequencer_type;
523 
524     int num = 0;
525     int finish=1000;
526     int step = 4;
527 
528     input_only_composite a_in(g);
529     output_only_composite a_out(g);
530 
531     src_type src(g, my_input_body(finish, step));
532     q_type que(g);
533     f_type f(g, 1, passthru_body());
534 
535     // Sequencer_node is needed, because serial function_node guarantees only serial body execution,
536     // not a sequential order of messages dispatch
537     sequencer_type seq(g, input_only_output_only_seq());
538 
539     std::tuple<f_type& > input_tuple(f);
540     a_in.set_external_ports(input_tuple);
541     CHECK_MESSAGE( (&std::get<0>(a_in.input_ports()) == &f), "f not bound to input port 0 in composite_node a_in");
542 
543     std::tuple<src_type&> output_tuple(src);
544     a_out.set_external_ports(output_tuple);
545     CHECK_MESSAGE( (&std::get<0>(a_out.output_ports()) == &src), "src not bound to output port 0 in composite_node a_out");
546 
547     if(hidden) {
548         a_in.add_nodes(f, seq, que);
549         a_out.add_nodes(src);
550     } else {
551         a_in.add_visible_nodes(f, seq, que);
552         a_out.add_visible_nodes(src);
553     }
554 
555     tbb::flow::make_edge(a_out, a_in);
556     tbb::flow::make_edge(f, seq);
557     tbb::flow::make_edge(seq, que);
558     src.activate();
559     g.wait_for_all();
560 
561     for(int i = 1; i<finish/step; ++i) {
562         que.try_get(num);
563         CHECK_MESSAGE( (num == 4*i - 3), "number does not match position in sequence");
564     }
565     g.wait_for_all();
566 }
567 
568 //! Test all node types inside composite node
569 //! \brief \ref error_guessing
570 TEST_CASE("Add all nodes"){
571     add_all_nodes();
572 }
573 
574 //! Test single node inside composite nodes
575 //! \brief \ref error_guessing
576 TEST_CASE("Tiny tests"){
577     test_tiny(false);
578     test_tiny(true);
579 }
580 
581 //! Test basic adders in composite node
582 //! \brief \ref error_guessing
583 TEST_CASE("Adder tests"){
584     test_adder(false);
585     test_adder(true);
586 }
587 
588 //! Test nested adders in composite node
589 //! \brief \ref error_guessing
590 TEST_CASE("Nested adder tests"){
591     test_nested_adder(true);
592     test_nested_adder(false);
593 }
594 
595 //! Test returning a subset of inputs
596 //! \brief \ref error_guessing
597 TEST_CASE("Prefix test"){
598     test_prefix(false);
599     test_prefix(true);
600 }
601 
602 //! Test input-only composite node
603 //! \brief \ref error_guessing \ref boundary
604 TEST_CASE("Input-only composite"){
605     input_only_output_only_composite(true);
606     input_only_output_only_composite(false);
607 }
608 
609