xref: /oneTBB/test/tbb/test_function_node.cpp (revision de0109be)
151c0b2f7Stbbdev /*
2b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19b15aabb3Stbbdev #endif
20b15aabb3Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
2351c0b2f7Stbbdev #include "tbb/flow_graph.h"
2451c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
2551c0b2f7Stbbdev #include "tbb/global_control.h"
2651c0b2f7Stbbdev 
2751c0b2f7Stbbdev #include "common/test.h"
2851c0b2f7Stbbdev #include "common/utils.h"
2951c0b2f7Stbbdev #include "common/graph_utils.h"
3051c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
31*478de5b1Stbbdev #include "common/concepts_common.h"
3251c0b2f7Stbbdev 
3351c0b2f7Stbbdev 
3451c0b2f7Stbbdev //! \file test_function_node.cpp
3551c0b2f7Stbbdev //! \brief Test for [flow_graph.function_node] specification
3651c0b2f7Stbbdev 
3751c0b2f7Stbbdev 
3851c0b2f7Stbbdev #define N 100
3951c0b2f7Stbbdev #define MAX_NODES 4
4051c0b2f7Stbbdev 
//! Performs test on function nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit
    2) that the node never rejects
    3) that no items are lost
    and 4) all of this happens even if there are multiple predecessors and successors
*/
4851c0b2f7Stbbdev 
//! Identity body: forwards every message unchanged.
template<typename IO>
struct pass_through {
    IO operator()( const IO& item ) { return item; }
};
5351c0b2f7Stbbdev 
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

   // Do for lc = 1 to concurrency level
   for ( size_t lc = 1; lc <= concurrency; ++lc ) {
   tbb::flow::graph g;

   // Set the execute_counter back to zero in the harness
   harness_graph_executor<InputType, OutputType>::execute_count = 0;
   // Set the number of current executors to zero.
   harness_graph_executor<InputType, OutputType>::current_executors = 0;
   // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
   harness_graph_executor<InputType, OutputType>::max_executors = lc;

   // Create the function_node with the appropriate concurrency level, and use default buffering
   tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
   tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

   // Create a vector of identical exe_nodes and pass_thrus
   std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
   std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
   // Attach each pass_thru to its corresponding exe_node
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
       tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
   }

   // TODO: why the test is executed serially for the node pairs, not concurrently?
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
   // For num_receivers = 1 to MAX_NODES
       for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
           // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
           std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
           for (size_t i = 0; i < num_receivers; i++) {
               receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
           }

           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
           }

           // Do the test with varying numbers of senders
           std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
           for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
               // Create num_senders senders, set their message limit each to N, and connect them to
               // pass_thru_vec[node_idx]
               senders.clear();
               for (size_t s = 0; s < num_senders; ++s ) {
                   senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                   senders.back()->my_limit = N;
                   senders.back()->register_successor(pass_thru_vec[node_idx] );
               }

               // Initialize the receivers so they know how many senders and messages to check for
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->initialize_map( N, num_senders );
               }

               // Do the test: each native thread drives one sender until its limit is hit
               utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
               g.wait_for_all();

               // confirm that each sender was requested from N times
               for (size_t s = 0; s < num_senders; ++s ) {
                   size_t n = senders[s]->my_received;
                   CHECK( n == N );
                   CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
               }
               // validate the receivers: no message lost, none duplicated
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->validate();
               }
           }
           // Detach all receivers before the final sanity put below.
           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
           }
           // A buffered node must still accept input even with no successors.
           CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
           g.wait_for_all();
           for (size_t r = 0; r < num_receivers; ++r ) {
               // since it's detached, nothing should have changed
               receivers[r]->validate();
           }

       } // for num_receivers
    } // for node_idx
    } // for concurrency level lc
}
14051c0b2f7Stbbdev 
// Initial value for both counters so resets can be distinguished from zero-init.
const size_t Offset = 123;
// Total number of body invocations across all copies of inc_functor.
std::atomic<size_t> global_execute_count;

//! Identity body that counts its invocations, both globally and per functor copy.
/** Used to verify that copy_body() observes the live body state and that
    graph::reset(rf_reset_bodies) restores the originally supplied body. */
struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    // Copy/assign transfer only the counter value; std::atomic itself is not copyable.
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    // Canonical copy assignment: return *this to allow chaining (the original
    // returned void, which broke the conventional assignment contract).
    inc_functor& operator=( const inc_functor &f ) {
        local_execute_count = size_t(f.local_execute_count);
        return *this;
    }

    // Identity transform with side effect: bump both counters.
    int operator()( int i ) {
       ++global_execute_count;
       ++local_execute_count;
       return i;
    }

};
15851c0b2f7Stbbdev 
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Start both counters at Offset so a reset back to the original body
        // (counter == Offset) is distinguishable from zero-initialization.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        // The node copies cf; copy_body() below must see the copy's state.
        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                // Tell each receiver how many senders/messages to expect.
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Each sender must have been drained exactly N times.
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // One extra put per num_receivers iteration (accounted for below).
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct.
        // Derivation: for each of the MAX_NODES receiver settings the sender loop delivers
        // N * (1 + 2 + ... + MAX_NODES) = N/2 * MAX_NODES * (MAX_NODES+1) messages,
        // plus one detached try_put per receiver setting (MAX_NODES), plus the Offset start.
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        // Resetting bodies must restore the originally supplied functor (count == Offset).
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
23151c0b2f7Stbbdev 
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    // Exercise buffered_levels with each supported body kind:
    // a lambda, a plain function pointer, and a function-object type.
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    // Additionally verify body copying and reset semantics with a stateful functor.
    buffered_levels_with_copy<InputType,OutputType>( c );
}
23951c0b2f7Stbbdev 
24051c0b2f7Stbbdev 
//! Performs test on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple predecessors simultaneously,
    and 4) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
24951c0b2f7Stbbdev 
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // A rejecting node refuses puts above its concurrency level instead of buffering them.
        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            // Create num_receivers counting successors for exe_node.
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // With no successors attached, the put is still accepted but the
            // result is dropped; detached receivers must observe nothing.
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}
32951c0b2f7Stbbdev 
33051c0b2f7Stbbdev 
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    // Exercise concurrency_levels with each supported body kind (lambda,
    // function pointer, function object), all using the mutex-gated tfunc
    // so the harness can hold node executions in flight.
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}
33751c0b2f7Stbbdev 
33851c0b2f7Stbbdev 
//! Message type that is default- and int-constructible and converts to int.
/** Used to check that nodes work with user-defined (non-arithmetic) payloads. */
struct empty_no_assign {
   empty_no_assign() = default;
   empty_no_assign( int ) {}
   operator int() { return 0; }
};
34451c0b2f7Stbbdev 
//! Body for NativeParallelFor: each thread puts N default-constructed items to one receiver.
template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    // Non-owning pointer to the receiver under test; shared by all copies of this functor.
    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    // Invoked once per native thread; the int argument (thread index) is unused.
    void operator()( int ) const  {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK( my_exe_node->try_put( InputType() ) == true );
        }
    }

};
36051c0b2f7Stbbdev 
36151c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency
36251c0b2f7Stbbdev /** These tests check:
36351c0b2f7Stbbdev     1) that the nodes will accept all puts
36451c0b2f7Stbbdev     2) the nodes will receive puts from multiple predecessors simultaneously,
36551c0b2f7Stbbdev     and 3) the nodes will send to multiple successors.
36651c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
36751c0b2f7Stbbdev */
36851c0b2f7Stbbdev 
36951c0b2f7Stbbdev template< typename InputType, typename OutputType, typename Body >
unlimited_concurrency(Body body)37051c0b2f7Stbbdev void unlimited_concurrency( Body body ) {
37151c0b2f7Stbbdev 
37251c0b2f7Stbbdev     for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
37351c0b2f7Stbbdev         tbb::flow::graph g;
37451c0b2f7Stbbdev         tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );
37551c0b2f7Stbbdev 
37651c0b2f7Stbbdev         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
37751c0b2f7Stbbdev 
37851c0b2f7Stbbdev             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
37951c0b2f7Stbbdev             for (size_t i = 0; i < num_receivers; ++i) {
38051c0b2f7Stbbdev                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
38151c0b2f7Stbbdev             }
38251c0b2f7Stbbdev 
38351c0b2f7Stbbdev             harness_graph_executor<InputType, OutputType>::execute_count = 0;
38451c0b2f7Stbbdev 
38551c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
38651c0b2f7Stbbdev                 tbb::flow::make_edge( exe_node, *receivers[r] );
38751c0b2f7Stbbdev             }
38851c0b2f7Stbbdev 
38951c0b2f7Stbbdev             utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
39051c0b2f7Stbbdev             g.wait_for_all();
39151c0b2f7Stbbdev 
39251c0b2f7Stbbdev             // 2) the nodes will receive puts from multiple predecessors simultaneously,
39351c0b2f7Stbbdev             size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
39451c0b2f7Stbbdev             CHECK( ec == p*N );
39551c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
39651c0b2f7Stbbdev                 size_t c = receivers[r]->my_count;
39751c0b2f7Stbbdev                 // 3) the nodes will send to multiple successors.
39851c0b2f7Stbbdev                 CHECK( c == p*N );
39951c0b2f7Stbbdev             }
40051c0b2f7Stbbdev             for (size_t r = 0; r < num_receivers; ++r ) {
40151c0b2f7Stbbdev                 tbb::flow::remove_edge( exe_node, *receivers[r] );
40251c0b2f7Stbbdev             }
40351c0b2f7Stbbdev             }
40451c0b2f7Stbbdev         }
40551c0b2f7Stbbdev     }
40651c0b2f7Stbbdev 
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    // max_executors == 0: the harness functor performs no concurrency-limit check.
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    // Exercise unlimited_concurrency with each supported body kind:
    // a lambda, a plain function pointer, and a function-object type.
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}
41451c0b2f7Stbbdev 
41551c0b2f7Stbbdev struct continue_msg_to_int {
41651c0b2f7Stbbdev     int my_int;
continue_msg_to_intcontinue_msg_to_int41751c0b2f7Stbbdev     continue_msg_to_int(int x) : my_int(x) {}
operator ()continue_msg_to_int41851c0b2f7Stbbdev     int operator()(tbb::flow::continue_msg) { return my_int; }
41951c0b2f7Stbbdev };
42051c0b2f7Stbbdev 
test_function_node_with_continue_msg_as_input()42151c0b2f7Stbbdev void test_function_node_with_continue_msg_as_input() {
42251c0b2f7Stbbdev     // If this function terminates, then this test is successful
42351c0b2f7Stbbdev     tbb::flow::graph g;
42451c0b2f7Stbbdev 
42551c0b2f7Stbbdev     tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);
42651c0b2f7Stbbdev 
42751c0b2f7Stbbdev     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
42851c0b2f7Stbbdev     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));
42951c0b2f7Stbbdev 
43051c0b2f7Stbbdev     tbb::flow::make_edge( Start, FN1 );
43151c0b2f7Stbbdev     tbb::flow::make_edge( Start, FN2 );
43251c0b2f7Stbbdev 
43351c0b2f7Stbbdev     Start.try_put( tbb::flow::continue_msg() );
43451c0b2f7Stbbdev     g.wait_for_all();
43551c0b2f7Stbbdev }
43651c0b2f7Stbbdev 
43751c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    // Cap TBB's worker-thread count for the duration of this scope (RAII).
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    // Limited-concurrency checks with rejecting nodes.
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    // Limited-concurrency checks with default (queueing) buffering.
    run_buffered_levels<int, int>(num_threads);
    // Unlimited-concurrency checks across input/output type combinations.
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
45151c0b2f7Stbbdev 
45251c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
45351c0b2f7Stbbdev #include <array>
45451c0b2f7Stbbdev #include <vector>
void test_follows_and_precedes_api() {
    using msg_t = tbb::flow::continue_msg;

    // Inputs for the follows() form (three upstream messages) and the
    // precedes() form (a single message).
    std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
    std::vector<msg_t> messages_for_precedes = { msg_t() };

    // Identity body shared by both checks.
    pass_through<msg_t> pass_msg;

    follows_and_precedes_testing::test_follows
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_follows, tbb::flow::unlimited, pass_msg);
    // Also pass an explicit node priority to cover that constructor overload.
    follows_and_precedes_testing::test_precedes
        <msg_t, tbb::flow::function_node<msg_t, msg_t>>
        (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
}
47051c0b2f7Stbbdev #endif
47151c0b2f7Stbbdev 
47251c0b2f7Stbbdev 
47351c0b2f7Stbbdev //! Test various node bodies with concurrency
47451c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    // Repeat the full concurrency suite for every thread count in [MinThread, MaxThread].
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}
48051c0b2f7Stbbdev 
48151c0b2f7Stbbdev //! NativeParallelFor testing with various concurrency settings
48251c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
   // Run the shared lightweight-policy test suite for function_node
   // (argument 10 is forwarded to the harness; presumably a message count).
   lightweight_testing::test<tbb::flow::function_node>(10);
}
48651c0b2f7Stbbdev 
48751c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
48851c0b2f7Stbbdev //! Test follows and precedes API
48951c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
     // Covers the preview follows()/precedes() construction API.
     test_follows_and_precedes_api();
}
49351c0b2f7Stbbdev #endif
49451c0b2f7Stbbdev 
49551c0b2f7Stbbdev //! try_release and try_consume test
49651c0b2f7Stbbdev //! \brief \ref error_guessing
49751c0b2f7Stbbdev TEST_CASE("try_release try_consume"){
49851c0b2f7Stbbdev     tbb::flow::graph g;
49951c0b2f7Stbbdev 
__anondcbd16150402(const int&v)50051c0b2f7Stbbdev     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});
50151c0b2f7Stbbdev 
50251c0b2f7Stbbdev     CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
50351c0b2f7Stbbdev     CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
50451c0b2f7Stbbdev }
505*478de5b1Stbbdev 
506*478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
507*478de5b1Stbbdev //! \brief \ref error_guessing
TEST_CASE("constraints for function_node input and output") {
    // Minimal type meeting the message requirements: default-initializable and copyable.
    struct InputObject {
        InputObject() = default;
        InputObject( const InputObject& ) = default;
    };
    struct OutputObject : test_concepts::Copyable {};

    static_assert(utils::well_formed_instantiation<tbb::flow::function_node, InputObject, OutputObject>);
    static_assert(utils::well_formed_instantiation<tbb::flow::function_node, int, int>);
    // Violating any single requirement must make the instantiation ill-formed.
    static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, test_concepts::NonCopyable, OutputObject>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, test_concepts::NonDefaultInitializable, OutputObject>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::function_node, InputObject, test_concepts::NonCopyable>);
}
521*478de5b1Stbbdev 
// Satisfied iff function_node<Input, Output> is constructible from Body through
// each public constructor overload: with and without an explicit priority, and
// (in preview mode) from a follows() node set.
template <typename Input, typename Output, typename Body>
concept can_call_function_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body,
                                                tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
    tbb::flow::function_node<Input, Output>(graph, concurrency, body);
    tbb::flow::function_node<Input, Output>(graph, concurrency, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::function_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
    tbb::flow::function_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
#endif
};
532*478de5b1Stbbdev 
533*478de5b1Stbbdev //! \brief \ref error_guessing
TEST_CASE("constraints for function_node body") {
    using input_type = int;
    using output_type = int;
    using namespace test_concepts::function_node_body;

    // A well-formed body must be constructible; each deliberately broken body
    // type below must be rejected by the constructor constraints.
    static_assert(can_call_function_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, WrongInputRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_function_node_ctor<input_type, output_type, WrongReturnRoundBrackets<input_type, output_type>>);
}
546*478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
547