1 /* 2 Copyright (c) 2005-2020 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 18 #include "common/config.h" 19 20 #define __TBB_EXTRA_DEBUG 1 21 #include "tbb/flow_graph.h" 22 23 #include "tbb/task.h" 24 #include "tbb/global_control.h" 25 26 #include "common/test.h" 27 #include "common/utils.h" 28 #include "common/utils_assert.h" 29 #include "common/graph_utils.h" 30 #include "common/spin_barrier.h" 31 #include "common/test_follows_and_precedes_api.h" 32 33 #include <string> 34 #include <thread> 35 #include <mutex> 36 37 38 //! \file test_async_node.cpp 39 //! \brief Test for [flow_graph.async_node] specification 40 41 42 class minimal_type { 43 template<typename T> 44 friend struct place_wrapper; 45 46 int value; 47 48 public: 49 minimal_type() : value(-1) {} 50 minimal_type(int v) : value(v) {} 51 minimal_type(const minimal_type &m) : value(m.value) { } 52 minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; } 53 }; 54 55 template <typename T> 56 struct place_wrapper { 57 typedef T wrapped_type; 58 T value; 59 std::thread::id thread_id; 60 61 place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {} 62 63 template <typename Q> 64 place_wrapper(const place_wrapper<Q>& v) 65 : value(v.value), thread_id(v.thread_id) 66 {} 67 68 template <typename Q> 69 place_wrapper<Q>& operator=(const place_wrapper<Q>& v) { 70 if (this != &v) { 71 value = v.value; 72 thread_id = v.thread_id; 73 } 74 return *this; 75 } 76 77 }; 78 79 template<typename T1, typename T2> 80 struct wrapper_helper { 81 static void check(const T1 &, const T2 &) { } 82 static void copy_value(const T1 &in, T2 &out) { out = in; } 83 }; 84 85 template<typename T1, typename T2> 86 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > { 87 static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) { 88 CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes"); 89 return; 90 } 91 static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) { 92 out.value = in.value; 93 } 94 }; 95 96 const int NUMBER_OF_MSGS = 10; 97 const int UNKNOWN_NUMBER_OF_ITEMS = -1; 98 std::atomic<int> async_body_exec_count; 99 std::atomic<int> async_activity_processed_msg_count; 100 std::atomic<int> end_body_exec_count; 101 102 // queueing required in test_reset for testing of cancellation 103 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type; 104 typedef counting_async_node_type::gateway_type counting_gateway_type; 105 106 struct counting_async_unlimited_body { 107 108 counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {} 109 110 void operator()( const int &input, counting_gateway_type& gateway) { 111 // TODO revamp: reconsider logging for the tests. It is known that frequent calls to 112 // doctest's INFO cause issues. 113 114 // INFO( "Body execution with input == " << input << "\n"); 115 ++async_body_exec_count; 116 if ( input == -1 ) { 117 bool result = my_tgc.cancel_group_execution(); 118 // INFO( "Canceling graph execution\n" ); 119 CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" ); 120 utils::Sleep(50); 121 } 122 gateway.try_put(input); 123 } 124 private: 125 tbb::task_group_context& my_tgc; 126 }; 127 128 struct counting_async_serial_body : counting_async_unlimited_body { 129 typedef counting_async_unlimited_body base_type; 130 int my_async_body_exec_count; 131 132 counting_async_serial_body(tbb::task_group_context& tgc) 133 : base_type(tgc), my_async_body_exec_count( 0 ) { } 134 135 void operator()( const int &input, counting_gateway_type& gateway ) { 136 ++my_async_body_exec_count; 137 base_type::operator()( input, gateway ); 138 } 139 }; 140 141 void test_reset() { 142 const int N = NUMBER_OF_MSGS; 143 async_body_exec_count = 0; 144 145 tbb::task_group_context graph_ctx; 146 tbb::flow::graph g(graph_ctx); 147 counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) ); 148 149 const int R = 3; 150 std::vector< std::shared_ptr<harness_counting_receiver<int>> > r; 151 for (size_t i = 0; i < R; ++i) { 152 r.push_back( std::make_shared<harness_counting_receiver<int>>(g) ); 153 } 154 155 for (int i = 0; i < R; ++i) { 156 tbb::flow::make_edge(a, *r[i]); 157 } 158 159 INFO( "One body execution\n" ); 160 a.try_put(-1); 161 for (int i = 0; i < N; ++i) { 162 a.try_put(i); 163 } 164 g.wait_for_all(); 165 // should be canceled with only 1 item reaching the async_body and the counting receivers 166 // and N items left in the node's queue 167 CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" ); 168 169 counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a); 170 CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" ); 171 CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1" ); 172 for (int i = 0; i < R; ++i) { 173 CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" ); 174 } 175 176 // should clear the async_node queue, but retain its local count at 1 and keep all edges 177 g.reset(tbb::flow::rf_reset_protocol); 178 179 INFO( "N body executions\n" ); 180 for (int i = 0; i < N; ++i) { 181 a.try_put(i); 182 } 183 g.wait_for_all(); 184 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 185 186 // a total of N+1 items should have passed through the node body 187 // the local body count should also be N+1 188 // and the counting receivers should all have a count of N+1 189 counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a); 190 CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count), 191 "local and global body execution counts are different" ); 192 INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" ); 193 CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1" ); 194 for (int i = 0; i < R; ++i) { 195 CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" ); 196 } 197 198 INFO( "N body executions with new bodies\n" ); 199 // should clear the async_node queue and reset its local count to 0, but keep all edges 200 g.reset(tbb::flow::rf_reset_bodies); 201 for (int i = 0; i < N; ++i) { 202 a.try_put(i); 203 } 204 g.wait_for_all(); 205 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 206 207 // a total of 2N+1 items should have passed through the node body 208 // the local body count should be N 209 // and the counting receivers should all have a count of 2N+1 210 counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a); 211 CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1" ); 212 CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N" ); 213 for (int i = 0; i < R; ++i) { 214 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 215 } 216 217 // should clear the async_node queue and keep its local count at N and remove all edges 218 INFO( "N body executions with no edges\n" ); 219 g.reset(tbb::flow::rf_clear_edges); 220 for (int i = 0; i < N; ++i) { 221 a.try_put(i); 222 } 223 g.wait_for_all(); 224 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 225 226 // a total of 3N+1 items should have passed through the node body 227 // the local body count should now be 2*N 228 // and the counting receivers should remain at a count of 2N+1 229 counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a); 230 CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1" ); 231 CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N" ); 232 for (int i = 0; i < R; ++i) { 233 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 234 } 235 236 // put back 1 edge to receiver 0 237 INFO( "N body executions with 1 edge\n" ); 238 tbb::flow::make_edge(a, *r[0]); 239 for (int i = 0; i < N; ++i) { 240 a.try_put(i); 241 } 242 g.wait_for_all(); 243 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 244 245 // a total of 4N+1 items should have passed through the node body 246 // the local body count should now be 3*N 247 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 248 counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a); 249 CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1" ); 250 CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N" ); 251 CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" ); 252 for (int i = 1; i < R; ++i) { 253 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 254 } 255 256 // should clear the async_node queue and keep its local count at N and remove all edges 257 INFO( "N body executions with no edges and new body\n" ); 258 g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges)); 259 for (int i = 0; i < N; ++i) { 260 a.try_put(i); 261 } 262 g.wait_for_all(); 263 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 264 265 // a total of 4N+1 items should have passed through the node body 266 // the local body count should now be 3*N 267 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 268 counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a); 269 CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1" ); 270 CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N" ); 271 CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" ); 272 for (int i = 1; i < R; ++i) { 273 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 274 } 275 } 276 277 278 #include <mutex> 279 280 template <typename T> 281 class async_activity_queue { 282 public: 283 void push( const T& item ) { 284 std::lock_guard<mutex_t> lock( m_mutex ); 285 m_queue.push( item ); 286 } 287 288 bool try_pop( T& item ) { 289 std::lock_guard<mutex_t> lock( m_mutex ); 290 if( m_queue.empty() ) 291 return false; 292 item = m_queue.front(); 293 m_queue.pop(); 294 return true; 295 } 296 297 bool empty() { 298 std::lock_guard<mutex_t> lock( m_mutex ); 299 return m_queue.empty(); 300 } 301 302 private: 303 typedef std::mutex mutex_t; 304 mutex_t m_mutex; 305 std::queue<T> m_queue; 306 }; 307 308 template< typename Input, typename Output > 309 class async_activity : utils::NoAssign { 310 public: 311 typedef Input input_type; 312 typedef Output output_type; 313 typedef tbb::flow::async_node< input_type, output_type > async_node_type; 314 typedef typename async_node_type::gateway_type gateway_type; 315 316 struct work_type { 317 input_type input; 318 gateway_type* gateway; 319 }; 320 321 class ServiceThreadBody { 322 public: 323 ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {} 324 void operator()() { my_activity->process(); } 325 private: 326 async_activity* my_activity; 327 }; 328 329 async_activity(int expected_items, bool deferred = false, int sleep_time = 50) 330 : my_expected_items(expected_items), my_sleep_time(sleep_time) 331 { 332 is_active = !deferred; 333 my_quit = false; 334 std::thread( ServiceThreadBody( this ) ).swap( my_service_thread ); 335 } 336 337 private: 338 339 async_activity( const async_activity& ) 340 : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) 341 { 342 is_active = true; 343 } 344 345 public: 346 ~async_activity() { 347 stop(); 348 my_service_thread.join(); 349 } 350 351 void submit( const input_type &input, gateway_type& gateway ) { 352 work_type work = {input, &gateway}; 353 my_work_queue.push( work ); 354 } 355 356 void process() { 357 do { 358 work_type work; 359 if( is_active && my_work_queue.try_pop( work ) ) { 360 utils::Sleep(my_sleep_time); 361 ++async_activity_processed_msg_count; 362 output_type output; 363 wrapper_helper<output_type, output_type>::copy_value(work.input, output); 364 wrapper_helper<output_type, output_type>::check(work.input, output); 365 work.gateway->try_put(output); 366 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS || 367 int(async_activity_processed_msg_count) == my_expected_items ) { 368 work.gateway->release_wait(); 369 } 370 } 371 } while( my_quit == false || !my_work_queue.empty()); 372 } 373 374 void stop() { 375 my_quit = true; 376 } 377 378 void activate() { 379 is_active = true; 380 } 381 382 bool should_reserve_each_time() { 383 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ) 384 return true; 385 else 386 return false; 387 } 388 389 private: 390 391 const int my_expected_items; 392 const int my_sleep_time; 393 std::atomic< bool > is_active; 394 395 async_activity_queue<work_type> my_work_queue; 396 397 std::atomic< bool > my_quit; 398 399 std::thread my_service_thread; 400 }; 401 402 template<typename Input, typename Output> 403 struct basic_test { 404 typedef Input input_type; 405 typedef Output output_type; 406 typedef tbb::flow::async_node< input_type, output_type > async_node_type; 407 typedef typename async_node_type::gateway_type gateway_type; 408 409 basic_test() {} 410 411 static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { 412 async_activity<input_type, output_type> my_async_activity(async_expected_items); 413 414 tbb::flow::graph g; 415 416 tbb::flow::function_node< int, input_type > start_node( 417 g, tbb::flow::unlimited, [](int input) { return input_type(input); } 418 ); 419 async_node_type offload_node( 420 g, tbb::flow::unlimited, 421 [&] (const input_type &input, gateway_type& gateway) { 422 ++async_body_exec_count; 423 if(my_async_activity.should_reserve_each_time()) 424 gateway.reserve_wait(); 425 my_async_activity.submit(input, gateway); 426 } 427 ); 428 tbb::flow::function_node< output_type > end_node( 429 g, tbb::flow::unlimited, 430 [&](const output_type& input) { 431 ++end_body_exec_count; 432 output_type output; 433 wrapper_helper<output_type, output_type>::check(input, output); 434 } 435 ); 436 437 tbb::flow::make_edge( start_node, offload_node ); 438 tbb::flow::make_edge( offload_node, end_node ); 439 440 async_body_exec_count = 0; 441 async_activity_processed_msg_count = 0; 442 end_body_exec_count = 0; 443 444 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) { 445 offload_node.gateway().reserve_wait(); 446 } 447 for (int i = 0; i < NUMBER_OF_MSGS; ++i) { 448 start_node.try_put(i); 449 } 450 g.wait_for_all(); 451 CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 452 CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" ); 453 CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals"); 454 INFO( "async_body_exec_count == " << int(async_body_exec_count) << 455 " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) << 456 " == end_body_exec_count == " << int(end_body_exec_count) << "\n" 457 ); 458 return 0; 459 } 460 461 }; 462 463 int test_copy_ctor() { 464 const int N = NUMBER_OF_MSGS; 465 async_body_exec_count = 0; 466 467 tbb::flow::graph g; 468 469 harness_counting_receiver<int> r1(g); 470 harness_counting_receiver<int> r2(g); 471 472 tbb::task_group_context graph_ctx; 473 counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) ); 474 counting_async_node_type b(a); 475 476 tbb::flow::make_edge(a, r1); // C++11-style of making edges 477 tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2); // usual way of making edges 478 479 for (int i = 0; i < N; ++i) { 480 a.try_put(i); 481 } 482 g.wait_for_all(); 483 484 INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" ); 485 INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" ); 486 CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 487 CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" ); 488 CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" ); 489 490 for (int i = 0; i < N; ++i) { 491 b.try_put(i); 492 } 493 g.wait_for_all(); 494 495 INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" ); 496 INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" ); 497 CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 498 CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" ); 499 CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" ); 500 return 0; 501 } 502 503 std::atomic<int> main_tid_count; 504 505 template<typename Input, typename Output> 506 struct spin_test { 507 typedef Input input_type; 508 typedef Output output_type; 509 typedef tbb::flow::async_node< input_type, output_type > async_node_type; 510 typedef typename async_node_type::gateway_type gateway_type; 511 512 class end_body_type { 513 typedef Output output_type; 514 std::thread::id my_main_tid; 515 utils::SpinBarrier *my_barrier; 516 public: 517 end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { } 518 519 void operator()( const output_type & ) { 520 ++end_body_exec_count; 521 if (std::this_thread::get_id() == my_main_tid) { 522 ++main_tid_count; 523 } 524 my_barrier->timedWaitNoError(10); 525 } 526 }; 527 528 spin_test() {} 529 530 static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { 531 async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0); 532 const int overall_message_count = nthreads * NUMBER_OF_MSGS; 533 utils::SpinBarrier spin_barrier(nthreads); 534 535 tbb::flow::graph g; 536 tbb::flow::function_node<int, input_type> start_node( 537 g, tbb::flow::unlimited, [](int input) { return input_type(input); } 538 ); 539 async_node_type offload_node( 540 g, tbb::flow::unlimited, 541 [&](const input_type &input, gateway_type& gateway) { 542 ++async_body_exec_count; 543 if(my_async_activity.should_reserve_each_time()) 544 gateway.reserve_wait(); 545 my_async_activity.submit(input, gateway); 546 } 547 ); 548 tbb::flow::function_node<output_type> end_node( 549 g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier) 550 ); 551 552 tbb::flow::make_edge( start_node, offload_node ); 553 tbb::flow::make_edge( offload_node, end_node ); 554 555 async_body_exec_count = 0; 556 async_activity_processed_msg_count = 0; 557 end_body_exec_count = 0; 558 main_tid_count = 0; 559 560 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) { 561 offload_node.gateway().reserve_wait(); 562 } 563 for (int i = 0; i < overall_message_count; ++i) { 564 start_node.try_put(i); 565 } 566 g.wait_for_all(); 567 CHECK_MESSAGE( (async_body_exec_count == overall_message_count), 568 "AsyncBody processed wrong number of signals" ); 569 CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count), 570 "AsyncActivity processed wrong number of signals" ); 571 CHECK_MESSAGE( (end_body_exec_count == overall_message_count), 572 "EndBody processed wrong number of signals"); 573 574 INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n"); 575 576 INFO("async_body_exec_count == " << int(async_body_exec_count) << 577 " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) << 578 " == end_body_exec_count == " << int(end_body_exec_count) << "\n" 579 ); 580 return 0; 581 } 582 583 }; 584 585 void test_for_spin_avoidance() { 586 const int nthreads = 4; 587 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads); 588 spin_test<int, int>::run(nthreads); 589 } 590 591 template< typename Input, typename Output > 592 int run_tests() { 593 basic_test<Input, Output>::run(); 594 basic_test<Input, Output>::run(NUMBER_OF_MSGS); 595 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(); 596 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS); 597 return 0; 598 } 599 600 #include "tbb/parallel_for.h" 601 template<typename Input, typename Output> 602 class equeueing_on_inner_level { 603 typedef Input input_type; 604 typedef Output output_type; 605 typedef async_activity<input_type, output_type> async_activity_type; 606 typedef tbb::flow::async_node<Input, Output> async_node_type; 607 typedef typename async_node_type::gateway_type gateway_type; 608 609 class body_graph_with_async { 610 public: 611 body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity ) 612 : spin_barrier(&barrier), my_async_activity(&activity) {} 613 614 void operator()(int) const { 615 tbb::flow::graph g; 616 tbb::flow::function_node< int, input_type > start_node( 617 g, tbb::flow::unlimited, [](int input) { return input_type(input); } 618 ); 619 async_node_type offload_node( 620 g, tbb::flow::unlimited, 621 [&](const input_type &input, gateway_type& gateway) { 622 gateway.reserve_wait(); 623 my_async_activity->submit( input, gateway ); 624 } 625 ); 626 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} ); 627 628 tbb::flow::make_edge( start_node, offload_node ); 629 tbb::flow::make_edge( offload_node, end_node ); 630 631 start_node.try_put(1); 632 633 spin_barrier->wait(); 634 635 my_async_activity->activate(); 636 637 g.wait_for_all(); 638 } 639 640 private: 641 utils::SpinBarrier* spin_barrier; 642 async_activity_type* my_async_activity; 643 }; 644 645 public: 646 static int run () 647 { 648 const int nthreads = tbb::this_task_arena::max_concurrency(); 649 utils::SpinBarrier spin_barrier( nthreads ); 650 651 async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true ); 652 653 tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) ); 654 return 0; 655 } 656 }; 657 658 int run_test_equeueing_on_inner_level() { 659 equeueing_on_inner_level<int, int>::run(); 660 return 0; 661 } 662 663 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 664 #include <array> 665 666 template<typename NodeType> 667 class AsyncActivity { 668 public: 669 using gateway_t = typename NodeType::gateway_type; 670 671 struct work_type { 672 int input; 673 gateway_t* gateway; 674 }; 675 676 AsyncActivity(size_t limit) : thr([this]() { 677 while(!end_of_work()) { 678 work_type w; 679 while( my_q.try_pop(w) ) { 680 int res = do_work(w.input); 681 w.gateway->try_put(res); 682 w.gateway->release_wait(); 683 ++c; 684 } 685 } 686 }), stop_limit(limit), c(0) {} 687 688 void submit(int i, gateway_t* gateway) { 689 work_type w = {i, gateway}; 690 gateway->reserve_wait(); 691 my_q.push(w); 692 } 693 694 void wait_for_all() { thr.join(); } 695 696 private: 697 bool end_of_work() { return c >= stop_limit; } 698 699 int do_work(int& i) { return i + i; } 700 701 async_activity_queue<work_type> my_q; 702 std::thread thr; 703 size_t stop_limit; 704 size_t c; 705 }; 706 707 void test_follows() { 708 using namespace tbb::flow; 709 710 using input_t = int; 711 using output_t = int; 712 using node_t = async_node<input_t, output_t>; 713 714 graph g; 715 716 AsyncActivity<node_t> async_activity(3); 717 718 std::array<broadcast_node<input_t>, 3> preds = { 719 { 720 broadcast_node<input_t>(g), 721 broadcast_node<input_t>(g), 722 broadcast_node<input_t>(g) 723 } 724 }; 725 726 node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) { 727 async_activity.submit(input, >w); 728 }, no_priority); 729 730 buffer_node<output_t> buf(g); 731 make_edge(node, buf); 732 733 for(auto& pred: preds) { 734 pred.try_put(1); 735 } 736 737 g.wait_for_all(); 738 async_activity.wait_for_all(); 739 740 output_t storage; 741 CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)), 742 "Not exact edge quantity was made"); 743 } 744 745 void test_precedes() { 746 using namespace tbb::flow; 747 748 using input_t = int; 749 using output_t = int; 750 using node_t = async_node<input_t, output_t>; 751 752 graph g; 753 754 AsyncActivity<node_t> async_activity(1); 755 756 std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} }; 757 758 broadcast_node<input_t> start(g); 759 760 node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) { 761 async_activity.submit(input, >w); 762 }, no_priority); 763 764 make_edge(start, node); 765 766 start.try_put(1); 767 768 g.wait_for_all(); 769 async_activity.wait_for_all(); 770 771 for(auto& successor : successors) { 772 output_t storage; 773 CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 774 "Not exact edge quantity was made"); 775 } 776 } 777 778 void test_follows_and_precedes_api() { 779 test_follows(); 780 test_precedes(); 781 } 782 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 783 784 //! Test async bodies processing 785 //! \brief \ref requirement \ref error_guessing 786 TEST_CASE("Basic tests"){ 787 tbb::task_arena arena(utils::MaxThread); 788 arena.execute( 789 [&]() { 790 run_tests<int, int>(); 791 run_tests<minimal_type, minimal_type>(); 792 run_tests<int, minimal_type>(); 793 } 794 ); 795 } 796 797 //! NativeParallelFor test with various concurrency settings 798 //! \brief \ref requirement \ref error_guessing 799 TEST_CASE("Lightweight tests"){ 800 lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS); 801 } 802 803 //! Test reset and cancellation 804 //! \brief \ref error_guessing 805 TEST_CASE("Reset test"){ 806 test_reset(); 807 } 808 809 //! Test 810 //! \brief \ref requirement \ref error_guessing 811 TEST_CASE("Copy constructor test"){ 812 test_copy_ctor(); 813 } 814 815 //! Test if main thread spins 816 //! \brief \ref stress 817 TEST_CASE("Spin avoidance test"){ 818 test_for_spin_avoidance(); 819 } 820 821 //! Test nested enqueing 822 //! \brief \ref error_guessing 823 TEST_CASE("Inner enqueing test"){ 824 run_test_equeueing_on_inner_level(); 825 } 826 827 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 828 //! Test deprecated follows and preceedes API 829 //! \brief \ref error_guessing 830 TEST_CASE("Test follows and preceedes API"){ 831 test_follows_and_precedes_api(); 832 } 833 #endif 834