xref: /oneTBB/test/tbb/test_async_node.cpp (revision 51c0b2f7)
1 /*
2     Copyright (c) 2005-2020 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 
18 #include "common/config.h"
19 
20 #define __TBB_EXTRA_DEBUG 1
21 #include "tbb/flow_graph.h"
22 
23 #include "tbb/task.h"
24 #include "tbb/global_control.h"
25 
26 #include "common/test.h"
27 #include "common/utils.h"
28 #include "common/utils_assert.h"
29 #include "common/graph_utils.h"
30 #include "common/spin_barrier.h"
31 #include "common/test_follows_and_precedes_api.h"
32 
33 #include <string>
34 #include <thread>
35 #include <mutex>
36 
37 
38 //! \file test_async_node.cpp
39 //! \brief Test for [flow_graph.async_node] specification
40 
41 
//! Message type with the smallest interface the tests need: default, int and copy construction
//! plus copy assignment. The payload is private; only place_wrapper is befriended to read it.
class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    //! A default-constructed object holds the payload -1.
    minimal_type() : value{-1} {}

    //! Converting constructor, intentionally left non-explicit so a plain int can initialize it.
    minimal_type(int v) : value{v} {}

    //! Copies the payload of \p other.
    minimal_type(const minimal_type &other) : value{other.value} {}

    //! Copies the payload of \p rhs and returns the assigned-to object.
    minimal_type &operator=(const minimal_type &rhs) {
        value = rhs.value;
        return *this;
    }
};
54 
//! Wraps a payload of type T together with the id of the thread that created the wrapper.
//! The id is what wrapper_helper::check compares to detect adjacent nodes run by one thread.
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    //! Builds the payload from \p v and records the id of the constructing thread.
    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    //! Converting constructor: takes both the payload and the recorded thread id from \p v.
    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    //! Converting assignment: takes both the payload and the recorded thread id from \p v.
    //! Returns a reference to the assigned-to object (the previous declaration returned
    //! place_wrapper<Q>&, which made every instantiation with Q != T ill-formed).
    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        // Compare raw addresses: place_wrapper<T>* and place_wrapper<Q>* are unrelated pointer
        // types when Q != T, so a direct pointer comparison would not compile.
        if (static_cast<const void*>(this) != static_cast<const void*>(&v)) {
            value = v.value;
            thread_id = v.thread_id;
        }
        return *this;
    }

};
78 
//! Generic helper used for message types that carry no thread information:
//! check() has nothing to verify and copy_value() is a plain assignment.
template<typename T1, typename T2>
struct wrapper_helper {
    //! Nothing to verify for unwrapped types.
    static void check(const T1 &, const T2 &) {}

    //! Assigns \p source to \p target (relies on T2 being assignable from T1).
    static void copy_value(const T1 &source, T2 &target) {
        target = source;
    }
};
84 
85 template<typename T1, typename T2>
86 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
87     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
88        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
89        return;
90     }
91     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
92         out.value = in.value;
93     }
94 };
95 
//! Number of messages pushed through a graph per test phase.
constexpr int NUMBER_OF_MSGS = 10;
//! Sentinel telling async_activity that the total number of items is not known in advance.
constexpr int UNKNOWN_NUMBER_OF_ITEMS = -1;
//! Number of async_node body invocations (reset by each test before use).
std::atomic<int> async_body_exec_count{0};
//! Number of work items completed by the async_activity service thread.
std::atomic<int> async_activity_processed_msg_count{0};
//! Number of invocations of the final (end) node body.
std::atomic<int> end_body_exec_count{0};
101 
102 // queueing required in test_reset for testing of cancellation
103 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
104 typedef counting_async_node_type::gateway_type counting_gateway_type;
105 
106 struct counting_async_unlimited_body {
107 
108     counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
109 
110     void operator()( const int &input, counting_gateway_type& gateway) {
111         // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
112         // doctest's INFO cause issues.
113 
114         // INFO( "Body execution with input == " << input << "\n");
115         ++async_body_exec_count;
116         if ( input == -1 ) {
117             bool result = my_tgc.cancel_group_execution();
118             // INFO( "Canceling graph execution\n" );
119             CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
120             utils::Sleep(50);
121         }
122         gateway.try_put(input);
123     }
124 private:
125     tbb::task_group_context& my_tgc;
126 };
127 
128 struct counting_async_serial_body : counting_async_unlimited_body {
129     typedef counting_async_unlimited_body base_type;
130     int my_async_body_exec_count;
131 
132     counting_async_serial_body(tbb::task_group_context& tgc)
133         : base_type(tgc), my_async_body_exec_count( 0 ) { }
134 
135     void operator()( const int &input, counting_gateway_type& gateway ) {
136         ++my_async_body_exec_count;
137         base_type::operator()( input, gateway );
138     }
139 };
140 
141 void test_reset() {
142     const int N = NUMBER_OF_MSGS;
143     async_body_exec_count = 0;
144 
145     tbb::task_group_context graph_ctx;
146     tbb::flow::graph g(graph_ctx);
147     counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
148 
149     const int R = 3;
150     std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
151     for (size_t i = 0; i < R; ++i) {
152         r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
153     }
154 
155     for (int i = 0; i < R; ++i) {
156         tbb::flow::make_edge(a, *r[i]);
157     }
158 
159     INFO( "One body execution\n" );
160     a.try_put(-1);
161     for (int i = 0; i < N; ++i) {
162        a.try_put(i);
163     }
164     g.wait_for_all();
165     // should be canceled with only 1 item reaching the async_body and the counting receivers
166     // and N items left in the node's queue
167     CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
168 
169     counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
170     CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
171     CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1"  );
172     for (int i = 0; i < R; ++i) {
173         CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
174     }
175 
176     // should clear the async_node queue, but retain its local count at 1 and keep all edges
177     g.reset(tbb::flow::rf_reset_protocol);
178 
179     INFO( "N body executions\n" );
180     for (int i = 0; i < N; ++i) {
181        a.try_put(i);
182     }
183     g.wait_for_all();
184     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
185 
186     // a total of N+1 items should have passed through the node body
187     // the local body count should also be N+1
188     // and the counting receivers should all have a count of N+1
189     counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
190     CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
191                    "local and global body execution counts are different" );
192     INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
193     CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1"  );
194     for (int i = 0; i < R; ++i) {
195         CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
196     }
197 
198     INFO( "N body executions with new bodies\n" );
199     // should clear the async_node queue and reset its local count to 0, but keep all edges
200     g.reset(tbb::flow::rf_reset_bodies);
201     for (int i = 0; i < N; ++i) {
202        a.try_put(i);
203     }
204     g.wait_for_all();
205     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
206 
207     // a total of 2N+1 items should have passed through the node body
208     // the local body count should be N
209     // and the counting receivers should all have a count of 2N+1
210     counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
211     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1"  );
212     CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N"  );
213     for (int i = 0; i < R; ++i) {
214         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
215     }
216 
217     // should clear the async_node queue and keep its local count at N and remove all edges
218     INFO( "N body executions with no edges\n" );
219     g.reset(tbb::flow::rf_clear_edges);
220     for (int i = 0; i < N; ++i) {
221        a.try_put(i);
222     }
223     g.wait_for_all();
224     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
225 
226     // a total of 3N+1 items should have passed through the node body
227     // the local body count should now be 2*N
228     // and the counting receivers should remain at a count of 2N+1
229     counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
230     CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1"  );
231     CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N"  );
232     for (int i = 0; i < R; ++i) {
233         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
234     }
235 
236     // put back 1 edge to receiver 0
237     INFO( "N body executions with 1 edge\n" );
238     tbb::flow::make_edge(a, *r[0]);
239     for (int i = 0; i < N; ++i) {
240        a.try_put(i);
241     }
242     g.wait_for_all();
243     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
244 
245     // a total of 4N+1 items should have passed through the node body
246     // the local body count should now be 3*N
247     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
248     counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
249     CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1"  );
250     CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N"  );
251     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
252     for (int i = 1; i < R; ++i) {
253         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
254     }
255 
256     // should clear the async_node queue and keep its local count at N and remove all edges
257     INFO( "N body executions with no edges and new body\n" );
258     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
259     for (int i = 0; i < N; ++i) {
260        a.try_put(i);
261     }
262     g.wait_for_all();
263     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
264 
265     // a total of 4N+1 items should have passed through the node body
266     // the local body count should now be 3*N
267     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
268     counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
269     CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1"  );
270     CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N"  );
271     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
272     for (int i = 1; i < R; ++i) {
273         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
274     }
275 }
276 
277 
278 #include <mutex>
279 
//! Minimal FIFO queue guarded by a std::mutex; every operation takes the lock for its whole
//! duration. Used to hand work items from graph bodies to a service thread.
template <typename T>
class async_activity_queue {
public:
    //! Appends a copy of \p item to the back of the queue.
    void push( const T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        m_queue.push( item );
    }

    //! Moves the oldest element into \p item and removes it from the queue.
    //! \return false (leaving \p item untouched) when the queue is empty, true otherwise.
    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        if( m_queue.empty() )
            return false;
        item = m_queue.front();
        m_queue.pop();
        return true;
    }

    //! \return true when the queue holds no elements at the moment of the call.
    //! Const so that the queue can be inspected through a const reference.
    bool empty() const {
        std::lock_guard<mutex_t> lock( m_mutex );
        return m_queue.empty();
    }

private:
    typedef std::mutex mutex_t;
    // mutable: locking is required even in the const observer empty().
    mutable mutex_t m_mutex;
    std::queue<T> m_queue;
};
307 
308 template< typename Input, typename Output >
309 class async_activity : utils::NoAssign {
310 public:
311     typedef Input input_type;
312     typedef Output output_type;
313     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
314     typedef typename async_node_type::gateway_type gateway_type;
315 
316     struct work_type {
317         input_type input;
318         gateway_type* gateway;
319     };
320 
321     class ServiceThreadBody {
322     public:
323         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
324         void operator()() { my_activity->process(); }
325     private:
326         async_activity* my_activity;
327     };
328 
329     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
330         : my_expected_items(expected_items), my_sleep_time(sleep_time)
331     {
332         is_active = !deferred;
333         my_quit = false;
334         std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
335     }
336 
337 private:
338 
339     async_activity( const async_activity& )
340         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
341     {
342         is_active = true;
343     }
344 
345 public:
346     ~async_activity() {
347         stop();
348         my_service_thread.join();
349     }
350 
351     void submit( const input_type &input, gateway_type& gateway ) {
352         work_type work = {input, &gateway};
353         my_work_queue.push( work );
354     }
355 
356     void process() {
357         do {
358             work_type work;
359             if( is_active && my_work_queue.try_pop( work ) ) {
360                 utils::Sleep(my_sleep_time);
361                 ++async_activity_processed_msg_count;
362                 output_type output;
363                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
364                 wrapper_helper<output_type, output_type>::check(work.input, output);
365                 work.gateway->try_put(output);
366                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
367                      int(async_activity_processed_msg_count) == my_expected_items ) {
368                     work.gateway->release_wait();
369                 }
370             }
371         } while( my_quit == false || !my_work_queue.empty());
372     }
373 
374     void stop() {
375         my_quit = true;
376     }
377 
378     void activate() {
379         is_active = true;
380     }
381 
382     bool should_reserve_each_time() {
383         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
384             return true;
385         else
386             return false;
387     }
388 
389 private:
390 
391     const int my_expected_items;
392     const int my_sleep_time;
393     std::atomic< bool > is_active;
394 
395     async_activity_queue<work_type> my_work_queue;
396 
397     std::atomic< bool > my_quit;
398 
399     std::thread my_service_thread;
400 };
401 
402 template<typename Input, typename Output>
403 struct basic_test {
404     typedef Input input_type;
405     typedef Output output_type;
406     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
407     typedef typename async_node_type::gateway_type gateway_type;
408 
409     basic_test() {}
410 
411     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
412         async_activity<input_type, output_type> my_async_activity(async_expected_items);
413 
414         tbb::flow::graph g;
415 
416         tbb::flow::function_node< int, input_type > start_node(
417             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
418         );
419         async_node_type offload_node(
420             g, tbb::flow::unlimited,
421             [&] (const input_type &input, gateway_type& gateway) {
422                 ++async_body_exec_count;
423                 if(my_async_activity.should_reserve_each_time())
424                     gateway.reserve_wait();
425                 my_async_activity.submit(input, gateway);
426             }
427         );
428         tbb::flow::function_node< output_type > end_node(
429             g, tbb::flow::unlimited,
430             [&](const output_type& input) {
431                 ++end_body_exec_count;
432                 output_type output;
433                 wrapper_helper<output_type, output_type>::check(input, output);
434             }
435         );
436 
437         tbb::flow::make_edge( start_node, offload_node );
438         tbb::flow::make_edge( offload_node, end_node );
439 
440         async_body_exec_count = 0;
441         async_activity_processed_msg_count = 0;
442         end_body_exec_count = 0;
443 
444         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
445             offload_node.gateway().reserve_wait();
446         }
447         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
448             start_node.try_put(i);
449         }
450         g.wait_for_all();
451         CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
452         CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
453         CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
454         INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
455               " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
456               " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
457         );
458         return 0;
459     }
460 
461 };
462 
463 int test_copy_ctor() {
464     const int N = NUMBER_OF_MSGS;
465     async_body_exec_count = 0;
466 
467     tbb::flow::graph g;
468 
469     harness_counting_receiver<int> r1(g);
470     harness_counting_receiver<int> r2(g);
471 
472     tbb::task_group_context graph_ctx;
473     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
474     counting_async_node_type b(a);
475 
476     tbb::flow::make_edge(a, r1);                             // C++11-style of making edges
477     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // usual way of making edges
478 
479     for (int i = 0; i < N; ++i) {
480        a.try_put(i);
481     }
482     g.wait_for_all();
483 
484     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
485     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
486     CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
487     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
488     CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
489 
490     for (int i = 0; i < N; ++i) {
491        b.try_put(i);
492     }
493     g.wait_for_all();
494 
495     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
496     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
497     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
498     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
499     CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
500     return 0;
501 }
502 
//! Number of end-node body executions that ran on the thread that started the spin test.
std::atomic<int> main_tid_count{0};
504 
505 template<typename Input, typename Output>
506 struct spin_test {
507     typedef Input input_type;
508     typedef Output output_type;
509     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
510     typedef typename async_node_type::gateway_type gateway_type;
511 
512     class end_body_type {
513         typedef Output output_type;
514         std::thread::id my_main_tid;
515         utils::SpinBarrier *my_barrier;
516     public:
517         end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
518 
519         void operator()( const output_type & ) {
520             ++end_body_exec_count;
521             if (std::this_thread::get_id() == my_main_tid) {
522                ++main_tid_count;
523             }
524             my_barrier->timedWaitNoError(10);
525         }
526     };
527 
528     spin_test() {}
529 
530     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
531         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
532         const int overall_message_count = nthreads * NUMBER_OF_MSGS;
533         utils::SpinBarrier spin_barrier(nthreads);
534 
535         tbb::flow::graph g;
536         tbb::flow::function_node<int, input_type> start_node(
537             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
538         );
539         async_node_type offload_node(
540             g, tbb::flow::unlimited,
541             [&](const input_type &input, gateway_type& gateway) {
542                 ++async_body_exec_count;
543                 if(my_async_activity.should_reserve_each_time())
544                     gateway.reserve_wait();
545                 my_async_activity.submit(input, gateway);
546             }
547         );
548         tbb::flow::function_node<output_type> end_node(
549             g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
550         );
551 
552         tbb::flow::make_edge( start_node, offload_node );
553         tbb::flow::make_edge( offload_node, end_node );
554 
555         async_body_exec_count = 0;
556         async_activity_processed_msg_count = 0;
557         end_body_exec_count = 0;
558         main_tid_count = 0;
559 
560         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
561             offload_node.gateway().reserve_wait();
562         }
563         for (int i = 0; i < overall_message_count; ++i) {
564             start_node.try_put(i);
565         }
566         g.wait_for_all();
567         CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
568                        "AsyncBody processed wrong number of signals" );
569         CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
570                        "AsyncActivity processed wrong number of signals" );
571         CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
572                        "EndBody processed wrong number of signals");
573 
574         INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
575 
576         INFO("async_body_exec_count == " << int(async_body_exec_count) <<
577              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
578              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
579         );
580         return 0;
581     }
582 
583 };
584 
585 void test_for_spin_avoidance() {
586     const int nthreads = 4;
587     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
588     spin_test<int, int>::run(nthreads);
589 }
590 
591 template< typename Input, typename Output >
592 int run_tests() {
593     basic_test<Input, Output>::run();
594     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
595     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
596     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
597     return 0;
598 }
599 
600 #include "tbb/parallel_for.h"
601 template<typename Input, typename Output>
602 class equeueing_on_inner_level {
603     typedef Input input_type;
604     typedef Output output_type;
605     typedef async_activity<input_type, output_type> async_activity_type;
606     typedef tbb::flow::async_node<Input, Output> async_node_type;
607     typedef typename async_node_type::gateway_type gateway_type;
608 
609     class body_graph_with_async {
610     public:
611         body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
612             : spin_barrier(&barrier), my_async_activity(&activity) {}
613 
614         void operator()(int) const {
615             tbb::flow::graph g;
616             tbb::flow::function_node< int, input_type > start_node(
617                 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
618             );
619             async_node_type offload_node(
620                 g, tbb::flow::unlimited,
621                 [&](const input_type &input, gateway_type& gateway) {
622                     gateway.reserve_wait();
623                     my_async_activity->submit( input, gateway );
624                 }
625             );
626             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
627 
628             tbb::flow::make_edge( start_node, offload_node );
629             tbb::flow::make_edge( offload_node, end_node );
630 
631             start_node.try_put(1);
632 
633             spin_barrier->wait();
634 
635             my_async_activity->activate();
636 
637             g.wait_for_all();
638         }
639 
640     private:
641         utils::SpinBarrier* spin_barrier;
642         async_activity_type* my_async_activity;
643     };
644 
645 public:
646     static int run ()
647     {
648         const int nthreads = tbb::this_task_arena::max_concurrency();
649         utils::SpinBarrier spin_barrier( nthreads );
650 
651         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
652 
653         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
654         return 0;
655     }
656 };
657 
658 int run_test_equeueing_on_inner_level() {
659     equeueing_on_inner_level<int, int>::run();
660     return 0;
661 }
662 
663 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
664 #include <array>
665 
666 template<typename NodeType>
667 class AsyncActivity {
668 public:
669     using gateway_t = typename NodeType::gateway_type;
670 
671     struct work_type {
672         int input;
673         gateway_t* gateway;
674     };
675 
676     AsyncActivity(size_t limit) : thr([this]() {
677         while(!end_of_work()) {
678             work_type w;
679             while( my_q.try_pop(w) ) {
680                 int res = do_work(w.input);
681                 w.gateway->try_put(res);
682                 w.gateway->release_wait();
683                 ++c;
684             }
685         }
686     }), stop_limit(limit), c(0) {}
687 
688     void submit(int i, gateway_t* gateway) {
689         work_type w = {i, gateway};
690         gateway->reserve_wait();
691         my_q.push(w);
692     }
693 
694     void wait_for_all() { thr.join(); }
695 
696 private:
697     bool end_of_work() { return c >= stop_limit; }
698 
699     int do_work(int& i) { return i + i; }
700 
701     async_activity_queue<work_type> my_q;
702     std::thread thr;
703     size_t stop_limit;
704     size_t c;
705 };
706 
707 void test_follows() {
708     using namespace tbb::flow;
709 
710     using input_t = int;
711     using output_t = int;
712     using node_t = async_node<input_t, output_t>;
713 
714     graph g;
715 
716     AsyncActivity<node_t> async_activity(3);
717 
718     std::array<broadcast_node<input_t>, 3> preds = {
719       {
720         broadcast_node<input_t>(g),
721         broadcast_node<input_t>(g),
722         broadcast_node<input_t>(g)
723       }
724     };
725 
726     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
727         async_activity.submit(input, &gtw);
728     }, no_priority);
729 
730     buffer_node<output_t> buf(g);
731     make_edge(node, buf);
732 
733     for(auto& pred: preds) {
734         pred.try_put(1);
735     }
736 
737     g.wait_for_all();
738     async_activity.wait_for_all();
739 
740     output_t storage;
741     CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
742                   "Not exact edge quantity was made");
743 }
744 
745 void test_precedes() {
746     using namespace tbb::flow;
747 
748     using input_t = int;
749     using output_t = int;
750     using node_t = async_node<input_t, output_t>;
751 
752     graph g;
753 
754     AsyncActivity<node_t> async_activity(1);
755 
756     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
757 
758     broadcast_node<input_t> start(g);
759 
760     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
761         async_activity.submit(input, &gtw);
762     }, no_priority);
763 
764     make_edge(start, node);
765 
766     start.try_put(1);
767 
768     g.wait_for_all();
769     async_activity.wait_for_all();
770 
771     for(auto& successor : successors) {
772         output_t storage;
773         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
774                       "Not exact edge quantity was made");
775     }
776 }
777 
778 void test_follows_and_precedes_api() {
779     test_follows();
780     test_precedes();
781 }
782 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
783 
784 //! Test async bodies processing
785 //! \brief \ref requirement \ref error_guessing
786 TEST_CASE("Basic tests"){
787     tbb::task_arena arena(utils::MaxThread);
788     arena.execute(
789         [&]() {
790             run_tests<int, int>();
791             run_tests<minimal_type, minimal_type>();
792             run_tests<int, minimal_type>();
793         }
794     );
795 }
796 
797 //! NativeParallelFor test with various concurrency settings
798 //! \brief \ref requirement \ref error_guessing
799 TEST_CASE("Lightweight tests"){
800     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
801 }
802 
803 //! Test reset and cancellation
804 //! \brief \ref error_guessing
805 TEST_CASE("Reset test"){
806     test_reset();
807 }
808 
809 //! Test
810 //! \brief \ref requirement \ref error_guessing
811 TEST_CASE("Copy constructor test"){
812     test_copy_ctor();
813 }
814 
815 //! Test if main thread spins
816 //! \brief \ref stress
817 TEST_CASE("Spin avoidance test"){
818     test_for_spin_avoidance();
819 }
820 
821 //! Test nested enqueing
822 //! \brief \ref error_guessing
823 TEST_CASE("Inner enqueing test"){
824     run_test_equeueing_on_inner_level();
825 }
826 
827 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
828 //! Test deprecated follows and preceedes API
829 //! \brief \ref error_guessing
830 TEST_CASE("Test follows and preceedes API"){
831     test_follows_and_precedes_api();
832 }
833 #endif
834