xref: /oneTBB/test/tbb/test_continue_node.cpp (revision b15aabb3)
151c0b2f7Stbbdev /*
2*b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include "common/config.h"
1851c0b2f7Stbbdev 
1951c0b2f7Stbbdev // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
2051c0b2f7Stbbdev // parts in all of tests might make testing of the product, which is different from what is actually
2151c0b2f7Stbbdev // released.
2251c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev 
2551c0b2f7Stbbdev #include "common/test.h"
2651c0b2f7Stbbdev #include "common/utils.h"
2751c0b2f7Stbbdev #include "common/graph_utils.h"
2851c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
2951c0b2f7Stbbdev 
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev //! \file test_continue_node.cpp
3251c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification
3351c0b2f7Stbbdev 
3451c0b2f7Stbbdev 
3551c0b2f7Stbbdev #define N 1000
3651c0b2f7Stbbdev #define MAX_NODES 4
3751c0b2f7Stbbdev #define C 8
3851c0b2f7Stbbdev 
3951c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node
4051c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
4151c0b2f7Stbbdev {
4251c0b2f7Stbbdev     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
4351c0b2f7Stbbdev     // Define implementations of virtual methods that are abstract in the base class
4451c0b2f7Stbbdev     bool register_successor( successor_type& ) override { return false; }
4551c0b2f7Stbbdev     bool remove_successor( successor_type& )   override { return false; }
4651c0b2f7Stbbdev };
4751c0b2f7Stbbdev 
4851c0b2f7Stbbdev template< typename InputType >
4951c0b2f7Stbbdev struct parallel_puts {
5051c0b2f7Stbbdev 
5151c0b2f7Stbbdev     tbb::flow::receiver< InputType > * const my_exe_node;
5251c0b2f7Stbbdev 
5351c0b2f7Stbbdev     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
5451c0b2f7Stbbdev     parallel_puts& operator=(const parallel_puts&) = delete;
5551c0b2f7Stbbdev 
5651c0b2f7Stbbdev     void operator()( int ) const  {
5751c0b2f7Stbbdev         for ( int i = 0; i < N; ++i ) {
5851c0b2f7Stbbdev             // the nodes will accept all puts
5951c0b2f7Stbbdev             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
6051c0b2f7Stbbdev         }
6151c0b2f7Stbbdev     }
6251c0b2f7Stbbdev 
6351c0b2f7Stbbdev };
6451c0b2f7Stbbdev 
6551c0b2f7Stbbdev template< typename OutputType >
6651c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
6751c0b2f7Stbbdev     fake_continue_sender fake_sender;
6851c0b2f7Stbbdev     for (size_t i = 0; i < N; ++i) {
6951c0b2f7Stbbdev         tbb::detail::d1::register_predecessor(n, fake_sender);
7051c0b2f7Stbbdev     }
7151c0b2f7Stbbdev 
7251c0b2f7Stbbdev     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
7351c0b2f7Stbbdev         std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
7451c0b2f7Stbbdev         for (size_t i = 0; i < num_receivers; ++i) {
7551c0b2f7Stbbdev             receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
7651c0b2f7Stbbdev         }
7751c0b2f7Stbbdev         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
7851c0b2f7Stbbdev 
7951c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
8051c0b2f7Stbbdev             tbb::flow::make_edge( n, *receivers[r] );
8151c0b2f7Stbbdev         }
8251c0b2f7Stbbdev 
8351c0b2f7Stbbdev         utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
8451c0b2f7Stbbdev         g.wait_for_all();
8551c0b2f7Stbbdev 
8651c0b2f7Stbbdev         // 2) the nodes will receive puts from multiple predecessors simultaneously,
8751c0b2f7Stbbdev         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
8851c0b2f7Stbbdev         CHECK_MESSAGE( (int)ec == p, "" );
8951c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
9051c0b2f7Stbbdev             size_t c = receivers[r]->my_count;
9151c0b2f7Stbbdev             // 3) the nodes will send to multiple successors.
9251c0b2f7Stbbdev             CHECK_MESSAGE( (int)c == p, "" );
9351c0b2f7Stbbdev         }
9451c0b2f7Stbbdev 
9551c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
9651c0b2f7Stbbdev             tbb::flow::remove_edge( n, *receivers[r] );
9751c0b2f7Stbbdev         }
9851c0b2f7Stbbdev     }
9951c0b2f7Stbbdev }
10051c0b2f7Stbbdev 
10151c0b2f7Stbbdev template< typename OutputType, typename Body >
10251c0b2f7Stbbdev void continue_nodes( Body body ) {
10351c0b2f7Stbbdev     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
10451c0b2f7Stbbdev         tbb::flow::graph g;
10551c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node( g, body );
10651c0b2f7Stbbdev         run_continue_nodes( p, g, exe_node);
10751c0b2f7Stbbdev         exe_node.try_put(tbb::flow::continue_msg());
10851c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
10951c0b2f7Stbbdev         run_continue_nodes( p, g, exe_node_copy);
11051c0b2f7Stbbdev     }
11151c0b2f7Stbbdev }
11251c0b2f7Stbbdev 
11351c0b2f7Stbbdev const size_t Offset = 123;
11451c0b2f7Stbbdev std::atomic<size_t> global_execute_count;
11551c0b2f7Stbbdev 
11651c0b2f7Stbbdev template< typename OutputType >
11751c0b2f7Stbbdev struct inc_functor {
11851c0b2f7Stbbdev 
11951c0b2f7Stbbdev     std::atomic<size_t> local_execute_count;
12051c0b2f7Stbbdev     inc_functor( ) { local_execute_count = 0; }
12151c0b2f7Stbbdev     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
12251c0b2f7Stbbdev     void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
12351c0b2f7Stbbdev 
12451c0b2f7Stbbdev     OutputType operator()( tbb::flow::continue_msg ) {
12551c0b2f7Stbbdev        ++global_execute_count;
12651c0b2f7Stbbdev        ++local_execute_count;
12751c0b2f7Stbbdev        return OutputType();
12851c0b2f7Stbbdev     }
12951c0b2f7Stbbdev 
13051c0b2f7Stbbdev };
13151c0b2f7Stbbdev 
13251c0b2f7Stbbdev template< typename OutputType >
13351c0b2f7Stbbdev void continue_nodes_with_copy( ) {
13451c0b2f7Stbbdev 
13551c0b2f7Stbbdev     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
13651c0b2f7Stbbdev         tbb::flow::graph g;
13751c0b2f7Stbbdev         inc_functor<OutputType> cf;
13851c0b2f7Stbbdev         cf.local_execute_count = Offset;
13951c0b2f7Stbbdev         global_execute_count = Offset;
14051c0b2f7Stbbdev 
14151c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node( g, cf );
14251c0b2f7Stbbdev         fake_continue_sender fake_sender;
14351c0b2f7Stbbdev         for (size_t i = 0; i < N; ++i) {
14451c0b2f7Stbbdev             tbb::detail::d1::register_predecessor(exe_node, fake_sender);
14551c0b2f7Stbbdev         }
14651c0b2f7Stbbdev 
14751c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
14851c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
14951c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; ++i) {
15051c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
15151c0b2f7Stbbdev             }
15251c0b2f7Stbbdev 
15351c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
15451c0b2f7Stbbdev                 tbb::flow::make_edge( exe_node, *receivers[r] );
15551c0b2f7Stbbdev             }
15651c0b2f7Stbbdev 
15751c0b2f7Stbbdev             utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
15851c0b2f7Stbbdev             g.wait_for_all();
15951c0b2f7Stbbdev 
16051c0b2f7Stbbdev             // 2) the nodes will receive puts from multiple predecessors simultaneously,
16151c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
16251c0b2f7Stbbdev                 size_t c = receivers[r]->my_count;
16351c0b2f7Stbbdev                 // 3) the nodes will send to multiple successors.
16451c0b2f7Stbbdev                 CHECK_MESSAGE( (int)c == p, "" );
16551c0b2f7Stbbdev             }
16651c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
16751c0b2f7Stbbdev                 tbb::flow::remove_edge( exe_node, *receivers[r] );
16851c0b2f7Stbbdev             }
16951c0b2f7Stbbdev         }
17051c0b2f7Stbbdev 
17151c0b2f7Stbbdev         // validate that the local body matches the global execute_count and both are correct
17251c0b2f7Stbbdev         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17351c0b2f7Stbbdev         const size_t expected_count = p*MAX_NODES + Offset;
17451c0b2f7Stbbdev         size_t global_count = global_execute_count;
17551c0b2f7Stbbdev         size_t inc_count = body_copy.local_execute_count;
17651c0b2f7Stbbdev         CHECK_MESSAGE( global_count == expected_count, "" );
17751c0b2f7Stbbdev         CHECK_MESSAGE( global_count == inc_count, "" );
17851c0b2f7Stbbdev         g.reset(tbb::flow::rf_reset_bodies);
17951c0b2f7Stbbdev         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
18051c0b2f7Stbbdev         inc_count = body_copy.local_execute_count;
18151c0b2f7Stbbdev         CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
18251c0b2f7Stbbdev 
18351c0b2f7Stbbdev     }
18451c0b2f7Stbbdev }
18551c0b2f7Stbbdev 
18651c0b2f7Stbbdev template< typename OutputType >
18751c0b2f7Stbbdev void run_continue_nodes() {
18851c0b2f7Stbbdev     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
18951c0b2f7Stbbdev     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
19051c0b2f7Stbbdev     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
19151c0b2f7Stbbdev     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
19251c0b2f7Stbbdev     continue_nodes_with_copy<OutputType>();
19351c0b2f7Stbbdev }
19451c0b2f7Stbbdev 
19551c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
19651c0b2f7Stbbdev void test_concurrency(int num_threads) {
19751c0b2f7Stbbdev     tbb::task_arena arena(num_threads);
19851c0b2f7Stbbdev     arena.execute(
19951c0b2f7Stbbdev         [&] {
20051c0b2f7Stbbdev             run_continue_nodes<tbb::flow::continue_msg>();
20151c0b2f7Stbbdev             run_continue_nodes<int>();
20251c0b2f7Stbbdev             run_continue_nodes<utils::NoAssign>();
20351c0b2f7Stbbdev         }
20451c0b2f7Stbbdev     );
20551c0b2f7Stbbdev }
20651c0b2f7Stbbdev /*
20751c0b2f7Stbbdev  * Connection of two graphs is not currently supported, but works to some limited extent.
20851c0b2f7Stbbdev  * This test is included to check for backward compatibility. It checks that a continue_node
20951c0b2f7Stbbdev  * with predecessors in two different graphs receives the required
21051c0b2f7Stbbdev  * number of continue messages before it executes.
21151c0b2f7Stbbdev  */
21251c0b2f7Stbbdev using namespace tbb::flow;
21351c0b2f7Stbbdev 
21451c0b2f7Stbbdev struct add_to_counter {
21551c0b2f7Stbbdev     int* counter;
21651c0b2f7Stbbdev     add_to_counter(int& var):counter(&var){}
21751c0b2f7Stbbdev     void operator()(continue_msg){*counter+=1;}
21851c0b2f7Stbbdev };
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev void test_two_graphs(){
22151c0b2f7Stbbdev     int count=0;
22251c0b2f7Stbbdev 
22351c0b2f7Stbbdev     //graph g with broadcast_node and continue_node
22451c0b2f7Stbbdev     graph g;
22551c0b2f7Stbbdev     broadcast_node<continue_msg> start_g(g);
22651c0b2f7Stbbdev     continue_node<continue_msg> first_g(g, add_to_counter(count));
22751c0b2f7Stbbdev 
22851c0b2f7Stbbdev     //graph h with broadcast_node
22951c0b2f7Stbbdev     graph h;
23051c0b2f7Stbbdev     broadcast_node<continue_msg> start_h(h);
23151c0b2f7Stbbdev 
23251c0b2f7Stbbdev     //making two edges to first_g from the two graphs
23351c0b2f7Stbbdev     make_edge(start_g,first_g);
23451c0b2f7Stbbdev     make_edge(start_h, first_g);
23551c0b2f7Stbbdev 
23651c0b2f7Stbbdev     //two try_puts from the two graphs
23751c0b2f7Stbbdev     start_g.try_put(continue_msg());
23851c0b2f7Stbbdev     start_h.try_put(continue_msg());
23951c0b2f7Stbbdev     g.wait_for_all();
24051c0b2f7Stbbdev     CHECK_MESSAGE( (count==1), "Not all continue messages received");
24151c0b2f7Stbbdev 
24251c0b2f7Stbbdev     //two try_puts from the graph that doesn't contain the node
24351c0b2f7Stbbdev     count=0;
24451c0b2f7Stbbdev     start_h.try_put(continue_msg());
24551c0b2f7Stbbdev     start_h.try_put(continue_msg());
24651c0b2f7Stbbdev     g.wait_for_all();
24751c0b2f7Stbbdev     CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
24851c0b2f7Stbbdev 
24951c0b2f7Stbbdev     //only one try_put
25051c0b2f7Stbbdev     count=0;
25151c0b2f7Stbbdev     start_g.try_put(continue_msg());
25251c0b2f7Stbbdev     g.wait_for_all();
25351c0b2f7Stbbdev     CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
25451c0b2f7Stbbdev }
25551c0b2f7Stbbdev 
25651c0b2f7Stbbdev struct lightweight_policy_body {
25751c0b2f7Stbbdev     const std::thread::id my_thread_id;
25851c0b2f7Stbbdev     std::atomic<size_t>& my_count;
25951c0b2f7Stbbdev 
26051c0b2f7Stbbdev     lightweight_policy_body( std::atomic<size_t>& count )
26151c0b2f7Stbbdev         : my_thread_id(std::this_thread::get_id()), my_count(count)
26251c0b2f7Stbbdev     {
26351c0b2f7Stbbdev         my_count = 0;
26451c0b2f7Stbbdev     }
26551c0b2f7Stbbdev     lightweight_policy_body& operator=(const lightweight_policy_body&) = delete;
26651c0b2f7Stbbdev     void operator()(tbb::flow::continue_msg) {
26751c0b2f7Stbbdev         ++my_count;
26851c0b2f7Stbbdev         std::thread::id body_thread_id = std::this_thread::get_id();
26951c0b2f7Stbbdev         CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
27051c0b2f7Stbbdev     }
27151c0b2f7Stbbdev };
27251c0b2f7Stbbdev 
27351c0b2f7Stbbdev void test_lightweight_policy() {
27451c0b2f7Stbbdev     tbb::flow::graph g;
27551c0b2f7Stbbdev     std::atomic<size_t> count1;
27651c0b2f7Stbbdev     std::atomic<size_t> count2;
27751c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
27851c0b2f7Stbbdev         node1(g, lightweight_policy_body(count1));
27951c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
28051c0b2f7Stbbdev         node2(g, lightweight_policy_body(count2));
28151c0b2f7Stbbdev 
28251c0b2f7Stbbdev     tbb::flow::make_edge(node1, node2);
28351c0b2f7Stbbdev     const size_t n = 10;
28451c0b2f7Stbbdev     for(size_t i = 0; i < n; ++i) {
28551c0b2f7Stbbdev         node1.try_put(tbb::flow::continue_msg());
28651c0b2f7Stbbdev     }
28751c0b2f7Stbbdev     g.wait_for_all();
28851c0b2f7Stbbdev 
28951c0b2f7Stbbdev     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
29051c0b2f7Stbbdev     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
29151c0b2f7Stbbdev     CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
29251c0b2f7Stbbdev     CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
29351c0b2f7Stbbdev }
29451c0b2f7Stbbdev 
29551c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
29651c0b2f7Stbbdev #include <array>
29751c0b2f7Stbbdev #include <vector>
29851c0b2f7Stbbdev void test_follows_and_precedes_api() {
29951c0b2f7Stbbdev     using msg_t = tbb::flow::continue_msg;
30051c0b2f7Stbbdev 
30151c0b2f7Stbbdev     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
30251c0b2f7Stbbdev     std::vector<msg_t> messages_for_precedes  = { msg_t() };
30351c0b2f7Stbbdev 
30451c0b2f7Stbbdev     auto pass_through = [](const msg_t& msg) { return msg; };
30551c0b2f7Stbbdev 
30651c0b2f7Stbbdev     follows_and_precedes_testing::test_follows
30751c0b2f7Stbbdev         <msg_t, tbb::flow::continue_node<msg_t>>
30851c0b2f7Stbbdev         (messages_for_follows, pass_through, node_priority_t(0));
30951c0b2f7Stbbdev 
31051c0b2f7Stbbdev     follows_and_precedes_testing::test_precedes
31151c0b2f7Stbbdev         <msg_t, tbb::flow::continue_node<msg_t>>
31251c0b2f7Stbbdev         (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
31351c0b2f7Stbbdev }
31451c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
31551c0b2f7Stbbdev 
31651c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
31751c0b2f7Stbbdev 
31851c0b2f7Stbbdev template <typename ExpectedType, typename Body>
31951c0b2f7Stbbdev void test_deduction_guides_common(Body body) {
32051c0b2f7Stbbdev     using namespace tbb::flow;
32151c0b2f7Stbbdev     graph g;
32251c0b2f7Stbbdev 
32351c0b2f7Stbbdev     continue_node c1(g, body);
32451c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);
32551c0b2f7Stbbdev 
32651c0b2f7Stbbdev     continue_node c2(g, body, lightweight());
32751c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);
32851c0b2f7Stbbdev 
32951c0b2f7Stbbdev     continue_node c3(g, 5, body);
33051c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);
33151c0b2f7Stbbdev 
33251c0b2f7Stbbdev     continue_node c4(g, 5, body, lightweight());
33351c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev     continue_node c5(g, body, node_priority_t(5));
33651c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);
33751c0b2f7Stbbdev 
33851c0b2f7Stbbdev     continue_node c6(g, body, lightweight(), node_priority_t(5));
33951c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);
34051c0b2f7Stbbdev 
34151c0b2f7Stbbdev     continue_node c7(g, 5, body, node_priority_t(5));
34251c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);
34351c0b2f7Stbbdev 
34451c0b2f7Stbbdev     continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
34551c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);
34651c0b2f7Stbbdev 
34751c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
34851c0b2f7Stbbdev     broadcast_node<continue_msg> b(g);
34951c0b2f7Stbbdev 
35051c0b2f7Stbbdev     continue_node c9(follows(b), body);
35151c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);
35251c0b2f7Stbbdev 
35351c0b2f7Stbbdev     continue_node c10(follows(b), body, lightweight());
35451c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);
35551c0b2f7Stbbdev 
35651c0b2f7Stbbdev     continue_node c11(follows(b), 5, body);
35751c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);
35851c0b2f7Stbbdev 
35951c0b2f7Stbbdev     continue_node c12(follows(b), 5, body, lightweight());
36051c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);
36151c0b2f7Stbbdev 
36251c0b2f7Stbbdev     continue_node c13(follows(b), body, node_priority_t(5));
36351c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);
36451c0b2f7Stbbdev 
36551c0b2f7Stbbdev     continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
36651c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);
36751c0b2f7Stbbdev 
36851c0b2f7Stbbdev     continue_node c15(follows(b), 5, body, node_priority_t(5));
36951c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);
37051c0b2f7Stbbdev 
37151c0b2f7Stbbdev     continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
37251c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
37351c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
37451c0b2f7Stbbdev 
37551c0b2f7Stbbdev     continue_node c17(c1);
37651c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
37751c0b2f7Stbbdev }
37851c0b2f7Stbbdev 
37951c0b2f7Stbbdev int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
38051c0b2f7Stbbdev void continue_void_body_f(const tbb::flow::continue_msg&) {}
38151c0b2f7Stbbdev 
38251c0b2f7Stbbdev void test_deduction_guides() {
38351c0b2f7Stbbdev     using tbb::flow::continue_msg;
38451c0b2f7Stbbdev     test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
38551c0b2f7Stbbdev     test_deduction_guides_common<continue_msg>([](const continue_msg&) {});
38651c0b2f7Stbbdev 
38751c0b2f7Stbbdev     test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
38851c0b2f7Stbbdev     test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});
38951c0b2f7Stbbdev 
39051c0b2f7Stbbdev     test_deduction_guides_common<int>(continue_body_f);
39151c0b2f7Stbbdev     test_deduction_guides_common<continue_msg>(continue_void_body_f);
39251c0b2f7Stbbdev }
39351c0b2f7Stbbdev 
39451c0b2f7Stbbdev #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
39551c0b2f7Stbbdev 
39651c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead
39751c0b2f7Stbbdev template<typename T>
39851c0b2f7Stbbdev struct passing_body {
39951c0b2f7Stbbdev     T operator()(const T& val) {
40051c0b2f7Stbbdev         return val;
40151c0b2f7Stbbdev     }
40251c0b2f7Stbbdev };
40351c0b2f7Stbbdev 
40451c0b2f7Stbbdev /*
40551c0b2f7Stbbdev     The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
40651c0b2f7Stbbdev     because there used to be a bug when make_edge(node, continue_node)
40751c0b2f7Stbbdev     did not update continue_node's predecesosor threshold
40851c0b2f7Stbbdev     since the specialization of node's successor_cache for a continue_node was not chosen.
40951c0b2f7Stbbdev */
41051c0b2f7Stbbdev void test_successor_cache_specialization() {
41151c0b2f7Stbbdev     using namespace tbb::flow;
41251c0b2f7Stbbdev 
41351c0b2f7Stbbdev     graph g;
41451c0b2f7Stbbdev 
41551c0b2f7Stbbdev     broadcast_node<continue_msg> node_with_default_mutex_type(g);
41651c0b2f7Stbbdev     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
41751c0b2f7Stbbdev 
41851c0b2f7Stbbdev     continue_node<continue_msg> node(g, passing_body<continue_msg>());
41951c0b2f7Stbbdev 
42051c0b2f7Stbbdev     make_edge(node_with_default_mutex_type, node);
42151c0b2f7Stbbdev     make_edge(node_with_non_default_mutex_type, node);
42251c0b2f7Stbbdev 
42351c0b2f7Stbbdev     buffer_node<continue_msg> buf(g);
42451c0b2f7Stbbdev 
42551c0b2f7Stbbdev     make_edge(node, buf);
42651c0b2f7Stbbdev 
42751c0b2f7Stbbdev     node_with_default_mutex_type.try_put(continue_msg());
42851c0b2f7Stbbdev     node_with_non_default_mutex_type.try_put(continue_msg());
42951c0b2f7Stbbdev 
43051c0b2f7Stbbdev     g.wait_for_all();
43151c0b2f7Stbbdev 
43251c0b2f7Stbbdev     continue_msg storage;
43351c0b2f7Stbbdev     CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
43451c0b2f7Stbbdev                   "Wrong number of messages is passed via continue_node");
43551c0b2f7Stbbdev }
43651c0b2f7Stbbdev 
43751c0b2f7Stbbdev //! Test concurrent continue_node for correctness
43851c0b2f7Stbbdev //! \brief \ref error_guessing
43951c0b2f7Stbbdev TEST_CASE("Concurrency testing") {
44051c0b2f7Stbbdev     for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
44151c0b2f7Stbbdev         test_concurrency(p);
44251c0b2f7Stbbdev     }
44351c0b2f7Stbbdev }
44451c0b2f7Stbbdev 
44551c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs
44651c0b2f7Stbbdev //! \brief \ref error_guessing
44751c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); }
44851c0b2f7Stbbdev 
44951c0b2f7Stbbdev //! Test basic behaviour with lightweight body
45051c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
45151c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
45251c0b2f7Stbbdev 
45351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
45451c0b2f7Stbbdev //! Test deprecated follows and preceedes API
45551c0b2f7Stbbdev //! \brief \ref error_guessing
45651c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
45751c0b2f7Stbbdev #endif
45851c0b2f7Stbbdev 
45951c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
46051c0b2f7Stbbdev //! Test deduction guides
46151c0b2f7Stbbdev //! \brief requirement
46251c0b2f7Stbbdev TEST_CASE( "Deduction guides" ) { test_deduction_guides(); }
46351c0b2f7Stbbdev #endif
46451c0b2f7Stbbdev 
46551c0b2f7Stbbdev //! Test for successor cache specialization
46651c0b2f7Stbbdev //! \brief \ref regression
46751c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) {
46851c0b2f7Stbbdev     test_successor_cache_specialization();
46951c0b2f7Stbbdev }
470