1 /* 2 Copyright (c) 2005-2023 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 25 #include "tbb/task.h" 26 #include "tbb/global_control.h" 27 28 #include "common/test.h" 29 #include "common/utils.h" 30 #include "common/utils_assert.h" 31 #include "common/graph_utils.h" 32 #include "common/spin_barrier.h" 33 #include "common/test_follows_and_precedes_api.h" 34 #include "common/concepts_common.h" 35 36 #include <string> 37 #include <thread> 38 #include <mutex> 39 40 41 //! \file test_async_node.cpp 42 //! \brief Test for [flow_graph.async_node] specification 43 44 45 class minimal_type { 46 template<typename T> 47 friend struct place_wrapper; 48 49 int value; 50 51 public: 52 minimal_type() : value(-1) {} 53 minimal_type(int v) : value(v) {} 54 minimal_type(const minimal_type &m) : value(m.value) { } 55 minimal_type &operator=(const minimal_type &m) { value = m.value; return *this; } 56 }; 57 58 template <typename T> 59 struct place_wrapper { 60 typedef T wrapped_type; 61 T value; 62 std::thread::id thread_id; 63 64 place_wrapper( int v = 0 ) : value(v), thread_id(std::this_thread::get_id()) {} 65 66 template <typename Q> 67 place_wrapper(const place_wrapper<Q>& v) 68 : value(v.value), thread_id(v.thread_id) 69 {} 70 71 template <typename Q> 72 place_wrapper<Q>& operator=(const place_wrapper<Q>& v) { 73 if (this != &v) { 74 value = v.value; 75 thread_id = v.thread_id; 76 } 77 return *this; 78 } 79 80 }; 81 82 template<typename T1, typename T2> 83 struct wrapper_helper { 84 static void check(const T1 &, const T2 &) { } 85 static void copy_value(const T1 &in, T2 &out) { out = in; } 86 }; 87 88 template<typename T1, typename T2> 89 struct wrapper_helper< place_wrapper<T1>, place_wrapper<T2> > { 90 static void check(const place_wrapper<T1> &a, const place_wrapper<T2> &b) { 91 CHECK_MESSAGE( ( (a.thread_id != b.thread_id)), "same thread used to execute adjacent nodes"); 92 return; 93 } 94 static void copy_value(const place_wrapper<T1> &in, place_wrapper<T2> &out) { 95 out.value = in.value; 96 } 97 }; 98 99 const int NUMBER_OF_MSGS = 10; 100 const int UNKNOWN_NUMBER_OF_ITEMS = -1; 101 std::atomic<int> async_body_exec_count; 102 std::atomic<int> async_activity_processed_msg_count; 103 std::atomic<int> end_body_exec_count; 104 105 // queueing required in test_reset for testing of cancellation 106 typedef tbb::flow::async_node< int, int, tbb::flow::queueing > counting_async_node_type; 107 typedef counting_async_node_type::gateway_type counting_gateway_type; 108 109 struct counting_async_unlimited_body { 110 111 counting_async_unlimited_body(tbb::task_group_context& graph_tgc) : my_tgc( graph_tgc ) {} 112 113 void operator()( const int &input, counting_gateway_type& gateway) { 114 // TODO revamp: reconsider logging for the tests. It is known that frequent calls to 115 // doctest's INFO cause issues. 116 117 // INFO( "Body execution with input == " << input << "\n"); 118 ++async_body_exec_count; 119 if ( input == -1 ) { 120 bool result = my_tgc.cancel_group_execution(); 121 // INFO( "Canceling graph execution\n" ); 122 CHECK_MESSAGE( ( result == true), "attempted to cancel graph twice" ); 123 utils::Sleep(50); 124 } 125 gateway.try_put(input); 126 } 127 private: 128 tbb::task_group_context& my_tgc; 129 }; 130 131 struct counting_async_serial_body : counting_async_unlimited_body { 132 typedef counting_async_unlimited_body base_type; 133 int my_async_body_exec_count; 134 135 counting_async_serial_body(tbb::task_group_context& tgc) 136 : base_type(tgc), my_async_body_exec_count( 0 ) { } 137 138 void operator()( const int &input, counting_gateway_type& gateway ) { 139 ++my_async_body_exec_count; 140 base_type::operator()( input, gateway ); 141 } 142 }; 143 144 void test_reset() { 145 const int N = NUMBER_OF_MSGS; 146 async_body_exec_count = 0; 147 148 tbb::task_group_context graph_ctx; 149 tbb::flow::graph g(graph_ctx); 150 counting_async_node_type a(g, tbb::flow::serial, counting_async_serial_body(graph_ctx) ); 151 152 const int R = 3; 153 std::vector< std::shared_ptr<harness_counting_receiver<int>> > r; 154 for (size_t i = 0; i < R; ++i) { 155 r.push_back( std::make_shared<harness_counting_receiver<int>>(g) ); 156 } 157 158 for (int i = 0; i < R; ++i) { 159 tbb::flow::make_edge(a, *r[i]); 160 } 161 162 INFO( "One body execution\n" ); 163 a.try_put(-1); 164 for (int i = 0; i < N; ++i) { 165 a.try_put(i); 166 } 167 g.wait_for_all(); 168 // should be canceled with only 1 item reaching the async_body and the counting receivers 169 // and N items left in the node's queue 170 CHECK_MESSAGE( ( g.is_cancelled() == true), "task group not canceled" ); 171 172 counting_async_serial_body b1 = tbb::flow::copy_body<counting_async_serial_body>(a); 173 CHECK_MESSAGE( ( int(async_body_exec_count) == int(b1.my_async_body_exec_count)), "body and global body counts are different" ); 174 CHECK_MESSAGE( ( int(async_body_exec_count) == 1), "global body execution count not 1" ); 175 for (int i = 0; i < R; ++i) { 176 CHECK_MESSAGE( ( int(r[i]->my_count) == 1), "counting receiver count not 1" ); 177 } 178 179 // should clear the async_node queue, but retain its local count at 1 and keep all edges 180 g.reset(tbb::flow::rf_reset_protocol); 181 182 INFO( "N body executions\n" ); 183 for (int i = 0; i < N; ++i) { 184 a.try_put(i); 185 } 186 g.wait_for_all(); 187 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 188 189 // a total of N+1 items should have passed through the node body 190 // the local body count should also be N+1 191 // and the counting receivers should all have a count of N+1 192 counting_async_serial_body b2 = tbb::flow::copy_body<counting_async_serial_body>(a); 193 CHECK_MESSAGE( int(async_body_exec_count) == int(b2.my_async_body_exec_count), 194 "local and global body execution counts are different" ); 195 INFO( "async_body_exec_count==" << int(async_body_exec_count) << "\n" ); 196 CHECK_MESSAGE( ( int(async_body_exec_count) == N+1), "global body execution count not N+1" ); 197 for (int i = 0; i < R; ++i) { 198 CHECK_MESSAGE( ( int(r[i]->my_count) == N+1), "counting receiver has not received N+1 items" ); 199 } 200 201 INFO( "N body executions with new bodies\n" ); 202 // should clear the async_node queue and reset its local count to 0, but keep all edges 203 g.reset(tbb::flow::rf_reset_bodies); 204 for (int i = 0; i < N; ++i) { 205 a.try_put(i); 206 } 207 g.wait_for_all(); 208 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 209 210 // a total of 2N+1 items should have passed through the node body 211 // the local body count should be N 212 // and the counting receivers should all have a count of 2N+1 213 counting_async_serial_body b3 = tbb::flow::copy_body<counting_async_serial_body>(a); 214 CHECK_MESSAGE( ( int(async_body_exec_count) == 2*N+1), "global body execution count not 2N+1" ); 215 CHECK_MESSAGE( ( int(b3.my_async_body_exec_count) == N), "local body execution count not N" ); 216 for (int i = 0; i < R; ++i) { 217 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 218 } 219 220 // should clear the async_node queue and keep its local count at N and remove all edges 221 INFO( "N body executions with no edges\n" ); 222 g.reset(tbb::flow::rf_clear_edges); 223 for (int i = 0; i < N; ++i) { 224 a.try_put(i); 225 } 226 g.wait_for_all(); 227 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 228 229 // a total of 3N+1 items should have passed through the node body 230 // the local body count should now be 2*N 231 // and the counting receivers should remain at a count of 2N+1 232 counting_async_serial_body b4 = tbb::flow::copy_body<counting_async_serial_body>(a); 233 CHECK_MESSAGE( ( int(async_body_exec_count) == 3*N+1), "global body execution count not 3N+1" ); 234 CHECK_MESSAGE( ( int(b4.my_async_body_exec_count) == 2*N), "local body execution count not 2N" ); 235 for (int i = 0; i < R; ++i) { 236 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 237 } 238 239 // put back 1 edge to receiver 0 240 INFO( "N body executions with 1 edge\n" ); 241 tbb::flow::make_edge(a, *r[0]); 242 for (int i = 0; i < N; ++i) { 243 a.try_put(i); 244 } 245 g.wait_for_all(); 246 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 247 248 // a total of 4N+1 items should have passed through the node body 249 // the local body count should now be 3*N 250 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 251 counting_async_serial_body b5 = tbb::flow::copy_body<counting_async_serial_body>(a); 252 CHECK_MESSAGE( ( int(async_body_exec_count) == 4*N+1), "global body execution count not 4N+1" ); 253 CHECK_MESSAGE( ( int(b5.my_async_body_exec_count) == 3*N), "local body execution count not 3N" ); 254 CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" ); 255 for (int i = 1; i < R; ++i) { 256 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 257 } 258 259 // should clear the async_node queue and keep its local count at N and remove all edges 260 INFO( "N body executions with no edges and new body\n" ); 261 g.reset(static_cast<tbb::flow::reset_flags>(tbb::flow::rf_reset_bodies|tbb::flow::rf_clear_edges)); 262 for (int i = 0; i < N; ++i) { 263 a.try_put(i); 264 } 265 g.wait_for_all(); 266 CHECK_MESSAGE( ( g.is_cancelled() == false), "task group not canceled" ); 267 268 // a total of 4N+1 items should have passed through the node body 269 // the local body count should now be 3*N 270 // and all of the counting receivers should remain at a count of 2N+1, except r[0] which should be 3N+1 271 counting_async_serial_body b6 = tbb::flow::copy_body<counting_async_serial_body>(a); 272 CHECK_MESSAGE( ( int(async_body_exec_count) == 5*N+1), "global body execution count not 5N+1" ); 273 CHECK_MESSAGE( ( int(b6.my_async_body_exec_count) == N), "local body execution count not N" ); 274 CHECK_MESSAGE( ( int(r[0]->my_count) == 3*N+1), "counting receiver has not received 3N+1 items" ); 275 for (int i = 1; i < R; ++i) { 276 CHECK_MESSAGE( ( int(r[i]->my_count) == 2*N+1), "counting receiver has not received 2N+1 items" ); 277 } 278 } 279 280 281 #include <mutex> 282 283 template <typename T> 284 class async_activity_queue { 285 public: 286 void push( const T& item ) { 287 std::lock_guard<mutex_t> lock( m_mutex ); 288 m_queue.push( item ); 289 } 290 291 bool try_pop( T& item ) { 292 std::lock_guard<mutex_t> lock( m_mutex ); 293 if( m_queue.empty() ) 294 return false; 295 item = m_queue.front(); 296 m_queue.pop(); 297 return true; 298 } 299 300 bool empty() { 301 std::lock_guard<mutex_t> lock( m_mutex ); 302 return m_queue.empty(); 303 } 304 305 private: 306 typedef std::mutex mutex_t; 307 mutex_t m_mutex; 308 std::queue<T> m_queue; 309 }; 310 311 template< typename Input, typename Output > 312 class async_activity : utils::NoAssign { 313 public: 314 typedef Input input_type; 315 typedef Output output_type; 316 typedef tbb::flow::async_node< input_type, output_type > async_node_type; 317 typedef typename async_node_type::gateway_type gateway_type; 318 319 struct work_type { 320 input_type input; 321 gateway_type* gateway; 322 }; 323 324 class ServiceThreadBody { 325 public: 326 ServiceThreadBody( async_activity* activity ) : my_activity( activity ) {} 327 void operator()() { my_activity->process(); } 328 private: 329 async_activity* my_activity; 330 }; 331 332 async_activity(int expected_items, bool deferred = false, int sleep_time = 50) 333 : my_expected_items(expected_items), my_sleep_time(sleep_time) 334 { 335 is_active = !deferred; 336 my_quit = false; 337 std::thread( ServiceThreadBody( this ) ).swap( my_service_thread ); 338 } 339 340 private: 341 342 async_activity( const async_activity& ) 343 : my_expected_items(UNKNOWN_NUMBER_OF_ITEMS), my_sleep_time(0) 344 { 345 is_active = true; 346 } 347 348 public: 349 ~async_activity() { 350 stop(); 351 my_service_thread.join(); 352 } 353 354 void submit( const input_type &input, gateway_type& gateway ) { 355 work_type work = {input, &gateway}; 356 my_work_queue.push( work ); 357 } 358 359 void process() { 360 do { 361 work_type work; 362 if( is_active && my_work_queue.try_pop( work ) ) { 363 utils::Sleep(my_sleep_time); 364 ++async_activity_processed_msg_count; 365 output_type output; 366 wrapper_helper<output_type, output_type>::copy_value(work.input, output); 367 wrapper_helper<output_type, output_type>::check(work.input, output); 368 work.gateway->try_put(output); 369 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS || 370 int(async_activity_processed_msg_count) == my_expected_items ) { 371 work.gateway->release_wait(); 372 } 373 } 374 } while( my_quit == false || !my_work_queue.empty()); 375 } 376 377 void stop() { 378 my_quit = true; 379 } 380 381 void activate() { 382 is_active = true; 383 } 384 385 bool should_reserve_each_time() { 386 if ( my_expected_items == UNKNOWN_NUMBER_OF_ITEMS ) 387 return true; 388 else 389 return false; 390 } 391 392 private: 393 394 const int my_expected_items; 395 const int my_sleep_time; 396 std::atomic< bool > is_active; 397 398 async_activity_queue<work_type> my_work_queue; 399 400 std::atomic< bool > my_quit; 401 402 std::thread my_service_thread; 403 }; 404 405 template<typename Input, typename Output> 406 struct basic_test { 407 typedef Input input_type; 408 typedef Output output_type; 409 typedef tbb::flow::async_node< input_type, output_type > async_node_type; 410 typedef typename async_node_type::gateway_type gateway_type; 411 412 basic_test() {} 413 414 static int run(int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { 415 async_activity<input_type, output_type> my_async_activity(async_expected_items); 416 417 tbb::flow::graph g; 418 419 tbb::flow::function_node< int, input_type > start_node( 420 g, tbb::flow::unlimited, [](int input) { return input_type(input); } 421 ); 422 async_node_type offload_node( 423 g, tbb::flow::unlimited, 424 [&] (const input_type &input, gateway_type& gateway) { 425 ++async_body_exec_count; 426 if(my_async_activity.should_reserve_each_time()) 427 gateway.reserve_wait(); 428 my_async_activity.submit(input, gateway); 429 } 430 ); 431 tbb::flow::function_node< output_type > end_node( 432 g, tbb::flow::unlimited, 433 [&](const output_type& input) { 434 ++end_body_exec_count; 435 output_type output; 436 wrapper_helper<output_type, output_type>::check(input, output); 437 } 438 ); 439 440 tbb::flow::make_edge( start_node, offload_node ); 441 tbb::flow::make_edge( offload_node, end_node ); 442 443 async_body_exec_count = 0; 444 async_activity_processed_msg_count = 0; 445 end_body_exec_count = 0; 446 447 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS) { 448 offload_node.gateway().reserve_wait(); 449 } 450 for (int i = 0; i < NUMBER_OF_MSGS; ++i) { 451 start_node.try_put(i); 452 } 453 g.wait_for_all(); 454 CHECK_MESSAGE( ( async_body_exec_count == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 455 CHECK_MESSAGE( ( async_activity_processed_msg_count == NUMBER_OF_MSGS), "AsyncActivity processed wrong number of signals" ); 456 CHECK_MESSAGE( ( end_body_exec_count == NUMBER_OF_MSGS), "EndBody processed wrong number of signals"); 457 INFO( "async_body_exec_count == " << int(async_body_exec_count) << 458 " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) << 459 " == end_body_exec_count == " << int(end_body_exec_count) << "\n" 460 ); 461 return 0; 462 } 463 464 }; 465 466 int test_copy_ctor() { 467 const int N = NUMBER_OF_MSGS; 468 async_body_exec_count = 0; 469 470 tbb::flow::graph g; 471 472 harness_counting_receiver<int> r1(g); 473 harness_counting_receiver<int> r2(g); 474 475 tbb::task_group_context graph_ctx; 476 counting_async_node_type a(g, tbb::flow::unlimited, counting_async_unlimited_body(graph_ctx) ); 477 counting_async_node_type b(a); 478 479 tbb::flow::make_edge(a, r1); // C++11-style of making edges 480 tbb::flow::make_edge(tbb::flow::output_port<0>(b), r2); // usual way of making edges 481 482 for (int i = 0; i < N; ++i) { 483 a.try_put(i); 484 } 485 g.wait_for_all(); 486 487 INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" ); 488 INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" ); 489 CHECK_MESSAGE( ( int(async_body_exec_count) == NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 490 CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" ); 491 CHECK_MESSAGE( ( int(r2.my_count) == 0), "counting receiver r2 has not received 0 items" ); 492 493 for (int i = 0; i < N; ++i) { 494 b.try_put(i); 495 } 496 g.wait_for_all(); 497 498 INFO("async_body_exec_count = " << int(async_body_exec_count) << "\n" ); 499 INFO("r1.my_count == " << int(r1.my_count) << " and r2.my_count = " << int(r2.my_count) << "\n" ); 500 CHECK_MESSAGE( ( int(async_body_exec_count) == 2*NUMBER_OF_MSGS), "AsyncBody processed wrong number of signals" ); 501 CHECK_MESSAGE( ( int(r1.my_count) == N), "counting receiver r1 has not received N items" ); 502 CHECK_MESSAGE( ( int(r2.my_count) == N), "counting receiver r2 has not received N items" ); 503 return 0; 504 } 505 506 std::atomic<int> main_tid_count; 507 508 template<typename Input, typename Output> 509 struct spin_test { 510 typedef Input input_type; 511 typedef Output output_type; 512 typedef tbb::flow::async_node< input_type, output_type > async_node_type; 513 typedef typename async_node_type::gateway_type gateway_type; 514 515 class end_body_type { 516 typedef Output output_type; 517 std::thread::id my_main_tid; 518 utils::SpinBarrier *my_barrier; 519 public: 520 end_body_type(std::thread::id t, utils::SpinBarrier &b) : my_main_tid(t), my_barrier(&b) { } 521 522 void operator()( const output_type & ) { 523 ++end_body_exec_count; 524 if (std::this_thread::get_id() == my_main_tid) { 525 ++main_tid_count; 526 } 527 my_barrier->wait(); 528 } 529 }; 530 531 spin_test() {} 532 533 static int run(int nthreads, int async_expected_items = UNKNOWN_NUMBER_OF_ITEMS) { 534 async_activity<input_type, output_type> my_async_activity(async_expected_items, false, 0); 535 const int overall_message_count = nthreads * NUMBER_OF_MSGS; 536 utils::SpinBarrier spin_barrier(nthreads); 537 538 tbb::flow::graph g; 539 tbb::flow::function_node<int, input_type> start_node( 540 g, tbb::flow::unlimited, [](int input) { return input_type(input); } 541 ); 542 async_node_type offload_node( 543 g, tbb::flow::unlimited, 544 [&](const input_type &input, gateway_type& gateway) { 545 ++async_body_exec_count; 546 if(my_async_activity.should_reserve_each_time()) 547 gateway.reserve_wait(); 548 my_async_activity.submit(input, gateway); 549 } 550 ); 551 tbb::flow::function_node<output_type> end_node( 552 g, tbb::flow::unlimited, end_body_type(std::this_thread::get_id(), spin_barrier) 553 ); 554 555 tbb::flow::make_edge( start_node, offload_node ); 556 tbb::flow::make_edge( offload_node, end_node ); 557 558 async_body_exec_count = 0; 559 async_activity_processed_msg_count = 0; 560 end_body_exec_count = 0; 561 main_tid_count = 0; 562 563 if (async_expected_items != UNKNOWN_NUMBER_OF_ITEMS ) { 564 offload_node.gateway().reserve_wait(); 565 } 566 for (int i = 0; i < overall_message_count; ++i) { 567 start_node.try_put(i); 568 } 569 g.wait_for_all(); 570 CHECK_MESSAGE( (async_body_exec_count == overall_message_count), 571 "AsyncBody processed wrong number of signals" ); 572 CHECK_MESSAGE( (async_activity_processed_msg_count == overall_message_count), 573 "AsyncActivity processed wrong number of signals" ); 574 CHECK_MESSAGE( (end_body_exec_count == overall_message_count), 575 "EndBody processed wrong number of signals"); 576 577 INFO( "Main thread participated in " << main_tid_count << " end_body tasks\n"); 578 579 INFO("async_body_exec_count == " << int(async_body_exec_count) << 580 " == async_activity_processed_msg_count == " << int(async_activity_processed_msg_count) << 581 " == end_body_exec_count == " << int(end_body_exec_count) << "\n" 582 ); 583 return 0; 584 } 585 586 }; 587 588 void test_for_spin_avoidance() { 589 const int nthreads = 4; 590 tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nthreads); 591 tbb::task_arena a(nthreads); 592 a.execute([&] { 593 spin_test<int, int>::run(nthreads); 594 }); 595 } 596 597 template< typename Input, typename Output > 598 int run_tests() { 599 basic_test<Input, Output>::run(); 600 basic_test<Input, Output>::run(NUMBER_OF_MSGS); 601 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(); 602 basic_test<place_wrapper<Input>, place_wrapper<Output> >::run(NUMBER_OF_MSGS); 603 return 0; 604 } 605 606 #include "tbb/parallel_for.h" 607 template<typename Input, typename Output> 608 class enqueueing_on_inner_level { 609 typedef Input input_type; 610 typedef Output output_type; 611 typedef async_activity<input_type, output_type> async_activity_type; 612 typedef tbb::flow::async_node<Input, Output> async_node_type; 613 typedef typename async_node_type::gateway_type gateway_type; 614 615 class body_graph_with_async { 616 public: 617 body_graph_with_async( utils::SpinBarrier& barrier, async_activity_type& activity ) 618 : spin_barrier(&barrier), my_async_activity(&activity) {} 619 620 void operator()(int) const { 621 tbb::flow::graph g; 622 tbb::flow::function_node< int, input_type > start_node( 623 g, tbb::flow::unlimited, [](int input) { return input_type(input); } 624 ); 625 async_node_type offload_node( 626 g, tbb::flow::unlimited, 627 [&](const input_type &input, gateway_type& gateway) { 628 gateway.reserve_wait(); 629 my_async_activity->submit( input, gateway ); 630 } 631 ); 632 tbb::flow::function_node< output_type > end_node( g, tbb::flow::unlimited, [](output_type){} ); 633 634 tbb::flow::make_edge( start_node, offload_node ); 635 tbb::flow::make_edge( offload_node, end_node ); 636 637 start_node.try_put(1); 638 639 spin_barrier->wait(); 640 641 my_async_activity->activate(); 642 643 g.wait_for_all(); 644 } 645 646 private: 647 utils::SpinBarrier* spin_barrier; 648 async_activity_type* my_async_activity; 649 }; 650 651 public: 652 static int run () 653 { 654 const int nthreads = tbb::this_task_arena::max_concurrency(); 655 utils::SpinBarrier spin_barrier( nthreads ); 656 657 async_activity_type my_async_activity( UNKNOWN_NUMBER_OF_ITEMS, true ); 658 659 tbb::parallel_for( 0, nthreads, body_graph_with_async( spin_barrier, my_async_activity ) ); 660 return 0; 661 } 662 }; 663 664 int run_test_enqueueing_on_inner_level() { 665 enqueueing_on_inner_level<int, int>::run(); 666 return 0; 667 } 668 669 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 670 #include <array> 671 672 template<typename NodeType> 673 class AsyncActivity { 674 public: 675 using gateway_t = typename NodeType::gateway_type; 676 677 struct work_type { 678 int input; 679 gateway_t* gateway; 680 }; 681 682 AsyncActivity(size_t limit) : stop_limit(limit), c(0), thr([this]() { 683 while(!end_of_work()) { 684 work_type w; 685 while( my_q.try_pop(w) ) { 686 int res = do_work(w.input); 687 w.gateway->try_put(res); 688 w.gateway->release_wait(); 689 ++c; 690 } 691 } 692 }) {} 693 694 void submit(int i, gateway_t* gateway) { 695 work_type w = {i, gateway}; 696 gateway->reserve_wait(); 697 my_q.push(w); 698 } 699 700 void wait_for_all() { thr.join(); } 701 702 private: 703 bool end_of_work() { return c >= stop_limit; } 704 705 int do_work(int& i) { return i + i; } 706 707 async_activity_queue<work_type> my_q; 708 size_t stop_limit; 709 size_t c; 710 std::thread thr; 711 }; 712 713 void test_follows() { 714 using namespace tbb::flow; 715 716 using input_t = int; 717 using output_t = int; 718 using node_t = async_node<input_t, output_t>; 719 720 graph g; 721 722 AsyncActivity<node_t> async_activity(3); 723 724 std::array<broadcast_node<input_t>, 3> preds = { 725 { 726 broadcast_node<input_t>(g), 727 broadcast_node<input_t>(g), 728 broadcast_node<input_t>(g) 729 } 730 }; 731 732 node_t node(follows(preds[0], preds[1], preds[2]), unlimited, [&](int input, node_t::gateway_type& gtw) { 733 async_activity.submit(input, >w); 734 }, no_priority); 735 736 buffer_node<output_t> buf(g); 737 make_edge(node, buf); 738 739 for(auto& pred: preds) { 740 pred.try_put(1); 741 } 742 743 g.wait_for_all(); 744 async_activity.wait_for_all(); 745 746 output_t storage; 747 CHECK_MESSAGE((buf.try_get(storage) && buf.try_get(storage) && buf.try_get(storage) && !buf.try_get(storage)), 748 "Not exact edge quantity was made"); 749 } 750 751 void test_precedes() { 752 using namespace tbb::flow; 753 754 using input_t = int; 755 using output_t = int; 756 using node_t = async_node<input_t, output_t>; 757 758 graph g; 759 760 AsyncActivity<node_t> async_activity(1); 761 762 std::array<buffer_node<input_t>, 1> successors = { {buffer_node<input_t>(g)} }; 763 764 broadcast_node<input_t> start(g); 765 766 node_t node(precedes(successors[0]), unlimited, [&](int input, node_t::gateway_type& gtw) { 767 async_activity.submit(input, >w); 768 }, no_priority); 769 770 make_edge(start, node); 771 772 start.try_put(1); 773 774 g.wait_for_all(); 775 async_activity.wait_for_all(); 776 777 for(auto& successor : successors) { 778 output_t storage; 779 CHECK_MESSAGE((successor.try_get(storage) && !successor.try_get(storage)), 780 "Not exact edge quantity was made"); 781 } 782 } 783 784 void test_follows_and_precedes_api() { 785 test_follows(); 786 test_precedes(); 787 } 788 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 789 790 //! Test async bodies processing 791 //! \brief \ref requirement \ref error_guessing 792 TEST_CASE("Basic tests"){ 793 tbb::task_arena arena(utils::MaxThread); 794 arena.execute( 795 [&]() { 796 run_tests<int, int>(); 797 run_tests<minimal_type, minimal_type>(); 798 run_tests<int, minimal_type>(); 799 } 800 ); 801 } 802 803 //! NativeParallelFor test with various concurrency settings 804 //! \brief \ref requirement \ref error_guessing 805 TEST_CASE("Lightweight tests"){ 806 lightweight_testing::test<tbb::flow::async_node>(NUMBER_OF_MSGS); 807 } 808 809 //! Test reset and cancellation 810 //! \brief \ref error_guessing 811 TEST_CASE("Reset test"){ 812 test_reset(); 813 } 814 815 //! Test 816 //! \brief \ref requirement \ref error_guessing 817 TEST_CASE("Copy constructor test"){ 818 test_copy_ctor(); 819 } 820 821 //! Test if main thread spins 822 //! \brief \ref stress 823 TEST_CASE("Spin avoidance test"){ 824 test_for_spin_avoidance(); 825 } 826 827 //! Test nested enqueuing 828 //! \brief \ref error_guessing 829 TEST_CASE("Inner enqueuing test"){ 830 run_test_enqueueing_on_inner_level(); 831 } 832 833 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 834 //! Test deprecated follows and precedes API 835 //! \brief \ref error_guessing 836 TEST_CASE("Test follows and precedes API"){ 837 test_follows_and_precedes_api(); 838 } 839 #endif 840 841 #if __TBB_CPP20_CONCEPTS_PRESENT 842 //! \brief \ref error_guessing 843 TEST_CASE("constraints for async_node input") { 844 struct InputObject { 845 InputObject() = default; 846 InputObject( const InputObject& ) = default; 847 }; 848 849 static_assert(utils::well_formed_instantiation<tbb::flow::async_node, InputObject, int>); 850 static_assert(utils::well_formed_instantiation<tbb::flow::async_node, int, int>); 851 static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonCopyable, int>); 852 static_assert(!utils::well_formed_instantiation<tbb::flow::async_node, test_concepts::NonDefaultInitializable, int>); 853 } 854 855 template <typename Input, typename Output, typename Body> 856 concept can_call_async_node_ctor = requires( tbb::flow::graph& graph, std::size_t concurrency, 857 Body body, tbb::flow::node_priority_t priority, tbb::flow::buffer_node<int>& f ) { 858 tbb::flow::async_node<Input, Output>(graph, concurrency, body); 859 tbb::flow::async_node<Input, Output>(graph, concurrency, body, priority); 860 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 861 tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body); 862 tbb::flow::async_node<Input, Output>(tbb::flow::follows(f), concurrency, body, priority); 863 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 864 }; 865 866 //! \brief \ref error_guessing 867 TEST_CASE("constraints for async_node body") { 868 using input_type = int; 869 using output_type = input_type; 870 using namespace test_concepts::async_node_body; 871 872 static_assert(can_call_async_node_ctor<input_type, output_type, Correct<input_type, output_type>>); 873 static_assert(!can_call_async_node_ctor<input_type, output_type, NonCopyable<input_type, output_type>>); 874 static_assert(!can_call_async_node_ctor<input_type, output_type, NonDestructible<input_type, output_type>>); 875 static_assert(!can_call_async_node_ctor<input_type, output_type, NoOperatorRoundBrackets<input_type, output_type>>); 876 static_assert(!can_call_async_node_ctor<input_type, output_type, WrongFirstInputOperatorRoundBrackets<input_type, output_type>>); 877 static_assert(!can_call_async_node_ctor<input_type, output_type, WrongSecondInputOperatorRoundBrackets<input_type, output_type>>); 878 } 879 880 #endif // __TBB_CPP20_CONCEPTS_PRESENT 881