151c0b2f7Stbbdev /* 2*b15aabb3Stbbdev Copyright (c) 2005-2021 Intel Corporation 351c0b2f7Stbbdev 451c0b2f7Stbbdev Licensed under the Apache License, Version 2.0 (the "License"); 551c0b2f7Stbbdev you may not use this file except in compliance with the License. 651c0b2f7Stbbdev You may obtain a copy of the License at 751c0b2f7Stbbdev 851c0b2f7Stbbdev http://www.apache.org/licenses/LICENSE-2.0 951c0b2f7Stbbdev 1051c0b2f7Stbbdev Unless required by applicable law or agreed to in writing, software 1151c0b2f7Stbbdev distributed under the License is distributed on an "AS IS" BASIS, 1251c0b2f7Stbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1351c0b2f7Stbbdev See the License for the specific language governing permissions and 1451c0b2f7Stbbdev limitations under the License. 1551c0b2f7Stbbdev */ 1651c0b2f7Stbbdev 1751c0b2f7Stbbdev #include "common/config.h" 1851c0b2f7Stbbdev 1951c0b2f7Stbbdev // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these 2051c0b2f7Stbbdev // parts in all of tests might make testing of the product, which is different from what is actually 2151c0b2f7Stbbdev // released. 2251c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1 2351c0b2f7Stbbdev #include "tbb/flow_graph.h" 2451c0b2f7Stbbdev 2551c0b2f7Stbbdev #include "common/test.h" 2651c0b2f7Stbbdev #include "common/utils.h" 2751c0b2f7Stbbdev #include "common/graph_utils.h" 2851c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h" 2951c0b2f7Stbbdev 3051c0b2f7Stbbdev 3151c0b2f7Stbbdev //! \file test_continue_node.cpp 3251c0b2f7Stbbdev //! \brief Test for [flow_graph.continue_node] specification 3351c0b2f7Stbbdev 3451c0b2f7Stbbdev 3551c0b2f7Stbbdev #define N 1000 3651c0b2f7Stbbdev #define MAX_NODES 4 3751c0b2f7Stbbdev #define C 8 3851c0b2f7Stbbdev 3951c0b2f7Stbbdev // A class to use as a fake predecessor of continue_node 4051c0b2f7Stbbdev struct fake_continue_sender : public tbb::flow::sender<tbb::flow::continue_msg> 4151c0b2f7Stbbdev { 4251c0b2f7Stbbdev typedef tbb::flow::sender<tbb::flow::continue_msg>::successor_type successor_type; 4351c0b2f7Stbbdev // Define implementations of virtual methods that are abstract in the base class 4451c0b2f7Stbbdev bool register_successor( successor_type& ) override { return false; } 4551c0b2f7Stbbdev bool remove_successor( successor_type& ) override { return false; } 4651c0b2f7Stbbdev }; 4751c0b2f7Stbbdev 4851c0b2f7Stbbdev template< typename InputType > 4951c0b2f7Stbbdev struct parallel_puts { 5051c0b2f7Stbbdev 5151c0b2f7Stbbdev tbb::flow::receiver< InputType > * const my_exe_node; 5251c0b2f7Stbbdev 5351c0b2f7Stbbdev parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {} 5451c0b2f7Stbbdev parallel_puts& operator=(const parallel_puts&) = delete; 5551c0b2f7Stbbdev 5651c0b2f7Stbbdev void operator()( int ) const { 5751c0b2f7Stbbdev for ( int i = 0; i < N; ++i ) { 5851c0b2f7Stbbdev // the nodes will accept all puts 5951c0b2f7Stbbdev CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" ); 6051c0b2f7Stbbdev } 6151c0b2f7Stbbdev } 6251c0b2f7Stbbdev 6351c0b2f7Stbbdev }; 6451c0b2f7Stbbdev 6551c0b2f7Stbbdev template< typename OutputType > 6651c0b2f7Stbbdev void run_continue_nodes( int p, tbb::flow::graph& g, tbb::flow::continue_node< OutputType >& n ) { 6751c0b2f7Stbbdev fake_continue_sender fake_sender; 6851c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) { 6951c0b2f7Stbbdev tbb::detail::d1::register_predecessor(n, fake_sender); 7051c0b2f7Stbbdev } 7151c0b2f7Stbbdev 7251c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 7351c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 7451c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 7551c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 7651c0b2f7Stbbdev } 7751c0b2f7Stbbdev harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count = 0; 7851c0b2f7Stbbdev 7951c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 8051c0b2f7Stbbdev tbb::flow::make_edge( n, *receivers[r] ); 8151c0b2f7Stbbdev } 8251c0b2f7Stbbdev 8351c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(n) ); 8451c0b2f7Stbbdev g.wait_for_all(); 8551c0b2f7Stbbdev 8651c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 8751c0b2f7Stbbdev size_t ec = harness_graph_executor<tbb::flow::continue_msg, OutputType>::execute_count; 8851c0b2f7Stbbdev CHECK_MESSAGE( (int)ec == p, "" ); 8951c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 9051c0b2f7Stbbdev size_t c = receivers[r]->my_count; 9151c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 9251c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" ); 9351c0b2f7Stbbdev } 9451c0b2f7Stbbdev 9551c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 9651c0b2f7Stbbdev tbb::flow::remove_edge( n, *receivers[r] ); 9751c0b2f7Stbbdev } 9851c0b2f7Stbbdev } 9951c0b2f7Stbbdev } 10051c0b2f7Stbbdev 10151c0b2f7Stbbdev template< typename OutputType, typename Body > 10251c0b2f7Stbbdev void continue_nodes( Body body ) { 10351c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 10451c0b2f7Stbbdev tbb::flow::graph g; 10551c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, body ); 10651c0b2f7Stbbdev run_continue_nodes( p, g, exe_node); 10751c0b2f7Stbbdev exe_node.try_put(tbb::flow::continue_msg()); 10851c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node_copy( exe_node ); 10951c0b2f7Stbbdev run_continue_nodes( p, g, exe_node_copy); 11051c0b2f7Stbbdev } 11151c0b2f7Stbbdev } 11251c0b2f7Stbbdev 11351c0b2f7Stbbdev const size_t Offset = 123; 11451c0b2f7Stbbdev std::atomic<size_t> global_execute_count; 11551c0b2f7Stbbdev 11651c0b2f7Stbbdev template< typename OutputType > 11751c0b2f7Stbbdev struct inc_functor { 11851c0b2f7Stbbdev 11951c0b2f7Stbbdev std::atomic<size_t> local_execute_count; 12051c0b2f7Stbbdev inc_functor( ) { local_execute_count = 0; } 12151c0b2f7Stbbdev inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); } 12251c0b2f7Stbbdev void operator=(const inc_functor &f) { local_execute_count = size_t(f.local_execute_count); } 12351c0b2f7Stbbdev 12451c0b2f7Stbbdev OutputType operator()( tbb::flow::continue_msg ) { 12551c0b2f7Stbbdev ++global_execute_count; 12651c0b2f7Stbbdev ++local_execute_count; 12751c0b2f7Stbbdev return OutputType(); 12851c0b2f7Stbbdev } 12951c0b2f7Stbbdev 13051c0b2f7Stbbdev }; 13151c0b2f7Stbbdev 13251c0b2f7Stbbdev template< typename OutputType > 13351c0b2f7Stbbdev void continue_nodes_with_copy( ) { 13451c0b2f7Stbbdev 13551c0b2f7Stbbdev for (int p = 1; p < 2*4/*MaxThread*/; ++p) { 13651c0b2f7Stbbdev tbb::flow::graph g; 13751c0b2f7Stbbdev inc_functor<OutputType> cf; 13851c0b2f7Stbbdev cf.local_execute_count = Offset; 13951c0b2f7Stbbdev global_execute_count = Offset; 14051c0b2f7Stbbdev 14151c0b2f7Stbbdev tbb::flow::continue_node< OutputType > exe_node( g, cf ); 14251c0b2f7Stbbdev fake_continue_sender fake_sender; 14351c0b2f7Stbbdev for (size_t i = 0; i < N; ++i) { 14451c0b2f7Stbbdev tbb::detail::d1::register_predecessor(exe_node, fake_sender); 14551c0b2f7Stbbdev } 14651c0b2f7Stbbdev 14751c0b2f7Stbbdev for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) { 14851c0b2f7Stbbdev std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers; 14951c0b2f7Stbbdev for (size_t i = 0; i < num_receivers; ++i) { 15051c0b2f7Stbbdev receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) ); 15151c0b2f7Stbbdev } 15251c0b2f7Stbbdev 15351c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 15451c0b2f7Stbbdev tbb::flow::make_edge( exe_node, *receivers[r] ); 15551c0b2f7Stbbdev } 15651c0b2f7Stbbdev 15751c0b2f7Stbbdev utils::NativeParallelFor( p, parallel_puts<tbb::flow::continue_msg>(exe_node) ); 15851c0b2f7Stbbdev g.wait_for_all(); 15951c0b2f7Stbbdev 16051c0b2f7Stbbdev // 2) the nodes will receive puts from multiple predecessors simultaneously, 16151c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 16251c0b2f7Stbbdev size_t c = receivers[r]->my_count; 16351c0b2f7Stbbdev // 3) the nodes will send to multiple successors. 16451c0b2f7Stbbdev CHECK_MESSAGE( (int)c == p, "" ); 16551c0b2f7Stbbdev } 16651c0b2f7Stbbdev for (size_t r = 0; r < num_receivers; ++r ) { 16751c0b2f7Stbbdev tbb::flow::remove_edge( exe_node, *receivers[r] ); 16851c0b2f7Stbbdev } 16951c0b2f7Stbbdev } 17051c0b2f7Stbbdev 17151c0b2f7Stbbdev // validate that the local body matches the global execute_count and both are correct 17251c0b2f7Stbbdev inc_functor<OutputType> body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 17351c0b2f7Stbbdev const size_t expected_count = p*MAX_NODES + Offset; 17451c0b2f7Stbbdev size_t global_count = global_execute_count; 17551c0b2f7Stbbdev size_t inc_count = body_copy.local_execute_count; 17651c0b2f7Stbbdev CHECK_MESSAGE( global_count == expected_count, "" ); 17751c0b2f7Stbbdev CHECK_MESSAGE( global_count == inc_count, "" ); 17851c0b2f7Stbbdev g.reset(tbb::flow::rf_reset_bodies); 17951c0b2f7Stbbdev body_copy = tbb::flow::copy_body< inc_functor<OutputType> >( exe_node ); 18051c0b2f7Stbbdev inc_count = body_copy.local_execute_count; 18151c0b2f7Stbbdev CHECK_MESSAGE( ( Offset == inc_count), "reset(rf_reset_bodies) did not reset functor" ); 18251c0b2f7Stbbdev 18351c0b2f7Stbbdev } 18451c0b2f7Stbbdev } 18551c0b2f7Stbbdev 18651c0b2f7Stbbdev template< typename OutputType > 18751c0b2f7Stbbdev void run_continue_nodes() { 18851c0b2f7Stbbdev harness_graph_executor< tbb::flow::continue_msg, OutputType>::max_executors = 0; 18951c0b2f7Stbbdev continue_nodes<OutputType>( []( tbb::flow::continue_msg i ) -> OutputType { return harness_graph_executor<tbb::flow::continue_msg, OutputType>::func(i); } ); 19051c0b2f7Stbbdev continue_nodes<OutputType>( &harness_graph_executor<tbb::flow::continue_msg, OutputType>::func ); 19151c0b2f7Stbbdev continue_nodes<OutputType>( typename harness_graph_executor<tbb::flow::continue_msg, OutputType>::functor() ); 19251c0b2f7Stbbdev continue_nodes_with_copy<OutputType>(); 19351c0b2f7Stbbdev } 19451c0b2f7Stbbdev 19551c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages 19651c0b2f7Stbbdev void test_concurrency(int num_threads) { 19751c0b2f7Stbbdev tbb::task_arena arena(num_threads); 19851c0b2f7Stbbdev arena.execute( 19951c0b2f7Stbbdev [&] { 20051c0b2f7Stbbdev run_continue_nodes<tbb::flow::continue_msg>(); 20151c0b2f7Stbbdev run_continue_nodes<int>(); 20251c0b2f7Stbbdev run_continue_nodes<utils::NoAssign>(); 20351c0b2f7Stbbdev } 20451c0b2f7Stbbdev ); 20551c0b2f7Stbbdev } 20651c0b2f7Stbbdev /* 20751c0b2f7Stbbdev * Connection of two graphs is not currently supported, but works to some limited extent. 20851c0b2f7Stbbdev * This test is included to check for backward compatibility. It checks that a continue_node 20951c0b2f7Stbbdev * with predecessors in two different graphs receives the required 21051c0b2f7Stbbdev * number of continue messages before it executes. 21151c0b2f7Stbbdev */ 21251c0b2f7Stbbdev using namespace tbb::flow; 21351c0b2f7Stbbdev 21451c0b2f7Stbbdev struct add_to_counter { 21551c0b2f7Stbbdev int* counter; 21651c0b2f7Stbbdev add_to_counter(int& var):counter(&var){} 21751c0b2f7Stbbdev void operator()(continue_msg){*counter+=1;} 21851c0b2f7Stbbdev }; 21951c0b2f7Stbbdev 22051c0b2f7Stbbdev void test_two_graphs(){ 22151c0b2f7Stbbdev int count=0; 22251c0b2f7Stbbdev 22351c0b2f7Stbbdev //graph g with broadcast_node and continue_node 22451c0b2f7Stbbdev graph g; 22551c0b2f7Stbbdev broadcast_node<continue_msg> start_g(g); 22651c0b2f7Stbbdev continue_node<continue_msg> first_g(g, add_to_counter(count)); 22751c0b2f7Stbbdev 22851c0b2f7Stbbdev //graph h with broadcast_node 22951c0b2f7Stbbdev graph h; 23051c0b2f7Stbbdev broadcast_node<continue_msg> start_h(h); 23151c0b2f7Stbbdev 23251c0b2f7Stbbdev //making two edges to first_g from the two graphs 23351c0b2f7Stbbdev make_edge(start_g,first_g); 23451c0b2f7Stbbdev make_edge(start_h, first_g); 23551c0b2f7Stbbdev 23651c0b2f7Stbbdev //two try_puts from the two graphs 23751c0b2f7Stbbdev start_g.try_put(continue_msg()); 23851c0b2f7Stbbdev start_h.try_put(continue_msg()); 23951c0b2f7Stbbdev g.wait_for_all(); 24051c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received"); 24151c0b2f7Stbbdev 24251c0b2f7Stbbdev //two try_puts from the graph that doesn't contain the node 24351c0b2f7Stbbdev count=0; 24451c0b2f7Stbbdev start_h.try_put(continue_msg()); 24551c0b2f7Stbbdev start_h.try_put(continue_msg()); 24651c0b2f7Stbbdev g.wait_for_all(); 24751c0b2f7Stbbdev CHECK_MESSAGE( (count==1), "Not all continue messages received -1"); 24851c0b2f7Stbbdev 24951c0b2f7Stbbdev //only one try_put 25051c0b2f7Stbbdev count=0; 25151c0b2f7Stbbdev start_g.try_put(continue_msg()); 25251c0b2f7Stbbdev g.wait_for_all(); 25351c0b2f7Stbbdev CHECK_MESSAGE( (count==0), "Node executed without waiting for all predecessors"); 25451c0b2f7Stbbdev } 25551c0b2f7Stbbdev 25651c0b2f7Stbbdev struct lightweight_policy_body { 25751c0b2f7Stbbdev const std::thread::id my_thread_id; 25851c0b2f7Stbbdev std::atomic<size_t>& my_count; 25951c0b2f7Stbbdev 26051c0b2f7Stbbdev lightweight_policy_body( std::atomic<size_t>& count ) 26151c0b2f7Stbbdev : my_thread_id(std::this_thread::get_id()), my_count(count) 26251c0b2f7Stbbdev { 26351c0b2f7Stbbdev my_count = 0; 26451c0b2f7Stbbdev } 26551c0b2f7Stbbdev lightweight_policy_body& operator=(const lightweight_policy_body&) = delete; 26651c0b2f7Stbbdev void operator()(tbb::flow::continue_msg) { 26751c0b2f7Stbbdev ++my_count; 26851c0b2f7Stbbdev std::thread::id body_thread_id = std::this_thread::get_id(); 26951c0b2f7Stbbdev CHECK_MESSAGE( (body_thread_id == my_thread_id), "Body executed as not lightweight"); 27051c0b2f7Stbbdev } 27151c0b2f7Stbbdev }; 27251c0b2f7Stbbdev 27351c0b2f7Stbbdev void test_lightweight_policy() { 27451c0b2f7Stbbdev tbb::flow::graph g; 27551c0b2f7Stbbdev std::atomic<size_t> count1; 27651c0b2f7Stbbdev std::atomic<size_t> count2; 27751c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 27851c0b2f7Stbbdev node1(g, lightweight_policy_body(count1)); 27951c0b2f7Stbbdev tbb::flow::continue_node<tbb::flow::continue_msg, tbb::flow::lightweight> 28051c0b2f7Stbbdev node2(g, lightweight_policy_body(count2)); 28151c0b2f7Stbbdev 28251c0b2f7Stbbdev tbb::flow::make_edge(node1, node2); 28351c0b2f7Stbbdev const size_t n = 10; 28451c0b2f7Stbbdev for(size_t i = 0; i < n; ++i) { 28551c0b2f7Stbbdev node1.try_put(tbb::flow::continue_msg()); 28651c0b2f7Stbbdev } 28751c0b2f7Stbbdev g.wait_for_all(); 28851c0b2f7Stbbdev 28951c0b2f7Stbbdev lightweight_policy_body body1 = tbb::flow::copy_body<lightweight_policy_body>(node1); 29051c0b2f7Stbbdev lightweight_policy_body body2 = tbb::flow::copy_body<lightweight_policy_body>(node2); 29151c0b2f7Stbbdev CHECK_MESSAGE( (body1.my_count == n), "Body of the first node needs to be executed N times"); 29251c0b2f7Stbbdev CHECK_MESSAGE( (body2.my_count == n), "Body of the second node needs to be executed N times"); 29351c0b2f7Stbbdev } 29451c0b2f7Stbbdev 29551c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 29651c0b2f7Stbbdev #include <array> 29751c0b2f7Stbbdev #include <vector> 29851c0b2f7Stbbdev void test_follows_and_precedes_api() { 29951c0b2f7Stbbdev using msg_t = tbb::flow::continue_msg; 30051c0b2f7Stbbdev 30151c0b2f7Stbbdev std::array<msg_t, 3> messages_for_follows = { { msg_t(), msg_t(), msg_t() } }; 30251c0b2f7Stbbdev std::vector<msg_t> messages_for_precedes = { msg_t() }; 30351c0b2f7Stbbdev 30451c0b2f7Stbbdev auto pass_through = [](const msg_t& msg) { return msg; }; 30551c0b2f7Stbbdev 30651c0b2f7Stbbdev follows_and_precedes_testing::test_follows 30751c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>> 30851c0b2f7Stbbdev (messages_for_follows, pass_through, node_priority_t(0)); 30951c0b2f7Stbbdev 31051c0b2f7Stbbdev follows_and_precedes_testing::test_precedes 31151c0b2f7Stbbdev <msg_t, tbb::flow::continue_node<msg_t>> 31251c0b2f7Stbbdev (messages_for_precedes, /* number_of_predecessors = */0, pass_through, node_priority_t(1)); 31351c0b2f7Stbbdev } 31451c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 31551c0b2f7Stbbdev 31651c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 31751c0b2f7Stbbdev 31851c0b2f7Stbbdev template <typename ExpectedType, typename Body> 31951c0b2f7Stbbdev void test_deduction_guides_common(Body body) { 32051c0b2f7Stbbdev using namespace tbb::flow; 32151c0b2f7Stbbdev graph g; 32251c0b2f7Stbbdev 32351c0b2f7Stbbdev continue_node c1(g, body); 32451c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c1), continue_node<ExpectedType>>); 32551c0b2f7Stbbdev 32651c0b2f7Stbbdev continue_node c2(g, body, lightweight()); 32751c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c2), continue_node<ExpectedType, lightweight>>); 32851c0b2f7Stbbdev 32951c0b2f7Stbbdev continue_node c3(g, 5, body); 33051c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c3), continue_node<ExpectedType>>); 33151c0b2f7Stbbdev 33251c0b2f7Stbbdev continue_node c4(g, 5, body, lightweight()); 33351c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c4), continue_node<ExpectedType, lightweight>>); 33451c0b2f7Stbbdev 33551c0b2f7Stbbdev continue_node c5(g, body, node_priority_t(5)); 33651c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c5), continue_node<ExpectedType>>); 33751c0b2f7Stbbdev 33851c0b2f7Stbbdev continue_node c6(g, body, lightweight(), node_priority_t(5)); 33951c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c6), continue_node<ExpectedType, lightweight>>); 34051c0b2f7Stbbdev 34151c0b2f7Stbbdev continue_node c7(g, 5, body, node_priority_t(5)); 34251c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c7), continue_node<ExpectedType>>); 34351c0b2f7Stbbdev 34451c0b2f7Stbbdev continue_node c8(g, 5, body, lightweight(), node_priority_t(5)); 34551c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c8), continue_node<ExpectedType, lightweight>>); 34651c0b2f7Stbbdev 34751c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 34851c0b2f7Stbbdev broadcast_node<continue_msg> b(g); 34951c0b2f7Stbbdev 35051c0b2f7Stbbdev continue_node c9(follows(b), body); 35151c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c9), continue_node<ExpectedType>>); 35251c0b2f7Stbbdev 35351c0b2f7Stbbdev continue_node c10(follows(b), body, lightweight()); 35451c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c10), continue_node<ExpectedType, lightweight>>); 35551c0b2f7Stbbdev 35651c0b2f7Stbbdev continue_node c11(follows(b), 5, body); 35751c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c11), continue_node<ExpectedType>>); 35851c0b2f7Stbbdev 35951c0b2f7Stbbdev continue_node c12(follows(b), 5, body, lightweight()); 36051c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c12), continue_node<ExpectedType, lightweight>>); 36151c0b2f7Stbbdev 36251c0b2f7Stbbdev continue_node c13(follows(b), body, node_priority_t(5)); 36351c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c13), continue_node<ExpectedType>>); 36451c0b2f7Stbbdev 36551c0b2f7Stbbdev continue_node c14(follows(b), body, lightweight(), node_priority_t(5)); 36651c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c14), continue_node<ExpectedType, lightweight>>); 36751c0b2f7Stbbdev 36851c0b2f7Stbbdev continue_node c15(follows(b), 5, body, node_priority_t(5)); 36951c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c15), continue_node<ExpectedType>>); 37051c0b2f7Stbbdev 37151c0b2f7Stbbdev continue_node c16(follows(b), 5, body, lightweight(), node_priority_t(5)); 37251c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c16), continue_node<ExpectedType, lightweight>>); 37351c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 37451c0b2f7Stbbdev 37551c0b2f7Stbbdev continue_node c17(c1); 37651c0b2f7Stbbdev static_assert(std::is_same_v<decltype(c17), continue_node<ExpectedType>>); 37751c0b2f7Stbbdev } 37851c0b2f7Stbbdev 37951c0b2f7Stbbdev int continue_body_f(const tbb::flow::continue_msg&) { return 1; } 38051c0b2f7Stbbdev void continue_void_body_f(const tbb::flow::continue_msg&) {} 38151c0b2f7Stbbdev 38251c0b2f7Stbbdev void test_deduction_guides() { 38351c0b2f7Stbbdev using tbb::flow::continue_msg; 38451c0b2f7Stbbdev test_deduction_guides_common<int>([](const continue_msg&)->int { return 1; } ); 38551c0b2f7Stbbdev test_deduction_guides_common<continue_msg>([](const continue_msg&) {}); 38651c0b2f7Stbbdev 38751c0b2f7Stbbdev test_deduction_guides_common<int>([](const continue_msg&) mutable ->int { return 1; }); 38851c0b2f7Stbbdev test_deduction_guides_common<continue_msg>([](const continue_msg&) mutable {}); 38951c0b2f7Stbbdev 39051c0b2f7Stbbdev test_deduction_guides_common<int>(continue_body_f); 39151c0b2f7Stbbdev test_deduction_guides_common<continue_msg>(continue_void_body_f); 39251c0b2f7Stbbdev } 39351c0b2f7Stbbdev 39451c0b2f7Stbbdev #endif // __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 39551c0b2f7Stbbdev 39651c0b2f7Stbbdev // TODO: use pass_through from test_function_node instead 39751c0b2f7Stbbdev template<typename T> 39851c0b2f7Stbbdev struct passing_body { 39951c0b2f7Stbbdev T operator()(const T& val) { 40051c0b2f7Stbbdev return val; 40151c0b2f7Stbbdev } 40251c0b2f7Stbbdev }; 40351c0b2f7Stbbdev 40451c0b2f7Stbbdev /* 40551c0b2f7Stbbdev The test covers the case when a node with non-default mutex type is a predecessor for continue_node, 40651c0b2f7Stbbdev because there used to be a bug when make_edge(node, continue_node) 40751c0b2f7Stbbdev did not update continue_node's predecesosor threshold 40851c0b2f7Stbbdev since the specialization of node's successor_cache for a continue_node was not chosen. 40951c0b2f7Stbbdev */ 41051c0b2f7Stbbdev void test_successor_cache_specialization() { 41151c0b2f7Stbbdev using namespace tbb::flow; 41251c0b2f7Stbbdev 41351c0b2f7Stbbdev graph g; 41451c0b2f7Stbbdev 41551c0b2f7Stbbdev broadcast_node<continue_msg> node_with_default_mutex_type(g); 41651c0b2f7Stbbdev buffer_node<continue_msg> node_with_non_default_mutex_type(g); 41751c0b2f7Stbbdev 41851c0b2f7Stbbdev continue_node<continue_msg> node(g, passing_body<continue_msg>()); 41951c0b2f7Stbbdev 42051c0b2f7Stbbdev make_edge(node_with_default_mutex_type, node); 42151c0b2f7Stbbdev make_edge(node_with_non_default_mutex_type, node); 42251c0b2f7Stbbdev 42351c0b2f7Stbbdev buffer_node<continue_msg> buf(g); 42451c0b2f7Stbbdev 42551c0b2f7Stbbdev make_edge(node, buf); 42651c0b2f7Stbbdev 42751c0b2f7Stbbdev node_with_default_mutex_type.try_put(continue_msg()); 42851c0b2f7Stbbdev node_with_non_default_mutex_type.try_put(continue_msg()); 42951c0b2f7Stbbdev 43051c0b2f7Stbbdev g.wait_for_all(); 43151c0b2f7Stbbdev 43251c0b2f7Stbbdev continue_msg storage; 43351c0b2f7Stbbdev CHECK_MESSAGE((buf.try_get(storage) && !buf.try_get(storage)), 43451c0b2f7Stbbdev "Wrong number of messages is passed via continue_node"); 43551c0b2f7Stbbdev } 43651c0b2f7Stbbdev 43751c0b2f7Stbbdev //! Test concurrent continue_node for correctness 43851c0b2f7Stbbdev //! \brief \ref error_guessing 43951c0b2f7Stbbdev TEST_CASE("Concurrency testing") { 44051c0b2f7Stbbdev for( unsigned p=utils::MinThread; p<=utils::MaxThread; ++p ) { 44151c0b2f7Stbbdev test_concurrency(p); 44251c0b2f7Stbbdev } 44351c0b2f7Stbbdev } 44451c0b2f7Stbbdev 44551c0b2f7Stbbdev //! Test concurrent continue_node in separate graphs 44651c0b2f7Stbbdev //! \brief \ref error_guessing 44751c0b2f7Stbbdev TEST_CASE("Two graphs") { test_two_graphs(); } 44851c0b2f7Stbbdev 44951c0b2f7Stbbdev //! Test basic behaviour with lightweight body 45051c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing 45151c0b2f7Stbbdev TEST_CASE( "Lightweight policy" ) { test_lightweight_policy(); } 45251c0b2f7Stbbdev 45351c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 45451c0b2f7Stbbdev //! Test deprecated follows and preceedes API 45551c0b2f7Stbbdev //! \brief \ref error_guessing 45651c0b2f7Stbbdev TEST_CASE( "Support for follows and precedes API" ) { test_follows_and_precedes_api(); } 45751c0b2f7Stbbdev #endif 45851c0b2f7Stbbdev 45951c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT 46051c0b2f7Stbbdev //! Test deduction guides 46151c0b2f7Stbbdev //! \brief requirement 46251c0b2f7Stbbdev TEST_CASE( "Deduction guides" ) { test_deduction_guides(); } 46351c0b2f7Stbbdev #endif 46451c0b2f7Stbbdev 46551c0b2f7Stbbdev //! Test for successor cache specialization 46651c0b2f7Stbbdev //! \brief \ref regression 46751c0b2f7Stbbdev TEST_CASE( "Regression for successor cache specialization" ) { 46851c0b2f7Stbbdev test_successor_cache_specialization(); 46951c0b2f7Stbbdev } 470