xref: /oneTBB/test/tbb/test_continue_node.cpp (revision c4a799df)
151c0b2f7Stbbdev /*
2*c4a799dfSJhaShweta1     Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include "common/config.h"
1851c0b2f7Stbbdev 
1951c0b2f7Stbbdev #include "tbb/flow_graph.h"
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "common/test.h"
2251c0b2f7Stbbdev #include "common/utils.h"
2351c0b2f7Stbbdev #include "common/graph_utils.h"
2451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
25478de5b1Stbbdev #include "common/concepts_common.h"
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev //! \file test_continue_node.cpp
2951c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev #define N 1000
3351c0b2f7Stbbdev #define MAX_NODES 4
3451c0b2f7Stbbdev #define C 8
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node
3751c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
3851c0b2f7Stbbdev {
3951c0b2f7Stbbdev     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
4051c0b2f7Stbbdev     // Define implementations of virtual methods that are abstract in the base class
register_successorfake_continue_sender4151c0b2f7Stbbdev     bool register_successor( successor_type& ) override { return false; }
remove_successorfake_continue_sender4251c0b2f7Stbbdev     bool remove_successor( successor_type& )   override { return false; }
4351c0b2f7Stbbdev };
4451c0b2f7Stbbdev 
4551c0b2f7Stbbdev template< typename InputType >
4651c0b2f7Stbbdev struct parallel_puts {
4751c0b2f7Stbbdev 
4851c0b2f7Stbbdev     tbb::flow::receiver< InputType > * const my_exe_node;
4951c0b2f7Stbbdev 
parallel_putsparallel_puts5051c0b2f7Stbbdev     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
5151c0b2f7Stbbdev     parallel_puts& operator=(const parallel_puts&) = delete;
5251c0b2f7Stbbdev 
operator ()parallel_puts5351c0b2f7Stbbdev     void operator()( int ) const  {
5451c0b2f7Stbbdev         for ( int i = 0; i < N; ++i ) {
5551c0b2f7Stbbdev             // the nodes will accept all puts
5651c0b2f7Stbbdev             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
5751c0b2f7Stbbdev         }
5851c0b2f7Stbbdev     }
5951c0b2f7Stbbdev 
6051c0b2f7Stbbdev };
6151c0b2f7Stbbdev 
6251c0b2f7Stbbdev template< typename OutputType >
run_continue_nodes(int p,tbb::flow::graph & g,tbb::flow::continue_node<OutputType> & n)6351c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
6451c0b2f7Stbbdev     fake_continue_sender fake_sender;
6551c0b2f7Stbbdev     for (size_t i = 0; i < N; ++i) {
6651c0b2f7Stbbdev         tbb::detail::d1::register_predecessor(n, fake_sender);
6751c0b2f7Stbbdev     }
6851c0b2f7Stbbdev 
6951c0b2f7Stbbdev     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
7051c0b2f7Stbbdev         std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
7151c0b2f7Stbbdev         for (size_t i = 0; i < num_receivers; ++i) {
7251c0b2f7Stbbdev             receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
7351c0b2f7Stbbdev         }
7451c0b2f7Stbbdev         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
7551c0b2f7Stbbdev 
7651c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
7751c0b2f7Stbbdev             tbb::flow::make_edge( n, *receivers[r] );
7851c0b2f7Stbbdev         }
7951c0b2f7Stbbdev 
8051c0b2f7Stbbdev         utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
8151c0b2f7Stbbdev         g.wait_for_all();
8251c0b2f7Stbbdev 
8351c0b2f7Stbbdev         // 2) the nodes will receive puts from multiple predecessors simultaneously,
8451c0b2f7Stbbdev         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
8551c0b2f7Stbbdev         CHECK_MESSAGE( (int)ec == p, "" );
8651c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
8751c0b2f7Stbbdev             size_t c = receivers[r]->my_count;
8851c0b2f7Stbbdev             // 3) the nodes will send to multiple successors.
8951c0b2f7Stbbdev             CHECK_MESSAGE( (int)c == p, "" );
9051c0b2f7Stbbdev         }
9151c0b2f7Stbbdev 
9251c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
9351c0b2f7Stbbdev             tbb::flow::remove_edge( n, *receivers[r] );
9451c0b2f7Stbbdev         }
9551c0b2f7Stbbdev     }
9651c0b2f7Stbbdev }
9751c0b2f7Stbbdev 
9851c0b2f7Stbbdev template< typename OutputType, typename Body >
continue_nodes(Body body)9951c0b2f7Stbbdev void continue_nodes( Body body ) {
10051c0b2f7Stbbdev     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
10151c0b2f7Stbbdev         tbb::flow::graph g;
10251c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node( g, body );
10351c0b2f7Stbbdev         run_continue_nodes( p, g, exe_node);
10451c0b2f7Stbbdev         exe_node.try_put(tbb::flow::continue_msg());
10551c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
10651c0b2f7Stbbdev         run_continue_nodes( p, g, exe_node_copy);
10751c0b2f7Stbbdev     }
10851c0b2f7Stbbdev }
10951c0b2f7Stbbdev 
11051c0b2f7Stbbdev const size_t Offset = 123;
11151c0b2f7Stbbdev std::atomic<size_t> global_execute_count;
11251c0b2f7Stbbdev 
11351c0b2f7Stbbdev template< typename OutputType >
11451c0b2f7Stbbdev struct inc_functor {
11551c0b2f7Stbbdev 
11651c0b2f7Stbbdev     std::atomic<size_t> local_execute_count;
inc_functorinc_functor11751c0b2f7Stbbdev     inc_functor( ) { local_execute_count = 0; }
inc_functorinc_functor11851c0b2f7Stbbdev     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
operator =inc_functor11951c0b2f7Stbbdev     void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
12051c0b2f7Stbbdev 
operator ()inc_functor12151c0b2f7Stbbdev     OutputType operator()( tbb::flow::continue_msg ) {
12251c0b2f7Stbbdev        ++global_execute_count;
12351c0b2f7Stbbdev        ++local_execute_count;
12451c0b2f7Stbbdev        return OutputType();
12551c0b2f7Stbbdev     }
12651c0b2f7Stbbdev 
12751c0b2f7Stbbdev };
12851c0b2f7Stbbdev 
12951c0b2f7Stbbdev template< typename OutputType >
continue_nodes_with_copy()13051c0b2f7Stbbdev void continue_nodes_with_copy( ) {
13151c0b2f7Stbbdev 
13251c0b2f7Stbbdev     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
13351c0b2f7Stbbdev         tbb::flow::graph g;
13451c0b2f7Stbbdev         inc_functor<OutputType> cf;
13551c0b2f7Stbbdev         cf.local_execute_count = Offset;
13651c0b2f7Stbbdev         global_execute_count = Offset;
13751c0b2f7Stbbdev 
13851c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node( g, cf );
13951c0b2f7Stbbdev         fake_continue_sender fake_sender;
14051c0b2f7Stbbdev         for (size_t i = 0; i < N; ++i) {
14151c0b2f7Stbbdev             tbb::detail::d1::register_predecessor(exe_node, fake_sender);
14251c0b2f7Stbbdev         }
14351c0b2f7Stbbdev 
14451c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
14551c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
14651c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; ++i) {
14751c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
14851c0b2f7Stbbdev             }
14951c0b2f7Stbbdev 
15051c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
15151c0b2f7Stbbdev                 tbb::flow::make_edge( exe_node, *receivers[r] );
15251c0b2f7Stbbdev             }
15351c0b2f7Stbbdev 
15451c0b2f7Stbbdev             utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
15551c0b2f7Stbbdev             g.wait_for_all();
15651c0b2f7Stbbdev 
15751c0b2f7Stbbdev             // 2) the nodes will receive puts from multiple predecessors simultaneously,
15851c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
15951c0b2f7Stbbdev                 size_t c = receivers[r]->my_count;
16051c0b2f7Stbbdev                 // 3) the nodes will send to multiple successors.
16151c0b2f7Stbbdev                 CHECK_MESSAGE( (int)c == p, "" );
16251c0b2f7Stbbdev             }
16351c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
16451c0b2f7Stbbdev                 tbb::flow::remove_edge( exe_node, *receivers[r] );
16551c0b2f7Stbbdev             }
16651c0b2f7Stbbdev         }
16751c0b2f7Stbbdev 
16851c0b2f7Stbbdev         // validate that the local body matches the global execute_count and both are correct
16951c0b2f7Stbbdev         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17051c0b2f7Stbbdev         const size_t expected_count = p*MAX_NODES + Offset;
17151c0b2f7Stbbdev         size_t global_count = global_execute_count;
17251c0b2f7Stbbdev         size_t inc_count = body_copy.local_execute_count;
17351c0b2f7Stbbdev         CHECK_MESSAGE( global_count == expected_count, "" );
17451c0b2f7Stbbdev         CHECK_MESSAGE( global_count == inc_count, "" );
17551c0b2f7Stbbdev         g.reset(tbb::flow::rf_reset_bodies);
17651c0b2f7Stbbdev         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17751c0b2f7Stbbdev         inc_count = body_copy.local_execute_count;
17851c0b2f7Stbbdev         CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
17951c0b2f7Stbbdev 
18051c0b2f7Stbbdev     }
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev 
18351c0b2f7Stbbdev template< typename OutputType >
run_continue_nodes()18451c0b2f7Stbbdev void run_continue_nodes() {
18551c0b2f7Stbbdev     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
18651c0b2f7Stbbdev     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
18751c0b2f7Stbbdev     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
18851c0b2f7Stbbdev     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
18951c0b2f7Stbbdev     continue_nodes_with_copy<OutputType>();
19051c0b2f7Stbbdev }
19151c0b2f7Stbbdev 
19251c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
test_concurrency(int num_threads)19351c0b2f7Stbbdev void test_concurrency(int num_threads) {
19451c0b2f7Stbbdev     tbb::task_arena arena(num_threads);
19551c0b2f7Stbbdev     arena.execute(
19651c0b2f7Stbbdev         [&] {
19751c0b2f7Stbbdev             run_continue_nodes<tbb::flow::continue_msg>();
19851c0b2f7Stbbdev             run_continue_nodes<int>();
19951c0b2f7Stbbdev             run_continue_nodes<utils::NoAssign>();
20051c0b2f7Stbbdev         }
20151c0b2f7Stbbdev     );
20251c0b2f7Stbbdev }
20351c0b2f7Stbbdev /*
20451c0b2f7Stbbdev  * Connection of two graphs is not currently supported, but works to some limited extent.
20551c0b2f7Stbbdev  * This test is included to check for backward compatibility. It checks that a continue_node
20651c0b2f7Stbbdev  * with predecessors in two different graphs receives the required
20751c0b2f7Stbbdev  * number of continue messages before it executes.
20851c0b2f7Stbbdev  */
20951c0b2f7Stbbdev using namespace tbb::flow;
21051c0b2f7Stbbdev 
21151c0b2f7Stbbdev struct add_to_counter {
21251c0b2f7Stbbdev     int* counter;
add_to_counteradd_to_counter21351c0b2f7Stbbdev     add_to_counter(int& var):counter(&var){}
operator ()add_to_counter21451c0b2f7Stbbdev     void operator()(continue_msg){*counter+=1;}
21551c0b2f7Stbbdev };
21651c0b2f7Stbbdev 
test_two_graphs()21751c0b2f7Stbbdev void test_two_graphs(){
21851c0b2f7Stbbdev     int count=0;
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev     //graph g with broadcast_node and continue_node
22151c0b2f7Stbbdev     graph g;
22251c0b2f7Stbbdev     broadcast_node<continue_msg> start_g(g);
22351c0b2f7Stbbdev     continue_node<continue_msg> first_g(g, add_to_counter(count));
22451c0b2f7Stbbdev 
22551c0b2f7Stbbdev     //graph h with broadcast_node
22651c0b2f7Stbbdev     graph h;
22751c0b2f7Stbbdev     broadcast_node<continue_msg> start_h(h);
22851c0b2f7Stbbdev 
22951c0b2f7Stbbdev     //making two edges to first_g from the two graphs
23051c0b2f7Stbbdev     make_edge(start_g,first_g);
23151c0b2f7Stbbdev     make_edge(start_h, first_g);
23251c0b2f7Stbbdev 
23351c0b2f7Stbbdev     //two try_puts from the two graphs
23451c0b2f7Stbbdev     start_g.try_put(continue_msg());
23551c0b2f7Stbbdev     start_h.try_put(continue_msg());
23651c0b2f7Stbbdev     g.wait_for_all();
23751c0b2f7Stbbdev     CHECK_MESSAGE( (count==1), "Not all continue messages received");
23851c0b2f7Stbbdev 
23951c0b2f7Stbbdev     //two try_puts from the graph that doesn't contain the node
24051c0b2f7Stbbdev     count=0;
24151c0b2f7Stbbdev     start_h.try_put(continue_msg());
24251c0b2f7Stbbdev     start_h.try_put(continue_msg());
24351c0b2f7Stbbdev     g.wait_for_all();
24451c0b2f7Stbbdev     CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
24551c0b2f7Stbbdev 
24651c0b2f7Stbbdev     //only one try_put
24751c0b2f7Stbbdev     count=0;
24851c0b2f7Stbbdev     start_g.try_put(continue_msg());
24951c0b2f7Stbbdev     g.wait_for_all();
25051c0b2f7Stbbdev     CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
25151c0b2f7Stbbdev }
25251c0b2f7Stbbdev 
25351c0b2f7Stbbdev struct lightweight_policy_body {
25451c0b2f7Stbbdev     const std::thread::id my_thread_id;
25551c0b2f7Stbbdev     std::atomic<size_t>& my_count;
25651c0b2f7Stbbdev 
lightweight_policy_bodylightweight_policy_body25751c0b2f7Stbbdev     lightweight_policy_body( std::atomic<size_t>& count )
25851c0b2f7Stbbdev         : my_thread_id(std::this_thread::get_id()), my_count(count)
25951c0b2f7Stbbdev     {
26051c0b2f7Stbbdev         my_count = 0;
26151c0b2f7Stbbdev     }
2621168c5cbSvlserov 
2631168c5cbSvlserov     lightweight_policy_body( const lightweight_policy_body& ) = default;
26451c0b2f7Stbbdev     lightweight_policy_body& operator=( const lightweight_policy_body& ) = delete;
2651168c5cbSvlserov 
operator ()lightweight_policy_body26651c0b2f7Stbbdev     void operator()( tbb::flow::continue_msg ) {
26751c0b2f7Stbbdev         ++my_count;
26851c0b2f7Stbbdev         std::thread::id body_thread_id = std::this_thread::get_id();
26951c0b2f7Stbbdev         CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
27051c0b2f7Stbbdev     }
27151c0b2f7Stbbdev };
27251c0b2f7Stbbdev 
test_lightweight_policy()27351c0b2f7Stbbdev void test_lightweight_policy() {
27451c0b2f7Stbbdev     tbb::flow::graph g;
27551c0b2f7Stbbdev     std::atomic<size_t> count1;
27651c0b2f7Stbbdev     std::atomic<size_t> count2;
27751c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
27851c0b2f7Stbbdev         node1(g, lightweight_policy_body(count1));
27951c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
28051c0b2f7Stbbdev         node2(g, lightweight_policy_body(count2));
28151c0b2f7Stbbdev 
28251c0b2f7Stbbdev     tbb::flow::make_edge(node1, node2);
28351c0b2f7Stbbdev     const size_t n = 10;
28451c0b2f7Stbbdev     for(size_t i = 0; i < n; ++i) {
28551c0b2f7Stbbdev         node1.try_put(tbb::flow::continue_msg());
28651c0b2f7Stbbdev     }
28751c0b2f7Stbbdev     g.wait_for_all();
28851c0b2f7Stbbdev 
28951c0b2f7Stbbdev     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
29051c0b2f7Stbbdev     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
29151c0b2f7Stbbdev     CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
29251c0b2f7Stbbdev     CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
29351c0b2f7Stbbdev }
29451c0b2f7Stbbdev 
29551c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
29651c0b2f7Stbbdev #include <array>
29751c0b2f7Stbbdev #include <vector>
test_follows_and_precedes_api()29851c0b2f7Stbbdev void test_follows_and_precedes_api() {
29951c0b2f7Stbbdev     using msg_t = tbb::flow::continue_msg;
30051c0b2f7Stbbdev 
30151c0b2f7Stbbdev     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
30251c0b2f7Stbbdev     std::vector<msg_t> messages_for_precedes  = { msg_t() };
30351c0b2f7Stbbdev 
30451c0b2f7Stbbdev     auto pass_through = [](const msg_t& msg) { return msg; };
30551c0b2f7Stbbdev 
30651c0b2f7Stbbdev     follows_and_precedes_testing::test_follows
30751c0b2f7Stbbdev         <msg_t, tbb::flow::continue_node<msg_t>>
30851c0b2f7Stbbdev         (messages_for_follows, pass_through, node_priority_t(0));
30951c0b2f7Stbbdev 
31051c0b2f7Stbbdev     follows_and_precedes_testing::test_precedes
31151c0b2f7Stbbdev         <msg_t, tbb::flow::continue_node<msg_t>>
31251c0b2f7Stbbdev         (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
31351c0b2f7Stbbdev }
31451c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
31551c0b2f7Stbbdev 
31651c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead
31751c0b2f7Stbbdev template<typename T>
31851c0b2f7Stbbdev struct passing_body {
operator ()passing_body31951c0b2f7Stbbdev     T operator()(const T& val) {
32051c0b2f7Stbbdev         return val;
32151c0b2f7Stbbdev     }
32251c0b2f7Stbbdev };
32351c0b2f7Stbbdev 
32451c0b2f7Stbbdev /*
32551c0b2f7Stbbdev     The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
32651c0b2f7Stbbdev     because there used to be a bug when make_edge(node, continue_node)
32751c0b2f7Stbbdev     did not update continue_node's predecesosor threshold
32851c0b2f7Stbbdev     since the specialization of node's successor_cache for a continue_node was not chosen.
32951c0b2f7Stbbdev */
test_successor_cache_specialization()33051c0b2f7Stbbdev void test_successor_cache_specialization() {
33151c0b2f7Stbbdev     using namespace tbb::flow;
33251c0b2f7Stbbdev 
33351c0b2f7Stbbdev     graph g;
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev     broadcast_node<continue_msg> node_with_default_mutex_type(g);
33651c0b2f7Stbbdev     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
33751c0b2f7Stbbdev 
33851c0b2f7Stbbdev     continue_node<continue_msg> node(g, passing_body<continue_msg>());
33951c0b2f7Stbbdev 
34051c0b2f7Stbbdev     make_edge(node_with_default_mutex_type, node);
34151c0b2f7Stbbdev     make_edge(node_with_non_default_mutex_type, node);
34251c0b2f7Stbbdev 
34351c0b2f7Stbbdev     buffer_node<continue_msg> buf(g);
34451c0b2f7Stbbdev 
34551c0b2f7Stbbdev     make_edge(node, buf);
34651c0b2f7Stbbdev 
34751c0b2f7Stbbdev     node_with_default_mutex_type.try_put(continue_msg());
34851c0b2f7Stbbdev     node_with_non_default_mutex_type.try_put(continue_msg());
34951c0b2f7Stbbdev 
35051c0b2f7Stbbdev     g.wait_for_all();
35151c0b2f7Stbbdev 
35251c0b2f7Stbbdev     continue_msg storage;
35351c0b2f7Stbbdev     CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
35451c0b2f7Stbbdev                   "Wrong number of messages is passed via continue_node");
35551c0b2f7Stbbdev }
35651c0b2f7Stbbdev 
35751c0b2f7Stbbdev //! Test concurrent continue_node for correctness
35851c0b2f7Stbbdev //! \brief \ref error_guessing
35951c0b2f7Stbbdev TEST_CASE("Concurrency testing") {
36051c0b2f7Stbbdev     for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
36151c0b2f7Stbbdev         test_concurrency(p);
36251c0b2f7Stbbdev     }
36351c0b2f7Stbbdev }
36451c0b2f7Stbbdev 
36551c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs
36651c0b2f7Stbbdev //! \brief \ref error_guessing
36751c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); }
36851c0b2f7Stbbdev 
36951c0b2f7Stbbdev //! Test basic behaviour with lightweight body
37051c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
37151c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
37251c0b2f7Stbbdev 
37351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
374*c4a799dfSJhaShweta1 //! Test deprecated follows and precedes API
37551c0b2f7Stbbdev //! \brief \ref error_guessing
37651c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
37751c0b2f7Stbbdev #endif
37851c0b2f7Stbbdev 
37951c0b2f7Stbbdev //! Test for successor cache specialization
38051c0b2f7Stbbdev //! \brief \ref regression
38151c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) {
38251c0b2f7Stbbdev     test_successor_cache_specialization();
38351c0b2f7Stbbdev }
384478de5b1Stbbdev 
385478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
386478de5b1Stbbdev //! \brief \ref error_guessing
387478de5b1Stbbdev TEST_CASE("constraints for continue_node input") {
388478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>);
389478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>);
390478de5b1Stbbdev }
391478de5b1Stbbdev 
392478de5b1Stbbdev template <typename Input, typename Body>
393478de5b1Stbbdev concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body,
394478de5b1Stbbdev                                                 tbb::flow::buffer_node<int>& f, std::size_t num,
395478de5b1Stbbdev                                                 tbb::flow::node_priority_t priority  ) {
396478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, body);
397478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, body, priority);
398478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, num, body);
399478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, num, body, priority);
400478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
401478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), body);
402478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority);
403478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body);
404478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority);
405478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
406478de5b1Stbbdev };
407478de5b1Stbbdev 
408478de5b1Stbbdev //! \brief \ref error_guessing
409478de5b1Stbbdev TEST_CASE("constraints for continue_node body") {
410478de5b1Stbbdev     using output_type = int;
411478de5b1Stbbdev     using namespace test_concepts::continue_node_body;
412478de5b1Stbbdev 
413478de5b1Stbbdev     static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>);
414478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>);
415478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>);
416478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
417478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
418478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
419478de5b1Stbbdev }
420478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
421