xref: /oneTBB/test/tbb/test_flow_graph.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
// TODO revamp: move the parts that depend on __TBB_EXTRA_DEBUG into separate test(s); enabling it
// in every test means that the product being tested may differ from what is actually
// released.
26 #define __TBB_EXTRA_DEBUG 1
27 #include "tbb/flow_graph.h"
28 
29 #include "common/test.h"
30 #include "common/utils.h"
31 #include "common/graph_utils.h"
32 #include "common/spin_barrier.h"
33 
34 
35 //! \file test_flow_graph.cpp
36 //! \brief Test for [flow_graph.continue_msg flow_graph.graph_node flow_graph.input_port flow_graph.output_port flow_graph.join_node flow_graph.split_node flow_graph.limiter_node flow_graph.write_once_node flow_graph.overwrite_node flow_graph.make_edge flow_graph.graph flow_graph.buffer_node flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.input_node] specification
37 
// Number of times test_wait_count repeats its whole reserve/release scenario.
constexpr int T = 4;
// Size of the done-flag array in test_wait_count; the number of reserved waits ranges over [0, W).
constexpr int W = 4;
40 
41 struct decrement_wait : utils::NoAssign {
42 
43     tbb::flow::graph * const my_graph;
44     bool * const my_done_flag;
45 
46     decrement_wait( tbb::flow::graph &h, bool *done_flag ) : my_graph(&h), my_done_flag(done_flag) {}
47 
48     void operator()(int i) const {
49         utils::Sleep(10 * i);
50 
51         my_done_flag[i] = true;
52         my_graph->release_wait();
53     }
54 };
55 
56 static void test_wait_count() {
57     tbb::flow::graph h;
58     for (int i = 0; i < T; ++i ) {
59         bool done_flag[W];
60         for (int j = 0; j < W; ++j ) {
61             for ( int w = 0; w < W; ++w ) done_flag[w] = false;
62             for ( int w = 0; w < j; ++w ) h.reserve_wait();
63 
64             utils::NativeParallelFor( j, decrement_wait(h, done_flag) );
65             h.wait_for_all();
66             for ( int w = 0; w < W; ++w ) {
67                 if ( w < j ) CHECK_MESSAGE( done_flag[w] == true, "" );
68                 else CHECK_MESSAGE( done_flag[w] == false, "" );
69             }
70         }
71     }
72 }
73 
74 // Encapsulate object we want to store in vector (because contained type must have
75 // copy constructor and assignment operator
76 class my_int_buffer {
77     tbb::flow::buffer_node<int> *b;
78     tbb::flow::graph& my_graph;
79 public:
80     my_int_buffer(tbb::flow::graph &g) : my_graph(g) { b = new tbb::flow::buffer_node<int>(my_graph); }
81     my_int_buffer(const my_int_buffer& other) : my_graph(other.my_graph) {
82         b = new tbb::flow::buffer_node<int>(my_graph);
83     }
84     ~my_int_buffer() { delete b; }
85     my_int_buffer& operator=(const my_int_buffer& /*other*/) {
86         return *this;
87     }
88 };
89 
90 // test the graph iterator, delete nodes from graph, test again
91 void test_iterator() {
92     tbb::flow::graph g;
93     my_int_buffer a_buffer(g);
94     my_int_buffer b_buffer(g);
95     my_int_buffer c_buffer(g);
96     my_int_buffer *d_buffer = new my_int_buffer(g);
97     my_int_buffer e_buffer(g);
98     std::vector< my_int_buffer > my_buffer_vector(10, c_buffer);
99 
100     int count = 0;
101     for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
102         count++;
103     }
104     CHECK_MESSAGE( (count==15), "error in iterator count");
105 
106     delete d_buffer;
107 
108     count = 0;
109     for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
110         count++;
111     }
112     CHECK_MESSAGE( (count==14), "error in iterator count");
113 
114     my_buffer_vector.clear();
115 
116     count = 0;
117     for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
118         count++;
119     }
120     CHECK_MESSAGE( (count==4), "error in iterator count");
121 }
122 
123 class AddRemoveBody : utils::NoAssign {
124     tbb::flow::graph& g;
125     int nThreads;
126     utils::SpinBarrier &barrier;
127 public:
128     AddRemoveBody(int nthr, utils::SpinBarrier &barrier_, tbb::flow::graph& _g) :
129         g(_g), nThreads(nthr), barrier(barrier_)
130     {}
131     void operator()(const int /*threadID*/) const {
132         my_int_buffer b(g);
133         {
134             std::vector<my_int_buffer> my_buffer_vector(100, b);
135             barrier.wait();  // wait until all nodes are created
136             // now test that the proper number of nodes were created
137             int count = 0;
138             for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
139                 count++;
140             }
141             CHECK_MESSAGE( (count==101*nThreads), "error in iterator count");
142             barrier.wait();  // wait until all threads are done counting
143         } // all nodes but for the initial node on this thread are deleted
144         barrier.wait(); // wait until all threads have deleted all nodes in their vectors
145         // now test that all the nodes were deleted except for the initial node
146         int count = 0;
147         for (tbb::flow::graph::iterator it = g.begin(); it != g.end(); ++it) {
148             count++;
149         }
150         CHECK_MESSAGE( (count==nThreads), "error in iterator count");
151         barrier.wait();  // wait until all threads are done counting
152     } // initial node gets deleted
153 };
154 
155 void test_parallel(int nThreads) {
156     tbb::flow::graph g;
157     utils::SpinBarrier barrier(nThreads);
158     AddRemoveBody body(nThreads, barrier, g);
159     NativeParallelFor(nThreads, body);
160 }
161 
162 /*
163  * Functors for graph arena spawn tests
164  */
165 
166 inline void check_arena(tbb::task_arena* midway_arena) {
167     CHECK_MESSAGE(midway_arena->max_concurrency() == 2, "");
168     CHECK_MESSAGE(tbb::this_task_arena::max_concurrency() == 1, "");
169 }
170 
171 struct run_functor {
172     tbb::task_arena* midway_arena;
173     int return_value;
174     run_functor(tbb::task_arena* a) : midway_arena(a), return_value(1) {}
175     int operator()() {
176         check_arena(midway_arena);
177         return return_value;
178     }
179 };
180 
181 template < typename T >
182 struct function_body {
183     tbb::task_arena* midway_arena;
184     function_body(tbb::task_arena* a) : midway_arena(a) {}
185     tbb::flow::continue_msg operator()(const T& /*arg*/) {
186         check_arena(midway_arena);
187         return tbb::flow::continue_msg();
188     }
189 };
190 
191 typedef tbb::flow::multifunction_node< int, std::tuple< int > > mf_node;
192 
193 struct multifunction_body {
194     tbb::task_arena* midway_arena;
195     multifunction_body(tbb::task_arena* a) : midway_arena(a) {}
196     void operator()(const int& /*arg*/, mf_node::output_ports_type& /*outports*/) {
197         check_arena(midway_arena);
198     }
199 };
200 
201 struct input_body {
202     tbb::task_arena* midway_arena;
203     int counter;
204     input_body(tbb::task_arena* a) : midway_arena(a), counter(0) {}
205     int operator()(tbb::flow_control &fc) {
206         check_arena(midway_arena);
207         if (counter++ >= 1) {
208             fc.stop();
209         }
210         return int();
211     }
212 };
213 
214 struct nodes_test_functor : utils::NoAssign {
215     tbb::task_arena* midway_arena;
216     tbb::flow::graph& my_graph;
217 
218     nodes_test_functor(tbb::task_arena* a, tbb::flow::graph& g) : midway_arena(a), my_graph(g) {}
219     void operator()() const {
220 
221         // Define test nodes
222         // Continue, function, source nodes
223         tbb::flow::continue_node< tbb::flow::continue_msg > c_n(my_graph, function_body<tbb::flow::continue_msg>(midway_arena));
224         tbb::flow::function_node< int > f_n(my_graph, tbb::flow::unlimited, function_body<int>(midway_arena));
225         tbb::flow::input_node< int > s_n(my_graph, input_body(midway_arena));
226 
227         // Multifunction node
228         mf_node m_n(my_graph, tbb::flow::unlimited, multifunction_body(midway_arena));
229 
230         // Join node
231         tbb::flow::function_node< std::tuple< int, int > > join_f_n(
232             my_graph, tbb::flow::unlimited, function_body< std::tuple< int, int > >(midway_arena)
233         );
234         tbb::flow::join_node< std::tuple< int, int > > j_n(my_graph);
235         make_edge(j_n, join_f_n);
236 
237         // Split node
238         tbb::flow::function_node< int > split_f_n1 = f_n;
239         tbb::flow::function_node< int > split_f_n2 = f_n;
240         tbb::flow::split_node< std::tuple< int, int > > sp_n(my_graph);
241         make_edge(tbb::flow::output_port<0>(sp_n), split_f_n1);
242         make_edge(tbb::flow::output_port<1>(sp_n), split_f_n2);
243 
244         // Overwrite node
245         tbb::flow::function_node< int > ow_f_n = f_n;
246         tbb::flow::overwrite_node< int > ow_n(my_graph);
247         make_edge(ow_n, ow_f_n);
248 
249         // Write once node
250         tbb::flow::function_node< int > w_f_n = f_n;
251         tbb::flow::write_once_node< int > w_n(my_graph);
252         make_edge(w_n, w_f_n);
253 
254         // Buffer node
255         tbb::flow::function_node< int > buf_f_n = f_n;
256         tbb::flow::buffer_node< int > buf_n(my_graph);
257         make_edge(w_n, buf_f_n);
258 
259         // Limiter node
260         tbb::flow::function_node< int > l_f_n = f_n;
261         tbb::flow::limiter_node< int > l_n(my_graph, 1);
262         make_edge(l_n, l_f_n);
263 
264         // Execute nodes
265         c_n.try_put( tbb::flow::continue_msg() );
266         f_n.try_put(1);
267         m_n.try_put(1);
268         s_n.activate();
269 
270         tbb::flow::input_port<0>(j_n).try_put(1);
271         tbb::flow::input_port<1>(j_n).try_put(1);
272 
273         std::tuple< int, int > sp_tuple(1, 1);
274         sp_n.try_put(sp_tuple);
275 
276         ow_n.try_put(1);
277         w_n.try_put(1);
278         buf_n.try_put(1);
279         l_n.try_put(1);
280 
281         my_graph.wait_for_all();
282     }
283 };
284 
285 void test_graph_arena() {
286     // There is only one thread for execution (external thread).
287     // So, if graph's tasks get spawned in different arena
288     // external thread won't be able to find them in its own arena.
289     // In this case test should hang.
290     tbb::task_arena arena(1);
291 	arena.execute(
292         [&]() {
293             tbb::flow::graph g;
294             tbb::task_arena midway_arena;
295             midway_arena.initialize(2);
296             midway_arena.execute(nodes_test_functor(&midway_arena, g));
297 
298         }
299 	);
300 }
301 
302 //! Test wait counts
303 //! \brief error_guessing
304 TEST_CASE("Test wait_count"){
305     for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
306         tbb::task_arena arena(p);
307         arena.execute(
308             [&]() {
309                 test_wait_count();
310             }
311         );
312 	}
313 }
314 
315 //! Test graph iterators
316 //! \brief interface
317 TEST_CASE("Test graph::iterator"){
318     for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
319         tbb::task_arena arena(p);
320         arena.execute(
321             [&]() {
322                 test_iterator();
323             }
324         );
325 	}
326 }
327 
328 //! Test parallel for body
329 //! \brief \ref error_guessing
330 TEST_CASE("Test parallel"){
331     for(unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
332         tbb::task_arena arena(p);
333         arena.execute(
334             [&]() {
335                 test_parallel(p);
336             }
337         );
338 	}
339 }
340 
341 //! Test separate arena isn't used
342 //! \brief \ref error_guessing
343 TEST_CASE("Test graph_arena"){
344     test_graph_arena();
345 }
346 
347 //! Graph iterator
348 //! \brief \ref error_guessing
349 TEST_CASE("graph iterator") {
350     using namespace tbb::flow;
351 
352     graph g;
353 
354     auto past_end = g.end();
355     ++past_end;
356 
357     continue_node<int> n(g, [](const continue_msg &){return 1;});
358 
359     size_t item_count = 0;
360 
361     for(auto it = g.cbegin(); it != g.cend(); it++)
362         ++item_count;
363     CHECK_MESSAGE((item_count == 1), "Should find 1 item");
364 
365     item_count = 0;
366     auto jt(g.begin());
367     for(; jt != g.end(); jt++)
368         ++item_count;
369     CHECK_MESSAGE((item_count == 1), "Should find 1 item");
370 
371     graph g2;
372     continue_node<int> n2(g, [](const continue_msg &){return 1;});
373     CHECK_MESSAGE((g.begin() != g2.begin()), "Different graphs should have different iterators");
374 }
375