xref: /oneTBB/test/tbb/test_flow_graph.cpp (revision fa3268c3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/graph_utils.h"
28 #include "common/spin_barrier.h"
29 
30 
31 //! \file test_flow_graph.cpp
32 //! \brief Test for [flow_graph.continue_msg flow_graph.graph_node flow_graph.input_port flow_graph.output_port flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
33 
// Number of times test_wait_count() repeats its whole reserve/release sweep.
constexpr int T = 4;
// Size of the done-flag array in test_wait_count(); each sweep tries 0..W-1 waiters.
constexpr int W = 4;
36 
37 struct decrement_wait : utils::NoAssign {
38 
39     tbb::flow::graph * const my_graph;
40     bool * const my_done_flag;
41 
42     decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {}
43 
44     void operator()(int i) const {
45         utils::Sleep(10 * i);
46 
47         my_done_flag[i] = true;
48         my_graph->release_wait();
49     }
50 };
51 
52 static void test_wait_count() {
53     tbb::flow::graph h;
54     for (int i = 0; i < T; ++i ) {
55         bool done_flag[W];
56         for (int j = 0; j < W; ++j ) {
57             for ( int w = 0; w < W; ++w ) done_flag[w] = false;
58             for ( int w = 0; w < j; ++w ) h.reserve_wait();
59 
60             utils::NativeParallelFor( j, decrement_wait(h, done_flag) );
61             h.wait_for_all();
62             for ( int w = 0; w < W; ++w ) {
63                 if ( w < j ) CHECK_MESSAGE( done_flag[w] == true, "" );
64                 else CHECK_MESSAGE( done_flag[w] == false, "" );
65             }
66         }
67     }
68 }
69 
70 // Encapsulate object we want to store in vector (because contained type must have
71 // copy constructor and assignment operator
72 class my_int_buffer {
73     tbb::flow::buffer_node<int> *b;
74     tbb::flow::graph& my_graph;
75 public:
76     my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); }
77     my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) {
78         b = new tbb::flow::buffer_node<int>(my_graph);
79     }
80     ~my_int_buffer() { delete b; }
81     my_int_buffer& operator=(const my_int_buffer& /*other*/) {
82         return *this;
83     }
84 };
85 
86 // test the graph iterator, delete nodes from graph, test again
87 void test_iterator() {
88     tbb::flow::graph g;
89     my_int_buffer a_buffer(g);
90     my_int_buffer b_buffer(g);
91     my_int_buffer c_buffer(g);
92     my_int_buffer *d_buffer = new my_int_buffer(g);
93     my_int_buffer e_buffer(g);
94     std::vector< my_int_buffer > my_buffer_vector(10, c_buffer);
95 
96     int count = 0;
97     for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
98         count++;
99     }
100     CHECK_MESSAGE( (count==15), "error in iterator count");
101 
102     delete d_buffer;
103 
104     count = 0;
105     for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
106         count++;
107     }
108     CHECK_MESSAGE( (count==14), "error in iterator count");
109 
110     my_buffer_vector.clear();
111 
112     count = 0;
113     for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
114         count++;
115     }
116     CHECK_MESSAGE( (count==4), "error in iterator count");
117 }
118 
119 class AddRemoveBody : utils::NoAssign {
120     tbb::flow::graph& g;
121     int nThreads;
122     utils::SpinBarrier &barrier;
123 public:
124     AddRemoveBody(int nthr, utils::SpinBarrier &barrier_, tbb::flow::graph& _g) :
125         g(_g), nThreads(nthr), barrier(barrier_)
126     {}
127     void operator()(const int /*threadID*/) const {
128         my_int_buffer b(g);
129         {
130             std::vector<my_int_buffer> my_buffer_vector(100, b);
131             barrier.wait();  // wait until all nodes are created
132             // now test that the proper number of nodes were created
133             int count = 0;
134             for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
135                 count++;
136             }
137             CHECK_MESSAGE( (count==101*nThreads), "error in iterator count");
138             barrier.wait();  // wait until all threads are done counting
139         } // all nodes but for the initial node on this thread are deleted
140         barrier.wait(); // wait until all threads have deleted all nodes in their vectors
141         // now test that all the nodes were deleted except for the initial node
142         int count = 0;
143         for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
144             count++;
145         }
146         CHECK_MESSAGE( (count==nThreads), "error in iterator count");
147         barrier.wait();  // wait until all threads are done counting
148     } // initial node gets deleted
149 };
150 
151 void test_parallel(int nThreads) {
152     tbb::flow::graph g;
153     utils::SpinBarrier barrier(nThreads);
154     AddRemoveBody body(nThreads, barrier, g);
155     NativeParallelFor(nThreads, body);
156 }
157 
158 /*
159  * Functors for graph arena spawn tests
160  */
161 
162 inline void check_arena(tbb::task_arena* midway_arena) {
163     CHECK_MESSAGE(midway_arena->max_concurrency() == 2, "");
164     CHECK_MESSAGE(tbb::this_task_arena::max_concurrency() == 1, "");
165 }
166 
167 struct run_functor {
168     tbb::task_arena* midway_arena;
169     int return_value;
170     run_functor(tbb::task_arena* a) : midway_arena(a), return_value(1) {}
171     int operator()() {
172         check_arena(midway_arena);
173         return return_value;
174     }
175 };
176 
177 template < typename T >
178 struct function_body {
179     tbb::task_arena* midway_arena;
180     function_body(tbb::task_arena* a) : midway_arena(a) {}
181     tbb::flow::continue_msg operator()(const T& /*arg*/) {
182         check_arena(midway_arena);
183         return tbb::flow::continue_msg();
184     }
185 };
186 
187 typedef tbb::flow::multifunction_node< int, std::tuple< int > > mf_node;
188 
189 struct multifunction_body {
190     tbb::task_arena* midway_arena;
191     multifunction_body(tbb::task_arena* a) : midway_arena(a) {}
192     void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) {
193         check_arena(midway_arena);
194     }
195 };
196 
197 struct input_body {
198     tbb::task_arena* midway_arena;
199     int counter;
200     input_body(tbb::task_arena* a) : midway_arena(a), counter(0) {}
201     int operator()(tbb::flow_control &fc) {
202         check_arena(midway_arena);
203         if (counter++ >= 1) {
204             fc.stop();
205         }
206         return int();
207     }
208 };
209 
210 struct nodes_test_functor : utils::NoAssign {
211     tbb::task_arena* midway_arena;
212     tbb::flow::graph& my_graph;
213 
214     nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : midway_arena(a), my_graph(g) {}
215     void operator()() const {
216 
217         // Define test nodes
218         // Continue, function, source nodes
219         tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(midway_arena));
220         tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(midway_arena));
221         tbb::flow::input_node< int > s_n(my_graph, input_body(midway_arena));
222 
223         // Multifunction node
224         mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(midway_arena));
225 
226         // Join node
227         tbb::flow::function_node< std::tuple< int, int > > join_f_n(
228             my_graph, tbb::flow::unlimited, function_body< std::tuple< int, int > >(midway_arena)
229         );
230         tbb::flow::join_node< std::tuple< int, int > > j_n(my_graph);
231         make_edge(j_n, join_f_n);
232 
233         // Split node
234         tbb::flow::function_node< int > split_f_n1 = f_n;
235         tbb::flow::function_node< int > split_f_n2 = f_n;
236         tbb::flow::split_node< std::tuple< int, int > > sp_n(my_graph);
237         make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1);
238         make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2);
239 
240         // Overwrite node
241         tbb::flow::function_node< int > ow_f_n = f_n;
242         tbb::flow::overwrite_node< int > ow_n(my_graph);
243         make_edge(ow_n, ow_f_n);
244 
245         // Write once node
246         tbb::flow::function_node< int > w_f_n = f_n;
247         tbb::flow::write_once_node< int > w_n(my_graph);
248         make_edge(w_n, w_f_n);
249 
250         // Buffer node
251         tbb::flow::function_node< int > buf_f_n = f_n;
252         tbb::flow::buffer_node< int > buf_n(my_graph);
253         make_edge(w_n, buf_f_n);
254 
255         // Limiter node
256         tbb::flow::function_node< int > l_f_n = f_n;
257         tbb::flow::limiter_node< int > l_n(my_graph, 1);
258         make_edge(l_n, l_f_n);
259 
260         // Execute nodes
261         c_n.try_put( tbb::flow::continue_msg() );
262         f_n.try_put(1);
263         m_n.try_put(1);
264         s_n.activate();
265 
266         tbb::flow::input_port<0>(j_n).try_put(1);
267         tbb::flow::input_port<1>(j_n).try_put(1);
268 
269         std::tuple< int, int > sp_tuple(1, 1);
270         sp_n.try_put(sp_tuple);
271 
272         ow_n.try_put(1);
273         w_n.try_put(1);
274         buf_n.try_put(1);
275         l_n.try_put(1);
276 
277         my_graph.wait_for_all();
278     }
279 };
280 
281 void test_graph_arena() {
282     // There is only one thread for execution (external thread).
283     // So, if graph's tasks get spawned in different arena
284     // external thread won't be able to find them in its own arena.
285     // In this case test should hang.
286     tbb::task_arena arena(1);
287 	arena.execute(
288         [&]() {
289             tbb::flow::graph g;
290             tbb::task_arena midway_arena;
291             midway_arena.initialize(2);
292             midway_arena.execute(nodes_test_functor(&midway_arena, g));
293 
294         }
295 	);
296 }
297 
298 //! Test wait counts
299 //! \brief error_guessing
300 TEST_CASE("Test wait_count"){
301     for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
302         tbb::task_arena arena(p);
303         arena.execute(
304             [&]() {
305                 test_wait_count();
306             }
307         );
308 	}
309 }
310 
311 //! Test graph iterators
312 //! \brief interface
313 TEST_CASE("Test graph::iterator"){
314     for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
315         tbb::task_arena arena(p);
316         arena.execute(
317             [&]() {
318                 test_iterator();
319             }
320         );
321 	}
322 }
323 
324 //! Test parallel for body
325 //! \brief \ref error_guessing
326 TEST_CASE("Test parallel"){
327     for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
328         tbb::task_arena arena(p);
329         arena.execute(
330             [&]() {
331                 test_parallel(p);
332             }
333         );
334 	}
335 }
336 
337 //! Test separate arena isn't used
338 //! \brief \ref error_guessing
339 TEST_CASE("Test graph_arena"){
340     test_graph_arena();
341 }
342 
343 //! Graph iterator
344 //! \brief \ref error_guessing
345 TEST_CASE("graph iterator") {
346     using namespace tbb::flow;
347 
348     graph g;
349 
350     auto past_end = g.end();
351     ++past_end;
352 
353     continue_node<int> n(g, [](const continue_msg &){return 1;});
354 
355     size_t item_count = 0;
356 
357     for(auto it = g.cbegin(); it != g.cend(); it++)
358         ++item_count;
359     CHECK_MESSAGE((item_count == 1), "Should find 1 item");
360 
361     item_count = 0;
362     auto jt(g.begin());
363     for(; jt != g.end(); jt++)
364         ++item_count;
365     CHECK_MESSAGE((item_count == 1), "Should find 1 item");
366 
367     graph g2;
368     continue_node<int> n2(g, [](const continue_msg &){return 1;});
369     CHECK_MESSAGE((g.begin() != g2.begin()), "Different graphs should have different iterators");
370 }
371