1.. _Data_Flow_Graph:
2
3Data Flow Graph
4===============
5
6
7In a data flow graph, nodes are computations that send and receive data
8messages. Some nodes may only send messages, others may only receive
9messages, and others may send messages in response to messages that they
10receive.
11
12
13In the following data flow graph, the left-most node generates the
14integer values from 1 to 10 and passes them to two successor nodes. One
15of the successors squares each value it receives and passes the result
16downstream. The second successor cubes each value it receives and passes
17the result downstream. The right-most node receives values from both of
18the middle nodes. As it receives each value, it adds it to a running sum
19of values. When the application is run to completion, the value of sum
20will be equal to the sum of the sequence of squares and cubes from 1 to
2110.
22
23
24.. container:: fignone
25   :name: simple_data_flow_title
26
27
28   Simple Data Flow Graph
29
30
31   .. container:: imagecenter
32
33
34      |image0|
35
36
37The following code snippet shows an implementation of the **Simple Data
38Flow Graph** shown above:
39
40
41::
42
43
44       int sum = 0;
45       graph g;
46       function_node< int, int > squarer( g, unlimited, [](const int &v) {
47           return v*v;
48       } );
49       function_node< int, int > cuber( g, unlimited, [](const int &v) {
50           return v*v*v;
51       } );
52       function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
53           return sum += v;
54       } );
55       make_edge( squarer, summer );
56       make_edge( cuber, summer );
57
58
59       for ( int i = 1; i <= 10; ++i ) {
60         squarer.try_put(i);
61         cuber.try_put(i);
62       }
63       g.wait_for_all();
64
65
66       cout << "Sum is " << sum << "\n";
67
68
69In the implementation above, the following function_nodes are created:
70
71
72-  one to square values
73-  one to cube values
74-  one to add values to the global sum
75
76
77Since the squarer and cuber nodes are side-effect free, they are created
78with an unlimited concurrency. The summer node updates the sum through a
79reference to a global variable and therefore is not safe to execute in
80parallel. It is therefore created with a concurrency limit of 1. The
81node F from **Simple Data Flow Graph** above is implemented as a loop
82that puts messages to both the squarer and cuber node.
83
84
85A slight improvement over the first implementation is to introduce an
86additional node type, a ``broadcast_node``. A ``broadcast_node`` broadcasts any
87message it receives to all of its successors.
88
89
90This enables replacing the two ``try_put``'s in the loop with a single
91``try_put``:
92
93
94::
95
96
97       broadcast_node<int> b(g);
98       make_edge( b, squarer );
99       make_edge( b, cuber );
100       for ( int i = 1; i <= 10; ++i ) {
101         b.try_put(i);
102       }
103       g.wait_for_all();
104
105
106An even better option, which will make the implementation even more like
107the **Simple Data Flow Graph** above, is to introduce an ``input_node``. An
108``input_node``, as the name implies only sends messages and does not
109receive messages. Its constructor takes two arguments:
110
111
112::
113
114
115   template< typename Body > input_node( graph &g, Body body)
116
117The body is a function object, or lambda expression, that contains a
118function operator:
119
120
121::
122
123
124   Output Body::operator()( oneapi::tbb::flow_control &fc );
125
126
127You can replace the loop in the example with an ``input_node``
128
129
130::
131
132
133       input_node< int > src( g, src_body(10) );
134       make_edge( src, squarer );
135       make_edge( src, cuber );
136       src.activate();
137       g.wait_for_all();
138
139
140The runtime library will repeatedly invoke the function operator in
141``src_body`` until ``fc.stop()`` is invoked inside the body. You therefore
142need to create body that will act like the body of the loop in the **Simple Data Flow Graph**
143above. The final implementation after all of these changes is shown
144below:
145
146
147::
148
149
150       class src_body {
151           const int my_limit;
152           int my_next_value;
153       public:
154           src_body(int l) : my_limit(l), my_next_value(1) {}
155           int operator()( oneapi::tbb::flow_control& fc ) {
156               if ( my_next_value <= my_limit ) {
157                   return my_next_value++;
158               } else {
159                   fc.stop();
160                   return int();
161               }
162           }
163       };
164
165
166       int main() {
167         int sum = 0;
168         graph g;
169         function_node< int, int > squarer( g, unlimited, [](const int &v) {
170             return v*v;
171         } );
172         function_node< int, int > cuber( g, unlimited, [](const int &v) {
173             return v*v*v;
174         } );
175         function_node< int, int > summer( g, 1, [&](const int &v ) -> int {
176             return sum += v;
177         } );
178         make_edge( squarer, summer );
179         make_edge( cuber, summer );
180         input_node< int > src( g, src_body(10) );
181         make_edge( src, squarer );
182         make_edge( src, cuber );
183         src.activate();
184         g.wait_for_all();
185         cout << "Sum is " << sum << "\n";
186       }
187
188
189This final implementation has all of the nodes and edges from the
190**Simple Data Flow Graph** above. In this simple example, there is not
191much advantage in using an ``input_node`` over an explicit loop. But,
192because an ``input_node`` is able to react to the behavior of downstream
193nodes, it can limit memory use in more complex graphs. For more
194information, see:ref:`create_token_based_system` .
195
196
197.. |image0| image:: Images/flow_graph.jpg
198
199