/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/config.h"

// TODO revamp: move the parts that depend on __TBB_EXTRA_DEBUG into separate test(s), since
// having them in all tests may mean testing a configuration of the product that differs from
// what is actually released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4

//! Performs tests on multifunction nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this holds even with multiple predecessors and successors.
*/
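
// For orientation, a minimal standalone sketch of the behavior exercised below
// (illustrative only; not invoked by the tests). With the default queueing
// policy, try_put() never rejects: inputs beyond the concurrency limit are
// buffered inside the node.
inline void buffered_multifunction_sketch() {
    using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
    tbb::flow::graph g;
    mf_node n( g, /*concurrency*/ 1, []( int i, mf_node::output_ports_type &p ) {
        (void)std::get<0>(p).try_put(i);   // forward the input on output port 0
    } );
    for ( int i = 0; i < 10; ++i ) {
        // every put is accepted even though only one body executes at a time
        CHECK_MESSAGE( n.try_put(i) == true, "" );
    }
    g.wait_for_all();
}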

//! Exercises a buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_count back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. A check in the functor ensures this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the multifunction_node with the appropriate concurrency level and default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set each sender's message limit to N, and connect
                    // them to exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that N items were requested from each sender
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since the receivers are detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor() { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
       ++global_execute_count;
       ++local_execute_count;
       (void)std::get<0>(p).try_put(i);
    }

};
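
// Note: multifunction_node stores a copy of its body, and tbb::flow::copy_body()
// (used in buffered_levels_with_copy below) returns a further copy of that stored
// body. The copy constructor above therefore carries local_execute_count across
// copies, so a snapshot taken via copy_body() reflects all executions so far.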

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
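        // How expected_count is derived: each of the MAX_NODES receiver-count
        // iterations runs num_senders = 1..MAX_NODES, delivering num_senders*N
        // inputs, i.e. N * MAX_NODES*(MAX_NODES+1)/2 executions per iteration,
        // plus one extra try_put after the edges are removed; all on top of the
        // initial Offset.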
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}


//! Performs tests on multifunction nodes with limited concurrency
/** These tests check:
    1) that the nodes accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in harness_graph_multifunction_executor),
    3) that the nodes receive puts from multiple predecessors simultaneously,
    and 4) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
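
// A minimal standalone sketch of the rejecting behavior checked below
// (illustrative only; not invoked by the tests; it assumes, as the test below
// does, that a rejecting serial node counts an input against its concurrency
// limit as soon as the input is accepted).
#include <thread>   // std::this_thread::yield, used only by the sketch
inline void rejecting_multifunction_sketch() {
    using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>, tbb::flow::rejecting>;
    std::atomic<bool> release{ false };
    tbb::flow::graph g;
    mf_node n( g, /*concurrency*/ 1, [&]( int, mf_node::output_ports_type & ) {
        while ( !release.load() ) std::this_thread::yield();   // hold the single slot
    } );
    CHECK_MESSAGE( n.try_put(0) == true, "" );    // occupies the only slot
    CHECK_MESSAGE( n.try_put(1) == false, "" );   // rejected, not buffered
    release = true;                               // let the body finish
    g.wait_for_all();
}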

template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_count back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. A check in the functor ensures this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put lc items; the node accepts them and then blocks on m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // the node accepts only lc items
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release the lock at end of scope, setting the exe node free to continue
                // wait for the graph to settle down
                g.wait_for_all();

                // confirm that N items were requested from each sender
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got N * num_senders + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}


struct empty_no_assign {
   empty_no_assign() {}
   empty_no_assign( int ) {}
   operator int() { return 0; }
   operator int() const { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const  {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};
//! Performs tests on multifunction nodes with unlimited concurrency
/** These tests check:
    1) that the nodes accept all puts,
    2) that the nodes receive puts from multiple predecessors simultaneously,
    and 3) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/

template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}

template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}
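
// A brief sketch of the property verified above (assumption: this mirrors what
// test_output_ports_return_ref from common/graph_utils.h checks): output_ports()
// yields the ports tuple by reference, so tbb::flow::output_port<0>(node)
// aliases std::get<0>(node.output_ports()).
template <typename MFNode>
void output_port_aliasing_sketch( MFNode& node ) {
    auto& a = tbb::flow::output_port<0>( node );
    auto& b = std::get<0>( node.output_ports() );
    CHECK_MESSAGE( &a == &b, "ports must be returned by reference" );
}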

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>

void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
    );

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "Each buffer should receive exactly one message");
}

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
       test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! Test multifunction_node with lightweight policies
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}
//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
        , node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "Each buffer should receive exactly one message");
}

#endif