1 /*
2     Copyright (c) 2018-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 #include "tbb/parallel_for.h"
25 #include "tbb/global_control.h"
26 #include "tbb/task_arena.h"
27 
28 #include "common/test.h"
29 #include "common/utils.h"
30 #include "common/utils_concurrency_limit.h"
31 #include "common/spin_barrier.h"
32 
33 #include <vector>
34 #include <cstdlib>
35 #include <random>
36 #include <algorithm>
37 #include <memory>
38 
39 
40 //! \file test_flow_graph_priorities.cpp
41 //! \brief Test for [flow_graph.copy_body flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.async_node] specification
42 
43 
44 using namespace tbb::flow;
45 
46 struct TaskInfo {
TaskInfoTaskInfo47     TaskInfo() : my_priority(-1), my_task_index(-1) {}
TaskInfoTaskInfo48     TaskInfo( int priority, int task_index )
49         : my_priority(priority), my_task_index(task_index) {}
50     int my_priority;
51     int my_task_index;
52 };
53 
54 std::vector<TaskInfo> g_task_info;
55 
56 std::atomic<unsigned> g_task_num;
57 
spin_for(double delta)58 void spin_for( double delta ) {
59     tbb::tick_count start = tbb::tick_count::now();
60     while( (tbb::tick_count::now() - start).seconds() < delta ) ;
61 }
62 
63 namespace PriorityNodesTakePrecedence {
64 
65 std::atomic<bool> g_work_submitted;
66 
67 const unsigned node_num = 100;
68 const unsigned start_index = node_num / 3;
69 const unsigned end_index = node_num * 2 / 3;
70 std::atomic<unsigned> g_priority_task_index;
71 
body_func(int priority,utils::SpinBarrier & my_barrier)72 void body_func( int priority, utils::SpinBarrier& my_barrier ) {
73     while( !g_work_submitted.load(std::memory_order_acquire) )
74         tbb::detail::d0::yield();
75     int current_task_index = g_task_num++;
76     if( priority != no_priority )
77         g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
78     const bool all_threads_will_come =
79         unsigned(current_task_index) < node_num - (node_num % tbb::this_task_arena::max_concurrency());
80     if( all_threads_will_come )
81         my_barrier.wait();
82 }
83 
84 typedef multifunction_node< int, std::tuple<int> > multi_node;
85 
86 template <typename T>
87 struct Body {
BodyPriorityNodesTakePrecedence::Body88     Body( int priority, utils::SpinBarrier& barrier )
89         : my_priority( priority ), my_barrier( barrier ) {}
operator ()PriorityNodesTakePrecedence::Body90     T operator()( const T& msg ) const {
91         body_func( my_priority, my_barrier );
92         return msg;
93     }
operator ()PriorityNodesTakePrecedence::Body94     void operator()( int msg, multi_node::output_ports_type& op ) const {
95         body_func( my_priority, my_barrier );
96         std::get<0>(op).try_put( msg );
97     }
98 private:
99     int my_priority;
100     utils::SpinBarrier& my_barrier;
101 };
102 
103 template<typename NodeType, typename BodyType>
104 struct node_creator_t {
operator ()PriorityNodesTakePrecedence::node_creator_t105     NodeType* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
106         if( start_index <= index && index < end_index )
107             return new NodeType( g, unlimited, BodyType(index, barrier), node_priority_t(index) );
108         else
109             return new NodeType( g, unlimited, BodyType(no_priority, barrier) );
110     }
111 };
112 
113 template<typename BodyType>
114 struct node_creator_t< continue_node<continue_msg>, BodyType > {
operator ()PriorityNodesTakePrecedence::node_creator_t115     continue_node<continue_msg>* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
116         if( start_index <= index && index < end_index )
117             return new continue_node<continue_msg>( g, BodyType(index, barrier), node_priority_t(index) );
118         else
119             return new continue_node<continue_msg>( g, BodyType(no_priority, barrier) );
120     }
121 };
122 
123 
124 struct passthru_body {
125     template<typename T>
operator ()PriorityNodesTakePrecedence::passthru_body126     continue_msg operator()( T ) const { return continue_msg(); }
127 };
128 
129 template<typename NodeType, typename NodeTypeCreator>
test_node(NodeTypeCreator node_creator)130 void test_node( NodeTypeCreator node_creator ) {
131     const int num_threads = tbb::this_task_arena::max_concurrency();
132     utils::SpinBarrier barrier( num_threads );
133     graph g;
134     broadcast_node<typename NodeType::input_type> bn(g);
135     function_node<typename NodeType::input_type> tn(g, unlimited, passthru_body());
136     // Using pointers to nodes to avoid errors on compilers, which try to generate assignment
137     // operator for the nodes
138     std::vector< std::unique_ptr<NodeType> > nodes;
139     for( unsigned i = 0; i < node_num; ++i ) {
140         nodes.push_back(std::unique_ptr<NodeType>( node_creator(g, i, barrier) ));
141         make_edge( bn, *nodes.back() );
142         make_edge( *nodes.back(), tn );
143     }
144 
145     const size_t repeats = 10;
146     const size_t priority_nodes_num = end_index - start_index;
147     size_t global_order_failures = 0;
148     for( size_t repeat = 0; repeat < repeats; ++repeat ) {
149         g_work_submitted.store( false, std::memory_order_release );
150         g_task_num = g_priority_task_index = 0;
151         g_task_info.clear(); g_task_info.resize( priority_nodes_num );
152 
153         bn.try_put( typename NodeType::input_type{} );
154         // Setting of the flag is based on the knowledge that the calling thread broadcasts the
155         // message to successor nodes. Thus, once the calling thread returns from try_put() call all
156         // necessary tasks are spawned. Thus, this makes this test to be a whitebox test to some
157         // extent.
158         g_work_submitted.store( true, std::memory_order_release );
159 
160         g.wait_for_all();
161 
162         CHECK_MESSAGE( (g_priority_task_index == g_task_info.size()), "Incorrect number of tasks with priority." );
163         CHECK_MESSAGE( (priority_nodes_num == g_task_info.size()), "Incorrect number of tasks with priority executed." );
164 
165         for( unsigned i = 0; i < g_priority_task_index; i += num_threads ) {
166             bool found = false;
167             unsigned highest_priority_within_group = end_index - i - 1;
168             for( unsigned j = i; j < i+num_threads; ++j ) {
169                 if( g_task_info[j].my_priority == int(highest_priority_within_group) ) {
170                     found = true;
171                     break;
172                 }
173             }
174             CHECK_MESSAGE( found, "Highest priority task within a group was not found" );
175         }
176         for( unsigned i = 0; i < g_priority_task_index; ++i ) {
177             // This check might fail because priorities do not guarantee ordering, i.e. assumption
178             // that all priority nodes should increment the task counter before any subsequent
179             // no-priority node is not correct. In the worst case, a thread that took a priority
180             // node might be preempted and become the last to increment the counter. That's why the
181             // test passing is based on statistics, which could be affected by machine overload
182             // unfortunately.
183             // TODO revamp: reconsider the following check for this test
184             if( g_task_info[i].my_task_index > int(priority_nodes_num + num_threads) )
185                 ++global_order_failures;
186         }
187     }
188     float failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
189     CHECK_MESSAGE(
190         failure_ratio <= 0.1f,
191         "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes."
192     );
193 }
194 
195 template<typename NodeType, typename NodeBody>
call_within_arena(tbb::task_arena & arena)196 void call_within_arena( tbb::task_arena& arena ) {
197     arena.execute(
198         [&]() {
199             test_node<NodeType>( node_creator_t<NodeType, NodeBody>() );
200         }
201     );
202 }
203 
test(int num_threads)204 void test( int num_threads ) {
205     INFO( "Testing execution of nodes with priority takes precedence (num_threads=" << num_threads << ") - " );
206     tbb::task_arena arena(num_threads);
207     call_within_arena< function_node<int,int>, Body<int> >( arena );
208     call_within_arena< multi_node, Body<int> >( arena );
209     call_within_arena< continue_node<continue_msg>, Body<continue_msg> >( arena );
210 }
211 
212 } /* namespace PriorityNodesTakePrecedence */
213 
214 namespace ThreadsEagerReaction {
215 
216 // TODO revamp: combine with similar queue from test_async_node
217 template <typename T>
218 class concurrent_queue {
219 public:
try_pop(T & item)220     bool try_pop(T& item) {
221         std::lock_guard<queue_mutex> lock(mutex);
222         if ( q.empty() )
223             return false;
224         item = q.front();
225         q.pop();
226         return true;
227     }
228 
push(const T & item)229     void push(const T& item) {
230         std::lock_guard<queue_mutex> lock(mutex);
231         q.push(item);
232     }
233 private:
234     std::queue<T> q;
235     using queue_mutex = std::mutex;
236     std::mutex mutex;
237 };
238 
239 using utils::SpinBarrier;
240 
241 enum task_type_t { no_task, regular_task, async_task };
242 
243 struct profile_t {
244     task_type_t task_type;
245     unsigned global_task_id;
246     double elapsed;
247 };
248 
249 std::vector<unsigned> g_async_task_ids;
250 
251 typedef unsigned data_type;
252 typedef async_node<data_type, data_type> async_node_type;
253 typedef multifunction_node<
254     data_type, std::tuple<data_type, data_type> > decider_node_type;
255 struct AsyncActivity {
256     typedef async_node_type::gateway_type gateway_type;
257 
258     struct work_type { data_type input; gateway_type* gateway; };
259     std::atomic<bool> done;
260     concurrent_queue<work_type> my_queue;
261     std::thread my_service_thread;
262 
263     struct ServiceThreadFunc {
264         SpinBarrier& my_barrier;
ServiceThreadFuncThreadsEagerReaction::AsyncActivity::ServiceThreadFunc265         ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
operator ()ThreadsEagerReaction::AsyncActivity::ServiceThreadFunc266         void operator()(AsyncActivity* activity) {
267             while (!activity->done) {
268                 work_type work;
269                 while (activity->my_queue.try_pop(work)) {
270                     g_async_task_ids.push_back( ++g_task_num );
271                     work.gateway->try_put(work.input);
272                     work.gateway->release_wait();
273                     my_barrier.wait();
274                 }
275             }
276         }
277     };
stop_and_waitThreadsEagerReaction::AsyncActivity278     void stop_and_wait() { done = true; my_service_thread.join(); }
279 
submitThreadsEagerReaction::AsyncActivity280     void submit(data_type input, gateway_type* gateway) {
281         work_type work = { input, gateway };
282         gateway->reserve_wait();
283         my_queue.push(work);
284     }
AsyncActivityThreadsEagerReaction::AsyncActivity285     AsyncActivity(SpinBarrier& barrier)
286         : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
287 };
288 
289 struct StartBody {
290     bool has_run;
operator ()ThreadsEagerReaction::StartBody291     data_type operator()(tbb::flow_control& fc) {
292         if (has_run){
293             fc.stop();
294             return data_type();
295         }
296         has_run = true;
297         return 1;
298     }
StartBodyThreadsEagerReaction::StartBody299     StartBody() : has_run(false) {}
300 };
301 
302 struct ParallelForBody {
303     SpinBarrier& my_barrier;
304     const data_type& my_input;
ParallelForBodyThreadsEagerReaction::ParallelForBody305     ParallelForBody(SpinBarrier& barrier, const data_type& input)
306         : my_barrier(barrier), my_input(input) {}
operator ()ThreadsEagerReaction::ParallelForBody307     void operator()(const data_type&) const {
308         my_barrier.wait();
309         ++g_task_num;
310     }
311 };
312 
313 struct CpuWorkBody {
314     SpinBarrier& my_barrier;
315     const int my_tasks_count;
operator ()ThreadsEagerReaction::CpuWorkBody316     data_type operator()(const data_type& input) {
317         tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
318         return input;
319     }
CpuWorkBodyThreadsEagerReaction::CpuWorkBody320     CpuWorkBody(SpinBarrier& barrier, int tasks_count)
321         : my_barrier(barrier), my_tasks_count(tasks_count) {}
322 };
323 
324 struct DeciderBody {
325     const data_type my_limit;
DeciderBodyThreadsEagerReaction::DeciderBody326     DeciderBody( const data_type& limit ) : my_limit( limit ) {}
operator ()ThreadsEagerReaction::DeciderBody327     void operator()(data_type input, decider_node_type::output_ports_type& ports) {
328         if (input < my_limit)
329             std::get<0>(ports).try_put(input + 1);
330     }
331 };
332 
333 struct AsyncSubmissionBody {
334     AsyncActivity* my_activity;
335     // It is important that async_node in the test executes without spawning a TBB task, because
336     // it passes the work to asynchronous thread, which unlocks the barrier that is waited
337     // by every execution thread (asynchronous thread and any TBB worker or main thread).
338     // This is why async_node's body marked noexcept.
operator ()ThreadsEagerReaction::AsyncSubmissionBody339     void operator()(data_type input, async_node_type::gateway_type& gateway) noexcept {
340         my_activity->submit(input, &gateway);
341     }
AsyncSubmissionBodyThreadsEagerReaction::AsyncSubmissionBody342     AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
343 };
344 
test(unsigned num_threads)345 void test( unsigned num_threads ) {
346     INFO( "Testing threads react eagerly on asynchronous tasks (num_threads=" << num_threads << ") - " );
347     if( num_threads == std::thread::hardware_concurrency() ) {
348         // one thread is required for asynchronous compute resource
349         INFO("skipping test since it is designed to work on less number of threads than "
350              "hardware concurrency allows\n");
351         return;
352     }
353     const unsigned cpu_threads = unsigned(num_threads);
354     const unsigned cpu_tasks_per_thread = 4;
355     const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
356     const unsigned async_subgraph_reruns = 8;
357     const unsigned cpu_subgraph_reruns = 2;
358 
359     SpinBarrier barrier(cpu_threads + /*async thread=*/1);
360     g_task_num = 0;
361     g_async_task_ids.clear();
362     g_async_task_ids.reserve(async_subgraph_reruns);
363 
364     tbb::task_arena arena(cpu_threads);
365 	arena.execute(
366         [&]() {
367             AsyncActivity activity(barrier);
368             graph g;
369 
370             input_node<data_type> starter_node(g, StartBody());
371             function_node<data_type, data_type> cpu_work_node(
372                 g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
373             decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
374             async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
375             decider_node_type async_restarter_node(
376                 g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
377             );
378 
379             make_edge(starter_node, cpu_work_node);
380             make_edge(cpu_work_node, cpu_restarter_node);
381             make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);
382 
383             make_edge(starter_node, async_node);
384             make_edge(async_node, async_restarter_node);
385             make_edge(output_port<0>(async_restarter_node), async_node);
386 
387             starter_node.activate();
388             g.wait_for_all();
389             activity.stop_and_wait();
390 
391             const size_t async_task_num = size_t(async_subgraph_reruns);
392             CHECK_MESSAGE( ( g_async_task_ids.size() == async_task_num), "Incorrect number of async tasks." );
393             unsigned max_span = unsigned(2 * cpu_threads + 1);
394             for( size_t idx = 1; idx < async_task_num; ++idx ) {
395                 CHECK_MESSAGE( (g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span),
396                                "Async tasks were not able to interfere with CPU tasks." );
397 
398             }
399         }
400     );
401     INFO("done\n");
402 }
403 } /* ThreadsEagerReaction */
404 
405 namespace LimitingExecutionToPriorityTask {
406 
407 enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };
408 
409 struct execution_tracker_t {
execution_tracker_tLimitingExecutionToPriorityTask::execution_tracker_t410     execution_tracker_t() { reset(); }
resetLimitingExecutionToPriorityTask::execution_tracker_t411     void reset() {
412         prioritized_work_submitter = std::thread::id();
413         prioritized_work_started = false;
414         prioritized_work_finished = false;
415         prioritized_work_interrupted = false;
416     }
417     std::thread::id prioritized_work_submitter;
418     std::atomic<bool> prioritized_work_started;
419     bool prioritized_work_finished;
420     bool prioritized_work_interrupted;
421 } exec_tracker;
422 
423 template<work_type_t work_type>
424 void do_node_work( int work_size );
425 
426 template<work_type_t>
427 void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& subrange );
428 
429 template<work_type_t work_type>
430 struct CommonBody {
CommonBodyLimitingExecutionToPriorityTask::CommonBody431     CommonBody() : my_body_size( 0 ) { }
CommonBodyLimitingExecutionToPriorityTask::CommonBody432     CommonBody( int body_size ) : my_body_size( body_size ) { }
operator ()LimitingExecutionToPriorityTask::CommonBody433     continue_msg operator()( const continue_msg& msg ) const {
434         do_node_work<work_type>(my_body_size);
435         return msg;
436     }
operator ()LimitingExecutionToPriorityTask::CommonBody437     void operator()( const tbb::blocked_range<int>& subrange ) const {
438         do_nested_work<work_type>( /*tid=*/std::this_thread::get_id(), subrange );
439     }
440     int my_body_size;
441 };
442 
443 template<work_type_t work_type>
do_node_work(int work_size)444 void do_node_work(int work_size) {
445     tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
446                        tbb::simple_partitioner() );
447 }
448 
449 template<work_type_t>
do_nested_work(const std::thread::id & tid,const tbb::blocked_range<int> &)450 void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
451     // This is non-prioritized work...
452     if( !exec_tracker.prioritized_work_started || exec_tracker.prioritized_work_submitter != tid )
453         return;
454     // ...being executed by the thread that initially started prioritized one...
455     CHECK_MESSAGE( exec_tracker.prioritized_work_started,
456                    "Prioritized work should have been started by that time." );
457     // ...prioritized work has been started already...
458     if( exec_tracker.prioritized_work_finished )
459         return;
460     // ...but has not been finished yet
461     exec_tracker.prioritized_work_interrupted = true;
462 }
463 
464 struct IsolationFunctor {
465     int work_size;
IsolationFunctorLimitingExecutionToPriorityTask::IsolationFunctor466     IsolationFunctor(int ws) : work_size(ws) {}
operator ()LimitingExecutionToPriorityTask::IsolationFunctor467     void operator()() const {
468         tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
469                            tbb::simple_partitioner() );
470     }
471 };
472 
473 template<>
do_node_work(int work_size)474 void do_node_work<PRIORITIZED_WORK>(int work_size) {
475     exec_tracker.prioritized_work_submitter = std::this_thread::get_id();
476     exec_tracker.prioritized_work_started = true;
477     tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
478     exec_tracker.prioritized_work_finished = true;
479 }
480 
481 template<>
do_nested_work(const std::thread::id & tid,const tbb::blocked_range<int> &)482 void do_nested_work<PRIORITIZED_WORK>( const std::thread::id& tid,
483                                        const tbb::blocked_range<int>& /*subrange*/ ) {
484     if( exec_tracker.prioritized_work_started && exec_tracker.prioritized_work_submitter == tid ) {
485         CHECK_MESSAGE( !exec_tracker.prioritized_work_interrupted,
486                        "Thread was not fully devoted to processing of prioritized task." );
487     } else {
488         // prolong processing of prioritized work so that the thread that started
489         // prioritized work has higher probability to help with non-prioritized one.
490         spin_for(0.1);
491     }
492 }
493 
494 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment operator
495 // for the nodes
496 typedef std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes_container_t;
497 
create_nodes(nodes_container_t & nodes,graph & g,int num,int body_size)498 void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
499     for( int i = 0; i < num; ++i )
500         nodes.push_back(
501             std::unique_ptr<continue_node<continue_msg>>(
502                 new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
503             )
504         );
505 }
506 
test(int num_threads)507 void test( int num_threads ) {
508     INFO( "Testing limit execution to priority tasks (num_threads=" << num_threads << ") - " );
509 
510     tbb::task_arena arena( num_threads );
511 	arena.execute(
512         [&]() {
513             const int nodes_num = 100;
514             const int priority_node_position_part = 10;
515             const int pivot = nodes_num / priority_node_position_part;
516             const int nodes_in_lane = 3 * num_threads;
517             const int small_problem_size = 100;
518             const int large_problem_size = 1000;
519 
520             graph g;
521             nodes_container_t nodes;
522             create_nodes( nodes, g, pivot, large_problem_size );
523             nodes.push_back(
524                 std::unique_ptr<continue_node<continue_msg>>(
525                     new continue_node<continue_msg>(
526                         g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
527                     )
528                 )
529             );
530             create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );
531 
532             broadcast_node<continue_msg> bn(g);
533             for( int i = 0; i < nodes_num; ++i )
534                 if( i % nodes_in_lane == 0 )
535                     make_edge( bn, *nodes[i] );
536                 else
537                     make_edge( *nodes[i-1], *nodes[i] );
538             exec_tracker.reset();
539             bn.try_put( continue_msg() );
540             g.wait_for_all();
541         }
542 	);
543 
544     INFO( "done\n" );
545 }
546 
547 } /* namespace LimitingExecutionToPriorityTask */
548 
549 namespace NestedCase {
550 
551 using tbb::task_arena;
552 
553 struct InnerBody {
operator ()NestedCase::InnerBody554     continue_msg operator()( const continue_msg& ) const {
555         return continue_msg();
556     }
557 };
558 
559 struct OuterBody {
560     int my_max_threads;
561     task_arena** my_inner_arena;
OuterBodyNestedCase::OuterBody562     OuterBody( int max_threads, task_arena** inner_arena )
563         : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
564     // copy constructor to please some old compilers
OuterBodyNestedCase::OuterBody565     OuterBody( const OuterBody& rhs )
566         : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
operator ()NestedCase::OuterBody567     int operator()( const int& ) {
568         graph inner_graph;
569         continue_node<continue_msg> start_node(inner_graph, InnerBody());
570         continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
571         continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
572         continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
573         make_edge( start_node, mid_node1 );
574         make_edge( mid_node1, end_node );
575         make_edge( start_node, mid_node2 );
576         make_edge( mid_node2, end_node );
577         (*my_inner_arena)->execute( [&inner_graph]{ inner_graph.reset(); } );
578         start_node.try_put( continue_msg() );
579         inner_graph.wait_for_all();
580         return 13;
581     }
582 };
583 
execute_outer_graph(bool same_arena,task_arena & inner_arena,int max_threads,graph & outer_graph,function_node<int,int> & start_node)584 void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
585                           graph& outer_graph, function_node<int,int>& start_node ) {
586     if( same_arena ) {
587         start_node.try_put( 42 );
588         outer_graph.wait_for_all();
589         return;
590     }
591 
592     auto threads_range = utils::concurrency_range(max_threads);
593     for( auto num_threads : threads_range ) {
594         inner_arena.initialize( static_cast<int>(num_threads) );
595         start_node.try_put( 42 );
596         outer_graph.wait_for_all();
597         inner_arena.terminate();
598     }
599 }
600 
test_in_arena(int max_threads,task_arena & outer_arena,task_arena & inner_arena,graph & outer_graph,function_node<int,int> & start_node)601 void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena,
602                     graph& outer_graph, function_node<int, int>& start_node ) {
603     bool same_arena = &outer_arena == &inner_arena;
604     auto threads_range = utils::concurrency_range(max_threads);
605     for( auto num_threads : threads_range ) {
606         INFO( "Testing nested nodes with specified priority in " << (same_arena? "same" : "different")
607               << " arenas, num_threads=" << num_threads << ") - " );
608         outer_arena.initialize( static_cast<int>(num_threads) );
609         outer_arena.execute( [&outer_graph]{ outer_graph.reset(); } );
610         execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, start_node );
611         outer_arena.terminate();
612         INFO( "done\n" );
613     }
614 }
615 
test(int max_threads)616 void test( int max_threads ) {
617     task_arena outer_arena; task_arena inner_arena;
618     task_arena* inner_arena_pointer = &outer_arena; // make it same as outer arena in the beginning
619 
620     graph outer_graph;
621     const unsigned num_outer_nodes = 10;
622     const size_t concurrency = unlimited;
623     std::vector< std::unique_ptr<function_node<int,int>> > outer_nodes;
624     for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
625         node_priority_t priority = no_priority;
626         if( node_index == num_outer_nodes / 2 )
627             priority = 10;
628 
629         outer_nodes.push_back(
630             std::unique_ptr< function_node<int, int> >(
631                 new function_node<int,int>(
632                     outer_graph, concurrency, OuterBody(max_threads, &inner_arena_pointer), priority
633                 )
634             )
635         );
636     }
637 
638     for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
639         for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
640             make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );
641 
642     test_in_arena( max_threads, outer_arena, outer_arena, outer_graph, *outer_nodes[0] );
643 
644     inner_arena_pointer = &inner_arena;
645 
646     test_in_arena( max_threads, outer_arena, inner_arena, outer_graph, *outer_nodes[0] );
647 }
648 } // namespace NestedCase
649 
650 
651 namespace BypassPrioritizedTask {
652 
common_body(int priority)653 void common_body( int priority ) {
654     int current_task_index = g_task_num++;
655     g_task_info.push_back( TaskInfo( priority, current_task_index ) );
656 }
657 
658 struct Body {
BodyBypassPrioritizedTask::Body659     Body( int priority ) : my_priority( priority ) {}
operator ()BypassPrioritizedTask::Body660     continue_msg operator()(const continue_msg&) {
661         common_body( my_priority );
662         return continue_msg();
663     }
664     int my_priority;
665 };
666 
667 struct InputNodeBody {
operator ()BypassPrioritizedTask::InputNodeBody668     continue_msg operator()( tbb::flow_control& fc ){
669         static bool is_source_executed = false;
670 
671         if( is_source_executed ) {
672             fc.stop();
673             return continue_msg();
674         }
675 
676         common_body( 0 );
677         is_source_executed = true;
678 
679         return continue_msg();
680     }
681 };
682 
683 template<typename StarterNodeType>
create_starter_node(graph & g)684 StarterNodeType create_starter_node(graph& g) {
685     return continue_node<continue_msg>( g, Body(0) );
686 }
687 
688 template<>
create_starter_node(graph & g)689 input_node<continue_msg> create_starter_node<input_node<continue_msg>>(graph& g) {
690     return input_node<continue_msg>( g, InputNodeBody() );
691 }
692 
693 template<typename StarterNodeType>
start_graph(StarterNodeType & starter)694 void start_graph( StarterNodeType& starter ) {
695     starter.try_put( continue_msg() );
696 }
697 
698 template<>
start_graph(input_node<continue_msg> & starter)699 void start_graph<input_node<continue_msg>>( input_node<continue_msg>& starter ) {
700     starter.activate();
701 }
702 
703 template<typename StarterNodeType>
test_use_case()704 void test_use_case() {
705     g_task_info.clear();
706     g_task_num = 0;
707     graph g;
708     StarterNodeType starter = create_starter_node<StarterNodeType>(g);
709     continue_node<continue_msg> spawn_successor( g, Body(1), node_priority_t(1) );
710     continue_node<continue_msg> bypass_successor( g, Body(2), node_priority_t(2) );
711 
712     make_edge( starter, spawn_successor );
713     make_edge( starter, bypass_successor );
714 
715     start_graph<StarterNodeType>( starter );
716     g.wait_for_all();
717 
718     CHECK_MESSAGE( g_task_info.size() == 3, "" );
719     CHECK_MESSAGE( g_task_info[0].my_task_index == 0, "" );
720     CHECK_MESSAGE( g_task_info[1].my_task_index == 1, "" );
721     CHECK_MESSAGE( g_task_info[2].my_task_index == 2, "" );
722 
723     CHECK_MESSAGE( g_task_info[0].my_priority == 0, "" );
724     CHECK_MESSAGE( g_task_info[1].my_priority == 2, "Bypassed task with higher priority executed in wrong order." );
725     CHECK_MESSAGE( g_task_info[2].my_priority == 1, "" );
726 }
727 
728 //! The test checks that the task from the node with higher priority, which task gets bypassed, is
729 //! executed first than the one spawned with lower priority.
test()730 void test() {
731     test_use_case<continue_node<continue_msg>>();
732     test_use_case<input_node<continue_msg>>();
733 }
734 
735 } // namespace BypassPrioritizedTask
736 
737 namespace ManySuccessors {
738 
739 struct no_priority_node_body {
operator ()ManySuccessors::no_priority_node_body740     void operator()(continue_msg) {
741         CHECK_MESSAGE(
742             barrier == 0, "Non-priority successor has to be executed after all priority successors"
743         );
744     }
745     std::atomic<int>& barrier;
746 };
747 
748 struct priority_node_body {
operator ()ManySuccessors::priority_node_body749     void operator()(continue_msg) {
750         --barrier;
751         while (barrier)
752             tbb::detail::d0::yield();
753     }
754     std::atomic<int>& barrier;
755 };
756 
test(int num_threads)757 void test(int num_threads) {
758     tbb::task_arena arena( num_threads );
759     arena.execute(
760         [&]() {
761             graph g;
762             broadcast_node<continue_msg> bn(g);
763             std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes;
764             std::atomic<int> barrier;
765             for (int i = 0; i < 2 * num_threads; ++i)
766                 nodes.push_back(
767                     std::unique_ptr<continue_node<continue_msg>>(
768                         new continue_node<continue_msg>(g, no_priority_node_body{ barrier })
769                     )
770                 );
771             for (int i = 0; i < num_threads; ++i)
772                 nodes.push_back(
773                     std::unique_ptr<continue_node<continue_msg>>(
774                         new continue_node<continue_msg>(g, priority_node_body{ barrier }, /*priority*/1)
775                     )
776                 );
777 
778             std::random_device rd;
779             std::mt19937 gen(rd());
780 
781             for (int trial = 0; trial < 10; ++trial) {
782                 barrier = num_threads;
783                 std::shuffle(nodes.begin(), nodes.end(), gen);
784                 for (auto& n : nodes)
785                     make_edge(bn, *n);
786                 bn.try_put(continue_msg());
787                 g.wait_for_all();
788                 for (auto& n : nodes)
789                     remove_edge(bn, *n);
790             }
791         }
792     );
793 }
794 
795 } // namespace ManySuccessors
796 
797 #if TBB_USE_EXCEPTIONS
798 namespace Exceptions {
test()799     void test() {
800         using namespace tbb::flow;
801         graph g;
802         std::srand(42);
803         const unsigned num_messages = 50;
804         std::vector<unsigned> throwing_msgs;
805         std::atomic<unsigned> msg_count(0);
806         continue_node<unsigned> c(g, [&msg_count](continue_msg) {
807             return ++msg_count;
808         }, 2);
809         function_node<unsigned> f(g, unlimited, [&throwing_msgs](unsigned v) {
810             for( auto i : throwing_msgs ) {
811                 if( i == v )
812                     throw std::runtime_error("Exception::test");
813             }
814         }, 1);
815         make_edge(c, f);
816         for (int i = 0; i < 10; ++i) {
817             msg_count = 0;
818             g.reset();
819             throwing_msgs.push_back(std::rand() % num_messages);
820             try {
821                 for (unsigned j = 0; j < num_messages; ++j) {
822                     c.try_put(continue_msg());
823                 }
824                 g.wait_for_all();
825                 FAIL("Unreachable code. The exception is expected");
826             } catch (std::runtime_error&) {
827                 CHECK(g.is_cancelled());
828                 CHECK(g.exception_thrown());
829             } catch (...) {
830                 FAIL("Unexpected exception");
831             }
832         }
833     }
834 } // namespace Exceptions
835 #endif
836 
837 //! Test node prioritization
838 //! \brief \ref requirement
839 TEST_CASE("Priority nodes take precedence"){
840     for( auto p : utils::concurrency_range() ) {
841         PriorityNodesTakePrecedence::test( static_cast<int>(p) );
842     }
843 }
844 
845 //! Test thread eager reaction
846 //! \brief \ref error_guessing
847 TEST_CASE("Thread eager reaction"){
848     for( auto p : utils::concurrency_range() ) {
849         ThreadsEagerReaction::test( static_cast<int>(p) );
850     }
851 }
852 
853 //! Test prioritization under concurrency limits
854 //! \brief \ref error_guessing
855 TEST_CASE("Limiting execution to prioritized work") {
856     for( auto p : utils::concurrency_range() ) {
857         LimitingExecutionToPriorityTask::test( static_cast<int>(p) );
858     }
859 }
860 
861 //! Test nested graphs
862 //! \brief \ref error_guessing
863 TEST_CASE("Nested test case") {
864     std::size_t max_threads = utils::get_platform_max_threads();
865     // The stepping for the threads is done inside.
866     NestedCase::test( static_cast<int>(max_threads) );
867 }
868 
869 //! Test bypassed task with higher priority
870 //! \brief \ref error_guessing
871 TEST_CASE("Bypass prioritized task"){
872     tbb::global_control gc( tbb::global_control::max_allowed_parallelism, 1 );
873     BypassPrioritizedTask::test();
874 }
875 
876 //! Test mixing prioritized and ordinary successors
877 //! \brief \ref error_guessing
878 TEST_CASE("Many successors") {
879     for( auto p : utils::concurrency_range() ) {
880         ManySuccessors::test( static_cast<int>(p) );
881     }
882 }
883 
884 #if TBB_USE_EXCEPTIONS
885 //! Test for exceptions
886 //! \brief \ref error_guessing
887 TEST_CASE("Exceptions") {
888     Exceptions::test();
889 }
890 #endif
891