xref: /oneTBB/test/tbb/test_composite_node.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
// TODO revamp: move the parts that depend on __TBB_EXTRA_DEBUG into separate test(s). Enabling it
// in every test means the tests exercise a build of the product that differs from what is actually
// released.
26 #define __TBB_EXTRA_DEBUG 1
27 #include "tbb/flow_graph.h"
28 
29 #include "common/test.h"
30 #include "common/utils.h"
31 #include "common/graph_utils.h"
32 
33 #include <tuple>
34 #include <cmath>
35 #include <vector>
36 
37 
38 //! \file test_composite_node.cpp
39 //! \brief Test for [flow_graph.composite_node] specification
40 
41 
//! Identity functor: hands back the integer it was given, unchanged.
struct passthru_body {
    int operator()( int value ) { return value; }
};
47 
48 class my_input_body{
49     int start;
50     int finish;
51     int step;
52 public:
53     my_input_body(int f, int s) : start(1), finish(f), step(s) {}
54     int operator()(tbb::flow_control& fc) {
55        int a = start;
56        if (start <= finish) {
57            a = start;
58            start+=step;
59            return a;
60        }
61        else {
62            fc.stop();
63            return int();
64        };
65    }
66 };
67 
68 struct m_fxn_body{
69     void operator()(int, tbb::flow::multifunction_node<int, std::tuple<int,int> >::output_ports_type ) {}
70 };
71 
72 struct ct_body {
73 ct_body(){}
74     void operator()(tbb::flow::continue_msg){}
75 };
76 
//! Functor mapping an item to itself; passed to the sequencer_node in add_all_nodes(),
//! where the returned value serves as the item's sequence number.
struct seq_body {
    int operator()( int item ) {
        return item;
    }
};
80 
//! Compile-time recursive helper that checks that two tuples of references refer to the
//! same objects, element by element.
//!
//! compare<N, T1, T2>::compare_refs(t1, t2) compares the addresses of elements N, N-1, ..., 0.
//! Callers pass N = tuple_size - 1. The tuples are taken by value; they are expected to hold
//! references, so copying the tuple does not copy the referenced objects.
template<int N, typename T1, typename T2>
struct compare {
    static void compare_refs(T1 tuple1, T2 tuple2) {
        CHECK_MESSAGE( ( &std::get<N>(tuple1) == &std::get<N>(tuple2)), "ports not set correctly");
        compare<N-1, T1, T2>::compare_refs(tuple1, tuple2);
    }
};

//! Recursion terminator: element 0 is the last one to be compared.
//! The terminator is keyed on 0 (not 1) so that element 1 is handled by the primary template
//! and is not silently skipped on the way down to element 0.
template<typename T1, typename T2>
struct compare<0, T1, T2> {
    static void compare_refs(T1 tuple1, T2 tuple2) {
        CHECK_MESSAGE( (&std::get<0>(tuple1) == &std::get<0>(tuple2)), "port 0 not correctly set");
    }
};
95 
96 void add_all_nodes (){
97     tbb::flow::graph g;
98 
99     typedef std::tuple<tbb::flow::continue_msg, std::tuple<int, int>, int, int, int, int,
100                              int, int, int, int, int, int, int, int > InputTupleType;
101 
102     typedef std::tuple<tbb::flow::continue_msg, std::tuple<int, int>, tbb::flow::tagged_msg<size_t, int, float>,
103                              int, int, int, int, int, int, int, int, int, int, int, int >  OutputTupleType;
104 
105     typedef std::tuple< > EmptyTupleType;
106 
107     typedef tbb::flow::composite_node<InputTupleType, OutputTupleType > input_output_type;
108     typedef tbb::flow::composite_node<InputTupleType, EmptyTupleType > input_only_type;
109     typedef tbb::flow::composite_node<EmptyTupleType, OutputTupleType > output_only_type;
110 
111     const size_t NUM_INPUTS = std::tuple_size<InputTupleType>::value;
112     const size_t NUM_OUTPUTS = std::tuple_size<OutputTupleType>::value;
113 
114     //node types
115     tbb::flow::continue_node<tbb::flow::continue_msg> ct(g, ct_body());
116     tbb::flow::split_node< std::tuple<int, int> > s(g);
117     tbb::flow::input_node<int> src(g, my_input_body(20,5));
118     tbb::flow::function_node<int, int> fxn(g, tbb::flow::unlimited, passthru_body());
119     tbb::flow::multifunction_node<int, std::tuple<int, int> > m_fxn(g, tbb::flow::unlimited, m_fxn_body());
120     tbb::flow::broadcast_node<int> bc(g);
121     tbb::flow::limiter_node<int> lim(g, 2);
122     tbb::flow::indexer_node<int, float> ind(g);
123     tbb::flow::join_node< std::tuple< int, int >, tbb::flow::queueing > j(g);
124     tbb::flow::queue_node<int> q(g);
125     tbb::flow::buffer_node<int> bf(g);
126     tbb::flow::priority_queue_node<int> pq(g);
127     tbb::flow::write_once_node<int> wo(g);
128     tbb::flow::overwrite_node<int> ovw(g);
129     tbb::flow::sequencer_node<int> seq(g, seq_body());
130 
131     auto input_tuple = std::tie(ct, s, m_fxn, fxn, bc, tbb::flow::input_port<0>(j), lim, q, tbb::flow::input_port<0>(ind),
132                                 pq, ovw, wo, bf, seq);
133     auto output_tuple = std::tie(ct,j, ind, fxn, src, bc, tbb::flow::output_port<0>(s), lim, tbb::flow::output_port<0>(m_fxn),
134                                  q, pq, ovw, wo, bf, seq );
135 
136     //composite_node with both input_ports and output_ports
137     input_output_type a_node(g);
138     a_node.set_external_ports(input_tuple, output_tuple);
139 
140     a_node.add_visible_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
141     a_node.add_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
142 
143     auto a_node_input_ports_ptr = a_node.input_ports();
144     compare<NUM_INPUTS-1, decltype(a_node_input_ports_ptr), decltype(input_tuple)>::compare_refs(a_node_input_ports_ptr, input_tuple);
145     CHECK_MESSAGE(NUM_INPUTS == std::tuple_size<decltype(a_node_input_ports_ptr)>::value, "not all declared input ports were bound to nodes");
146 
147     auto a_node_output_ports_ptr = a_node.output_ports();
148     compare<NUM_OUTPUTS-1, decltype(a_node_output_ports_ptr), decltype(output_tuple)>::compare_refs(a_node_output_ports_ptr, output_tuple);
149     CHECK_MESSAGE( (NUM_OUTPUTS == std::tuple_size<decltype(a_node_output_ports_ptr)>::value), "not all declared output ports were bound to nodes");
150 
151     //composite_node with only input_ports
152     input_only_type b_node(g);
153     b_node.set_external_ports(input_tuple);
154 
155     b_node.add_visible_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
156     b_node.add_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
157 
158     auto b_node_input_ports_ptr = b_node.input_ports();
159     compare<NUM_INPUTS-1, decltype(b_node_input_ports_ptr), decltype(input_tuple)>::compare_refs(b_node_input_ports_ptr, input_tuple);
160     CHECK_MESSAGE(NUM_INPUTS == std::tuple_size<decltype(b_node_input_ports_ptr)>::value, "not all declared input ports were bound to nodes");
161 
162     //composite_node with only output_ports
163     output_only_type c_node(g);
164     c_node.set_external_ports(output_tuple);
165 
166     // Reset is not suppose to do anything. Check that it can be called.
167     g.reset();
168 
169     c_node.add_visible_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
170 
171     c_node.add_nodes(src, fxn, m_fxn, bc, lim, ind, s, ct, j, q, bf, pq, wo, ovw, seq);
172 
173     auto c_node_output_ports_ptr = c_node.output_ports();
174     compare<NUM_OUTPUTS-1, decltype(c_node_output_ports_ptr), decltype(output_tuple)>::compare_refs(c_node_output_ports_ptr, output_tuple);
175     CHECK_MESSAGE(NUM_OUTPUTS == std::tuple_size<decltype(c_node_output_ports_ptr)>::value, "not all declared input ports were bound to nodes");
176 }
177 
178 struct tiny_node : public tbb::flow::composite_node< std::tuple< int >, std::tuple< int > > {
179     tbb::flow::function_node< int, int > f1;
180     tbb::flow::function_node< int, int > f2;
181     typedef tbb::flow::composite_node< std::tuple< int >, std::tuple< int > > base_type;
182 
183 public:
184     tiny_node(tbb::flow::graph &g, bool hidden = false) : base_type(g), f1(g, tbb::flow::unlimited, passthru_body() ), f2(g, tbb::flow::unlimited, passthru_body() ) {
185         tbb::flow::make_edge( f1, f2 );
186 
187         std::tuple<tbb::flow::function_node< int, int >& > input_tuple(f1);
188         std::tuple<tbb::flow::function_node< int, int >& > output_tuple(f2);
189         base_type::set_external_ports( input_tuple, output_tuple );
190 
191         if(hidden)
192             base_type::add_nodes(f1, f2);
193         else
194             base_type::add_visible_nodes(f1, f2);
195 
196     }
197 };
198 
199 int test_tiny(bool hidden = false) {
200     tbb::flow::graph g;
201     tbb::flow::function_node< int, int > f0( g, tbb::flow::unlimited, passthru_body() );
202     tiny_node t(g, hidden);
203     CHECK_MESSAGE( (&tbb::flow::input_port<0>(t) == &t.f1), "f1 not bound to input port 0 in composite_node t");
204     CHECK_MESSAGE( (&tbb::flow::output_port<0>(t) == &t.f2), "f2 not bound to output port 0 in composite_node t");
205 
206     tiny_node t1(g, hidden);
207     CHECK_MESSAGE( (&std::get<0>(t1.input_ports()) == &t1.f1), "f1 not bound to input port 0 in composite_node t1");
208     CHECK_MESSAGE( (&std::get<0>(t1.output_ports()) == &t1.f2), "f2 not bound to output port 0 in composite_node t1");
209 
210     test_input_ports_return_ref(t1);
211     test_output_ports_return_ref(t1);
212 
213     tiny_node t2(g, hidden);
214     CHECK_MESSAGE( (&tbb::flow::input_port<0>(t2) == &t2.f1), "f1 not bound to input port 0 in composite_node t2");
215     CHECK_MESSAGE( (&tbb::flow::output_port<0>(t2) == &t2.f2), "f2 not bound to output port 0 in composite_node t2");
216 
217     tbb::flow::function_node< int, int > f3( g, tbb::flow::unlimited, passthru_body() );
218     tbb::flow::make_edge( f0, t );
219     tbb::flow::make_edge( t, t1 );
220     tbb::flow::make_edge( t1, t2 );
221     tbb::flow::make_edge( t2 , f3 );
222     tbb::flow::queue_node<int> q(g);
223     tbb::flow::make_edge(f3, q);
224     f0.try_put(1);
225     g.wait_for_all();
226 
227     int i, j =0;
228     q.try_get(i);
229     CHECK_MESSAGE( ( i == 1), "item did not go through graph");
230     q.try_get(j);
231     CHECK_MESSAGE( ( !j), "unexpected item in graph");
232     g.wait_for_all();
233 
234     tbb::flow::remove_edge(f3, q);
235     tbb::flow::remove_edge(t2, f3);
236     tbb::flow::remove_edge(t1, t2);
237 
238     tbb::flow::make_edge( t1 , f3 );
239     tbb::flow::make_edge(f3, q);
240 
241     f0.try_put(2);
242     g.wait_for_all();
243 
244     q.try_get(i);
245     CHECK_MESSAGE( ( i == 2), "item did not go through graph after removal of edge");
246     q.try_get(j);
247     CHECK_MESSAGE( ( !j), "unexpected item in graph after removal of edge");
248 
249     return 0;
250 }
251 
252 class adder_node : public tbb::flow::composite_node< std::tuple< int, int >, std::tuple< int > > {
253 public:
254     tbb::flow::join_node< std::tuple< int, int >, tbb::flow::queueing > j;
255     tbb::flow::function_node< std::tuple< int, int >, int > f;
256 private:
257     typedef tbb::flow::composite_node< std::tuple< int, int >, std::tuple< int > > base_type;
258 
259     struct f_body {
260         int operator()( const std::tuple< int, int > &t ) {
261             return std::get<0>(t) + std::get<1>(t);
262         }
263     };
264 
265 public:
266     adder_node(tbb::flow::graph &g, bool hidden = false) : base_type(g), j(g), f(g, tbb::flow::unlimited, f_body() ) {
267         tbb::flow::make_edge( j, f );
268 
269         base_type::set_external_ports(base_type::input_ports_type(tbb::flow::input_port<0>(j), tbb::flow::input_port<1>(j)), base_type::output_ports_type(f));
270 
271         if (hidden)
272             base_type::add_nodes(j, f);
273         else
274             base_type::add_visible_nodes(j, f);
275 
276     }
277 };
278 
//! Functor returning the square of its argument.
struct square_body {
    int operator()(int v) {
        return v * v;
    }
};
//! Functor returning the cube of its argument.
struct cube_body {
    int operator()(int v) {
        const int squared = v * v;
        return squared * v;
    }
};
//! Reference value for the adder graph in test_adder(): (3*i^3 + i^2)^2.
//! Computed in integer arithmetic so the result is exact and does not depend on how a
//! floating-point pow() rounds before truncation to int.
//! The caller must keep `i` small enough for the result to fit in int (test_adder uses i < 20).
int adder_sum(int i) {
    const int square = i * i;
    const int cube = square * i;
    const int inner = 3 * cube + square;
    return inner * inner;
}
284 int test_adder(bool hidden = false) {
285     tbb::flow::graph g;
286     tbb::flow::function_node<int,int> s(g, tbb::flow::unlimited, square_body());
287     tbb::flow::function_node<int,int> c(g, tbb::flow::unlimited, cube_body());
288     tbb::flow::function_node<int,int> p(g, tbb::flow::unlimited, passthru_body());
289 
290     adder_node a0(g, hidden);
291     CHECK_MESSAGE( (&tbb::flow::input_port<0>(a0) == &tbb::flow::input_port<0>(a0.j)), "input_port 0 of j not bound to input port 0 in composite_node a0");
292     CHECK_MESSAGE( (&tbb::flow::input_port<1>(a0) == &tbb::flow::input_port<1>(a0.j)), "input_port 1 of j not bound to input port 1 in composite_node a0");
293     CHECK_MESSAGE( (&tbb::flow::output_port<0>(a0) == &a0.f), "f not bound to output port 0 in composite_node a0");
294 
295     adder_node a1(g, hidden);
296     CHECK_MESSAGE( (&std::get<0>(a0.input_ports()) == &tbb::flow::input_port<0>(a0.j)), "input_port 0 of j not bound to input port 0 in composite_node a1");
297     CHECK_MESSAGE( (&std::get<1>(a0.input_ports()) == &tbb::flow::input_port<1>(a0.j)), "input_port1 of j not bound to input port 1 in composite_node a1");
298     CHECK_MESSAGE( (&std::get<0>(a0.output_ports()) == &a0.f), "f not bound to output port 0 in composite_node a1");
299 
300     adder_node a2(g, hidden);
301     CHECK_MESSAGE( (&tbb::flow::input_port<0>(a2) == &tbb::flow::input_port<0>(a2.j)), "input_port 0 of j not bound to input port 0 in composite_node a2");
302     CHECK_MESSAGE( (&tbb::flow::input_port<1>(a2) == &tbb::flow::input_port<1>(a2.j)), "input_port 1 of j not bound to input port 1 in composite_node a2");
303     CHECK_MESSAGE( (&tbb::flow::output_port<0>(a2) == &a2.f), "f not bound to output port 0 in composite_node a2");
304 
305     adder_node a3(g, hidden);
306     CHECK_MESSAGE( (&std::get<0>(a3.input_ports()) == &tbb::flow::input_port<0>(a3.j)), "input_port 0 of j not bound to input port 0 in composite_node a3");
307     CHECK_MESSAGE( (&std::get<1>(a3.input_ports()) == &tbb::flow::input_port<1>(a3.j)), "input_port1 of j not bound to input port 1 in composite_node a3");
308     CHECK_MESSAGE( (&std::get<0>(a3.output_ports()) == &a3.f), "f not bound to output port 0 in composite_node a3");
309 
310     tbb::flow::function_node<int,int> s2(g, tbb::flow::unlimited, square_body());
311     tbb::flow::queue_node<int> q(g);
312 
313     tbb::flow::make_edge( s, tbb::flow::input_port<0>(a0) );
314     tbb::flow::make_edge( c, tbb::flow::input_port<1>(a0) );
315 
316     tbb::flow::make_edge( c, tbb::flow::input_port<0>(a1) );
317     tbb::flow::make_edge( c, tbb::flow::input_port<1>(a1) );
318 
319     tbb::flow::make_edge( tbb::flow::output_port<0>(a0), tbb::flow::input_port<0>(a2) );
320     tbb::flow::make_edge( tbb::flow::output_port<0>(a1), tbb::flow::input_port<1>(a2) );
321 
322     tbb::flow::make_edge( tbb::flow::output_port<0>(a2), s2 );
323     tbb::flow::make_edge( s2, q );
324 
325     int sum_total=0;
326     int result=0;
327     for ( int i = 1; i < 4; ++i ) {
328         s.try_put(i);
329         c.try_put(i);
330         sum_total += adder_sum(i);
331         g.wait_for_all();
332     }
333 
334     int j;
335     for ( int i = 1; i < 4; ++i ) {
336         q.try_get(j);
337         result += j;
338     }
339     g.wait_for_all();
340     CHECK_MESSAGE( (result == sum_total), "the sum from the graph does not match the calculated value");
341 
342     tbb::flow::remove_edge(s2, q);
343     tbb::flow::remove_edge( a2, s2 );
344     tbb::flow::make_edge( a0, a3 );
345     tbb::flow::make_edge( a1, tbb::flow::input_port<1>(a3) );
346     tbb::flow::make_edge( a3, s2 );
347     tbb::flow::make_edge( s2, q );
348 
349     sum_total=0;
350     result=0;
351     for ( int i = 10; i < 20; ++i ) {
352         s.try_put(i);
353         c.try_put(i);
354         sum_total += adder_sum(i);
355         g.wait_for_all();
356     }
357 
358     for ( int i = 10; i < 20; ++i ) {
359         q.try_get(j);
360         result += j;
361     }
362     g.wait_for_all();
363     CHECK_MESSAGE( (result == sum_total), "the new sum after the replacement of the nodes does not match the calculated value");
364 
365     return 0;
366 }
367 
368 /*
369                                               outer composite node (outer_node)
370                                      |-------------------------------------------------------------------|
371                                      |                                                                   |
372                                      |  |------------------|  |------------------|  |------------------| |
373              |---------------------| |--| inner composite  | /| inner composite  | /| inner composite  | | |-------------------|
374              |broadcast node(input)|/|  | node             |/ | node             |/ | node             |-+-| queue node(output)|
375              |---------------------|\|  |(inner_node1)     |\ | (inner_node2)    |\ | (inner_node3)    | | |-------------------|
376                                      |--|                  | \|                  | \|                  | |
377                                      |  |------------------|  |------------------|  |------------------| |
378                                      |                                                                   |
379                                      |-------------------------------------------------------------------|
380 
381 */
382 int test_nested_adder(bool hidden=false) {
383     tbb::flow::graph g;
384     tbb::flow::composite_node<std::tuple<int, int>, std::tuple<int> > outer_node(g);
385     typedef tbb::flow::composite_node<std::tuple<int, int>, std::tuple<int> > base_type;
386     tbb::flow::broadcast_node<int> input(g);
387     tbb::flow::queue_node<int> output(g);
388 
389     adder_node inner_node1(g, hidden);
390     adder_node inner_node2(g, hidden);
391     adder_node inner_node3(g, hidden);
392 
393     outer_node.set_external_ports(base_type::input_ports_type(tbb::flow::input_port<0>(inner_node1), tbb::flow::input_port<1>(inner_node1)), base_type::output_ports_type(tbb::flow::output_port<0>(inner_node3)));
394 
395     CHECK_MESSAGE( (&tbb::flow::input_port<0>(outer_node) == &tbb::flow::input_port<0>(inner_node1)), "input port 0 of inner_node1 not bound to input port 0 in outer_node");
396     CHECK_MESSAGE( (&tbb::flow::input_port<1>(outer_node) == &tbb::flow::input_port<1>(inner_node1)), "input port 1 of inner_node1 not bound to input port 1 in outer_node");
397     CHECK_MESSAGE( (&tbb::flow::output_port<0>(outer_node) == &tbb::flow::output_port<0>(inner_node3)), "output port 0 of inner_node3 not bound to output port 0 in outer_node");
398 
399     tbb::flow::make_edge(input, tbb::flow::input_port<0>(outer_node)/*inner_node1*/);
400     tbb::flow::make_edge(input, tbb::flow::input_port<1>(outer_node)/*inner_node1*/);
401 
402     tbb::flow::make_edge(inner_node1, tbb::flow::input_port<0>(inner_node2));
403     tbb::flow::make_edge(inner_node1, tbb::flow::input_port<1>(inner_node2));
404 
405     tbb::flow::make_edge(inner_node2, tbb::flow::input_port<0>(inner_node3));
406     tbb::flow::make_edge(inner_node2, tbb::flow::input_port<1>(inner_node3));
407 
408     tbb::flow::make_edge(outer_node/*inner_node3*/, output);
409 
410     if(hidden)
411         outer_node.add_nodes(inner_node1, inner_node2, inner_node3);
412     else
413         outer_node.add_visible_nodes(inner_node1, inner_node2, inner_node3);
414 
415     int out;
416     for (int i = 1; i < 200000; ++i) {
417         input.try_put(i);
418         g.wait_for_all();
419         output.try_get(out);
420         CHECK_MESSAGE( (tbb::flow::output_port<0>(outer_node).try_get(out) == output.try_get(out)), "output from outer_node does not match output from graph");
421         CHECK_MESSAGE( (out == 8*i), "output from outer_node not correct");
422     }
423     g.wait_for_all();
424 
425     return 0;
426 }
427 
428 template< typename T >
429 class prefix_node : public tbb::flow::composite_node< std::tuple< T, T, T, T, T >, std::tuple< T, T, T, T, T > > {
430     typedef std::tuple< T, T, T, T, T > my_tuple_t;
431 public:
432     tbb::flow::join_node< my_tuple_t, tbb::flow::queueing > j;
433     tbb::flow::split_node< my_tuple_t > s;
434 private:
435     tbb::flow::function_node< my_tuple_t, my_tuple_t > f;
436     typedef tbb::flow::composite_node< my_tuple_t, my_tuple_t > base_type;
437 
438     struct f_body {
439         my_tuple_t operator()( const my_tuple_t &t ) {
440             return my_tuple_t( std::get<0>(t),
441                                std::get<0>(t) + std::get<1>(t),
442                                std::get<0>(t) + std::get<1>(t) + std::get<2>(t),
443                                std::get<0>(t) + std::get<1>(t) + std::get<2>(t) + std::get<3>(t),
444                                std::get<0>(t) + std::get<1>(t) + std::get<2>(t) + std::get<3>(t) + std::get<4>(t) );
445         }
446     };
447 
448 public:
449     prefix_node(tbb::flow::graph &g, bool hidden = false ) : base_type(g), j(g), s(g), f(g, tbb::flow::serial, f_body() ) {
450         tbb::flow::make_edge( j, f );
451         tbb::flow::make_edge( f, s );
452 
453     typename base_type::input_ports_type input_tuple(tbb::flow::input_port<0>(j), tbb::flow::input_port<1>(j), tbb::flow::input_port<2>(j), tbb::flow::input_port<3>(j), tbb::flow::input_port<4>(j));
454 
455     typename base_type::output_ports_type output_tuple(tbb::flow::output_port<0>(s), tbb::flow::output_port<1>(s), tbb::flow::output_port<2>(s), tbb::flow::output_port<3>(s), tbb::flow::output_port<4>(s));
456 
457     base_type::set_external_ports(input_tuple, output_tuple);
458 
459         if(hidden)
460             base_type::add_nodes(j,s,f);
461         else
462             base_type::add_visible_nodes(j,s,f);
463 
464     }
465 };
466 
467 int test_prefix(bool hidden = false) {
468     tbb::flow::graph g;
469     prefix_node<double> p(g, hidden);
470 
471     CHECK_MESSAGE( (&std::get<0>(p.input_ports()) == &tbb::flow::input_port<0>(p.j)), "input port 0 of j is not bound to input port 0 of composite node p");
472     CHECK_MESSAGE( (&tbb::flow::input_port<1>(p.j) == &tbb::flow::input_port<1>(p.j)), "input port 1 of j is not bound to input port 1 of composite node p");
473     CHECK_MESSAGE( (&std::get<2>(p.input_ports()) == &tbb::flow::input_port<2>(p.j)), "input port 2 of j is not bound to input port 2 of composite node p");
474     CHECK_MESSAGE( (&tbb::flow::input_port<3>(p.j) == &tbb::flow::input_port<3>(p.j)), "input port 3 of j is not bound to input port 3 of composite node p");
475     CHECK_MESSAGE( (&std::get<4>(p.input_ports()) == &tbb::flow::input_port<4>(p.j)), "input port 4 of j is not bound to input port 4 of composite node p");
476 
477 
478     CHECK_MESSAGE( (&std::get<0>(p.output_ports()) == &tbb::flow::output_port<0>(p.s)), "output port 0 of s is not bound to output port 0 of composite node p");
479     CHECK_MESSAGE( (&tbb::flow::output_port<1>(p.s) == &tbb::flow::output_port<1>(p.s)), "output port 1 of s is not bound to output port 1 of composite node p");
480     CHECK_MESSAGE( (&std::get<2>(p.output_ports()) == &tbb::flow::output_port<2>(p.s)), "output port 2 of s is not bound to output port 2 of composite node p");
481     CHECK_MESSAGE( (&tbb::flow::output_port<3>(p.s) == &tbb::flow::output_port<3>(p.s)), "output port 3 of s is not bound to output port 3 of composite node p");
482     CHECK_MESSAGE( (&std::get<4>(p.output_ports()) == &tbb::flow::output_port<4>(p.s)), "output port 4 of s is not bound to output port 4 of composite node p");
483 
484     std::vector< tbb::flow::queue_node<double> > v( 5, tbb::flow::queue_node<double>(g) );
485     tbb::flow::make_edge( tbb::flow::output_port<0>(p), v[0] );
486     tbb::flow::make_edge( tbb::flow::output_port<1>(p), v[1] );
487     tbb::flow::make_edge( tbb::flow::output_port<2>(p), v[2] );
488     tbb::flow::make_edge( tbb::flow::output_port<3>(p), v[3] );
489     tbb::flow::make_edge( tbb::flow::output_port<4>(p), v[4] );
490 
491     for(  double offset = 1; offset < 10000; offset *= 10 ) {
492         tbb::flow::input_port<0>(p).try_put( offset );
493         tbb::flow::input_port<1>(p).try_put( offset + 1 );
494         tbb::flow::input_port<2>(p).try_put( offset + 2 );
495         tbb::flow::input_port<3>(p).try_put( offset + 3 );
496         tbb::flow::input_port<4>(p).try_put( offset + 4 );
497     }
498     g.wait_for_all();
499 
500     double x;
501     while ( v[0].try_get(x) ) {
502         g.wait_for_all();
503         for ( int i = 1; i < 5; ++i ) {
504             v[i].try_get(x);
505             g.wait_for_all();
506         }
507     }
508     return 0;
509 }
510 
//! Maps an item to a sequence number: (i + 3) / 4 - 1 in integer arithmetic,
//! so 1 -> 0, 5 -> 1, 9 -> 2, ...
struct input_only_output_only_seq {
    int operator()(int i) {
        const int one_based_position = (i + 3) / 4;
        return one_based_position - 1;
    }
};
514 
515 void input_only_output_only_composite(bool hidden) {
516     tbb::flow::graph g;
517 
518     tbb::flow::composite_node<std::tuple<int>, std::tuple<int> > input_output(g);
519 
520     typedef tbb::flow::composite_node<std::tuple<int>, std::tuple<> > input_only_composite;
521     typedef tbb::flow::composite_node<std::tuple<>, std::tuple<int> > output_only_composite;
522 
523     typedef tbb::flow::input_node<int> src_type;
524     typedef tbb::flow::queue_node<int> q_type;
525     typedef tbb::flow::function_node<int, int> f_type;
526     typedef tbb::flow::sequencer_node<int> sequencer_type;
527 
528     int num = 0;
529     int finish=1000;
530     int step = 4;
531 
532     input_only_composite a_in(g);
533     output_only_composite a_out(g);
534 
535     src_type src(g, my_input_body(finish, step));
536     q_type que(g);
537     f_type f(g, 1, passthru_body());
538 
539     // Sequencer_node is needed, because serial function_node guarantees only serial body execution,
540     // not a sequential order of messages dispatch
541     sequencer_type seq(g, input_only_output_only_seq());
542 
543     std::tuple<f_type& > input_tuple(f);
544     a_in.set_external_ports(input_tuple);
545     CHECK_MESSAGE( (&std::get<0>(a_in.input_ports()) == &f), "f not bound to input port 0 in composite_node a_in");
546 
547     std::tuple<src_type&> output_tuple(src);
548     a_out.set_external_ports(output_tuple);
549     CHECK_MESSAGE( (&std::get<0>(a_out.output_ports()) == &src), "src not bound to output port 0 in composite_node a_out");
550 
551     if(hidden) {
552         a_in.add_nodes(f, seq, que);
553         a_out.add_nodes(src);
554     } else {
555         a_in.add_visible_nodes(f, seq, que);
556         a_out.add_visible_nodes(src);
557     }
558 
559     tbb::flow::make_edge(a_out, a_in);
560     tbb::flow::make_edge(f, seq);
561     tbb::flow::make_edge(seq, que);
562     src.activate();
563     g.wait_for_all();
564 
565     for(int i = 1; i<finish/step; ++i) {
566         que.try_get(num);
567         CHECK_MESSAGE( (num == 4*i - 3), "number does not match position in sequence");
568     }
569     g.wait_for_all();
570 }
571 
572 //! Test all node types inside composite node
573 //! \brief \ref error_guessing
574 TEST_CASE("Add all nodes"){
575     add_all_nodes();
576 }
577 
578 //! Test single node inside composite nodes
579 //! \brief \ref error_guessing
580 TEST_CASE("Tiny tests"){
581     test_tiny(false);
582     test_tiny(true);
583 }
584 
585 //! Test basic adders in composite node
586 //! \brief \ref error_guessing
587 TEST_CASE("Adder tests"){
588     test_adder(false);
589     test_adder(true);
590 }
591 
592 //! Test nested adders in composite node
593 //! \brief \ref error_guessing
594 TEST_CASE("Nested adder tests"){
595     test_nested_adder(true);
596     test_nested_adder(false);
597 }
598 
599 //! Test returning a subset of inputs
600 //! \brief \ref error_guessing
601 TEST_CASE("Prefix test"){
602     test_prefix(false);
603     test_prefix(true);
604 }
605 
606 //! Test input-only composite node
607 //! \brief \ref error_guessing \ref boundary
608 TEST_CASE("Input-only composite"){
609     input_only_output_only_composite(true);
610     input_only_output_only_composite(false);
611 }
612 
613