xref: /oneTBB/test/tbb/test_continue_node.cpp (revision d86ed7fb)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #include "common/config.h"
18 
19 // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
20 // parts in all of tests might make testing of the product, which is different from what is actually
21 // released.
22 #define __TBB_EXTRA_DEBUG 1
23 #include "tbb/flow_graph.h"
24 
25 #include "common/test.h"
26 #include "common/utils.h"
27 #include "common/graph_utils.h"
28 #include "common/test_follows_and_precedes_api.h"
29 
30 
31 //! \file test_continue_node.cpp
32 //! \brief Test for [flow_graph.continue_node] specification
33 
34 
35 #define N 1000
36 #define MAX_NODES 4
37 #define C 8
38 
39 // A class to use as a fake predecessor of continue_node
40 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
41 {
42     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
43     // Define implementations of virtual methods that are abstract in the base class
44     bool register_successor( successor_type& ) override { return false; }
45     bool remove_successor( successor_type& )   override { return false; }
46 };
47 
48 template< typename InputType >
49 struct parallel_puts {
50 
51     tbb::flow::receiver< InputType > * const my_exe_node;
52 
53     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
54     parallel_puts& operator=(const parallel_puts&) = delete;
55 
56     void operator()( int ) const  {
57         for ( int i = 0; i < N; ++i ) {
58             // the nodes will accept all puts
59             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
60         }
61     }
62 
63 };
64 
65 template< typename OutputType >
66 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
67     fake_continue_sender fake_sender;
68     for (size_t i = 0; i < N; ++i) {
69         tbb::detail::d1::register_predecessor(n, fake_sender);
70     }
71 
72     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
73         std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
74         for (size_t i = 0; i < num_receivers; ++i) {
75             receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
76         }
77         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
78 
79         for (size_t r = 0; r < num_receivers; ++r ) {
80             tbb::flow::make_edge( n, *receivers[r] );
81         }
82 
83         utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
84         g.wait_for_all();
85 
86         // 2) the nodes will receive puts from multiple predecessors simultaneously,
87         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
88         CHECK_MESSAGE( (int)ec == p, "" );
89         for (size_t r = 0; r < num_receivers; ++r ) {
90             size_t c = receivers[r]->my_count;
91             // 3) the nodes will send to multiple successors.
92             CHECK_MESSAGE( (int)c == p, "" );
93         }
94 
95         for (size_t r = 0; r < num_receivers; ++r ) {
96             tbb::flow::remove_edge( n, *receivers[r] );
97         }
98     }
99 }
100 
101 template< typename OutputType, typename Body >
102 void continue_nodes( Body body ) {
103     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
104         tbb::flow::graph g;
105         tbb::flow::continue_node< OutputType > exe_node( g, body );
106         run_continue_nodes( p, g, exe_node);
107         exe_node.try_put(tbb::flow::continue_msg());
108         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
109         run_continue_nodes( p, g, exe_node_copy);
110     }
111 }
112 
113 const size_t Offset = 123;
114 std::atomic<size_t> global_execute_count;
115 
116 template< typename OutputType >
117 struct inc_functor {
118 
119     std::atomic<size_t> local_execute_count;
120     inc_functor( ) { local_execute_count = 0; }
121     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
122     void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
123 
124     OutputType operator()( tbb::flow::continue_msg ) {
125        ++global_execute_count;
126        ++local_execute_count;
127        return OutputType();
128     }
129 
130 };
131 
132 template< typename OutputType >
133 void continue_nodes_with_copy( ) {
134 
135     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
136         tbb::flow::graph g;
137         inc_functor<OutputType> cf;
138         cf.local_execute_count = Offset;
139         global_execute_count = Offset;
140 
141         tbb::flow::continue_node< OutputType > exe_node( g, cf );
142         fake_continue_sender fake_sender;
143         for (size_t i = 0; i < N; ++i) {
144             tbb::detail::d1::register_predecessor(exe_node, fake_sender);
145         }
146 
147         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
148             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
149             for (size_t i = 0; i < num_receivers; ++i) {
150                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
151             }
152 
153             for (size_t r = 0; r < num_receivers; ++r ) {
154                 tbb::flow::make_edge( exe_node, *receivers[r] );
155             }
156 
157             utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
158             g.wait_for_all();
159 
160             // 2) the nodes will receive puts from multiple predecessors simultaneously,
161             for (size_t r = 0; r < num_receivers; ++r ) {
162                 size_t c = receivers[r]->my_count;
163                 // 3) the nodes will send to multiple successors.
164                 CHECK_MESSAGE( (int)c == p, "" );
165             }
166             for (size_t r = 0; r < num_receivers; ++r ) {
167                 tbb::flow::remove_edge( exe_node, *receivers[r] );
168             }
169         }
170 
171         // validate that the local body matches the global execute_count and both are correct
172         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
173         const size_t expected_count = p*MAX_NODES + Offset;
174         size_t global_count = global_execute_count;
175         size_t inc_count = body_copy.local_execute_count;
176         CHECK_MESSAGE( global_count == expected_count, "" );
177         CHECK_MESSAGE( global_count == inc_count, "" );
178         g.reset(tbb::flow::rf_reset_bodies);
179         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
180         inc_count = body_copy.local_execute_count;
181         CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
182 
183     }
184 }
185 
186 template< typename OutputType >
187 void run_continue_nodes() {
188     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
189     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
190     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
191     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
192     continue_nodes_with_copy<OutputType>();
193 }
194 
195 //! Tests limited concurrency cases for nodes that accept data messages
196 void test_concurrency(int num_threads) {
197     tbb::task_arena arena(num_threads);
198     arena.execute(
199         [&] {
200             run_continue_nodes<tbb::flow::continue_msg>();
201             run_continue_nodes<int>();
202             run_continue_nodes<utils::NoAssign>();
203         }
204     );
205 }
206 /*
207  * Connection of two graphs is not currently supported, but works to some limited extent.
208  * This test is included to check for backward compatibility. It checks that a continue_node
209  * with predecessors in two different graphs receives the required
210  * number of continue messages before it executes.
211  */
212 using namespace tbb::flow;
213 
214 struct add_to_counter {
215     int* counter;
216     add_to_counter(int& var):counter(&var){}
217     void operator()(continue_msg){*counter+=1;}
218 };
219 
220 void test_two_graphs(){
221     int count=0;
222 
223     //graph g with broadcast_node and continue_node
224     graph g;
225     broadcast_node<continue_msg> start_g(g);
226     continue_node<continue_msg> first_g(g, add_to_counter(count));
227 
228     //graph h with broadcast_node
229     graph h;
230     broadcast_node<continue_msg> start_h(h);
231 
232     //making two edges to first_g from the two graphs
233     make_edge(start_g,first_g);
234     make_edge(start_h, first_g);
235 
236     //two try_puts from the two graphs
237     start_g.try_put(continue_msg());
238     start_h.try_put(continue_msg());
239     g.wait_for_all();
240     CHECK_MESSAGE( (count==1), "Not all continue messages received");
241 
242     //two try_puts from the graph that doesn't contain the node
243     count=0;
244     start_h.try_put(continue_msg());
245     start_h.try_put(continue_msg());
246     g.wait_for_all();
247     CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
248 
249     //only one try_put
250     count=0;
251     start_g.try_put(continue_msg());
252     g.wait_for_all();
253     CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
254 }
255 
256 struct lightweight_policy_body {
257     const std::thread::id my_thread_id;
258     std::atomic<size_t>& my_count;
259 
260     lightweight_policy_body( std::atomic<size_t>& count )
261         : my_thread_id(std::this_thread::get_id()), my_count(count)
262     {
263         my_count = 0;
264     }
265     lightweight_policy_body& operator=(const lightweight_policy_body&) = delete;
266     void operator()(tbb::flow::continue_msg) {
267         ++my_count;
268         std::thread::id body_thread_id = std::this_thread::get_id();
269         CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
270     }
271 };
272 
273 void test_lightweight_policy() {
274     tbb::flow::graph g;
275     std::atomic<size_t> count1;
276     std::atomic<size_t> count2;
277     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
278         node1(g, lightweight_policy_body(count1));
279     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
280         node2(g, lightweight_policy_body(count2));
281 
282     tbb::flow::make_edge(node1, node2);
283     const size_t n = 10;
284     for(size_t i = 0; i < n; ++i) {
285         node1.try_put(tbb::flow::continue_msg());
286     }
287     g.wait_for_all();
288 
289     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
290     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
291     CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
292     CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
293 }
294 
295 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
296 #include <array>
297 #include <vector>
298 void test_follows_and_precedes_api() {
299     using msg_t = tbb::flow::continue_msg;
300 
301     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
302     std::vector<msg_t> messages_for_precedes  = { msg_t() };
303 
304     auto pass_through = [](const msg_t& msg) { return msg; };
305 
306     follows_and_precedes_testing::test_follows
307         <msg_t, tbb::flow::continue_node<msg_t>>
308         (messages_for_follows, pass_through, node_priority_t(0));
309 
310     follows_and_precedes_testing::test_precedes
311         <msg_t, tbb::flow::continue_node<msg_t>>
312         (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
313 }
314 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
315 
316 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
317 
318 template <typename ExpectedType, typename Body>
319 void test_deduction_guides_common(Body body) {
320     using namespace tbb::flow;
321     graph g;
322 
323     continue_node c1(g, body);
324     static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);
325 
326     continue_node c2(g, body, lightweight());
327     static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);
328 
329     continue_node c3(g, 5, body);
330     static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);
331 
332     continue_node c4(g, 5, body, lightweight());
333     static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);
334 
335     continue_node c5(g, body, node_priority_t(5));
336     static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);
337 
338     continue_node c6(g, body, lightweight(), node_priority_t(5));
339     static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);
340 
341     continue_node c7(g, 5, body, node_priority_t(5));
342     static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);
343 
344     continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
345     static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);
346 
347 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
348     broadcast_node<continue_msg> b(g);
349 
350     continue_node c9(follows(b), body);
351     static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);
352 
353     continue_node c10(follows(b), body, lightweight());
354     static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);
355 
356     continue_node c11(follows(b), 5, body);
357     static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);
358 
359     continue_node c12(follows(b), 5, body, lightweight());
360     static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);
361 
362     continue_node c13(follows(b), body, node_priority_t(5));
363     static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);
364 
365     continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
366     static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);
367 
368     continue_node c15(follows(b), 5, body, node_priority_t(5));
369     static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);
370 
371     continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
372     static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
373 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
374 
375     continue_node c17(c1);
376     static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
377 }
378 
379 int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
380 void continue_void_body_f(const tbb::flow::continue_msg&) {}
381 
382 void test_deduction_guides() {
383     using tbb::flow::continue_msg;
384     test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
385     test_deduction_guides_common<continue_msg>([](const continue_msg&) {});
386 
387     test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
388     test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});
389 
390     test_deduction_guides_common<int>(continue_body_f);
391     test_deduction_guides_common<continue_msg>(continue_void_body_f);
392 }
393 
394 #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
395 
396 // TODO: use pass_through from test_function_node instead
397 template<typename T>
398 struct passing_body {
399     T operator()(const T& val) {
400         return val;
401     }
402 };
403 
404 /*
405     The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
406     because there used to be a bug when make_edge(node, continue_node)
407     did not update continue_node's predecesosor threshold
408     since the specialization of node's successor_cache for a continue_node was not chosen.
409 */
410 void test_successor_cache_specialization() {
411     using namespace tbb::flow;
412 
413     graph g;
414 
415     broadcast_node<continue_msg> node_with_default_mutex_type(g);
416     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
417 
418     continue_node<continue_msg> node(g, passing_body<continue_msg>());
419 
420     make_edge(node_with_default_mutex_type, node);
421     make_edge(node_with_non_default_mutex_type, node);
422 
423     buffer_node<continue_msg> buf(g);
424 
425     make_edge(node, buf);
426 
427     node_with_default_mutex_type.try_put(continue_msg());
428     node_with_non_default_mutex_type.try_put(continue_msg());
429 
430     g.wait_for_all();
431 
432     continue_msg storage;
433     CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
434                   "Wrong number of messages is passed via continue_node");
435 }
436 
437 //! Test concurrent continue_node for correctness
438 //! \brief \ref error_guessing
439 TEST_CASE("Concurrency testing") {
440     for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
441         test_concurrency(p);
442     }
443 }
444 
445 //! Test concurrent continue_node in separate graphs
446 //! \brief \ref error_guessing
447 TEST_CASE("Two graphs") { test_two_graphs(); }
448 
449 //! Test basic behaviour with lightweight body
450 //! \brief \ref requirement \ref error_guessing
451 TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
452 
453 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
454 //! Test deprecated follows and preceedes API
455 //! \brief \ref error_guessing
456 TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
457 #endif
458 
459 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
460 //! Test deduction guides
461 //! \brief requirement
462 TEST_CASE( "Deduction guides" ) { test_deduction_guides(); }
463 #endif
464 
465 //! Test for successor cache specialization
466 //! \brief \ref regression
467 TEST_CASE( "Regression for successor cache specialization" ) {
468     test_successor_cache_specialization();
469 }
470