1 /*
2 Copyright (c) 2005-2023 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20
21 #include "common/config.h"
22
23 #include "tbb/flow_graph.h"
24
25 #include "tbb/task.h"
26 #include "tbb/global_control.h"
27
28 #include "common/test.h"
29 #include "common/utils.h"
30 #include "common/utils_assert.h"
31 #include "common/graph_utils.h"
32 #include "common/spin_barrier.h"
33 #include "common/test_follows_and_precedes_api.h"
34 #include "common/concepts_common.h"
35
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
39
40
41 //! \file test_async_node.cpp
42 //! \brief Test for [flow_graph.async_node] specification
43
44
// Small value type offering only default construction, construction from int,
// copy construction and copy assignment. The stored int is private; specializations
// of place_wrapper are befriended and may access it.
class minimal_type {
    template<typename T>
    friend struct place_wrapper;

    int value;

public:
    // Default value is -1 (delegates to the int constructor).
    minimal_type() : minimal_type(-1) {}
    // Deliberately non-explicit so that an int converts implicitly.
    minimal_type(int v) : value(v) {}
    minimal_type(const minimal_type &other) : value(other.value) {}
    minimal_type &operator=(const minimal_type &other) {
        value = other.value;
        return *this;
    }
};
57
// Wraps a value together with a thread id. The id is taken from the constructing thread
// when built from an int, and copied from the source when converted from another wrapper.
template <typename T>
struct place_wrapper {
    typedef T wrapped_type;
    T value;
    std::thread::id thread_id;

    // Builds the wrapped value from v and records the id of the calling thread.
    place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {}

    // Converting constructor: copies both the value and the recorded thread id.
    template <typename Q>
    place_wrapper(const place_wrapper<Q>& v)
        : value(v.value), thread_id(v.thread_id)
    {}

    // Converting assignment: copies both the value and the recorded thread id.
    // Returns a reference to this object (the previous declaration named
    // place_wrapper<Q>& and so could not be instantiated for Q != T).
    template <typename Q>
    place_wrapper& operator=(const place_wrapper<Q>& v) {
        // this and &v may be pointers to different specializations, so compare raw addresses.
        if (static_cast<const void*>(this) != static_cast<const void*>(&v)) {
            value = v.value;
            thread_id = v.thread_id;
        }
        return *this;
    }

};
81
// Generic helper: nothing to verify, and copying is a plain assignment.
template<typename T1, typename T2>
struct wrapper_helper {
    // No-op for types handled by the primary template.
    static void check(const T1 & /*first*/, const T2 & /*second*/) {}

    // Assigns in to out; requires T2 to be assignable from T1.
    static void copy_value(const T1 &in, T2 &out) {
        out = in;
    }
};
87
88 template<typename T1, typename T2>
89 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > {
checkwrapper_helper90 static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) {
91 CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes");
92 return;
93 }
copy_valuewrapper_helper94 static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) {
95 out.value = in.value;
96 }
97 };
98
// Number of messages the scenarios in this file push through a graph.
constexpr int NUMBER_OF_MSGS = 10;
// Sentinel for "expected item count not known in advance".
constexpr int UNKNOWN_NUMBER_OF_ITEMS = -1;
// Global counters shared by the scenarios; each scenario resets the ones it uses.
std::atomic<int> async_body_exec_count{0};
std::atomic<int> async_activity_processed_msg_count{0};
std::atomic<int> end_body_exec_count{0};
104
105 // queueing required in test_reset for testing of cancellation
106 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type;
107 typedef counting_async_node_type::gateway_type counting_gateway_type;
108
109 struct counting_async_unlimited_body {
110
counting_async_unlimited_bodycounting_async_unlimited_body111 counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {}
112
operator ()counting_async_unlimited_body113 void operator()( const int &input, counting_gateway_type& gateway) {
114 // TODO revamp: reconsider logging for the tests. It is known that frequent calls to
115 // doctest's INFO cause issues.
116
117 // INFO( "Body execution with input == " << input << "\n");
118 ++async_body_exec_count;
119 if ( input == -1 ) {
120 bool result = my_tgc.cancel_group_execution();
121 // INFO( "Canceling graph execution\n" );
122 CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" );
123 utils::Sleep(50);
124 }
125 gateway.try_put(input);
126 }
127 private:
128 tbb::task_group_context& my_tgc;
129 };
130
131 struct counting_async_serial_body : counting_async_unlimited_body {
132 typedef counting_async_unlimited_body base_type;
133 int my_async_body_exec_count;
134
counting_async_serial_bodycounting_async_serial_body135 counting_async_serial_body(tbb::task_group_context& tgc)
136 : base_type(tgc), my_async_body_exec_count( 0 ) { }
137
operator ()counting_async_serial_body138 void operator()( const int &input, counting_gateway_type& gateway ) {
139 ++my_async_body_exec_count;
140 base_type::operator()( input, gateway );
141 }
142 };
143
test_reset()144 void test_reset() {
145 const int N = NUMBER_OF_MSGS;
146 async_body_exec_count = 0;
147
148 tbb::task_group_context graph_ctx;
149 tbb::flow::graph g(graph_ctx);
150 counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) );
151
152 const int R = 3;
153 std::vector< std::shared_ptr<harness_counting_receiver<int>> > r;
154 for (size_t i = 0; i < R; ++i) {
155 r.push_back( std::make_shared<harness_counting_receiver<int>>(g) );
156 }
157
158 for (int i = 0; i < R; ++i) {
159 tbb::flow::make_edge(a, *r[i]);
160 }
161
162 INFO( "One body execution\n" );
163 a.try_put(-1);
164 for (int i = 0; i < N; ++i) {
165 a.try_put(i);
166 }
167 g.wait_for_all();
168 // should be canceled with only 1 item reaching the async_body and the counting receivers
169 // and N items left in the node's queue
170 CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" );
171
172 counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a);
173 CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" );
174 CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1" );
175 for (int i = 0; i < R; ++i) {
176 CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" );
177 }
178
179 // should clear the async_node queue, but retain its local count at 1 and keep all edges
180 g.reset(tbb::flow::rf_reset_protocol);
181
182 INFO( "N body executions\n" );
183 for (int i = 0; i < N; ++i) {
184 a.try_put(i);
185 }
186 g.wait_for_all();
187 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
188
189 // a total of N+1 items should have passed through the node body
190 // the local body count should also be N+1
191 // and the counting receivers should all have a count of N+1
192 counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a);
193 CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count),
194 "local and global body execution counts are different" );
195 INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" );
196 CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1" );
197 for (int i = 0; i < R; ++i) {
198 CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" );
199 }
200
201 INFO( "N body executions with new bodies\n" );
202 // should clear the async_node queue and reset its local count to 0, but keep all edges
203 g.reset(tbb::flow::rf_reset_bodies);
204 for (int i = 0; i < N; ++i) {
205 a.try_put(i);
206 }
207 g.wait_for_all();
208 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
209
210 // a total of 2N+1 items should have passed through the node body
211 // the local body count should be N
212 // and the counting receivers should all have a count of 2N+1
213 counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a);
214 CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1" );
215 CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N" );
216 for (int i = 0; i < R; ++i) {
217 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
218 }
219
220 // should clear the async_node queue and keep its local count at N and remove all edges
221 INFO( "N body executions with no edges\n" );
222 g.reset(tbb::flow::rf_clear_edges);
223 for (int i = 0; i < N; ++i) {
224 a.try_put(i);
225 }
226 g.wait_for_all();
227 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
228
229 // a total of 3N+1 items should have passed through the node body
230 // the local body count should now be 2*N
231 // and the counting receivers should remain at a count of 2N+1
232 counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a);
233 CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1" );
234 CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N" );
235 for (int i = 0; i < R; ++i) {
236 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
237 }
238
239 // put back 1 edge to receiver 0
240 INFO( "N body executions with 1 edge\n" );
241 tbb::flow::make_edge(a, *r[0]);
242 for (int i = 0; i < N; ++i) {
243 a.try_put(i);
244 }
245 g.wait_for_all();
246 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
247
248 // a total of 4N+1 items should have passed through the node body
249 // the local body count should now be 3*N
250 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
251 counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a);
252 CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1" );
253 CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N" );
254 CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
255 for (int i = 1; i < R; ++i) {
256 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
257 }
258
259 // should clear the async_node queue and keep its local count at N and remove all edges
260 INFO( "N body executions with no edges and new body\n" );
261 g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges));
262 for (int i = 0; i < N; ++i) {
263 a.try_put(i);
264 }
265 g.wait_for_all();
266 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" );
267
268 // a total of 4N+1 items should have passed through the node body
269 // the local body count should now be 3*N
270 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1
271 counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a);
272 CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1" );
273 CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N" );
274 CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" );
275 for (int i = 1; i < R; ++i) {
276 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" );
277 }
278 }
279
280
281 #include <mutex>
282
// FIFO queue whose every operation runs under a std::mutex.
template <typename T>
class async_activity_queue {
public:
    // Appends a copy of item to the back of the queue.
    void push( const T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        m_queue.push( item );
    }

    // Moves the oldest element into item and removes it from the queue.
    // Returns false, leaving item unchanged, when the queue is empty.
    bool try_pop( T& item ) {
        std::lock_guard<mutex_t> lock( m_mutex );
        if( m_queue.empty() )
            return false;
        // The front element is discarded right after, so it can be moved from.
        item = std::move( m_queue.front() );
        m_queue.pop();
        return true;
    }

    // Returns true when the queue holds no elements at the moment of the call.
    bool empty() const {
        std::lock_guard<mutex_t> lock( m_mutex );
        return m_queue.empty();
    }

private:
    typedef std::mutex mutex_t;
    // mutable so that the const observer empty() can still take the lock.
    mutable mutex_t m_mutex;
    std::queue<T> m_queue;
};
310
311 template< typename Input, typename Output >
312 class async_activity : utils::NoAssign {
313 public:
314 typedef Input input_type;
315 typedef Output output_type;
316 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
317 typedef typename async_node_type::gateway_type gateway_type;
318
319 struct work_type {
320 input_type input;
321 gateway_type* gateway;
322 };
323
324 class ServiceThreadBody {
325 public:
ServiceThreadBody(async_activity * activity)326 ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {}
operator ()()327 void operator()() { my_activity->process(); }
328 private:
329 async_activity* my_activity;
330 };
331
async_activity(int expected_items,bool deferred=false,int sleep_time=50)332 async_activity(int expected_items, bool deferred = false, int sleep_time = 50)
333 : my_expected_items(expected_items), my_sleep_time(sleep_time)
334 {
335 is_active = !deferred;
336 my_quit = false;
337 std::thread( ServiceThreadBody( this ) ).swap( my_service_thread );
338 }
339
340 private:
341
async_activity(const async_activity &)342 async_activity( const async_activity& )
343 : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0)
344 {
345 is_active = true;
346 }
347
348 public:
~async_activity()349 ~async_activity() {
350 stop();
351 my_service_thread.join();
352 }
353
submit(const input_type & input,gateway_type & gateway)354 void submit( const input_type &input, gateway_type& gateway ) {
355 work_type work = {input, &gateway};
356 my_work_queue.push( work );
357 }
358
process()359 void process() {
360 do {
361 work_type work;
362 if( is_active && my_work_queue.try_pop( work ) ) {
363 utils::Sleep(my_sleep_time);
364 ++async_activity_processed_msg_count;
365 output_type output;
366 wrapper_helper<output_type, output_type>::copy_value(work.input, output);
367 wrapper_helper<output_type, output_type>::check(work.input, output);
368 work.gateway->try_put(output);
369 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ||
370 int(async_activity_processed_msg_count) == my_expected_items ) {
371 work.gateway->release_wait();
372 }
373 }
374 } while( my_quit == false || !my_work_queue.empty());
375 }
376
stop()377 void stop() {
378 my_quit = true;
379 }
380
activate()381 void activate() {
382 is_active = true;
383 }
384
should_reserve_each_time()385 bool should_reserve_each_time() {
386 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS )
387 return true;
388 else
389 return false;
390 }
391
392 private:
393
394 const int my_expected_items;
395 const int my_sleep_time;
396 std::atomic< bool > is_active;
397
398 async_activity_queue<work_type> my_work_queue;
399
400 std::atomic< bool > my_quit;
401
402 std::thread my_service_thread;
403 };
404
405 template<typename Input, typename Output>
406 struct basic_test {
407 typedef Input input_type;
408 typedef Output output_type;
409 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
410 typedef typename async_node_type::gateway_type gateway_type;
411
basic_testbasic_test412 basic_test() {}
413
runbasic_test414 static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
415 async_activity<input_type, output_type> my_async_activity(async_expected_items);
416
417 tbb::flow::graph g;
418
419 tbb::flow::function_node< int, input_type > start_node(
420 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
421 );
422 async_node_type offload_node(
423 g, tbb::flow::unlimited,
424 [&] (const input_type &input, gateway_type& gateway) {
425 ++async_body_exec_count;
426 if(my_async_activity.should_reserve_each_time())
427 gateway.reserve_wait();
428 my_async_activity.submit(input, gateway);
429 }
430 );
431 tbb::flow::function_node< output_type > end_node(
432 g, tbb::flow::unlimited,
433 [&](const output_type& input) {
434 ++end_body_exec_count;
435 output_type output;
436 wrapper_helper<output_type, output_type>::check(input, output);
437 }
438 );
439
440 tbb::flow::make_edge( start_node, offload_node );
441 tbb::flow::make_edge( offload_node, end_node );
442
443 async_body_exec_count = 0;
444 async_activity_processed_msg_count = 0;
445 end_body_exec_count = 0;
446
447 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) {
448 offload_node.gateway().reserve_wait();
449 }
450 for (int i = 0; i < NUMBER_OF_MSGS; ++i) {
451 start_node.try_put(i);
452 }
453 g.wait_for_all();
454 CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
455 CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" );
456 CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals");
457 INFO( "async_body_exec_count == " << int(async_body_exec_count) <<
458 " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
459 " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
460 );
461 return 0;
462 }
463
464 };
465
test_copy_ctor()466 int test_copy_ctor() {
467 const int N = NUMBER_OF_MSGS;
468 async_body_exec_count = 0;
469
470 tbb::flow::graph g;
471
472 harness_counting_receiver<int> r1(g);
473 harness_counting_receiver<int> r2(g);
474
475 tbb::task_group_context graph_ctx;
476 counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) );
477 counting_async_node_type b(a);
478
479 tbb::flow::make_edge(a, r1); // C++11-style of making edges
480 tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2); // usual way of making edges
481
482 for (int i = 0; i < N; ++i) {
483 a.try_put(i);
484 }
485 g.wait_for_all();
486
487 INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
488 INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
489 CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
490 CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
491 CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" );
492
493 for (int i = 0; i < N; ++i) {
494 b.try_put(i);
495 }
496 g.wait_for_all();
497
498 INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" );
499 INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" );
500 CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" );
501 CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" );
502 CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" );
503 return 0;
504 }
505
// Number of end-body invocations that ran on the thread which built the spin_test graph.
std::atomic<int> main_tid_count{0};
507
508 template<typename Input, typename Output>
509 struct spin_test {
510 typedef Input input_type;
511 typedef Output output_type;
512 typedef tbb::flow::async_node< input_type, output_type > async_node_type;
513 typedef typename async_node_type::gateway_type gateway_type;
514
515 class end_body_type {
516 typedef Output output_type;
517 std::thread::id my_main_tid;
518 utils::SpinBarrier *my_barrier;
519 public:
end_body_type(std::thread::id t,utils::SpinBarrier & b)520 end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { }
521
operator ()(const output_type &)522 void operator()( const output_type & ) {
523 ++end_body_exec_count;
524 if (std::this_thread::get_id() == my_main_tid) {
525 ++main_tid_count;
526 }
527 my_barrier->wait();
528 }
529 };
530
spin_testspin_test531 spin_test() {}
532
runspin_test533 static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) {
534 async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0);
535 const int overall_message_count = nthreads * NUMBER_OF_MSGS;
536 utils::SpinBarrier spin_barrier(nthreads);
537
538 tbb::flow::graph g;
539 tbb::flow::function_node<int, input_type> start_node(
540 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
541 );
542 async_node_type offload_node(
543 g, tbb::flow::unlimited,
544 [&](const input_type &input, gateway_type& gateway) {
545 ++async_body_exec_count;
546 if(my_async_activity.should_reserve_each_time())
547 gateway.reserve_wait();
548 my_async_activity.submit(input, gateway);
549 }
550 );
551 tbb::flow::function_node<output_type> end_node(
552 g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier)
553 );
554
555 tbb::flow::make_edge( start_node, offload_node );
556 tbb::flow::make_edge( offload_node, end_node );
557
558 async_body_exec_count = 0;
559 async_activity_processed_msg_count = 0;
560 end_body_exec_count = 0;
561 main_tid_count = 0;
562
563 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) {
564 offload_node.gateway().reserve_wait();
565 }
566 for (int i = 0; i < overall_message_count; ++i) {
567 start_node.try_put(i);
568 }
569 g.wait_for_all();
570 CHECK_MESSAGE( (async_body_exec_count == overall_message_count),
571 "AsyncBody processed wrong number of signals" );
572 CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count),
573 "AsyncActivity processed wrong number of signals" );
574 CHECK_MESSAGE( (end_body_exec_count == overall_message_count),
575 "EndBody processed wrong number of signals");
576
577 INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n");
578
579 INFO("async_body_exec_count == " << int(async_body_exec_count) <<
580 " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) <<
581 " == end_body_exec_count == " << int(end_body_exec_count) << "\n"
582 );
583 return 0;
584 }
585
586 };
587
test_for_spin_avoidance()588 void test_for_spin_avoidance() {
589 const int nthreads = 4;
590 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads);
591 tbb::task_arena a(nthreads);
592 a.execute([&] {
593 spin_test<int, int>::run(nthreads);
594 });
595 }
596
597 template< typename Input, typename Output >
run_tests()598 int run_tests() {
599 basic_test<Input, Output>::run();
600 basic_test<Input, Output>::run(NUMBER_OF_MSGS);
601 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run();
602 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS);
603 return 0;
604 }
605
606 #include "tbb/parallel_for.h"
607 template<typename Input, typename Output>
608 class enqueueing_on_inner_level {
609 typedef Input input_type;
610 typedef Output output_type;
611 typedef async_activity<input_type, output_type> async_activity_type;
612 typedef tbb::flow::async_node<Input, Output> async_node_type;
613 typedef typename async_node_type::gateway_type gateway_type;
614
615 class body_graph_with_async {
616 public:
body_graph_with_async(utils::SpinBarrier & barrier,async_activity_type & activity)617 body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity )
618 : spin_barrier(&barrier), my_async_activity(&activity) {}
619
operator ()(int) const620 void operator()(int) const {
621 tbb::flow::graph g;
622 tbb::flow::function_node< int, input_type > start_node(
623 g, tbb::flow::unlimited, [](int input) { return input_type(input); }
624 );
625 async_node_type offload_node(
626 g, tbb::flow::unlimited,
627 [&](const input_type &input, gateway_type& gateway) {
628 gateway.reserve_wait();
629 my_async_activity->submit( input, gateway );
630 }
631 );
632 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} );
633
634 tbb::flow::make_edge( start_node, offload_node );
635 tbb::flow::make_edge( offload_node, end_node );
636
637 start_node.try_put(1);
638
639 spin_barrier->wait();
640
641 my_async_activity->activate();
642
643 g.wait_for_all();
644 }
645
646 private:
647 utils::SpinBarrier* spin_barrier;
648 async_activity_type* my_async_activity;
649 };
650
651 public:
run()652 static int run ()
653 {
654 const int nthreads = tbb::this_task_arena::max_concurrency();
655 utils::SpinBarrier spin_barrier( nthreads );
656
657 async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true );
658
659 tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) );
660 return 0;
661 }
662 };
663
// Runs the enqueueing_on_inner_level scenario for int inputs and outputs. Always returns 0.
int run_test_enqueueing_on_inner_level() {
    enqueueing_on_inner_level<int, int>::run();
    return 0;
}
668
669 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
670 #include <array>
671
672 template<typename NodeType>
673 class AsyncActivity {
674 public:
675 using gateway_t = typename NodeType::gateway_type;
676
677 struct work_type {
678 int input;
679 gateway_t* gateway;
680 };
681
__anon62ab872d0a02() 682 AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() {
683 while(!end_of_work()) {
684 work_type w;
685 while( my_q.try_pop(w) ) {
686 int res = do_work(w.input);
687 w.gateway->try_put(res);
688 w.gateway->release_wait();
689 ++c;
690 }
691 }
692 }) {}
693
submit(int i,gateway_t * gateway)694 void submit(int i, gateway_t* gateway) {
695 work_type w = {i, gateway};
696 gateway->reserve_wait();
697 my_q.push(w);
698 }
699
wait_for_all()700 void wait_for_all() { thr.join(); }
701
702 private:
end_of_work()703 bool end_of_work() { return c >= stop_limit; }
704
do_work(int & i)705 int do_work(int& i) { return i + i; }
706
707 async_activity_queue<work_type> my_q;
708 size_t stop_limit;
709 size_t c;
710 std::thread thr;
711 };
712
test_follows()713 void test_follows() {
714 using namespace tbb::flow;
715
716 using input_t = int;
717 using output_t = int;
718 using node_t = async_node<input_t, output_t>;
719
720 graph g;
721
722 AsyncActivity<node_t> async_activity(3);
723
724 std::array<broadcast_node<input_t>, 3> preds = {
725 {
726 broadcast_node<input_t>(g),
727 broadcast_node<input_t>(g),
728 broadcast_node<input_t>(g)
729 }
730 };
731
732 node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) {
733 async_activity.submit(input, >w);
734 }, no_priority);
735
736 buffer_node<output_t> buf(g);
737 make_edge(node, buf);
738
739 for(auto& pred: preds) {
740 pred.try_put(1);
741 }
742
743 g.wait_for_all();
744 async_activity.wait_for_all();
745
746 output_t storage;
747 CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)),
748 "Not exact edge quantity was made");
749 }
750
test_precedes()751 void test_precedes() {
752 using namespace tbb::flow;
753
754 using input_t = int;
755 using output_t = int;
756 using node_t = async_node<input_t, output_t>;
757
758 graph g;
759
760 AsyncActivity<node_t> async_activity(1);
761
762 std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} };
763
764 broadcast_node<input_t> start(g);
765
766 node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) {
767 async_activity.submit(input, >w);
768 }, no_priority);
769
770 make_edge(start, node);
771
772 start.try_put(1);
773
774 g.wait_for_all();
775 async_activity.wait_for_all();
776
777 for(auto& successor : successors) {
778 output_t storage;
779 CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)),
780 "Not exact edge quantity was made");
781 }
782 }
783
// Runs both node-set construction tests: follows() first, then precedes().
void test_follows_and_precedes_api() {
    test_follows();
    test_precedes();
}
788 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
789
790 //! Test async bodies processing
791 //! \brief \ref requirement \ref error_guessing
TEST_CASE("Basic tests"){
    // All basic scenarios run inside an explicit arena constructed with utils::MaxThread.
    tbb::task_arena arena(utils::MaxThread);
    arena.execute(
        [&]() {
            // Same input/output type, a user-defined type, and differing input/output types.
            run_tests<int, int>();
            run_tests<minimal_type, minimal_type>();
            run_tests<int, minimal_type>();
        }
    );
}
802
803 //! NativeParallelFor test with various concurrency settings
804 //! \brief \ref requirement \ref error_guessing
TEST_CASE("Lightweight tests"){
    // Delegates to the shared lightweight_testing harness, instantiated for async_node.
    lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS);
}
808
809 //! Test reset and cancellation
810 //! \brief \ref error_guessing
TEST_CASE("Reset test"){
    // Cancellation followed by graph::reset() with each reset flag.
    test_reset();
}
814
//! Test async_node copy constructor
816 //! \brief \ref requirement \ref error_guessing
TEST_CASE("Copy constructor test"){
    // The return value of test_copy_ctor() (always 0) is intentionally ignored.
    test_copy_ctor();
}
820
821 //! Test if main thread spins
822 //! \brief \ref stress
TEST_CASE("Spin avoidance test"){
    // Runs spin_test<int, int> with four threads.
    test_for_spin_avoidance();
}
826
827 //! Test nested enqueuing
828 //! \brief \ref error_guessing
TEST_CASE("Inner enqueuing test"){
    // One graph per parallel_for iteration, all sharing a single deferred async activity.
    run_test_enqueueing_on_inner_level();
}
832
833 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
834 //! Test deprecated follows and precedes API
835 //! \brief \ref error_guessing
TEST_CASE("Test follows and precedes API"){
    // Only compiled when __TBB_PREVIEW_FLOW_GRAPH_NODE_SET is enabled.
    test_follows_and_precedes_api();
}
839 #endif
840
841 #if __TBB_CPP20_CONCEPTS_PRESENT
842 //! \brief \ref error_guessing
TEST_CASE("constraints for async_node input") {
    // Input type with only the default and copy constructors declared.
    struct InputObject {
        InputObject() = default;
        InputObject( const InputObject& ) = default;
    };

    // Compile-time checks only: the first two instantiations must be well-formed,
    // the last two (non-copyable / non-default-initializable input) must not be.
    static_assert(utils::well_formed_instantiation<tbb::flow::async_node, InputObject, int>);
    static_assert(utils::well_formed_instantiation<tbb::flow::async_node, int, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonCopyable, int>);
    static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonDefaultInitializable, int>);
}
854
// Satisfied when every listed async_node<Input, Output> constructor call is well-formed
// with the given Body: the graph-based overloads always, and the follows()-based
// overloads too when __TBB_PREVIEW_FLOW_GRAPH_NODE_SET is enabled.
template <typename Input, typename Output, typename Body>
concept can_call_async_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency,
                                             Body body, tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) {
    tbb::flow::async_node<Input, Output>(graph, concurrency, body);
    tbb::flow::async_node<Input, Output>(graph, concurrency, body, priority);
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body);
    tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority);
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
};
865
866 //! \brief \ref error_guessing
TEST_CASE("constraints for async_node body") {
    using input_type = int;
    using output_type = input_type;
    using namespace test_concepts::async_node_body;

    // Compile-time checks only: the Correct body must be accepted by the constructors,
    // and each of the other body types must be rejected.
    static_assert(can_call_async_node_ctor<input_type, output_type, Correct<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>);
    static_assert(!can_call_async_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>);
}
879
880 #endif // __TBB_CPP20_CONCEPTS_PRESENT
881