151c0b2f7Stbbdev /*
2*c4a799dfSJhaShweta1 Copyright (c) 2005-2023 Intel Corporation
351c0b2f7Stbbdev
451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev you may not use this file except in compliance with the License.
651c0b2f7Stbbdev You may obtain a copy of the License at
751c0b2f7Stbbdev
851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev
1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev See the License for the specific language governing permissions and
1451c0b2f7Stbbdev limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev
1751c0b2f7Stbbdev #include "common/config.h"
1851c0b2f7Stbbdev
1951c0b2f7Stbbdev #include "tbb/flow_graph.h"
2051c0b2f7Stbbdev
2151c0b2f7Stbbdev #include "common/test.h"
2251c0b2f7Stbbdev #include "common/utils.h"
2351c0b2f7Stbbdev #include "common/graph_utils.h"
2451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
25478de5b1Stbbdev #include "common/concepts_common.h"
2651c0b2f7Stbbdev
2751c0b2f7Stbbdev
2851c0b2f7Stbbdev //! \file test_continue_node.cpp
2951c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification
3051c0b2f7Stbbdev
3151c0b2f7Stbbdev
3251c0b2f7Stbbdev #define N 1000
3351c0b2f7Stbbdev #define MAX_NODES 4
3451c0b2f7Stbbdev #define C 8
3551c0b2f7Stbbdev
3651c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node
3751c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg>
3851c0b2f7Stbbdev {
3951c0b2f7Stbbdev typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type;
4051c0b2f7Stbbdev // Define implementations of virtual methods that are abstract in the base class
register_successorfake_continue_sender4151c0b2f7Stbbdev bool register_successor( successor_type& ) override { return false; }
remove_successorfake_continue_sender4251c0b2f7Stbbdev bool remove_successor( successor_type& ) override { return false; }
4351c0b2f7Stbbdev };
4451c0b2f7Stbbdev
4551c0b2f7Stbbdev template< typename InputType >
4651c0b2f7Stbbdev struct parallel_puts {
4751c0b2f7Stbbdev
4851c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node;
4951c0b2f7Stbbdev
parallel_putsparallel_puts5051c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
5151c0b2f7Stbbdev parallel_puts& operator=(const parallel_puts&) = delete;
5251c0b2f7Stbbdev
operator ()parallel_puts5351c0b2f7Stbbdev void operator()( int ) const {
5451c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) {
5551c0b2f7Stbbdev // the nodes will accept all puts
5651c0b2f7Stbbdev CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
5751c0b2f7Stbbdev }
5851c0b2f7Stbbdev }
5951c0b2f7Stbbdev
6051c0b2f7Stbbdev };
6151c0b2f7Stbbdev
6251c0b2f7Stbbdev template< typename OutputType >
run_continue_nodes(int p,tbb::flow::graph & g,tbb::flow::continue_node<OutputType> & n)6351c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) {
6451c0b2f7Stbbdev fake_continue_sender fake_sender;
6551c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) {
6651c0b2f7Stbbdev tbb::detail::d1::register_predecessor(n, fake_sender);
6751c0b2f7Stbbdev }
6851c0b2f7Stbbdev
6951c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
7051c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
7151c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) {
7251c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
7351c0b2f7Stbbdev }
7451c0b2f7Stbbdev harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0;
7551c0b2f7Stbbdev
7651c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
7751c0b2f7Stbbdev tbb::flow::make_edge( n, *receivers[r] );
7851c0b2f7Stbbdev }
7951c0b2f7Stbbdev
8051c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) );
8151c0b2f7Stbbdev g.wait_for_all();
8251c0b2f7Stbbdev
8351c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously,
8451c0b2f7Stbbdev size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count;
8551c0b2f7Stbbdev CHECK_MESSAGE( (int)ec == p, "" );
8651c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
8751c0b2f7Stbbdev size_t c = receivers[r]->my_count;
8851c0b2f7Stbbdev // 3) the nodes will send to multiple successors.
8951c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" );
9051c0b2f7Stbbdev }
9151c0b2f7Stbbdev
9251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
9351c0b2f7Stbbdev tbb::flow::remove_edge( n, *receivers[r] );
9451c0b2f7Stbbdev }
9551c0b2f7Stbbdev }
9651c0b2f7Stbbdev }
9751c0b2f7Stbbdev
9851c0b2f7Stbbdev template< typename OutputType, typename Body >
continue_nodes(Body body)9951c0b2f7Stbbdev void continue_nodes( Body body ) {
10051c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
10151c0b2f7Stbbdev tbb::flow::graph g;
10251c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, body );
10351c0b2f7Stbbdev run_continue_nodes( p, g, exe_node);
10451c0b2f7Stbbdev exe_node.try_put(tbb::flow::continue_msg());
10551c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node_copy( exe_node );
10651c0b2f7Stbbdev run_continue_nodes( p, g, exe_node_copy);
10751c0b2f7Stbbdev }
10851c0b2f7Stbbdev }
10951c0b2f7Stbbdev
11051c0b2f7Stbbdev const size_t Offset = 123;
11151c0b2f7Stbbdev std::atomic<size_t> global_execute_count;
11251c0b2f7Stbbdev
11351c0b2f7Stbbdev template< typename OutputType >
11451c0b2f7Stbbdev struct inc_functor {
11551c0b2f7Stbbdev
11651c0b2f7Stbbdev std::atomic<size_t> local_execute_count;
inc_functorinc_functor11751c0b2f7Stbbdev inc_functor( ) { local_execute_count = 0; }
inc_functorinc_functor11851c0b2f7Stbbdev inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
operator =inc_functor11951c0b2f7Stbbdev void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); }
12051c0b2f7Stbbdev
operator ()inc_functor12151c0b2f7Stbbdev OutputType operator()( tbb::flow::continue_msg ) {
12251c0b2f7Stbbdev ++global_execute_count;
12351c0b2f7Stbbdev ++local_execute_count;
12451c0b2f7Stbbdev return OutputType();
12551c0b2f7Stbbdev }
12651c0b2f7Stbbdev
12751c0b2f7Stbbdev };
12851c0b2f7Stbbdev
12951c0b2f7Stbbdev template< typename OutputType >
continue_nodes_with_copy()13051c0b2f7Stbbdev void continue_nodes_with_copy( ) {
13151c0b2f7Stbbdev
13251c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) {
13351c0b2f7Stbbdev tbb::flow::graph g;
13451c0b2f7Stbbdev inc_functor<OutputType> cf;
13551c0b2f7Stbbdev cf.local_execute_count = Offset;
13651c0b2f7Stbbdev global_execute_count = Offset;
13751c0b2f7Stbbdev
13851c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, cf );
13951c0b2f7Stbbdev fake_continue_sender fake_sender;
14051c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) {
14151c0b2f7Stbbdev tbb::detail::d1::register_predecessor(exe_node, fake_sender);
14251c0b2f7Stbbdev }
14351c0b2f7Stbbdev
14451c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
14551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
14651c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) {
14751c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
14851c0b2f7Stbbdev }
14951c0b2f7Stbbdev
15051c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
15151c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] );
15251c0b2f7Stbbdev }
15351c0b2f7Stbbdev
15451c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) );
15551c0b2f7Stbbdev g.wait_for_all();
15651c0b2f7Stbbdev
15751c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously,
15851c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
15951c0b2f7Stbbdev size_t c = receivers[r]->my_count;
16051c0b2f7Stbbdev // 3) the nodes will send to multiple successors.
16151c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" );
16251c0b2f7Stbbdev }
16351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) {
16451c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] );
16551c0b2f7Stbbdev }
16651c0b2f7Stbbdev }
16751c0b2f7Stbbdev
16851c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct
16951c0b2f7Stbbdev inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17051c0b2f7Stbbdev const size_t expected_count = p*MAX_NODES + Offset;
17151c0b2f7Stbbdev size_t global_count = global_execute_count;
17251c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count;
17351c0b2f7Stbbdev CHECK_MESSAGE( global_count == expected_count, "" );
17451c0b2f7Stbbdev CHECK_MESSAGE( global_count == inc_count, "" );
17551c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies);
17651c0b2f7Stbbdev body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node );
17751c0b2f7Stbbdev inc_count = body_copy.local_execute_count;
17851c0b2f7Stbbdev CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" );
17951c0b2f7Stbbdev
18051c0b2f7Stbbdev }
18151c0b2f7Stbbdev }
18251c0b2f7Stbbdev
18351c0b2f7Stbbdev template< typename OutputType >
run_continue_nodes()18451c0b2f7Stbbdev void run_continue_nodes() {
18551c0b2f7Stbbdev harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0;
18651c0b2f7Stbbdev continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } );
18751c0b2f7Stbbdev continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func );
18851c0b2f7Stbbdev continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() );
18951c0b2f7Stbbdev continue_nodes_with_copy<OutputType>();
19051c0b2f7Stbbdev }
19151c0b2f7Stbbdev
19251c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
test_concurrency(int num_threads)19351c0b2f7Stbbdev void test_concurrency(int num_threads) {
19451c0b2f7Stbbdev tbb::task_arena arena(num_threads);
19551c0b2f7Stbbdev arena.execute(
19651c0b2f7Stbbdev [&] {
19751c0b2f7Stbbdev run_continue_nodes<tbb::flow::continue_msg>();
19851c0b2f7Stbbdev run_continue_nodes<int>();
19951c0b2f7Stbbdev run_continue_nodes<utils::NoAssign>();
20051c0b2f7Stbbdev }
20151c0b2f7Stbbdev );
20251c0b2f7Stbbdev }
20351c0b2f7Stbbdev /*
20451c0b2f7Stbbdev * Connection of two graphs is not currently supported, but works to some limited extent.
20551c0b2f7Stbbdev * This test is included to check for backward compatibility. It checks that a continue_node
20651c0b2f7Stbbdev * with predecessors in two different graphs receives the required
20751c0b2f7Stbbdev * number of continue messages before it executes.
20851c0b2f7Stbbdev */
20951c0b2f7Stbbdev using namespace tbb::flow;
21051c0b2f7Stbbdev
21151c0b2f7Stbbdev struct add_to_counter {
21251c0b2f7Stbbdev int* counter;
add_to_counteradd_to_counter21351c0b2f7Stbbdev add_to_counter(int& var):counter(&var){}
operator ()add_to_counter21451c0b2f7Stbbdev void operator()(continue_msg){*counter+=1;}
21551c0b2f7Stbbdev };
21651c0b2f7Stbbdev
test_two_graphs()21751c0b2f7Stbbdev void test_two_graphs(){
21851c0b2f7Stbbdev int count=0;
21951c0b2f7Stbbdev
22051c0b2f7Stbbdev //graph g with broadcast_node and continue_node
22151c0b2f7Stbbdev graph g;
22251c0b2f7Stbbdev broadcast_node<continue_msg> start_g(g);
22351c0b2f7Stbbdev continue_node<continue_msg> first_g(g, add_to_counter(count));
22451c0b2f7Stbbdev
22551c0b2f7Stbbdev //graph h with broadcast_node
22651c0b2f7Stbbdev graph h;
22751c0b2f7Stbbdev broadcast_node<continue_msg> start_h(h);
22851c0b2f7Stbbdev
22951c0b2f7Stbbdev //making two edges to first_g from the two graphs
23051c0b2f7Stbbdev make_edge(start_g,first_g);
23151c0b2f7Stbbdev make_edge(start_h, first_g);
23251c0b2f7Stbbdev
23351c0b2f7Stbbdev //two try_puts from the two graphs
23451c0b2f7Stbbdev start_g.try_put(continue_msg());
23551c0b2f7Stbbdev start_h.try_put(continue_msg());
23651c0b2f7Stbbdev g.wait_for_all();
23751c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received");
23851c0b2f7Stbbdev
23951c0b2f7Stbbdev //two try_puts from the graph that doesn't contain the node
24051c0b2f7Stbbdev count=0;
24151c0b2f7Stbbdev start_h.try_put(continue_msg());
24251c0b2f7Stbbdev start_h.try_put(continue_msg());
24351c0b2f7Stbbdev g.wait_for_all();
24451c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received -1");
24551c0b2f7Stbbdev
24651c0b2f7Stbbdev //only one try_put
24751c0b2f7Stbbdev count=0;
24851c0b2f7Stbbdev start_g.try_put(continue_msg());
24951c0b2f7Stbbdev g.wait_for_all();
25051c0b2f7Stbbdev CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors");
25151c0b2f7Stbbdev }
25251c0b2f7Stbbdev
25351c0b2f7Stbbdev struct lightweight_policy_body {
25451c0b2f7Stbbdev const std::thread::id my_thread_id;
25551c0b2f7Stbbdev std::atomic<size_t>& my_count;
25651c0b2f7Stbbdev
lightweight_policy_bodylightweight_policy_body25751c0b2f7Stbbdev lightweight_policy_body( std::atomic<size_t>& count )
25851c0b2f7Stbbdev : my_thread_id(std::this_thread::get_id()), my_count(count)
25951c0b2f7Stbbdev {
26051c0b2f7Stbbdev my_count = 0;
26151c0b2f7Stbbdev }
2621168c5cbSvlserov
2631168c5cbSvlserov lightweight_policy_body( const lightweight_policy_body& ) = default;
26451c0b2f7Stbbdev lightweight_policy_body& operator=( const lightweight_policy_body& ) = delete;
2651168c5cbSvlserov
operator ()lightweight_policy_body26651c0b2f7Stbbdev void operator()( tbb::flow::continue_msg ) {
26751c0b2f7Stbbdev ++my_count;
26851c0b2f7Stbbdev std::thread::id body_thread_id = std::this_thread::get_id();
26951c0b2f7Stbbdev CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight");
27051c0b2f7Stbbdev }
27151c0b2f7Stbbdev };
27251c0b2f7Stbbdev
test_lightweight_policy()27351c0b2f7Stbbdev void test_lightweight_policy() {
27451c0b2f7Stbbdev tbb::flow::graph g;
27551c0b2f7Stbbdev std::atomic<size_t> count1;
27651c0b2f7Stbbdev std::atomic<size_t> count2;
27751c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
27851c0b2f7Stbbdev node1(g, lightweight_policy_body(count1));
27951c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight>
28051c0b2f7Stbbdev node2(g, lightweight_policy_body(count2));
28151c0b2f7Stbbdev
28251c0b2f7Stbbdev tbb::flow::make_edge(node1, node2);
28351c0b2f7Stbbdev const size_t n = 10;
28451c0b2f7Stbbdev for(size_t i = 0; i < n; ++i) {
28551c0b2f7Stbbdev node1.try_put(tbb::flow::continue_msg());
28651c0b2f7Stbbdev }
28751c0b2f7Stbbdev g.wait_for_all();
28851c0b2f7Stbbdev
28951c0b2f7Stbbdev lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1);
29051c0b2f7Stbbdev lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2);
29151c0b2f7Stbbdev CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times");
29251c0b2f7Stbbdev CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times");
29351c0b2f7Stbbdev }
29451c0b2f7Stbbdev
29551c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
29651c0b2f7Stbbdev #include <array>
29751c0b2f7Stbbdev #include <vector>
test_follows_and_precedes_api()29851c0b2f7Stbbdev void test_follows_and_precedes_api() {
29951c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg;
30051c0b2f7Stbbdev
30151c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } };
30251c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = { msg_t() };
30351c0b2f7Stbbdev
30451c0b2f7Stbbdev auto pass_through = [](const msg_t& msg) { return msg; };
30551c0b2f7Stbbdev
30651c0b2f7Stbbdev follows_and_precedes_testing::test_follows
30751c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>>
30851c0b2f7Stbbdev (messages_for_follows, pass_through, node_priority_t(0));
30951c0b2f7Stbbdev
31051c0b2f7Stbbdev follows_and_precedes_testing::test_precedes
31151c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>>
31251c0b2f7Stbbdev (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1));
31351c0b2f7Stbbdev }
31451c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
31551c0b2f7Stbbdev
31651c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead
31751c0b2f7Stbbdev template<typename T>
31851c0b2f7Stbbdev struct passing_body {
operator ()passing_body31951c0b2f7Stbbdev T operator()(const T& val) {
32051c0b2f7Stbbdev return val;
32151c0b2f7Stbbdev }
32251c0b2f7Stbbdev };
32351c0b2f7Stbbdev
32451c0b2f7Stbbdev /*
32551c0b2f7Stbbdev The test covers the case when a node with non-default mutex type is a predecessor for continue_node,
32651c0b2f7Stbbdev because there used to be a bug when make_edge(node, continue_node)
32751c0b2f7Stbbdev did not update continue_node's predecesosor threshold
32851c0b2f7Stbbdev since the specialization of node's successor_cache for a continue_node was not chosen.
32951c0b2f7Stbbdev */
test_successor_cache_specialization()33051c0b2f7Stbbdev void test_successor_cache_specialization() {
33151c0b2f7Stbbdev using namespace tbb::flow;
33251c0b2f7Stbbdev
33351c0b2f7Stbbdev graph g;
33451c0b2f7Stbbdev
33551c0b2f7Stbbdev broadcast_node<continue_msg> node_with_default_mutex_type(g);
33651c0b2f7Stbbdev buffer_node<continue_msg> node_with_non_default_mutex_type(g);
33751c0b2f7Stbbdev
33851c0b2f7Stbbdev continue_node<continue_msg> node(g, passing_body<continue_msg>());
33951c0b2f7Stbbdev
34051c0b2f7Stbbdev make_edge(node_with_default_mutex_type, node);
34151c0b2f7Stbbdev make_edge(node_with_non_default_mutex_type, node);
34251c0b2f7Stbbdev
34351c0b2f7Stbbdev buffer_node<continue_msg> buf(g);
34451c0b2f7Stbbdev
34551c0b2f7Stbbdev make_edge(node, buf);
34651c0b2f7Stbbdev
34751c0b2f7Stbbdev node_with_default_mutex_type.try_put(continue_msg());
34851c0b2f7Stbbdev node_with_non_default_mutex_type.try_put(continue_msg());
34951c0b2f7Stbbdev
35051c0b2f7Stbbdev g.wait_for_all();
35151c0b2f7Stbbdev
35251c0b2f7Stbbdev continue_msg storage;
35351c0b2f7Stbbdev CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)),
35451c0b2f7Stbbdev "Wrong number of messages is passed via continue_node");
35551c0b2f7Stbbdev }
35651c0b2f7Stbbdev
35751c0b2f7Stbbdev //! Test concurrent continue_node for correctness
35851c0b2f7Stbbdev //! \brief \ref error_guessing
35951c0b2f7Stbbdev TEST_CASE("Concurrency testing") {
36051c0b2f7Stbbdev for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) {
36151c0b2f7Stbbdev test_concurrency(p);
36251c0b2f7Stbbdev }
36351c0b2f7Stbbdev }
36451c0b2f7Stbbdev
36551c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs
36651c0b2f7Stbbdev //! \brief \ref error_guessing
36751c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); }
36851c0b2f7Stbbdev
36951c0b2f7Stbbdev //! Test basic behaviour with lightweight body
37051c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
37151c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); }
37251c0b2f7Stbbdev
37351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
374*c4a799dfSJhaShweta1 //! Test deprecated follows and precedes API
37551c0b2f7Stbbdev //! \brief \ref error_guessing
37651c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); }
37751c0b2f7Stbbdev #endif
37851c0b2f7Stbbdev
37951c0b2f7Stbbdev //! Test for successor cache specialization
38051c0b2f7Stbbdev //! \brief \ref regression
38151c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) {
38251c0b2f7Stbbdev test_successor_cache_specialization();
38351c0b2f7Stbbdev }
384478de5b1Stbbdev
385478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
386478de5b1Stbbdev //! \brief \ref error_guessing
387478de5b1Stbbdev TEST_CASE("constraints for continue_node input") {
388478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>);
389478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>);
390478de5b1Stbbdev }
391478de5b1Stbbdev
392478de5b1Stbbdev template <typename Input, typename Body>
393478de5b1Stbbdev concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body,
394478de5b1Stbbdev tbb::flow::buffer_node<int>& f, std::size_t num,
395478de5b1Stbbdev tbb::flow::node_priority_t priority ) {
396478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, body);
397478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, body, priority);
398478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, num, body);
399478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, num, body, priority);
400478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
401478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), body);
402478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority);
403478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body);
404478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority);
405478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
406478de5b1Stbbdev };
407478de5b1Stbbdev
408478de5b1Stbbdev //! \brief \ref error_guessing
409478de5b1Stbbdev TEST_CASE("constraints for continue_node body") {
410478de5b1Stbbdev using output_type = int;
411478de5b1Stbbdev using namespace test_concepts::continue_node_body;
412478de5b1Stbbdev
413478de5b1Stbbdev static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>);
414478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>);
415478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>);
416478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>);
417478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>);
418478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>);
419478de5b1Stbbdev }
420478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
421