xref: /oneTBB/test/tbb/test_function_node.cpp (revision b15aabb3)
151c0b2f7Stbbdev /*
2*b15aabb3Stbbdev     Copyright (c) 2005-2021 Intel Corporation
351c0b2f7Stbbdev 
451c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
551c0b2f7Stbbdev     you may not use this file except in compliance with the License.
651c0b2f7Stbbdev     You may obtain a copy of the License at
751c0b2f7Stbbdev 
851c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
951c0b2f7Stbbdev 
1051c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
1151c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1251c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1351c0b2f7Stbbdev     See the License for the specific language governing permissions and
1451c0b2f7Stbbdev     limitations under the License.
1551c0b2f7Stbbdev */
1651c0b2f7Stbbdev 
17*b15aabb3Stbbdev #if __INTEL_COMPILER && _MSC_VER
18*b15aabb3Stbbdev #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19*b15aabb3Stbbdev #endif
20*b15aabb3Stbbdev 
2151c0b2f7Stbbdev #include "common/config.h"
2251c0b2f7Stbbdev 
// TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s), since having these
// parts in all tests means testing a build of the product that differs from what is actually
// released.
2651c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1
2751c0b2f7Stbbdev #include "tbb/flow_graph.h"
2851c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
2951c0b2f7Stbbdev #include "tbb/global_control.h"
3051c0b2f7Stbbdev 
3151c0b2f7Stbbdev #include "common/test.h"
3251c0b2f7Stbbdev #include "common/utils.h"
3351c0b2f7Stbbdev #include "common/graph_utils.h"
3451c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
3551c0b2f7Stbbdev 
3651c0b2f7Stbbdev 
3751c0b2f7Stbbdev //! \file test_function_node.cpp
3851c0b2f7Stbbdev //! \brief Test for [flow_graph.function_node] specification
3951c0b2f7Stbbdev 
4051c0b2f7Stbbdev 
4151c0b2f7Stbbdev #define N 100
4251c0b2f7Stbbdev #define MAX_NODES 4
4351c0b2f7Stbbdev 
4451c0b2f7Stbbdev //! Performs test on function nodes with limited concurrency and buffering
4551c0b2f7Stbbdev /** These tests check:
4651c0b2f7Stbbdev     1) that the number of executing copies never exceed the concurrency limit
4751c0b2f7Stbbdev     2) that the node never rejects
4851c0b2f7Stbbdev     3) that no items are lost
4951c0b2f7Stbbdev     and 4) all of this happens even if there are multiple predecessors and successors
5051c0b2f7Stbbdev */
5151c0b2f7Stbbdev 
// Identity body: forwards its input unchanged. Used wherever a trivial
// function_node body is needed (e.g. the pass-through stage in buffered tests).
template<typename IO>
struct pass_through {
    IO operator()(const IO& value) { return value; }
};
5651c0b2f7Stbbdev 
/** @param concurrency  highest concurrency limit exercised (lc runs from 1 to concurrency)
    @param body         function_node body; must execute via harness_graph_executor so that
                        execute_count / current_executors / max_executors are tracked */
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

   // Do for lc = 1 to concurrency level
   for ( size_t lc = 1; lc <= concurrency; ++lc ) {
   tbb::flow::graph g;

   // Set the execute_counter back to zero in the harness
   harness_graph_executor<InputType, OutputType>::execute_count = 0;
   // Set the number of current executors to zero.
   harness_graph_executor<InputType, OutputType>::current_executors = 0;
   // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
   harness_graph_executor<InputType, OutputType>::max_executors = lc;

   // Create the function_node with the appropriate concurrency level, and use default buffering
   tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
   tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

   // Create a vector of identical exe_nodes and pass_thrus (also exercises node copy construction)
   std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
   std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
   // Attach each pass_thru to its corresponding exe_node
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
       tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
   }

   // TODO: why the test is executed serially for the node pairs, not concurrently?
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
   // For num_receivers = 1 to MAX_NODES
       for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
           // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
           std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
           for (size_t i = 0; i < num_receivers; i++) {
               receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
           }

           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
           }

           // Do the test with varying numbers of senders
           std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
           for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
               // Create num_senders senders, set their message limit each to N, and connect them to
               // pass_thru_vec[node_idx]
               senders.clear();
               for (size_t s = 0; s < num_senders; ++s ) {
                   senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                   senders.back()->my_limit = N;
                   senders.back()->register_successor(pass_thru_vec[node_idx] );
               }

               // Initialize the receivers so they know how many senders and messages to check for
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->initialize_map( N, num_senders );
               }

               // Do the test: one native thread per sender drives puts until each limit is hit
               utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
               g.wait_for_all();

               // confirm that each sender was requested from N times
               for (size_t s = 0; s < num_senders; ++s ) {
                   size_t n = senders[s]->my_received;
                   CHECK( n == N );
                   CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
               }
               // validate the receivers
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->validate();
               }
           }
           // Detach all receivers, then do one more put that now goes nowhere
           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
           }
           CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
           g.wait_for_all();
           for (size_t r = 0; r < num_receivers; ++r ) {
               // since it's detached, nothing should have changed
               receivers[r]->validate();
           }

       } // for num_receivers
    } // for node_idx
    } // for concurrency level lc
}
14351c0b2f7Stbbdev 
// Non-zero starting value so a stale/unreset counter is distinguishable from zero.
const size_t Offset = 123;
// Total number of inc_functor invocations across all body copies.
std::atomic<size_t> global_execute_count;

// Stateful body that counts executions both globally and per-instance.
// Copy/assignment transfer the count so tbb::flow::copy_body reflects the
// executions performed by the node-held copy.
struct inc_functor {

    std::atomic<size_t> local_execute_count;    // executions of this instance (and its copy lineage)
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    // Fixed: return *this per assignment-operator convention (was void, which
    // broke chained assignment); behavior for existing callers is unchanged.
    inc_functor& operator=( const inc_functor &f ) {
        local_execute_count = size_t(f.local_execute_count);
        return *this;
    }

    // Identity on the payload; bumps both counters as a side effect.
    int operator()( int i ) {
       ++global_execute_count;
       ++local_execute_count;
       return i;
    }

};
16151c0b2f7Stbbdev 
//! Buffered-concurrency test with a stateful body, validating body-copy semantics.
/** Same flood/validate cycle as buffered_levels, but the body is an inc_functor,
    so afterwards we can check that copy_body returns a copy holding the expected
    execution count and that reset(rf_reset_bodies) restores the original body. */
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Both counters start at Offset so a reset-to-zero bug would be visible.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                // num_senders senders, each limited to N messages, feeding exe_node directly
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                // One native thread per sender pushes until its limit is reached
                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // Each sender must have been fully drained, by exe_node itself
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            // Detach receivers; a further put must not change their observed state
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct.
        // Expected executions: for each num_receivers in 1..MAX_NODES, each num_senders in
        // 1..MAX_NODES contributes num_senders*N puts (sum = N * MAX_NODES(MAX_NODES+1)/2 per
        // receiver round, times MAX_NODES rounds), plus one detached put per round, plus Offset.
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
23451c0b2f7Stbbdev 
//! Runs the buffered-levels test with every supported body kind
//! (lambda, function pointer, functor object) plus the stateful-copy variant.
template< typename InputType, typename OutputType >
void run_buffered_levels( int c ) {
    // lambda body
    buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    // function-pointer body
    buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
    // functor-object body
    buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
    buffered_levels_with_copy<InputType,OutputType>( c );
}
24251c0b2f7Stbbdev 
24351c0b2f7Stbbdev 
24451c0b2f7Stbbdev //! Performs test on executable nodes with limited concurrency
24551c0b2f7Stbbdev /** These tests check:
24651c0b2f7Stbbdev     1) that the nodes will accepts puts up to the concurrency limit,
24751c0b2f7Stbbdev     2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
24851c0b2f7Stbbdev     3) the nodes will receive puts from multiple successors simultaneously,
24951c0b2f7Stbbdev     and 4) the nodes will send to multiple predecessors.
25051c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
25151c0b2f7Stbbdev */
25251c0b2f7Stbbdev 
/** @param concurrency  highest concurrency limit exercised (lc runs from 1 to concurrency)
    @param body         function_node body; must block on the harness rw-mutex
                        (tfunc/tfunctor) so the node can be held at its limit */
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // rejecting policy: the node must refuse puts beyond its concurrency limit
        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    // Senders are registered while the node is saturated, so it must
                    // pull from them only after the lock is released.
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            // Detach receivers; a further put should reach none of them
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}
33251c0b2f7Stbbdev 
33351c0b2f7Stbbdev 
//! Runs the limited-concurrency test with every supported body kind.
//! All bodies block on tbb::spin_rw_mutex via the harness so the node can be saturated.
template< typename InputType, typename OutputType >
void run_concurrency_levels( int c ) {
    // lambda body
    concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
    // function-pointer body
    concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
    // functor-object body
    concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
}
34051c0b2f7Stbbdev 
34151c0b2f7Stbbdev 
// Minimal payload type used to exercise nodes with non-POD message types:
// default- and int-constructible, and implicitly convertible to int (always 0).
struct empty_no_assign {
    empty_no_assign() = default;
    empty_no_assign(int) {}
    operator int() { return 0; }
};
34751c0b2f7Stbbdev 
34851c0b2f7Stbbdev template< typename InputType >
34951c0b2f7Stbbdev struct parallel_puts : private utils::NoAssign {
35051c0b2f7Stbbdev 
35151c0b2f7Stbbdev     tbb::flow::receiver< InputType > * const my_exe_node;
35251c0b2f7Stbbdev 
35351c0b2f7Stbbdev     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
35451c0b2f7Stbbdev 
35551c0b2f7Stbbdev     void operator()( int ) const  {
35651c0b2f7Stbbdev         for ( int i = 0; i < N; ++i ) {
35751c0b2f7Stbbdev             // the nodes will accept all puts
35851c0b2f7Stbbdev             CHECK( my_exe_node->try_put( InputType() ) == true );
35951c0b2f7Stbbdev         }
36051c0b2f7Stbbdev     }
36151c0b2f7Stbbdev 
36251c0b2f7Stbbdev };
36351c0b2f7Stbbdev 
36451c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency
36551c0b2f7Stbbdev /** These tests check:
36651c0b2f7Stbbdev     1) that the nodes will accept all puts
36751c0b2f7Stbbdev     2) the nodes will receive puts from multiple predecessors simultaneously,
36851c0b2f7Stbbdev     and 3) the nodes will send to multiple successors.
36951c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
37051c0b2f7Stbbdev */
37151c0b2f7Stbbdev 
/** @param body  function_node body routed through harness_graph_executor so
                 executions are counted; the node runs with unlimited concurrency
                 and must accept all p*N concurrent puts and forward each result
                 to every attached receiver. */
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    // p = number of native threads concurrently putting; each puts N items
    for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            CHECK( ec == p*N );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK( c == p*N );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            }
        }
    }
40951c0b2f7Stbbdev 
//! Runs the unlimited-concurrency test with every supported body kind.
template< typename InputType, typename OutputType >
void run_unlimited_concurrency() {
    // max_executors = 0: presumably disables the concurrency-limit check in the
    // harness functor (no limit to enforce for unlimited nodes) — see common/graph_utils.
    harness_graph_executor<InputType, OutputType>::max_executors = 0;
    // lambda body
    unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
    // function-pointer body
    unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
    // functor-object body
    unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
}
41751c0b2f7Stbbdev 
41851c0b2f7Stbbdev struct continue_msg_to_int {
41951c0b2f7Stbbdev     int my_int;
42051c0b2f7Stbbdev     continue_msg_to_int(int x) : my_int(x) {}
42151c0b2f7Stbbdev     int operator()(tbb::flow::continue_msg) { return my_int; }
42251c0b2f7Stbbdev };
42351c0b2f7Stbbdev 
42451c0b2f7Stbbdev void test_function_node_with_continue_msg_as_input() {
42551c0b2f7Stbbdev     // If this function terminates, then this test is successful
42651c0b2f7Stbbdev     tbb::flow::graph g;
42751c0b2f7Stbbdev 
42851c0b2f7Stbbdev     tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);
42951c0b2f7Stbbdev 
43051c0b2f7Stbbdev     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
43151c0b2f7Stbbdev     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));
43251c0b2f7Stbbdev 
43351c0b2f7Stbbdev     tbb::flow::make_edge( Start, FN1 );
43451c0b2f7Stbbdev     tbb::flow::make_edge( Start, FN2 );
43551c0b2f7Stbbdev 
43651c0b2f7Stbbdev     Start.try_put( tbb::flow::continue_msg() );
43751c0b2f7Stbbdev     g.wait_for_all();
43851c0b2f7Stbbdev }
43951c0b2f7Stbbdev 
44051c0b2f7Stbbdev //! Tests limited concurrency cases for nodes that accept data messages
//! Runs the full battery of concurrency tests under a fixed thread budget.
void test_concurrency(int num_threads) {
    // Pin the scheduler to num_threads workers for the duration of this scope.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    // Limited-concurrency (rejecting) and buffered variants.
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    // Unlimited-concurrency runs across input/output type combinations,
    // including the non-assignable payload and continue_msg outputs.
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
45451c0b2f7Stbbdev 
45551c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
45651c0b2f7Stbbdev #include <array>
45751c0b2f7Stbbdev #include <vector>
45851c0b2f7Stbbdev void test_follows_and_precedes_api() {
45951c0b2f7Stbbdev     using msg_t = tbb::flow::continue_msg;
46051c0b2f7Stbbdev 
46151c0b2f7Stbbdev     std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
46251c0b2f7Stbbdev     std::vector<msg_t> messages_for_precedes = { msg_t() };
46351c0b2f7Stbbdev 
46451c0b2f7Stbbdev     pass_through<msg_t> pass_msg;
46551c0b2f7Stbbdev 
46651c0b2f7Stbbdev     follows_and_precedes_testing::test_follows
46751c0b2f7Stbbdev         <msg_t, tbb::flow::function_node<msg_t, msg_t>>
46851c0b2f7Stbbdev         (messages_for_follows, tbb::flow::unlimited, pass_msg);
46951c0b2f7Stbbdev     follows_and_precedes_testing::test_precedes
47051c0b2f7Stbbdev         <msg_t, tbb::flow::function_node<msg_t, msg_t>>
47151c0b2f7Stbbdev         (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
47251c0b2f7Stbbdev }
47351c0b2f7Stbbdev #endif
47451c0b2f7Stbbdev 
47551c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
47651c0b2f7Stbbdev 
// Free function used to verify deduction guides accept function pointers; always returns 1.
int function_body_f(const int&) { return 1; }
47851c0b2f7Stbbdev 
// Instantiates function_node through each deduction-guide-eligible constructor
// form and statically verifies the deduced <Input, Output[, Policy]> arguments.
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    // Body alone: types deduced from the body signature, default policy.
    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    // Explicit buffering policy participates in deduction.
    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    // A node priority must not disturb the deduced arguments.
    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Same constructor forms, but built from a follows(...) node set.
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy construction deduces from the source node's type.
    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}
51351c0b2f7Stbbdev 
// Drives the deduction-guide checks with lambda, mutable-lambda and
// function-pointer bodies, all with signature int(const int&).
void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}
51951c0b2f7Stbbdev 
52051c0b2f7Stbbdev #endif
52151c0b2f7Stbbdev 
52251c0b2f7Stbbdev //! Test various node bodies with concurrency
52351c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    // Sweep the thread count across the harness-configured [MinThread, MaxThread] range.
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}
52951c0b2f7Stbbdev 
53051c0b2f7Stbbdev //! NativeParallelFor testing with various concurrency settings
53151c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
   // Shared lightweight-body checks for function_node; 10 is presumably the
   // iteration/work count — see lightweight_testing::test in the common harness.
   lightweight_testing::test<tbb::flow::function_node>(10);
}
53551c0b2f7Stbbdev 
53651c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
53751c0b2f7Stbbdev //! Test follows and precedes API
53851c0b2f7Stbbdev //! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
     // Only compiled when the preview follows/precedes API is enabled.
     test_follows_and_precedes_api();
}
54251c0b2f7Stbbdev #endif
54351c0b2f7Stbbdev 
54451c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
54651c0b2f7Stbbdev //! \brief \ref requirement
TEST_CASE("Deduction guides test"){
     // Only compiled when CTAD support is detected (__TBB_CPP17_DEDUCTION_GUIDES_PRESENT).
     test_deduction_guides();
}
55051c0b2f7Stbbdev #endif
55151c0b2f7Stbbdev 
55251c0b2f7Stbbdev //! try_release and try_consume test
55351c0b2f7Stbbdev //! \brief \ref error_guessing
55451c0b2f7Stbbdev TEST_CASE("try_release try_consume"){
55551c0b2f7Stbbdev     tbb::flow::graph g;
55651c0b2f7Stbbdev 
55751c0b2f7Stbbdev     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});
55851c0b2f7Stbbdev 
55951c0b2f7Stbbdev     CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
56051c0b2f7Stbbdev     CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
56151c0b2f7Stbbdev }
562