xref: /oneTBB/test/tbb/test_input_node.cpp (revision 963dfbf7)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // have to expose the reset_node method to be able to reset a function_body
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 
23 #include "common/test.h"
24 #include "common/utils.h"
25 #include "common/utils_assert.h"
26 #include "common/concepts_common.h"
27 
28 
29 //! \file test_input_node.cpp
30 //! \brief Test for [flow_graph.input_node] specification
31 
32 
33 using tbb::detail::d1::graph_task;
34 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
35 
// Number of distinct values each input body emits; also the size of every counter array in this file.
constexpr int N = 1000;
37 
38 template< typename T >
39 class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
40 
41     std::atomic<int> my_counters[N];
42     tbb::flow::graph& my_graph;
43 
44 public:
45 
46     test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
47         for (int i = 0; i < N; ++i )
48             my_counters[i] = 0;
49     }
50 
51     int get_count( int i ) {
52         int v = my_counters[i];
53         return v;
54     }
55 
56     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
57 
58     graph_task* try_put_task( const T &v ) override {
59         int i = (int)v;
60         ++my_counters[i];
61         return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
62     }
63 
64     tbb::flow::graph& graph_reference() const override {
65         return my_graph;
66     }
67 };
68 
69 template< typename T >
70 class my_input_body {
71 
72     unsigned my_count;
73     int *ninvocations;
74 
75 public:
76 
77     my_input_body() : ninvocations(NULL) { my_count = 0; }
78     my_input_body(int &_inv) : ninvocations(&_inv)  { my_count = 0; }
79 
80     T operator()( tbb::flow_control& fc ) {
81         T v = (T)my_count++;
82         if(ninvocations) ++(*ninvocations);
83         if ( (int)v < N ){
84             return v;
85         }else{
86             fc.stop();
87             return T();
88         }
89     }
90 
91 };
92 
93 template< typename T >
94 class function_body {
95 
96     std::atomic<int> *my_counters;
97 
98 public:
99 
100     function_body( std::atomic<int> *counters ) : my_counters(counters) {
101         for (int i = 0; i < N; ++i )
102             my_counters[i] = 0;
103     }
104 
105     bool operator()( T v ) {
106         ++my_counters[(int)v];
107         return true;
108     }
109 
110 };
111 
112 template< typename T >
113 void test_single_dest() {
114     // push only
115     tbb::flow::graph g;
116     tbb::flow::input_node<T> src(g, my_input_body<T>() );
117     test_push_receiver<T> dest(g);
118     tbb::flow::make_edge( src, dest );
119     src.activate();
120     g.wait_for_all();
121     for (int i = 0; i < N; ++i ) {
122         CHECK_MESSAGE( dest.get_count(i) == 1, "" );
123     }
124 
125     // push only
126     std::atomic<int> counters3[N];
127     tbb::flow::input_node<T> src3(g, my_input_body<T>() );
128     src3.activate();
129 
130     function_body<T> b3( counters3 );
131     tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
132     tbb::flow::make_edge( src3, dest3 );
133     g.wait_for_all();
134     for (int i = 0; i < N; ++i ) {
135         int v = counters3[i];
136         CHECK_MESSAGE( v == 1, "" );
137     }
138 
139     // push & pull
140     tbb::flow::input_node<T> src2(g, my_input_body<T>() );
141     src2.activate();
142     std::atomic<int> counters2[N];
143 
144     function_body<T> b2( counters2 );
145     tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
146     tbb::flow::make_edge( src2, dest2 );
147     g.wait_for_all();
148     for (int i = 0; i < N; ++i ) {
149         int v = counters2[i];
150         CHECK_MESSAGE( v == 1, "" );
151     }
152 
153     // test copy constructor
154     tbb::flow::input_node<T> src_copy(src);
155     src_copy.activate();
156     test_push_receiver<T> dest_c(g);
157     CHECK_MESSAGE( src_copy.register_successor(dest_c), "" );
158     g.wait_for_all();
159     for (int i = 0; i < N; ++i ) {
160         CHECK_MESSAGE( dest_c.get_count(i) == 1, "" );
161     }
162 }
163 
164 void test_reset() {
165     //    input_node -> function_node
166     tbb::flow::graph g;
167     std::atomic<int> counters3[N];
168     tbb::flow::input_node<int> src3(g, my_input_body<int>());
169     src3.activate();
170     tbb::flow::input_node<int> src_inactive(g, my_input_body<int>());
171     function_body<int> b3( counters3 );
172     tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3);
173     tbb::flow::make_edge( src3, dest3 );
174     //    source_node already in active state.  Let the graph run,
175     g.wait_for_all();
176     //    check the array for each value.
177     for (int i = 0; i < N; ++i ) {
178         int v = counters3[i];
179         CHECK_MESSAGE( v == 1, "" );
180         counters3[i] = 0;
181     }
182 
183     g.reset(tbb::flow::rf_reset_bodies);  // <-- re-initializes the counts.
184     // and spawns task to run input
185     src3.activate();
186 
187     g.wait_for_all();
188     //    check output queue again.  Should be the same contents.
189     for (int i = 0; i < N; ++i ) {
190         int v = counters3[i];
191         CHECK_MESSAGE( v == 1, "" );
192         counters3[i] = 0;
193     }
194     g.reset();  // doesn't reset the input_node_body to initial state, but does spawn a task
195                 // to run the input_node.
196 
197     g.wait_for_all();
198     // array should be all zero
199     for (int i = 0; i < N; ++i ) {
200         int v = counters3[i];
201         CHECK_MESSAGE( v == 0, "" );
202     }
203 
204     remove_edge(src3, dest3);
205     make_edge(src_inactive, dest3);
206 
207     // src_inactive doesn't run
208     g.wait_for_all();
209     for (int i = 0; i < N; ++i ) {
210         int v = counters3[i];
211         CHECK_MESSAGE( v == 0, "" );
212     }
213 
214     // run graph
215     src_inactive.activate();
216     g.wait_for_all();
217     // check output
218     for (int i = 0; i < N; ++i ) {
219         int v = counters3[i];
220         CHECK_MESSAGE( v == 1, "" );
221         counters3[i] = 0;
222     }
223     g.reset(tbb::flow::rf_reset_bodies);  // <-- reinitializes the counts
224     // src_inactive doesn't run
225     g.wait_for_all();
226     for (int i = 0; i < N; ++i ) {
227         int v = counters3[i];
228         CHECK_MESSAGE( v == 0, "" );
229     }
230 
231     // start it up
232     src_inactive.activate();
233     g.wait_for_all();
234     for (int i = 0; i < N; ++i ) {
235         int v = counters3[i];
236         CHECK_MESSAGE( v == 1, "" );
237         counters3[i] = 0;
238     }
239     g.reset();  // doesn't reset the input_node_body to initial state, and doesn't
240                 // spawn a task to run the input_node.
241 
242     g.wait_for_all();
243     // array should be all zero
244     for (int i = 0; i < N; ++i ) {
245         int v = counters3[i];
246         CHECK_MESSAGE( v == 0, "" );
247     }
248     src_inactive.activate();
249     // input_node_body is already in final state, so input_node will not forward a message.
250     g.wait_for_all();
251     for (int i = 0; i < N; ++i ) {
252         int v = counters3[i];
253         CHECK_MESSAGE( v == 0, "" );
254     }
255 }
256 
257 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
258 #include <array>
259 void test_follows_and_precedes_api() {
260     using namespace tbb::flow;
261 
262     graph g;
263 
264     std::array<buffer_node<bool>, 3> successors {{
265                                                   buffer_node<bool>(g),
266                                                   buffer_node<bool>(g),
267                                                   buffer_node<bool>(g)
268         }};
269 
270     bool do_try_put = true;
271     input_node<bool> src(
272         precedes(successors[0], successors[1], successors[2]),
273         [&](tbb::flow_control& fc) -> bool {
274             if(!do_try_put)
275                 fc.stop();
276             do_try_put = !do_try_put;
277             return true;
278         }
279     );
280 
281     src.activate();
282     g.wait_for_all();
283 
284     bool storage;
285     for(auto& successor: successors) {
286         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
287                       "Not exact edge quantity was made");
288     }
289 }
290 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
291 
292 //! Test push, push-pull behavior and copy constructor
293 //! \brief \ref error_guessing \ref requirement
294 TEST_CASE("Single destination tests"){
295     for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) {
296         tbb::task_arena arena(p);
297         arena.execute(
298             [&]() {
299                 test_single_dest<int>();
300                 test_single_dest<float>();
301             }
302         );
303 	}
304 }
305 
306 //! Test reset variants
307 //! \brief \ref error_guessing
308 TEST_CASE("Reset test"){
309     test_reset();
310 }
311 
312 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
313 //! Test follows and precedes API
314 //! \brief \ref error_guessing
315 TEST_CASE("Follows and precedes API"){
316     test_follows_and_precedes_api();
317 }
318 #endif
319 
320 //! Test try_get before activation
321 //! \brief \ref error_guessing
322 TEST_CASE("try_get before activation"){
323     tbb::flow::graph g;
324     tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
325 
326     int tmp = -1;
327     CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed");
328 }
329 
330 #if __TBB_CPP20_CONCEPTS_PRESENT
331 //! \brief \ref error_guessing
332 TEST_CASE("constraints for input_node output") {
333     struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};
334 
335     static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>);
336     static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>);
337     static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>);
338     static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>);
339 }
340 
341 template <typename Output, typename Body>
342 concept can_call_input_node_ctor = requires( tbb::flow::graph& graph, Body body, tbb::flow::buffer_node<int> f ) {
343     tbb::flow::input_node<Output>(graph, body);
344 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
345     tbb::flow::input_node<Output>(tbb::flow::precedes(f), body);
346 #endif
347 };
348 
349 //! \brief \ref error_guessing
350 TEST_CASE("constraints for input_node body") {
351     using output_type = int;
352     using namespace test_concepts::input_node_body;
353 
354     static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>);
355     static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>);
356     static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>);
357     static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
358     static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
359     static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
360 }
361 #endif // __TBB_CPP20_CONCEPTS_PRESENT
362