xref: /oneTBB/test/tbb/test_function_node.cpp (revision 51c0b2f7)
1*51c0b2f7Stbbdev /*
2*51c0b2f7Stbbdev     Copyright (c) 2005-2020 Intel Corporation
3*51c0b2f7Stbbdev 
4*51c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
5*51c0b2f7Stbbdev     you may not use this file except in compliance with the License.
6*51c0b2f7Stbbdev     You may obtain a copy of the License at
7*51c0b2f7Stbbdev 
8*51c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
9*51c0b2f7Stbbdev 
10*51c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
11*51c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
12*51c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*51c0b2f7Stbbdev     See the License for the specific language governing permissions and
14*51c0b2f7Stbbdev     limitations under the License.
15*51c0b2f7Stbbdev */
16*51c0b2f7Stbbdev 
17*51c0b2f7Stbbdev #include "common/config.h"
18*51c0b2f7Stbbdev 
19*51c0b2f7Stbbdev // TODO revamp: move parts dependent on __TBB_EXTRA_DEBUG into separate test(s) since having these
20*51c0b2f7Stbbdev // parts in all of tests might make testing of the product, which is different from what is actually
21*51c0b2f7Stbbdev // released.
22*51c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1
23*51c0b2f7Stbbdev #include "tbb/flow_graph.h"
24*51c0b2f7Stbbdev #include "tbb/spin_rw_mutex.h"
25*51c0b2f7Stbbdev #include "tbb/global_control.h"
26*51c0b2f7Stbbdev 
27*51c0b2f7Stbbdev #include "common/test.h"
28*51c0b2f7Stbbdev #include "common/utils.h"
29*51c0b2f7Stbbdev #include "common/graph_utils.h"
30*51c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
31*51c0b2f7Stbbdev 
32*51c0b2f7Stbbdev 
33*51c0b2f7Stbbdev //! \file test_function_node.cpp
34*51c0b2f7Stbbdev //! \brief Test for [flow_graph.function_node] specification
35*51c0b2f7Stbbdev 
36*51c0b2f7Stbbdev 
#define N 100        // number of messages each counting sender delivers per test round (my_limit)
#define MAX_NODES 4  // upper bound on the number of senders/receivers attached to a node under test
39*51c0b2f7Stbbdev 
40*51c0b2f7Stbbdev //! Performs test on function nodes with limited concurrency and buffering
41*51c0b2f7Stbbdev /** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit
43*51c0b2f7Stbbdev     2) that the node never rejects
44*51c0b2f7Stbbdev     3) that no items are lost
45*51c0b2f7Stbbdev     and 4) all of this happens even if there are multiple predecessors and successors
46*51c0b2f7Stbbdev */
47*51c0b2f7Stbbdev 
//! Identity functor: forwards its argument unchanged.
/** Used as a trivial node body wherever the test only cares about message
    routing, not message transformation. */
template<typename IO>
struct pass_through {
    IO operator()( const IO& item ) { return item; }
};
52*51c0b2f7Stbbdev 
//! Drives a limited-concurrency, default-buffered function_node through a grid
//! of sender/receiver fan-in/fan-out combinations.
/** For each concurrency level lc in [1, concurrency]:
    - builds an exe_node with concurrency lc, fronted by an unlimited
      pass-through node, and duplicates the pair into vectors of two;
    - for each pair, varies the number of counting receivers (1..MAX_NODES)
      and counting senders (1..MAX_NODES), pushes N items per sender, and
      validates the receivers' recorded item maps.
    The harness functor itself asserts that at most lc body copies run
    concurrently (via max_executors). */
template< typename InputType, typename OutputType, typename Body >
void buffered_levels( size_t concurrency, Body body ) {

   // Do for lc = 1 to concurrency level
   for ( size_t lc = 1; lc <= concurrency; ++lc ) {
   tbb::flow::graph g;

   // Set the execute_counter back to zero in the harness
   harness_graph_executor<InputType, OutputType>::execute_count = 0;
   // Set the number of current executors to zero.
   harness_graph_executor<InputType, OutputType>::current_executors = 0;
   // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
   harness_graph_executor<InputType, OutputType>::max_executors = lc;

   // Create the function_node with the appropriate concurrency level, and use default buffering
   tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, body );
   tbb::flow::function_node<InputType, InputType> pass_thru( g, tbb::flow::unlimited, pass_through<InputType>());

   // Create a vector of identical exe_nodes and pass_thrus
   std::vector< tbb::flow::function_node< InputType, OutputType > > exe_vec(2, exe_node);
   std::vector< tbb::flow::function_node< InputType, InputType > > pass_thru_vec(2, pass_thru);
   // Attach each pass_thru to its corresponding exe_node
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
       tbb::flow::make_edge(pass_thru_vec[node_idx], exe_vec[node_idx]);
   }

   // TODO: why the test is executed serially for the node pairs, not concurrently?
   for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
   // For num_receivers = 1 to MAX_NODES
       for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
           // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
           std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
           for (size_t i = 0; i < num_receivers; i++) {
               receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
           }

           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( exe_vec[node_idx], *receivers[r] );
           }

           // Do the test with varying numbers of senders
           std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
           for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
               // Create num_senders senders, set their message limit each to N, and connect them to
               // pass_thru_vec[node_idx]
               senders.clear();
               for (size_t s = 0; s < num_senders; ++s ) {
                   senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                   senders.back()->my_limit = N;
                   senders.back()->register_successor(pass_thru_vec[node_idx] );
               }

               // Initialize the receivers so they know how many senders and messages to check for
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->initialize_map( N, num_senders );
               }

               // Do the test: one native thread per sender pushes until its limit is hit
               utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
               g.wait_for_all();

               // confirm that each sender was requested from N times
               for (size_t s = 0; s < num_senders; ++s ) {
                   size_t n = senders[s]->my_received;
                   CHECK( n == N );
                   CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &pass_thru_vec[node_idx] );
               }
               // validate the receivers
               for (size_t r = 0; r < num_receivers; ++r ) {
                   receivers[r]->validate();
               }
           }
           // Detach all receivers before checking that a put to a detached node is still accepted
           for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::remove_edge( exe_vec[node_idx], *receivers[r] );
           }
           CHECK( exe_vec[node_idx].try_put( InputType() ) == true );
           g.wait_for_all();
           for (size_t r = 0; r < num_receivers; ++r ) {
               // since it's detached, nothing should have changed
               receivers[r]->validate();
           }

       } // for num_receivers
    } // for node_idx
    } // for concurrency level lc
}
139*51c0b2f7Stbbdev 
// Starting value for both counters: a non-zero offset lets the test tell a
// body reset to its initial state (count == Offset after rf_reset_bodies)
// apart from a freshly default-constructed one (count == 0).
const size_t Offset = 123;
// Total number of body invocations across all copies of the functor.
std::atomic<size_t> global_execute_count;

//! Node body that counts its invocations both locally and globally.
/** The local count travels with each copy of the functor, so the test can use
    tbb::flow::copy_body to observe how often the node's internal copy ran. */
struct inc_functor {

    std::atomic<size_t> local_execute_count;

    inc_functor( ) { local_execute_count = 0; }
    // std::atomic is not copyable, so copy/assign transfer the value manually.
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
    // Conventional assignment operator: return *this so assignments chain
    // (the original returned void).
    inc_functor& operator=( const inc_functor &f ) {
        local_execute_count = size_t(f.local_execute_count);
        return *this;
    }

    //! Passes the input through unchanged while bumping both counters.
    int operator()( int i ) {
       ++global_execute_count;
       ++local_execute_count;
       return i;
    }

};
157*51c0b2f7Stbbdev 
//! Same sweep as buffered_levels, but with a stateful body (inc_functor).
/** After the sender/receiver grid completes, uses tbb::flow::copy_body to
    extract the node's internal body copy and checks that its local counter
    matches the global invocation count, then verifies that
    g.reset(rf_reset_bodies) restores the body to its initial (Offset) state. */
template< typename InputType, typename OutputType >
void buffered_levels_with_copy( size_t concurrency ) {

    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Seed both counters with Offset so a spurious zeroing is detectable.
        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::function_node< InputType, OutputType > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                // confirm that each sender delivered exactly N items to exe_node
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // A put to a node with no successors must still be accepted.
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct.
        // Expected executions per receiver round: sum over num_senders=1..MAX_NODES of N*num_senders
        // = N * MAX_NODES*(MAX_NODES+1)/2, times MAX_NODES rounds, plus one extra try_put per
        // round (MAX_NODES), plus the initial Offset.
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK(global_count == expected_count);
        CHECK(global_count == inc_count );
        // rf_reset_bodies must put the node's body back to its initial copy.
        g.reset(tbb::flow::rf_reset_bodies);
        body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( Offset == inc_count, "reset(rf_reset_bodies) did not reset functor" );
    }
}
230*51c0b2f7Stbbdev 
231*51c0b2f7Stbbdev template< typename InputType, typename OutputType >
232*51c0b2f7Stbbdev void run_buffered_levels( int c ) {
233*51c0b2f7Stbbdev     buffered_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
234*51c0b2f7Stbbdev     buffered_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::func );
235*51c0b2f7Stbbdev     buffered_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::functor() );
236*51c0b2f7Stbbdev     buffered_levels_with_copy<InputType,OutputType>( c );
237*51c0b2f7Stbbdev }
238*51c0b2f7Stbbdev 
239*51c0b2f7Stbbdev 
240*51c0b2f7Stbbdev //! Performs test on executable nodes with limited concurrency
241*51c0b2f7Stbbdev /** These tests check:
    1) that the nodes accept puts up to the concurrency limit,
243*51c0b2f7Stbbdev     2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
244*51c0b2f7Stbbdev     3) the nodes will receive puts from multiple successors simultaneously,
245*51c0b2f7Stbbdev     and 4) the nodes will send to multiple predecessors.
246*51c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
247*51c0b2f7Stbbdev */
248*51c0b2f7Stbbdev 
//! Exercises a rejecting function_node at each concurrency level 1..concurrency.
/** While the node's bodies are blocked on an exclusively-held rw-mutex, the
    test verifies that exactly lc puts are accepted and the (lc+1)-th is
    rejected; it then registers senders, releases the lock, and checks that
    every sender was drained N times and every receiver saw
    num_senders*N + lc messages. */
template< typename InputType, typename OutputType, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {

    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_executor<InputType, OutputType>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_executor<InputType, OutputType>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_executor<InputType, OutputType>::max_executors = lc;

        // rejecting policy: puts beyond the concurrency limit are refused, not buffered
        typedef tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > fnode_type;
        fnode_type exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_executor<InputType, OutputType>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put to lc level, it will accept and then block at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK( exe_node.try_put( InputType() ) == true );
                    }
                    // it only accepts to lc level
                    CHECK( exe_node.try_put( InputType() ) == false );

                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        // register a sender; the node will pull from it once unblocked
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK( n == N );
                    CHECK( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node );
                }
                // confirm that each receivers got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK( n == num_senders*N+lc );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
            // A put to a detached node is accepted but reaches no receiver.
            CHECK( exe_node.try_put( InputType() ) == true );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK( int(receivers[r]->my_count) == 0 );
            }
        }
    }
}
328*51c0b2f7Stbbdev 
329*51c0b2f7Stbbdev 
330*51c0b2f7Stbbdev template< typename InputType, typename OutputType >
331*51c0b2f7Stbbdev void run_concurrency_levels( int c ) {
332*51c0b2f7Stbbdev     concurrency_levels<InputType,OutputType>( c, []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex>(i); } );
333*51c0b2f7Stbbdev     concurrency_levels<InputType,OutputType>( c, &harness_graph_executor<InputType, OutputType>::template tfunc<tbb::spin_rw_mutex> );
334*51c0b2f7Stbbdev     concurrency_levels<InputType,OutputType>( c, typename harness_graph_executor<InputType, OutputType>::template tfunctor<tbb::spin_rw_mutex>() );
335*51c0b2f7Stbbdev }
336*51c0b2f7Stbbdev 
337*51c0b2f7Stbbdev 
//! Minimal message type for the unlimited-concurrency tests.
/** Constructible from nothing or from an int, and convertible back to int
    (always 0); carries no state of its own. */
struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
};
343*51c0b2f7Stbbdev 
//! Body for NativeParallelFor: each thread pushes N default-constructed
//! items into the target node and asserts that every put is accepted.
template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    // Target node; not owned, must outlive this functor.
    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const  {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK( my_exe_node->try_put( InputType() ) == true );
        }
    }

};
359*51c0b2f7Stbbdev 
360*51c0b2f7Stbbdev //! Performs test on executable nodes with unlimited concurrency
361*51c0b2f7Stbbdev /** These tests check:
362*51c0b2f7Stbbdev     1) that the nodes will accept all puts
363*51c0b2f7Stbbdev     2) the nodes will receive puts from multiple predecessors simultaneously,
364*51c0b2f7Stbbdev     and 3) the nodes will send to multiple successors.
365*51c0b2f7Stbbdev     There is no checking of the contents of the messages for corruption.
366*51c0b2f7Stbbdev */
367*51c0b2f7Stbbdev 
//! Floods an unlimited-concurrency rejecting function_node from p native
//! threads (p = 1 .. 2*MaxThread-1), N puts each.
/** Checks that the body executed exactly p*N times and that every attached
    receiver saw all p*N messages, for 1..MAX_NODES receivers. */
template< typename InputType, typename OutputType, typename Body >
void unlimited_concurrency( Body body ) {

    for (unsigned p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::function_node< InputType, OutputType, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_executor<InputType, OutputType>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( exe_node, *receivers[r] );
            }

            // 1) the nodes will accept all puts (asserted inside parallel_puts)
            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_executor<InputType, OutputType>::execute_count;
            CHECK( ec == p*N );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK( c == p*N );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( exe_node, *receivers[r] );
            }
        }
    }
}
405*51c0b2f7Stbbdev 
406*51c0b2f7Stbbdev template< typename InputType, typename OutputType >
407*51c0b2f7Stbbdev void run_unlimited_concurrency() {
408*51c0b2f7Stbbdev     harness_graph_executor<InputType, OutputType>::max_executors = 0;
409*51c0b2f7Stbbdev     unlimited_concurrency<InputType,OutputType>( []( InputType i ) -> OutputType { return harness_graph_executor<InputType, OutputType>::func(i); } );
410*51c0b2f7Stbbdev     unlimited_concurrency<InputType,OutputType>( &harness_graph_executor<InputType, OutputType>::func );
411*51c0b2f7Stbbdev     unlimited_concurrency<InputType,OutputType>( typename harness_graph_executor<InputType, OutputType>::functor() );
412*51c0b2f7Stbbdev }
413*51c0b2f7Stbbdev 
414*51c0b2f7Stbbdev struct continue_msg_to_int {
415*51c0b2f7Stbbdev     int my_int;
416*51c0b2f7Stbbdev     continue_msg_to_int(int x) : my_int(x) {}
417*51c0b2f7Stbbdev     int operator()(tbb::flow::continue_msg) { return my_int; }
418*51c0b2f7Stbbdev };
419*51c0b2f7Stbbdev 
420*51c0b2f7Stbbdev void test_function_node_with_continue_msg_as_input() {
421*51c0b2f7Stbbdev     // If this function terminates, then this test is successful
422*51c0b2f7Stbbdev     tbb::flow::graph g;
423*51c0b2f7Stbbdev 
424*51c0b2f7Stbbdev     tbb::flow::broadcast_node<tbb::flow::continue_msg> Start(g);
425*51c0b2f7Stbbdev 
426*51c0b2f7Stbbdev     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN1( g, tbb::flow::serial, continue_msg_to_int(42));
427*51c0b2f7Stbbdev     tbb::flow::function_node<tbb::flow::continue_msg, int, tbb::flow::rejecting> FN2( g, tbb::flow::serial, continue_msg_to_int(43));
428*51c0b2f7Stbbdev 
429*51c0b2f7Stbbdev     tbb::flow::make_edge( Start, FN1 );
430*51c0b2f7Stbbdev     tbb::flow::make_edge( Start, FN2 );
431*51c0b2f7Stbbdev 
432*51c0b2f7Stbbdev     Start.try_put( tbb::flow::continue_msg() );
433*51c0b2f7Stbbdev     g.wait_for_all();
434*51c0b2f7Stbbdev }
435*51c0b2f7Stbbdev 
//! Tests limited concurrency cases for nodes that accept data messages
/** Caps the scheduler at num_threads workers for the duration of the call,
    then runs the limited-, buffered- and unlimited-concurrency suites over
    several input/output type combinations, including a non-assignable type
    and continue_msg. */
void test_concurrency(int num_threads) {
    // RAII: the parallelism cap is lifted when thread_limit is destroyed.
    tbb::global_control thread_limit(tbb::global_control::max_allowed_parallelism, num_threads);
    run_concurrency_levels<int,int>(num_threads);
    run_concurrency_levels<int,tbb::flow::continue_msg>(num_threads);
    run_buffered_levels<int, int>(num_threads);
    run_unlimited_concurrency<int,int>();
    run_unlimited_concurrency<int,empty_no_assign>();
    run_unlimited_concurrency<empty_no_assign,int>();
    run_unlimited_concurrency<empty_no_assign,empty_no_assign>();
    run_unlimited_concurrency<int,tbb::flow::continue_msg>();
    run_unlimited_concurrency<empty_no_assign,tbb::flow::continue_msg>();
    test_function_node_with_continue_msg_as_input();
}
450*51c0b2f7Stbbdev 
451*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
452*51c0b2f7Stbbdev #include <array>
453*51c0b2f7Stbbdev #include <vector>
454*51c0b2f7Stbbdev void test_follows_and_precedes_api() {
455*51c0b2f7Stbbdev     using msg_t = tbb::flow::continue_msg;
456*51c0b2f7Stbbdev 
457*51c0b2f7Stbbdev     std::array<msg_t, 3> messages_for_follows = { {msg_t(), msg_t(), msg_t()} };
458*51c0b2f7Stbbdev     std::vector<msg_t> messages_for_precedes = { msg_t() };
459*51c0b2f7Stbbdev 
460*51c0b2f7Stbbdev     pass_through<msg_t> pass_msg;
461*51c0b2f7Stbbdev 
462*51c0b2f7Stbbdev     follows_and_precedes_testing::test_follows
463*51c0b2f7Stbbdev         <msg_t, tbb::flow::function_node<msg_t, msg_t>>
464*51c0b2f7Stbbdev         (messages_for_follows, tbb::flow::unlimited, pass_msg);
465*51c0b2f7Stbbdev     follows_and_precedes_testing::test_precedes
466*51c0b2f7Stbbdev         <msg_t, tbb::flow::function_node<msg_t, msg_t>>
467*51c0b2f7Stbbdev         (messages_for_precedes, tbb::flow::unlimited, pass_msg, tbb::flow::node_priority_t(1));
468*51c0b2f7Stbbdev }
469*51c0b2f7Stbbdev #endif
470*51c0b2f7Stbbdev 
471*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
472*51c0b2f7Stbbdev 
473*51c0b2f7Stbbdev int function_body_f(const int&) { return 1; }
474*51c0b2f7Stbbdev 
//! Verifies function_node deduction guides for a given body type.
/** Each static_assert pins the exact type deduced from a particular
    constructor form; a failure is a compile-time error. */
template <typename Body>
void test_deduction_guides_common(Body body) {
    using namespace tbb::flow;
    graph g;

    // Plain construction: input/output deduced from the body's signature.
    function_node f1(g, unlimited, body);
    static_assert(std::is_same_v<decltype(f1), function_node<int, int>>);

    // An explicit policy argument must appear in the deduced type.
    function_node f2(g, unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f2), function_node<int, int, rejecting>>);

    // A priority argument must NOT change the deduced type.
    function_node f3(g, unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f3), function_node<int, int>>);

    function_node f4(g, unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f4), function_node<int, int, rejecting>>);

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Same four forms, constructing from a follows(...) node set.
    function_node f5(follows(f2), unlimited, body);
    static_assert(std::is_same_v<decltype(f5), function_node<int, int>>);

    function_node f6(follows(f5), unlimited, body, rejecting());
    static_assert(std::is_same_v<decltype(f6), function_node<int, int, rejecting>>);

    function_node f7(follows(f6), unlimited, body, node_priority_t(5));
    static_assert(std::is_same_v<decltype(f7), function_node<int, int>>);

    function_node f8(follows(f7), unlimited, body, rejecting(), node_priority_t(5));
    static_assert(std::is_same_v<decltype(f8), function_node<int, int, rejecting>>);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    // Copy construction deduces the source node's type.
    function_node f9(f1);
    static_assert(std::is_same_v<decltype(f9), function_node<int, int>>);
}
509*51c0b2f7Stbbdev 
//! Instantiates the deduction-guide checks with every body kind:
//! plain lambda, mutable lambda, and function pointer.
void test_deduction_guides() {
    test_deduction_guides_common([](const int&)->int { return 1; });
    test_deduction_guides_common([](const int&) mutable ->int { return 1; });
    test_deduction_guides_common(function_body_f);
}
515*51c0b2f7Stbbdev 
516*51c0b2f7Stbbdev #endif
517*51c0b2f7Stbbdev 
//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test") {
    // Sweep the full configured thread range.
    for(unsigned int p = utils::MinThread; p <= utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}
525*51c0b2f7Stbbdev 
//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
   // Shared lightweight-policy suite, instantiated for function_node.
   lightweight_testing::test<tbb::flow::function_node>(10);
}
531*51c0b2f7Stbbdev 
532*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Flowgraph node set test"){
     test_follows_and_precedes_api();
}
538*51c0b2f7Stbbdev #endif
539*51c0b2f7Stbbdev 
540*51c0b2f7Stbbdev #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
//! Test deduction guides
//! \brief \ref requirement
TEST_CASE("Deduction guides test"){
     test_deduction_guides();
}
546*51c0b2f7Stbbdev #endif
547*51c0b2f7Stbbdev 
548*51c0b2f7Stbbdev //! try_release and try_consume test
549*51c0b2f7Stbbdev //! \brief \ref error_guessing
550*51c0b2f7Stbbdev TEST_CASE("try_release try_consume"){
551*51c0b2f7Stbbdev     tbb::flow::graph g;
552*51c0b2f7Stbbdev 
553*51c0b2f7Stbbdev     tbb::flow::function_node<int, int> fn(g, tbb::flow::unlimited, [](const int&v){return v;});
554*51c0b2f7Stbbdev 
555*51c0b2f7Stbbdev     CHECK_MESSAGE((fn.try_release()==false), "try_release should initially return false on a node");
556*51c0b2f7Stbbdev     CHECK_MESSAGE((fn.try_consume()==false), "try_consume should initially return false on a node");
557*51c0b2f7Stbbdev }
558