xref: /oneTBB/test/tbb/test_input_node.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // have to expose the reset_node method to be able to reset a function_body
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 
23 #include "common/test.h"
24 #include "common/utils.h"
25 #include "common/utils_assert.h"
26 
27 
28 //! \file test_input_node.cpp
29 //! \brief Test for [flow_graph.input_node] specification
30 
31 
32 using tbb::detail::d1::graph_task;
33 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
34 
// Number of distinct values (0 .. N-1) that the test input body emits before it stops;
// every per-value counter array in this file has exactly N slots.
constexpr int N = 1000;
36 
37 template< typename T >
38 class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
39 
40     std::atomic<int> my_counters[N];
41     tbb::flow::graph& my_graph;
42 
43 public:
44 
45     test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
46         for (int i = 0; i < N; ++i )
47             my_counters[i] = 0;
48     }
49 
50     int get_count( int i ) {
51         int v = my_counters[i];
52         return v;
53     }
54 
55     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
56 
57     graph_task* try_put_task( const T &v ) override {
58         int i = (int)v;
59         ++my_counters[i];
60         return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
61     }
62 
63     tbb::flow::graph& graph_reference() const override {
64         return my_graph;
65     }
66 };
67 
68 template< typename T >
69 class my_input_body {
70 
71     unsigned my_count;
72     int *ninvocations;
73 
74 public:
75 
76     my_input_body() : ninvocations(NULL) { my_count = 0; }
77     my_input_body(int &_inv) : ninvocations(&_inv)  { my_count = 0; }
78 
79     T operator()( tbb::flow_control& fc ) {
80         T v = (T)my_count++;
81         if(ninvocations) ++(*ninvocations);
82         if ( (int)v < N ){
83             return v;
84         }else{
85             fc.stop();
86             return T();
87         }
88     }
89 
90 };
91 
92 template< typename T >
93 class function_body {
94 
95     std::atomic<int> *my_counters;
96 
97 public:
98 
99     function_body( std::atomic<int> *counters ) : my_counters(counters) {
100         for (int i = 0; i < N; ++i )
101             my_counters[i] = 0;
102     }
103 
104     bool operator()( T v ) {
105         ++my_counters[(int)v];
106         return true;
107     }
108 
109 };
110 
111 template< typename T >
112 void test_single_dest() {
113     // push only
114     tbb::flow::graph g;
115     tbb::flow::input_node<T> src(g, my_input_body<T>() );
116     test_push_receiver<T> dest(g);
117     tbb::flow::make_edge( src, dest );
118     src.activate();
119     g.wait_for_all();
120     for (int i = 0; i < N; ++i ) {
121         CHECK_MESSAGE( dest.get_count(i) == 1, "" );
122     }
123 
124     // push only
125     std::atomic<int> counters3[N];
126     tbb::flow::input_node<T> src3(g, my_input_body<T>() );
127     src3.activate();
128 
129     function_body<T> b3( counters3 );
130     tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
131     tbb::flow::make_edge( src3, dest3 );
132     g.wait_for_all();
133     for (int i = 0; i < N; ++i ) {
134         int v = counters3[i];
135         CHECK_MESSAGE( v == 1, "" );
136     }
137 
138     // push & pull
139     tbb::flow::input_node<T> src2(g, my_input_body<T>() );
140     src2.activate();
141     std::atomic<int> counters2[N];
142 
143     function_body<T> b2( counters2 );
144     tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
145     tbb::flow::make_edge( src2, dest2 );
146     g.wait_for_all();
147     for (int i = 0; i < N; ++i ) {
148         int v = counters2[i];
149         CHECK_MESSAGE( v == 1, "" );
150     }
151 
152     // test copy constructor
153     tbb::flow::input_node<T> src_copy(src);
154     src_copy.activate();
155     test_push_receiver<T> dest_c(g);
156     CHECK_MESSAGE( src_copy.register_successor(dest_c), "" );
157     g.wait_for_all();
158     for (int i = 0; i < N; ++i ) {
159         CHECK_MESSAGE( dest_c.get_count(i) == 1, "" );
160     }
161 }
162 
163 void test_reset() {
164     //    input_node -> function_node
165     tbb::flow::graph g;
166     std::atomic<int> counters3[N];
167     tbb::flow::input_node<int> src3(g, my_input_body<int>());
168     src3.activate();
169     tbb::flow::input_node<int> src_inactive(g, my_input_body<int>());
170     function_body<int> b3( counters3 );
171     tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3);
172     tbb::flow::make_edge( src3, dest3 );
173     //    source_node already in active state.  Let the graph run,
174     g.wait_for_all();
175     //    check the array for each value.
176     for (int i = 0; i < N; ++i ) {
177         int v = counters3[i];
178         CHECK_MESSAGE( v == 1, "" );
179         counters3[i] = 0;
180     }
181 
182     g.reset(tbb::flow::rf_reset_bodies);  // <-- re-initializes the counts.
183     // and spawns task to run input
184     src3.activate();
185 
186     g.wait_for_all();
187     //    check output queue again.  Should be the same contents.
188     for (int i = 0; i < N; ++i ) {
189         int v = counters3[i];
190         CHECK_MESSAGE( v == 1, "" );
191         counters3[i] = 0;
192     }
193     g.reset();  // doesn't reset the input_node_body to initial state, but does spawn a task
194                 // to run the input_node.
195 
196     g.wait_for_all();
197     // array should be all zero
198     for (int i = 0; i < N; ++i ) {
199         int v = counters3[i];
200         CHECK_MESSAGE( v == 0, "" );
201     }
202 
203     remove_edge(src3, dest3);
204     make_edge(src_inactive, dest3);
205 
206     // src_inactive doesn't run
207     g.wait_for_all();
208     for (int i = 0; i < N; ++i ) {
209         int v = counters3[i];
210         CHECK_MESSAGE( v == 0, "" );
211     }
212 
213     // run graph
214     src_inactive.activate();
215     g.wait_for_all();
216     // check output
217     for (int i = 0; i < N; ++i ) {
218         int v = counters3[i];
219         CHECK_MESSAGE( v == 1, "" );
220         counters3[i] = 0;
221     }
222     g.reset(tbb::flow::rf_reset_bodies);  // <-- reinitializes the counts
223     // src_inactive doesn't run
224     g.wait_for_all();
225     for (int i = 0; i < N; ++i ) {
226         int v = counters3[i];
227         CHECK_MESSAGE( v == 0, "" );
228     }
229 
230     // start it up
231     src_inactive.activate();
232     g.wait_for_all();
233     for (int i = 0; i < N; ++i ) {
234         int v = counters3[i];
235         CHECK_MESSAGE( v == 1, "" );
236         counters3[i] = 0;
237     }
238     g.reset();  // doesn't reset the input_node_body to initial state, and doesn't
239                 // spawn a task to run the input_node.
240 
241     g.wait_for_all();
242     // array should be all zero
243     for (int i = 0; i < N; ++i ) {
244         int v = counters3[i];
245         CHECK_MESSAGE( v == 0, "" );
246     }
247     src_inactive.activate();
248     // input_node_body is already in final state, so input_node will not forward a message.
249     g.wait_for_all();
250     for (int i = 0; i < N; ++i ) {
251         int v = counters3[i];
252         CHECK_MESSAGE( v == 0, "" );
253     }
254 }
255 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Free function used to check deduction from a plain function as the body
int input_body_f(tbb::flow_control&) { return 42; }

//! Compile-time checks that class template argument deduction yields input_node<int>
//! for lambda, mutable-lambda and free-function bodies, and for copy construction.
void test_deduction_guides() {
    using namespace tbb::flow;
    graph g;

    auto const_body = [](tbb::flow_control&) { return 42; };
    auto mutable_body = [](tbb::flow_control&) mutable { return 42; };

    // input_node(graph&, Body)
    input_node from_lambda(g, const_body);
    static_assert(std::is_same<decltype(from_lambda), input_node<int>>::value);

    input_node from_mutable_lambda(g, mutable_body);
    static_assert(std::is_same<decltype(from_mutable_lambda), input_node<int>>::value);

    input_node from_function(g, input_body_f);
    static_assert(std::is_same<decltype(from_function), input_node<int>>::value);

    // copy construction
    input_node from_copy(from_function);
    static_assert(std::is_same<decltype(from_copy), input_node<int>>::value);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    broadcast_node<int> bc(g);

    // input_node(const node_set<Args...>&, Body)
    input_node set_lambda(precedes(bc), const_body);
    static_assert(std::is_same<decltype(set_lambda), input_node<int>>::value);

    input_node set_mutable_lambda(precedes(bc), mutable_body);
    static_assert(std::is_same<decltype(set_mutable_lambda), input_node<int>>::value);

    input_node set_function(precedes(bc), input_body_f);
    static_assert(std::is_same<decltype(set_function), input_node<int>>::value);
#endif
    g.wait_for_all();
}

#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
296 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
//! Check that an input_node constructed with precedes(...) delivers its single message
//! to each of the listed successors exactly once.
void test_follows_and_precedes_api() {
    using namespace tbb::flow;

    graph g;

    std::array<buffer_node<bool>, 3> successors {{
        buffer_node<bool>(g),
        buffer_node<bool>(g),
        buffer_node<bool>(g)
    }};

    // The first invocation of the body yields a value; the second one requests a stop.
    // The flag is flipped on every invocation.
    bool do_try_put = true;
    input_node<bool> src(
        precedes(successors[0], successors[1], successors[2]),
        [&](tbb::flow_control& fc) -> bool {
            const bool emit = do_try_put;
            if ( !emit )
                fc.stop();
            do_try_put = !emit;
            return true;
        }
    );

    src.activate();
    g.wait_for_all();

    bool item;
    for ( auto& successor : successors ) {
        // exactly one item per buffer: the first try_get succeeds, the second must fail
        bool exactly_one = successor.try_get(item);
        if ( exactly_one )
            exactly_one = !successor.try_get(item);
        CHECK_MESSAGE(exactly_one, "Not exact edge quantity was made");
    }
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
331 
332 //! Test push, push-pull behavior and copy constructor
333 //! \brief \ref error_guessing \ref requirement
334 TEST_CASE("Single destination tests"){
335     for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) {
336         tbb::task_arena arena(p);
337         arena.execute(
338             [&]() {
339                 test_single_dest<int>();
340                 test_single_dest<float>();
341             }
342         );
343 	}
344 }
345 
346 //! Test reset variants
347 //! \brief \ref error_guessing
348 TEST_CASE("Reset test"){
349     test_reset();
350 }
351 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test construction of input_node through the preview precedes() node-set API
//! \brief \ref error_guessing
TEST_CASE("Follows and precedes API")
{
    test_follows_and_precedes_api();
}
#endif
359 
#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test class template argument deduction for input_node
//! \brief \ref requirement
TEST_CASE("Deduction guides")
{
    test_deduction_guides();
}
#endif
367 
368 //! Test try_get before activation
369 //! \brief \ref error_guessing
370 TEST_CASE("try_get before activation"){
371     tbb::flow::graph g;
372     tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) -> bool { fc.stop(); return 0;});
373 
374     int tmp = -1;
375     CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed");
376 }
377