/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

#include "tbb/flow_graph.h"
#include "tbb/spin_rw_mutex.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/graph_utils.h"
#include "common/test_follows_and_precedes_api.h"
#include "common/concepts_common.h"


//! \file test_multifunction_node.cpp
//! \brief Test for [flow_graph.multifunction_node] specification


#if TBB_USE_DEBUG
#define N 16
#else
#define N 100
#endif
#define MAX_NODES 4
//! Performs tests on multifunction nodes with limited concurrency and buffering
/** These tests check:
    1) that the number of executing copies never exceeds the concurrency limit,
    2) that the node never rejects,
    3) that no items are lost,
    and 4) that all of this holds even if there are multiple predecessors and successors
*/
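
// For orientation, a minimal standalone sketch of the behavior exercised
// below. This helper is illustrative only (its name is not part of the
// harness, and nothing invokes it): with the default queueing policy a
// multifunction_node buffers inputs it cannot process immediately, so
// try_put() always succeeds.
inline void sketch_buffered_put() {
    using node_t = tbb::flow::multifunction_node<int, std::tuple<int>>;
    tbb::flow::graph g;
    // A serial node that forwards its input to output port 0.
    node_t n( g, tbb::flow::serial,
              []( int i, node_t::output_ports_type &p ) { (void)std::get<0>(p).try_put(i); } );
    tbb::flow::queue_node<int> q(g);
    tbb::flow::make_edge( tbb::flow::output_port<0>(n), q );
    for ( int i = 0; i < 10; ++i ) {
        // the queueing policy buffers whatever the busy serial body cannot take yet
        CHECK_MESSAGE( n.try_put(i), "a queueing node accepts every put" );
    }
    g.wait_for_all();
}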

//! Exercise a buffered multifunction_node.
template< typename InputType, typename OutputTuple, typename Body >
void buffered_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;

        // Create the multifunction_node with the appropriate concurrency level, and use default buffering
        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, body );

        // Create a vector of identical exe_nodes
        std::vector< tbb::flow::multifunction_node< InputType, OutputTuple > > exe_vec(2, exe_node);

        // exercise each of the copied nodes
        for (size_t node_idx=0; node_idx<exe_vec.size(); ++node_idx) {
            for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
                // Create num_receivers counting receivers and connect the exe_vec[node_idx] to them.
                std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
                for (size_t i = 0; i < num_receivers; i++) {
                    receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::make_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }

                // Do the test with varying numbers of senders
                std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
                for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                    // Create num_senders senders, set their message limit each to N, and connect
                    // them to the exe_vec[node_idx]
                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        tbb::flow::make_edge( *senders.back(), exe_vec[node_idx] );
                    }

                    // Initialize the receivers so they know how many senders and messages to check for
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->initialize_map( N, num_senders );
                    }

                    // Do the test
                    utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                    g.wait_for_all();

                    // confirm that each sender was requested from N times
                    for (size_t s = 0; s < num_senders; ++s ) {
                        size_t n = senders[s]->my_received;
                        CHECK_MESSAGE( n == N, "" );
                        CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_vec[node_idx], "" );
                    }
                    // validate the receivers
                    for (size_t r = 0; r < num_receivers; ++r ) {
                        receivers[r]->validate();
                    }
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_vec[node_idx]), *receivers[r] );
                }
                CHECK_MESSAGE( exe_vec[node_idx].try_put( InputType() ) == true, "" );
                g.wait_for_all();
                for (size_t r = 0; r < num_receivers; ++r ) {
                    // since it's detached, nothing should have changed
                    receivers[r]->validate();
                }
            }
        }
    }
}

const size_t Offset = 123;
std::atomic<size_t> global_execute_count;

struct inc_functor {

    std::atomic<size_t> local_execute_count;
    inc_functor( ) { local_execute_count = 0; }
    inc_functor( const inc_functor &f ) { local_execute_count = size_t(f.local_execute_count); }

    template<typename output_ports_type>
    void operator()( int i, output_ports_type &p ) {
        ++global_execute_count;
        ++local_execute_count;
        (void)std::get<0>(p).try_put(i);
    }

};

template< typename InputType, typename OutputTuple >
void buffered_levels_with_copy( size_t concurrency ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    // Do for lc = 1 to concurrency level
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        inc_functor cf;
        cf.local_execute_count = Offset;
        global_execute_count = Offset;

        tbb::flow::multifunction_node< InputType, OutputTuple > exe_node( g, lc, cf );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_mapped_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; i++) {
                receivers.push_back( std::make_shared<harness_mapped_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;
            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                senders.clear();
                for (size_t s = 0; s < num_senders; ++s ) {
                    senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                    senders.back()->my_limit = N;
                    tbb::flow::make_edge( *senders.back(), exe_node );
                }

                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->initialize_map( N, num_senders );
                }

                utils::NativeParallelFor( (int)num_senders, parallel_put_until_limit<InputType>(senders) );
                g.wait_for_all();

                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                for (size_t r = 0; r < num_receivers; ++r ) {
                    receivers[r]->validate();
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                receivers[r]->validate();
            }
        }

        // validate that the local body matches the global execute_count and both are correct
        inc_functor body_copy = tbb::flow::copy_body<inc_functor>( exe_node );
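        // Expected executions: each of the MAX_NODES num_receivers iterations runs
        // num_senders = 1..MAX_NODES with N items per sender, contributing
        // N * MAX_NODES*(MAX_NODES+1)/2 executions each, and issues one extra
        // try_put() after the edges are removed; both counters start at Offset.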
        const size_t expected_count = N/2 * MAX_NODES * MAX_NODES * ( MAX_NODES + 1 ) + MAX_NODES + Offset;
        size_t global_count = global_execute_count;
        size_t inc_count = body_copy.local_execute_count;
        CHECK_MESSAGE( (global_count == expected_count && global_count == inc_count), "" );
    }
}

template< typename InputType, typename OutputTuple >
void run_buffered_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    buffered_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    buffered_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    buffered_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
    buffered_levels_with_copy<InputType,OutputTuple>( c );
}


//! Performs tests on multifunction nodes with limited concurrency
/** These tests check:
    1) that the nodes will accept puts up to the concurrency limit,
    2) the nodes do not exceed the concurrency limit even when run with more threads (this is checked in the harness_graph_executor),
    3) the nodes will receive puts from multiple predecessors simultaneously,
    and 4) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
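
// A minimal sketch of the rejecting policy used below (illustrative only;
// nothing invokes it): a rejecting multifunction_node does not buffer inputs,
// so try_put() returns false whenever all of its concurrent body copies are
// busy. The test below makes the rejection deterministic by blocking the
// bodies on a mutex; this sketch only shows construction and a put on an
// idle node.
inline void sketch_rejecting_put() {
    using node_t = tbb::flow::multifunction_node<int, std::tuple<int>, tbb::flow::rejecting>;
    tbb::flow::graph g;
    node_t n( g, tbb::flow::serial,
              []( int i, node_t::output_ports_type &p ) { (void)std::get<0>(p).try_put(i); } );
    // An idle node has a free body copy, so the first put is accepted.
    CHECK_MESSAGE( n.try_put(1), "an idle rejecting node accepts a put" );
    g.wait_for_all();
}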

template< typename InputType, typename OutputTuple, typename Body >
void concurrency_levels( size_t concurrency, Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;
    for ( size_t lc = 1; lc <= concurrency; ++lc ) {
        tbb::flow::graph g;

        // Set the execute_counter back to zero in the harness
        harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;
        // Set the number of current executors to zero.
        harness_graph_multifunction_executor<InputType, OutputTuple>::current_executors = 0;
        // Set the max allowed executors to lc. There is a check in the functor to make sure this is never exceeded.
        harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = lc;


        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, lc, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {

            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            std::vector< std::shared_ptr<harness_counting_sender<InputType>> > senders;

            for (size_t num_senders = 1; num_senders <= MAX_NODES; ++num_senders ) {
                {
                    // Exclusively lock the mutex to prevent exe_node's bodies from finishing
                    tbb::spin_rw_mutex::scoped_lock l(
                        harness_graph_multifunction_executor< InputType, OutputTuple>::template mutex_holder<tbb::spin_rw_mutex>::mutex
                    );

                    // put lc items; each is accepted and then blocks on the mutex
                    for ( size_t c = 0 ; c < lc ; ++c ) {
                        CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
                    }
                    // the node accepts only lc items; the next put is rejected
                    CHECK_MESSAGE( exe_node.try_put( InputType() ) == false, "" );

                    senders.clear();
                    for (size_t s = 0; s < num_senders; ++s ) {
                        senders.push_back( std::make_shared<harness_counting_sender<InputType>>() );
                        senders.back()->my_limit = N;
                        exe_node.register_predecessor( *senders.back() );
                    }

                } // release lock at end of scope, setting the exe node free to continue
                // wait for graph to settle down
                g.wait_for_all();

                // confirm that each sender was requested from N times
                for (size_t s = 0; s < num_senders; ++s ) {
                    size_t n = senders[s]->my_received;
                    CHECK_MESSAGE( n == N, "" );
                    CHECK_MESSAGE( senders[s]->my_receiver.load(std::memory_order_relaxed) == &exe_node, "" );
                }
                // confirm that each receiver got N * num_senders items plus the initial lc puts
                for (size_t r = 0; r < num_receivers; ++r ) {
                    size_t n = receivers[r]->my_count;
                    CHECK_MESSAGE( n == num_senders*N+lc, "" );
                    receivers[r]->my_count = 0;
                }
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
            CHECK_MESSAGE( exe_node.try_put( InputType() ) == true, "" );
            g.wait_for_all();
            for (size_t r = 0; r < num_receivers; ++r ) {
                CHECK_MESSAGE( int(receivers[r]->my_count) == 0, "" );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_concurrency_levels( int c ) {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    concurrency_levels<InputType,OutputTuple>( c, []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex>(i,p); } );
    concurrency_levels<InputType,OutputTuple>( c, &harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunc<tbb::spin_rw_mutex> );
    concurrency_levels<InputType,OutputTuple>( c, typename harness_graph_multifunction_executor<InputType, OutputTuple>::template tfunctor<tbb::spin_rw_mutex>() );
}


struct empty_no_assign {
    empty_no_assign() {}
    empty_no_assign( int ) {}
    operator int() { return 0; }
    operator int() const { return 0; }
};

template< typename InputType >
struct parallel_puts : private utils::NoAssign {

    tbb::flow::receiver< InputType > * const my_exe_node;

    parallel_puts( tbb::flow::receiver< InputType > &exe_node ) : my_exe_node(&exe_node) {}

    void operator()( int ) const {
        for ( int i = 0; i < N; ++i ) {
            // the nodes will accept all puts
            CHECK_MESSAGE( my_exe_node->try_put( InputType() ) == true, "" );
        }
    }

};

//! Performs tests on multifunction nodes with unlimited concurrency
/** These tests check:
    1) that the nodes will accept all puts,
    2) the nodes will receive puts from multiple predecessors simultaneously,
    and 3) the nodes will send to multiple successors.
    There is no checking of the contents of the messages for corruption.
*/
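
// A minimal sketch (illustrative only; nothing invokes it): with
// tbb::flow::unlimited concurrency the node spawns a task per input, so every
// try_put() is accepted no matter how many bodies are already running.
inline void sketch_unlimited_put() {
    using node_t = tbb::flow::multifunction_node<int, std::tuple<int>, tbb::flow::rejecting>;
    static std::atomic<int> count;  // counts body executions
    count = 0;
    tbb::flow::graph g;
    node_t n( g, tbb::flow::unlimited,
              []( int, node_t::output_ports_type & ) { ++count; } );
    for ( int i = 0; i < 100; ++i ) {
        CHECK_MESSAGE( n.try_put(i), "an unlimited node accepts every put" );
    }
    g.wait_for_all();
    CHECK_MESSAGE( count == 100, "every accepted put runs the body exactly once" );
}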

template< typename InputType, typename OutputTuple, typename Body >
void unlimited_concurrency( Body body ) {
    typedef typename std::tuple_element<0,OutputTuple>::type OutputType;

    for (unsigned int p = 1; p < 2*utils::MaxThread; ++p) {
        tbb::flow::graph g;
        tbb::flow::multifunction_node< InputType, OutputTuple, tbb::flow::rejecting > exe_node( g, tbb::flow::unlimited, body );

        for (size_t num_receivers = 1; num_receivers <= MAX_NODES; ++num_receivers ) {
            std::vector< std::shared_ptr<harness_counting_receiver<OutputType>> > receivers;
            for (size_t i = 0; i < num_receivers; ++i) {
                receivers.push_back( std::make_shared<harness_counting_receiver<OutputType>>(g) );
            }

            harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count = 0;

            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::make_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }

            utils::NativeParallelFor( p, parallel_puts<InputType>(exe_node) );
            g.wait_for_all();

            // 2) the nodes will receive puts from multiple predecessors simultaneously,
            size_t ec = harness_graph_multifunction_executor<InputType, OutputTuple>::execute_count;
            CHECK_MESSAGE( (unsigned int)ec == p*N, "" );
            for (size_t r = 0; r < num_receivers; ++r ) {
                size_t c = receivers[r]->my_count;
                // 3) the nodes will send to multiple successors.
                CHECK_MESSAGE( (unsigned int)c == p*N, "" );
            }
            for (size_t r = 0; r < num_receivers; ++r ) {
                tbb::flow::remove_edge( tbb::flow::output_port<0>(exe_node), *receivers[r] );
            }
        }
    }
}

template< typename InputType, typename OutputTuple >
void run_unlimited_concurrency() {
    harness_graph_multifunction_executor<InputType, OutputTuple>::max_executors = 0;
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    unlimited_concurrency<InputType,OutputTuple>( []( InputType i, output_ports_type &p ) { harness_graph_multifunction_executor<InputType, OutputTuple>::func(i,p); } );
    unlimited_concurrency<InputType,OutputTuple>( &harness_graph_multifunction_executor<InputType, OutputTuple>::func );
    unlimited_concurrency<InputType,OutputTuple>( typename harness_graph_multifunction_executor<InputType, OutputTuple>::functor() );
}

template<typename InputType, typename OutputTuple>
struct oddEvenBody {
    typedef typename tbb::flow::multifunction_node<InputType,OutputTuple>::output_ports_type output_ports_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    void operator() (const InputType &i, output_ports_type &p) {
        if((int)i % 2) {
            (void)std::get<1>(p).try_put(OddType(i));
        }
        else {
            (void)std::get<0>(p).try_put(EvenType(i));
        }
    }
};

template<typename InputType, typename OutputTuple >
void run_multiport_test(int num_threads) {
    typedef typename tbb::flow::multifunction_node<InputType, OutputTuple> mo_node_type;
    typedef typename std::tuple_element<0,OutputTuple>::type EvenType;
    typedef typename std::tuple_element<1,OutputTuple>::type OddType;
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            tbb::flow::graph g;
            mo_node_type mo_node(g, tbb::flow::unlimited, oddEvenBody<InputType, OutputTuple>() );

            tbb::flow::queue_node<EvenType> q0(g);
            tbb::flow::queue_node<OddType> q1(g);

            tbb::flow::make_edge(tbb::flow::output_port<0>(mo_node), q0);
            tbb::flow::make_edge(tbb::flow::output_port<1>(mo_node), q1);

            for(InputType i = 0; i < N; ++i) {
                mo_node.try_put(i);
            }

            g.wait_for_all();
            for(int i = 0; i < N/2; ++i) {
                EvenType e{};
                OddType o{};
                CHECK_MESSAGE( q0.try_get(e), "" );
                CHECK_MESSAGE( (int)e % 2 == 0, "" );
                CHECK_MESSAGE( q1.try_get(o), "" );
                CHECK_MESSAGE( (int)o % 2 == 1, "" );
            }
        }
    );
}

//! Tests limited concurrency cases for nodes that accept data messages
void test_concurrency(int num_threads) {
    tbb::task_arena arena(num_threads);
    arena.execute(
        [&] () {
            run_concurrency_levels<int,std::tuple<int> >(num_threads);
            run_concurrency_levels<int,std::tuple<tbb::flow::continue_msg> >(num_threads);
            run_buffered_levels<int, std::tuple<int> >(num_threads);
            run_unlimited_concurrency<int, std::tuple<int> >();
            run_unlimited_concurrency<int,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<int> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<empty_no_assign> >();
            run_unlimited_concurrency<int,std::tuple<tbb::flow::continue_msg> >();
            run_unlimited_concurrency<empty_no_assign,std::tuple<tbb::flow::continue_msg> >();
            run_multiport_test<int, std::tuple<int, int> >(num_threads);
            run_multiport_test<float, std::tuple<int, double> >(num_threads);
        }
    );
}

template<typename Policy>
void test_ports_return_references() {
    tbb::flow::graph g;
    typedef int InputType;
    typedef std::tuple<int> OutputTuple;
    tbb::flow::multifunction_node<InputType, OutputTuple, Policy> mf_node(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<InputType, OutputTuple>::empty_func );
    test_output_ports_return_ref(mf_node);
}
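
// A minimal sketch of what test_output_ports_return_ref() verifies above
// (illustrative only; nothing invokes it): output_ports() returns a reference
// to the node's tuple of output ports rather than a copy, so the port reached
// through it is the same object that output_port<0>() yields.
inline void sketch_output_ports_reference() {
    tbb::flow::graph g;
    tbb::flow::multifunction_node<int, std::tuple<int>> n(
        g, tbb::flow::unlimited,
        &harness_graph_multifunction_executor<int, std::tuple<int>>::empty_func );
    auto &ports = n.output_ports(); // reference, not a copy
    CHECK_MESSAGE( &std::get<0>(ports) == &tbb::flow::output_port<0>(n),
                   "output_ports() and output_port<0>() should name the same port" );
}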

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
#include <array>
#include <vector>

void test_precedes() {
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited,
        [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        });

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
                  "Unexpected number of edges was created");
}
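
// A minimal sketch of the symmetric follows() form (illustrative only; the
// harness-driven test below exercises it more thoroughly): follows(b) builds
// the node with an incoming edge from buffer b already attached.
inline void sketch_follows() {
    using namespace tbb::flow;
    using multinode = multifunction_node<int, std::tuple<int>>;

    graph g;
    buffer_node<int> b(g);

    multinode node(follows(b), unlimited,
        [](const int& i, multinode::output_ports_type& op) -> void {
            (void)std::get<0>(op).try_put(i);
        });

    queue_node<int> q(g);
    make_edge(output_port<0>(node), q);

    b.try_put(42);
    g.wait_for_all();

    int v{};
    CHECK_MESSAGE((q.try_get(v) && v == 42), "follows() should create the incoming edge");
}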

void test_follows_and_precedes_api() {
    using multinode = tbb::flow::multifunction_node<int, std::tuple<int, int, int>>;

    std::array<int, 3> messages_for_follows = { {0, 1, 2} };

    follows_and_precedes_testing::test_follows
        <int, tbb::flow::multifunction_node<int, std::tuple<int, int, int>>>
        (messages_for_follows, tbb::flow::unlimited, [](const int& i, multinode::output_ports_type& op) -> void {
            std::get<0>(op).try_put(i);
        });

    test_precedes();
}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

//! Test various node bodies with concurrency
//! \brief \ref error_guessing
TEST_CASE("Concurrency test"){
    for( unsigned int p=utils::MinThread; p<=utils::MaxThread; ++p ) {
        test_concurrency(p);
    }
}

//! Test return types of ports
//! \brief \ref error_guessing
TEST_CASE("Test ports return references"){
    test_ports_return_references<tbb::flow::queueing>();
    test_ports_return_references<tbb::flow::rejecting>();
}

//! NativeParallelFor testing with various concurrency settings
//! \brief \ref error_guessing
TEST_CASE("Lightweight testing"){
    lightweight_testing::test<tbb::flow::multifunction_node>(10);
}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test follows-precedes API"){
    test_follows_and_precedes_api();
}

//! Test priority constructor with follows and precedes API
//! \brief \ref error_guessing
TEST_CASE("Test priority with follows and precedes"){
    using namespace tbb::flow;

    using multinode = multifunction_node<int, std::tuple<int, int>>;

    graph g;

    buffer_node<int> b1(g);
    buffer_node<int> b2(g);

    multinode node(precedes(b1, b2), unlimited,
        [](const int& i, multinode::output_ports_type& op) -> void {
            if (i % 2)
                std::get<0>(op).try_put(i);
            else
                std::get<1>(op).try_put(i);
        },
        node_priority_t(0));

    node.try_put(0);
    node.try_put(1);
    g.wait_for_all();

    int storage;
    CHECK_MESSAGE((b1.try_get(storage) && !b1.try_get(storage) && b2.try_get(storage) && !b2.try_get(storage)),
                  "Unexpected number of edges was created");
}

#endif

#if __TBB_CPP20_CONCEPTS_PRESENT
//! \brief \ref error_guessing
TEST_CASE("constraints for multifunction_node input") {
    struct InputObject {
        InputObject() = default;
        InputObject( const InputObject& ) = default;
    };

    static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, InputObject, int>);
    static_assert(utils::well_formed_instantiation<tbb::flow::multifunction_node, int, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonCopyable, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::multifunction_node, test_concepts::NonDefaultInitializable, int>);
}

template <typename Input, typename Output, typename Body>
concept can_call_multifunction_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, Body body,
                                                     tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
    tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body);
    tbb::flow::multifunction_node<Input, Output>(graph, concurrency, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
    tbb::flow::multifunction_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
};

//! \brief \ref error_guessing
TEST_CASE("constraints for multifunction_node body") {
    using input_type = int;
    using output_type = std::tuple<int>;
    using namespace test_concepts::multifunction_node_body;

    static_assert(can_call_multifunction_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_multifunction_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
}
#endif // __TBB_CPP20_CONCEPTS_PRESENT