xref: /oneTBB/test/tbb/test_async_node.cpp (revision 6caecf96)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
#include "common/config.h"

#include "tbb/flow_graph.h"

#include "tbb/task.h"
#include "tbb/global_control.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_assert.h"
#include "common/graph_utils.h"
#include "common/spin_barrier.h"
#include "common/test_follows_and_precedes_api.h"
#include "common/concepts_common.h"

#include <atomic>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>
39 
40 
41 //! \file test_async_node.cpp
42 //! \brief Test for [flow_graph.async_node] specification
43 
44 
//! Deliberately bare payload type for the async tests: default-constructible
//! (holding -1), implicitly constructible from int, and copyable only through
//! user-provided copy operations (which also suppress implicit moves).
//! It exposes no accessors.
class minimal_type {
    // place_wrapper<...> is granted access to the private state.
    template<typename T>
    friend struct place_wrapper;

public:
    minimal_type() : minimal_type(-1) {}

    // Intentionally implicit: tests build payloads directly from int.
    minimal_type(int initial) : value(initial) {}

    minimal_type(const minimal_type& other) : value(other.value) {}

    minimal_type& operator=(const minimal_type& other) {
        value = other.value;
        return *this;
    }

private:
    int value;
};
57 
//! Payload wrapper that records the id of the thread that created a value.
//! wrapper_helper<place_wrapper<...>, place_wrapper<...>> uses the recorded
//! ids to check that adjacent graph stages ran on different threads.
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    //! Wraps `v` (converted to T) and stamps it with the calling thread's id.
    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    //! Converting copy from a wrapper of another payload type. The source's
    //! thread id is kept, not the current thread's.
    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    //! Converting assignment from a wrapper of another payload type.
    //! A template is never the copy-assignment operator, so this is only
    //! selected when Q differs from T. It must therefore return the type of
    //! *this (place_wrapper<T>&). No self-assignment check is needed, because
    //! a place_wrapper<Q> of a different type cannot be the same object as *this.
    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        value = v.value;
        thread_id = v.thread_id;
        return *this;
    }
};
81 
//! Default per-type hooks used by the async tests. There is no placement
//! check, and values are copied with a plain assignment.
template<typename T1, typename T2>
struct wrapper_helper {
    //! Nothing to verify for unwrapped payloads.
    static void check(const T1&, const T2&) {}

    //! Assigns `in` to `out` using T2's assignment from T1.
    static void copy_value(const T1& in, T2& out) {
        out = in;
    }
};
87 
88 template<typename T1, typename T2>
89 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
90     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
91        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
92        return;
93     }
94     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
95         out.value = in.value;
96     }
97 };
98 
// Number of messages the basic tests push through their graphs.
constexpr int NUMBER_OF_MSGS = 10;
// Sentinel passed to async_activity when the number of items is not known up
// front. In that mode, every processed item releases one gateway wait.
constexpr int UNKNOWN_NUMBER_OF_ITEMS = -1;

// Counters shared by node bodies and external activity threads. The tests
// reset the counters they rely on before each run.
std::atomic<int> async_body_exec_count{0};
std::atomic<int> async_activity_processed_msg_count{0};
std::atomic<int> end_body_exec_count{0};
104 
105 // queueing required in test_reset for testing of cancellation
106 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
107 typedef counting_async_node_type::gateway_type counting_gateway_type;
108 
109 struct counting_async_unlimited_body {
110 
111     counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
112 
113     void operator()( const int &input, counting_gateway_type& gateway) {
114         // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
115         // doctest's INFO cause issues.
116 
117         // INFO( "Body execution with input == " << input << "\n");
118         ++async_body_exec_count;
119         if ( input == -1 ) {
120             bool result = my_tgc.cancel_group_execution();
121             // INFO( "Canceling graph execution\n" );
122             CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
123             utils::Sleep(50);
124         }
125         gateway.try_put(input);
126     }
127 private:
128     tbb::task_group_context& my_tgc;
129 };
130 
131 struct counting_async_serial_body : counting_async_unlimited_body {
132     typedef counting_async_unlimited_body base_type;
133     int my_async_body_exec_count;
134 
135     counting_async_serial_body(tbb::task_group_context& tgc)
136         : base_type(tgc), my_async_body_exec_count( 0 ) { }
137 
138     void operator()( const int &input, counting_gateway_type& gateway ) {
139         ++my_async_body_exec_count;
140         base_type::operator()( input, gateway );
141     }
142 };
143 
144 void test_reset() {
145     const int N = NUMBER_OF_MSGS;
146     async_body_exec_count = 0;
147 
148     tbb::task_group_context graph_ctx;
149     tbb::flow::graph g(graph_ctx);
150     counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
151 
152     const int R = 3;
153     std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
154     for (size_t i = 0; i < R; ++i) {
155         r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
156     }
157 
158     for (int i = 0; i < R; ++i) {
159         tbb::flow::make_edge(a, *r[i]);
160     }
161 
162     INFO( "One body execution\n" );
163     a.try_put(-1);
164     for (int i = 0; i < N; ++i) {
165        a.try_put(i);
166     }
167     g.wait_for_all();
168     // should be canceled with only 1 item reaching the async_body and the counting receivers
169     // and N items left in the node's queue
170     CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
171 
172     counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
173     CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
174     CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1"  );
175     for (int i = 0; i < R; ++i) {
176         CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
177     }
178 
179     // should clear the async_node queue, but retain its local count at 1 and keep all edges
180     g.reset(tbb::flow::rf_reset_protocol);
181 
182     INFO( "N body executions\n" );
183     for (int i = 0; i < N; ++i) {
184        a.try_put(i);
185     }
186     g.wait_for_all();
187     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
188 
189     // a total of N+1 items should have passed through the node body
190     // the local body count should also be N+1
191     // and the counting receivers should all have a count of N+1
192     counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
193     CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
194                    "local and global body execution counts are different" );
195     INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
196     CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1"  );
197     for (int i = 0; i < R; ++i) {
198         CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
199     }
200 
201     INFO( "N body executions with new bodies\n" );
202     // should clear the async_node queue and reset its local count to 0, but keep all edges
203     g.reset(tbb::flow::rf_reset_bodies);
204     for (int i = 0; i < N; ++i) {
205        a.try_put(i);
206     }
207     g.wait_for_all();
208     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
209 
210     // a total of 2N+1 items should have passed through the node body
211     // the local body count should be N
212     // and the counting receivers should all have a count of 2N+1
213     counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
214     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1"  );
215     CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N"  );
216     for (int i = 0; i < R; ++i) {
217         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
218     }
219 
220     // should clear the async_node queue and keep its local count at N and remove all edges
221     INFO( "N body executions with no edges\n" );
222     g.reset(tbb::flow::rf_clear_edges);
223     for (int i = 0; i < N; ++i) {
224        a.try_put(i);
225     }
226     g.wait_for_all();
227     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
228 
229     // a total of 3N+1 items should have passed through the node body
230     // the local body count should now be 2*N
231     // and the counting receivers should remain at a count of 2N+1
232     counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
233     CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1"  );
234     CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N"  );
235     for (int i = 0; i < R; ++i) {
236         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
237     }
238 
239     // put back 1 edge to receiver 0
240     INFO( "N body executions with 1 edge\n" );
241     tbb::flow::make_edge(a, *r[0]);
242     for (int i = 0; i < N; ++i) {
243        a.try_put(i);
244     }
245     g.wait_for_all();
246     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
247 
248     // a total of 4N+1 items should have passed through the node body
249     // the local body count should now be 3*N
250     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
251     counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
252     CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1"  );
253     CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N"  );
254     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
255     for (int i = 1; i < R; ++i) {
256         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
257     }
258 
259     // should clear the async_node queue and keep its local count at N and remove all edges
260     INFO( "N body executions with no edges and new body\n" );
261     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
262     for (int i = 0; i < N; ++i) {
263        a.try_put(i);
264     }
265     g.wait_for_all();
266     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
267 
268     // a total of 4N+1 items should have passed through the node body
269     // the local body count should now be 3*N
270     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
271     counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
272     CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1"  );
273     CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N"  );
274     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
275     for (int i = 1; i < R; ++i) {
276         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
277     }
278 }
279 
280 
281 #include <mutex>
282 
//! Minimal mutex-protected FIFO. Graph bodies push work into it and an
//! activity's service thread pops from it.
template <typename T>
class async_activity_queue {
public:
    //! Appends a copy of `item` to the back of the queue.
    void push( const T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        m_queue.push( item );
    }

    //! Moves the front element into `item` and removes it from the queue.
    //! \return false, leaving `item` untouched, if the queue was empty.
    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        if( m_queue.empty() )
            return false;
        item = std::move( m_queue.front() );
        m_queue.pop();
        return true;
    }

    //! Snapshot of emptiness; it may be stale as soon as the lock is released.
    bool empty() const {
        std::lock_guard<mutex_t> lock( m_mutex );
        return m_queue.empty();
    }

private:
    typedef std::mutex mutex_t;
    // mutable so that the const empty() can still lock.
    mutable mutex_t m_mutex;
    std::queue<T> m_queue;
};
310 
311 template< typename Input, typename Output >
312 class async_activity : utils::NoAssign {
313 public:
314     typedef Input input_type;
315     typedef Output output_type;
316     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
317     typedef typename async_node_type::gateway_type gateway_type;
318 
319     struct work_type {
320         input_type input;
321         gateway_type* gateway;
322     };
323 
324     class ServiceThreadBody {
325     public:
326         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
327         void operator()() { my_activity->process(); }
328     private:
329         async_activity* my_activity;
330     };
331 
332     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
333         : my_expected_items(expected_items), my_sleep_time(sleep_time)
334     {
335         is_active = !deferred;
336         my_quit = false;
337         std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
338     }
339 
340 private:
341 
342     async_activity( const async_activity& )
343         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
344     {
345         is_active = true;
346     }
347 
348 public:
349     ~async_activity() {
350         stop();
351         my_service_thread.join();
352     }
353 
354     void submit( const input_type &input, gateway_type& gateway ) {
355         work_type work = {input, &gateway};
356         my_work_queue.push( work );
357     }
358 
359     void process() {
360         do {
361             work_type work;
362             if( is_active && my_work_queue.try_pop( work ) ) {
363                 utils::Sleep(my_sleep_time);
364                 ++async_activity_processed_msg_count;
365                 output_type output;
366                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
367                 wrapper_helper<output_type, output_type>::check(work.input, output);
368                 work.gateway->try_put(output);
369                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
370                      int(async_activity_processed_msg_count) == my_expected_items ) {
371                     work.gateway->release_wait();
372                 }
373             }
374         } while( my_quit == false || !my_work_queue.empty());
375     }
376 
377     void stop() {
378         my_quit = true;
379     }
380 
381     void activate() {
382         is_active = true;
383     }
384 
385     bool should_reserve_each_time() {
386         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
387             return true;
388         else
389             return false;
390     }
391 
392 private:
393 
394     const int my_expected_items;
395     const int my_sleep_time;
396     std::atomic< bool > is_active;
397 
398     async_activity_queue<work_type> my_work_queue;
399 
400     std::atomic< bool > my_quit;
401 
402     std::thread my_service_thread;
403 };
404 
405 template<typename Input, typename Output>
406 struct basic_test {
407     typedef Input input_type;
408     typedef Output output_type;
409     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
410     typedef typename async_node_type::gateway_type gateway_type;
411 
412     basic_test() {}
413 
414     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
415         async_activity<input_type, output_type> my_async_activity(async_expected_items);
416 
417         tbb::flow::graph g;
418 
419         tbb::flow::function_node< int, input_type > start_node(
420             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
421         );
422         async_node_type offload_node(
423             g, tbb::flow::unlimited,
424             [&] (const input_type &input, gateway_type& gateway) {
425                 ++async_body_exec_count;
426                 if(my_async_activity.should_reserve_each_time())
427                     gateway.reserve_wait();
428                 my_async_activity.submit(input, gateway);
429             }
430         );
431         tbb::flow::function_node< output_type > end_node(
432             g, tbb::flow::unlimited,
433             [&](const output_type& input) {
434                 ++end_body_exec_count;
435                 output_type output;
436                 wrapper_helper<output_type, output_type>::check(input, output);
437             }
438         );
439 
440         tbb::flow::make_edge( start_node, offload_node );
441         tbb::flow::make_edge( offload_node, end_node );
442 
443         async_body_exec_count = 0;
444         async_activity_processed_msg_count = 0;
445         end_body_exec_count = 0;
446 
447         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
448             offload_node.gateway().reserve_wait();
449         }
450         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
451             start_node.try_put(i);
452         }
453         g.wait_for_all();
454         CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
455         CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
456         CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
457         INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
458               " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
459               " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
460         );
461         return 0;
462     }
463 
464 };
465 
466 int test_copy_ctor() {
467     const int N = NUMBER_OF_MSGS;
468     async_body_exec_count = 0;
469 
470     tbb::flow::graph g;
471 
472     harness_counting_receiver<int> r1(g);
473     harness_counting_receiver<int> r2(g);
474 
475     tbb::task_group_context graph_ctx;
476     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
477     counting_async_node_type b(a);
478 
479     tbb::flow::make_edge(a, r1);                             // C++11-style of making edges
480     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // usual way of making edges
481 
482     for (int i = 0; i < N; ++i) {
483        a.try_put(i);
484     }
485     g.wait_for_all();
486 
487     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
488     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
489     CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
490     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
491     CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
492 
493     for (int i = 0; i < N; ++i) {
494        b.try_put(i);
495     }
496     g.wait_for_all();
497 
498     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
499     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
500     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
501     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
502     CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
503     return 0;
504 }
505 
// Number of spin_test end-node calls that ran on the thread that called
// spin_test::run(); reset at the start of each run.
std::atomic<int> main_tid_count{0};
507 
508 template<typename Input, typename Output>
509 struct spin_test {
510     typedef Input input_type;
511     typedef Output output_type;
512     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
513     typedef typename async_node_type::gateway_type gateway_type;
514 
515     class end_body_type {
516         typedef Output output_type;
517         std::thread::id my_main_tid;
518         utils::SpinBarrier *my_barrier;
519     public:
520         end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
521 
522         void operator()( const output_type & ) {
523             ++end_body_exec_count;
524             if (std::this_thread::get_id() == my_main_tid) {
525                ++main_tid_count;
526             }
527             my_barrier->wait();
528         }
529     };
530 
531     spin_test() {}
532 
533     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
534         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
535         const int overall_message_count = nthreads * NUMBER_OF_MSGS;
536         utils::SpinBarrier spin_barrier(nthreads);
537 
538         tbb::flow::graph g;
539         tbb::flow::function_node<int, input_type> start_node(
540             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
541         );
542         async_node_type offload_node(
543             g, tbb::flow::unlimited,
544             [&](const input_type &input, gateway_type& gateway) {
545                 ++async_body_exec_count;
546                 if(my_async_activity.should_reserve_each_time())
547                     gateway.reserve_wait();
548                 my_async_activity.submit(input, gateway);
549             }
550         );
551         tbb::flow::function_node<output_type> end_node(
552             g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
553         );
554 
555         tbb::flow::make_edge( start_node, offload_node );
556         tbb::flow::make_edge( offload_node, end_node );
557 
558         async_body_exec_count = 0;
559         async_activity_processed_msg_count = 0;
560         end_body_exec_count = 0;
561         main_tid_count = 0;
562 
563         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
564             offload_node.gateway().reserve_wait();
565         }
566         for (int i = 0; i < overall_message_count; ++i) {
567             start_node.try_put(i);
568         }
569         g.wait_for_all();
570         CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
571                        "AsyncBody processed wrong number of signals" );
572         CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
573                        "AsyncActivity processed wrong number of signals" );
574         CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
575                        "EndBody processed wrong number of signals");
576 
577         INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
578 
579         INFO("async_body_exec_count == " << int(async_body_exec_count) <<
580              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
581              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
582         );
583         return 0;
584     }
585 
586 };
587 
588 void test_for_spin_avoidance() {
589     const int nthreads = 4;
590     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
591     tbb::task_arena a(nthreads);
592     a.execute([&] {
593         spin_test<int, int>::run(nthreads);
594     });
595 }
596 
597 template< typename Input, typename Output >
598 int run_tests() {
599     basic_test<Input, Output>::run();
600     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
601     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
602     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
603     return 0;
604 }
605 
606 #include "tbb/parallel_for.h"
607 template<typename Input, typename Output>
608 class equeueing_on_inner_level {
609     typedef Input input_type;
610     typedef Output output_type;
611     typedef async_activity<input_type, output_type> async_activity_type;
612     typedef tbb::flow::async_node<Input, Output> async_node_type;
613     typedef typename async_node_type::gateway_type gateway_type;
614 
615     class body_graph_with_async {
616     public:
617         body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
618             : spin_barrier(&barrier), my_async_activity(&activity) {}
619 
620         void operator()(int) const {
621             tbb::flow::graph g;
622             tbb::flow::function_node< int, input_type > start_node(
623                 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
624             );
625             async_node_type offload_node(
626                 g, tbb::flow::unlimited,
627                 [&](const input_type &input, gateway_type& gateway) {
628                     gateway.reserve_wait();
629                     my_async_activity->submit( input, gateway );
630                 }
631             );
632             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
633 
634             tbb::flow::make_edge( start_node, offload_node );
635             tbb::flow::make_edge( offload_node, end_node );
636 
637             start_node.try_put(1);
638 
639             spin_barrier->wait();
640 
641             my_async_activity->activate();
642 
643             g.wait_for_all();
644         }
645 
646     private:
647         utils::SpinBarrier* spin_barrier;
648         async_activity_type* my_async_activity;
649     };
650 
651 public:
652     static int run ()
653     {
654         const int nthreads = tbb::this_task_arena::max_concurrency();
655         utils::SpinBarrier spin_barrier( nthreads );
656 
657         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
658 
659         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
660         return 0;
661     }
662 };
663 
664 int run_test_equeueing_on_inner_level() {
665     equeueing_on_inner_level<int, int>::run();
666     return 0;
667 }
668 
669 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
670 #include <array>
671 
672 template<typename NodeType>
673 class AsyncActivity {
674 public:
675     using gateway_t = typename NodeType::gateway_type;
676 
677     struct work_type {
678         int input;
679         gateway_t* gateway;
680     };
681 
682     AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
683         while(!end_of_work()) {
684             work_type w;
685             while( my_q.try_pop(w) ) {
686                 int res = do_work(w.input);
687                 w.gateway->try_put(res);
688                 w.gateway->release_wait();
689                 ++c;
690             }
691         }
692     }) {}
693 
694     void submit(int i, gateway_t* gateway) {
695         work_type w = {i, gateway};
696         gateway->reserve_wait();
697         my_q.push(w);
698     }
699 
700     void wait_for_all() { thr.join(); }
701 
702 private:
703     bool end_of_work() { return c >= stop_limit; }
704 
705     int do_work(int& i) { return i + i; }
706 
707     async_activity_queue<work_type> my_q;
708     size_t stop_limit;
709     size_t c;
710     std::thread thr;
711 };
712 
713 void test_follows() {
714     using namespace tbb::flow;
715 
716     using input_t = int;
717     using output_t = int;
718     using node_t = async_node<input_t, output_t>;
719 
720     graph g;
721 
722     AsyncActivity<node_t> async_activity(3);
723 
724     std::array<broadcast_node<input_t>, 3> preds = {
725       {
726         broadcast_node<input_t>(g),
727         broadcast_node<input_t>(g),
728         broadcast_node<input_t>(g)
729       }
730     };
731 
732     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
733         async_activity.submit(input, &gtw);
734     }, no_priority);
735 
736     buffer_node<output_t> buf(g);
737     make_edge(node, buf);
738 
739     for(auto& pred: preds) {
740         pred.try_put(1);
741     }
742 
743     g.wait_for_all();
744     async_activity.wait_for_all();
745 
746     output_t storage;
747     CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
748                   "Not exact edge quantity was made");
749 }
750 
751 void test_precedes() {
752     using namespace tbb::flow;
753 
754     using input_t = int;
755     using output_t = int;
756     using node_t = async_node<input_t, output_t>;
757 
758     graph g;
759 
760     AsyncActivity<node_t> async_activity(1);
761 
762     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
763 
764     broadcast_node<input_t> start(g);
765 
766     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
767         async_activity.submit(input, &gtw);
768     }, no_priority);
769 
770     make_edge(start, node);
771 
772     start.try_put(1);
773 
774     g.wait_for_all();
775     async_activity.wait_for_all();
776 
777     for(auto& successor : successors) {
778         output_t storage;
779         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
780                       "Not exact edge quantity was made");
781     }
782 }
783 
784 void test_follows_and_precedes_api() {
785     test_follows();
786     test_precedes();
787 }
788 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
789 
790 //! Test async bodies processing
791 //! \brief \ref requirement \ref error_guessing
792 TEST_CASE("Basic tests"){
793     tbb::task_arena arena(utils::MaxThread);
794     arena.execute(
795         [&]() {
796             run_tests<int, int>();
797             run_tests<minimal_type, minimal_type>();
798             run_tests<int, minimal_type>();
799         }
800     );
801 }
802 
803 //! NativeParallelFor test with various concurrency settings
804 //! \brief \ref requirement \ref error_guessing
805 TEST_CASE("Lightweight tests"){
806     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
807 }
808 
809 //! Test reset and cancellation
810 //! \brief \ref error_guessing
811 TEST_CASE("Reset test"){
812     test_reset();
813 }
814 
815 //! Test
816 //! \brief \ref requirement \ref error_guessing
817 TEST_CASE("Copy constructor test"){
818     test_copy_ctor();
819 }
820 
821 //! Test if main thread spins
822 //! \brief \ref stress
823 TEST_CASE("Spin avoidance test"){
824     test_for_spin_avoidance();
825 }
826 
827 //! Test nested enqueing
828 //! \brief \ref error_guessing
829 TEST_CASE("Inner enqueing test"){
830     run_test_equeueing_on_inner_level();
831 }
832 
833 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
834 //! Test deprecated follows and preceedes API
835 //! \brief \ref error_guessing
836 TEST_CASE("Test follows and preceedes API"){
837     test_follows_and_precedes_api();
838 }
839 #endif
840 
841 #if __TBB_CPP20_CONCEPTS_PRESENT
842 //! \brief \ref error_guessing
843 TEST_CASE("constraints for async_node input") {
844     struct InputObject {
845         InputObject() = default;
846         InputObject( const InputObject& ) = default;
847     };
848 
849     static_assert(utils::well_formed_instantiation<tbb::flow::async_node, InputObject, int>);
850     static_assert(utils::well_formed_instantiation<tbb::flow::async_node, int, int>);
851     static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonCopyable, int>);
852     static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonDefaultInitializable, int>);
853 }
854 
855 template <typename Input, typename Output, typename Body>
856 concept can_call_async_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency,
857                                              Body body, tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
858     tbb::flow::async_node<Input, Output>(graph, concurrency, body);
859     tbb::flow::async_node<Input, Output>(graph, concurrency, body, priority);
860 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
861     tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
862     tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
863 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
864 };
865 
866 //! \brief \ref error_guessing
867 TEST_CASE("constraints for async_node body") {
868     using input_type = int;
869     using output_type = input_type;
870     using namespace test_concepts::async_node_body;
871 
872     static_assert(can_call_async_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
873     static_assert(!can_call_async_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
874     static_assert(!can_call_async_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
875     static_assert(!can_call_async_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
876     static_assert(!can_call_async_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
877     static_assert(!can_call_async_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
878 }
879 
880 #endif // __TBB_CPP20_CONCEPTS_PRESENT
881