/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/config.h"

// TODO revamp: move the parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having
// these parts in all tests means testing a product that differs from what is actually released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_continue_node.cpp
//! \brief Test for [flow_graph.continue_node] specification


#define N 1000
#define MAX_NODES 4
#define C 8

// A class to use as a fake predecessor of continue_node
struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
{
    typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
    // Define implementations of virtual methods that are abstract in the base class
    bool register_successor( successor_type& ) override { return false; }
    bool remove_successor( successor_type& )   override { return false; }
};

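// Functor for utils::NativeParallelFor: each calling thread puts N continue messages
// into the node under test and expects every put to be accepted.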
template< typename InputType >
struct parallel_puts {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
    parallel_puts& operator=(const parallel_puts&) = delete;

    void operator()( int ) const  {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

template< typename OutputType >
void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
    fake_continue_sender fake_sender;
    for (size_t i = 0; i < N; ++i) {
        tbb::detail::d1::register_predecessor(n, fake_sender);
    }

    for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
        std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
        for (size_t i = 0; i < num_receivers; ++i) {
            receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
        }
        harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;

        for (size_t r = 0; r < num_receivers; ++r ) {
            tbb::flow::make_edge( n, *receivers[r] );
        }

        utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
        g.wait_for_all();

        // 2) the nodes will receive puts from multiple predecessors simultaneously,
        size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
        CHECK_MESSAGE( (int)ec == p, "" );
        for (size_t r = 0; r < num_receivers; ++r ) {
            size_t c = receivers[r]->my_count;
            // 3) the nodes will send to multiple successors.
            CHECK_MESSAGE( (int)c == p, "" );
        }

        for (size_t r = 0; r < num_receivers; ++r ) {
            tbb::flow::remove_edge( n, *receivers[r] );
        }
    }
}

template< typename OutputType, typename Body >
void continue_nodes( Body body ) {
    for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
        tbb::flow::graph g;
        tbb::flow::continue_node< OutputType > exe_node( g, body );
        run_continue_nodes( p, g, exe_node);
        exe_node.try_put(tbb::flow::continue_msg());
        tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
        run_continue_nodes( p, g, exe_node_copy);
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

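// Body functor that counts its executions both in a per-copy counter and in the global
// counter above; used to validate copy_body() and reset(rf_reset_bodies) behavior.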
template< typename OutputType >
struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }

    OutputType operator()( tbb::flow::continue_msg ) {
       ++global_execute_count;
       ++local_execute_count;
       return OutputType();
    }

};

template< typename OutputType >
void continue_nodes_with_copy( ) {

    for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
        tbb::flow::graph g;
        inc_functor<OutputType> cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::continue_node< OutputType > exe_node( g, cf );
        fake_continue_sender fake_sender;
        for (size_t i = 0; i < N; ++i) {
            tbb::detail::d1::register_predecessor(exe_node, fake_sender);
        }

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (int)c == p, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
        const size_t expected_count = p*MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( global_count == expected_count, "" );
        CHECK_MESSAGE( global_count == inc_count, "" );
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );

    }
}

template< typename OutputType >
void run_continue_nodes() {
    harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
    continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
    continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
    continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
    continue_nodes_with_copy<OutputType>();
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] {
            run_continue_nodes<tbb::flow::continue_msg>();
            run_continue_nodes<int>();
            run_continue_nodes<utils::NoAssign>();
        }
    );
}
/*
 * Connection of two graphs is not currently supported, but works to some limited extent.
 * This test is included to check for backward compatibility. It checks that a continue_node
 * with predecessors in two different graphs receives the required
 * number of continue messages before it executes.
 */
using namespace tbb::flow;

struct add_to_counter {
    int* counter;
    add_to_counter(int& var):counter(&var){}
    void operator()(continue_msg){*counter+=1;}
};

void test_two_graphs(){
    int count=0;

    //graph g with broadcast_node and continue_node
    graph g;
    broadcast_node<continue_msg> start_g(g);
    continue_node<continue_msg> first_g(g, add_to_counter(count));

    //graph h with broadcast_node
    graph h;
    broadcast_node<continue_msg> start_h(h);

    //making two edges to first_g from the two graphs
    make_edge(start_g,first_g);
    make_edge(start_h, first_g);

    //two try_puts from the two graphs
    start_g.try_put(continue_msg());
    start_h.try_put(continue_msg());
    g.wait_for_all();
    CHECK_MESSAGE( (count==1), "Not all continue messages received");

    //two try_puts from the graph that doesn't contain the node
    count=0;
    start_h.try_put(continue_msg());
    start_h.try_put(continue_msg());
    g.wait_for_all();
    CHECK_MESSAGE( (count==1), "Not all continue messages received -1");

    //only one try_put
    count=0;
    start_g.try_put(continue_msg());
    g.wait_for_all();
    CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
}

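// Body that remembers the thread it was constructed on and checks that every execution
// happens on that same thread, i.e. that the lightweight policy runs the body in place.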
struct lightweight_policy_body {
    const std::thread::id my_thread_id;
    std::atomic<size_t>& my_count;

    lightweight_policy_body( std::atomic<size_t>& count )
        : my_thread_id(std::this_thread::get_id()), my_count(count)
    {
        my_count = 0;
    }
    lightweight_policy_body& operator=(const lightweight_policy_body&) = delete;
    void operator()(tbb::flow::continue_msg) {
        ++my_count;
        std::thread::id body_thread_id = std::this_thread::get_id();
        CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
    }
};

void test_lightweight_policy() {
    tbb::flow::graph g;
    std::atomic<size_t> count1;
    std::atomic<size_t> count2;
    tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
        node1(g, lightweight_policy_body(count1));
    tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
        node2(g, lightweight_policy_body(count2));

    tbb::flow::make_edge(node1, node2);
    const size_t n = 10;
    for(size_t i = 0; i < n; ++i) {
        node1.try_put(tbb::flow::continue_msg());
    }
    g.wait_for_all();

    lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
    lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
    CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
    CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
    std::vector<msg_t> messages_for_precedes  = { msg_t() };

    auto pass_through = [](const msg_t& msg) { return msg; };

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::continue_node<msg_t>>
        (messages_for_follows, pass_through, node_priority_t(0));

    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::continue_node<msg_t>>
        (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

template <typename ExpectedType, typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    continue_node c1(g, body);
    static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);

    continue_node c2(g, body, lightweight());
    static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);

    continue_node c3(g, 5, body);
    static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);

    continue_node c4(g, 5, body, lightweight());
    static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);

    continue_node c5(g, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);

    continue_node c6(g, body, lightweight(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);

    continue_node c7(g, 5, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);

    continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    broadcast_node<continue_msg> b(g);

    continue_node c9(follows(b), body);
    static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);

    continue_node c10(follows(b), body, lightweight());
    static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);

    continue_node c11(follows(b), 5, body);
    static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);

    continue_node c12(follows(b), 5, body, lightweight());
    static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);

    continue_node c13(follows(b), body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);

    continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);

    continue_node c15(follows(b), 5, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);

    continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    continue_node c17(c1);
    static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
}

int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
void continue_void_body_f(const tbb::flow::continue_msg&) {}

void test_deduction_guides() {
    using tbb::flow::continue_msg;
    test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
    test_deduction_guides_common<continue_msg>([](const continue_msg&) {});

    test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
    test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});

    test_deduction_guides_common<int>(continue_body_f);
    test_deduction_guides_common<continue_msg>(continue_void_body_f);
}

#endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT

// TODO: use pass_through from test_function_node instead
template<typename T>
struct passing_body {
    T operator()(const T& val) {
        return val;
    }
};

/*
    The test covers the case when a node with a non-default mutex type is a predecessor of a continue_node.
    There used to be a bug where make_edge(node, continue_node) did not update the continue_node's
    predecessor threshold because the specialization of the node's successor_cache for continue_node
    was not chosen.
*/
void test_successor_cache_specialization() {
    using namespace tbb::flow;

    graph g;

    broadcast_node<continue_msg> node_with_default_mutex_type(g);
    buffer_node<continue_msg> node_with_non_default_mutex_type(g);

    continue_node<continue_msg> node(g, passing_body<continue_msg>());

    make_edge(node_with_default_mutex_type, node);
    make_edge(node_with_non_default_mutex_type, node);

    buffer_node<continue_msg> buf(g);

    make_edge(node, buf);

    node_with_default_mutex_type.try_put(continue_msg());
    node_with_non_default_mutex_type.try_put(continue_msg());

    g.wait_for_all();

    continue_msg storage;
    CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
                  "Wrong number of messages is passed via continue_node");
}

//! Test concurrent continue_node for correctness
//! \brief \ref error_guessing
TEST_CASE("Concurrency testing") {
    for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! Test concurrent continue_node in separate graphs
//! \brief \ref error_guessing
TEST_CASE("Two graphs") { test_two_graphs(); }

//! Test basic behaviour with lightweight body
//! \brief \ref requirement \ref error_guessing
TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test deprecated follows and precedes API
//! \brief \ref error_guessing
TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
#endif

#if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE( "Deduction guides" ) { test_deduction_guides(); }
#endif

//! Test for successor cache specialization
//! \brief \ref regression
TEST_CASE( "Regression for successor cache specialization" ) {
    test_successor_cache_specialization();
}