1 /* 2 Copyright (c) 2018-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #if __INTEL_COMPILER && _MSC_VER 18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated 19 #endif 20 21 #include "common/config.h" 22 23 #include "tbb/flow_graph.h" 24 #include "tbb/parallel_for.h" 25 #include "tbb/global_control.h" 26 #include "tbb/task_arena.h" 27 28 #include "common/test.h" 29 #include "common/utils.h" 30 #include "common/utils_concurrency_limit.h" 31 #include "common/spin_barrier.h" 32 33 #include <vector> 34 #include <cstdlib> 35 #include <random> 36 #include <algorithm> 37 #include <memory> 38 39 40 //! \file test_flow_graph_priorities.cpp 41 //! \brief Test for [flow_graph.copy_body flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.async_node] specification 42 43 44 using namespace tbb::flow; 45 46 struct TaskInfo { 47 TaskInfo() : my_priority(-1), my_task_index(-1) {} 48 TaskInfo( int priority, int task_index ) 49 : my_priority(priority), my_task_index(task_index) {} 50 int my_priority; 51 int my_task_index; 52 }; 53 54 std::vector<TaskInfo> g_task_info; 55 56 std::atomic<unsigned> g_task_num; 57 58 void spin_for( double delta ) { 59 tbb::tick_count start = tbb::tick_count::now(); 60 while( (tbb::tick_count::now() - start).seconds() < delta ) ; 61 } 62 63 namespace PriorityNodesTakePrecedence { 64 65 std::atomic<bool> g_work_submitted; 66 67 const unsigned node_num = 100; 68 const unsigned start_index = node_num / 3; 69 const unsigned end_index = node_num * 2 / 3; 70 std::atomic<unsigned> g_priority_task_index; 71 72 void body_func( int priority, utils::SpinBarrier& my_barrier ) { 73 while( !g_work_submitted.load(std::memory_order_acquire) ) 74 tbb::detail::d0::yield(); 75 int current_task_index = g_task_num++; 76 if( priority != no_priority ) 77 g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index ); 78 const bool all_threads_will_come = 79 unsigned(current_task_index) < node_num - (node_num % tbb::this_task_arena::max_concurrency()); 80 if( all_threads_will_come ) 81 my_barrier.wait(); 82 } 83 84 typedef multifunction_node< int, std::tuple<int> > multi_node; 85 86 template <typename T> 87 struct Body { 88 Body( int priority, utils::SpinBarrier& barrier ) 89 : my_priority( priority ), my_barrier( barrier ) {} 90 T operator()( const T& msg ) const { 91 body_func( my_priority, my_barrier ); 92 return msg; 93 } 94 void operator()( int msg, multi_node::output_ports_type& op ) const { 95 body_func( my_priority, my_barrier ); 96 std::get<0>(op).try_put( msg ); 97 } 98 private: 99 int my_priority; 100 utils::SpinBarrier& my_barrier; 101 }; 102 103 template<typename NodeType, typename BodyType> 104 struct node_creator_t { 105 NodeType* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) { 106 if( start_index <= index && index < end_index ) 107 return new NodeType( g, unlimited, BodyType(index, barrier), node_priority_t(index) ); 108 else 109 return new NodeType( g, unlimited, BodyType(no_priority, barrier) ); 110 } 111 }; 112 113 template<typename BodyType> 114 struct node_creator_t< continue_node<continue_msg>, BodyType > { 115 continue_node<continue_msg>* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) { 116 if( start_index <= index && index < end_index ) 117 return new continue_node<continue_msg>( g, BodyType(index, barrier), node_priority_t(index) ); 118 else 119 return new continue_node<continue_msg>( g, BodyType(no_priority, barrier) ); 120 } 121 }; 122 123 124 struct passthru_body { 125 template<typename T> 126 continue_msg operator()( T ) const { return continue_msg(); } 127 }; 128 129 template<typename NodeType, typename NodeTypeCreator> 130 void test_node( NodeTypeCreator node_creator ) { 131 const int num_threads = tbb::this_task_arena::max_concurrency(); 132 utils::SpinBarrier barrier( num_threads ); 133 graph g; 134 broadcast_node<typename NodeType::input_type> bn(g); 135 function_node<typename NodeType::input_type> tn(g, unlimited, passthru_body()); 136 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment 137 // operator for the nodes 138 std::vector< std::unique_ptr<NodeType> > nodes; 139 for( unsigned i = 0; i < node_num; ++i ) { 140 nodes.push_back(std::unique_ptr<NodeType>( node_creator(g, i, barrier) )); 141 make_edge( bn, *nodes.back() ); 142 make_edge( *nodes.back(), tn ); 143 } 144 145 const size_t repeats = 10; 146 const size_t priority_nodes_num = end_index - start_index; 147 size_t global_order_failures = 0; 148 for( size_t repeat = 0; repeat < repeats; ++repeat ) { 149 g_work_submitted.store( false, std::memory_order_release ); 150 g_task_num = g_priority_task_index = 0; 151 g_task_info.clear(); g_task_info.resize( priority_nodes_num ); 152 153 bn.try_put( typename NodeType::input_type{} ); 154 // Setting of the flag is based on the knowledge that the calling thread broadcasts the 155 // message to successor nodes. Thus, once the calling thread returns from try_put() call all 156 // necessary tasks are spawned. Thus, this makes this test to be a whitebox test to some 157 // extent. 158 g_work_submitted.store( true, std::memory_order_release ); 159 160 g.wait_for_all(); 161 162 CHECK_MESSAGE( (g_priority_task_index == g_task_info.size()), "Incorrect number of tasks with priority." ); 163 CHECK_MESSAGE( (priority_nodes_num == g_task_info.size()), "Incorrect number of tasks with priority executed." ); 164 165 for( unsigned i = 0; i < g_priority_task_index; i += num_threads ) { 166 bool found = false; 167 unsigned highest_priority_within_group = end_index - i - 1; 168 for( unsigned j = i; j < i+num_threads; ++j ) { 169 if( g_task_info[j].my_priority == int(highest_priority_within_group) ) { 170 found = true; 171 break; 172 } 173 } 174 CHECK_MESSAGE( found, "Highest priority task within a group was not found" ); 175 } 176 for( unsigned i = 0; i < g_priority_task_index; ++i ) { 177 // This check might fail because priorities do not guarantee ordering, i.e. assumption 178 // that all priority nodes should increment the task counter before any subsequent 179 // no-priority node is not correct. In the worst case, a thread that took a priority 180 // node might be preempted and become the last to increment the counter. That's why the 181 // test passing is based on statistics, which could be affected by machine overload 182 // unfortunately. 183 // TODO revamp: reconsider the following check for this test 184 if( g_task_info[i].my_task_index > int(priority_nodes_num + num_threads) ) 185 ++global_order_failures; 186 } 187 } 188 float failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num); 189 CHECK_MESSAGE( 190 failure_ratio <= 0.1f, 191 "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes." 192 ); 193 } 194 195 template<typename NodeType, typename NodeBody> 196 void call_within_arena( tbb::task_arena& arena ) { 197 arena.execute( 198 [&]() { 199 test_node<NodeType>( node_creator_t<NodeType, NodeBody>() ); 200 } 201 ); 202 } 203 204 void test( int num_threads ) { 205 INFO( "Testing execution of nodes with priority takes precedence (num_threads=" << num_threads << ") - " ); 206 tbb::task_arena arena(num_threads); 207 call_within_arena< function_node<int,int>, Body<int> >( arena ); 208 call_within_arena< multi_node, Body<int> >( arena ); 209 call_within_arena< continue_node<continue_msg>, Body<continue_msg> >( arena ); 210 } 211 212 } /* namespace PriorityNodesTakePrecedence */ 213 214 namespace ThreadsEagerReaction { 215 216 // TODO revamp: combine with similar queue from test_async_node 217 template <typename T> 218 class concurrent_queue { 219 public: 220 bool try_pop(T& item) { 221 std::lock_guard<queue_mutex> lock(mutex); 222 if ( q.empty() ) 223 return false; 224 item = q.front(); 225 q.pop(); 226 return true; 227 } 228 229 void push(const T& item) { 230 std::lock_guard<queue_mutex> lock(mutex); 231 q.push(item); 232 } 233 private: 234 std::queue<T> q; 235 using queue_mutex = std::mutex; 236 std::mutex mutex; 237 }; 238 239 using utils::SpinBarrier; 240 241 enum task_type_t { no_task, regular_task, async_task }; 242 243 struct profile_t { 244 task_type_t task_type; 245 unsigned global_task_id; 246 double elapsed; 247 }; 248 249 std::vector<unsigned> g_async_task_ids; 250 251 typedef unsigned data_type; 252 typedef async_node<data_type, data_type> async_node_type; 253 typedef multifunction_node< 254 data_type, std::tuple<data_type, data_type> > decider_node_type; 255 struct AsyncActivity { 256 typedef async_node_type::gateway_type gateway_type; 257 258 struct work_type { data_type input; gateway_type* gateway; }; 259 std::atomic<bool> done; 260 concurrent_queue<work_type> my_queue; 261 std::thread my_service_thread; 262 263 struct ServiceThreadFunc { 264 SpinBarrier& my_barrier; 265 ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {} 266 void operator()(AsyncActivity* activity) { 267 while (!activity->done) { 268 work_type work; 269 while (activity->my_queue.try_pop(work)) { 270 g_async_task_ids.push_back( ++g_task_num ); 271 work.gateway->try_put(work.input); 272 work.gateway->release_wait(); 273 my_barrier.wait(); 274 } 275 } 276 } 277 }; 278 void stop_and_wait() { done = true; my_service_thread.join(); } 279 280 void submit(data_type input, gateway_type* gateway) { 281 work_type work = { input, gateway }; 282 gateway->reserve_wait(); 283 my_queue.push(work); 284 } 285 AsyncActivity(SpinBarrier& barrier) 286 : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {} 287 }; 288 289 struct StartBody { 290 bool has_run; 291 data_type operator()(tbb::flow_control& fc) { 292 if (has_run){ 293 fc.stop(); 294 return data_type(); 295 } 296 has_run = true; 297 return 1; 298 } 299 StartBody() : has_run(false) {} 300 }; 301 302 struct ParallelForBody { 303 SpinBarrier& my_barrier; 304 const data_type& my_input; 305 ParallelForBody(SpinBarrier& barrier, const data_type& input) 306 : my_barrier(barrier), my_input(input) {} 307 void operator()(const data_type&) const { 308 my_barrier.wait(); 309 ++g_task_num; 310 } 311 }; 312 313 struct CpuWorkBody { 314 SpinBarrier& my_barrier; 315 const int my_tasks_count; 316 data_type operator()(const data_type& input) { 317 tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner()); 318 return input; 319 } 320 CpuWorkBody(SpinBarrier& barrier, int tasks_count) 321 : my_barrier(barrier), my_tasks_count(tasks_count) {} 322 }; 323 324 struct DeciderBody { 325 const data_type my_limit; 326 DeciderBody( const data_type& limit ) : my_limit( limit ) {} 327 void operator()(data_type input, decider_node_type::output_ports_type& ports) { 328 if (input < my_limit) 329 std::get<0>(ports).try_put(input + 1); 330 } 331 }; 332 333 struct AsyncSubmissionBody { 334 AsyncActivity* my_activity; 335 void operator()(data_type input, async_node_type::gateway_type& gateway) { 336 my_activity->submit(input, &gateway); 337 } 338 AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {} 339 }; 340 341 void test( unsigned num_threads ) { 342 INFO( "Testing threads react eagerly on asynchronous tasks (num_threads=" << num_threads << ") - " ); 343 if( num_threads == std::thread::hardware_concurrency() ) { 344 // one thread is required for asynchronous compute resource 345 INFO("skipping test since it is designed to work on less number of threads than " 346 "hardware concurrency allows\n"); 347 return; 348 } 349 const unsigned cpu_threads = unsigned(num_threads); 350 const unsigned cpu_tasks_per_thread = 4; 351 const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads; 352 const unsigned async_subgraph_reruns = 8; 353 const unsigned cpu_subgraph_reruns = 2; 354 355 SpinBarrier barrier(cpu_threads + /*async thread=*/1); 356 g_task_num = 0; 357 g_async_task_ids.clear(); 358 g_async_task_ids.reserve(async_subgraph_reruns); 359 360 tbb::task_arena arena(cpu_threads); 361 arena.execute( 362 [&]() { 363 AsyncActivity activity(barrier); 364 graph g; 365 366 input_node<data_type> starter_node(g, StartBody()); 367 function_node<data_type, data_type> cpu_work_node( 368 g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks)); 369 decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns)); 370 async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity)); 371 decider_node_type async_restarter_node( 372 g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1) 373 ); 374 375 make_edge(starter_node, cpu_work_node); 376 make_edge(cpu_work_node, cpu_restarter_node); 377 make_edge(output_port<0>(cpu_restarter_node), cpu_work_node); 378 379 make_edge(starter_node, async_node); 380 make_edge(async_node, async_restarter_node); 381 make_edge(output_port<0>(async_restarter_node), async_node); 382 383 starter_node.activate(); 384 g.wait_for_all(); 385 activity.stop_and_wait(); 386 387 const size_t async_task_num = size_t(async_subgraph_reruns); 388 CHECK_MESSAGE( ( g_async_task_ids.size() == async_task_num), "Incorrect number of async tasks." ); 389 unsigned max_span = unsigned(2 * cpu_threads + 1); 390 for( size_t idx = 1; idx < async_task_num; ++idx ) { 391 CHECK_MESSAGE( (g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span), 392 "Async tasks were not able to interfere with CPU tasks." ); 393 394 } 395 } 396 ); 397 INFO("done\n"); 398 } 399 } /* ThreadsEagerReaction */ 400 401 namespace LimitingExecutionToPriorityTask { 402 403 enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK }; 404 405 struct execution_tracker_t { 406 execution_tracker_t() { reset(); } 407 void reset() { 408 prioritized_work_submitter = std::thread::id(); 409 prioritized_work_started = false; 410 prioritized_work_finished = false; 411 prioritized_work_interrupted = false; 412 } 413 std::thread::id prioritized_work_submitter; 414 std::atomic<bool> prioritized_work_started; 415 bool prioritized_work_finished; 416 bool prioritized_work_interrupted; 417 } exec_tracker; 418 419 template<work_type_t work_type> 420 void do_node_work( int work_size ); 421 422 template<work_type_t> 423 void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& subrange ); 424 425 template<work_type_t work_type> 426 struct CommonBody { 427 CommonBody() : my_body_size( 0 ) { } 428 CommonBody( int body_size ) : my_body_size( body_size ) { } 429 continue_msg operator()( const continue_msg& msg ) const { 430 do_node_work<work_type>(my_body_size); 431 return msg; 432 } 433 void operator()( const tbb::blocked_range<int>& subrange ) const { 434 do_nested_work<work_type>( /*tid=*/std::this_thread::get_id(), subrange ); 435 } 436 int my_body_size; 437 }; 438 439 template<work_type_t work_type> 440 void do_node_work(int work_size) { 441 tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(), 442 tbb::simple_partitioner() ); 443 } 444 445 template<work_type_t> 446 void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) { 447 // This is non-prioritized work... 448 if( !exec_tracker.prioritized_work_started || exec_tracker.prioritized_work_submitter != tid ) 449 return; 450 // ...being executed by the thread that initially started prioritized one... 451 CHECK_MESSAGE( exec_tracker.prioritized_work_started, 452 "Prioritized work should have been started by that time." ); 453 // ...prioritized work has been started already... 454 if( exec_tracker.prioritized_work_finished ) 455 return; 456 // ...but has not been finished yet 457 exec_tracker.prioritized_work_interrupted = true; 458 } 459 460 struct IsolationFunctor { 461 int work_size; 462 IsolationFunctor(int ws) : work_size(ws) {} 463 void operator()() const { 464 tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(), 465 tbb::simple_partitioner() ); 466 } 467 }; 468 469 template<> 470 void do_node_work<PRIORITIZED_WORK>(int work_size) { 471 exec_tracker.prioritized_work_submitter = std::this_thread::get_id(); 472 exec_tracker.prioritized_work_started = true; 473 tbb::this_task_arena::isolate( IsolationFunctor(work_size) ); 474 exec_tracker.prioritized_work_finished = true; 475 } 476 477 template<> 478 void do_nested_work<PRIORITIZED_WORK>( const std::thread::id& tid, 479 const tbb::blocked_range<int>& /*subrange*/ ) { 480 if( exec_tracker.prioritized_work_started && exec_tracker.prioritized_work_submitter == tid ) { 481 CHECK_MESSAGE( !exec_tracker.prioritized_work_interrupted, 482 "Thread was not fully devoted to processing of prioritized task." ); 483 } else { 484 // prolong processing of prioritized work so that the thread that started 485 // prioritized work has higher probability to help with non-prioritized one. 486 spin_for(0.1); 487 } 488 } 489 490 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment operator 491 // for the nodes 492 typedef std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes_container_t; 493 494 void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) { 495 for( int i = 0; i < num; ++i ) 496 nodes.push_back( 497 std::unique_ptr<continue_node<continue_msg>>( 498 new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) ) 499 ) 500 ); 501 } 502 503 void test( int num_threads ) { 504 INFO( "Testing limit execution to priority tasks (num_threads=" << num_threads << ") - " ); 505 506 tbb::task_arena arena( num_threads ); 507 arena.execute( 508 [&]() { 509 const int nodes_num = 100; 510 const int priority_node_position_part = 10; 511 const int pivot = nodes_num / priority_node_position_part; 512 const int nodes_in_lane = 3 * num_threads; 513 const int small_problem_size = 100; 514 const int large_problem_size = 1000; 515 516 graph g; 517 nodes_container_t nodes; 518 create_nodes( nodes, g, pivot, large_problem_size ); 519 nodes.push_back( 520 std::unique_ptr<continue_node<continue_msg>>( 521 new continue_node<continue_msg>( 522 g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1) 523 ) 524 ) 525 ); 526 create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size ); 527 528 broadcast_node<continue_msg> bn(g); 529 for( int i = 0; i < nodes_num; ++i ) 530 if( i % nodes_in_lane == 0 ) 531 make_edge( bn, *nodes[i] ); 532 else 533 make_edge( *nodes[i-1], *nodes[i] ); 534 exec_tracker.reset(); 535 bn.try_put( continue_msg() ); 536 g.wait_for_all(); 537 } 538 ); 539 540 INFO( "done\n" ); 541 } 542 543 } /* namespace LimitingExecutionToPriorityTask */ 544 545 namespace NestedCase { 546 547 using tbb::task_arena; 548 549 struct InnerBody { 550 continue_msg operator()( const continue_msg& ) const { 551 return continue_msg(); 552 } 553 }; 554 555 struct OuterBody { 556 int my_max_threads; 557 task_arena** my_inner_arena; 558 OuterBody( int max_threads, task_arena** inner_arena ) 559 : my_max_threads(max_threads), my_inner_arena(inner_arena) {} 560 // copy constructor to please some old compilers 561 OuterBody( const OuterBody& rhs ) 562 : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {} 563 int operator()( const int& ) { 564 graph inner_graph; 565 continue_node<continue_msg> start_node(inner_graph, InnerBody()); 566 continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5)); 567 continue_node<continue_msg> mid_node2(inner_graph, InnerBody()); 568 continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15)); 569 make_edge( start_node, mid_node1 ); 570 make_edge( mid_node1, end_node ); 571 make_edge( start_node, mid_node2 ); 572 make_edge( mid_node2, end_node ); 573 (*my_inner_arena)->execute( [&inner_graph]{ inner_graph.reset(); } ); 574 start_node.try_put( continue_msg() ); 575 inner_graph.wait_for_all(); 576 return 13; 577 } 578 }; 579 580 void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads, 581 graph& outer_graph, function_node<int,int>& start_node ) { 582 if( same_arena ) { 583 start_node.try_put( 42 ); 584 outer_graph.wait_for_all(); 585 return; 586 } 587 588 auto threads_range = utils::concurrency_range(max_threads); 589 for( auto num_threads : threads_range ) { 590 inner_arena.initialize( static_cast<int>(num_threads) ); 591 start_node.try_put( 42 ); 592 outer_graph.wait_for_all(); 593 inner_arena.terminate(); 594 } 595 } 596 597 void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena, 598 graph& outer_graph, function_node<int, int>& start_node ) { 599 bool same_arena = &outer_arena == &inner_arena; 600 auto threads_range = utils::concurrency_range(max_threads); 601 for( auto num_threads : threads_range ) { 602 INFO( "Testing nested nodes with specified priority in " << (same_arena? "same" : "different") 603 << " arenas, num_threads=" << num_threads << ") - " ); 604 outer_arena.initialize( static_cast<int>(num_threads) ); 605 outer_arena.execute( [&outer_graph]{ outer_graph.reset(); } ); 606 execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, start_node ); 607 outer_arena.terminate(); 608 INFO( "done\n" ); 609 } 610 } 611 612 void test( int max_threads ) { 613 task_arena outer_arena; task_arena inner_arena; 614 task_arena* inner_arena_pointer = &outer_arena; // make it same as outer arena in the beginning 615 616 graph outer_graph; 617 const unsigned num_outer_nodes = 10; 618 const size_t concurrency = unlimited; 619 std::vector< std::unique_ptr<function_node<int,int>> > outer_nodes; 620 for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) { 621 node_priority_t priority = no_priority; 622 if( node_index == num_outer_nodes / 2 ) 623 priority = 10; 624 625 outer_nodes.push_back( 626 std::unique_ptr< function_node<int, int> >( 627 new function_node<int,int>( 628 outer_graph, concurrency, OuterBody(max_threads, &inner_arena_pointer), priority 629 ) 630 ) 631 ); 632 } 633 634 for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 ) 635 for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 ) 636 make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] ); 637 638 test_in_arena( max_threads, outer_arena, outer_arena, outer_graph, *outer_nodes[0] ); 639 640 inner_arena_pointer = &inner_arena; 641 642 test_in_arena( max_threads, outer_arena, inner_arena, outer_graph, *outer_nodes[0] ); 643 } 644 } // namespace NestedCase 645 646 647 namespace BypassPrioritizedTask { 648 649 void common_body( int priority ) { 650 int current_task_index = g_task_num++; 651 g_task_info.push_back( TaskInfo( priority, current_task_index ) ); 652 } 653 654 struct Body { 655 Body( int priority ) : my_priority( priority ) {} 656 continue_msg operator()(const continue_msg&) { 657 common_body( my_priority ); 658 return continue_msg(); 659 } 660 int my_priority; 661 }; 662 663 struct InputNodeBody { 664 continue_msg operator()( tbb::flow_control& fc ){ 665 static bool is_source_executed = false; 666 667 if( is_source_executed ) { 668 fc.stop(); 669 return continue_msg(); 670 } 671 672 common_body( 0 ); 673 is_source_executed = true; 674 675 return continue_msg(); 676 } 677 }; 678 679 template<typename StarterNodeType> 680 StarterNodeType create_starter_node(graph& g) { 681 return continue_node<continue_msg>( g, Body(0) ); 682 } 683 684 template<> 685 input_node<continue_msg> create_starter_node<input_node<continue_msg>>(graph& g) { 686 return input_node<continue_msg>( g, InputNodeBody() ); 687 } 688 689 template<typename StarterNodeType> 690 void start_graph( StarterNodeType& starter ) { 691 starter.try_put( continue_msg() ); 692 } 693 694 template<> 695 void start_graph<input_node<continue_msg>>( input_node<continue_msg>& starter ) { 696 starter.activate(); 697 } 698 699 template<typename StarterNodeType> 700 void test_use_case() { 701 g_task_info.clear(); 702 g_task_num = 0; 703 graph g; 704 StarterNodeType starter = create_starter_node<StarterNodeType>(g); 705 continue_node<continue_msg> spawn_successor( g, Body(1), node_priority_t(1) ); 706 continue_node<continue_msg> bypass_successor( g, Body(2), node_priority_t(2) ); 707 708 make_edge( starter, spawn_successor ); 709 make_edge( starter, bypass_successor ); 710 711 start_graph<StarterNodeType>( starter ); 712 g.wait_for_all(); 713 714 CHECK_MESSAGE( g_task_info.size() == 3, "" ); 715 CHECK_MESSAGE( g_task_info[0].my_task_index == 0, "" ); 716 CHECK_MESSAGE( g_task_info[1].my_task_index == 1, "" ); 717 CHECK_MESSAGE( g_task_info[2].my_task_index == 2, "" ); 718 719 CHECK_MESSAGE( g_task_info[0].my_priority == 0, "" ); 720 CHECK_MESSAGE( g_task_info[1].my_priority == 2, "Bypassed task with higher priority executed in wrong order." ); 721 CHECK_MESSAGE( g_task_info[2].my_priority == 1, "" ); 722 } 723 724 //! The test checks that the task from the node with higher priority, which task gets bypassed, is 725 //! executed first than the one spawned with lower priority. 726 void test() { 727 test_use_case<continue_node<continue_msg>>(); 728 test_use_case<input_node<continue_msg>>(); 729 } 730 731 } // namespace BypassPrioritizedTask 732 733 namespace ManySuccessors { 734 735 struct no_priority_node_body { 736 void operator()(continue_msg) { 737 CHECK_MESSAGE( 738 barrier == 0, "Non-priority successor has to be executed after all priority successors" 739 ); 740 } 741 std::atomic<int>& barrier; 742 }; 743 744 struct priority_node_body { 745 void operator()(continue_msg) { 746 --barrier; 747 while (barrier) 748 tbb::detail::d0::yield(); 749 } 750 std::atomic<int>& barrier; 751 }; 752 753 void test(int num_threads) { 754 tbb::task_arena arena( num_threads ); 755 arena.execute( 756 [&]() { 757 graph g; 758 broadcast_node<continue_msg> bn(g); 759 std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes; 760 std::atomic<int> barrier; 761 for (int i = 0; i < 2 * num_threads; ++i) 762 nodes.push_back( 763 std::unique_ptr<continue_node<continue_msg>>( 764 new continue_node<continue_msg>(g, no_priority_node_body{ barrier }) 765 ) 766 ); 767 for (int i = 0; i < num_threads; ++i) 768 nodes.push_back( 769 std::unique_ptr<continue_node<continue_msg>>( 770 new continue_node<continue_msg>(g, priority_node_body{ barrier }, /*priority*/1) 771 ) 772 ); 773 774 std::random_device rd; 775 std::mt19937 gen(rd()); 776 777 for (int trial = 0; trial < 10; ++trial) { 778 barrier = num_threads; 779 std::shuffle(nodes.begin(), nodes.end(), gen); 780 for (auto& n : nodes) 781 make_edge(bn, *n); 782 bn.try_put(continue_msg()); 783 g.wait_for_all(); 784 for (auto& n : nodes) 785 remove_edge(bn, *n); 786 } 787 } 788 ); 789 } 790 791 } // namespace ManySuccessors 792 793 #if TBB_USE_EXCEPTIONS 794 namespace Exceptions { 795 void test() { 796 using namespace tbb::flow; 797 graph g; 798 std::srand(42); 799 const unsigned num_messages = 50; 800 std::vector<unsigned> throwing_msgs; 801 std::atomic<unsigned> msg_count(0); 802 continue_node<unsigned> c(g, [&msg_count](continue_msg) { 803 return ++msg_count; 804 }, 2); 805 function_node<unsigned> f(g, unlimited, [&throwing_msgs](unsigned v) { 806 for( auto i : throwing_msgs ) { 807 if( i == v ) 808 throw std::runtime_error("Exception::test"); 809 } 810 }, 1); 811 make_edge(c, f); 812 for (int i = 0; i < 10; ++i) { 813 msg_count = 0; 814 g.reset(); 815 throwing_msgs.push_back(std::rand() % num_messages); 816 try { 817 for (unsigned j = 0; j < num_messages; ++j) { 818 c.try_put(continue_msg()); 819 } 820 g.wait_for_all(); 821 FAIL("Unreachable code. The exception is expected"); 822 } catch (std::runtime_error&) { 823 CHECK(g.is_cancelled()); 824 CHECK(g.exception_thrown()); 825 } catch (...) { 826 FAIL("Unexpected exception"); 827 } 828 } 829 } 830 } // namespace Exceptions 831 #endif 832 833 //! Test node prioritization 834 //! \brief \ref requirement 835 TEST_CASE("Priority nodes take precedence"){ 836 for( auto p : utils::concurrency_range() ) { 837 PriorityNodesTakePrecedence::test( static_cast<int>(p) ); 838 } 839 } 840 841 //! Test thread eager reaction 842 //! \brief \ref error_guessing 843 TEST_CASE("Thread eager reaction"){ 844 for( auto p : utils::concurrency_range() ) { 845 ThreadsEagerReaction::test( static_cast<int>(p) ); 846 } 847 } 848 849 //! Test prioritization under concurrency limits 850 //! \brief \ref error_guessing 851 TEST_CASE("Limiting execution to prioritized work") { 852 for( auto p : utils::concurrency_range() ) { 853 LimitingExecutionToPriorityTask::test( static_cast<int>(p) ); 854 } 855 } 856 857 //! Test nested graphs 858 //! \brief \ref error_guessing 859 TEST_CASE("Nested test case") { 860 std::size_t max_threads = utils::get_platform_max_threads(); 861 // The stepping for the threads is done inside. 862 NestedCase::test( static_cast<int>(max_threads) ); 863 } 864 865 //! Test bypassed task with higher priority 866 //! \brief \ref error_guessing 867 TEST_CASE("Bypass prioritized task"){ 868 tbb::global_control gc( tbb::global_control::max_allowed_parallelism, 1 ); 869 BypassPrioritizedTask::test(); 870 } 871 872 //! Test mixing prioritized and ordinary successors 873 //! \brief \ref error_guessing 874 TEST_CASE("Many successors") { 875 for( auto p : utils::concurrency_range() ) { 876 ManySuccessors::test( static_cast<int>(p) ); 877 } 878 } 879 880 #if TBB_USE_EXCEPTIONS 881 //! Test for exceptions 882 //! \brief \ref error_guessing 883 TEST_CASE("Exceptions") { 884 Exceptions::test(); 885 } 886 #endif 887