xref: /oneTBB/test/tbb/test_continue_node.cpp (revision 478de5b1)
151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
1751c0b2f7Stbbdev #include "common/config.h"
1851c0b2f7Stbbdev 
1951c0b2f7Stbbdev #include "tbb/flow_graph.h"
2051c0b2f7Stbbdev 
2151c0b2f7Stbbdev #include "common/test.h"
2251c0b2f7Stbbdev #include "common/utils.h"
2351c0b2f7Stbbdev #include "common/graph_utils.h"
2451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
25*478de5b1Stbbdev #include "common/concepts_common.h"
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev 
2851c0b2f7Stbbdev //! \file test_continue_node.cpp
2951c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev 
3251c0b2f7Stbbdev #define N 1000
3351c0b2f7Stbbdev #define MAX_NODES 4
3451c0b2f7Stbbdev #define C 8
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node
3751c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
3851c0b2f7Stbbdev {
3951c0b2f7Stbbdev     typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
4051c0b2f7Stbbdev     // Define implementations of virtual methods that are abstract in the base class
4151c0b2f7Stbbdev     bool register_successor( successor_type& ) override { return false; }
4251c0b2f7Stbbdev     bool remove_successor( successor_type& )   override { return false; }
4351c0b2f7Stbbdev };
4451c0b2f7Stbbdev 
4551c0b2f7Stbbdev template< typename InputType >
4651c0b2f7Stbbdev struct parallel_puts {
4751c0b2f7Stbbdev 
4851c0b2f7Stbbdev     tbb::flow::receiver< InputType > * const my_exe_node;
4951c0b2f7Stbbdev 
5051c0b2f7Stbbdev     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
5151c0b2f7Stbbdev     parallel_puts& operator=(const parallel_puts&) = delete;
5251c0b2f7Stbbdev 
5351c0b2f7Stbbdev     void operator()( int ) const  {
5451c0b2f7Stbbdev         for ( int i = 0; i < N; ++i ) {
5551c0b2f7Stbbdev             // the nodes will accept all puts
5651c0b2f7Stbbdev             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
5751c0b2f7Stbbdev         }
5851c0b2f7Stbbdev     }
5951c0b2f7Stbbdev 
6051c0b2f7Stbbdev };
6151c0b2f7Stbbdev 
6251c0b2f7Stbbdev template< typename OutputType >
6351c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
6451c0b2f7Stbbdev     fake_continue_sender fake_sender;
6551c0b2f7Stbbdev     for (size_t i = 0; i < N; ++i) {
6651c0b2f7Stbbdev         tbb::detail::d1::register_predecessor(n, fake_sender);
6751c0b2f7Stbbdev     }
6851c0b2f7Stbbdev 
6951c0b2f7Stbbdev     for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
7051c0b2f7Stbbdev         std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
7151c0b2f7Stbbdev         for (size_t i = 0; i < num_receivers; ++i) {
7251c0b2f7Stbbdev             receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
7351c0b2f7Stbbdev         }
7451c0b2f7Stbbdev         harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
7551c0b2f7Stbbdev 
7651c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
7751c0b2f7Stbbdev             tbb::flow::make_edge( n, *receivers[r] );
7851c0b2f7Stbbdev         }
7951c0b2f7Stbbdev 
8051c0b2f7Stbbdev         utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
8151c0b2f7Stbbdev         g.wait_for_all();
8251c0b2f7Stbbdev 
8351c0b2f7Stbbdev         // 2) the nodes will receive puts from multiple predecessors simultaneously,
8451c0b2f7Stbbdev         size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
8551c0b2f7Stbbdev         CHECK_MESSAGE( (int)ec == p, "" );
8651c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
8751c0b2f7Stbbdev             size_t c = receivers[r]->my_count;
8851c0b2f7Stbbdev             // 3) the nodes will send to multiple successors.
8951c0b2f7Stbbdev             CHECK_MESSAGE( (int)c == p, "" );
9051c0b2f7Stbbdev         }
9151c0b2f7Stbbdev 
9251c0b2f7Stbbdev         for (size_t r = 0; r < num_receivers; ++r ) {
9351c0b2f7Stbbdev             tbb::flow::remove_edge( n, *receivers[r] );
9451c0b2f7Stbbdev         }
9551c0b2f7Stbbdev     }
9651c0b2f7Stbbdev }
9751c0b2f7Stbbdev 
9851c0b2f7Stbbdev template< typename OutputType, typename Body >
9951c0b2f7Stbbdev void continue_nodes( Body body ) {
10051c0b2f7Stbbdev     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
10151c0b2f7Stbbdev         tbb::flow::graph g;
10251c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node( g, body );
10351c0b2f7Stbbdev         run_continue_nodes( p, g, exe_node);
10451c0b2f7Stbbdev         exe_node.try_put(tbb::flow::continue_msg());
10551c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
10651c0b2f7Stbbdev         run_continue_nodes( p, g, exe_node_copy);
10751c0b2f7Stbbdev     }
10851c0b2f7Stbbdev }
10951c0b2f7Stbbdev 
11051c0b2f7Stbbdev const size_t Offset = 123;
11151c0b2f7Stbbdev std::atomic<size_t> global_execute_count;
11251c0b2f7Stbbdev 
11351c0b2f7Stbbdev template< typename OutputType >
11451c0b2f7Stbbdev struct inc_functor {
11551c0b2f7Stbbdev 
11651c0b2f7Stbbdev     std::atomic<size_t> local_execute_count;
11751c0b2f7Stbbdev     inc_functor( ) { local_execute_count = 0; }
11851c0b2f7Stbbdev     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
11951c0b2f7Stbbdev     void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
12051c0b2f7Stbbdev 
12151c0b2f7Stbbdev     OutputType operator()( tbb::flow::continue_msg ) {
12251c0b2f7Stbbdev        ++global_execute_count;
12351c0b2f7Stbbdev        ++local_execute_count;
12451c0b2f7Stbbdev        return OutputType();
12551c0b2f7Stbbdev     }
12651c0b2f7Stbbdev 
12751c0b2f7Stbbdev };
12851c0b2f7Stbbdev 
12951c0b2f7Stbbdev template< typename OutputType >
13051c0b2f7Stbbdev void continue_nodes_with_copy( ) {
13151c0b2f7Stbbdev 
13251c0b2f7Stbbdev     for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
13351c0b2f7Stbbdev         tbb::flow::graph g;
13451c0b2f7Stbbdev         inc_functor<OutputType> cf;
13551c0b2f7Stbbdev         cf.local_execute_count = Offset;
13651c0b2f7Stbbdev         global_execute_count = Offset;
13751c0b2f7Stbbdev 
13851c0b2f7Stbbdev         tbb::flow::continue_node< OutputType > exe_node( g, cf );
13951c0b2f7Stbbdev         fake_continue_sender fake_sender;
14051c0b2f7Stbbdev         for (size_t i = 0; i < N; ++i) {
14151c0b2f7Stbbdev             tbb::detail::d1::register_predecessor(exe_node, fake_sender);
14251c0b2f7Stbbdev         }
14351c0b2f7Stbbdev 
14451c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
14551c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
14651c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; ++i) {
14751c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
14851c0b2f7Stbbdev             }
14951c0b2f7Stbbdev 
15051c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
15151c0b2f7Stbbdev                 tbb::flow::make_edge( exe_node, *receivers[r] );
15251c0b2f7Stbbdev             }
15351c0b2f7Stbbdev 
15451c0b2f7Stbbdev             utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
15551c0b2f7Stbbdev             g.wait_for_all();
15651c0b2f7Stbbdev 
15751c0b2f7Stbbdev             // 2) the nodes will receive puts from multiple predecessors simultaneously,
15851c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
15951c0b2f7Stbbdev                 size_t c = receivers[r]->my_count;
16051c0b2f7Stbbdev                 // 3) the nodes will send to multiple successors.
16151c0b2f7Stbbdev                 CHECK_MESSAGE( (int)c == p, "" );
16251c0b2f7Stbbdev             }
16351c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
16451c0b2f7Stbbdev                 tbb::flow::remove_edge( exe_node, *receivers[r] );
16551c0b2f7Stbbdev             }
16651c0b2f7Stbbdev         }
16751c0b2f7Stbbdev 
16851c0b2f7Stbbdev         // validate that the local body matches the global execute_count and both are correct
16951c0b2f7Stbbdev         inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17051c0b2f7Stbbdev         const size_t expected_count = p*MAX_NODES + Offset;
17151c0b2f7Stbbdev         size_t global_count = global_execute_count;
17251c0b2f7Stbbdev         size_t inc_count = body_copy.local_execute_count;
17351c0b2f7Stbbdev         CHECK_MESSAGE( global_count == expected_count, "" );
17451c0b2f7Stbbdev         CHECK_MESSAGE( global_count == inc_count, "" );
17551c0b2f7Stbbdev         g.reset(tbb::flow::rf_reset_bodies);
17651c0b2f7Stbbdev         body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17751c0b2f7Stbbdev         inc_count = body_copy.local_execute_count;
17851c0b2f7Stbbdev         CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
17951c0b2f7Stbbdev 
18051c0b2f7Stbbdev     }
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev 
18351c0b2f7Stbbdev template< typename OutputType >
18451c0b2f7Stbbdev void run_continue_nodes() {
18551c0b2f7Stbbdev     harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
18651c0b2f7Stbbdev     continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
18751c0b2f7Stbbdev     continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
18851c0b2f7Stbbdev     continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
18951c0b2f7Stbbdev     continue_nodes_with_copy<OutputType>();
19051c0b2f7Stbbdev }
19151c0b2f7Stbbdev 
19251c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
19351c0b2f7Stbbdev void test_concurrency(int num_threads) {
19451c0b2f7Stbbdev     tbb::task_arena arena(num_threads);
19551c0b2f7Stbbdev     arena.execute(
19651c0b2f7Stbbdev         [&] {
19751c0b2f7Stbbdev             run_continue_nodes<tbb::flow::continue_msg>();
19851c0b2f7Stbbdev             run_continue_nodes<int>();
19951c0b2f7Stbbdev             run_continue_nodes<utils::NoAssign>();
20051c0b2f7Stbbdev         }
20151c0b2f7Stbbdev     );
20251c0b2f7Stbbdev }
20351c0b2f7Stbbdev /*
20451c0b2f7Stbbdev  * Connection of two graphs is not currently supported, but works to some limited extent.
20551c0b2f7Stbbdev  * This test is included to check for backward compatibility. It checks that a continue_node
20651c0b2f7Stbbdev  * with predecessors in two different graphs receives the required
20751c0b2f7Stbbdev  * number of continue messages before it executes.
20851c0b2f7Stbbdev  */
20951c0b2f7Stbbdev using namespace tbb::flow;
21051c0b2f7Stbbdev 
21151c0b2f7Stbbdev struct add_to_counter {
21251c0b2f7Stbbdev     int* counter;
21351c0b2f7Stbbdev     add_to_counter(int& var):counter(&var){}
21451c0b2f7Stbbdev     void operator()(continue_msg){*counter+=1;}
21551c0b2f7Stbbdev };
21651c0b2f7Stbbdev 
21751c0b2f7Stbbdev void test_two_graphs(){
21851c0b2f7Stbbdev     int count=0;
21951c0b2f7Stbbdev 
22051c0b2f7Stbbdev     //graph g with broadcast_node and continue_node
22151c0b2f7Stbbdev     graph g;
22251c0b2f7Stbbdev     broadcast_node<continue_msg> start_g(g);
22351c0b2f7Stbbdev     continue_node<continue_msg> first_g(g, add_to_counter(count));
22451c0b2f7Stbbdev 
22551c0b2f7Stbbdev     //graph h with broadcast_node
22651c0b2f7Stbbdev     graph h;
22751c0b2f7Stbbdev     broadcast_node<continue_msg> start_h(h);
22851c0b2f7Stbbdev 
22951c0b2f7Stbbdev     //making two edges to first_g from the two graphs
23051c0b2f7Stbbdev     make_edge(start_g,first_g);
23151c0b2f7Stbbdev     make_edge(start_h, first_g);
23251c0b2f7Stbbdev 
23351c0b2f7Stbbdev     //two try_puts from the two graphs
23451c0b2f7Stbbdev     start_g.try_put(continue_msg());
23551c0b2f7Stbbdev     start_h.try_put(continue_msg());
23651c0b2f7Stbbdev     g.wait_for_all();
23751c0b2f7Stbbdev     CHECK_MESSAGE( (count==1), "Not all continue messages received");
23851c0b2f7Stbbdev 
23951c0b2f7Stbbdev     //two try_puts from the graph that doesn't contain the node
24051c0b2f7Stbbdev     count=0;
24151c0b2f7Stbbdev     start_h.try_put(continue_msg());
24251c0b2f7Stbbdev     start_h.try_put(continue_msg());
24351c0b2f7Stbbdev     g.wait_for_all();
24451c0b2f7Stbbdev     CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
24551c0b2f7Stbbdev 
24651c0b2f7Stbbdev     //only one try_put
24751c0b2f7Stbbdev     count=0;
24851c0b2f7Stbbdev     start_g.try_put(continue_msg());
24951c0b2f7Stbbdev     g.wait_for_all();
25051c0b2f7Stbbdev     CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
25151c0b2f7Stbbdev }
25251c0b2f7Stbbdev 
25351c0b2f7Stbbdev struct lightweight_policy_body {
25451c0b2f7Stbbdev     const std::thread::id my_thread_id;
25551c0b2f7Stbbdev     std::atomic<size_t>& my_count;
25651c0b2f7Stbbdev 
25751c0b2f7Stbbdev     lightweight_policy_body( std::atomic<size_t>& count )
25851c0b2f7Stbbdev         : my_thread_id(std::this_thread::get_id()), my_count(count)
25951c0b2f7Stbbdev     {
26051c0b2f7Stbbdev         my_count = 0;
26151c0b2f7Stbbdev     }
26251c0b2f7Stbbdev     lightweight_policy_body& operator=(const lightweight_policy_body&) = delete;
26351c0b2f7Stbbdev     void operator()(tbb::flow::continue_msg) {
26451c0b2f7Stbbdev         ++my_count;
26551c0b2f7Stbbdev         std::thread::id body_thread_id = std::this_thread::get_id();
26651c0b2f7Stbbdev         CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
26751c0b2f7Stbbdev     }
26851c0b2f7Stbbdev };
26951c0b2f7Stbbdev 
27051c0b2f7Stbbdev void test_lightweight_policy() {
27151c0b2f7Stbbdev     tbb::flow::graph g;
27251c0b2f7Stbbdev     std::atomic<size_t> count1;
27351c0b2f7Stbbdev     std::atomic<size_t> count2;
27451c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
27551c0b2f7Stbbdev         node1(g, lightweight_policy_body(count1));
27651c0b2f7Stbbdev     tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
27751c0b2f7Stbbdev         node2(g, lightweight_policy_body(count2));
27851c0b2f7Stbbdev 
27951c0b2f7Stbbdev     tbb::flow::make_edge(node1, node2);
28051c0b2f7Stbbdev     const size_t n = 10;
28151c0b2f7Stbbdev     for(size_t i = 0; i < n; ++i) {
28251c0b2f7Stbbdev         node1.try_put(tbb::flow::continue_msg());
28351c0b2f7Stbbdev     }
28451c0b2f7Stbbdev     g.wait_for_all();
28551c0b2f7Stbbdev 
28651c0b2f7Stbbdev     lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
28751c0b2f7Stbbdev     lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
28851c0b2f7Stbbdev     CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
28951c0b2f7Stbbdev     CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
29051c0b2f7Stbbdev }
29151c0b2f7Stbbdev 
29251c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
29351c0b2f7Stbbdev #include <array>
29451c0b2f7Stbbdev #include <vector>
29551c0b2f7Stbbdev void test_follows_and_precedes_api() {
29651c0b2f7Stbbdev     using msg_t = tbb::flow::continue_msg;
29751c0b2f7Stbbdev 
29851c0b2f7Stbbdev     std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
29951c0b2f7Stbbdev     std::vector<msg_t> messages_for_precedes  = { msg_t() };
30051c0b2f7Stbbdev 
30151c0b2f7Stbbdev     auto pass_through = [](const msg_t& msg) { return msg; };
30251c0b2f7Stbbdev 
30351c0b2f7Stbbdev     follows_and_precedes_testing::test_follows
30451c0b2f7Stbbdev         <msg_t, tbb::flow::continue_node<msg_t>>
30551c0b2f7Stbbdev         (messages_for_follows, pass_through, node_priority_t(0));
30651c0b2f7Stbbdev 
30751c0b2f7Stbbdev     follows_and_precedes_testing::test_precedes
30851c0b2f7Stbbdev         <msg_t, tbb::flow::continue_node<msg_t>>
30951c0b2f7Stbbdev         (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
31051c0b2f7Stbbdev }
31151c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
31251c0b2f7Stbbdev 
31351c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
31451c0b2f7Stbbdev 
31551c0b2f7Stbbdev template <typename ExpectedType, typename Body>
31651c0b2f7Stbbdev void test_deduction_guides_common(Body body) {
31751c0b2f7Stbbdev     using namespace tbb::flow;
31851c0b2f7Stbbdev     graph g;
31951c0b2f7Stbbdev 
32051c0b2f7Stbbdev     continue_node c1(g, body);
32151c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>);
32251c0b2f7Stbbdev 
32351c0b2f7Stbbdev     continue_node c2(g, body, lightweight());
32451c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>);
32551c0b2f7Stbbdev 
32651c0b2f7Stbbdev     continue_node c3(g, 5, body);
32751c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>);
32851c0b2f7Stbbdev 
32951c0b2f7Stbbdev     continue_node c4(g, 5, body, lightweight());
33051c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>);
33151c0b2f7Stbbdev 
33251c0b2f7Stbbdev     continue_node c5(g, body, node_priority_t(5));
33351c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>);
33451c0b2f7Stbbdev 
33551c0b2f7Stbbdev     continue_node c6(g, body, lightweight(), node_priority_t(5));
33651c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>);
33751c0b2f7Stbbdev 
33851c0b2f7Stbbdev     continue_node c7(g, 5, body, node_priority_t(5));
33951c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>);
34051c0b2f7Stbbdev 
34151c0b2f7Stbbdev     continue_node c8(g, 5, body, lightweight(), node_priority_t(5));
34251c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>);
34351c0b2f7Stbbdev 
34451c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
34551c0b2f7Stbbdev     broadcast_node<continue_msg> b(g);
34651c0b2f7Stbbdev 
34751c0b2f7Stbbdev     continue_node c9(follows(b), body);
34851c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>);
34951c0b2f7Stbbdev 
35051c0b2f7Stbbdev     continue_node c10(follows(b), body, lightweight());
35151c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>);
35251c0b2f7Stbbdev 
35351c0b2f7Stbbdev     continue_node c11(follows(b), 5, body);
35451c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>);
35551c0b2f7Stbbdev 
35651c0b2f7Stbbdev     continue_node c12(follows(b), 5, body, lightweight());
35751c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>);
35851c0b2f7Stbbdev 
35951c0b2f7Stbbdev     continue_node c13(follows(b), body, node_priority_t(5));
36051c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>);
36151c0b2f7Stbbdev 
36251c0b2f7Stbbdev     continue_node c14(follows(b), body, lightweight(), node_priority_t(5));
36351c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>);
36451c0b2f7Stbbdev 
36551c0b2f7Stbbdev     continue_node c15(follows(b), 5, body, node_priority_t(5));
36651c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>);
36751c0b2f7Stbbdev 
36851c0b2f7Stbbdev     continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5));
36951c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>);
37051c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
37151c0b2f7Stbbdev 
37251c0b2f7Stbbdev     continue_node c17(c1);
37351c0b2f7Stbbdev     static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>);
37451c0b2f7Stbbdev }
37551c0b2f7Stbbdev 
37651c0b2f7Stbbdev int continue_body_f(const tbb::flow::continue_msg&) { return 1; }
37751c0b2f7Stbbdev void continue_void_body_f(const tbb::flow::continue_msg&) {}
37851c0b2f7Stbbdev 
37951c0b2f7Stbbdev void test_deduction_guides() {
38051c0b2f7Stbbdev     using tbb::flow::continue_msg;
38151c0b2f7Stbbdev     test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } );
38251c0b2f7Stbbdev     test_deduction_guides_common<continue_msg>([](const continue_msg&) {});
38351c0b2f7Stbbdev 
38451c0b2f7Stbbdev     test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; });
38551c0b2f7Stbbdev     test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {});
38651c0b2f7Stbbdev 
38751c0b2f7Stbbdev     test_deduction_guides_common<int>(continue_body_f);
38851c0b2f7Stbbdev     test_deduction_guides_common<continue_msg>(continue_void_body_f);
38951c0b2f7Stbbdev }
39051c0b2f7Stbbdev 
39151c0b2f7Stbbdev #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
39251c0b2f7Stbbdev 
39351c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead
39451c0b2f7Stbbdev template<typename T>
39551c0b2f7Stbbdev struct passing_body {
39651c0b2f7Stbbdev     T operator()(const T& val) {
39751c0b2f7Stbbdev         return val;
39851c0b2f7Stbbdev     }
39951c0b2f7Stbbdev };
40051c0b2f7Stbbdev 
40151c0b2f7Stbbdev /*
40251c0b2f7Stbbdev     The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
40351c0b2f7Stbbdev     because there used to be a bug when make_edge(node, continue_node)
40451c0b2f7Stbbdev     did not update continue_node's predecesosor threshold
40551c0b2f7Stbbdev     since the specialization of node's successor_cache for a continue_node was not chosen.
40651c0b2f7Stbbdev */
40751c0b2f7Stbbdev void test_successor_cache_specialization() {
40851c0b2f7Stbbdev     using namespace tbb::flow;
40951c0b2f7Stbbdev 
41051c0b2f7Stbbdev     graph g;
41151c0b2f7Stbbdev 
41251c0b2f7Stbbdev     broadcast_node<continue_msg> node_with_default_mutex_type(g);
41351c0b2f7Stbbdev     buffer_node<continue_msg> node_with_non_default_mutex_type(g);
41451c0b2f7Stbbdev 
41551c0b2f7Stbbdev     continue_node<continue_msg> node(g, passing_body<continue_msg>());
41651c0b2f7Stbbdev 
41751c0b2f7Stbbdev     make_edge(node_with_default_mutex_type, node);
41851c0b2f7Stbbdev     make_edge(node_with_non_default_mutex_type, node);
41951c0b2f7Stbbdev 
42051c0b2f7Stbbdev     buffer_node<continue_msg> buf(g);
42151c0b2f7Stbbdev 
42251c0b2f7Stbbdev     make_edge(node, buf);
42351c0b2f7Stbbdev 
42451c0b2f7Stbbdev     node_with_default_mutex_type.try_put(continue_msg());
42551c0b2f7Stbbdev     node_with_non_default_mutex_type.try_put(continue_msg());
42651c0b2f7Stbbdev 
42751c0b2f7Stbbdev     g.wait_for_all();
42851c0b2f7Stbbdev 
42951c0b2f7Stbbdev     continue_msg storage;
43051c0b2f7Stbbdev     CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
43151c0b2f7Stbbdev                   "Wrong number of messages is passed via continue_node");
43251c0b2f7Stbbdev }
43351c0b2f7Stbbdev 
43451c0b2f7Stbbdev //! Test concurrent continue_node for correctness
43551c0b2f7Stbbdev //! \brief \ref error_guessing
43651c0b2f7Stbbdev TEST_CASE("Concurrency testing") {
43751c0b2f7Stbbdev     for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
43851c0b2f7Stbbdev         test_concurrency(p);
43951c0b2f7Stbbdev     }
44051c0b2f7Stbbdev }
44151c0b2f7Stbbdev 
44251c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs
44351c0b2f7Stbbdev //! \brief \ref error_guessing
44451c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); }
44551c0b2f7Stbbdev 
44651c0b2f7Stbbdev //! Test basic behaviour with lightweight body
44751c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
44851c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
44951c0b2f7Stbbdev 
45051c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
45151c0b2f7Stbbdev //! Test deprecated follows and preceedes API
45251c0b2f7Stbbdev //! \brief \ref error_guessing
45351c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
45451c0b2f7Stbbdev #endif
45551c0b2f7Stbbdev 
45651c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
45751c0b2f7Stbbdev //! Test deduction guides
45851c0b2f7Stbbdev //! \brief requirement
45951c0b2f7Stbbdev TEST_CASE( "Deduction guides" ) { test_deduction_guides(); }
46051c0b2f7Stbbdev #endif
46151c0b2f7Stbbdev 
46251c0b2f7Stbbdev //! Test for successor cache specialization
46351c0b2f7Stbbdev //! \brief \ref regression
46451c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) {
46551c0b2f7Stbbdev     test_successor_cache_specialization();
46651c0b2f7Stbbdev }
467*478de5b1Stbbdev 
468*478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
469*478de5b1Stbbdev //! \brief \ref error_guessing
470*478de5b1Stbbdev TEST_CASE("constraints for continue_node input") {
471*478de5b1Stbbdev     static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>);
472*478de5b1Stbbdev     static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>);
473*478de5b1Stbbdev }
474*478de5b1Stbbdev 
475*478de5b1Stbbdev template <typename Input, typename Body>
476*478de5b1Stbbdev concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body,
477*478de5b1Stbbdev                                                 tbb::flow::buffer_node<int>& f, std::size_t num,
478*478de5b1Stbbdev                                                 tbb::flow::node_priority_t priority  ) {
479*478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, body);
480*478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, body, priority);
481*478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, num, body);
482*478de5b1Stbbdev     tbb::flow::continue_node<Input>(graph, num, body, priority);
483*478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
484*478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), body);
485*478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority);
486*478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body);
487*478de5b1Stbbdev     tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority);
488*478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
489*478de5b1Stbbdev };
490*478de5b1Stbbdev 
491*478de5b1Stbbdev //! \brief \ref error_guessing
492*478de5b1Stbbdev TEST_CASE("constraints for continue_node body") {
493*478de5b1Stbbdev     using output_type = int;
494*478de5b1Stbbdev     using namespace test_concepts::continue_node_body;
495*478de5b1Stbbdev 
496*478de5b1Stbbdev     static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>);
497*478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>);
498*478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>);
499*478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
500*478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
501*478de5b1Stbbdev     static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
502*478de5b1Stbbdev }
503*478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
504