xref: /oneTBB/test/tbb/test_async_node.cpp (revision 51c0b2f7)
1*51c0b2f7Stbbdev /*
2*51c0b2f7Stbbdev     Copyright (c) 2005-2020 Intel Corporation
3*51c0b2f7Stbbdev 
4*51c0b2f7Stbbdev     Licensed under the Apache License, Version 2.0 (the "License");
5*51c0b2f7Stbbdev     you may not use this file except in compliance with the License.
6*51c0b2f7Stbbdev     You may obtain a copy of the License at
7*51c0b2f7Stbbdev 
8*51c0b2f7Stbbdev         http://www.apache.org/licenses/LICENSE-2.0
9*51c0b2f7Stbbdev 
10*51c0b2f7Stbbdev     Unless required by applicable law or agreed to in writing, software
11*51c0b2f7Stbbdev     distributed under the License is distributed on an "AS IS" BASIS,
12*51c0b2f7Stbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13*51c0b2f7Stbbdev     See the License for the specific language governing permissions and
14*51c0b2f7Stbbdev     limitations under the License.
15*51c0b2f7Stbbdev */
16*51c0b2f7Stbbdev 
17*51c0b2f7Stbbdev 
18*51c0b2f7Stbbdev #include "common/config.h"
19*51c0b2f7Stbbdev 
20*51c0b2f7Stbbdev #define __TBB_EXTRA_DEBUG 1
21*51c0b2f7Stbbdev #include "tbb/flow_graph.h"
22*51c0b2f7Stbbdev 
23*51c0b2f7Stbbdev #include "tbb/task.h"
24*51c0b2f7Stbbdev #include "tbb/global_control.h"
25*51c0b2f7Stbbdev 
26*51c0b2f7Stbbdev #include "common/test.h"
27*51c0b2f7Stbbdev #include "common/utils.h"
28*51c0b2f7Stbbdev #include "common/utils_assert.h"
29*51c0b2f7Stbbdev #include "common/graph_utils.h"
30*51c0b2f7Stbbdev #include "common/spin_barrier.h"
31*51c0b2f7Stbbdev #include "common/test_follows_and_precedes_api.h"
32*51c0b2f7Stbbdev 
33*51c0b2f7Stbbdev #include <string>
34*51c0b2f7Stbbdev #include <thread>
35*51c0b2f7Stbbdev #include <mutex>
36*51c0b2f7Stbbdev 
37*51c0b2f7Stbbdev 
38*51c0b2f7Stbbdev //! \file test_async_node.cpp
39*51c0b2f7Stbbdev //! \brief Test for [flow_graph.async_node] specification
40*51c0b2f7Stbbdev 
41*51c0b2f7Stbbdev 
//! Message type with a deliberately small interface: default construction,
//! construction from int, copy construction and copy assignment.
//! The stored integer is private; only place_wrapper instantiations are
//! granted access to it.
class minimal_type {
    int value;

    template<typename U>
    friend struct place_wrapper;

public:
    //! A default-constructed object stores -1.
    minimal_type() : value{-1} {}

    //! Non-explicit converting constructor: stores the given integer.
    minimal_type(int initial) : value{initial} {}

    //! Copies the stored integer from another instance.
    minimal_type(const minimal_type& other) : value{other.value} {}

    //! Overwrites the stored integer with the one held by another instance.
    minimal_type& operator=(const minimal_type& other) {
        value = other.value;
        return *this;
    }
};
54*51c0b2f7Stbbdev 
//! Pairs a value with the id of the thread on which the wrapper was created.
//! Wrappers of different value types can be converted into one another; the
//! conversion carries over both the value and the recorded thread id.
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    //! Builds the wrapped value from an int and records the calling thread.
    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    //! Converting constructor: copies value and thread id from a wrapper of another type.
    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    //! Converting assignment from a wrapper of another type.
    //! Returns a reference to this object (previously the declared return type was
    //! place_wrapper<Q>&, which cannot bind to *this when Q differs from T).
    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        // Compare addresses through void pointers: place_wrapper<T>* and
        // place_wrapper<Q>* are unrelated types and cannot be compared directly.
        if (static_cast<const void*>(this) != static_cast<const void*>(&v)) {
            value = v.value;
            thread_id = v.thread_id;
        }
        return *this;
    }

};
78*51c0b2f7Stbbdev 
//! Helper used for message types that are not wrapped in place_wrapper.
template<typename T1, typename T2>
struct wrapper_helper {
    //! Nothing to verify for unwrapped types; both arguments are ignored.
    static void check(const T1& /*first*/, const T2& /*second*/) {}

    //! Transfers the payload by assigning the source to the destination.
    static void copy_value(const T1& source, T2& destination) {
        destination = source;
    }
};
84*51c0b2f7Stbbdev 
85*51c0b2f7Stbbdev template<typename T1, typename T2>
86*51c0b2f7Stbbdev struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
87*51c0b2f7Stbbdev     static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
88*51c0b2f7Stbbdev        CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
89*51c0b2f7Stbbdev        return;
90*51c0b2f7Stbbdev     }
91*51c0b2f7Stbbdev     static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
92*51c0b2f7Stbbdev         out.value = in.value;
93*51c0b2f7Stbbdev     }
94*51c0b2f7Stbbdev };
95*51c0b2f7Stbbdev 
//! Number of messages the tests in this file feed into a node per round.
constexpr int NUMBER_OF_MSGS = 10;
//! Marker meaning "the number of items the async activity will see is not known in advance".
constexpr int UNKNOWN_NUMBER_OF_ITEMS = -1;

//! Incremented each time an async node body runs.
std::atomic<int> async_body_exec_count;
//! Incremented each time the async activity's service thread handles a message.
std::atomic<int> async_activity_processed_msg_count;
//! Incremented each time the body of the terminating node runs.
std::atomic<int> end_body_exec_count;
101*51c0b2f7Stbbdev 
102*51c0b2f7Stbbdev // queueing required in test_reset for testing of cancellation
103*51c0b2f7Stbbdev typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
104*51c0b2f7Stbbdev typedef counting_async_node_type::gateway_type counting_gateway_type;
105*51c0b2f7Stbbdev 
106*51c0b2f7Stbbdev struct counting_async_unlimited_body {
107*51c0b2f7Stbbdev 
108*51c0b2f7Stbbdev     counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
109*51c0b2f7Stbbdev 
110*51c0b2f7Stbbdev     void operator()( const int &input, counting_gateway_type& gateway) {
111*51c0b2f7Stbbdev         // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
112*51c0b2f7Stbbdev         // doctest's INFO cause issues.
113*51c0b2f7Stbbdev 
114*51c0b2f7Stbbdev         // INFO( "Body execution with input == " << input << "\n");
115*51c0b2f7Stbbdev         ++async_body_exec_count;
116*51c0b2f7Stbbdev         if ( input == -1 ) {
117*51c0b2f7Stbbdev             bool result = my_tgc.cancel_group_execution();
118*51c0b2f7Stbbdev             // INFO( "Canceling graph execution\n" );
119*51c0b2f7Stbbdev             CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
120*51c0b2f7Stbbdev             utils::Sleep(50);
121*51c0b2f7Stbbdev         }
122*51c0b2f7Stbbdev         gateway.try_put(input);
123*51c0b2f7Stbbdev     }
124*51c0b2f7Stbbdev private:
125*51c0b2f7Stbbdev     tbb::task_group_context& my_tgc;
126*51c0b2f7Stbbdev };
127*51c0b2f7Stbbdev 
128*51c0b2f7Stbbdev struct counting_async_serial_body : counting_async_unlimited_body {
129*51c0b2f7Stbbdev     typedef counting_async_unlimited_body base_type;
130*51c0b2f7Stbbdev     int my_async_body_exec_count;
131*51c0b2f7Stbbdev 
132*51c0b2f7Stbbdev     counting_async_serial_body(tbb::task_group_context& tgc)
133*51c0b2f7Stbbdev         : base_type(tgc), my_async_body_exec_count( 0 ) { }
134*51c0b2f7Stbbdev 
135*51c0b2f7Stbbdev     void operator()( const int &input, counting_gateway_type& gateway ) {
136*51c0b2f7Stbbdev         ++my_async_body_exec_count;
137*51c0b2f7Stbbdev         base_type::operator()( input, gateway );
138*51c0b2f7Stbbdev     }
139*51c0b2f7Stbbdev };
140*51c0b2f7Stbbdev 
141*51c0b2f7Stbbdev void test_reset() {
142*51c0b2f7Stbbdev     const int N = NUMBER_OF_MSGS;
143*51c0b2f7Stbbdev     async_body_exec_count = 0;
144*51c0b2f7Stbbdev 
145*51c0b2f7Stbbdev     tbb::task_group_context graph_ctx;
146*51c0b2f7Stbbdev     tbb::flow::graph g(graph_ctx);
147*51c0b2f7Stbbdev     counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
148*51c0b2f7Stbbdev 
149*51c0b2f7Stbbdev     const int R = 3;
150*51c0b2f7Stbbdev     std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
151*51c0b2f7Stbbdev     for (size_t i = 0; i < R; ++i) {
152*51c0b2f7Stbbdev         r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
153*51c0b2f7Stbbdev     }
154*51c0b2f7Stbbdev 
155*51c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
156*51c0b2f7Stbbdev         tbb::flow::make_edge(a, *r[i]);
157*51c0b2f7Stbbdev     }
158*51c0b2f7Stbbdev 
159*51c0b2f7Stbbdev     INFO( "One body execution\n" );
160*51c0b2f7Stbbdev     a.try_put(-1);
161*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
162*51c0b2f7Stbbdev        a.try_put(i);
163*51c0b2f7Stbbdev     }
164*51c0b2f7Stbbdev     g.wait_for_all();
165*51c0b2f7Stbbdev     // should be canceled with only 1 item reaching the async_body and the counting receivers
166*51c0b2f7Stbbdev     // and N items left in the node's queue
167*51c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
168*51c0b2f7Stbbdev 
169*51c0b2f7Stbbdev     counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
170*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
171*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1"  );
172*51c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
173*51c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
174*51c0b2f7Stbbdev     }
175*51c0b2f7Stbbdev 
176*51c0b2f7Stbbdev     // should clear the async_node queue, but retain its local count at 1 and keep all edges
177*51c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_protocol);
178*51c0b2f7Stbbdev 
179*51c0b2f7Stbbdev     INFO( "N body executions\n" );
180*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
181*51c0b2f7Stbbdev        a.try_put(i);
182*51c0b2f7Stbbdev     }
183*51c0b2f7Stbbdev     g.wait_for_all();
184*51c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
185*51c0b2f7Stbbdev 
186*51c0b2f7Stbbdev     // a total of N+1 items should have passed through the node body
187*51c0b2f7Stbbdev     // the local body count should also be N+1
188*51c0b2f7Stbbdev     // and the counting receivers should all have a count of N+1
189*51c0b2f7Stbbdev     counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
190*51c0b2f7Stbbdev     CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
191*51c0b2f7Stbbdev                    "local and global body execution counts are different" );
192*51c0b2f7Stbbdev     INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
193*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1"  );
194*51c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
195*51c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
196*51c0b2f7Stbbdev     }
197*51c0b2f7Stbbdev 
198*51c0b2f7Stbbdev     INFO( "N body executions with new bodies\n" );
199*51c0b2f7Stbbdev     // should clear the async_node queue and reset its local count to 0, but keep all edges
200*51c0b2f7Stbbdev     g.reset(tbb::flow::rf_reset_bodies);
201*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
202*51c0b2f7Stbbdev        a.try_put(i);
203*51c0b2f7Stbbdev     }
204*51c0b2f7Stbbdev     g.wait_for_all();
205*51c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
206*51c0b2f7Stbbdev 
207*51c0b2f7Stbbdev     // a total of 2N+1 items should have passed through the node body
208*51c0b2f7Stbbdev     // the local body count should be N
209*51c0b2f7Stbbdev     // and the counting receivers should all have a count of 2N+1
210*51c0b2f7Stbbdev     counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
211*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1"  );
212*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N"  );
213*51c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
214*51c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
215*51c0b2f7Stbbdev     }
216*51c0b2f7Stbbdev 
217*51c0b2f7Stbbdev     // should clear the async_node queue and keep its local count at N and remove all edges
218*51c0b2f7Stbbdev     INFO( "N body executions with no edges\n" );
219*51c0b2f7Stbbdev     g.reset(tbb::flow::rf_clear_edges);
220*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
221*51c0b2f7Stbbdev        a.try_put(i);
222*51c0b2f7Stbbdev     }
223*51c0b2f7Stbbdev     g.wait_for_all();
224*51c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
225*51c0b2f7Stbbdev 
226*51c0b2f7Stbbdev     // a total of 3N+1 items should have passed through the node body
227*51c0b2f7Stbbdev     // the local body count should now be 2*N
228*51c0b2f7Stbbdev     // and the counting receivers should remain at a count of 2N+1
229*51c0b2f7Stbbdev     counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
230*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1"  );
231*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N"  );
232*51c0b2f7Stbbdev     for (int i = 0; i < R; ++i) {
233*51c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
234*51c0b2f7Stbbdev     }
235*51c0b2f7Stbbdev 
236*51c0b2f7Stbbdev     // put back 1 edge to receiver 0
237*51c0b2f7Stbbdev     INFO( "N body executions with 1 edge\n" );
238*51c0b2f7Stbbdev     tbb::flow::make_edge(a, *r[0]);
239*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
240*51c0b2f7Stbbdev        a.try_put(i);
241*51c0b2f7Stbbdev     }
242*51c0b2f7Stbbdev     g.wait_for_all();
243*51c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
244*51c0b2f7Stbbdev 
245*51c0b2f7Stbbdev     // a total of 4N+1 items should have passed through the node body
246*51c0b2f7Stbbdev     // the local body count should now be 3*N
247*51c0b2f7Stbbdev     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
248*51c0b2f7Stbbdev     counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
249*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1"  );
250*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N"  );
251*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
252*51c0b2f7Stbbdev     for (int i = 1; i < R; ++i) {
253*51c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
254*51c0b2f7Stbbdev     }
255*51c0b2f7Stbbdev 
256*51c0b2f7Stbbdev     // should clear the async_node queue and keep its local count at N and remove all edges
257*51c0b2f7Stbbdev     INFO( "N body executions with no edges and new body\n" );
258*51c0b2f7Stbbdev     g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
259*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
260*51c0b2f7Stbbdev        a.try_put(i);
261*51c0b2f7Stbbdev     }
262*51c0b2f7Stbbdev     g.wait_for_all();
263*51c0b2f7Stbbdev     CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
264*51c0b2f7Stbbdev 
265*51c0b2f7Stbbdev     // a total of 4N+1 items should have passed through the node body
266*51c0b2f7Stbbdev     // the local body count should now be 3*N
267*51c0b2f7Stbbdev     // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
268*51c0b2f7Stbbdev     counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
269*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1"  );
270*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N"  );
271*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
272*51c0b2f7Stbbdev     for (int i = 1; i < R; ++i) {
273*51c0b2f7Stbbdev         CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
274*51c0b2f7Stbbdev     }
275*51c0b2f7Stbbdev }
276*51c0b2f7Stbbdev 
277*51c0b2f7Stbbdev 
278*51c0b2f7Stbbdev #include <mutex>
279*51c0b2f7Stbbdev 
//! FIFO queue whose every operation runs under a single std::mutex.
template <typename T>
class async_activity_queue {
    using mutex_t = std::mutex;

public:
    //! Appends a copy of the item to the back of the queue.
    void push( const T& item ) {
        std::lock_guard<mutex_t> guard( m_mutex );
        m_queue.push( item );
    }

    //! Moves the oldest element into `item` and returns true;
    //! returns false and leaves `item` untouched when the queue is empty.
    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> guard( m_mutex );
        if( !m_queue.empty() ) {
            item = m_queue.front();
            m_queue.pop();
            return true;
        }
        return false;
    }

    //! Reports whether the queue held no elements at the time of the call.
    bool empty() {
        std::lock_guard<mutex_t> guard( m_mutex );
        return m_queue.empty();
    }

private:
    mutex_t m_mutex;
    std::queue<T> m_queue;
};
307*51c0b2f7Stbbdev 
308*51c0b2f7Stbbdev template< typename Input, typename Output >
309*51c0b2f7Stbbdev class async_activity : utils::NoAssign {
310*51c0b2f7Stbbdev public:
311*51c0b2f7Stbbdev     typedef Input input_type;
312*51c0b2f7Stbbdev     typedef Output output_type;
313*51c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
314*51c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
315*51c0b2f7Stbbdev 
316*51c0b2f7Stbbdev     struct work_type {
317*51c0b2f7Stbbdev         input_type input;
318*51c0b2f7Stbbdev         gateway_type* gateway;
319*51c0b2f7Stbbdev     };
320*51c0b2f7Stbbdev 
321*51c0b2f7Stbbdev     class ServiceThreadBody {
322*51c0b2f7Stbbdev     public:
323*51c0b2f7Stbbdev         ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
324*51c0b2f7Stbbdev         void operator()() { my_activity->process(); }
325*51c0b2f7Stbbdev     private:
326*51c0b2f7Stbbdev         async_activity* my_activity;
327*51c0b2f7Stbbdev     };
328*51c0b2f7Stbbdev 
329*51c0b2f7Stbbdev     async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
330*51c0b2f7Stbbdev         : my_expected_items(expected_items), my_sleep_time(sleep_time)
331*51c0b2f7Stbbdev     {
332*51c0b2f7Stbbdev         is_active = !deferred;
333*51c0b2f7Stbbdev         my_quit = false;
334*51c0b2f7Stbbdev         std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
335*51c0b2f7Stbbdev     }
336*51c0b2f7Stbbdev 
337*51c0b2f7Stbbdev private:
338*51c0b2f7Stbbdev 
339*51c0b2f7Stbbdev     async_activity( const async_activity& )
340*51c0b2f7Stbbdev         : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
341*51c0b2f7Stbbdev     {
342*51c0b2f7Stbbdev         is_active = true;
343*51c0b2f7Stbbdev     }
344*51c0b2f7Stbbdev 
345*51c0b2f7Stbbdev public:
346*51c0b2f7Stbbdev     ~async_activity() {
347*51c0b2f7Stbbdev         stop();
348*51c0b2f7Stbbdev         my_service_thread.join();
349*51c0b2f7Stbbdev     }
350*51c0b2f7Stbbdev 
351*51c0b2f7Stbbdev     void submit( const input_type &input, gateway_type& gateway ) {
352*51c0b2f7Stbbdev         work_type work = {input, &gateway};
353*51c0b2f7Stbbdev         my_work_queue.push( work );
354*51c0b2f7Stbbdev     }
355*51c0b2f7Stbbdev 
356*51c0b2f7Stbbdev     void process() {
357*51c0b2f7Stbbdev         do {
358*51c0b2f7Stbbdev             work_type work;
359*51c0b2f7Stbbdev             if( is_active && my_work_queue.try_pop( work ) ) {
360*51c0b2f7Stbbdev                 utils::Sleep(my_sleep_time);
361*51c0b2f7Stbbdev                 ++async_activity_processed_msg_count;
362*51c0b2f7Stbbdev                 output_type output;
363*51c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
364*51c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::check(work.input, output);
365*51c0b2f7Stbbdev                 work.gateway->try_put(output);
366*51c0b2f7Stbbdev                 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
367*51c0b2f7Stbbdev                      int(async_activity_processed_msg_count) == my_expected_items ) {
368*51c0b2f7Stbbdev                     work.gateway->release_wait();
369*51c0b2f7Stbbdev                 }
370*51c0b2f7Stbbdev             }
371*51c0b2f7Stbbdev         } while( my_quit == false || !my_work_queue.empty());
372*51c0b2f7Stbbdev     }
373*51c0b2f7Stbbdev 
374*51c0b2f7Stbbdev     void stop() {
375*51c0b2f7Stbbdev         my_quit = true;
376*51c0b2f7Stbbdev     }
377*51c0b2f7Stbbdev 
378*51c0b2f7Stbbdev     void activate() {
379*51c0b2f7Stbbdev         is_active = true;
380*51c0b2f7Stbbdev     }
381*51c0b2f7Stbbdev 
382*51c0b2f7Stbbdev     bool should_reserve_each_time() {
383*51c0b2f7Stbbdev         if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
384*51c0b2f7Stbbdev             return true;
385*51c0b2f7Stbbdev         else
386*51c0b2f7Stbbdev             return false;
387*51c0b2f7Stbbdev     }
388*51c0b2f7Stbbdev 
389*51c0b2f7Stbbdev private:
390*51c0b2f7Stbbdev 
391*51c0b2f7Stbbdev     const int my_expected_items;
392*51c0b2f7Stbbdev     const int my_sleep_time;
393*51c0b2f7Stbbdev     std::atomic< bool > is_active;
394*51c0b2f7Stbbdev 
395*51c0b2f7Stbbdev     async_activity_queue<work_type> my_work_queue;
396*51c0b2f7Stbbdev 
397*51c0b2f7Stbbdev     std::atomic< bool > my_quit;
398*51c0b2f7Stbbdev 
399*51c0b2f7Stbbdev     std::thread my_service_thread;
400*51c0b2f7Stbbdev };
401*51c0b2f7Stbbdev 
402*51c0b2f7Stbbdev template<typename Input, typename Output>
403*51c0b2f7Stbbdev struct basic_test {
404*51c0b2f7Stbbdev     typedef Input input_type;
405*51c0b2f7Stbbdev     typedef Output output_type;
406*51c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
407*51c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
408*51c0b2f7Stbbdev 
409*51c0b2f7Stbbdev     basic_test() {}
410*51c0b2f7Stbbdev 
411*51c0b2f7Stbbdev     static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
412*51c0b2f7Stbbdev         async_activity<input_type, output_type> my_async_activity(async_expected_items);
413*51c0b2f7Stbbdev 
414*51c0b2f7Stbbdev         tbb::flow::graph g;
415*51c0b2f7Stbbdev 
416*51c0b2f7Stbbdev         tbb::flow::function_node< int, input_type > start_node(
417*51c0b2f7Stbbdev             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
418*51c0b2f7Stbbdev         );
419*51c0b2f7Stbbdev         async_node_type offload_node(
420*51c0b2f7Stbbdev             g, tbb::flow::unlimited,
421*51c0b2f7Stbbdev             [&] (const input_type &input, gateway_type& gateway) {
422*51c0b2f7Stbbdev                 ++async_body_exec_count;
423*51c0b2f7Stbbdev                 if(my_async_activity.should_reserve_each_time())
424*51c0b2f7Stbbdev                     gateway.reserve_wait();
425*51c0b2f7Stbbdev                 my_async_activity.submit(input, gateway);
426*51c0b2f7Stbbdev             }
427*51c0b2f7Stbbdev         );
428*51c0b2f7Stbbdev         tbb::flow::function_node< output_type > end_node(
429*51c0b2f7Stbbdev             g, tbb::flow::unlimited,
430*51c0b2f7Stbbdev             [&](const output_type& input) {
431*51c0b2f7Stbbdev                 ++end_body_exec_count;
432*51c0b2f7Stbbdev                 output_type output;
433*51c0b2f7Stbbdev                 wrapper_helper<output_type, output_type>::check(input, output);
434*51c0b2f7Stbbdev             }
435*51c0b2f7Stbbdev         );
436*51c0b2f7Stbbdev 
437*51c0b2f7Stbbdev         tbb::flow::make_edge( start_node, offload_node );
438*51c0b2f7Stbbdev         tbb::flow::make_edge( offload_node, end_node );
439*51c0b2f7Stbbdev 
440*51c0b2f7Stbbdev         async_body_exec_count = 0;
441*51c0b2f7Stbbdev         async_activity_processed_msg_count = 0;
442*51c0b2f7Stbbdev         end_body_exec_count = 0;
443*51c0b2f7Stbbdev 
444*51c0b2f7Stbbdev         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
445*51c0b2f7Stbbdev             offload_node.gateway().reserve_wait();
446*51c0b2f7Stbbdev         }
447*51c0b2f7Stbbdev         for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
448*51c0b2f7Stbbdev             start_node.try_put(i);
449*51c0b2f7Stbbdev         }
450*51c0b2f7Stbbdev         g.wait_for_all();
451*51c0b2f7Stbbdev         CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
452*51c0b2f7Stbbdev         CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
453*51c0b2f7Stbbdev         CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
454*51c0b2f7Stbbdev         INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
455*51c0b2f7Stbbdev               " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
456*51c0b2f7Stbbdev               " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
457*51c0b2f7Stbbdev         );
458*51c0b2f7Stbbdev         return 0;
459*51c0b2f7Stbbdev     }
460*51c0b2f7Stbbdev 
461*51c0b2f7Stbbdev };
462*51c0b2f7Stbbdev 
463*51c0b2f7Stbbdev int test_copy_ctor() {
464*51c0b2f7Stbbdev     const int N = NUMBER_OF_MSGS;
465*51c0b2f7Stbbdev     async_body_exec_count = 0;
466*51c0b2f7Stbbdev 
467*51c0b2f7Stbbdev     tbb::flow::graph g;
468*51c0b2f7Stbbdev 
469*51c0b2f7Stbbdev     harness_counting_receiver<int> r1(g);
470*51c0b2f7Stbbdev     harness_counting_receiver<int> r2(g);
471*51c0b2f7Stbbdev 
472*51c0b2f7Stbbdev     tbb::task_group_context graph_ctx;
473*51c0b2f7Stbbdev     counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
474*51c0b2f7Stbbdev     counting_async_node_type b(a);
475*51c0b2f7Stbbdev 
476*51c0b2f7Stbbdev     tbb::flow::make_edge(a, r1);                             // C++11-style of making edges
477*51c0b2f7Stbbdev     tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2);  // usual way of making edges
478*51c0b2f7Stbbdev 
479*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
480*51c0b2f7Stbbdev        a.try_put(i);
481*51c0b2f7Stbbdev     }
482*51c0b2f7Stbbdev     g.wait_for_all();
483*51c0b2f7Stbbdev 
484*51c0b2f7Stbbdev     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
485*51c0b2f7Stbbdev     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
486*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
487*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
488*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
489*51c0b2f7Stbbdev 
490*51c0b2f7Stbbdev     for (int i = 0; i < N; ++i) {
491*51c0b2f7Stbbdev        b.try_put(i);
492*51c0b2f7Stbbdev     }
493*51c0b2f7Stbbdev     g.wait_for_all();
494*51c0b2f7Stbbdev 
495*51c0b2f7Stbbdev     INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
496*51c0b2f7Stbbdev     INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
497*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
498*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
499*51c0b2f7Stbbdev     CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
500*51c0b2f7Stbbdev     return 0;
501*51c0b2f7Stbbdev }
502*51c0b2f7Stbbdev 
503*51c0b2f7Stbbdev std::atomic<int> main_tid_count;
504*51c0b2f7Stbbdev 
505*51c0b2f7Stbbdev template<typename Input, typename Output>
506*51c0b2f7Stbbdev struct spin_test {
507*51c0b2f7Stbbdev     typedef Input input_type;
508*51c0b2f7Stbbdev     typedef Output output_type;
509*51c0b2f7Stbbdev     typedef tbb::flow::async_node< input_type, output_type > async_node_type;
510*51c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
511*51c0b2f7Stbbdev 
512*51c0b2f7Stbbdev     class end_body_type {
513*51c0b2f7Stbbdev         typedef Output output_type;
514*51c0b2f7Stbbdev         std::thread::id my_main_tid;
515*51c0b2f7Stbbdev         utils::SpinBarrier *my_barrier;
516*51c0b2f7Stbbdev     public:
517*51c0b2f7Stbbdev         end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
518*51c0b2f7Stbbdev 
519*51c0b2f7Stbbdev         void operator()( const output_type & ) {
520*51c0b2f7Stbbdev             ++end_body_exec_count;
521*51c0b2f7Stbbdev             if (std::this_thread::get_id() == my_main_tid) {
522*51c0b2f7Stbbdev                ++main_tid_count;
523*51c0b2f7Stbbdev             }
524*51c0b2f7Stbbdev             my_barrier->timedWaitNoError(10);
525*51c0b2f7Stbbdev         }
526*51c0b2f7Stbbdev     };
527*51c0b2f7Stbbdev 
528*51c0b2f7Stbbdev     spin_test() {}
529*51c0b2f7Stbbdev 
530*51c0b2f7Stbbdev     static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
531*51c0b2f7Stbbdev         async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
532*51c0b2f7Stbbdev         const int overall_message_count = nthreads * NUMBER_OF_MSGS;
533*51c0b2f7Stbbdev         utils::SpinBarrier spin_barrier(nthreads);
534*51c0b2f7Stbbdev 
535*51c0b2f7Stbbdev         tbb::flow::graph g;
536*51c0b2f7Stbbdev         tbb::flow::function_node<int, input_type> start_node(
537*51c0b2f7Stbbdev             g, tbb::flow::unlimited, [](int input) { return input_type(input); }
538*51c0b2f7Stbbdev         );
539*51c0b2f7Stbbdev         async_node_type offload_node(
540*51c0b2f7Stbbdev             g, tbb::flow::unlimited,
541*51c0b2f7Stbbdev             [&](const input_type &input, gateway_type& gateway) {
542*51c0b2f7Stbbdev                 ++async_body_exec_count;
543*51c0b2f7Stbbdev                 if(my_async_activity.should_reserve_each_time())
544*51c0b2f7Stbbdev                     gateway.reserve_wait();
545*51c0b2f7Stbbdev                 my_async_activity.submit(input, gateway);
546*51c0b2f7Stbbdev             }
547*51c0b2f7Stbbdev         );
548*51c0b2f7Stbbdev         tbb::flow::function_node<output_type> end_node(
549*51c0b2f7Stbbdev             g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
550*51c0b2f7Stbbdev         );
551*51c0b2f7Stbbdev 
552*51c0b2f7Stbbdev         tbb::flow::make_edge( start_node, offload_node );
553*51c0b2f7Stbbdev         tbb::flow::make_edge( offload_node, end_node );
554*51c0b2f7Stbbdev 
555*51c0b2f7Stbbdev         async_body_exec_count = 0;
556*51c0b2f7Stbbdev         async_activity_processed_msg_count = 0;
557*51c0b2f7Stbbdev         end_body_exec_count = 0;
558*51c0b2f7Stbbdev         main_tid_count = 0;
559*51c0b2f7Stbbdev 
560*51c0b2f7Stbbdev         if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
561*51c0b2f7Stbbdev             offload_node.gateway().reserve_wait();
562*51c0b2f7Stbbdev         }
563*51c0b2f7Stbbdev         for (int i = 0; i < overall_message_count; ++i) {
564*51c0b2f7Stbbdev             start_node.try_put(i);
565*51c0b2f7Stbbdev         }
566*51c0b2f7Stbbdev         g.wait_for_all();
567*51c0b2f7Stbbdev         CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
568*51c0b2f7Stbbdev                        "AsyncBody processed wrong number of signals" );
569*51c0b2f7Stbbdev         CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
570*51c0b2f7Stbbdev                        "AsyncActivity processed wrong number of signals" );
571*51c0b2f7Stbbdev         CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
572*51c0b2f7Stbbdev                        "EndBody processed wrong number of signals");
573*51c0b2f7Stbbdev 
574*51c0b2f7Stbbdev         INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
575*51c0b2f7Stbbdev 
576*51c0b2f7Stbbdev         INFO("async_body_exec_count == " << int(async_body_exec_count) <<
577*51c0b2f7Stbbdev              " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
578*51c0b2f7Stbbdev              " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
579*51c0b2f7Stbbdev         );
580*51c0b2f7Stbbdev         return 0;
581*51c0b2f7Stbbdev     }
582*51c0b2f7Stbbdev 
583*51c0b2f7Stbbdev };
584*51c0b2f7Stbbdev 
585*51c0b2f7Stbbdev void test_for_spin_avoidance() {
586*51c0b2f7Stbbdev     const int nthreads = 4;
587*51c0b2f7Stbbdev     tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
588*51c0b2f7Stbbdev     spin_test<int, int>::run(nthreads);
589*51c0b2f7Stbbdev }
590*51c0b2f7Stbbdev 
591*51c0b2f7Stbbdev template< typename Input, typename Output >
592*51c0b2f7Stbbdev int run_tests() {
593*51c0b2f7Stbbdev     basic_test<Input, Output>::run();
594*51c0b2f7Stbbdev     basic_test<Input, Output>::run(NUMBER_OF_MSGS);
595*51c0b2f7Stbbdev     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
596*51c0b2f7Stbbdev     basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
597*51c0b2f7Stbbdev     return 0;
598*51c0b2f7Stbbdev }
599*51c0b2f7Stbbdev 
600*51c0b2f7Stbbdev #include "tbb/parallel_for.h"
601*51c0b2f7Stbbdev template<typename Input, typename Output>
602*51c0b2f7Stbbdev class equeueing_on_inner_level {
603*51c0b2f7Stbbdev     typedef Input input_type;
604*51c0b2f7Stbbdev     typedef Output output_type;
605*51c0b2f7Stbbdev     typedef async_activity<input_type, output_type> async_activity_type;
606*51c0b2f7Stbbdev     typedef tbb::flow::async_node<Input, Output> async_node_type;
607*51c0b2f7Stbbdev     typedef typename async_node_type::gateway_type gateway_type;
608*51c0b2f7Stbbdev 
609*51c0b2f7Stbbdev     class body_graph_with_async {
610*51c0b2f7Stbbdev     public:
611*51c0b2f7Stbbdev         body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
612*51c0b2f7Stbbdev             : spin_barrier(&barrier), my_async_activity(&activity) {}
613*51c0b2f7Stbbdev 
614*51c0b2f7Stbbdev         void operator()(int) const {
615*51c0b2f7Stbbdev             tbb::flow::graph g;
616*51c0b2f7Stbbdev             tbb::flow::function_node< int, input_type > start_node(
617*51c0b2f7Stbbdev                 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
618*51c0b2f7Stbbdev             );
619*51c0b2f7Stbbdev             async_node_type offload_node(
620*51c0b2f7Stbbdev                 g, tbb::flow::unlimited,
621*51c0b2f7Stbbdev                 [&](const input_type &input, gateway_type& gateway) {
622*51c0b2f7Stbbdev                     gateway.reserve_wait();
623*51c0b2f7Stbbdev                     my_async_activity->submit( input, gateway );
624*51c0b2f7Stbbdev                 }
625*51c0b2f7Stbbdev             );
626*51c0b2f7Stbbdev             tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
627*51c0b2f7Stbbdev 
628*51c0b2f7Stbbdev             tbb::flow::make_edge( start_node, offload_node );
629*51c0b2f7Stbbdev             tbb::flow::make_edge( offload_node, end_node );
630*51c0b2f7Stbbdev 
631*51c0b2f7Stbbdev             start_node.try_put(1);
632*51c0b2f7Stbbdev 
633*51c0b2f7Stbbdev             spin_barrier->wait();
634*51c0b2f7Stbbdev 
635*51c0b2f7Stbbdev             my_async_activity->activate();
636*51c0b2f7Stbbdev 
637*51c0b2f7Stbbdev             g.wait_for_all();
638*51c0b2f7Stbbdev         }
639*51c0b2f7Stbbdev 
640*51c0b2f7Stbbdev     private:
641*51c0b2f7Stbbdev         utils::SpinBarrier* spin_barrier;
642*51c0b2f7Stbbdev         async_activity_type* my_async_activity;
643*51c0b2f7Stbbdev     };
644*51c0b2f7Stbbdev 
645*51c0b2f7Stbbdev public:
646*51c0b2f7Stbbdev     static int run ()
647*51c0b2f7Stbbdev     {
648*51c0b2f7Stbbdev         const int nthreads = tbb::this_task_arena::max_concurrency();
649*51c0b2f7Stbbdev         utils::SpinBarrier spin_barrier( nthreads );
650*51c0b2f7Stbbdev 
651*51c0b2f7Stbbdev         async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
652*51c0b2f7Stbbdev 
653*51c0b2f7Stbbdev         tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
654*51c0b2f7Stbbdev         return 0;
655*51c0b2f7Stbbdev     }
656*51c0b2f7Stbbdev };
657*51c0b2f7Stbbdev 
658*51c0b2f7Stbbdev int run_test_equeueing_on_inner_level() {
659*51c0b2f7Stbbdev     equeueing_on_inner_level<int, int>::run();
660*51c0b2f7Stbbdev     return 0;
661*51c0b2f7Stbbdev }
662*51c0b2f7Stbbdev 
663*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
664*51c0b2f7Stbbdev #include <array>
665*51c0b2f7Stbbdev 
666*51c0b2f7Stbbdev template<typename NodeType>
667*51c0b2f7Stbbdev class AsyncActivity {
668*51c0b2f7Stbbdev public:
669*51c0b2f7Stbbdev     using gateway_t = typename NodeType::gateway_type;
670*51c0b2f7Stbbdev 
671*51c0b2f7Stbbdev     struct work_type {
672*51c0b2f7Stbbdev         int input;
673*51c0b2f7Stbbdev         gateway_t* gateway;
674*51c0b2f7Stbbdev     };
675*51c0b2f7Stbbdev 
676*51c0b2f7Stbbdev     AsyncActivity(size_t limit) : thr([this]() {
677*51c0b2f7Stbbdev         while(!end_of_work()) {
678*51c0b2f7Stbbdev             work_type w;
679*51c0b2f7Stbbdev             while( my_q.try_pop(w) ) {
680*51c0b2f7Stbbdev                 int res = do_work(w.input);
681*51c0b2f7Stbbdev                 w.gateway->try_put(res);
682*51c0b2f7Stbbdev                 w.gateway->release_wait();
683*51c0b2f7Stbbdev                 ++c;
684*51c0b2f7Stbbdev             }
685*51c0b2f7Stbbdev         }
686*51c0b2f7Stbbdev     }), stop_limit(limit), c(0) {}
687*51c0b2f7Stbbdev 
688*51c0b2f7Stbbdev     void submit(int i, gateway_t* gateway) {
689*51c0b2f7Stbbdev         work_type w = {i, gateway};
690*51c0b2f7Stbbdev         gateway->reserve_wait();
691*51c0b2f7Stbbdev         my_q.push(w);
692*51c0b2f7Stbbdev     }
693*51c0b2f7Stbbdev 
694*51c0b2f7Stbbdev     void wait_for_all() { thr.join(); }
695*51c0b2f7Stbbdev 
696*51c0b2f7Stbbdev private:
697*51c0b2f7Stbbdev     bool end_of_work() { return c >= stop_limit; }
698*51c0b2f7Stbbdev 
699*51c0b2f7Stbbdev     int do_work(int& i) { return i + i; }
700*51c0b2f7Stbbdev 
701*51c0b2f7Stbbdev     async_activity_queue<work_type> my_q;
702*51c0b2f7Stbbdev     std::thread thr;
703*51c0b2f7Stbbdev     size_t stop_limit;
704*51c0b2f7Stbbdev     size_t c;
705*51c0b2f7Stbbdev };
706*51c0b2f7Stbbdev 
707*51c0b2f7Stbbdev void test_follows() {
708*51c0b2f7Stbbdev     using namespace tbb::flow;
709*51c0b2f7Stbbdev 
710*51c0b2f7Stbbdev     using input_t = int;
711*51c0b2f7Stbbdev     using output_t = int;
712*51c0b2f7Stbbdev     using node_t = async_node<input_t, output_t>;
713*51c0b2f7Stbbdev 
714*51c0b2f7Stbbdev     graph g;
715*51c0b2f7Stbbdev 
716*51c0b2f7Stbbdev     AsyncActivity<node_t> async_activity(3);
717*51c0b2f7Stbbdev 
718*51c0b2f7Stbbdev     std::array<broadcast_node<input_t>, 3> preds = {
719*51c0b2f7Stbbdev       {
720*51c0b2f7Stbbdev         broadcast_node<input_t>(g),
721*51c0b2f7Stbbdev         broadcast_node<input_t>(g),
722*51c0b2f7Stbbdev         broadcast_node<input_t>(g)
723*51c0b2f7Stbbdev       }
724*51c0b2f7Stbbdev     };
725*51c0b2f7Stbbdev 
726*51c0b2f7Stbbdev     node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
727*51c0b2f7Stbbdev         async_activity.submit(input, &gtw);
728*51c0b2f7Stbbdev     }, no_priority);
729*51c0b2f7Stbbdev 
730*51c0b2f7Stbbdev     buffer_node<output_t> buf(g);
731*51c0b2f7Stbbdev     make_edge(node, buf);
732*51c0b2f7Stbbdev 
733*51c0b2f7Stbbdev     for(auto& pred: preds) {
734*51c0b2f7Stbbdev         pred.try_put(1);
735*51c0b2f7Stbbdev     }
736*51c0b2f7Stbbdev 
737*51c0b2f7Stbbdev     g.wait_for_all();
738*51c0b2f7Stbbdev     async_activity.wait_for_all();
739*51c0b2f7Stbbdev 
740*51c0b2f7Stbbdev     output_t storage;
741*51c0b2f7Stbbdev     CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
742*51c0b2f7Stbbdev                   "Not exact edge quantity was made");
743*51c0b2f7Stbbdev }
744*51c0b2f7Stbbdev 
745*51c0b2f7Stbbdev void test_precedes() {
746*51c0b2f7Stbbdev     using namespace tbb::flow;
747*51c0b2f7Stbbdev 
748*51c0b2f7Stbbdev     using input_t = int;
749*51c0b2f7Stbbdev     using output_t = int;
750*51c0b2f7Stbbdev     using node_t = async_node<input_t, output_t>;
751*51c0b2f7Stbbdev 
752*51c0b2f7Stbbdev     graph g;
753*51c0b2f7Stbbdev 
754*51c0b2f7Stbbdev     AsyncActivity<node_t> async_activity(1);
755*51c0b2f7Stbbdev 
756*51c0b2f7Stbbdev     std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
757*51c0b2f7Stbbdev 
758*51c0b2f7Stbbdev     broadcast_node<input_t> start(g);
759*51c0b2f7Stbbdev 
760*51c0b2f7Stbbdev     node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
761*51c0b2f7Stbbdev         async_activity.submit(input, &gtw);
762*51c0b2f7Stbbdev     }, no_priority);
763*51c0b2f7Stbbdev 
764*51c0b2f7Stbbdev     make_edge(start, node);
765*51c0b2f7Stbbdev 
766*51c0b2f7Stbbdev     start.try_put(1);
767*51c0b2f7Stbbdev 
768*51c0b2f7Stbbdev     g.wait_for_all();
769*51c0b2f7Stbbdev     async_activity.wait_for_all();
770*51c0b2f7Stbbdev 
771*51c0b2f7Stbbdev     for(auto& successor : successors) {
772*51c0b2f7Stbbdev         output_t storage;
773*51c0b2f7Stbbdev         CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
774*51c0b2f7Stbbdev                       "Not exact edge quantity was made");
775*51c0b2f7Stbbdev     }
776*51c0b2f7Stbbdev }
777*51c0b2f7Stbbdev 
778*51c0b2f7Stbbdev void test_follows_and_precedes_api() {
779*51c0b2f7Stbbdev     test_follows();
780*51c0b2f7Stbbdev     test_precedes();
781*51c0b2f7Stbbdev }
782*51c0b2f7Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
783*51c0b2f7Stbbdev 
784*51c0b2f7Stbbdev //! Test async bodies processing
785*51c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
786*51c0b2f7Stbbdev TEST_CASE("Basic tests"){
787*51c0b2f7Stbbdev     tbb::task_arena arena(utils::MaxThread);
788*51c0b2f7Stbbdev     arena.execute(
789*51c0b2f7Stbbdev         [&]() {
790*51c0b2f7Stbbdev             run_tests<int, int>();
791*51c0b2f7Stbbdev             run_tests<minimal_type, minimal_type>();
792*51c0b2f7Stbbdev             run_tests<int, minimal_type>();
793*51c0b2f7Stbbdev         }
794*51c0b2f7Stbbdev     );
795*51c0b2f7Stbbdev }
796*51c0b2f7Stbbdev 
797*51c0b2f7Stbbdev //! NativeParallelFor test with various concurrency settings
798*51c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
799*51c0b2f7Stbbdev TEST_CASE("Lightweight tests"){
800*51c0b2f7Stbbdev     lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
801*51c0b2f7Stbbdev }
802*51c0b2f7Stbbdev 
803*51c0b2f7Stbbdev //! Test reset and cancellation
804*51c0b2f7Stbbdev //! \brief \ref error_guessing
805*51c0b2f7Stbbdev TEST_CASE("Reset test"){
806*51c0b2f7Stbbdev     test_reset();
807*51c0b2f7Stbbdev }
808*51c0b2f7Stbbdev 
809*51c0b2f7Stbbdev //! Test
810*51c0b2f7Stbbdev //! \brief \ref requirement \ref error_guessing
811*51c0b2f7Stbbdev TEST_CASE("Copy constructor test"){
812*51c0b2f7Stbbdev     test_copy_ctor();
813*51c0b2f7Stbbdev }
814*51c0b2f7Stbbdev 
815*51c0b2f7Stbbdev //! Test if main thread spins
816*51c0b2f7Stbbdev //! \brief \ref stress
817*51c0b2f7Stbbdev TEST_CASE("Spin avoidance test"){
818*51c0b2f7Stbbdev     test_for_spin_avoidance();
819*51c0b2f7Stbbdev }
820*51c0b2f7Stbbdev 
821*51c0b2f7Stbbdev //! Test nested enqueing
822*51c0b2f7Stbbdev //! \brief \ref error_guessing
823*51c0b2f7Stbbdev TEST_CASE("Inner enqueing test"){
824*51c0b2f7Stbbdev     run_test_equeueing_on_inner_level();
825*51c0b2f7Stbbdev }
826*51c0b2f7Stbbdev 
827*51c0b2f7Stbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
828*51c0b2f7Stbbdev //! Test deprecated follows and preceedes API
829*51c0b2f7Stbbdev //! \brief \ref error_guessing
830*51c0b2f7Stbbdev TEST_CASE("Test follows and preceedes API"){
831*51c0b2f7Stbbdev     test_follows_and_precedes_api();
832*51c0b2f7Stbbdev }
833*51c0b2f7Stbbdev #endif
834