/*
    Copyright (c) 2018-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/

#if __INTEL_COMPILER && _MSC_VER
#pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
#endif

#include "common/config.h"

#include "tbb/flow_graph.h"
#include "tbb/parallel_for.h"
#include "tbb/global_control.h"
#include "tbb/task_arena.h"
#include "tbb/tick_count.h"

#include "common/test.h"
#include "common/utils.h"
#include "common/utils_concurrency_limit.h"
#include "common/spin_barrier.h"

#include <vector>
#include <cstdlib>
#include <random>
#include <algorithm>
#include <memory>
#include <atomic>
#include <queue>
#include <mutex>
#include <thread>


//! \file test_flow_graph_priorities.cpp
//! \brief Test for [flow_graph.copy_body flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.async_node] specification


using namespace tbb::flow;

struct TaskInfo {
    TaskInfo() : my_priority(-1), my_task_index(-1) {}
    TaskInfo( int priority, int task_index )
        : my_priority(priority), my_task_index(task_index) {}
    int my_priority;
    int my_task_index;
};

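// Execution log shared by the tests below: each executed body records its node priority and
// the global index at which it ran, so the checks can reason about execution order.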
std::vector<TaskInfo> g_task_info;

std::atomic<unsigned> g_task_num;

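// Busy-waits for the given number of seconds without yielding the thread.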
void spin_for( double delta ) {
    tbb::tick_count start = tbb::tick_count::now();
    while( (tbb::tick_count::now() - start).seconds() < delta ) ;
}

namespace PriorityNodesTakePrecedence {

std::atomic<bool> g_work_submitted;

const unsigned node_num = 100;
const unsigned start_index = node_num / 3;
const unsigned end_index = node_num * 2 / 3;
std::atomic<unsigned> g_priority_task_index;

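// Common node body: waits until the test has finished submitting work, records the execution
// order of prioritized bodies, and joins the barrier so that tasks execute in full groups of
// max_concurrency (the trailing incomplete group skips the barrier).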
void body_func( int priority, utils::SpinBarrier& my_barrier ) {
    while( !g_work_submitted.load(std::memory_order_acquire) )
        tbb::detail::d0::yield();
    int current_task_index = g_task_num++;
    if( priority != no_priority )
        g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
    const bool all_threads_will_come =
        unsigned(current_task_index) < node_num - (node_num % tbb::this_task_arena::max_concurrency());
    if( all_threads_will_come )
        my_barrier.wait();
}

typedef multifunction_node< int, std::tuple<int> > multi_node;

template <typename T>
struct Body {
    Body( int priority, utils::SpinBarrier& barrier )
        : my_priority( priority ), my_barrier( barrier ) {}
    T operator()( const T& msg ) const {
        body_func( my_priority, my_barrier );
        return msg;
    }
    void operator()( int msg, multi_node::output_ports_type& op ) const {
        body_func( my_priority, my_barrier );
        std::get<0>(op).try_put( msg );
    }
private:
    int my_priority;
    utils::SpinBarrier& my_barrier;
};

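// Creates the nodes for the test: only nodes with index in [start_index, end_index) get a
// priority (equal to their index). continue_node is specialized separately because its
// constructor takes no concurrency argument.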
template<typename NodeType, typename BodyType>
struct node_creator_t {
    NodeType* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
        if( start_index <= index && index < end_index )
            return new NodeType( g, unlimited, BodyType(index, barrier), node_priority_t(index) );
        else
            return new NodeType( g, unlimited, BodyType(no_priority, barrier) );
    }
};

template<typename BodyType>
struct node_creator_t< continue_node<continue_msg>, BodyType > {
    continue_node<continue_msg>* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
        if( start_index <= index && index < end_index )
            return new continue_node<continue_msg>( g, BodyType(index, barrier), node_priority_t(index) );
        else
            return new continue_node<continue_msg>( g, BodyType(no_priority, barrier) );
    }
};


struct passthru_body {
    template<typename T>
    continue_msg operator()( T ) const { return continue_msg(); }
};

template<typename NodeType, typename NodeTypeCreator>
void test_node( NodeTypeCreator node_creator ) {
    const int num_threads = tbb::this_task_arena::max_concurrency();
    utils::SpinBarrier barrier( num_threads );
    graph g;
    broadcast_node<typename NodeType::input_type> bn(g);
    function_node<typename NodeType::input_type> tn(g, unlimited, passthru_body());
    // Using pointers to nodes to avoid errors on compilers that try to generate an
    // assignment operator for the nodes
    std::vector< std::unique_ptr<NodeType> > nodes;
    for( unsigned i = 0; i < node_num; ++i ) {
        nodes.push_back(std::unique_ptr<NodeType>( node_creator(g, i, barrier) ));
        make_edge( bn, *nodes.back() );
        make_edge( *nodes.back(), tn );
    }

    const size_t repeats = 10;
    const size_t priority_nodes_num = end_index - start_index;
    size_t global_order_failures = 0;
    for( size_t repeat = 0; repeat < repeats; ++repeat ) {
        g_work_submitted.store( false, std::memory_order_release );
        g_task_num = g_priority_task_index = 0;
        g_task_info.clear(); g_task_info.resize( priority_nodes_num );

        bn.try_put( typename NodeType::input_type{} );
        // Setting the flag relies on the knowledge that the calling thread broadcasts the
        // message to successor nodes, so once it returns from the try_put() call all
        // necessary tasks have been spawned. This makes the test a whitebox test to some
        // extent.
        g_work_submitted.store( true, std::memory_order_release );

        g.wait_for_all();

        CHECK_MESSAGE( (g_priority_task_index == g_task_info.size()), "Incorrect number of tasks with priority." );
        CHECK_MESSAGE( (priority_nodes_num == g_task_info.size()), "Incorrect number of tasks with priority executed." );

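        // Within each group of num_threads tasks that started together, the highest
        // not-yet-executed priority must be present.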
        for( unsigned i = 0; i < g_priority_task_index; i += num_threads ) {
            bool found = false;
            unsigned highest_priority_within_group = end_index - i - 1;
            for( unsigned j = i; j < i+num_threads; ++j ) {
                if( g_task_info[j].my_priority == int(highest_priority_within_group) ) {
                    found = true;
                    break;
                }
            }
            CHECK_MESSAGE( found, "Highest priority task within a group was not found" );
        }
        for( unsigned i = 0; i < g_priority_task_index; ++i ) {
            // This check might fail because priorities do not guarantee ordering: the
            // assumption that all priority nodes increment the task counter before any
            // subsequent no-priority node does is not correct. In the worst case, a thread
            // that took a priority node might be preempted and become the last to increment
            // the counter. That is why passing the test is based on statistics, which
            // unfortunately can be affected by machine overload.
            // TODO revamp: reconsider the following check for this test
            if( g_task_info[i].my_task_index > int(priority_nodes_num + num_threads) )
                ++global_order_failures;
        }
    }
    float failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
    CHECK_MESSAGE(
        failure_ratio <= 0.1f,
        "Nodes with priorities were executed after non-prioritized nodes too frequently."
    );
}

template<typename NodeType, typename NodeBody>
void call_within_arena( tbb::task_arena& arena ) {
    arena.execute(
        [&]() {
            test_node<NodeType>( node_creator_t<NodeType, NodeBody>() );
        }
    );
}

void test( int num_threads ) {
    INFO( "Testing that execution of nodes with priority takes precedence (num_threads=" << num_threads << ") - " );
    tbb::task_arena arena(num_threads);
    call_within_arena< function_node<int,int>, Body<int> >( arena );
    call_within_arena< multi_node, Body<int> >( arena );
    call_within_arena< continue_node<continue_msg>, Body<continue_msg> >( arena );
}

} /* namespace PriorityNodesTakePrecedence */

namespace ThreadsEagerReaction {

// TODO revamp: combine with similar queue from test_async_node
template <typename T>
class concurrent_queue {
public:
    bool try_pop(T& item) {
        std::lock_guard<queue_mutex> lock(mutex);
        if ( q.empty() )
            return false;
        item = q.front();
        q.pop();
        return true;
    }

    void push(const T& item) {
        std::lock_guard<queue_mutex> lock(mutex);
        q.push(item);
    }
private:
    std::queue<T> q;
    using queue_mutex = std::mutex;
    std::mutex mutex;
};

using utils::SpinBarrier;

enum task_type_t { no_task, regular_task, async_task };

struct profile_t {
    task_type_t task_type;
    unsigned global_task_id;
    double elapsed;
};

std::vector<unsigned> g_async_task_ids;

typedef unsigned data_type;
typedef async_node<data_type, data_type> async_node_type;
typedef multifunction_node<
    data_type, std::tuple<data_type, data_type> > decider_node_type;
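// Simulates an external asynchronous compute resource: a dedicated service thread pops
// submitted work items, stamps them with the next global task id, and returns the result
// to the graph through the async_node gateway.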
struct AsyncActivity {
    typedef async_node_type::gateway_type gateway_type;

    struct work_type { data_type input; gateway_type* gateway; };
    std::atomic<bool> done;
    concurrent_queue<work_type> my_queue;
    std::thread my_service_thread;

    struct ServiceThreadFunc {
        SpinBarrier& my_barrier;
        ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
        void operator()(AsyncActivity* activity) {
            while (!activity->done) {
                work_type work;
                while (activity->my_queue.try_pop(work)) {
                    g_async_task_ids.push_back( ++g_task_num );
                    work.gateway->try_put(work.input);
                    work.gateway->release_wait();
                    my_barrier.wait();
                }
            }
        }
    };
    void stop_and_wait() { done = true; my_service_thread.join(); }

    void submit(data_type input, gateway_type* gateway) {
        work_type work = { input, gateway };
        gateway->reserve_wait();
        my_queue.push(work);
    }
    AsyncActivity(SpinBarrier& barrier)
        : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
};

struct StartBody {
    bool has_run;
    data_type operator()(tbb::flow_control& fc) {
        if (has_run){
            fc.stop();
            return data_type();
        }
        has_run = true;
        return 1;
    }
    StartBody() : has_run(false) {}
};

struct ParallelForBody {
    SpinBarrier& my_barrier;
    const data_type& my_input;
    ParallelForBody(SpinBarrier& barrier, const data_type& input)
        : my_barrier(barrier), my_input(input) {}
    void operator()(const data_type&) const {
        my_barrier.wait();
        ++g_task_num;
    }
};

struct CpuWorkBody {
    SpinBarrier& my_barrier;
    const int my_tasks_count;
    data_type operator()(const data_type& input) {
        tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
        return input;
    }
    CpuWorkBody(SpinBarrier& barrier, int tasks_count)
        : my_barrier(barrier), my_tasks_count(tasks_count) {}
};

struct DeciderBody {
    const data_type my_limit;
    DeciderBody( const data_type& limit ) : my_limit( limit ) {}
    void operator()(data_type input, decider_node_type::output_ports_type& ports) {
        if (input < my_limit)
            std::get<0>(ports).try_put(input + 1);
    }
};

struct AsyncSubmissionBody {
    AsyncActivity* my_activity;
    // It is important that the async_node in this test executes without spawning a TBB task,
    // because it passes the work to the asynchronous thread, which unlocks the barrier on
    // which every execution thread waits (the asynchronous thread and any TBB worker or the
    // main thread). This is why the async_node's body is marked noexcept.
    void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept {
        my_activity->submit(input, &gateway);
    }
    AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
};

void test( unsigned num_threads ) {
    INFO( "Testing that threads react eagerly to asynchronous tasks (num_threads=" << num_threads << ") - " );
    if( num_threads == std::thread::hardware_concurrency() ) {
        // one thread is required for the asynchronous compute resource
        INFO("skipping test since it is designed to work with fewer threads than "
             "hardware concurrency allows\n");
        return;
    }
    const unsigned cpu_threads = unsigned(num_threads);
    const unsigned cpu_tasks_per_thread = 4;
    const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
    const unsigned async_subgraph_reruns = 8;
    const unsigned cpu_subgraph_reruns = 2;

    SpinBarrier barrier(cpu_threads + /*async thread=*/1);
    g_task_num = 0;
    g_async_task_ids.clear();
    g_async_task_ids.reserve(async_subgraph_reruns);

    tbb::task_arena arena(cpu_threads);
    arena.execute(
        [&]() {
            AsyncActivity activity(barrier);
            graph g;

            input_node<data_type> starter_node(g, StartBody());
            function_node<data_type, data_type> cpu_work_node(
                g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
            decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
            async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
            decider_node_type async_restarter_node(
                g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
            );

            make_edge(starter_node, cpu_work_node);
            make_edge(cpu_work_node, cpu_restarter_node);
            make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);

            make_edge(starter_node, async_node);
            make_edge(async_node, async_restarter_node);
            make_edge(output_port<0>(async_restarter_node), async_node);

            starter_node.activate();
            g.wait_for_all();
            activity.stop_and_wait();

            const size_t async_task_num = size_t(async_subgraph_reruns);
            CHECK_MESSAGE( (g_async_task_ids.size() == async_task_num), "Incorrect number of async tasks." );
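            // Each pair of consecutive async tasks may be separated by at most
            // 2 * cpu_threads + 1 CPU-task increments of the global counter; a larger gap
            // would mean the workers did not react eagerly to the prioritized async work.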
            unsigned max_span = unsigned(2 * cpu_threads + 1);
            for( size_t idx = 1; idx < async_task_num; ++idx ) {
                CHECK_MESSAGE( (g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span),
                               "Async tasks were not able to interfere with CPU tasks." );
            }
        }
    );
    INFO("done\n");
}
} /* namespace ThreadsEagerReaction */

namespace LimitingExecutionToPriorityTask {

enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };

struct execution_tracker_t {
    execution_tracker_t() { reset(); }
    void reset() {
        prioritized_work_submitter = std::thread::id();
        prioritized_work_started = false;
        prioritized_work_finished = false;
        prioritized_work_interrupted = false;
    }
    std::thread::id prioritized_work_submitter;
    std::atomic<bool> prioritized_work_started;
    bool prioritized_work_finished;
    bool prioritized_work_interrupted;
} exec_tracker;

template<work_type_t work_type>
void do_node_work( int work_size );

template<work_type_t>
void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& subrange );

template<work_type_t work_type>
struct CommonBody {
    CommonBody() : my_body_size( 0 ) { }
    CommonBody( int body_size ) : my_body_size( body_size ) { }
    continue_msg operator()( const continue_msg& msg ) const {
        do_node_work<work_type>(my_body_size);
        return msg;
    }
    void operator()( const tbb::blocked_range<int>& subrange ) const {
        do_nested_work<work_type>( /*tid=*/std::this_thread::get_id(), subrange );
    }
    int my_body_size;
};

template<work_type_t work_type>
void do_node_work(int work_size) {
    tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
                       tbb::simple_partitioner() );
}

template<work_type_t>
void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
    // This is non-prioritized work...
    if( !exec_tracker.prioritized_work_started || exec_tracker.prioritized_work_submitter != tid )
        return;
    // ...being executed by the thread that initially started the prioritized work...
    CHECK_MESSAGE( exec_tracker.prioritized_work_started,
                   "Prioritized work should have been started by that time." );
    // ...after the prioritized work has been started...
    if( exec_tracker.prioritized_work_finished )
        return;
    // ...but before it has finished
    exec_tracker.prioritized_work_interrupted = true;
}

struct IsolationFunctor {
    int work_size;
    IsolationFunctor(int ws) : work_size(ws) {}
    void operator()() const {
        tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
                           tbb::simple_partitioner() );
    }
};

template<>
void do_node_work<PRIORITIZED_WORK>(int work_size) {
    exec_tracker.prioritized_work_submitter = std::this_thread::get_id();
    exec_tracker.prioritized_work_started = true;
    tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
    exec_tracker.prioritized_work_finished = true;
}

template<>
void do_nested_work<PRIORITIZED_WORK>( const std::thread::id& tid,
                                       const tbb::blocked_range<int>& /*subrange*/ ) {
    if( exec_tracker.prioritized_work_started && exec_tracker.prioritized_work_submitter == tid ) {
        CHECK_MESSAGE( !exec_tracker.prioritized_work_interrupted,
                       "Thread was not fully devoted to processing of prioritized task." );
    } else {
        // Prolong processing of the prioritized work so that the thread that started it
        // has a higher probability of being tempted to help with the non-prioritized work.
        spin_for(0.1);
    }
}

// Using pointers to nodes to avoid errors on compilers that try to generate an assignment
// operator for the nodes
typedef std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes_container_t;

void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
    for( int i = 0; i < num; ++i )
        nodes.push_back(
            std::unique_ptr<continue_node<continue_msg>>(
                new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
            )
        );
}

void test( int num_threads ) {
    INFO( "Testing limiting execution to priority tasks (num_threads=" << num_threads << ") - " );

    tbb::task_arena arena( num_threads );
    arena.execute(
        [&]() {
            const int nodes_num = 100;
            const int priority_node_position_part = 10;
            const int pivot = nodes_num / priority_node_position_part;
            const int nodes_in_lane = 3 * num_threads;
            const int small_problem_size = 100;
            const int large_problem_size = 1000;

            graph g;
            nodes_container_t nodes;
            create_nodes( nodes, g, pivot, large_problem_size );
            nodes.push_back(
                std::unique_ptr<continue_node<continue_msg>>(
                    new continue_node<continue_msg>(
                        g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
                    )
                )
            );
            create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );

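            // Chain the nodes into lanes of 3 * num_threads sequential nodes, with the head
            // of each lane fed by a single broadcast node, so that plenty of non-prioritized
            // work is available to every thread.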
            broadcast_node<continue_msg> bn(g);
            for( int i = 0; i < nodes_num; ++i )
                if( i % nodes_in_lane == 0 )
                    make_edge( bn, *nodes[i] );
                else
                    make_edge( *nodes[i-1], *nodes[i] );
            exec_tracker.reset();
            bn.try_put( continue_msg() );
            g.wait_for_all();
        }
    );

    INFO( "done\n" );
}

} /* namespace LimitingExecutionToPriorityTask */

namespace NestedCase {

using tbb::task_arena;

struct InnerBody {
    continue_msg operator()( const continue_msg& ) const {
        return continue_msg();
    }
};

struct OuterBody {
    int my_max_threads;
    task_arena** my_inner_arena;
    OuterBody( int max_threads, task_arena** inner_arena )
        : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
    // copy constructor to please some old compilers
    OuterBody( const OuterBody& rhs )
        : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
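    // Each invocation builds a small diamond-shaped inner graph whose nodes carry priorities,
    // resets it in the inner arena, and runs it to completion.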
    int operator()( const int& ) {
        graph inner_graph;
        continue_node<continue_msg> start_node(inner_graph, InnerBody());
        continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
        continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
        continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
        make_edge( start_node, mid_node1 );
        make_edge( mid_node1, end_node );
        make_edge( start_node, mid_node2 );
        make_edge( mid_node2, end_node );
        (*my_inner_arena)->execute( [&inner_graph]{ inner_graph.reset(); } );
        start_node.try_put( continue_msg() );
        inner_graph.wait_for_all();
        return 13;
    }
};

void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
                          graph& outer_graph, function_node<int,int>& start_node ) {
    if( same_arena ) {
        start_node.try_put( 42 );
        outer_graph.wait_for_all();
        return;
    }

    auto threads_range = utils::concurrency_range(max_threads);
    for( auto num_threads : threads_range ) {
        inner_arena.initialize( static_cast<int>(num_threads) );
        start_node.try_put( 42 );
        outer_graph.wait_for_all();
        inner_arena.terminate();
    }
}

void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena,
                    graph& outer_graph, function_node<int, int>& start_node ) {
    bool same_arena = &outer_arena == &inner_arena;
    auto threads_range = utils::concurrency_range(max_threads);
    for( auto num_threads : threads_range ) {
        INFO( "Testing nested nodes with specified priority in " << (same_arena ? "same" : "different")
              << " arenas (num_threads=" << num_threads << ") - " );
        outer_arena.initialize( static_cast<int>(num_threads) );
        outer_arena.execute( [&outer_graph]{ outer_graph.reset(); } );
        execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, start_node );
        outer_arena.terminate();
        INFO( "done\n" );
    }
}

void test( int max_threads ) {
    task_arena outer_arena; task_arena inner_arena;
    task_arena* inner_arena_pointer = &outer_arena; // make it the same as the outer arena in the beginning

    graph outer_graph;
    const unsigned num_outer_nodes = 10;
    const size_t concurrency = unlimited;
    std::vector< std::unique_ptr<function_node<int,int>> > outer_nodes;
    for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
        node_priority_t priority = no_priority;
        if( node_index == num_outer_nodes / 2 )
            priority = 10;

        outer_nodes.push_back(
            std::unique_ptr< function_node<int, int> >(
                new function_node<int,int>(
                    outer_graph, concurrency, OuterBody(max_threads, &inner_arena_pointer), priority
                )
            )
        );
    }

    for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
        for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
            make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );

    test_in_arena( max_threads, outer_arena, outer_arena, outer_graph, *outer_nodes[0] );

    inner_arena_pointer = &inner_arena;

    test_in_arena( max_threads, outer_arena, inner_arena, outer_graph, *outer_nodes[0] );
}
} // namespace NestedCase


namespace BypassPrioritizedTask {

void common_body( int priority ) {
    int current_task_index = g_task_num++;
    g_task_info.push_back( TaskInfo( priority, current_task_index ) );
}

struct Body {
    Body( int priority ) : my_priority( priority ) {}
    continue_msg operator()(const continue_msg&) {
        common_body( my_priority );
        return continue_msg();
    }
    int my_priority;
};

struct InputNodeBody {
    continue_msg operator()( tbb::flow_control& fc ){
        static bool is_source_executed = false;

        if( is_source_executed ) {
            fc.stop();
            return continue_msg();
        }

        common_body( 0 );
        is_source_executed = true;

        return continue_msg();
    }
};

template<typename StarterNodeType>
StarterNodeType create_starter_node(graph& g) {
    return continue_node<continue_msg>( g, Body(0) );
}

template<>
input_node<continue_msg> create_starter_node<input_node<continue_msg>>(graph& g) {
    return input_node<continue_msg>( g, InputNodeBody() );
}

template<typename StarterNodeType>
void start_graph( StarterNodeType& starter ) {
    starter.try_put( continue_msg() );
}

template<>
void start_graph<input_node<continue_msg>>( input_node<continue_msg>& starter ) {
    starter.activate();
}

template<typename StarterNodeType>
void test_use_case() {
    g_task_info.clear();
    g_task_num = 0;
    graph g;
    StarterNodeType starter = create_starter_node<StarterNodeType>(g);
    continue_node<continue_msg> spawn_successor( g, Body(1), node_priority_t(1) );
    continue_node<continue_msg> bypass_successor( g, Body(2), node_priority_t(2) );

    make_edge( starter, spawn_successor );
    make_edge( starter, bypass_successor );

    start_graph<StarterNodeType>( starter );
    g.wait_for_all();

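    // Expected order: the starter body (priority 0) runs first, then the bypassed
    // priority-2 task, then the spawned priority-1 task.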
    CHECK_MESSAGE( g_task_info.size() == 3, "" );
    CHECK_MESSAGE( g_task_info[0].my_task_index == 0, "" );
    CHECK_MESSAGE( g_task_info[1].my_task_index == 1, "" );
    CHECK_MESSAGE( g_task_info[2].my_task_index == 2, "" );

    CHECK_MESSAGE( g_task_info[0].my_priority == 0, "" );
    CHECK_MESSAGE( g_task_info[1].my_priority == 2, "Bypassed task with higher priority executed in wrong order." );
    CHECK_MESSAGE( g_task_info[2].my_priority == 1, "" );
}

//! The test checks that the task from the node with higher priority, whose task gets bypassed,
//! is executed before the one spawned with lower priority.
void test() {
    test_use_case<continue_node<continue_msg>>();
    test_use_case<input_node<continue_msg>>();
}

} // namespace BypassPrioritizedTask

namespace ManySuccessors {

struct no_priority_node_body {
    void operator()(continue_msg) {
        CHECK_MESSAGE(
            barrier == 0, "Non-priority successor has to be executed after all priority successors"
        );
    }
    std::atomic<int>& barrier;
};

struct priority_node_body {
    void operator()(continue_msg) {
        --barrier;
        while (barrier)
            tbb::detail::d0::yield();
    }
    std::atomic<int>& barrier;
};

void test(int num_threads) {
    tbb::task_arena arena( num_threads );
    arena.execute(
        [&]() {
            graph g;
            broadcast_node<continue_msg> bn(g);
            std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes;
            std::atomic<int> barrier;
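            // Twice as many plain successors as priority successors: all priority bodies
            // must run first and jointly drop the barrier to zero before any non-priority
            // body is allowed to start.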
            for (int i = 0; i < 2 * num_threads; ++i)
                nodes.push_back(
                    std::unique_ptr<continue_node<continue_msg>>(
                        new continue_node<continue_msg>(g, no_priority_node_body{ barrier })
                    )
                );
            for (int i = 0; i < num_threads; ++i)
                nodes.push_back(
                    std::unique_ptr<continue_node<continue_msg>>(
                        new continue_node<continue_msg>(g, priority_node_body{ barrier }, /*priority*/1)
                    )
                );

            std::random_device rd;
            std::mt19937 gen(rd());

            for (int trial = 0; trial < 10; ++trial) {
                barrier = num_threads;
                std::shuffle(nodes.begin(), nodes.end(), gen);
                for (auto& n : nodes)
                    make_edge(bn, *n);
                bn.try_put(continue_msg());
                g.wait_for_all();
                for (auto& n : nodes)
                    remove_edge(bn, *n);
            }
        }
    );
}

} // namespace ManySuccessors

#if TBB_USE_EXCEPTIONS
namespace Exceptions {
void test() {
    using namespace tbb::flow;
    graph g;
    std::srand(42);
    const unsigned num_messages = 50;
    std::vector<unsigned> throwing_msgs;
    std::atomic<unsigned> msg_count(0);
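    // Both nodes are constructed with explicit priorities so that the exception is thrown
    // from, and propagated through, the prioritized task execution path.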
    continue_node<unsigned> c(g, [&msg_count](continue_msg) {
        return ++msg_count;
    }, node_priority_t(2));
    function_node<unsigned> f(g, unlimited, [&throwing_msgs](unsigned v) {
        for( auto i : throwing_msgs ) {
            if( i == v )
                throw std::runtime_error("Exception::test");
        }
    }, node_priority_t(1));
    make_edge(c, f);
    for (int i = 0; i < 10; ++i) {
        msg_count = 0;
        g.reset();
        throwing_msgs.push_back(std::rand() % num_messages);
        try {
            for (unsigned j = 0; j < num_messages; ++j) {
                c.try_put(continue_msg());
            }
            g.wait_for_all();
            FAIL("Unreachable code. The exception is expected");
        } catch (std::runtime_error&) {
            CHECK(g.is_cancelled());
            CHECK(g.exception_thrown());
        } catch (...) {
            FAIL("Unexpected exception");
        }
    }
}
} // namespace Exceptions
#endif

//! Test node prioritization
//! \brief \ref requirement
TEST_CASE("Priority nodes take precedence") {
    for( auto p : utils::concurrency_range() ) {
        PriorityNodesTakePrecedence::test( static_cast<int>(p) );
    }
}

//! Test thread eager reaction
//! \brief \ref error_guessing
TEST_CASE("Thread eager reaction") {
    for( auto p : utils::concurrency_range() ) {
        ThreadsEagerReaction::test( static_cast<unsigned>(p) );
    }
}

//! Test prioritization under concurrency limits
//! \brief \ref error_guessing
TEST_CASE("Limiting execution to prioritized work") {
    for( auto p : utils::concurrency_range() ) {
        LimitingExecutionToPriorityTask::test( static_cast<int>(p) );
    }
}

//! Test nested graphs
//! \brief \ref error_guessing
TEST_CASE("Nested test case") {
    std::size_t max_threads = utils::get_platform_max_threads();
    // The stepping for the threads is done inside.
    NestedCase::test( static_cast<int>(max_threads) );
}

//! Test bypassed task with higher priority
//! \brief \ref error_guessing
TEST_CASE("Bypass prioritized task") {
    tbb::global_control gc( tbb::global_control::max_allowed_parallelism, 1 );
    BypassPrioritizedTask::test();
}

//! Test mixing prioritized and ordinary successors
//! \brief \ref error_guessing
TEST_CASE("Many successors") {
    for( auto p : utils::concurrency_range() ) {
        ManySuccessors::test( static_cast<int>(p) );
    }
}

#if TBB_USE_EXCEPTIONS
//! Test for exceptions
//! \brief \ref error_guessing
TEST_CASE("Exceptions") {
    Exceptions::test();
}
#endif