xref: /oneTBB/test/tbb/test_input_node.cpp (revision 6caecf96)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 // have to expose the reset_node method to be able to reset a function_body
18 
19 #include "common/config.h"
20 
21 #include "tbb/flow_graph.h"
22 
23 #include "common/test.h"
24 #include "common/utils.h"
25 #include "common/utils_assert.h"
26 #include "common/concepts_common.h"
27 
28 
29 //! \file test_input_node.cpp
30 //! \brief Test for [flow_graph.input_node] specification
31 
32 
33 using tbb::detail::d1::graph_task;
34 using tbb::detail::d1::SUCCESSFULLY_ENQUEUED;
35 
36 const int N = 1000;
37 
38 template< typename T >
39 class test_push_receiver : public tbb::flow::receiver<T>, utils::NoAssign {
40 
41     std::atomic<int> my_counters[N];
42     tbb::flow::graph& my_graph;
43 
44 public:
45 
46     test_push_receiver(tbb::flow::graph& g) : my_graph(g) {
47         for (int i = 0; i < N; ++i )
48             my_counters[i] = 0;
49     }
50 
51     int get_count( int i ) {
52         int v = my_counters[i];
53         return v;
54     }
55 
56     typedef typename tbb::flow::receiver<T>::predecessor_type predecessor_type;
57 
58     graph_task* try_put_task( const T &v ) override {
59         int i = (int)v;
60         ++my_counters[i];
61         return const_cast<graph_task*>(SUCCESSFULLY_ENQUEUED);
62     }
63 
64     tbb::flow::graph& graph_reference() const override {
65         return my_graph;
66     }
67 };
68 
69 template< typename T >
70 class my_input_body {
71 
72     unsigned my_count;
73     int *ninvocations;
74 
75 public:
76 
77     my_input_body() : ninvocations(NULL) { my_count = 0; }
78     my_input_body(int &_inv) : ninvocations(&_inv)  { my_count = 0; }
79 
80     T operator()( tbb::flow_control& fc ) {
81         T v = (T)my_count++;
82         if(ninvocations) ++(*ninvocations);
83         if ( (int)v < N ){
84             return v;
85         }else{
86             fc.stop();
87             return T();
88         }
89     }
90 
91 };
92 
93 template< typename T >
94 class function_body {
95 
96     std::atomic<int> *my_counters;
97 
98 public:
99 
100     function_body( std::atomic<int> *counters ) : my_counters(counters) {
101         for (int i = 0; i < N; ++i )
102             my_counters[i] = 0;
103     }
104 
105     bool operator()( T v ) {
106         ++my_counters[(int)v];
107         return true;
108     }
109 
110 };
111 
112 template< typename T >
113 void test_single_dest() {
114     // push only
115     tbb::flow::graph g;
116     tbb::flow::input_node<T> src(g, my_input_body<T>() );
117     test_push_receiver<T> dest(g);
118     tbb::flow::make_edge( src, dest );
119     src.activate();
120     g.wait_for_all();
121     for (int i = 0; i < N; ++i ) {
122         CHECK_MESSAGE( dest.get_count(i) == 1, "" );
123     }
124 
125     // push only
126     std::atomic<int> counters3[N];
127     tbb::flow::input_node<T> src3(g, my_input_body<T>() );
128     src3.activate();
129 
130     function_body<T> b3( counters3 );
131     tbb::flow::function_node<T,bool> dest3(g, tbb::flow::unlimited, b3 );
132     tbb::flow::make_edge( src3, dest3 );
133     g.wait_for_all();
134     for (int i = 0; i < N; ++i ) {
135         int v = counters3[i];
136         CHECK_MESSAGE( v == 1, "" );
137     }
138 
139     // push & pull
140     tbb::flow::input_node<T> src2(g, my_input_body<T>() );
141     src2.activate();
142     std::atomic<int> counters2[N];
143 
144     function_body<T> b2( counters2 );
145     tbb::flow::function_node<T,bool,tbb::flow::rejecting> dest2(g, tbb::flow::serial, b2 );
146     tbb::flow::make_edge( src2, dest2 );
147     g.wait_for_all();
148     for (int i = 0; i < N; ++i ) {
149         int v = counters2[i];
150         CHECK_MESSAGE( v == 1, "" );
151     }
152 
153     // test copy constructor
154     tbb::flow::input_node<T> src_copy(src);
155     src_copy.activate();
156     test_push_receiver<T> dest_c(g);
157     CHECK_MESSAGE( src_copy.register_successor(dest_c), "" );
158     g.wait_for_all();
159     for (int i = 0; i < N; ++i ) {
160         CHECK_MESSAGE( dest_c.get_count(i) == 1, "" );
161     }
162 }
163 
164 void test_reset() {
165     //    input_node -> function_node
166     tbb::flow::graph g;
167     std::atomic<int> counters3[N];
168     tbb::flow::input_node<int> src3(g, my_input_body<int>());
169     src3.activate();
170     tbb::flow::input_node<int> src_inactive(g, my_input_body<int>());
171     function_body<int> b3( counters3 );
172     tbb::flow::function_node<int,bool> dest3(g, tbb::flow::unlimited, b3);
173     tbb::flow::make_edge( src3, dest3 );
174     //    source_node already in active state.  Let the graph run,
175     g.wait_for_all();
176     //    check the array for each value.
177     for (int i = 0; i < N; ++i ) {
178         int v = counters3[i];
179         CHECK_MESSAGE( v == 1, "" );
180         counters3[i] = 0;
181     }
182 
183     g.reset(tbb::flow::rf_reset_bodies);  // <-- re-initializes the counts.
184     // and spawns task to run input
185     src3.activate();
186 
187     g.wait_for_all();
188     //    check output queue again.  Should be the same contents.
189     for (int i = 0; i < N; ++i ) {
190         int v = counters3[i];
191         CHECK_MESSAGE( v == 1, "" );
192         counters3[i] = 0;
193     }
194     g.reset();  // doesn't reset the input_node_body to initial state, but does spawn a task
195                 // to run the input_node.
196 
197     g.wait_for_all();
198     // array should be all zero
199     for (int i = 0; i < N; ++i ) {
200         int v = counters3[i];
201         CHECK_MESSAGE( v == 0, "" );
202     }
203 
204     remove_edge(src3, dest3);
205     make_edge(src_inactive, dest3);
206 
207     // src_inactive doesn't run
208     g.wait_for_all();
209     for (int i = 0; i < N; ++i ) {
210         int v = counters3[i];
211         CHECK_MESSAGE( v == 0, "" );
212     }
213 
214     // run graph
215     src_inactive.activate();
216     g.wait_for_all();
217     // check output
218     for (int i = 0; i < N; ++i ) {
219         int v = counters3[i];
220         CHECK_MESSAGE( v == 1, "" );
221         counters3[i] = 0;
222     }
223     g.reset(tbb::flow::rf_reset_bodies);  // <-- reinitializes the counts
224     // src_inactive doesn't run
225     g.wait_for_all();
226     for (int i = 0; i < N; ++i ) {
227         int v = counters3[i];
228         CHECK_MESSAGE( v == 0, "" );
229     }
230 
231     // start it up
232     src_inactive.activate();
233     g.wait_for_all();
234     for (int i = 0; i < N; ++i ) {
235         int v = counters3[i];
236         CHECK_MESSAGE( v == 1, "" );
237         counters3[i] = 0;
238     }
239     g.reset();  // doesn't reset the input_node_body to initial state, and doesn't
240                 // spawn a task to run the input_node.
241 
242     g.wait_for_all();
243     // array should be all zero
244     for (int i = 0; i < N; ++i ) {
245         int v = counters3[i];
246         CHECK_MESSAGE( v == 0, "" );
247     }
248     src_inactive.activate();
249     // input_node_body is already in final state, so input_node will not forward a message.
250     g.wait_for_all();
251     for (int i = 0; i < N; ++i ) {
252         int v = counters3[i];
253         CHECK_MESSAGE( v == 0, "" );
254     }
255 }
256 
257 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
258 int input_body_f(tbb::flow_control&) { return 42; }
259 
260 void test_deduction_guides() {
261     using namespace tbb::flow;
262     graph g;
263 
264     auto lambda = [](tbb::flow_control&) { return 42; };
265     auto non_const_lambda = [](tbb::flow_control&) mutable { return 42; };
266 
267     // Tests for input_node(graph&, Body)
268     input_node s1(g, lambda);
269     static_assert(std::is_same_v<decltype(s1), input_node<int>>);
270 
271     input_node s2(g, non_const_lambda);
272     static_assert(std::is_same_v<decltype(s2), input_node<int>>);
273 
274     input_node s3(g, input_body_f);
275     static_assert(std::is_same_v<decltype(s3), input_node<int>>);
276 
277     input_node s4(s3);
278     static_assert(std::is_same_v<decltype(s4), input_node<int>>);
279 
280 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
281     broadcast_node<int> bc(g);
282 
283     // Tests for input_node(const node_set<Args...>&, Body)
284     input_node s5(precedes(bc), lambda);
285     static_assert(std::is_same_v<decltype(s5), input_node<int>>);
286 
287     input_node s6(precedes(bc), non_const_lambda);
288     static_assert(std::is_same_v<decltype(s6), input_node<int>>);
289 
290     input_node s7(precedes(bc), input_body_f);
291     static_assert(std::is_same_v<decltype(s7), input_node<int>>);
292 #endif
293     g.wait_for_all();
294 }
295 
296 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
297 
298 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
299 #include <array>
300 void test_follows_and_precedes_api() {
301     using namespace tbb::flow;
302 
303     graph g;
304 
305     std::array<buffer_node<bool>, 3> successors {{
306                                                   buffer_node<bool>(g),
307                                                   buffer_node<bool>(g),
308                                                   buffer_node<bool>(g)
309         }};
310 
311     bool do_try_put = true;
312     input_node<bool> src(
313         precedes(successors[0], successors[1], successors[2]),
314         [&](tbb::flow_control& fc) -> bool {
315             if(!do_try_put)
316                 fc.stop();
317             do_try_put = !do_try_put;
318             return true;
319         }
320     );
321 
322     src.activate();
323     g.wait_for_all();
324 
325     bool storage;
326     for(auto& successor: successors) {
327         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
328                       "Not exact edge quantity was made");
329     }
330 }
331 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
332 
333 //! Test push, push-pull behavior and copy constructor
334 //! \brief \ref error_guessing \ref requirement
335 TEST_CASE("Single destination tests"){
336     for ( unsigned int p = utils::MinThread; p < utils::MaxThread; ++p ) {
337         tbb::task_arena arena(p);
338         arena.execute(
339             [&]() {
340                 test_single_dest<int>();
341                 test_single_dest<float>();
342             }
343         );
344 	}
345 }
346 
347 //! Test reset variants
348 //! \brief \ref error_guessing
349 TEST_CASE("Reset test"){
350     test_reset();
351 }
352 
353 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
354 //! Test follows and precedes API
355 //! \brief \ref error_guessing
356 TEST_CASE("Follows and precedes API"){
357     test_follows_and_precedes_api();
358 }
359 #endif
360 
361 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
362 //! Test deduction guides
363 //! \brief \ref requirement
364 TEST_CASE("Deduction guides"){
365     test_deduction_guides();
366 }
367 #endif
368 
369 //! Test try_get before activation
370 //! \brief \ref error_guessing
371 TEST_CASE("try_get before activation"){
372     tbb::flow::graph g;
373     tbb::flow::input_node<int> in(g, [&](tbb::flow_control& fc) { fc.stop(); return 0;});
374 
375     int tmp = -1;
376     CHECK_MESSAGE((in.try_get(tmp) == false), "try_get before activation should not succeed");
377 }
378 
379 #if __TBB_CPP20_CONCEPTS_PRESENT
380 //! \brief \ref error_guessing
381 TEST_CASE("constraints for input_node output") {
382     struct Object : test_concepts::Copyable, test_concepts::CopyAssignable {};
383 
384     static_assert(utils::well_formed_instantiation<tbb::flow::input_node, Object>);
385     static_assert(utils::well_formed_instantiation<tbb::flow::input_node, int>);
386     static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyable>);
387     static_assert(!utils::well_formed_instantiation<tbb::flow::input_node, test_concepts::NonCopyAssignable>);
388 }
389 
390 template <typename Output, typename Body>
391 concept can_call_input_node_ctor = requires( tbb::flow::graph& graph, Body body, tbb::flow::buffer_node<int> f ) {
392     tbb::flow::input_node<Output>(graph, body);
393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
394     tbb::flow::input_node<Output>(tbb::flow::precedes(f), body);
395 #endif
396 };
397 
398 //! \brief \ref error_guessing
399 TEST_CASE("constraints for input_node body") {
400     using output_type = int;
401     using namespace test_concepts::input_node_body;
402 
403     static_assert(can_call_input_node_ctor<output_type, Correct<output_type>>);
404     static_assert(!can_call_input_node_ctor<output_type, NonCopyable<output_type>>);
405     static_assert(!can_call_input_node_ctor<output_type, NonDestructible<output_type>>);
406     static_assert(!can_call_input_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
407     static_assert(!can_call_input_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
408     static_assert(!can_call_input_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
409 }
410 #endif // __TBB_CPP20_CONCEPTS_PRESENT
411