/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

// TODO revamp: move the parts that depend on __TBB_EXTRA_DEBUG into separate test(s); keeping
// these parts in every test can mean testing a product that differs from what is actually
// released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4
//! Performs tests on multifunction nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects an item,
    3) that no items are lost,
    and 4) that all of the above holds even with multiple predecessors and successors.
    (A minimal usage sketch follows.)
*/

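// A minimal, illustrative sketch (not used by the harness) of the pattern the
// buffered tests exercise: with the default (queueing) policy a
// multifunction_node accepts every try_put, runs at most the requested number
// of bodies at a time, and forwards results through std::get<K> on its
// output_ports_type. The function and variable names here are illustrative only.
inline void buffered_usage_sketch() {
    tbb::flow::graph g;
    using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
    mf_node n( g, /*concurrency*/ 1, []( const int& v, mf_node::output_ports_type& ports ) {
        // forward the input unchanged through output port 0
        (void)std::get<0>(ports).try_put(v);
    } );
    (void)n.try_put(42);   // a buffered node accepts even while the body is busy
    g.wait_for_all();
}
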
//! Exercises a buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the multifunction_node with the appropriate concurrency level, using default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect
                    // them to the exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that the node pulled N items from each sender
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor() { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
       ++global_execute_count;
       ++local_execute_count;
       (void)std::get<0>(p).try_put(i);
    }

};

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
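        // Why this value (derived from the loops above): for each of the MAX_NODES
        // receiver counts, the sender loop runs num_senders = 1..MAX_NODES with N
        // messages per sender, i.e. sum_{s=1..MAX_NODES} N*s
        //   = N * MAX_NODES*(MAX_NODES+1)/2 executions,
        // plus one extra try_put after the edges are removed. Over all receiver
        // counts that is N/2 * MAX_NODES * MAX_NODES * (MAX_NODES+1) + MAX_NODES,
        // on top of the Offset both counters started from.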
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}


//! Performs tests on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes receive puts from multiple predecessors simultaneously,
    and 4) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
    (A minimal sketch of the rejecting policy follows.)
*/

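// Illustrative sketch (not part of the harness) of the property pinned down
// below: with the rejecting policy, try_put returns false once the allowed
// number of bodies is already in flight, instead of buffering the item.
// Names are local to this sketch.
inline void rejecting_usage_sketch() {
    tbb::flow::graph g;
    using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>, tbb::flow::rejecting>;
    mf_node n( g, /*concurrency*/ 1, []( const int& v, mf_node::output_ports_type& ports ) {
        (void)std::get<0>(ports).try_put(v);
    } );
    // The first put is accepted; a second made while the body is still running
    // would be rejected (try_put returns false) rather than queued.
    (void)n.try_put(1);
    g.wait_for_all();
}
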
template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Hold an exclusive lock on the harness mutex to keep the running bodies from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put lc items; the node accepts them and the bodies block on the mutex
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // the node accepts only lc items; the next put is rejected
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that the node pulled N items from each sender
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got N * num_senders items plus the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}


struct empty_no_assign {
   empty_no_assign() {}
   empty_no_assign( int ) {}
   operator int() { return 0; }
   operator int() const { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const  {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

//! Performs tests on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes accept all puts,
    2) that the nodes receive puts from multiple predecessors simultaneously,
    and 3) that the nodes send to multiple successors.
    There is no checking of the contents of the messages for corruption.
    (A minimal sketch of the unlimited policy follows.)
*/

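// Illustrative sketch for the unlimited tests below: tbb::flow::unlimited
// removes the concurrency cap, so a body is spawned for every message and
// try_put never fails. Names are local to this sketch.
inline void unlimited_usage_sketch() {
    tbb::flow::graph g;
    using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
    mf_node n( g, tbb::flow::unlimited, []( const int& v, mf_node::output_ports_type& ports ) {
        (void)std::get<0>(ports).try_put(v);
    } );
    for ( int i = 0; i < 4; ++i ) {
        (void)n.try_put(i);   // every put is accepted immediately
    }
    g.wait_for_all();
}
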
template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}

template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}

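// A minimal sketch of the property exercised above; the harness helper is
// assumed to check the equivalent of the following (illustrative only, not
// the harness's actual implementation):
//
//     auto& ports = mf_node.output_ports();  // compiles only if a reference is returned
//     std::get<0>(ports).try_put(1);         // and the ports are usable through it
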
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>

void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
    );

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "An unexpected number of edges was created");
}

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
       test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! Test the lightweight policy
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}
//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
        , node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "An unexpected number of edges was created");
}

#endif