/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"
#include "common/concepts_common.h"


//! \file test_function_node.cpp
//! \brief Test for [flow_graph.function_node] specification


#define N 100
#define MAX_NODES 4

//! Performs tests on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this holds even when there are multiple predecessors and successors.
*/

template<typename IO>
struct pass_through {
    IO operator()(const IO& i) { return i; }
};
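
// The following is a minimal illustrative sketch (not part of the original test suite,
// and never invoked by it): with the default queueing policy used by buffered_levels()
// below, try_put() always succeeds, and messages beyond the concurrency limit wait in
// the node's internal buffer instead of being rejected.
inline void example_buffered_function_node() {
    tbb::flow::graph g;
    // serial concurrency: at most one body executes at a time
    tbb::flow::function_node<int, int> square( g, tbb::flow::serial,
                                               [](int v) { return v * v; } );
    for (int i = 0; i < 10; ++i) {
        // always accepted; excess items are buffered by the node
        square.try_put(i);
    }
    g.wait_for_all();
}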

template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // Create the function_node with the appropriate concurrency level, and use default buffering
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
        tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

        // Create a vector of identical exe_nodes and pass_thrus
        std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
        std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
        // Attach each pass_thru to its corresponding exe_node
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
        }

        // TODO: investigate why the test is executed serially for the node pairs rather than concurrently.
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            // For num_receivers = 1 to MAX_NODES
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit to N each, and connect them to
                    // pass_thru_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        senders.back()->register_successor( pass_thru_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // Confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK( n == N );
                        CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
                    }
                    // Validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
                }
                CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // Since the receivers are detached, nothing should have changed
                    receivers[r]->validate();
                }

            } // for num_receivers
        } // for node_idx
    } // for concurrency level lc
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor() { local_execute_count = 0; }
    inc_functor( const inc_functor& f ) { local_execute_count = size_t(f.local_execute_count); }
    void operator=( const inc_functor& f ) { local_execute_count = size_t(f.local_execute_count); }

    int operator()( int i ) {
        ++global_execute_count;
        ++local_execute_count;
        return i;
    }

};

template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
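
// A hedged sketch of the copy_body/reset pattern verified above (example-only code,
// never called by the tests): copy_body returns a copy of the body currently stored
// in the node, and reset(rf_reset_bodies) replaces the stored body with a copy of the
// one originally passed to the constructor.
inline void example_copy_body_reset() {
    tbb::flow::graph g;
    inc_functor counting;   // local_execute_count starts at 0
    tbb::flow::function_node<int, int> node( g, tbb::flow::serial, counting );
    node.try_put(1);
    g.wait_for_all();
    // the stored body has now executed exactly once
    size_t after_one = tbb::flow::copy_body<inc_functor>(node).local_execute_count;
    CHECK( after_one == 1 );
    g.reset(tbb::flow::rf_reset_bodies);
    // after the reset, the stored body is a fresh copy of the original
    size_t after_reset = tbb::flow::copy_body<inc_functor>(node).local_execute_count;
    CHECK( after_reset == 0 );
}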

template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}


//! Performs tests on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes receive puts from multiple predecessors simultaneously,
    and 4) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

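// A minimal sketch of the rejecting policy exercised below (illustrative only, not
// part of the original tests): a rejecting function_node has no input buffer, so once
// its concurrency limit is saturated, try_put() returns false instead of queueing.
inline void example_rejecting_function_node() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int, tbb::flow::rejecting> node(
        g, tbb::flow::serial, [](int v) { return v; } );
    // A put is rejected only if it arrives while the serial slot is still busy;
    // the tests below force that situation deterministically with a lock.
    node.try_put(1);
    g.wait_for_all();
}
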
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // Put up to the lc level; the node accepts each put and then blocks on m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // The node only accepts up to the lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // Register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // Wait for the graph to settle down
                g.wait_for_all();

                // Confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // Confirm that each receiver got N * num_senders messages plus the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}


template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}


struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK( my_exe_node->try_put( InputType() ) == true );
        }
    }

};

//! Performs tests on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes accept all puts,
    2) that the nodes receive puts from multiple predecessors simultaneously,
    and 3) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

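// A minimal sketch of unlimited concurrency (illustrative only, never called by the
// tests): with tbb::flow::unlimited, the node spawns a task for every incoming
// message, so every try_put() is accepted even under the rejecting policy.
inline void example_unlimited_function_node() {
    tbb::flow::graph g;
    std::atomic<int> sum{0};
    tbb::flow::function_node<int, int, tbb::flow::rejecting> node(
        g, tbb::flow::unlimited, [&](int v) { sum += v; return v; } );
    for (int i = 1; i <= 100; ++i) {
        node.try_put(i);   // never rejected: bodies may run concurrently
    }
    g.wait_for_all();
    CHECK( sum == 5050 );
}
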
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            CHECK( ec == p*N );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK( c == p*N );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}

struct continue_msg_to_int {
    int my_int;
    continue_msg_to_int(int x) : my_int(x) {}
    int operator()(tbb::flow::continue_msg) { return my_int; }
};

void test_function_node_with_continue_msg_as_input() {
    // If this function terminates, then this test is successful
    tbb::flow::graph g;

    tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);

    tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
    tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));

    tbb::flow::make_edge( Start, FN1 );
    tbb::flow::make_edge( Start, FN2 );

    Start.try_put( tbb::flow::continue_msg() );
    g.wait_for_all();
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
#endif


//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::function_node>(10);
}
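
// A hedged sketch of the lightweight policy exercised above (illustrative only, not
// called by the tests): the lightweight hint tells the runtime the body is cheap, so
// it may be executed immediately on the calling thread instead of via a spawned task.
inline void example_lightweight_function_node() {
    tbb::flow::graph g;
    tbb::flow::function_node<int, int, tbb::flow::lightweight> fast(
        g, tbb::flow::serial, [](int v) noexcept { return v + 1; } );
    fast.try_put(41);
    g.wait_for_all();
}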

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
    test_follows_and_precedes_api();
}
#endif

//! try_release and try_consume test
//! \brief \ref error_guessing
TEST_CASE("try_release try_consume"){
    tbb::flow::graph g;

    tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int& v){ return v; });

    CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
    CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
}

#if __TBB_CPP20_CONCEPTS_PRESENT
//! \brief \ref error_guessing
TEST_CASE("constraints for function_node input and output") {
    struct InputObject {
        InputObject() = default;
        InputObject( const InputObject& ) = default;
    };
    struct OutputObject : test_concepts::Copyable {};

    static_assert(utils::well_formed_instantiation<tbb::flow::function_node, InputObject, OutputObject>);
    static_assert(utils::well_formed_instantiation<tbb::flow::function_node, int, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, test_concepts::NonCopyable, OutputObject>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, test_concepts::NonDefaultInitializable, OutputObject>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, InputObject, test_concepts::NonCopyable>);
}

template <typename Input, typename Output, typename Body>
concept can_call_function_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body,
                                                tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
    tbb::flow::function_node<Input, Output>(graph, concurrency, body);
    tbb::flow::function_node<Input, Output>(graph, concurrency, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::function_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
    tbb::flow::function_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
#endif
};

//! \brief \ref error_guessing
TEST_CASE("constraints for function_node body") {
    using input_type = int;
    using output_type = int;
    using namespace test_concepts::function_node_body;

    static_assert(can_call_function_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, WrongInputRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, WrongReturnRoundBrackets<input_type, output_type>>);
}
#endif // __TBB_CPP20_CONCEPTS_PRESENT