151c0b2f7Stbbdev /* 2b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 1751c0b2f7Stbbdev #include "common/config.h" 1851c0b2f7Stbbdev 1951c0b2f7Stbbdev #include "tbb/flow_graph.h" 2051c0b2f7Stbbdev 2151c0b2f7Stbbdev #include "common/test.h" 2251c0b2f7Stbbdev #include "common/utils.h" 2351c0b2f7Stbbdev #include "common/graph_utils.h" 2451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h" 25478de5b1Stbbdev #include "common/concepts_common.h" 2651c0b2f7Stbbdev 2751c0b2f7Stbbdev 2851c0b2f7Stbbdev //! \file test_continue_node.cpp 2951c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification 3051c0b2f7Stbbdev 3151c0b2f7Stbbdev 3251c0b2f7Stbbdev #define N 1000 3351c0b2f7Stbbdev #define MAX_NODES 4 3451c0b2f7Stbbdev #define C 8 3551c0b2f7Stbbdev 3651c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node 3751c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> 3851c0b2f7Stbbdev { 3951c0b2f7Stbbdev typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; 4051c0b2f7Stbbdev // Define implementations of virtual methods that are abstract in the base class 4151c0b2f7Stbbdev bool register_successor( successor_type& ) override { return false; } 4251c0b2f7Stbbdev bool remove_successor( successor_type& ) override { return false; } 4351c0b2f7Stbbdev }; 4451c0b2f7Stbbdev 4551c0b2f7Stbbdev template< typename InputType > 4651c0b2f7Stbbdev struct parallel_puts { 4751c0b2f7Stbbdev 4851c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node; 4951c0b2f7Stbbdev 5051c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 5151c0b2f7Stbbdev parallel_puts& operator=(const parallel_puts&) = delete; 5251c0b2f7Stbbdev 5351c0b2f7Stbbdev void operator()( int ) const { 5451c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) { 5551c0b2f7Stbbdev // the nodes will accept all puts 5651c0b2f7Stbbdev CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 5751c0b2f7Stbbdev } 5851c0b2f7Stbbdev } 5951c0b2f7Stbbdev 6051c0b2f7Stbbdev }; 6151c0b2f7Stbbdev 6251c0b2f7Stbbdev template< typename OutputType > 6351c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { 6451c0b2f7Stbbdev fake_continue_sender fake_sender; 6551c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) { 6651c0b2f7Stbbdev tbb::detail::d1::register_predecessor(n, fake_sender); 6751c0b2f7Stbbdev } 6851c0b2f7Stbbdev 6951c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 7051c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 7151c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 7251c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 7351c0b2f7Stbbdev } 7451c0b2f7Stbbdev harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; 7551c0b2f7Stbbdev 7651c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 7751c0b2f7Stbbdev tbb::flow::make_edge( n, *receivers[r] ); 7851c0b2f7Stbbdev } 7951c0b2f7Stbbdev 8051c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); 8151c0b2f7Stbbdev g.wait_for_all(); 8251c0b2f7Stbbdev 8351c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 8451c0b2f7Stbbdev size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; 8551c0b2f7Stbbdev CHECK_MESSAGE( (int)ec == p, "" ); 8651c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 8751c0b2f7Stbbdev size_t c = receivers[r]->my_count; 8851c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 8951c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" ); 9051c0b2f7Stbbdev } 9151c0b2f7Stbbdev 9251c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 9351c0b2f7Stbbdev tbb::flow::remove_edge( n, *receivers[r] ); 9451c0b2f7Stbbdev } 9551c0b2f7Stbbdev } 9651c0b2f7Stbbdev } 9751c0b2f7Stbbdev 9851c0b2f7Stbbdev template< typename OutputType, typename Body > 9951c0b2f7Stbbdev void continue_nodes( Body body ) { 10051c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 10151c0b2f7Stbbdev tbb::flow::graph g; 10251c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, body ); 10351c0b2f7Stbbdev run_continue_nodes( p, g, exe_node); 10451c0b2f7Stbbdev exe_node.try_put(tbb::flow::continue_msg()); 10551c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); 10651c0b2f7Stbbdev run_continue_nodes( p, g, exe_node_copy); 10751c0b2f7Stbbdev } 10851c0b2f7Stbbdev } 10951c0b2f7Stbbdev 11051c0b2f7Stbbdev const size_t Offset = 123; 11151c0b2f7Stbbdev std::atomic<size_t> global_execute_count; 11251c0b2f7Stbbdev 11351c0b2f7Stbbdev template< typename OutputType > 11451c0b2f7Stbbdev struct inc_functor { 11551c0b2f7Stbbdev 11651c0b2f7Stbbdev std::atomic<size_t> local_execute_count; 11751c0b2f7Stbbdev inc_functor( ) { local_execute_count = 0; } 11851c0b2f7Stbbdev inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 11951c0b2f7Stbbdev void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); } 12051c0b2f7Stbbdev 12151c0b2f7Stbbdev OutputType operator()( tbb::flow::continue_msg ) { 12251c0b2f7Stbbdev ++global_execute_count; 12351c0b2f7Stbbdev ++local_execute_count; 12451c0b2f7Stbbdev return OutputType(); 12551c0b2f7Stbbdev } 12651c0b2f7Stbbdev 12751c0b2f7Stbbdev }; 12851c0b2f7Stbbdev 12951c0b2f7Stbbdev template< typename OutputType > 13051c0b2f7Stbbdev void continue_nodes_with_copy( ) { 13151c0b2f7Stbbdev 13251c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 13351c0b2f7Stbbdev tbb::flow::graph g; 13451c0b2f7Stbbdev inc_functor<OutputType> cf; 13551c0b2f7Stbbdev cf.local_execute_count = Offset; 13651c0b2f7Stbbdev global_execute_count = Offset; 13751c0b2f7Stbbdev 13851c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, cf ); 13951c0b2f7Stbbdev fake_continue_sender fake_sender; 14051c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) { 14151c0b2f7Stbbdev tbb::detail::d1::register_predecessor(exe_node, fake_sender); 14251c0b2f7Stbbdev } 14351c0b2f7Stbbdev 14451c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 14551c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 14651c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 14751c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 14851c0b2f7Stbbdev } 14951c0b2f7Stbbdev 15051c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 15151c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 15251c0b2f7Stbbdev } 15351c0b2f7Stbbdev 15451c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); 15551c0b2f7Stbbdev g.wait_for_all(); 15651c0b2f7Stbbdev 15751c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 15851c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 15951c0b2f7Stbbdev size_t c = receivers[r]->my_count; 16051c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 16151c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" ); 16251c0b2f7Stbbdev } 16351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 16451c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 16551c0b2f7Stbbdev } 16651c0b2f7Stbbdev } 16751c0b2f7Stbbdev 16851c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct 16951c0b2f7Stbbdev inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 17051c0b2f7Stbbdev const size_t expected_count = p*MAX_NODES + Offset; 17151c0b2f7Stbbdev size_t global_count = global_execute_count; 17251c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count; 17351c0b2f7Stbbdev CHECK_MESSAGE( global_count == expected_count, "" ); 17451c0b2f7Stbbdev CHECK_MESSAGE( global_count == inc_count, "" ); 17551c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); 17651c0b2f7Stbbdev body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 17751c0b2f7Stbbdev inc_count = body_copy.local_execute_count; 17851c0b2f7Stbbdev CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" ); 17951c0b2f7Stbbdev 18051c0b2f7Stbbdev } 18151c0b2f7Stbbdev } 18251c0b2f7Stbbdev 18351c0b2f7Stbbdev template< typename OutputType > 18451c0b2f7Stbbdev void run_continue_nodes() { 18551c0b2f7Stbbdev harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; 18651c0b2f7Stbbdev continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); 18751c0b2f7Stbbdev continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); 18851c0b2f7Stbbdev continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); 18951c0b2f7Stbbdev continue_nodes_with_copy<OutputType>(); 19051c0b2f7Stbbdev } 19151c0b2f7Stbbdev 19251c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages 19351c0b2f7Stbbdev void test_concurrency(int num_threads) { 19451c0b2f7Stbbdev tbb::task_arena arena(num_threads); 19551c0b2f7Stbbdev arena.execute( 19651c0b2f7Stbbdev [&] { 19751c0b2f7Stbbdev run_continue_nodes<tbb::flow::continue_msg>(); 19851c0b2f7Stbbdev run_continue_nodes<int>(); 19951c0b2f7Stbbdev run_continue_nodes<utils::NoAssign>(); 20051c0b2f7Stbbdev } 20151c0b2f7Stbbdev ); 20251c0b2f7Stbbdev } 20351c0b2f7Stbbdev /* 20451c0b2f7Stbbdev * Connection of two graphs is not currently supported, but works to some limited extent. 20551c0b2f7Stbbdev * This test is included to check for backward compatibility. It checks that a continue_node 20651c0b2f7Stbbdev * with predecessors in two different graphs receives the required 20751c0b2f7Stbbdev * number of continue messages before it executes. 20851c0b2f7Stbbdev */ 20951c0b2f7Stbbdev using namespace tbb::flow; 21051c0b2f7Stbbdev 21151c0b2f7Stbbdev struct add_to_counter { 21251c0b2f7Stbbdev int* counter; 21351c0b2f7Stbbdev add_to_counter(int& var):counter(&var){} 21451c0b2f7Stbbdev void operator()(continue_msg){*counter+=1;} 21551c0b2f7Stbbdev }; 21651c0b2f7Stbbdev 21751c0b2f7Stbbdev void test_two_graphs(){ 21851c0b2f7Stbbdev int count=0; 21951c0b2f7Stbbdev 22051c0b2f7Stbbdev //graph g with broadcast_node and continue_node 22151c0b2f7Stbbdev graph g; 22251c0b2f7Stbbdev broadcast_node<continue_msg> start_g(g); 22351c0b2f7Stbbdev continue_node<continue_msg> first_g(g, add_to_counter(count)); 22451c0b2f7Stbbdev 22551c0b2f7Stbbdev //graph h with broadcast_node 22651c0b2f7Stbbdev graph h; 22751c0b2f7Stbbdev broadcast_node<continue_msg> start_h(h); 22851c0b2f7Stbbdev 22951c0b2f7Stbbdev //making two edges to first_g from the two graphs 23051c0b2f7Stbbdev make_edge(start_g,first_g); 23151c0b2f7Stbbdev make_edge(start_h, first_g); 23251c0b2f7Stbbdev 23351c0b2f7Stbbdev //two try_puts from the two graphs 23451c0b2f7Stbbdev start_g.try_put(continue_msg()); 23551c0b2f7Stbbdev start_h.try_put(continue_msg()); 23651c0b2f7Stbbdev g.wait_for_all(); 23751c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received"); 23851c0b2f7Stbbdev 23951c0b2f7Stbbdev //two try_puts from the graph that doesn't contain the node 24051c0b2f7Stbbdev count=0; 24151c0b2f7Stbbdev start_h.try_put(continue_msg()); 24251c0b2f7Stbbdev start_h.try_put(continue_msg()); 24351c0b2f7Stbbdev g.wait_for_all(); 24451c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received -1"); 24551c0b2f7Stbbdev 24651c0b2f7Stbbdev //only one try_put 24751c0b2f7Stbbdev count=0; 24851c0b2f7Stbbdev start_g.try_put(continue_msg()); 24951c0b2f7Stbbdev g.wait_for_all(); 25051c0b2f7Stbbdev CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors"); 25151c0b2f7Stbbdev } 25251c0b2f7Stbbdev 25351c0b2f7Stbbdev struct lightweight_policy_body { 25451c0b2f7Stbbdev const std::thread::id my_thread_id; 25551c0b2f7Stbbdev std::atomic<size_t>& my_count; 25651c0b2f7Stbbdev 25751c0b2f7Stbbdev lightweight_policy_body( std::atomic<size_t>& count ) 25851c0b2f7Stbbdev : my_thread_id(std::this_thread::get_id()), my_count(count) 25951c0b2f7Stbbdev { 26051c0b2f7Stbbdev my_count = 0; 26151c0b2f7Stbbdev } 262*1168c5cbSvlserov 263*1168c5cbSvlserov lightweight_policy_body( const lightweight_policy_body& ) = default; 26451c0b2f7Stbbdev lightweight_policy_body& operator=( const lightweight_policy_body& ) = delete; 265*1168c5cbSvlserov 26651c0b2f7Stbbdev void operator()( tbb::flow::continue_msg ) { 26751c0b2f7Stbbdev ++my_count; 26851c0b2f7Stbbdev std::thread::id body_thread_id = std::this_thread::get_id(); 26951c0b2f7Stbbdev CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight"); 27051c0b2f7Stbbdev } 27151c0b2f7Stbbdev }; 27251c0b2f7Stbbdev 27351c0b2f7Stbbdev void test_lightweight_policy() { 27451c0b2f7Stbbdev tbb::flow::graph g; 27551c0b2f7Stbbdev std::atomic<size_t> count1; 27651c0b2f7Stbbdev std::atomic<size_t> count2; 27751c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 27851c0b2f7Stbbdev node1(g, lightweight_policy_body(count1)); 27951c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 28051c0b2f7Stbbdev node2(g, lightweight_policy_body(count2)); 28151c0b2f7Stbbdev 28251c0b2f7Stbbdev tbb::flow::make_edge(node1, node2); 28351c0b2f7Stbbdev const size_t n = 10; 28451c0b2f7Stbbdev for(size_t i = 0; i < n; ++i) { 28551c0b2f7Stbbdev node1.try_put(tbb::flow::continue_msg()); 28651c0b2f7Stbbdev } 28751c0b2f7Stbbdev g.wait_for_all(); 28851c0b2f7Stbbdev 28951c0b2f7Stbbdev lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); 29051c0b2f7Stbbdev lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); 29151c0b2f7Stbbdev CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times"); 29251c0b2f7Stbbdev CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times"); 29351c0b2f7Stbbdev } 29451c0b2f7Stbbdev 29551c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 29651c0b2f7Stbbdev #include <array> 29751c0b2f7Stbbdev #include <vector> 29851c0b2f7Stbbdev void test_follows_and_precedes_api() { 29951c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg; 30051c0b2f7Stbbdev 30151c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } }; 30251c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = { msg_t() }; 30351c0b2f7Stbbdev 30451c0b2f7Stbbdev auto pass_through = [](const msg_t& msg) { return msg; }; 30551c0b2f7Stbbdev 30651c0b2f7Stbbdev follows_and_precedes_testing::test_follows 30751c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>> 30851c0b2f7Stbbdev (messages_for_follows, pass_through, node_priority_t(0)); 30951c0b2f7Stbbdev 31051c0b2f7Stbbdev follows_and_precedes_testing::test_precedes 31151c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>> 31251c0b2f7Stbbdev (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1)); 31351c0b2f7Stbbdev } 31451c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 31551c0b2f7Stbbdev 31651c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead 31751c0b2f7Stbbdev template<typename T> 31851c0b2f7Stbbdev struct passing_body { 31951c0b2f7Stbbdev T operator()(const T& val) { 32051c0b2f7Stbbdev return val; 32151c0b2f7Stbbdev } 32251c0b2f7Stbbdev }; 32351c0b2f7Stbbdev 32451c0b2f7Stbbdev /* 32551c0b2f7Stbbdev The test covers the case when a node with non-default mutex type is a predecessor for continue_node, 32651c0b2f7Stbbdev because there used to be a bug when make_edge(node, continue_node) 32751c0b2f7Stbbdev did not update continue_node's predecesosor threshold 32851c0b2f7Stbbdev since the specialization of node's successor_cache for a continue_node was not chosen. 32951c0b2f7Stbbdev */ 33051c0b2f7Stbbdev void test_successor_cache_specialization() { 33151c0b2f7Stbbdev using namespace tbb::flow; 33251c0b2f7Stbbdev 33351c0b2f7Stbbdev graph g; 33451c0b2f7Stbbdev 33551c0b2f7Stbbdev broadcast_node<continue_msg> node_with_default_mutex_type(g); 33651c0b2f7Stbbdev buffer_node<continue_msg> node_with_non_default_mutex_type(g); 33751c0b2f7Stbbdev 33851c0b2f7Stbbdev continue_node<continue_msg> node(g, passing_body<continue_msg>()); 33951c0b2f7Stbbdev 34051c0b2f7Stbbdev make_edge(node_with_default_mutex_type, node); 34151c0b2f7Stbbdev make_edge(node_with_non_default_mutex_type, node); 34251c0b2f7Stbbdev 34351c0b2f7Stbbdev buffer_node<continue_msg> buf(g); 34451c0b2f7Stbbdev 34551c0b2f7Stbbdev make_edge(node, buf); 34651c0b2f7Stbbdev 34751c0b2f7Stbbdev node_with_default_mutex_type.try_put(continue_msg()); 34851c0b2f7Stbbdev node_with_non_default_mutex_type.try_put(continue_msg()); 34951c0b2f7Stbbdev 35051c0b2f7Stbbdev g.wait_for_all(); 35151c0b2f7Stbbdev 35251c0b2f7Stbbdev continue_msg storage; 35351c0b2f7Stbbdev CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)), 35451c0b2f7Stbbdev "Wrong number of messages is passed via continue_node"); 35551c0b2f7Stbbdev } 35651c0b2f7Stbbdev 35751c0b2f7Stbbdev //! Test concurrent continue_node for correctness 35851c0b2f7Stbbdev //! \brief \ref error_guessing 35951c0b2f7Stbbdev TEST_CASE("Concurrency testing") { 36051c0b2f7Stbbdev for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) { 36151c0b2f7Stbbdev test_concurrency(p); 36251c0b2f7Stbbdev } 36351c0b2f7Stbbdev } 36451c0b2f7Stbbdev 36551c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs 36651c0b2f7Stbbdev //! \brief \ref error_guessing 36751c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); } 36851c0b2f7Stbbdev 36951c0b2f7Stbbdev //! Test basic behaviour with lightweight body 37051c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 37151c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); } 37251c0b2f7Stbbdev 37351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 37451c0b2f7Stbbdev //! Test deprecated follows and preceedes API 37551c0b2f7Stbbdev //! \brief \ref error_guessing 37651c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); } 37751c0b2f7Stbbdev #endif 37851c0b2f7Stbbdev 37951c0b2f7Stbbdev //! Test for successor cache specialization 38051c0b2f7Stbbdev //! \brief \ref regression 38151c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) { 38251c0b2f7Stbbdev test_successor_cache_specialization(); 38351c0b2f7Stbbdev } 384478de5b1Stbbdev 385478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT 386478de5b1Stbbdev //! \brief \ref error_guessing 387478de5b1Stbbdev TEST_CASE("constraints for continue_node input") { 388478de5b1Stbbdev static_assert(utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::Copyable>); 389478de5b1Stbbdev static_assert(!utils::well_formed_instantiation<tbb::flow::continue_node, test_concepts::NonCopyable>); 390478de5b1Stbbdev } 391478de5b1Stbbdev 392478de5b1Stbbdev template <typename Input, typename Body> 393478de5b1Stbbdev concept can_call_continue_node_ctor = requires( tbb::flow::graph& graph, Body body, 394478de5b1Stbbdev tbb::flow::buffer_node<int>& f, std::size_t num, 395478de5b1Stbbdev tbb::flow::node_priority_t priority ) { 396478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, body); 397478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, body, priority); 398478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, num, body); 399478de5b1Stbbdev tbb::flow::continue_node<Input>(graph, num, body, priority); 400478de5b1Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 401478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), body); 402478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), body, priority); 403478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body); 404478de5b1Stbbdev tbb::flow::continue_node<Input>(tbb::flow::follows(f), num, body, priority); 405478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 406478de5b1Stbbdev }; 407478de5b1Stbbdev 408478de5b1Stbbdev //! \brief \ref error_guessing 409478de5b1Stbbdev TEST_CASE("constraints for continue_node body") { 410478de5b1Stbbdev using output_type = int; 411478de5b1Stbbdev using namespace test_concepts::continue_node_body; 412478de5b1Stbbdev 413478de5b1Stbbdev static_assert(can_call_continue_node_ctor<output_type, Correct<output_type>>); 414478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, NonCopyable<output_type>>); 415478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, NonDestructible<output_type>>); 416478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, NoOperatorRoundBrackets<output_type>>); 417478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, WrongInputOperatorRoundBrackets<output_type>>); 418478de5b1Stbbdev static_assert(!can_call_continue_node_ctor<output_type, WrongReturnOperatorRoundBrackets<output_type>>); 419478de5b1Stbbdev } 420478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT 421