.. _Data_Flow_Graph:

Data Flow Graph
===============


In a data flow graph, nodes are computations that send and receive data
messages. Some nodes may only send messages, others may only receive
messages, and others may send messages in response to messages that they
receive.


In the following data flow graph, the left-most node generates the
integer values from 1 to 10 and passes them to two successor nodes. One
of the successors squares each value it receives and passes the result
downstream. The second successor cubes each value it receives and passes
the result downstream. The right-most node receives values from both of
the middle nodes. As it receives each value, it adds it to a running sum.
When the application runs to completion, the value of ``sum`` equals the
sum of the squares and the cubes of the integers from 1 to 10.


.. container:: fignone
   :name: simple_data_flow_title


   Simple Data Flow Graph


   .. container:: imagecenter


      |image0|


The following code snippet shows an implementation of the **Simple Data
Flow Graph** shown above:


::


   int sum = 0;
   graph g;
   // Side-effect free, so it can run with unlimited concurrency.
   function_node< int, int > squarer( g, unlimited, [](const int &v) {
       return v*v;
   } );
   function_node< int, int > cuber( g, unlimited, [](const int &v) {
       return v*v*v;
   } );
   // Updates the shared sum, so its concurrency is limited to 1.
   function_node< int, int > summer( g, 1, [&](const int &v) -> int {
       return sum += v;
   } );
   make_edge( squarer, summer );
   make_edge( cuber, summer );


   // Acts as node F: sends each value to both squarer and cuber.
   for ( int i = 1; i <= 10; ++i ) {
       squarer.try_put(i);
       cuber.try_put(i);
   }
   g.wait_for_all();  // blocks until all messages have been processed


   cout << "Sum is " << sum << "\n";


In the implementation above, the following ``function_node`` objects are
created:


- one to square values
- one to cube values
- one to add values to the global sum


Since the ``squarer`` and ``cuber`` nodes are side-effect free, they are
created with unlimited concurrency.
The ``summer`` node, on the other hand, updates ``sum`` through a
reference to a shared variable, so concurrent invocations would race on
that variable. It is therefore created with a concurrency limit of 1.
The node F from the **Simple Data Flow Graph** above is implemented as a
loop that puts each value to both the ``squarer`` and ``cuber`` nodes.


A slight improvement over the first implementation is to introduce an
additional node type, a ``broadcast_node``. A ``broadcast_node`` broadcasts any
message it receives to all of its successors.


This makes it possible to replace the two ``try_put`` calls in the loop with
a single ``try_put``:


::


   broadcast_node<int> b(g);
   make_edge( b, squarer );
   make_edge( b, cuber );
   for ( int i = 1; i <= 10; ++i ) {
       b.try_put(i);  // b forwards i to both squarer and cuber
   }
   g.wait_for_all();


An even better option, which makes the implementation match the
**Simple Data Flow Graph** above even more closely, is to introduce an
``input_node``. As the name implies, an ``input_node`` only sends messages;
it does not receive them. Its constructor takes two arguments:


::


   template< typename Body > input_node( graph &g, Body body )


The body is a function object, or lambda expression, that provides the
following function call operator, where ``Output`` is the output type of
the ``input_node``:


::


   Output Body::operator()( oneapi::tbb::flow_control &fc );


You can replace the loop in the example with an ``input_node``. Because an
``input_node`` sends each message to all of its successors, the
``broadcast_node`` is no longer needed:


::


   input_node< int > src( g, src_body(10) );
   make_edge( src, squarer );
   make_edge( src, cuber );
   src.activate();
   g.wait_for_all();


An ``input_node`` does not start generating messages until ``activate()`` is
called, so all of its edges can be connected first. The runtime library then
repeatedly invokes the function call operator of ``src_body`` until the body
calls ``fc.stop()``; the value returned from that final call is ignored. You
therefore need to create a body that acts like the loop that implemented
node F in the earlier versions.

The final implementation, after all of these changes, is shown below:


::


   #include <iostream>
   #include <oneapi/tbb/flow_graph.h>

   using namespace oneapi::tbb::flow;

   // Generates the values 1..my_limit, then calls fc.stop().
   class src_body {
       const int my_limit;
       int my_next_value;
   public:
       src_body(int l) : my_limit(l), my_next_value(1) {}
       int operator()( oneapi::tbb::flow_control& fc ) {
           if ( my_next_value <= my_limit ) {
               return my_next_value++;
           } else {
               fc.stop();
               return int();  // ignored, because fc.stop() was called
           }
       }
   };


   int main() {
       int sum = 0;
       graph g;
       function_node< int, int > squarer( g, unlimited, [](const int &v) {
           return v*v;
       } );
       function_node< int, int > cuber( g, unlimited, [](const int &v) {
           return v*v*v;
       } );
       function_node< int, int > summer( g, 1, [&](const int &v) -> int {
           return sum += v;
       } );
       make_edge( squarer, summer );
       make_edge( cuber, summer );
       input_node< int > src( g, src_body(10) );
       make_edge( src, squarer );
       make_edge( src, cuber );
       src.activate();   // start generating messages once all edges exist
       g.wait_for_all();
       std::cout << "Sum is " << sum << "\n";
   }


This final implementation has all of the nodes and edges from the
**Simple Data Flow Graph** above. In this simple example, there is not
much advantage in using an ``input_node`` over an explicit loop. But
because an ``input_node`` can react to the behavior of downstream nodes,
it can limit memory use in more complex graphs. For more information, see
:ref:`create_token_based_system`.


.. |image0| image:: Images/flow_graph.jpg