xref: /oneTBB/test/tbb/test_continue_node.cpp (revision 35e0f552)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/graph_utils.h"
24 #include "common/test_follows_and_precedes_api.h"
25 #include "common/concepts_common.h"
26 
27 
28 //! \file test_continue_node.cpp
29 //! \brief Test for [flow_graph.continue_node] specification
30 
31 
// Test-wide constants. These are typed constants rather than object-like macros: a macro named
// N or C would silently rewrite every later token with that spelling, including tokens inside
// headers included further down this file (e.g. <array>/<vector> under the node-set preview).
// The former macro C was never referenced and has been dropped.
constexpr int N = 1000;      // puts per thread in parallel_puts; fake predecessors registered per node
constexpr int MAX_NODES = 4; // largest number of successors attached to the node under test
35 
36 // A class to use as a fake predecessor of continue_node
37 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
38 {
39     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
40     // Define implementations of virtual methods that are abstract in the base class
41     bool register_successor( successor_type& ) override { return false; }
42     bool remove_successor( successor_type& )   override { return false; }
43 };
44 
45 template< typename InputType >
46 struct parallel_puts {
47 
48     tbb::flow::receiver< InputType > * const my_exe_node;
49 
50     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
51     parallel_puts& operator=(const parallel_puts&) = delete;
52 
53     void operator()( int ) const  {
54         for ( int i = 0; i < N; ++i ) {
55             // the nodes will accept all puts
56             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
57         }
58     }
59 
60 };
61 
62 template< typename OutputType >
63 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
64     fake_continue_sender fake_sender;
65     for (size_t i = 0; i < N; ++i) {
66         tbb::detail::d1::register_predecessor(n, fake_sender);
67     }
68 
69     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
70         std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
71         for (size_t i = 0; i < num_receivers; ++i) {
72             receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
73         }
74         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
75 
76         for (size_t r = 0; r < num_receivers; ++r ) {
77             tbb::flow::make_edge( n, *receivers[r] );
78         }
79 
80         utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
81         g.wait_for_all();
82 
83         // 2) the nodes will receive puts from multiple predecessors simultaneously,
84         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
85         CHECK_MESSAGE( (int)ec == p, "" );
86         for (size_t r = 0; r < num_receivers; ++r ) {
87             size_t c = receivers[r]->my_count;
88             // 3) the nodes will send to multiple successors.
89             CHECK_MESSAGE( (int)c == p, "" );
90         }
91 
92         for (size_t r = 0; r < num_receivers; ++r ) {
93             tbb::flow::remove_edge( n, *receivers[r] );
94         }
95     }
96 }
97 
98 template< typename OutputType, typename Body >
99 void continue_nodes( Body body ) {
100     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
101         tbb::flow::graph g;
102         tbb::flow::continue_node< OutputType > exe_node( g, body );
103         run_continue_nodes( p, g, exe_node);
104         exe_node.try_put(tbb::flow::continue_msg());
105         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
106         run_continue_nodes( p, g, exe_node_copy);
107     }
108 }
109 
// Non-zero seed for the execution counters used by continue_nodes_with_copy.
constexpr size_t Offset = 123;
// Incremented by every inc_functor invocation, regardless of which body copy runs.
std::atomic<size_t> global_execute_count{0};
112 
113 template< typename OutputType >
114 struct inc_functor {
115 
116     std::atomic<size_t> local_execute_count;
117     inc_functor( ) { local_execute_count = 0; }
118     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
119     void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
120 
121     OutputType operator()( tbb::flow::continue_msg ) {
122        ++global_execute_count;
123        ++local_execute_count;
124        return OutputType();
125     }
126 
127 };
128 
129 template< typename OutputType >
130 void continue_nodes_with_copy( ) {
131 
132     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
133         tbb::flow::graph g;
134         inc_functor<OutputType> cf;
135         cf.local_execute_count = Offset;
136         global_execute_count = Offset;
137 
138         tbb::flow::continue_node< OutputType > exe_node( g, cf );
139         fake_continue_sender fake_sender;
140         for (size_t i = 0; i < N; ++i) {
141             tbb::detail::d1::register_predecessor(exe_node, fake_sender);
142         }
143 
144         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
145             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
146             for (size_t i = 0; i < num_receivers; ++i) {
147                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
148             }
149 
150             for (size_t r = 0; r < num_receivers; ++r ) {
151                 tbb::flow::make_edge( exe_node, *receivers[r] );
152             }
153 
154             utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
155             g.wait_for_all();
156 
157             // 2) the nodes will receive puts from multiple predecessors simultaneously,
158             for (size_t r = 0; r < num_receivers; ++r ) {
159                 size_t c = receivers[r]->my_count;
160                 // 3) the nodes will send to multiple successors.
161                 CHECK_MESSAGE( (int)c == p, "" );
162             }
163             for (size_t r = 0; r < num_receivers; ++r ) {
164                 tbb::flow::remove_edge( exe_node, *receivers[r] );
165             }
166         }
167 
168         // validate that the local body matches the global execute_count and both are correct
169         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
170         const size_t expected_count = p*MAX_NODES + Offset;
171         size_t global_count = global_execute_count;
172         size_t inc_count = body_copy.local_execute_count;
173         CHECK_MESSAGE( global_count == expected_count, "" );
174         CHECK_MESSAGE( global_count == inc_count, "" );
175         g.reset(tbb::flow::rf_reset_bodies);
176         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
177         inc_count = body_copy.local_execute_count;
178         CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
179 
180     }
181 }
182 
183 template< typename OutputType >
184 void run_continue_nodes() {
185     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
186     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
187     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
188     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
189     continue_nodes_with_copy<OutputType>();
190 }
191 
192 //! Tests limited concurrency cases for nodes that accept data messages
193 void test_concurrency(int num_threads) {
194     tbb::task_arena arena(num_threads);
195     arena.execute(
196         [&] {
197             run_continue_nodes<tbb::flow::continue_msg>();
198             run_continue_nodes<int>();
199             run_continue_nodes<utils::NoAssign>();
200         }
201     );
202 }
203 /*
204  * Connection of two graphs is not currently supported, but works to some limited extent.
205  * This test is included to check for backward compatibility. It checks that a continue_node
206  * with predecessors in two different graphs receives the required
207  * number of continue messages before it executes.
208  */
209 using namespace tbb::flow;
210 
211 struct add_to_counter {
212     int* counter;
213     add_to_counter(int& var):counter(&var){}
214     void operator()(continue_msg){*counter+=1;}
215 };
216 
217 void test_two_graphs(){
218     int count=0;
219 
220     //graph g with broadcast_node and continue_node
221     graph g;
222     broadcast_node<continue_msg> start_g(g);
223     continue_node<continue_msg> first_g(g, add_to_counter(count));
224 
225     //graph h with broadcast_node
226     graph h;
227     broadcast_node<continue_msg> start_h(h);
228 
229     //making two edges to first_g from the two graphs
230     make_edge(start_g,first_g);
231     make_edge(start_h, first_g);
232 
233     //two try_puts from the two graphs
234     start_g.try_put(continue_msg());
235     start_h.try_put(continue_msg());
236     g.wait_for_all();
237     CHECK_MESSAGE( (count==1), "Not all continue messages received");
238 
239     //two try_puts from the graph that doesn't contain the node
240     count=0;
241     start_h.try_put(continue_msg());
242     start_h.try_put(continue_msg());
243     g.wait_for_all();
244     CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
245 
246     //only one try_put
247     count=0;
248     start_g.try_put(continue_msg());
249     g.wait_for_all();
250     CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
251 }
252 
253 struct lightweight_policy_body {
254     const std::thread::id my_thread_id;
255     std::atomic<size_t>& my_count;
256 
257     lightweight_policy_body( std::atomic<size_t>& count )
258         : my_thread_id(std::this_thread::get_id()), my_count(count)
259     {
260         my_count = 0;
261     }
262 
263     lightweight_policy_body( const lightweight_policy_body& ) = default;
264     lightweight_policy_body& operator=( const lightweight_policy_body& ) = delete;
265 
266     void operator()( tbb::flow::continue_msg ) {
267         ++my_count;
268         std::thread::id body_thread_id = std::this_thread::get_id();
269         CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
270     }
271 };
272 
273 void test_lightweight_policy() {
274     tbb::flow::graph g;
275     std::atomic<size_t> count1;
276     std::atomic<size_t> count2;
277     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
278         node1(g, lightweight_policy_body(count1));
279     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
280         node2(g, lightweight_policy_body(count2));
281 
282     tbb::flow::make_edge(node1, node2);
283     const size_t n = 10;
284     for(size_t i = 0; i < n; ++i) {
285         node1.try_put(tbb::flow::continue_msg());
286     }
287     g.wait_for_all();
288 
289     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
290     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
291     CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
292     CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
293 }
294 
295 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
296 #include <array>
297 #include <vector>
298 void test_follows_and_precedes_api() {
299     using msg_t = tbb::flow::continue_msg;
300 
301     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
302     std::vector<msg_t> messages_for_precedes  = { msg_t() };
303 
304     auto pass_through = [](const msg_t& msg) { return msg; };
305 
306     follows_and_precedes_testing::test_follows
307         <msg_t, tbb::flow::continue_node<msg_t>>
308         (messages_for_follows, pass_through, node_priority_t(0));
309 
310     follows_and_precedes_testing::test_precedes
311         <msg_t, tbb::flow::continue_node<msg_t>>
312         (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
313 }
314 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
315 
// TODO: use pass_through from test_function_node instead
//! Identity body: returns a copy of its argument.
template<typename T>
struct passing_body {
    T operator()(const T& val) {
        T result(val);
        return result;
    }
};
323 
324 /*
    The test covers the case where a node with a non-default mutex type is a predecessor of a continue_node.
    There used to be a bug where make_edge(node, continue_node)
    did not update the continue_node's predecessor threshold,
    because the continue_node specialization of the predecessor's successor_cache was not chosen.
329 */
330 void test_successor_cache_specialization() {
331     using namespace tbb::flow;
332 
333     graph g;
334 
335     broadcast_node<continue_msg> node_with_default_mutex_type(g);
336     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
337 
338     continue_node<continue_msg> node(g, passing_body<continue_msg>());
339 
340     make_edge(node_with_default_mutex_type, node);
341     make_edge(node_with_non_default_mutex_type, node);
342 
343     buffer_node<continue_msg> buf(g);
344 
345     make_edge(node, buf);
346 
347     node_with_default_mutex_type.try_put(continue_msg());
348     node_with_non_default_mutex_type.try_put(continue_msg());
349 
350     g.wait_for_all();
351 
352     continue_msg storage;
353     CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
354                   "Wrong number of messages is passed via continue_node");
355 }
356 
357 //! Test concurrent continue_node for correctness
358 //! \brief \ref error_guessing
359 TEST_CASE("Concurrency testing") {
360     for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
361         test_concurrency(p);
362     }
363 }
364 
365 //! Test concurrent continue_node in separate graphs
366 //! \brief \ref error_guessing
367 TEST_CASE("Two graphs") { test_two_graphs(); }
368 
369 //! Test basic behaviour with lightweight body
370 //! \brief \ref requirement \ref error_guessing
371 TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
372 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    test_follows_and_precedes_api();
}
#endif
378 
379 //! Test for successor cache specialization
380 //! \brief \ref regression
381 TEST_CASE( "Regression for successor cache specialization" ) {
382     test_successor_cache_specialization();
383 }
384 
385 #if __TBB_CPP20_CONCEPTS_PRESENT
386 //! \brief \ref error_guessing
387 TEST_CASE("constraints for continue_node input") {
388     static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>);
389     static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>);
390 }
391 
392 template <typename Input, typename Body>
393 concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body,
394                                                 tbb::flow::buffer_node<int>& f, std::size_t num,
395                                                 tbb::flow::node_priority_t priority  ) {
396     tbb::flow::continue_node<Input>(graph, body);
397     tbb::flow::continue_node<Input>(graph, body, priority);
398     tbb::flow::continue_node<Input>(graph, num, body);
399     tbb::flow::continue_node<Input>(graph, num, body, priority);
400 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
401     tbb::flow::continue_node<Input>(tbb::flow::follows(f), body);
402     tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority);
403     tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body);
404     tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority);
405 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
406 };
407 
408 //! \brief \ref error_guessing
409 TEST_CASE("constraints for continue_node body") {
410     using output_type = int;
411     using namespace test_concepts::continue_node_body;
412 
413     static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>);
414     static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>);
415     static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>);
416     static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
417     static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
418     static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
419 }
420 #endif // __TBB_CPP20_CONCEPTS_PRESENT
421