1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 #include "tbb/spin_rw_mutex.h"
25 
26 #include "common/test.h"
27 #include "common/utils.h"
28 #include "common/graph_utils.h"
29 #include "common/test_follows_and_precedes_api.h"
30 #include "common/concepts_common.h"
31 
32 
33 //! \file test_multifunction_node.cpp
34 //! \brief Test for [flow_graph.multifunction_node] specification
35 
36 
37 #if TBB_USE_DEBUG
38 #define N 16
39 #else
40 #define N 100
41 #endif
42 #define MAX_NODES 4
43 
44 //! Performs test on function nodes with limited concurrency and buffering
45 /** These tests check:
46     1) that the number of executing copies never exceed the concurrency limit
47     2) that the node never rejects
48     3) that no items are lost
49     and 4) all of this happens even if there are multiple predecessors and successors
50 */
51 
52 //! exercise buffered multifunction_node.
53 template< typename InputType, typename OutputTuple, typename Body >
54 void buffered_levels( size_t concurrency, Body body ) {
55     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
56     // Do for lc = 1 to concurrency level
57     for ( size_t lc = 1; lc <= concurrency; ++lc ) {
58         tbb::flow::graph g;
59 
60         // Set the execute_counter back to zero in the harness
61         harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
62         // Set the number of current executors to zero.
63         harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
64         // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
65         harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;
66 
67         // Create the function_node with the appropriate concurrency level, and use default buffering
68         tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );
69 
70         //Create a vector of identical exe_nodes
71         std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);
72 
73         // exercise each of the copied nodes
74         for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
75             for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
76                 // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
77                 std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
78                 for (size_t i = 0; i < num_receivers; i++) {
79                     receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
80                 }
81 
82                 for (size_t r = 0; r < num_receivers; ++r ) {
83                     tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
84                 }
85 
86                 // Do the test with varying numbers of senders
87                 std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
88                 for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
89                     // Create num_senders senders, set their message limit each to N, and connect
90                     // them to the exe_vec[node_idx]
91                     senders.clear();
92                     for (size_t s = 0; s < num_senders; ++s ) {
93                         senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
94                         senders.back()->my_limit = N;
95                         tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
96                     }
97 
98                     // Initialize the receivers so they know how many senders and messages to check for
99                     for (size_t r = 0; r < num_receivers; ++r ) {
100                         receivers[r]->initialize_map( N, num_senders );
101                     }
102 
103                     // Do the test
104                     utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
105                     g.wait_for_all();
106 
107                     // confirm that each sender was requested from N times
108                     for (size_t s = 0; s < num_senders; ++s ) {
109                         size_t n = senders[s]->my_received;
110                         CHECK_MESSAGE( n == N, "" );
111                         CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
112                     }
113                     // validate the receivers
114                     for (size_t r = 0; r < num_receivers; ++r ) {
115                         receivers[r]->validate();
116                     }
117                 }
118                 for (size_t r = 0; r < num_receivers; ++r ) {
119                     tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
120                 }
121                 CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
122                 g.wait_for_all();
123                 for (size_t r = 0; r < num_receivers; ++r ) {
124                     // since it's detached, nothing should have changed
125                     receivers[r]->validate();
126                 }
127             }
128         }
129     }
130 }
131 
132 const size_t Offset = 123;
133 std::atomic<size_t> global_execute_count;
134 
135 struct inc_functor {
136 
137     std::atomic<size_t> local_execute_count;
138     inc_functor( ) { local_execute_count = 0; }
139     inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }
140 
141     template<typename output_ports_type>
142     void operator()( int i, output_ports_type &p ) {
143        ++global_execute_count;
144        ++local_execute_count;
145        (void)std::get<0>(p).try_put(i);
146     }
147 
148 };
149 
150 template< typename InputType, typename OutputTuple >
151 void buffered_levels_with_copy( size_t concurrency ) {
152     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
153     // Do for lc = 1 to concurrency level
154     for ( size_t lc = 1; lc <= concurrency; ++lc ) {
155         tbb::flow::graph g;
156 
157         inc_functor cf;
158         cf.local_execute_count = Offset;
159         global_execute_count = Offset;
160 
161         tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );
162 
163         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
164 
165             std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
166             for (size_t i = 0; i < num_receivers; i++) {
167                 receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
168             }
169 
170             for (size_t r = 0; r < num_receivers; ++r ) {
171                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
172             }
173 
174             std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
175             for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
176                 senders.clear();
177                 for (size_t s = 0; s < num_senders; ++s ) {
178                     senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
179                     senders.back()->my_limit = N;
180                     tbb::flow::make_edge( *senders.back(), exe_node );
181                 }
182 
183                 for (size_t r = 0; r < num_receivers; ++r ) {
184                     receivers[r]->initialize_map( N, num_senders );
185                 }
186 
187                 utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
188                 g.wait_for_all();
189 
190                 for (size_t s = 0; s < num_senders; ++s ) {
191                     size_t n = senders[s]->my_received;
192                     CHECK_MESSAGE( n == N, "" );
193                     CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
194                 }
195                 for (size_t r = 0; r < num_receivers; ++r ) {
196                     receivers[r]->validate();
197                 }
198             }
199             for (size_t r = 0; r < num_receivers; ++r ) {
200                 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
201             }
202             CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
203             g.wait_for_all();
204             for (size_t r = 0; r < num_receivers; ++r ) {
205                 receivers[r]->validate();
206             }
207         }
208 
209         // validate that the local body matches the global execute_count and both are correct
210         inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
211         const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
212         size_t global_count = global_execute_count;
213         size_t inc_count = body_copy.local_execute_count;
214         CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
215     }
216 }
217 
218 template< typename InputType, typename OutputTuple >
219 void run_buffered_levels( int c ) {
220     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
221     buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
222     buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
223     buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
224     buffered_levels_with_copy<InputType,OutputTuple>( c );
225 }
226 
227 
228 //! Performs test on executable nodes with limited concurrency
229 /** These tests check:
230     1) that the nodes will accepts puts up to the concurrency limit,
231     2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
232     3) the nodes will receive puts from multiple successors simultaneously,
233     and 4) the nodes will send to multiple predecessors.
234     There is no checking of the contents of the messages for corruption.
235 */
236 
237 template< typename InputType, typename OutputTuple, typename Body >
238 void concurrency_levels( size_t concurrency, Body body ) {
239     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
240     for ( size_t lc = 1; lc <= concurrency; ++lc ) {
241         tbb::flow::graph g;
242 
243         // Set the execute_counter back to zero in the harness
244         harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
245         // Set the number of current executors to zero.
246         harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
247         // Set the max allowed executors to lc.  There is a check in the functor to make sure this is never exceeded.
248         harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;
249 
250 
251         tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );
252 
253         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
254 
255             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
256             for (size_t i = 0; i < num_receivers; ++i) {
257                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
258             }
259 
260             for (size_t r = 0; r < num_receivers; ++r ) {
261                 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
262             }
263 
264             std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
265 
266             for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
267                 {
268                     // Exclusively lock m to prevent exe_node from finishing
269                     tbb::spin_rw_mutex::scoped_lock l(
270                         harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
271                     );
272 
273                     // put to lc level, it will accept and then block at m
274                     for ( size_t c = 0 ; c < lc ; ++c ) {
275                         CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
276                     }
277                     // it only accepts to lc level
278                     CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );
279 
280                     senders.clear();
281                     for (size_t s = 0; s < num_senders; ++s ) {
282                         senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
283                         senders.back()->my_limit = N;
284                         exe_node.register_predecessor( *senders.back() );
285                     }
286 
287                 } // release lock at end of scope, setting the exe node free to continue
288                 // wait for graph to settle down
289                 g.wait_for_all();
290 
291                 // confirm that each sender was requested from N times
292                 for (size_t s = 0; s < num_senders; ++s ) {
293                     size_t n = senders[s]->my_received;
294                     CHECK_MESSAGE( n == N, "" );
295                     CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
296                 }
297                 // confirm that each receivers got N * num_senders + the initial lc puts
298                 for (size_t r = 0; r < num_receivers; ++r ) {
299                     size_t n = receivers[r]->my_count;
300                     CHECK_MESSAGE( n == num_senders*N+lc, "" );
301                     receivers[r]->my_count = 0;
302                 }
303             }
304             for (size_t r = 0; r < num_receivers; ++r ) {
305                 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
306             }
307             CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
308             g.wait_for_all();
309             for (size_t r = 0; r < num_receivers; ++r ) {
310                 CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
311             }
312         }
313     }
314 }
315 
316 template< typename InputType, typename OutputTuple >
317 void run_concurrency_levels( int c ) {
318     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
319     concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
320     concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
321     concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
322 }
323 
324 
325 struct empty_no_assign {
326    empty_no_assign() {}
327    empty_no_assign( int ) {}
328    operator int() { return 0; }
329    operator int() const { return 0; }
330 };
331 
332 template< typename InputType >
333 struct parallel_puts : private utils::NoAssign {
334 
335     tbb::flow::receiver< InputType > * const my_exe_node;
336 
337     parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}
338 
339     void operator()( int ) const  {
340         for ( int i = 0; i < N; ++i ) {
341             // the nodes will accept all puts
342             CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
343         }
344     }
345 
346 };
347 
348 //! Performs test on executable nodes with unlimited concurrency
349 /** These tests check:
350     1) that the nodes will accept all puts
351     2) the nodes will receive puts from multiple predecessors simultaneously,
352     and 3) the nodes will send to multiple successors.
353     There is no checking of the contents of the messages for corruption.
354 */
355 
356 template< typename InputType, typename OutputTuple, typename Body >
357 void unlimited_concurrency( Body body ) {
358     typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
359 
360     for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
361         tbb::flow::graph g;
362         tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );
363 
364         for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
365             std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
366             for (size_t i = 0; i < num_receivers; ++i) {
367                 receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
368             }
369 
370             harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
371 
372             for (size_t r = 0; r < num_receivers; ++r ) {
373                 tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
374             }
375 
376             utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
377             g.wait_for_all();
378 
379             // 2) the nodes will receive puts from multiple predecessors simultaneously,
380             size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
381             CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
382             for (size_t r = 0; r < num_receivers; ++r ) {
383                 size_t c = receivers[r]->my_count;
384                 // 3) the nodes will send to multiple successors.
385                 CHECK_MESSAGE( (unsigned int)c == p*N, "" );
386             }
387             for (size_t r = 0; r < num_receivers; ++r ) {
388                 tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
389             }
390         }
391     }
392 }
393 
394 template< typename InputType, typename OutputTuple >
395 void run_unlimited_concurrency() {
396     harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
397     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
398     unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
399     unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
400     unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
401 }
402 
403 template<typename InputType, typename OutputTuple>
404 struct oddEvenBody {
405     typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
406     typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
407     typedef typename std::tuple_element<1,OutputTuple>::type OddType;
408     void operator() (const InputType &i, output_ports_type &p) {
409         if((int)i % 2) {
410             (void)std::get<1>(p).try_put(OddType(i));
411         }
412         else {
413             (void)std::get<0>(p).try_put(EvenType(i));
414         }
415     }
416 };
417 
418 template<typename InputType, typename OutputTuple >
419 void run_multiport_test(int num_threads) {
420     typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
421     typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
422     typedef typename std::tuple_element<1,OutputTuple>::type OddType;
423     tbb::task_arena arena(num_threads);
424     arena.execute(
425         [&] () {
426             tbb::flow::graph g;
427             mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );
428 
429             tbb::flow::queue_node<EvenType> q0(g);
430             tbb::flow::queue_node<OddType> q1(g);
431 
432             tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
433             tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);
434 
435             for(InputType i = 0; i < N; ++i) {
436                 mo_node.try_put(i);
437             }
438 
439             g.wait_for_all();
440             for(int i = 0; i < N/2; ++i) {
441                 EvenType e{};
442                 OddType o{};
443                 CHECK_MESSAGE( q0.try_get(e), "" );
444                 CHECK_MESSAGE( (int)e % 2 == 0, "" );
445                 CHECK_MESSAGE( q1.try_get(o), "" );
446                 CHECK_MESSAGE( (int)o % 2 == 1, "" );
447             }
448         }
449     );
450 }
451 
452 //! Tests limited concurrency cases for nodes that accept data messages
453 void test_concurrency(int num_threads) {
454     tbb::task_arena arena(num_threads);
455     arena.execute(
456         [&] () {
457             run_concurrency_levels<int,std::tuple<int> >(num_threads);
458             run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
459             run_buffered_levels<int, std::tuple<int> >(num_threads);
460             run_unlimited_concurrency<int, std::tuple<int> >();
461             run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
462             run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
463             run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
464             run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
465             run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
466             run_multiport_test<int, std::tuple<int, int> >(num_threads);
467             run_multiport_test<float, std::tuple<int, double> >(num_threads);
468         }
469     );
470 }
471 
472 template<typename Policy>
473 void test_ports_return_references() {
474     tbb::flow::graph g;
475     typedef int InputType;
476     typedef std::tuple<int> OutputTuple;
477     tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
478         g, tbb::flow::unlimited,
479         &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
480     test_output_ports_return_ref(mf_node);
481 }
482 
483 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
484 #include <array>
485 #include <vector>
486 
487 void test_precedes() {
488     using namespace tbb::flow;
489 
490     using multinode = multifunction_node<int, std::tuple<int, int>>;
491 
492     graph g;
493 
494     buffer_node<int> b1(g);
495     buffer_node<int> b2(g);
496 
497     multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
498             if (i % 2)
499                 std::get<0>(op).try_put(i);
500             else
501                 std::get<1>(op).try_put(i);
502         }
503     );
504 
505     node.try_put(0);
506     node.try_put(1);
507     g.wait_for_all();
508 
509     int storage;
510     CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
511             "Not exact edge quantity was made");
512 }
513 
514 void test_follows_and_precedes_api() {
515     using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;
516 
517     std::array<int, 3> messages_for_follows = { {0, 1, 2} };
518 
519     follows_and_precedes_testing::test_follows
520         <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
521         (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
522             std::get<0>(op).try_put(i);
523         });
524 
525     test_precedes();
526 }
527 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
528 
529 //! Test various node bodies with concurrency
530 //! \brief \ref error_guessing
531 TEST_CASE("Concurrency test"){
532     for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
533        test_concurrency(p);
534     }
535 }
536 
537 //! Test return types of ports
538 //! \brief \ref error_guessing
539 TEST_CASE("Test ports retrurn references"){
540     test_ports_return_references<tbb::flow::queueing>();
541     test_ports_return_references<tbb::flow::rejecting>();
542 }
543 
544 //! NativeParallelFor testing with various concurrency settings
545 //! \brief \ref error_guessing
546 TEST_CASE("Lightweight testing"){
547     lightweight_testing::test<tbb::flow::multifunction_node>(10);
548 }
549 
550 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
551 //! Test follows and precedes API
552 //! \brief \ref error_guessing
553 TEST_CASE("Test follows-precedes API"){
554     test_follows_and_precedes_api();
555 }
556 //! Test priority constructor with follows and precedes API
557 //! \brief \ref error_guessing
558 TEST_CASE("Test priority with follows and precedes"){
559     using namespace tbb::flow;
560 
561     using multinode = multifunction_node<int, std::tuple<int, int>>;
562 
563     graph g;
564 
565     buffer_node<int> b1(g);
566     buffer_node<int> b2(g);
567 
568     multinode node(precedes(b1, b2), unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
569             if (i % 2)
570                 std::get<0>(op).try_put(i);
571             else
572                 std::get<1>(op).try_put(i);
573         }
574         , node_priority_t(0));
575 
576     node.try_put(0);
577     node.try_put(1);
578     g.wait_for_all();
579 
580     int storage;
581     CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
582             "Not exact edge quantity was made");
583 }
584 
585 #endif
586 
587 #if __TBB_CPP20_CONCEPTS_PRESENT
588 //! \brief \ref error_guessing
589 TEST_CASE("constraints for multifunction_node input") {
590     struct InputObject {
591         InputObject() = default;
592         InputObject( const InputObject& ) = default;
593     };
594 
595     static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, InputObject, int>);
596     static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, int, int>);
597     static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonCopyable, int>);
598     static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonDefaultInitializable, int>);
599 }
600 
601 template <typename Input, typename Output, typename Body>
602 concept can_call_multifunction_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body,
603                                                      tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
604     tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body);
605     tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body, priority);
606 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
607     tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
608     tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
609 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
610 };
611 
612 //! \brief \ref error_guessing
613 TEST_CASE("constraints for multifunction_node body") {
614     using input_type = int;
615     using output_type = std::tuple<int>;
616     using namespace test_concepts::multifunction_node_body;
617 
618     static_assert(can_call_multifunction_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
619     static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
620     static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
621     static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
622     static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
623     static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
624 }
625 #endif // __TBB_CPP20_CONCEPTS_PRESENT
626