xref: /oneTBB/test/tbb/test_async_node.cpp (revision b15aabb3)
1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #define __TBB_EXTRA_DEBUG 1
24 #include "tbb/flow_graph.h"
25 
26 #include "tbb/task.h"
27 #include "tbb/global_control.h"
28 
29 #include "common/test.h"
30 #include "common/utils.h"
31 #include "common/utils_assert.h"
32 #include "common/graph_utils.h"
33 #include "common/spin_barrier.h"
34 #include "common/test_follows_and_precedes_api.h"
35 
#include <atomic>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
39 
40 
41 //! \file test_async_node.cpp
42 //! \brief Test for [flow_graph.async_node] specification
43 
44 
//! Smallest payload type used by the tests.
//! It is default-constructible (holding -1) and implicitly constructible from int.
//! It is copyable through user-provided copy operations.
//! The stored value is private; the tests only carry the object through the graph.
class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    minimal_type() : minimal_type(-1) {}
    minimal_type(int v) : value(v) {}
    minimal_type(const minimal_type &other) : value(other.value) {}
    minimal_type &operator=(const minimal_type &other) {
        value = other.value;
        return *this;
    }
};
57 
//! Message payload that remembers the thread on which it was constructed.
//!
//! The place_wrapper specialization of wrapper_helper compares the thread_id
//! values of payloads from adjacent nodes. It checks that those nodes ran on
//! different threads.
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    //! Stores v and stamps the payload with the id of the constructing thread.
    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    //! Converting copy from another payload type.
    //! It takes both the value and the originating thread id.
    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    //! Converting assignment from a place_wrapper of another payload type.
    //! It copies both the value and the originating thread id.
    //!
    //! A member template is never a copy assignment operator. Same-type
    //! assignment therefore uses the implicitly declared operator, so `v` can
    //! never alias `*this` here and no self-assignment check is needed.
    //! The result must be a reference to *this's own type.
    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        value = v.value;
        thread_id = v.thread_id;
        return *this;
    }
};
81 
//! Default payload helpers for types that do not record a thread id.
//! check() does nothing; copy_value() is plain assignment.
template<typename T1, typename T2>
struct wrapper_helper {
    static void check(const T1 & /*produced*/, const T2 & /*consumed*/) {}

    static void copy_value(const T1 &src, T2 &dst) {
        dst = src;
    }
};
87 
88 template<typename T1, typename T2>
89 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
90     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
91        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
92        return;
93     }
94     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
95         out.value = in.value;
96     }
97 };
98 
99 const int NUMBER_OF_MSGS = 10;
100 const int UNKNOWN_NUMBER_OF_ITEMS = -1;
101 std::atomic<int> async_body_exec_count;
102 std::atomic<int> async_activity_processed_msg_count;
103 std::atomic<int> end_body_exec_count;
104 
105 // queueing required in test_reset for testing of cancellation
106 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
107 typedef counting_async_node_type::gateway_type counting_gateway_type;
108 
109 struct counting_async_unlimited_body {
110 
111     counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
112 
113     void operator()( const int &input, counting_gateway_type& gateway) {
114         // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
115         // doctest's INFO cause issues.
116 
117         // INFO( "Body execution with input == " << input << "\n");
118         ++async_body_exec_count;
119         if ( input == -1 ) {
120             bool result = my_tgc.cancel_group_execution();
121             // INFO( "Canceling graph execution\n" );
122             CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
123             utils::Sleep(50);
124         }
125         gateway.try_put(input);
126     }
127 private:
128     tbb::task_group_context& my_tgc;
129 };
130 
131 struct counting_async_serial_body : counting_async_unlimited_body {
132     typedef counting_async_unlimited_body base_type;
133     int my_async_body_exec_count;
134 
135     counting_async_serial_body(tbb::task_group_context& tgc)
136         : base_type(tgc), my_async_body_exec_count( 0 ) { }
137 
138     void operator()( const int &input, counting_gateway_type& gateway ) {
139         ++my_async_body_exec_count;
140         base_type::operator()( input, gateway );
141     }
142 };
143 
144 void test_reset() {
145     const int N = NUMBER_OF_MSGS;
146     async_body_exec_count = 0;
147 
148     tbb::task_group_context graph_ctx;
149     tbb::flow::graph g(graph_ctx);
150     counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
151 
152     const int R = 3;
153     std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
154     for (size_t i = 0; i < R; ++i) {
155         r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
156     }
157 
158     for (int i = 0; i < R; ++i) {
159         tbb::flow::make_edge(a, *r[i]);
160     }
161 
162     INFO( "One body execution\n" );
163     a.try_put(-1);
164     for (int i = 0; i < N; ++i) {
165        a.try_put(i);
166     }
167     g.wait_for_all();
168     // should be canceled with only 1 item reaching the async_body and the counting receivers
169     // and N items left in the node's queue
170     CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
171 
172     counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
173     CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
174     CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1"  );
175     for (int i = 0; i < R; ++i) {
176         CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
177     }
178 
179     // should clear the async_node queue, but retain its local count at 1 and keep all edges
180     g.reset(tbb::flow::rf_reset_protocol);
181 
182     INFO( "N body executions\n" );
183     for (int i = 0; i < N; ++i) {
184        a.try_put(i);
185     }
186     g.wait_for_all();
187     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
188 
189     // a total of N+1 items should have passed through the node body
190     // the local body count should also be N+1
191     // and the counting receivers should all have a count of N+1
192     counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
193     CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
194                    "local and global body execution counts are different" );
195     INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
196     CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1"  );
197     for (int i = 0; i < R; ++i) {
198         CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
199     }
200 
201     INFO( "N body executions with new bodies\n" );
202     // should clear the async_node queue and reset its local count to 0, but keep all edges
203     g.reset(tbb::flow::rf_reset_bodies);
204     for (int i = 0; i < N; ++i) {
205        a.try_put(i);
206     }
207     g.wait_for_all();
208     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
209 
210     // a total of 2N+1 items should have passed through the node body
211     // the local body count should be N
212     // and the counting receivers should all have a count of 2N+1
213     counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
214     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1"  );
215     CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N"  );
216     for (int i = 0; i < R; ++i) {
217         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
218     }
219 
220     // should clear the async_node queue and keep its local count at N and remove all edges
221     INFO( "N body executions with no edges\n" );
222     g.reset(tbb::flow::rf_clear_edges);
223     for (int i = 0; i < N; ++i) {
224        a.try_put(i);
225     }
226     g.wait_for_all();
227     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
228 
229     // a total of 3N+1 items should have passed through the node body
230     // the local body count should now be 2*N
231     // and the counting receivers should remain at a count of 2N+1
232     counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
233     CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1"  );
234     CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N"  );
235     for (int i = 0; i < R; ++i) {
236         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
237     }
238 
239     // put back 1 edge to receiver 0
240     INFO( "N body executions with 1 edge\n" );
241     tbb::flow::make_edge(a, *r[0]);
242     for (int i = 0; i < N; ++i) {
243        a.try_put(i);
244     }
245     g.wait_for_all();
246     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
247 
248     // a total of 4N+1 items should have passed through the node body
249     // the local body count should now be 3*N
250     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
251     counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
252     CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1"  );
253     CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N"  );
254     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
255     for (int i = 1; i < R; ++i) {
256         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
257     }
258 
259     // should clear the async_node queue and keep its local count at N and remove all edges
260     INFO( "N body executions with no edges and new body\n" );
261     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
262     for (int i = 0; i < N; ++i) {
263        a.try_put(i);
264     }
265     g.wait_for_all();
266     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
267 
268     // a total of 4N+1 items should have passed through the node body
269     // the local body count should now be 3*N
270     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
271     counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
272     CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1"  );
273     CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N"  );
274     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
275     for (int i = 1; i < R; ++i) {
276         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
277     }
278 }
279 
280 
281 #include <mutex>
282 
//! Minimal mutex-protected FIFO.
//! Graph threads push work items into it and a service thread pops them.
//! Every operation takes the internal lock, so all members may be called
//! concurrently.
template <typename T>
class async_activity_queue {
public:
    //! Appends a copy of item to the back of the queue.
    void push( const T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        m_queue.push( item );
    }

    //! Copies the front element into item and removes it, returning true.
    //! Returns false and leaves item untouched when the queue is empty.
    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        if( m_queue.empty() )
            return false;
        item = m_queue.front();
        m_queue.pop();
        return true;
    }

    //! Snapshot emptiness check; it may be stale as soon as it returns.
    bool empty() const {
        std::lock_guard<mutex_t> lock( m_mutex );
        return m_queue.empty();
    }

private:
    typedef std::mutex mutex_t;
    // mutable so that the const observer empty() can still lock.
    mutable mutex_t m_mutex;
    std::queue<T> m_queue;
};
310 
311 template< typename Input, typename Output >
312 class async_activity : utils::NoAssign {
313 public:
314     typedef Input input_type;
315     typedef Output output_type;
316     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
317     typedef typename async_node_type::gateway_type gateway_type;
318 
319     struct work_type {
320         input_type input;
321         gateway_type* gateway;
322     };
323 
324     class ServiceThreadBody {
325     public:
326         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
327         void operator()() { my_activity->process(); }
328     private:
329         async_activity* my_activity;
330     };
331 
332     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
333         : my_expected_items(expected_items), my_sleep_time(sleep_time)
334     {
335         is_active = !deferred;
336         my_quit = false;
337         std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
338     }
339 
340 private:
341 
342     async_activity( const async_activity& )
343         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
344     {
345         is_active = true;
346     }
347 
348 public:
349     ~async_activity() {
350         stop();
351         my_service_thread.join();
352     }
353 
354     void submit( const input_type &input, gateway_type& gateway ) {
355         work_type work = {input, &gateway};
356         my_work_queue.push( work );
357     }
358 
359     void process() {
360         do {
361             work_type work;
362             if( is_active && my_work_queue.try_pop( work ) ) {
363                 utils::Sleep(my_sleep_time);
364                 ++async_activity_processed_msg_count;
365                 output_type output;
366                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
367                 wrapper_helper<output_type, output_type>::check(work.input, output);
368                 work.gateway->try_put(output);
369                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
370                      int(async_activity_processed_msg_count) == my_expected_items ) {
371                     work.gateway->release_wait();
372                 }
373             }
374         } while( my_quit == false || !my_work_queue.empty());
375     }
376 
377     void stop() {
378         my_quit = true;
379     }
380 
381     void activate() {
382         is_active = true;
383     }
384 
385     bool should_reserve_each_time() {
386         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
387             return true;
388         else
389             return false;
390     }
391 
392 private:
393 
394     const int my_expected_items;
395     const int my_sleep_time;
396     std::atomic< bool > is_active;
397 
398     async_activity_queue<work_type> my_work_queue;
399 
400     std::atomic< bool > my_quit;
401 
402     std::thread my_service_thread;
403 };
404 
405 template<typename Input, typename Output>
406 struct basic_test {
407     typedef Input input_type;
408     typedef Output output_type;
409     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
410     typedef typename async_node_type::gateway_type gateway_type;
411 
412     basic_test() {}
413 
414     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
415         async_activity<input_type, output_type> my_async_activity(async_expected_items);
416 
417         tbb::flow::graph g;
418 
419         tbb::flow::function_node< int, input_type > start_node(
420             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
421         );
422         async_node_type offload_node(
423             g, tbb::flow::unlimited,
424             [&] (const input_type &input, gateway_type& gateway) {
425                 ++async_body_exec_count;
426                 if(my_async_activity.should_reserve_each_time())
427                     gateway.reserve_wait();
428                 my_async_activity.submit(input, gateway);
429             }
430         );
431         tbb::flow::function_node< output_type > end_node(
432             g, tbb::flow::unlimited,
433             [&](const output_type& input) {
434                 ++end_body_exec_count;
435                 output_type output;
436                 wrapper_helper<output_type, output_type>::check(input, output);
437             }
438         );
439 
440         tbb::flow::make_edge( start_node, offload_node );
441         tbb::flow::make_edge( offload_node, end_node );
442 
443         async_body_exec_count = 0;
444         async_activity_processed_msg_count = 0;
445         end_body_exec_count = 0;
446 
447         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
448             offload_node.gateway().reserve_wait();
449         }
450         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
451             start_node.try_put(i);
452         }
453         g.wait_for_all();
454         CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
455         CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
456         CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
457         INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
458               " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
459               " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
460         );
461         return 0;
462     }
463 
464 };
465 
466 int test_copy_ctor() {
467     const int N = NUMBER_OF_MSGS;
468     async_body_exec_count = 0;
469 
470     tbb::flow::graph g;
471 
472     harness_counting_receiver<int> r1(g);
473     harness_counting_receiver<int> r2(g);
474 
475     tbb::task_group_context graph_ctx;
476     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
477     counting_async_node_type b(a);
478 
479     tbb::flow::make_edge(a, r1);                             // C++11-style of making edges
480     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // usual way of making edges
481 
482     for (int i = 0; i < N; ++i) {
483        a.try_put(i);
484     }
485     g.wait_for_all();
486 
487     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
488     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
489     CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
490     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
491     CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
492 
493     for (int i = 0; i < N; ++i) {
494        b.try_put(i);
495     }
496     g.wait_for_all();
497 
498     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
499     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
500     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
501     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
502     CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
503     return 0;
504 }
505 
506 std::atomic<int> main_tid_count;
507 
508 template<typename Input, typename Output>
509 struct spin_test {
510     typedef Input input_type;
511     typedef Output output_type;
512     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
513     typedef typename async_node_type::gateway_type gateway_type;
514 
515     class end_body_type {
516         typedef Output output_type;
517         std::thread::id my_main_tid;
518         utils::SpinBarrier *my_barrier;
519     public:
520         end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
521 
522         void operator()( const output_type & ) {
523             ++end_body_exec_count;
524             if (std::this_thread::get_id() == my_main_tid) {
525                ++main_tid_count;
526             }
527             my_barrier->wait();
528         }
529     };
530 
531     spin_test() {}
532 
533     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
534         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
535         const int overall_message_count = nthreads * NUMBER_OF_MSGS;
536         utils::SpinBarrier spin_barrier(nthreads);
537 
538         tbb::flow::graph g;
539         tbb::flow::function_node<int, input_type> start_node(
540             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
541         );
542         async_node_type offload_node(
543             g, tbb::flow::unlimited,
544             [&](const input_type &input, gateway_type& gateway) {
545                 ++async_body_exec_count;
546                 if(my_async_activity.should_reserve_each_time())
547                     gateway.reserve_wait();
548                 my_async_activity.submit(input, gateway);
549             }
550         );
551         tbb::flow::function_node<output_type> end_node(
552             g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
553         );
554 
555         tbb::flow::make_edge( start_node, offload_node );
556         tbb::flow::make_edge( offload_node, end_node );
557 
558         async_body_exec_count = 0;
559         async_activity_processed_msg_count = 0;
560         end_body_exec_count = 0;
561         main_tid_count = 0;
562 
563         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
564             offload_node.gateway().reserve_wait();
565         }
566         for (int i = 0; i < overall_message_count; ++i) {
567             start_node.try_put(i);
568         }
569         g.wait_for_all();
570         CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
571                        "AsyncBody processed wrong number of signals" );
572         CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
573                        "AsyncActivity processed wrong number of signals" );
574         CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
575                        "EndBody processed wrong number of signals");
576 
577         INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
578 
579         INFO("async_body_exec_count == " << int(async_body_exec_count) <<
580              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
581              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
582         );
583         return 0;
584     }
585 
586 };
587 
588 void test_for_spin_avoidance() {
589     const int nthreads = 4;
590     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
591     spin_test<int, int>::run(nthreads);
592 }
593 
594 template< typename Input, typename Output >
595 int run_tests() {
596     basic_test<Input, Output>::run();
597     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
598     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
599     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
600     return 0;
601 }
602 
603 #include "tbb/parallel_for.h"
604 template<typename Input, typename Output>
605 class equeueing_on_inner_level {
606     typedef Input input_type;
607     typedef Output output_type;
608     typedef async_activity<input_type, output_type> async_activity_type;
609     typedef tbb::flow::async_node<Input, Output> async_node_type;
610     typedef typename async_node_type::gateway_type gateway_type;
611 
612     class body_graph_with_async {
613     public:
614         body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
615             : spin_barrier(&barrier), my_async_activity(&activity) {}
616 
617         void operator()(int) const {
618             tbb::flow::graph g;
619             tbb::flow::function_node< int, input_type > start_node(
620                 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
621             );
622             async_node_type offload_node(
623                 g, tbb::flow::unlimited,
624                 [&](const input_type &input, gateway_type& gateway) {
625                     gateway.reserve_wait();
626                     my_async_activity->submit( input, gateway );
627                 }
628             );
629             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
630 
631             tbb::flow::make_edge( start_node, offload_node );
632             tbb::flow::make_edge( offload_node, end_node );
633 
634             start_node.try_put(1);
635 
636             spin_barrier->wait();
637 
638             my_async_activity->activate();
639 
640             g.wait_for_all();
641         }
642 
643     private:
644         utils::SpinBarrier* spin_barrier;
645         async_activity_type* my_async_activity;
646     };
647 
648 public:
649     static int run ()
650     {
651         const int nthreads = tbb::this_task_arena::max_concurrency();
652         utils::SpinBarrier spin_barrier( nthreads );
653 
654         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
655 
656         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
657         return 0;
658     }
659 };
660 
661 int run_test_equeueing_on_inner_level() {
662     equeueing_on_inner_level<int, int>::run();
663     return 0;
664 }
665 
666 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
667 #include <array>
668 
669 template<typename NodeType>
670 class AsyncActivity {
671 public:
672     using gateway_t = typename NodeType::gateway_type;
673 
674     struct work_type {
675         int input;
676         gateway_t* gateway;
677     };
678 
679     AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
680         while(!end_of_work()) {
681             work_type w;
682             while( my_q.try_pop(w) ) {
683                 int res = do_work(w.input);
684                 w.gateway->try_put(res);
685                 w.gateway->release_wait();
686                 ++c;
687             }
688         }
689     }) {}
690 
691     void submit(int i, gateway_t* gateway) {
692         work_type w = {i, gateway};
693         gateway->reserve_wait();
694         my_q.push(w);
695     }
696 
697     void wait_for_all() { thr.join(); }
698 
699 private:
700     bool end_of_work() { return c >= stop_limit; }
701 
702     int do_work(int& i) { return i + i; }
703 
704     async_activity_queue<work_type> my_q;
705     size_t stop_limit;
706     size_t c;
707     std::thread thr;
708 };
709 
710 void test_follows() {
711     using namespace tbb::flow;
712 
713     using input_t = int;
714     using output_t = int;
715     using node_t = async_node<input_t, output_t>;
716 
717     graph g;
718 
719     AsyncActivity<node_t> async_activity(3);
720 
721     std::array<broadcast_node<input_t>, 3> preds = {
722       {
723         broadcast_node<input_t>(g),
724         broadcast_node<input_t>(g),
725         broadcast_node<input_t>(g)
726       }
727     };
728 
729     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
730         async_activity.submit(input, &gtw);
731     }, no_priority);
732 
733     buffer_node<output_t> buf(g);
734     make_edge(node, buf);
735 
736     for(auto& pred: preds) {
737         pred.try_put(1);
738     }
739 
740     g.wait_for_all();
741     async_activity.wait_for_all();
742 
743     output_t storage;
744     CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
745                   "Not exact edge quantity was made");
746 }
747 
748 void test_precedes() {
749     using namespace tbb::flow;
750 
751     using input_t = int;
752     using output_t = int;
753     using node_t = async_node<input_t, output_t>;
754 
755     graph g;
756 
757     AsyncActivity<node_t> async_activity(1);
758 
759     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
760 
761     broadcast_node<input_t> start(g);
762 
763     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
764         async_activity.submit(input, &gtw);
765     }, no_priority);
766 
767     make_edge(start, node);
768 
769     start.try_put(1);
770 
771     g.wait_for_all();
772     async_activity.wait_for_all();
773 
774     for(auto& successor : successors) {
775         output_t storage;
776         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
777                       "Not exact edge quantity was made");
778     }
779 }
780 
781 void test_follows_and_precedes_api() {
782     test_follows();
783     test_precedes();
784 }
785 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
786 
787 //! Test async bodies processing
788 //! \brief \ref requirement \ref error_guessing
789 TEST_CASE("Basic tests"){
790     tbb::task_arena arena(utils::MaxThread);
791     arena.execute(
792         [&]() {
793             run_tests<int, int>();
794             run_tests<minimal_type, minimal_type>();
795             run_tests<int, minimal_type>();
796         }
797     );
798 }
799 
800 //! NativeParallelFor test with various concurrency settings
801 //! \brief \ref requirement \ref error_guessing
802 TEST_CASE("Lightweight tests"){
803     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
804 }
805 
806 //! Test reset and cancellation
807 //! \brief \ref error_guessing
808 TEST_CASE("Reset test"){
809     test_reset();
810 }
811 
812 //! Test
813 //! \brief \ref requirement \ref error_guessing
814 TEST_CASE("Copy constructor test"){
815     test_copy_ctor();
816 }
817 
818 //! Test if main thread spins
819 //! \brief \ref stress
820 TEST_CASE("Spin avoidance test"){
821     test_for_spin_avoidance();
822 }
823 
824 //! Test nested enqueing
825 //! \brief \ref error_guessing
826 TEST_CASE("Inner enqueing test"){
827     run_test_equeueing_on_inner_level();
828 }
829 
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Test the deprecated follows and precedes node-set API
//! \brief \ref error_guessing
TEST_CASE("Test follows and preceedes API") {
    test_follows_and_precedes_api();
}
#endif
837