xref: /oneTBB/test/tbb/test_continue_node.cpp (revision 5d8ddb3f)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 #include "tbb/flow_graph.h"
20 
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/graph_utils.h"
24 #include "common/test_follows_and_precedes_api.h"
25 #include "common/concepts_common.h"
26 
27 
28 //! \file test_continue_node.cpp
29 //! \brief Test for [flow_graph.continue_node] specification
30 
31 
// Number of continue_msg puts issued by each thread in parallel_puts, and also the
// number of fake predecessors registered with each tested node; together this makes
// the p*N puts coming from p threads trigger p executions of the node body.
#define N 1000
// Upper bound on the number of receivers attached to a node in the fan-out loops.
#define MAX_NODES 4
// NOTE(review): C is not referenced anywhere in this file; confirm before removing.
#define C 8
35 
36 // A class to use as a fake predecessor of continue_node
37 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
38 {
39     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
40     // Define implementations of virtual methods that are abstract in the base class
41     bool register_successor( successor_type& ) override { return false; }
42     bool remove_successor( successor_type& )   override { return false; }
43 };
44 
45 template< typename InputType >
46 struct parallel_puts {
47 
48     tbb::flow::receiver< InputType > * const my_exe_node;
49 
50     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
51     parallel_puts& operator=(const parallel_puts&) = delete;
52 
53     void operator()( int ) const  {
54         for ( int i = 0; i < N; ++i ) {
55             // the nodes will accept all puts
56             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
57         }
58     }
59 
60 };
61 
62 template< typename OutputType >
63 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
64     fake_continue_sender fake_sender;
65     for (size_t i = 0; i < N; ++i) {
66         tbb::detail::d1::register_predecessor(n, fake_sender);
67     }
68 
69     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
70         std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
71         for (size_t i = 0; i < num_receivers; ++i) {
72             receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
73         }
74         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
75 
76         for (size_t r = 0; r < num_receivers; ++r ) {
77             tbb::flow::make_edge( n, *receivers[r] );
78         }
79 
80         utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
81         g.wait_for_all();
82 
83         // 2) the nodes will receive puts from multiple predecessors simultaneously,
84         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
85         CHECK_MESSAGE( (int)ec == p, "" );
86         for (size_t r = 0; r < num_receivers; ++r ) {
87             size_t c = receivers[r]->my_count;
88             // 3) the nodes will send to multiple successors.
89             CHECK_MESSAGE( (int)c == p, "" );
90         }
91 
92         for (size_t r = 0; r < num_receivers; ++r ) {
93             tbb::flow::remove_edge( n, *receivers[r] );
94         }
95     }
96 }
97 
98 template< typename OutputType, typename Body >
99 void continue_nodes( Body body ) {
100     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
101         tbb::flow::graph g;
102         tbb::flow::continue_node< OutputType > exe_node( g, body );
103         run_continue_nodes( p, g, exe_node);
104         exe_node.try_put(tbb::flow::continue_msg());
105         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
106         run_continue_nodes( p, g, exe_node_copy);
107     }
108 }
109 
// Starting value for both the global and the per-body execution counters in
// continue_nodes_with_copy().
constexpr size_t Offset = 123;
// Incremented on every inc_functor call, shared by all copies of the body.
std::atomic<size_t> global_execute_count{0};
112 
113 template< typename OutputType >
114 struct inc_functor {
115 
116     std::atomic<size_t> local_execute_count;
117     inc_functor( ) { local_execute_count = 0; }
118     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
119     void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
120 
121     OutputType operator()( tbb::flow::continue_msg ) {
122        ++global_execute_count;
123        ++local_execute_count;
124        return OutputType();
125     }
126 
127 };
128 
129 template< typename OutputType >
130 void continue_nodes_with_copy( ) {
131 
132     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
133         tbb::flow::graph g;
134         inc_functor<OutputType> cf;
135         cf.local_execute_count = Offset;
136         global_execute_count = Offset;
137 
138         tbb::flow::continue_node< OutputType > exe_node( g, cf );
139         fake_continue_sender fake_sender;
140         for (size_t i = 0; i < N; ++i) {
141             tbb::detail::d1::register_predecessor(exe_node, fake_sender);
142         }
143 
144         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
145             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
146             for (size_t i = 0; i < num_receivers; ++i) {
147                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
148             }
149 
150             for (size_t r = 0; r < num_receivers; ++r ) {
151                 tbb::flow::make_edge( exe_node, *receivers[r] );
152             }
153 
154             utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
155             g.wait_for_all();
156 
157             // 2) the nodes will receive puts from multiple predecessors simultaneously,
158             for (size_t r = 0; r < num_receivers; ++r ) {
159                 size_t c = receivers[r]->my_count;
160                 // 3) the nodes will send to multiple successors.
161                 CHECK_MESSAGE( (int)c == p, "" );
162             }
163             for (size_t r = 0; r < num_receivers; ++r ) {
164                 tbb::flow::remove_edge( exe_node, *receivers[r] );
165             }
166         }
167 
168         // validate that the local body matches the global execute_count and both are correct
169         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
170         const size_t expected_count = p*MAX_NODES + Offset;
171         size_t global_count = global_execute_count;
172         size_t inc_count = body_copy.local_execute_count;
173         CHECK_MESSAGE( global_count == expected_count, "" );
174         CHECK_MESSAGE( global_count == inc_count, "" );
175         g.reset(tbb::flow::rf_reset_bodies);
176         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
177         inc_count = body_copy.local_execute_count;
178         CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
179 
180     }
181 }
182 
183 template< typename OutputType >
184 void run_continue_nodes() {
185     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
186     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
187     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
188     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
189     continue_nodes_with_copy<OutputType>();
190 }
191 
192 //! Tests limited concurrency cases for nodes that accept data messages
193 void test_concurrency(int num_threads) {
194     tbb::task_arena arena(num_threads);
195     arena.execute(
196         [&] {
197             run_continue_nodes<tbb::flow::continue_msg>();
198             run_continue_nodes<int>();
199             run_continue_nodes<utils::NoAssign>();
200         }
201     );
202 }
203 /*
204  * Connection of two graphs is not currently supported, but works to some limited extent.
205  * This test is included to check for backward compatibility. It checks that a continue_node
206  * with predecessors in two different graphs receives the required
207  * number of continue messages before it executes.
208  */
209 using namespace tbb::flow;
210 
211 struct add_to_counter {
212     int* counter;
213     add_to_counter(int& var):counter(&var){}
214     void operator()(continue_msg){*counter+=1;}
215 };
216 
217 void test_two_graphs(){
218     int count=0;
219 
220     //graph g with broadcast_node and continue_node
221     graph g;
222     broadcast_node<continue_msg> start_g(g);
223     continue_node<continue_msg> first_g(g, add_to_counter(count));
224 
225     //graph h with broadcast_node
226     graph h;
227     broadcast_node<continue_msg> start_h(h);
228 
229     //making two edges to first_g from the two graphs
230     make_edge(start_g,first_g);
231     make_edge(start_h, first_g);
232 
233     //two try_puts from the two graphs
234     start_g.try_put(continue_msg());
235     start_h.try_put(continue_msg());
236     g.wait_for_all();
237     CHECK_MESSAGE( (count==1), "Not all continue messages received");
238 
239     //two try_puts from the graph that doesn't contain the node
240     count=0;
241     start_h.try_put(continue_msg());
242     start_h.try_put(continue_msg());
243     g.wait_for_all();
244     CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
245 
246     //only one try_put
247     count=0;
248     start_g.try_put(continue_msg());
249     g.wait_for_all();
250     CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
251 }
252 
253 struct lightweight_policy_body {
254     const std::thread::id my_thread_id;
255     std::atomic<size_t>& my_count;
256 
257     lightweight_policy_body( std::atomic<size_t>& count )
258         : my_thread_id(std::this_thread::get_id()), my_count(count)
259     {
260         my_count = 0;
261     }
262     lightweight_policy_body& operator=(const lightweight_policy_body&) = delete;
263     void operator()(tbb::flow::continue_msg) {
264         ++my_count;
265         std::thread::id body_thread_id = std::this_thread::get_id();
266         CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
267     }
268 };
269 
270 void test_lightweight_policy() {
271     tbb::flow::graph g;
272     std::atomic<size_t> count1;
273     std::atomic<size_t> count2;
274     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
275         node1(g, lightweight_policy_body(count1));
276     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
277         node2(g, lightweight_policy_body(count2));
278 
279     tbb::flow::make_edge(node1, node2);
280     const size_t n = 10;
281     for(size_t i = 0; i < n; ++i) {
282         node1.try_put(tbb::flow::continue_msg());
283     }
284     g.wait_for_all();
285 
286     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
287     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
288     CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
289     CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
290 }
291 
292 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
293 #include <array>
294 #include <vector>
295 void test_follows_and_precedes_api() {
296     using msg_t = tbb::flow::continue_msg;
297 
298     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
299     std::vector<msg_t> messages_for_precedes  = { msg_t() };
300 
301     auto pass_through = [](const msg_t& msg) { return msg; };
302 
303     follows_and_precedes_testing::test_follows
304         <msg_t, tbb::flow::continue_node<msg_t>>
305         (messages_for_follows, pass_through, node_priority_t(0));
306 
307     follows_and_precedes_testing::test_precedes
308         <msg_t, tbb::flow::continue_node<msg_t>>
309         (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
310 }
311 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
312 
// TODO: use pass_through from test_function_node instead
//! Node body that returns a copy of its argument unchanged.
template<typename T>
struct passing_body {
    // const: the body holds no state, so it must be callable through a const reference.
    T operator()(const T& val) const {
        return val;
    }
};
320 
321 /*
322     The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
323     because there used to be a bug when make_edge(node, continue_node)
324     did not update continue_node's predecesosor threshold
325     since the specialization of node's successor_cache for a continue_node was not chosen.
326 */
327 void test_successor_cache_specialization() {
328     using namespace tbb::flow;
329 
330     graph g;
331 
332     broadcast_node<continue_msg> node_with_default_mutex_type(g);
333     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
334 
335     continue_node<continue_msg> node(g, passing_body<continue_msg>());
336 
337     make_edge(node_with_default_mutex_type, node);
338     make_edge(node_with_non_default_mutex_type, node);
339 
340     buffer_node<continue_msg> buf(g);
341 
342     make_edge(node, buf);
343 
344     node_with_default_mutex_type.try_put(continue_msg());
345     node_with_non_default_mutex_type.try_put(continue_msg());
346 
347     g.wait_for_all();
348 
349     continue_msg storage;
350     CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
351                   "Wrong number of messages is passed via continue_node");
352 }
353 
//! Test concurrent continue_node for correctness
//! (runs test_concurrency for every thread count from utils::MinThread to utils::MaxThread)
//! \brief \ref error_guessing
TEST_CASE("Concurrency testing") {
    for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! Test a continue_node whose predecessors belong to two different graphs
//! \brief \ref error_guessing
TEST_CASE("Two graphs") { test_two_graphs(); }

//! Test basic behaviour with lightweight body
//! \brief \ref requirement \ref error_guessing
TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
#endif

//! Test for successor cache specialization
//! (a predecessor with a non-default mutex type must still update the continue_node's predecessor count)
//! \brief \ref regression
TEST_CASE( "Regression for successor cache specialization" ) {
    test_successor_cache_specialization();
}
381 
#if __TBB_CPP20_CONCEPTS_PRESENT
//! continue_node must be instantiable with a copyable type and must reject a non-copyable one
//! \brief \ref error_guessing
TEST_CASE("constraints for continue_node input") {
    static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>);
}

// Satisfied only when every continue_node constructor form listed below is well-formed
// for the given Body (plus the follows(...) forms when the node-set preview is enabled).
// NOTE(review): `Input` is passed as continue_node's first template argument, which the rest
// of this file uses as the node's output type (continue_node< OutputType >).
template <typename Input, typename Body>
concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body,
                                                tbb::flow::buffer_node<int>& f, std::size_t num,
                                                tbb::flow::node_priority_t priority  ) {
    tbb::flow::continue_node<Input>(graph, body);
    tbb::flow::continue_node<Input>(graph, body, priority);
    tbb::flow::continue_node<Input>(graph, num, body);
    tbb::flow::continue_node<Input>(graph, num, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::continue_node<Input>(tbb::flow::follows(f), body);
    tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority);
    tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body);
    tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
};

//! A correct body must be accepted by every constructor. Each malformed body must be rejected;
//! going by the test_concepts type names, these are bodies that are non-copyable, non-destructible,
//! lack operator(), or have the wrong operator() input or return type.
//! \brief \ref error_guessing
TEST_CASE("constraints for continue_node body") {
    using output_type = int;
    using namespace test_concepts::continue_node_body;

    static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>);
    static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>);
    static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>);
    static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
    static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
    static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
}
#endif // __TBB_CPP20_CONCEPTS_PRESENT
418