/*
    Copyright (c) 2005-2020 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#include "common/config.h"

// TODO revamp: move the parts that depend on __TBB_EXTRA_DEBUG into separate test(s), since
// having them in every test means the product being tested differs from what is actually
// released.
#define __TBB_EXTRA_DEBUG 1
#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4
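
// N is the number of messages each sender pushes through the node in the tests
// below; MAX_NODES bounds how many senders and receivers are attached at once.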

//! Performs tests on multifunction nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this holds even with multiple predecessors and successors.
*/
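
// For orientation, a minimal sketch of the pattern exercised below. It is
// illustrative only and never invoked by the tests; the names (buffered_example,
// sketch) are ours, and it assumes nothing beyond the headers already included.
// A serial multifunction_node with default (queueing) buffering accepts every
// put and forwards each input to its first output port.
namespace buffered_example {
    inline void sketch() {
        using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
        tbb::flow::graph g;
        mf_node n( g, tbb::flow::serial,
                   [](const int& v, mf_node::output_ports_type& p) {
                       (void)std::get<0>(p).try_put(v);  // forward on port 0
                   } );
        n.try_put(42);     // queueing policy: the put is accepted, not rejected
        g.wait_for_all();
    }
}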

//! exercise buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the multifunction_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set each one's message limit to N, and connect
                    // them to exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that N items were requested from each sender
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since the receivers are detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

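// Functor with a per-instance counter; buffered_levels_with_copy uses it to check
// that copy_body() returns the node's actual body: the copy's local count must
// match the global count.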
struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
       ++global_execute_count;
       ++local_execute_count;
       (void)std::get<0>(p).try_put(i);
    }

};

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
               tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body count matches the global execute_count and that both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
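        // Expected count: for each of the MAX_NODES receiver configurations, the
        // sender loop feeds sum over num_senders of num_senders*N items, i.e.
        // N * MAX_NODES*(MAX_NODES+1)/2; plus one extra try_put per configuration
        // after the edges are removed; plus the initial Offset in both counters.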
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}


//! Performs tests on executable nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) that the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) that the nodes will receive puts from multiple predecessors simultaneously,
    and 4) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
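
// The test below pins the node at its concurrency limit: the harness body takes a
// read lock on a shared spin_rw_mutex, so while the test thread holds the write
// lock the first lc bodies block inside the functor, and the (lc+1)-th try_put can
// be observed to be rejected.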

template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock m to prevent exe_node from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put lc items; each is accepted and its body then blocks at m
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // the node accepts only lc items; the next put is rejected
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that N items were requested from each sender
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got num_senders * N + the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}


323 
324 struct empty_no_assign {
325    empty_no_assign() {}
326    empty_no_assign( int ) {}
327    operator int() { return 0; }
328    operator int() const { return 0; }
329 };

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

//! Performs tests on executable nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts,
    2) that the nodes will receive puts from multiple predecessors simultaneously,
    and 3) that the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
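
// A minimal sketch of the unlimited-concurrency pattern exercised below
// (illustrative only and never invoked by the tests; unlimited_example is our
// name, and only already-included headers are used). With tbb::flow::unlimited
// the node never rejects a put, and any number of body copies may run at once.
namespace unlimited_example {
    inline void sketch() {
        using mf_node = tbb::flow::multifunction_node<int, std::tuple<int>>;
        tbb::flow::graph g;
        mf_node n( g, tbb::flow::unlimited,
                   [](const int& v, mf_node::output_ports_type& p) {
                       (void)std::get<0>(p).try_put(v);  // forward every input
                   } );
        for (int i = 0; i < 4; ++i) {
            n.try_put(i);  // every put is accepted
        }
        g.wait_for_all();
    }
}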

template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}

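// Body for the multiport test: routes even inputs to output port 0 and odd inputs
// to output port 1, exercising per-port try_put from a single body invocation.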
template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>

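// precedes(b1, b2) hands the successors to the node constructor, so the edges to
// b1 and b2 exist as soon as the node is built; the check at the end verifies
// that exactly one message landed in each buffer.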
void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
    );

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "Exactly one message was expected in each successor buffer");
}

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, multinode>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
       test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! Test the lightweight policy with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}
//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        }
        , node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
            "Exactly one message was expected in each successor buffer");
}

#endif