1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #include "common/config.h"
18
19 #include "tbb/flow_graph.h"
20
21 #include "common/test.h"
22 #include "common/utils.h"
23 #include "common/graph_utils.h"
24 #include "common/test_follows_and_precedes_api.h"
25 #include "common/concepts_common.h"
26
27
28 //! \file test_continue_node.cpp
29 //! \brief Test for [flow_graph.continue_node] specification
30
31
// Number of fake predecessors registered on each continue_node under test, and the
// number of try_put calls each thread issues in parallel_puts.
#define N 1000
// Largest number of successor receivers attached to a node at the same time.
#define MAX_NODES 4
// NOTE(review): C is not referenced in the visible part of this file - confirm it is still needed.
#define C 8
35
36 // A class to use as a fake predecessor of continue_node
37 struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
38 {
39 typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
40 // Define implementations of virtual methods that are abstract in the base class
register_successorfake_continue_sender41 bool register_successor( successor_type& ) override { return false; }
remove_successorfake_continue_sender42 bool remove_successor( successor_type& ) override { return false; }
43 };
44
45 template< typename InputType >
46 struct parallel_puts {
47
48 tbb::flow::receiver< InputType > * const my_exe_node;
49
parallel_putsparallel_puts50 parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
51 parallel_puts& operator=(const parallel_puts&) = delete;
52
operator ()parallel_puts53 void operator()( int ) const {
54 for ( int i = 0; i < N; ++i ) {
55 // the nodes will accept all puts
56 CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
57 }
58 }
59
60 };
61
62 template< typename OutputType >
run_continue_nodes(int p,tbb::flow::graph & g,tbb::flow::continue_node<OutputType> & n)63 void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
64 fake_continue_sender fake_sender;
65 for (size_t i = 0; i < N; ++i) {
66 tbb::detail::d1::register_predecessor(n, fake_sender);
67 }
68
69 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
70 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
71 for (size_t i = 0; i < num_receivers; ++i) {
72 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
73 }
74 harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
75
76 for (size_t r = 0; r < num_receivers; ++r ) {
77 tbb::flow::make_edge( n, *receivers[r] );
78 }
79
80 utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
81 g.wait_for_all();
82
83 // 2) the nodes will receive puts from multiple predecessors simultaneously,
84 size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
85 CHECK_MESSAGE( (int)ec == p, "" );
86 for (size_t r = 0; r < num_receivers; ++r ) {
87 size_t c = receivers[r]->my_count;
88 // 3) the nodes will send to multiple successors.
89 CHECK_MESSAGE( (int)c == p, "" );
90 }
91
92 for (size_t r = 0; r < num_receivers; ++r ) {
93 tbb::flow::remove_edge( n, *receivers[r] );
94 }
95 }
96 }
97
98 template< typename OutputType, typename Body >
continue_nodes(Body body)99 void continue_nodes( Body body ) {
100 for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
101 tbb::flow::graph g;
102 tbb::flow::continue_node< OutputType > exe_node( g, body );
103 run_continue_nodes( p, g, exe_node);
104 exe_node.try_put(tbb::flow::continue_msg());
105 tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
106 run_continue_nodes( p, g, exe_node_copy);
107 }
108 }
109
// Starting value given to both the functor-local and the global execution counter
// at the beginning of every iteration in continue_nodes_with_copy.
const size_t Offset = 123;
// Incremented by every inc_functor invocation, whichever copy of the functor is called.
std::atomic<size_t> global_execute_count;
112
113 template< typename OutputType >
114 struct inc_functor {
115
116 std::atomic<size_t> local_execute_count;
inc_functorinc_functor117 inc_functor( ) { local_execute_count = 0; }
inc_functorinc_functor118 inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
operator =inc_functor119 void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
120
operator ()inc_functor121 OutputType operator()( tbb::flow::continue_msg ) {
122 ++global_execute_count;
123 ++local_execute_count;
124 return OutputType();
125 }
126
127 };
128
129 template< typename OutputType >
continue_nodes_with_copy()130 void continue_nodes_with_copy( ) {
131
132 for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
133 tbb::flow::graph g;
134 inc_functor<OutputType> cf;
135 cf.local_execute_count = Offset;
136 global_execute_count = Offset;
137
138 tbb::flow::continue_node< OutputType > exe_node( g, cf );
139 fake_continue_sender fake_sender;
140 for (size_t i = 0; i < N; ++i) {
141 tbb::detail::d1::register_predecessor(exe_node, fake_sender);
142 }
143
144 for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
145 std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
146 for (size_t i = 0; i < num_receivers; ++i) {
147 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
148 }
149
150 for (size_t r = 0; r < num_receivers; ++r ) {
151 tbb::flow::make_edge( exe_node, *receivers[r] );
152 }
153
154 utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
155 g.wait_for_all();
156
157 // 2) the nodes will receive puts from multiple predecessors simultaneously,
158 for (size_t r = 0; r < num_receivers; ++r ) {
159 size_t c = receivers[r]->my_count;
160 // 3) the nodes will send to multiple successors.
161 CHECK_MESSAGE( (int)c == p, "" );
162 }
163 for (size_t r = 0; r < num_receivers; ++r ) {
164 tbb::flow::remove_edge( exe_node, *receivers[r] );
165 }
166 }
167
168 // validate that the local body matches the global execute_count and both are correct
169 inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
170 const size_t expected_count = p*MAX_NODES + Offset;
171 size_t global_count = global_execute_count;
172 size_t inc_count = body_copy.local_execute_count;
173 CHECK_MESSAGE( global_count == expected_count, "" );
174 CHECK_MESSAGE( global_count == inc_count, "" );
175 g.reset(tbb::flow::rf_reset_bodies);
176 body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
177 inc_count = body_copy.local_execute_count;
178 CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
179
180 }
181 }
182
183 template< typename OutputType >
run_continue_nodes()184 void run_continue_nodes() {
185 harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
186 continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
187 continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
188 continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
189 continue_nodes_with_copy<OutputType>();
190 }
191
192 //! Tests limited concurrency cases for nodes that accept data messages
test_concurrency(int num_threads)193 void test_concurrency(int num_threads) {
194 tbb::task_arena arena(num_threads);
195 arena.execute(
196 [&] {
197 run_continue_nodes<tbb::flow::continue_msg>();
198 run_continue_nodes<int>();
199 run_continue_nodes<utils::NoAssign>();
200 }
201 );
202 }
203 /*
204 * Connection of two graphs is not currently supported, but works to some limited extent.
205 * This test is included to check for backward compatibility. It checks that a continue_node
206 * with predecessors in two different graphs receives the required
207 * number of continue messages before it executes.
208 */
209 using namespace tbb::flow;
210
211 struct add_to_counter {
212 int* counter;
add_to_counteradd_to_counter213 add_to_counter(int& var):counter(&var){}
operator ()add_to_counter214 void operator()(continue_msg){*counter+=1;}
215 };
216
test_two_graphs()217 void test_two_graphs(){
218 int count=0;
219
220 //graph g with broadcast_node and continue_node
221 graph g;
222 broadcast_node<continue_msg> start_g(g);
223 continue_node<continue_msg> first_g(g, add_to_counter(count));
224
225 //graph h with broadcast_node
226 graph h;
227 broadcast_node<continue_msg> start_h(h);
228
229 //making two edges to first_g from the two graphs
230 make_edge(start_g,first_g);
231 make_edge(start_h, first_g);
232
233 //two try_puts from the two graphs
234 start_g.try_put(continue_msg());
235 start_h.try_put(continue_msg());
236 g.wait_for_all();
237 CHECK_MESSAGE( (count==1), "Not all continue messages received");
238
239 //two try_puts from the graph that doesn't contain the node
240 count=0;
241 start_h.try_put(continue_msg());
242 start_h.try_put(continue_msg());
243 g.wait_for_all();
244 CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
245
246 //only one try_put
247 count=0;
248 start_g.try_put(continue_msg());
249 g.wait_for_all();
250 CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
251 }
252
253 struct lightweight_policy_body {
254 const std::thread::id my_thread_id;
255 std::atomic<size_t>& my_count;
256
lightweight_policy_bodylightweight_policy_body257 lightweight_policy_body( std::atomic<size_t>& count )
258 : my_thread_id(std::this_thread::get_id()), my_count(count)
259 {
260 my_count = 0;
261 }
262
263 lightweight_policy_body( const lightweight_policy_body& ) = default;
264 lightweight_policy_body& operator=( const lightweight_policy_body& ) = delete;
265
operator ()lightweight_policy_body266 void operator()( tbb::flow::continue_msg ) {
267 ++my_count;
268 std::thread::id body_thread_id = std::this_thread::get_id();
269 CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
270 }
271 };
272
test_lightweight_policy()273 void test_lightweight_policy() {
274 tbb::flow::graph g;
275 std::atomic<size_t> count1;
276 std::atomic<size_t> count2;
277 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
278 node1(g, lightweight_policy_body(count1));
279 tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
280 node2(g, lightweight_policy_body(count2));
281
282 tbb::flow::make_edge(node1, node2);
283 const size_t n = 10;
284 for(size_t i = 0; i < n; ++i) {
285 node1.try_put(tbb::flow::continue_msg());
286 }
287 g.wait_for_all();
288
289 lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
290 lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
291 CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
292 CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
293 }
294
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
//! Runs the shared follows/precedes helpers against continue_node<continue_msg>.
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;
    using node_t = tbb::flow::continue_node<msg_t>;

    // body that simply forwards the message it receives
    auto pass_through = [](const msg_t& msg) { return msg; };

    std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
    follows_and_precedes_testing::test_follows<msg_t, node_t>(
        messages_for_follows, pass_through, node_priority_t(0));

    std::vector<msg_t> messages_for_precedes = { msg_t() };
    follows_and_precedes_testing::test_precedes<msg_t, node_t>(
        messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
315
// TODO: use pass_through from test_function_node instead
//! Stateless body that returns a copy of its argument.
/** operator() is const because the functor has no state to modify; this also lets it
    be invoked through const objects and const references. */
template<typename T>
struct passing_body {
    T operator()(const T& val) const {
        return val;
    }
};
323
/*
    The test covers the case when a node with a non-default mutex type is a predecessor of a continue_node.
    There used to be a bug where make_edge(node, continue_node)
    did not update the continue_node's predecessor threshold,
    because the specialization of the node's successor_cache for a continue_node was not chosen.
*/
test_successor_cache_specialization()330 void test_successor_cache_specialization() {
331 using namespace tbb::flow;
332
333 graph g;
334
335 broadcast_node<continue_msg> node_with_default_mutex_type(g);
336 buffer_node<continue_msg> node_with_non_default_mutex_type(g);
337
338 continue_node<continue_msg> node(g, passing_body<continue_msg>());
339
340 make_edge(node_with_default_mutex_type, node);
341 make_edge(node_with_non_default_mutex_type, node);
342
343 buffer_node<continue_msg> buf(g);
344
345 make_edge(node, buf);
346
347 node_with_default_mutex_type.try_put(continue_msg());
348 node_with_non_default_mutex_type.try_put(continue_msg());
349
350 g.wait_for_all();
351
352 continue_msg storage;
353 CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
354 "Wrong number of messages is passed via continue_node");
355 }
356
357 //! Test concurrent continue_node for correctness
358 //! \brief \ref error_guessing
359 TEST_CASE("Concurrency testing") {
360 for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
361 test_concurrency(p);
362 }
363 }
364
365 //! Test concurrent continue_node in separate graphs
366 //! \brief \ref error_guessing
367 TEST_CASE("Two graphs") { test_two_graphs(); }
368
369 //! Test basic behaviour with lightweight body
370 //! \brief \ref requirement \ref error_guessing
371 TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
372
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) {
    test_follows_and_precedes_api();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
378
379 //! Test for successor cache specialization
380 //! \brief \ref regression
381 TEST_CASE( "Regression for successor cache specialization" ) {
382 test_successor_cache_specialization();
383 }
384
385 #if __TBB_CPP20_CONCEPTS_PRESENT
386 //! \brief \ref error_guessing
387 TEST_CASE("constraints for continue_node input") {
388 static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>);
389 static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>);
390 }
391
392 template <typename Input, typename Body>
393 concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body,
394 tbb::flow::buffer_node<int>& f, std::size_t num,
395 tbb::flow::node_priority_t priority ) {
396 tbb::flow::continue_node<Input>(graph, body);
397 tbb::flow::continue_node<Input>(graph, body, priority);
398 tbb::flow::continue_node<Input>(graph, num, body);
399 tbb::flow::continue_node<Input>(graph, num, body, priority);
400 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
401 tbb::flow::continue_node<Input>(tbb::flow::follows(f), body);
402 tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority);
403 tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body);
404 tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority);
405 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
406 };
407
408 //! \brief \ref error_guessing
409 TEST_CASE("constraints for continue_node body") {
410 using output_type = int;
411 using namespace test_concepts::continue_node_body;
412
413 static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>);
414 static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>);
415 static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>);
416 static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
417 static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
418 static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
419 }
420 #endif // __TBB_CPP20_CONCEPTS_PRESENT
421