1 /*
2     Copyright (c) 2018-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #if __INTEL_COMPILER && _MSC_VER
18 #pragma warning(disable : 2586) // decorated name length exceeded, name was truncated
19 #endif
20 
21 #include "common/config.h"
22 
23 #include "tbb/flow_graph.h"
24 #include "tbb/parallel_for.h"
25 #include "tbb/global_control.h"
26 #include "tbb/task_arena.h"
27 
28 #include "common/test.h"
29 #include "common/utils.h"
30 #include "common/utils_concurrency_limit.h"
31 #include "common/spin_barrier.h"
32 
33 #include <vector>
34 #include <cstdlib>
35 #include <random>
36 #include <algorithm>
37 #include <memory>
38 
39 
40 //! \file test_flow_graph_priorities.cpp
41 //! \brief Test for [flow_graph.copy_body flow_graph.function_node flow_graph.multifunction_node flow_graph.continue_node flow_graph.async_node] specification
42 
43 
44 using namespace tbb::flow;
45 
46 struct TaskInfo {
47     TaskInfo() : my_priority(-1), my_task_index(-1) {}
48     TaskInfo( int priority, int task_index )
49         : my_priority(priority), my_task_index(task_index) {}
50     int my_priority;
51     int my_task_index;
52 };
53 
54 std::vector<TaskInfo> g_task_info;
55 
56 std::atomic<unsigned> g_task_num;
57 
58 void spin_for( double delta ) {
59     tbb::tick_count start = tbb::tick_count::now();
60     while( (tbb::tick_count::now() - start).seconds() < delta ) ;
61 }
62 
63 namespace PriorityNodesTakePrecedence {
64 
65 std::atomic<bool> g_work_submitted;
66 
67 const unsigned node_num = 100;
68 const unsigned start_index = node_num / 3;
69 const unsigned end_index = node_num * 2 / 3;
70 std::atomic<unsigned> g_priority_task_index;
71 
72 void body_func( int priority, utils::SpinBarrier& my_barrier ) {
73     while( !g_work_submitted.load(std::memory_order_acquire) )
74         tbb::detail::d0::yield();
75     int current_task_index = g_task_num++;
76     if( priority != no_priority )
77         g_task_info[g_priority_task_index++] = TaskInfo( priority, current_task_index );
78     const bool all_threads_will_come =
79         unsigned(current_task_index) < node_num - (node_num % tbb::this_task_arena::max_concurrency());
80     if( all_threads_will_come )
81         my_barrier.wait();
82 }
83 
84 typedef multifunction_node< int, std::tuple<int> > multi_node;
85 
86 template <typename T>
87 struct Body {
88     Body( int priority, utils::SpinBarrier& barrier )
89         : my_priority( priority ), my_barrier( barrier ) {}
90     T operator()( const T& msg ) const {
91         body_func( my_priority, my_barrier );
92         return msg;
93     }
94     void operator()( int msg, multi_node::output_ports_type& op ) const {
95         body_func( my_priority, my_barrier );
96         std::get<0>(op).try_put( msg );
97     }
98 private:
99     int my_priority;
100     utils::SpinBarrier& my_barrier;
101 };
102 
103 template<typename NodeType, typename BodyType>
104 struct node_creator_t {
105     NodeType* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
106         if( start_index <= index && index < end_index )
107             return new NodeType( g, unlimited, BodyType(index, barrier), node_priority_t(index) );
108         else
109             return new NodeType( g, unlimited, BodyType(no_priority, barrier) );
110     }
111 };
112 
113 template<typename BodyType>
114 struct node_creator_t< continue_node<continue_msg>, BodyType > {
115     continue_node<continue_msg>* operator()( graph& g, unsigned index, utils::SpinBarrier& barrier ) {
116         if( start_index <= index && index < end_index )
117             return new continue_node<continue_msg>( g, BodyType(index, barrier), node_priority_t(index) );
118         else
119             return new continue_node<continue_msg>( g, BodyType(no_priority, barrier) );
120     }
121 };
122 
123 
124 struct passthru_body {
125     template<typename T>
126     continue_msg operator()( T ) const { return continue_msg(); }
127 };
128 
129 template<typename NodeType, typename NodeTypeCreator>
130 void test_node( NodeTypeCreator node_creator ) {
131     const int num_threads = tbb::this_task_arena::max_concurrency();
132     utils::SpinBarrier barrier( num_threads );
133     graph g;
134     broadcast_node<typename NodeType::input_type> bn(g);
135     function_node<typename NodeType::input_type> tn(g, unlimited, passthru_body());
136     // Using pointers to nodes to avoid errors on compilers, which try to generate assignment
137     // operator for the nodes
138     std::vector< std::unique_ptr<NodeType> > nodes;
139     for( unsigned i = 0; i < node_num; ++i ) {
140         nodes.push_back(std::unique_ptr<NodeType>( node_creator(g, i, barrier) ));
141         make_edge( bn, *nodes.back() );
142         make_edge( *nodes.back(), tn );
143     }
144 
145     const size_t repeats = 10;
146     const size_t priority_nodes_num = end_index - start_index;
147     size_t global_order_failures = 0;
148     for( size_t repeat = 0; repeat < repeats; ++repeat ) {
149         g_work_submitted.store( false, std::memory_order_release );
150         g_task_num = g_priority_task_index = 0;
151         g_task_info.clear(); g_task_info.resize( priority_nodes_num );
152 
153         bn.try_put( typename NodeType::input_type{} );
154         // Setting of the flag is based on the knowledge that the calling thread broadcasts the
155         // message to successor nodes. Thus, once the calling thread returns from try_put() call all
156         // necessary tasks are spawned. Thus, this makes this test to be a whitebox test to some
157         // extent.
158         g_work_submitted.store( true, std::memory_order_release );
159 
160         g.wait_for_all();
161 
162         CHECK_MESSAGE( (g_priority_task_index == g_task_info.size()), "Incorrect number of tasks with priority." );
163         CHECK_MESSAGE( (priority_nodes_num == g_task_info.size()), "Incorrect number of tasks with priority executed." );
164 
165         for( unsigned i = 0; i < g_priority_task_index; i += num_threads ) {
166             bool found = false;
167             unsigned highest_priority_within_group = end_index - i - 1;
168             for( unsigned j = i; j < i+num_threads; ++j ) {
169                 if( g_task_info[j].my_priority == int(highest_priority_within_group) ) {
170                     found = true;
171                     break;
172                 }
173             }
174             CHECK_MESSAGE( found, "Highest priority task within a group was not found" );
175         }
176         for( unsigned i = 0; i < g_priority_task_index; ++i ) {
177             // This check might fail because priorities do not guarantee ordering, i.e. assumption
178             // that all priority nodes should increment the task counter before any subsequent
179             // no-priority node is not correct. In the worst case, a thread that took a priority
180             // node might be preempted and become the last to increment the counter. That's why the
181             // test passing is based on statistics, which could be affected by machine overload
182             // unfortunately.
183             // TODO revamp: reconsider the following check for this test
184             if( g_task_info[i].my_task_index > int(priority_nodes_num + num_threads) )
185                 ++global_order_failures;
186         }
187     }
188     float failure_ratio = float(global_order_failures) / float(repeats*priority_nodes_num);
189     CHECK_MESSAGE(
190         failure_ratio <= 0.1f,
191         "Nodes with priorities executed in wrong order too frequently over non-prioritized nodes."
192     );
193 }
194 
195 template<typename NodeType, typename NodeBody>
196 void call_within_arena( tbb::task_arena& arena ) {
197     arena.execute(
198         [&]() {
199             test_node<NodeType>( node_creator_t<NodeType, NodeBody>() );
200         }
201     );
202 }
203 
204 void test( int num_threads ) {
205     INFO( "Testing execution of nodes with priority takes precedence (num_threads=" << num_threads << ") - " );
206     tbb::task_arena arena(num_threads);
207     call_within_arena< function_node<int,int>, Body<int> >( arena );
208     call_within_arena< multi_node, Body<int> >( arena );
209     call_within_arena< continue_node<continue_msg>, Body<continue_msg> >( arena );
210 }
211 
212 } /* namespace PriorityNodesTakePrecedence */
213 
214 namespace ThreadsEagerReaction {
215 
216 // TODO revamp: combine with similar queue from test_async_node
217 template <typename T>
218 class concurrent_queue {
219 public:
220     bool try_pop(T& item) {
221         std::lock_guard<queue_mutex> lock(mutex);
222         if ( q.empty() )
223             return false;
224         item = q.front();
225         q.pop();
226         return true;
227     }
228 
229     void push(const T& item) {
230         std::lock_guard<queue_mutex> lock(mutex);
231         q.push(item);
232     }
233 private:
234     std::queue<T> q;
235     using queue_mutex = std::mutex;
236     std::mutex mutex;
237 };
238 
239 using utils::SpinBarrier;
240 
241 enum task_type_t { no_task, regular_task, async_task };
242 
243 struct profile_t {
244     task_type_t task_type;
245     unsigned global_task_id;
246     double elapsed;
247 };
248 
249 std::vector<unsigned> g_async_task_ids;
250 
251 typedef unsigned data_type;
252 typedef async_node<data_type, data_type> async_node_type;
253 typedef multifunction_node<
254     data_type, std::tuple<data_type, data_type> > decider_node_type;
255 struct AsyncActivity {
256     typedef async_node_type::gateway_type gateway_type;
257 
258     struct work_type { data_type input; gateway_type* gateway; };
259     std::atomic<bool> done;
260     concurrent_queue<work_type> my_queue;
261     std::thread my_service_thread;
262 
263     struct ServiceThreadFunc {
264         SpinBarrier& my_barrier;
265         ServiceThreadFunc(SpinBarrier& barrier) : my_barrier(barrier) {}
266         void operator()(AsyncActivity* activity) {
267             while (!activity->done) {
268                 work_type work;
269                 while (activity->my_queue.try_pop(work)) {
270                     g_async_task_ids.push_back( ++g_task_num );
271                     work.gateway->try_put(work.input);
272                     work.gateway->release_wait();
273                     my_barrier.wait();
274                 }
275             }
276         }
277     };
278     void stop_and_wait() { done = true; my_service_thread.join(); }
279 
280     void submit(data_type input, gateway_type* gateway) {
281         work_type work = { input, gateway };
282         gateway->reserve_wait();
283         my_queue.push(work);
284     }
285     AsyncActivity(SpinBarrier& barrier)
286         : done(false), my_service_thread(ServiceThreadFunc(barrier), this) {}
287 };
288 
289 struct StartBody {
290     bool has_run;
291     data_type operator()(tbb::flow_control& fc) {
292         if (has_run){
293             fc.stop();
294             return data_type();
295         }
296         has_run = true;
297         return 1;
298     }
299     StartBody() : has_run(false) {}
300 };
301 
302 struct ParallelForBody {
303     SpinBarrier& my_barrier;
304     const data_type& my_input;
305     ParallelForBody(SpinBarrier& barrier, const data_type& input)
306         : my_barrier(barrier), my_input(input) {}
307     void operator()(const data_type&) const {
308         my_barrier.wait();
309         ++g_task_num;
310     }
311 };
312 
313 struct CpuWorkBody {
314     SpinBarrier& my_barrier;
315     const int my_tasks_count;
316     data_type operator()(const data_type& input) {
317         tbb::parallel_for(0, my_tasks_count, ParallelForBody(my_barrier, input), tbb::simple_partitioner());
318         return input;
319     }
320     CpuWorkBody(SpinBarrier& barrier, int tasks_count)
321         : my_barrier(barrier), my_tasks_count(tasks_count) {}
322 };
323 
324 struct DeciderBody {
325     const data_type my_limit;
326     DeciderBody( const data_type& limit ) : my_limit( limit ) {}
327     void operator()(data_type input, decider_node_type::output_ports_type& ports) {
328         if (input < my_limit)
329             std::get<0>(ports).try_put(input + 1);
330     }
331 };
332 
333 struct AsyncSubmissionBody {
334     AsyncActivity* my_activity;
335     void operator()(data_type input, async_node_type::gateway_type& gateway) {
336         my_activity->submit(input, &gateway);
337     }
338     AsyncSubmissionBody(AsyncActivity* activity) : my_activity(activity) {}
339 };
340 
341 void test( unsigned num_threads ) {
342     INFO( "Testing threads react eagerly on asynchronous tasks (num_threads=" << num_threads << ") - " );
343     if( num_threads == std::thread::hardware_concurrency() ) {
344         // one thread is required for asynchronous compute resource
345         INFO("skipping test since it is designed to work on less number of threads than "
346              "hardware concurrency allows\n");
347         return;
348     }
349     const unsigned cpu_threads = unsigned(num_threads);
350     const unsigned cpu_tasks_per_thread = 4;
351     const unsigned nested_cpu_tasks = cpu_tasks_per_thread * cpu_threads;
352     const unsigned async_subgraph_reruns = 8;
353     const unsigned cpu_subgraph_reruns = 2;
354 
355     SpinBarrier barrier(cpu_threads + /*async thread=*/1);
356     g_task_num = 0;
357     g_async_task_ids.clear();
358     g_async_task_ids.reserve(async_subgraph_reruns);
359 
360     tbb::task_arena arena(cpu_threads);
361 	arena.execute(
362         [&]() {
363             AsyncActivity activity(barrier);
364             graph g;
365 
366             input_node<data_type> starter_node(g, StartBody());
367             function_node<data_type, data_type> cpu_work_node(
368                 g, unlimited, CpuWorkBody(barrier, nested_cpu_tasks));
369             decider_node_type cpu_restarter_node(g, unlimited, DeciderBody(cpu_subgraph_reruns));
370             async_node_type async_node(g, unlimited, AsyncSubmissionBody(&activity));
371             decider_node_type async_restarter_node(
372                 g, unlimited, DeciderBody(async_subgraph_reruns), node_priority_t(1)
373             );
374 
375             make_edge(starter_node, cpu_work_node);
376             make_edge(cpu_work_node, cpu_restarter_node);
377             make_edge(output_port<0>(cpu_restarter_node), cpu_work_node);
378 
379             make_edge(starter_node, async_node);
380             make_edge(async_node, async_restarter_node);
381             make_edge(output_port<0>(async_restarter_node), async_node);
382 
383             starter_node.activate();
384             g.wait_for_all();
385             activity.stop_and_wait();
386 
387             const size_t async_task_num = size_t(async_subgraph_reruns);
388             CHECK_MESSAGE( ( g_async_task_ids.size() == async_task_num), "Incorrect number of async tasks." );
389             unsigned max_span = unsigned(2 * cpu_threads + 1);
390             for( size_t idx = 1; idx < async_task_num; ++idx ) {
391                 CHECK_MESSAGE( (g_async_task_ids[idx] - g_async_task_ids[idx-1] <= max_span),
392                                "Async tasks were not able to interfere with CPU tasks." );
393 
394             }
395         }
396     );
397     INFO("done\n");
398 }
399 } /* ThreadsEagerReaction */
400 
401 namespace LimitingExecutionToPriorityTask {
402 
403 enum work_type_t { NONPRIORITIZED_WORK, PRIORITIZED_WORK };
404 
405 struct execution_tracker_t {
406     execution_tracker_t() { reset(); }
407     void reset() {
408         prioritized_work_submitter = std::thread::id();
409         prioritized_work_started = false;
410         prioritized_work_finished = false;
411         prioritized_work_interrupted = false;
412     }
413     std::thread::id prioritized_work_submitter;
414     std::atomic<bool> prioritized_work_started;
415     bool prioritized_work_finished;
416     bool prioritized_work_interrupted;
417 } exec_tracker;
418 
419 template<work_type_t work_type>
420 void do_node_work( int work_size );
421 
422 template<work_type_t>
423 void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& subrange );
424 
425 template<work_type_t work_type>
426 struct CommonBody {
427     CommonBody() : my_body_size( 0 ) { }
428     CommonBody( int body_size ) : my_body_size( body_size ) { }
429     continue_msg operator()( const continue_msg& msg ) const {
430         do_node_work<work_type>(my_body_size);
431         return msg;
432     }
433     void operator()( const tbb::blocked_range<int>& subrange ) const {
434         do_nested_work<work_type>( /*tid=*/std::this_thread::get_id(), subrange );
435     }
436     int my_body_size;
437 };
438 
439 template<work_type_t work_type>
440 void do_node_work(int work_size) {
441     tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<work_type>(),
442                        tbb::simple_partitioner() );
443 }
444 
445 template<work_type_t>
446 void do_nested_work( const std::thread::id& tid, const tbb::blocked_range<int>& /*subrange*/ ) {
447     // This is non-prioritized work...
448     if( !exec_tracker.prioritized_work_started || exec_tracker.prioritized_work_submitter != tid )
449         return;
450     // ...being executed by the thread that initially started prioritized one...
451     CHECK_MESSAGE( exec_tracker.prioritized_work_started,
452                    "Prioritized work should have been started by that time." );
453     // ...prioritized work has been started already...
454     if( exec_tracker.prioritized_work_finished )
455         return;
456     // ...but has not been finished yet
457     exec_tracker.prioritized_work_interrupted = true;
458 }
459 
460 struct IsolationFunctor {
461     int work_size;
462     IsolationFunctor(int ws) : work_size(ws) {}
463     void operator()() const {
464         tbb::parallel_for( tbb::blocked_range<int>(0, work_size), CommonBody<PRIORITIZED_WORK>(),
465                            tbb::simple_partitioner() );
466     }
467 };
468 
469 template<>
470 void do_node_work<PRIORITIZED_WORK>(int work_size) {
471     exec_tracker.prioritized_work_submitter = std::this_thread::get_id();
472     exec_tracker.prioritized_work_started = true;
473     tbb::this_task_arena::isolate( IsolationFunctor(work_size) );
474     exec_tracker.prioritized_work_finished = true;
475 }
476 
477 template<>
478 void do_nested_work<PRIORITIZED_WORK>( const std::thread::id& tid,
479                                        const tbb::blocked_range<int>& /*subrange*/ ) {
480     if( exec_tracker.prioritized_work_started && exec_tracker.prioritized_work_submitter == tid ) {
481         CHECK_MESSAGE( !exec_tracker.prioritized_work_interrupted,
482                        "Thread was not fully devoted to processing of prioritized task." );
483     } else {
484         // prolong processing of prioritized work so that the thread that started
485         // prioritized work has higher probability to help with non-prioritized one.
486         spin_for(0.1);
487     }
488 }
489 
490 // Using pointers to nodes to avoid errors on compilers, which try to generate assignment operator
491 // for the nodes
492 typedef std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes_container_t;
493 
494 void create_nodes( nodes_container_t& nodes, graph& g, int num, int body_size ) {
495     for( int i = 0; i < num; ++i )
496         nodes.push_back(
497             std::unique_ptr<continue_node<continue_msg>>(
498                 new continue_node<continue_msg>( g, CommonBody<NONPRIORITIZED_WORK>( body_size ) )
499             )
500         );
501 }
502 
503 void test( int num_threads ) {
504     INFO( "Testing limit execution to priority tasks (num_threads=" << num_threads << ") - " );
505 
506     tbb::task_arena arena( num_threads );
507 	arena.execute(
508         [&]() {
509             const int nodes_num = 100;
510             const int priority_node_position_part = 10;
511             const int pivot = nodes_num / priority_node_position_part;
512             const int nodes_in_lane = 3 * num_threads;
513             const int small_problem_size = 100;
514             const int large_problem_size = 1000;
515 
516             graph g;
517             nodes_container_t nodes;
518             create_nodes( nodes, g, pivot, large_problem_size );
519             nodes.push_back(
520                 std::unique_ptr<continue_node<continue_msg>>(
521                     new continue_node<continue_msg>(
522                         g, CommonBody<PRIORITIZED_WORK>(small_problem_size), node_priority_t(1)
523                     )
524                 )
525             );
526             create_nodes( nodes, g, nodes_num - pivot - 1, large_problem_size );
527 
528             broadcast_node<continue_msg> bn(g);
529             for( int i = 0; i < nodes_num; ++i )
530                 if( i % nodes_in_lane == 0 )
531                     make_edge( bn, *nodes[i] );
532                 else
533                     make_edge( *nodes[i-1], *nodes[i] );
534             exec_tracker.reset();
535             bn.try_put( continue_msg() );
536             g.wait_for_all();
537         }
538 	);
539 
540     INFO( "done\n" );
541 }
542 
543 } /* namespace LimitingExecutionToPriorityTask */
544 
545 namespace NestedCase {
546 
547 using tbb::task_arena;
548 
549 struct InnerBody {
550     continue_msg operator()( const continue_msg& ) const {
551         return continue_msg();
552     }
553 };
554 
555 struct OuterBody {
556     int my_max_threads;
557     task_arena** my_inner_arena;
558     OuterBody( int max_threads, task_arena** inner_arena )
559         : my_max_threads(max_threads), my_inner_arena(inner_arena) {}
560     // copy constructor to please some old compilers
561     OuterBody( const OuterBody& rhs )
562         : my_max_threads(rhs.my_max_threads), my_inner_arena(rhs.my_inner_arena) {}
563     int operator()( const int& ) {
564         graph inner_graph;
565         continue_node<continue_msg> start_node(inner_graph, InnerBody());
566         continue_node<continue_msg> mid_node1(inner_graph, InnerBody(), node_priority_t(5));
567         continue_node<continue_msg> mid_node2(inner_graph, InnerBody());
568         continue_node<continue_msg> end_node(inner_graph, InnerBody(), node_priority_t(15));
569         make_edge( start_node, mid_node1 );
570         make_edge( mid_node1, end_node );
571         make_edge( start_node, mid_node2 );
572         make_edge( mid_node2, end_node );
573         (*my_inner_arena)->execute( [&inner_graph]{ inner_graph.reset(); } );
574         start_node.try_put( continue_msg() );
575         inner_graph.wait_for_all();
576         return 13;
577     }
578 };
579 
580 void execute_outer_graph( bool same_arena, task_arena& inner_arena, int max_threads,
581                           graph& outer_graph, function_node<int,int>& start_node ) {
582     if( same_arena ) {
583         start_node.try_put( 42 );
584         outer_graph.wait_for_all();
585         return;
586     }
587 
588     auto threads_range = utils::concurrency_range(max_threads);
589     for( auto num_threads : threads_range ) {
590         inner_arena.initialize( static_cast<int>(num_threads) );
591         start_node.try_put( 42 );
592         outer_graph.wait_for_all();
593         inner_arena.terminate();
594     }
595 }
596 
597 void test_in_arena( int max_threads, task_arena& outer_arena, task_arena& inner_arena,
598                     graph& outer_graph, function_node<int, int>& start_node ) {
599     bool same_arena = &outer_arena == &inner_arena;
600     auto threads_range = utils::concurrency_range(max_threads);
601     for( auto num_threads : threads_range ) {
602         INFO( "Testing nested nodes with specified priority in " << (same_arena? "same" : "different")
603               << " arenas, num_threads=" << num_threads << ") - " );
604         outer_arena.initialize( static_cast<int>(num_threads) );
605         outer_arena.execute( [&outer_graph]{ outer_graph.reset(); } );
606         execute_outer_graph( same_arena, inner_arena, max_threads, outer_graph, start_node );
607         outer_arena.terminate();
608         INFO( "done\n" );
609     }
610 }
611 
612 void test( int max_threads ) {
613     task_arena outer_arena; task_arena inner_arena;
614     task_arena* inner_arena_pointer = &outer_arena; // make it same as outer arena in the beginning
615 
616     graph outer_graph;
617     const unsigned num_outer_nodes = 10;
618     const size_t concurrency = unlimited;
619     std::vector< std::unique_ptr<function_node<int,int>> > outer_nodes;
620     for( unsigned node_index = 0; node_index < num_outer_nodes; ++node_index ) {
621         node_priority_t priority = no_priority;
622         if( node_index == num_outer_nodes / 2 )
623             priority = 10;
624 
625         outer_nodes.push_back(
626             std::unique_ptr< function_node<int, int> >(
627                 new function_node<int,int>(
628                     outer_graph, concurrency, OuterBody(max_threads, &inner_arena_pointer), priority
629                 )
630             )
631         );
632     }
633 
634     for( unsigned node_index1 = 0; node_index1 < num_outer_nodes; ++node_index1 )
635         for( unsigned node_index2 = node_index1+1; node_index2 < num_outer_nodes; ++node_index2 )
636             make_edge( *outer_nodes[node_index1], *outer_nodes[node_index2] );
637 
638     test_in_arena( max_threads, outer_arena, outer_arena, outer_graph, *outer_nodes[0] );
639 
640     inner_arena_pointer = &inner_arena;
641 
642     test_in_arena( max_threads, outer_arena, inner_arena, outer_graph, *outer_nodes[0] );
643 }
644 } // namespace NestedCase
645 
646 
647 namespace BypassPrioritizedTask {
648 
649 void common_body( int priority ) {
650     int current_task_index = g_task_num++;
651     g_task_info.push_back( TaskInfo( priority, current_task_index ) );
652 }
653 
654 struct Body {
655     Body( int priority ) : my_priority( priority ) {}
656     continue_msg operator()(const continue_msg&) {
657         common_body( my_priority );
658         return continue_msg();
659     }
660     int my_priority;
661 };
662 
663 struct InputNodeBody {
664     continue_msg operator()( tbb::flow_control& fc ){
665         static bool is_source_executed = false;
666 
667         if( is_source_executed ) {
668             fc.stop();
669             return continue_msg();
670         }
671 
672         common_body( 0 );
673         is_source_executed = true;
674 
675         return continue_msg();
676     }
677 };
678 
679 template<typename StarterNodeType>
680 StarterNodeType create_starter_node(graph& g) {
681     return continue_node<continue_msg>( g, Body(0) );
682 }
683 
684 template<>
685 input_node<continue_msg> create_starter_node<input_node<continue_msg>>(graph& g) {
686     return input_node<continue_msg>( g, InputNodeBody() );
687 }
688 
689 template<typename StarterNodeType>
690 void start_graph( StarterNodeType& starter ) {
691     starter.try_put( continue_msg() );
692 }
693 
694 template<>
695 void start_graph<input_node<continue_msg>>( input_node<continue_msg>& starter ) {
696     starter.activate();
697 }
698 
699 template<typename StarterNodeType>
700 void test_use_case() {
701     g_task_info.clear();
702     g_task_num = 0;
703     graph g;
704     StarterNodeType starter = create_starter_node<StarterNodeType>(g);
705     continue_node<continue_msg> spawn_successor( g, Body(1), node_priority_t(1) );
706     continue_node<continue_msg> bypass_successor( g, Body(2), node_priority_t(2) );
707 
708     make_edge( starter, spawn_successor );
709     make_edge( starter, bypass_successor );
710 
711     start_graph<StarterNodeType>( starter );
712     g.wait_for_all();
713 
714     CHECK_MESSAGE( g_task_info.size() == 3, "" );
715     CHECK_MESSAGE( g_task_info[0].my_task_index == 0, "" );
716     CHECK_MESSAGE( g_task_info[1].my_task_index == 1, "" );
717     CHECK_MESSAGE( g_task_info[2].my_task_index == 2, "" );
718 
719     CHECK_MESSAGE( g_task_info[0].my_priority == 0, "" );
720     CHECK_MESSAGE( g_task_info[1].my_priority == 2, "Bypassed task with higher priority executed in wrong order." );
721     CHECK_MESSAGE( g_task_info[2].my_priority == 1, "" );
722 }
723 
724 //! The test checks that the task from the node with higher priority, which task gets bypassed, is
725 //! executed first than the one spawned with lower priority.
726 void test() {
727     test_use_case<continue_node<continue_msg>>();
728     test_use_case<input_node<continue_msg>>();
729 }
730 
731 } // namespace BypassPrioritizedTask
732 
733 namespace ManySuccessors {
734 
735 struct no_priority_node_body {
736     void operator()(continue_msg) {
737         CHECK_MESSAGE(
738             barrier == 0, "Non-priority successor has to be executed after all priority successors"
739         );
740     }
741     std::atomic<int>& barrier;
742 };
743 
744 struct priority_node_body {
745     void operator()(continue_msg) {
746         --barrier;
747         while (barrier)
748             tbb::detail::d0::yield();
749     }
750     std::atomic<int>& barrier;
751 };
752 
753 void test(int num_threads) {
754     tbb::task_arena arena( num_threads );
755     arena.execute(
756         [&]() {
757             graph g;
758             broadcast_node<continue_msg> bn(g);
759             std::vector< std::unique_ptr<continue_node<continue_msg>> > nodes;
760             std::atomic<int> barrier;
761             for (int i = 0; i < 2 * num_threads; ++i)
762                 nodes.push_back(
763                     std::unique_ptr<continue_node<continue_msg>>(
764                         new continue_node<continue_msg>(g, no_priority_node_body{ barrier })
765                     )
766                 );
767             for (int i = 0; i < num_threads; ++i)
768                 nodes.push_back(
769                     std::unique_ptr<continue_node<continue_msg>>(
770                         new continue_node<continue_msg>(g, priority_node_body{ barrier }, /*priority*/1)
771                     )
772                 );
773 
774             std::random_device rd;
775             std::mt19937 gen(rd());
776 
777             for (int trial = 0; trial < 10; ++trial) {
778                 barrier = num_threads;
779                 std::shuffle(nodes.begin(), nodes.end(), gen);
780                 for (auto& n : nodes)
781                     make_edge(bn, *n);
782                 bn.try_put(continue_msg());
783                 g.wait_for_all();
784                 for (auto& n : nodes)
785                     remove_edge(bn, *n);
786             }
787         }
788     );
789 }
790 
791 } // namespace ManySuccessors
792 
793 #if TBB_USE_EXCEPTIONS
794 namespace Exceptions {
795     void test() {
796         using namespace tbb::flow;
797         graph g;
798         std::srand(42);
799         const unsigned num_messages = 50;
800         std::vector<unsigned> throwing_msgs;
801         std::atomic<unsigned> msg_count(0);
802         continue_node<unsigned> c(g, [&msg_count](continue_msg) {
803             return ++msg_count;
804         }, 2);
805         function_node<unsigned> f(g, unlimited, [&throwing_msgs](unsigned v) {
806             for( auto i : throwing_msgs ) {
807                 if( i == v )
808                     throw std::runtime_error("Exception::test");
809             }
810         }, 1);
811         make_edge(c, f);
812         for (int i = 0; i < 10; ++i) {
813             msg_count = 0;
814             g.reset();
815             throwing_msgs.push_back(std::rand() % num_messages);
816             try {
817                 for (unsigned j = 0; j < num_messages; ++j) {
818                     c.try_put(continue_msg());
819                 }
820                 g.wait_for_all();
821                 FAIL("Unreachable code. The exception is expected");
822             } catch (std::runtime_error&) {
823                 CHECK(g.is_cancelled());
824                 CHECK(g.exception_thrown());
825             } catch (...) {
826                 FAIL("Unexpected exception");
827             }
828         }
829     }
830 } // namespace Exceptions
831 #endif
832 
833 //! Test node prioritization
834 //! \brief \ref requirement
835 TEST_CASE("Priority nodes take precedence"){
836     for( auto p : utils::concurrency_range() ) {
837         PriorityNodesTakePrecedence::test( static_cast<int>(p) );
838     }
839 }
840 
841 //! Test thread eager reaction
842 //! \brief \ref error_guessing
843 TEST_CASE("Thread eager reaction"){
844     for( auto p : utils::concurrency_range() ) {
845         ThreadsEagerReaction::test( static_cast<int>(p) );
846     }
847 }
848 
849 //! Test prioritization under concurrency limits
850 //! \brief \ref error_guessing
851 TEST_CASE("Limiting execution to prioritized work") {
852     for( auto p : utils::concurrency_range() ) {
853         LimitingExecutionToPriorityTask::test( static_cast<int>(p) );
854     }
855 }
856 
857 //! Test nested graphs
858 //! \brief \ref error_guessing
859 TEST_CASE("Nested test case") {
860     std::size_t max_threads = utils::get_platform_max_threads();
861     // The stepping for the threads is done inside.
862     NestedCase::test( static_cast<int>(max_threads) );
863 }
864 
865 //! Test bypassed task with higher priority
866 //! \brief \ref error_guessing
867 TEST_CASE("Bypass prioritized task"){
868     tbb::global_control gc( tbb::global_control::max_allowed_parallelism, 1 );
869     BypassPrioritizedTask::test();
870 }
871 
872 //! Test mixing prioritized and ordinary successors
873 //! \brief \ref error_guessing
874 TEST_CASE("Many successors") {
875     for( auto p : utils::concurrency_range() ) {
876         ManySuccessors::test( static_cast<int>(p) );
877     }
878 }
879 
880 #if TBB_USE_EXCEPTIONS
881 //! Test for exceptions
882 //! \brief \ref error_guessing
883 TEST_CASE("Exceptions") {
884     Exceptions::test();
885 }
886 #endif
887