1 /*
2     Copyright (c) 2005-2022 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
// #include "../config.h"
#include "_task.h"
#include "../task_group.h"
#include "../task_arena.h"
#include "../flow_graph_abstractions.h"

#include "../concurrent_priority_queue.h"

#include <cstddef>
#include <iterator>
#include <list>
29 
30 namespace tbb {
31 namespace detail {
32 
33 namespace d1 {
34 
// Forward declarations of the core flow-graph types defined later in this header.
class graph;
class graph_node;
class graph_task;

//! Distinguished non-null marker value built from the integer -1.
/** It is never the address of a real task and must not be dereferenced. */
static graph_task* const SUCCESSFULLY_ENQUEUED = reinterpret_cast<graph_task*>(-1);

//! Integral type used to express the priority of a node and of the tasks created for it.
using node_priority_t = unsigned int;

//! Priority value that stands for "no priority assigned".
static const node_priority_t no_priority = 0;
42 
//! Forward iterator over the nodes of a graph.
/** GraphContainerType is the (possibly const-qualified) graph type being traversed and
    GraphNodeType is the (possibly const-qualified) node type obtained by dereferencing.
    Only the graph and graph_node classes can create begin/end iterators; everyone else
    obtains iterators by copying. */
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    //! Required by std::iterator_traits: without it the traits class exposes no members at all
    typedef std::ptrdiff_t difference_type;
    typedef std::forward_iterator_tag iterator_category;

    //! Copy constructor (member-wise copy of the graph pointer and the current position)
    graph_iterator(const graph_iterator& other) = default;

    //! Assignment (member-wise; self-assignment is harmless)
    graph_iterator& operator=(const graph_iterator& other) = default;

    //! Dereference
    reference operator*() const;

    //! Dereference
    pointer operator->() const;

    //! Equality: same graph and same position
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
#endif

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment: advances the iterator and returns its previous value
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    // the graph over which we are iterating
    GraphContainerType *my_graph;
    // pointer into my_graph's my_nodes list
    pointer current_node;

    //! Private initializing constructor for begin() and end() iterators
    graph_iterator(GraphContainerType *g, bool begin);
    //! Moves current_node one position forward (defined outside this class body)
    void internal_forward();
};  // class graph_iterator
108 
//! Bit flags that modify the behavior of the graph reset(); they can be combined.
enum reset_flags {
    rf_reset_protocol = 0x0,  // no additional action; this is the default argument of reset()
    rf_reset_bodies   = 0x1,  // delete the current node body, reset to a copy of the initial node body.
    rf_clear_edges    = 0x2   // delete edges
};
115 
116 void activate_graph(graph& g);
117 void deactivate_graph(graph& g);
118 bool is_graph_active(graph& g);
119 graph_task* prioritize_task(graph& g, graph_task& arena_task);
120 void spawn_in_graph_arena(graph& g, graph_task& arena_task);
121 void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
122 
123 class graph;
124 
125 //! Base class for tasks generated by graph nodes.
126 class graph_task : public task {
127 public:
128     graph_task(graph& g, small_object_allocator& allocator
129                , node_priority_t node_priority = no_priority
130     )
my_graph(g)131         : my_graph(g)
132         , priority(node_priority)
133         , my_allocator(allocator)
134     {}
135     graph& my_graph; // graph instance the task belongs to
136     // TODO revamp: rename to my_priority
137     node_priority_t priority;
138     template <typename DerivedType>
139     void destruct_and_deallocate(const execution_data& ed);
140 protected:
141     template <typename DerivedType>
142     void finalize(const execution_data& ed);
143 private:
144     // To organize task_list
145     graph_task* my_next{ nullptr };
146     small_object_allocator my_allocator;
147     // TODO revamp: elaborate internal interfaces to avoid friends declarations
148     friend class graph_task_list;
149     friend graph_task* prioritize_task(graph& g, graph_task& gt);
150 };
151 
152 struct graph_task_comparator {
operatorgraph_task_comparator153     bool operator()(const graph_task* left, const graph_task* right) {
154         return left->priority < right->priority;
155     }
156 };
157 
158 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
159 
160 class priority_task_selector : public task {
161 public:
priority_task_selector(graph_task_priority_queue_t & priority_queue,small_object_allocator & allocator)162     priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
163         : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
execute(execution_data & ed)164     task* execute(execution_data& ed) override {
165         next_task();
166         __TBB_ASSERT(my_task, nullptr);
167         task* t_next = my_task->execute(ed);
168         my_allocator.delete_object(this, ed);
169         return t_next;
170     }
cancel(execution_data & ed)171     task* cancel(execution_data& ed) override {
172         if (!my_task) {
173             next_task();
174         }
175         __TBB_ASSERT(my_task, nullptr);
176         task* t_next = my_task->cancel(ed);
177         my_allocator.delete_object(this, ed);
178         return t_next;
179     }
180 private:
next_task()181     void next_task() {
182         // TODO revamp: hold functors in priority queue instead of real tasks
183         bool result = my_priority_queue.try_pop(my_task);
184         __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
185             " in graph's priority queue mismatched");
186         __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
187             "Incorrect task submitted to graph priority queue");
188         __TBB_ASSERT(my_task->priority != no_priority,
189             "Tasks from graph's priority queue must have priority");
190     }
191 
192     graph_task_priority_queue_t& my_priority_queue;
193     small_object_allocator my_allocator;
194     graph_task* my_task;
195 };
196 
// Task templates that are only declared in this header.
template <typename Receiver, typename Body>
class run_and_put_task;

template <typename Body>
class run_task;
199 
200 //********************************************************************************
201 // graph tasks helpers
202 //********************************************************************************
203 
204 //! The list of graph tasks
205 class graph_task_list : no_copy {
206 private:
207     graph_task* my_first;
208     graph_task** my_next_ptr;
209 public:
210     //! Construct empty list
graph_task_list()211     graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}
212 
213     //! True if list is empty; false otherwise.
empty()214     bool empty() const { return !my_first; }
215 
216     //! Push task onto back of list.
push_back(graph_task & task)217     void push_back(graph_task& task) {
218         task.my_next = nullptr;
219         *my_next_ptr = &task;
220         my_next_ptr = &task.my_next;
221     }
222 
223     //! Pop the front task from the list.
pop_front()224     graph_task& pop_front() {
225         __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
226         graph_task* result = my_first;
227         my_first = result->my_next;
228         if (!my_first) {
229             my_next_ptr = &my_first;
230         }
231         return *result;
232     }
233 };
234 
235 //! The graph class
236 /** This class serves as a handle to the graph */
237 class graph : no_copy, public graph_proxy {
238     friend class graph_node;
239 
240     void prepare_task_arena(bool reinit = false) {
241         if (reinit) {
242             __TBB_ASSERT(my_task_arena, "task arena is nullptr");
243             my_task_arena->terminate();
244             my_task_arena->initialize(task_arena::attach());
245         }
246         else {
247             __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr");
248             my_task_arena = new task_arena(task_arena::attach());
249         }
250         if (!my_task_arena->is_active()) // failed to attach
251             my_task_arena->initialize(); // create a new, default-initialized arena
252         __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
253     }
254 
255 public:
256     //! Constructs a graph with isolated task_group_context
257     graph();
258 
259     //! Constructs a graph with use_this_context as context
260     explicit graph(task_group_context& use_this_context);
261 
262     //! Destroys the graph.
263     /** Calls wait_for_all, then destroys the root task and context. */
264     ~graph();
265 
266     //! Used to register that an external entity may still interact with the graph.
267     /** The graph will not return from wait_for_all until a matching number of release_wait calls is
268     made. */
269     void reserve_wait() override;
270 
271     //! Deregisters an external entity that may have interacted with the graph.
272     /** The graph will not return from wait_for_all until all the number of reserve_wait calls
273     matches the number of release_wait calls. */
274     void release_wait() override;
275 
276     //! Wait until graph is idle and the number of release_wait calls equals to the number of
277     //! reserve_wait calls.
278     /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */
wait_for_all()279     void wait_for_all() {
280         cancelled = false;
281         caught_exception = false;
282         try_call([this] {
283             my_task_arena->execute([this] {
284                 wait(my_wait_context, *my_context);
285             });
286             cancelled = my_context->is_group_execution_cancelled();
287         }).on_exception([this] {
288             my_context->reset();
289             caught_exception = true;
290             cancelled = true;
291         });
292         // TODO: the "if" condition below is just a work-around to support the concurrent wait
293         // mode. The cancellation and exception mechanisms are still broken in this mode.
294         // Consider using task group not to re-implement the same functionality.
295         if (!(my_context->traits() & task_group_context::concurrent_wait)) {
296             my_context->reset();  // consistent with behavior in catch()
297         }
298     }
299 
300     // TODO revamp: consider adding getter for task_group_context.
301 
302     // ITERATORS
303     template<typename C, typename N>
304     friend class graph_iterator;
305 
306     // Graph iterator typedefs
307     typedef graph_iterator<graph, graph_node> iterator;
308     typedef graph_iterator<const graph, const graph_node> const_iterator;
309 
310     // Graph iterator constructors
311     //! start iterator
312     iterator begin();
313     //! end iterator
314     iterator end();
315     //! start const iterator
316     const_iterator begin() const;
317     //! end const iterator
318     const_iterator end() const;
319     //! start const iterator
320     const_iterator cbegin() const;
321     //! end const iterator
322     const_iterator cend() const;
323 
324     // thread-unsafe state reset.
325     void reset(reset_flags f = rf_reset_protocol);
326 
327     //! cancels execution of the associated task_group_context
328     void cancel();
329 
330     //! return status of graph execution
is_cancelled()331     bool is_cancelled() { return cancelled; }
exception_thrown()332     bool exception_thrown() { return caught_exception; }
333 
334 private:
335     wait_context my_wait_context;
336     task_group_context *my_context;
337     bool own_context;
338     bool cancelled;
339     bool caught_exception;
340     bool my_is_active;
341 
342     graph_node *my_nodes, *my_nodes_last;
343 
344     tbb::spin_mutex nodelist_mutex;
345     void register_node(graph_node *n);
346     void remove_node(graph_node *n);
347 
348     task_arena* my_task_arena;
349 
350     graph_task_priority_queue_t my_priority_queue;
351 
352     friend void activate_graph(graph& g);
353     friend void deactivate_graph(graph& g);
354     friend bool is_graph_active(graph& g);
355     friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
356     friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
357     friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
358 
359     friend class task_arena_base;
360 
361 };  // class graph
362 
363 template<typename DerivedType>
destruct_and_deallocate(const execution_data & ed)364 inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
365     auto allocator = my_allocator;
366     // TODO: investigate if direct call of derived destructor gives any benefits.
367     this->~graph_task();
368     allocator.deallocate(static_cast<DerivedType*>(this), ed);
369 }
370 
371 template<typename DerivedType>
finalize(const execution_data & ed)372 inline void graph_task::finalize(const execution_data& ed) {
373     graph& g = my_graph;
374     destruct_and_deallocate<DerivedType>(ed);
375     g.release_wait();
376 }
377 
378 //********************************************************************************
379 // end of graph tasks helpers
380 //********************************************************************************
381 
382 
383 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
384 class get_graph_helper;
385 #endif
386 
387 //! The base of all graph nodes.
388 class graph_node : no_copy {
389     friend class graph;
390     template<typename C, typename N>
391     friend class graph_iterator;
392 
393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
394     friend class get_graph_helper;
395 #endif
396 
397 protected:
398     graph& my_graph;
graph_reference()399     graph& graph_reference() const {
400         // TODO revamp: propagate graph_reference() method to all the reference places.
401         return my_graph;
402     }
403     graph_node* next = nullptr;
404     graph_node* prev = nullptr;
405 public:
406     explicit graph_node(graph& g);
407 
408     virtual ~graph_node();
409 
410 protected:
411     // performs the reset on an individual node.
412     virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
413 };  // class graph_node
414 
activate_graph(graph & g)415 inline void activate_graph(graph& g) {
416     g.my_is_active = true;
417 }
418 
deactivate_graph(graph & g)419 inline void deactivate_graph(graph& g) {
420     g.my_is_active = false;
421 }
422 
is_graph_active(graph & g)423 inline bool is_graph_active(graph& g) {
424     return g.my_is_active;
425 }
426 
prioritize_task(graph & g,graph_task & gt)427 inline graph_task* prioritize_task(graph& g, graph_task& gt) {
428     if( no_priority == gt.priority )
429         return &gt;
430 
431     //! Non-preemptive priority pattern. The original task is submitted as a work item to the
432     //! priority queue, and a new critical task is created to take and execute a work item with
433     //! the highest known priority. The reference counting responsibility is transferred (via
434     //! allocate_continuation) to the new task.
435     task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
436     __TBB_ASSERT( critical_task, "bad_alloc?" );
437     g.my_priority_queue.push(&gt);
438     using tbb::detail::d1::submit;
439     submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true );
440     return nullptr;
441 }
442 
443 //! Spawns a task inside graph arena
spawn_in_graph_arena(graph & g,graph_task & arena_task)444 inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
445     if (is_graph_active(g)) {
446         task* gt = prioritize_task(g, arena_task);
447         if( !gt )
448             return;
449 
450         __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
451         submit( *gt, *g.my_task_arena, *g.my_context
452 #if __TBB_PREVIEW_CRITICAL_TASKS
453                 , /*as_critical=*/false
454 #endif
455         );
456     }
457 }
458 
459 // TODO revamp: unify *_in_graph_arena functions
460 
461 //! Enqueues a task inside graph arena
enqueue_in_graph_arena(graph & g,graph_task & arena_task)462 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
463     if (is_graph_active(g)) {
464         __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
465 
466         // TODO revamp: decide on the approach that does not postpone critical task
467         if( task* gt = prioritize_task(g, arena_task) )
468             submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
469     }
470 }
471 
472 } // namespace d1
473 } // namespace detail
474 } // namespace tbb
475 
476 #endif // __TBB_flow_graph_impl_H
477