1 /*
2     Copyright (c) 2005-2021 Intel Corporation
3 
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7 
8         http://www.apache.org/licenses/LICENSE-2.0
9 
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16 
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19 
// #include "../config.h"
#include "_task.h"
#include "../task_group.h"
#include "../task_arena.h"
#include "../flow_graph_abstractions.h"

#include "../concurrent_priority_queue.h"

#include <cstddef>
#include <iterator>
#include <list>
29 
30 namespace tbb {
31 namespace detail {
32 
33 namespace d1 {
34 
class graph_task;
class graph;
class graph_node;

//! Sentinel graph_task pointer (all-ones address); it never refers to a real task object.
static graph_task* const SUCCESSFULLY_ENQUEUED = reinterpret_cast<graph_task*>(-1);

//! Integral type used to express the priority of a node and of the tasks it creates.
using node_priority_t = unsigned int;

//! Priority value meaning "no priority assigned".
static const node_priority_t no_priority = 0;
42 
//! Forward iterator over the nodes of a graph.
/** GraphContainerType is the (possibly const-qualified) graph type being iterated and
    GraphNodeType the (possibly const-qualified) node type produced on dereference.
    Only the graph and graph_node friends can create begin/end iterators; user code
    obtains them from the graph and may copy, compare and advance them. */
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    // std::iterator_traits exposes its members only when all five of difference_type,
    // value_type, pointer, reference and iterator_category are present; without this
    // typedef the iterator cannot be passed to standard algorithms.
    typedef std::ptrdiff_t difference_type;
    typedef std::forward_iterator_tag iterator_category;

    //! Copy constructor
    graph_iterator(const graph_iterator& other) :
        my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Assignment
    graph_iterator& operator=(const graph_iterator& other) {
        if (this != &other) {
            my_graph = other.my_graph;
            current_node = other.current_node;
        }
        return *this;
    }

    //! Dereference: reference to the current node
    reference operator*() const;

    //! Member access: pointer to the current node
    pointer operator->() const;

    //! Equality: same graph and same current node
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
#endif

    //! Pre-increment: advances to the next node and returns this iterator
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment: advances to the next node and returns the previous position
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    // the graph over which we are iterating
    GraphContainerType *my_graph;
    // pointer into my_graph's my_nodes list
    pointer current_node;

    //! Private initializing constructor for begin() and end() iterators
    graph_iterator(GraphContainerType *g, bool begin);
    //! Moves current_node one position forward (defined outside this header section)
    void internal_forward();
};  // class graph_iterator
108 
//! Bit flags that modify the behavior of graph::reset(); values may be OR-ed together.
enum reset_flags {
    //! No extra action requested.
    rf_reset_protocol = 0x0,
    //! Delete the current node body and reset it to a copy of the initial node body.
    rf_reset_bodies   = 0x1,
    //! Delete edges.
    rf_clear_edges    = 0x2
};
115 
116 void activate_graph(graph& g);
117 void deactivate_graph(graph& g);
118 bool is_graph_active(graph& g);
119 graph_task* prioritize_task(graph& g, graph_task& arena_task);
120 void spawn_in_graph_arena(graph& g, graph_task& arena_task);
121 void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
122 
123 class graph;
124 
125 //! Base class for tasks generated by graph nodes.
126 class graph_task : public task {
127 public:
128     graph_task(graph& g, small_object_allocator& allocator
129                , node_priority_t node_priority = no_priority
130     )
131         : my_graph(g)
132         , priority(node_priority)
133         , my_allocator(allocator)
134     {}
135     graph& my_graph; // graph instance the task belongs to
136     // TODO revamp: rename to my_priority
137     node_priority_t priority;
138     void destruct_and_deallocate(const execution_data& ed);
139     task* cancel(execution_data& ed) override;
140 protected:
141     void finalize(const execution_data& ed);
142 private:
143     // To organize task_list
144     graph_task* my_next{ nullptr };
145     small_object_allocator my_allocator;
146     // TODO revamp: elaborate internal interfaces to avoid friends declarations
147     friend class graph_task_list;
148     friend graph_task* prioritize_task(graph& g, graph_task& gt);
149 };
150 
151 struct graph_task_comparator {
152     bool operator()(const graph_task* left, const graph_task* right) {
153         return left->priority < right->priority;
154     }
155 };
156 
157 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
158 
159 class priority_task_selector : public task {
160 public:
161     priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
162         : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
163     task* execute(execution_data& ed) override {
164         next_task();
165         __TBB_ASSERT(my_task, nullptr);
166         task* t_next = my_task->execute(ed);
167         my_allocator.delete_object(this, ed);
168         return t_next;
169     }
170     task* cancel(execution_data& ed) override {
171         if (!my_task) {
172             next_task();
173         }
174         __TBB_ASSERT(my_task, nullptr);
175         task* t_next = my_task->cancel(ed);
176         my_allocator.delete_object(this, ed);
177         return t_next;
178     }
179 private:
180     void next_task() {
181         // TODO revamp: hold functors in priority queue instead of real tasks
182         bool result = my_priority_queue.try_pop(my_task);
183         __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
184             " in graph's priority queue mismatched");
185         __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
186             "Incorrect task submitted to graph priority queue");
187         __TBB_ASSERT(my_task->priority != no_priority,
188             "Tasks from graph's priority queue must have priority");
189     }
190 
191     graph_task_priority_queue_t& my_priority_queue;
192     small_object_allocator my_allocator;
193     graph_task* my_task;
194 };
195 
// Forward declarations of task templates that are defined outside this header section.
template <class Receiver, class Body>
class run_and_put_task;

template <class Body>
class run_task;
198 
199 //********************************************************************************
200 // graph tasks helpers
201 //********************************************************************************
202 
203 //! The list of graph tasks
204 class graph_task_list : no_copy {
205 private:
206     graph_task* my_first;
207     graph_task** my_next_ptr;
208 public:
209     //! Construct empty list
210     graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}
211 
212     //! True if list is empty; false otherwise.
213     bool empty() const { return !my_first; }
214 
215     //! Push task onto back of list.
216     void push_back(graph_task& task) {
217         task.my_next = nullptr;
218         *my_next_ptr = &task;
219         my_next_ptr = &task.my_next;
220     }
221 
222     //! Pop the front task from the list.
223     graph_task& pop_front() {
224         __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
225         graph_task* result = my_first;
226         my_first = result->my_next;
227         if (!my_first) {
228             my_next_ptr = &my_first;
229         }
230         return *result;
231     }
232 };
233 
234 //! The graph class
235 /** This class serves as a handle to the graph */
236 class graph : no_copy, public graph_proxy {
237     friend class graph_node;
238 
239     void prepare_task_arena(bool reinit = false) {
240         if (reinit) {
241             __TBB_ASSERT(my_task_arena, "task arena is NULL");
242             my_task_arena->terminate();
243             my_task_arena->initialize(task_arena::attach());
244         }
245         else {
246             __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
247             my_task_arena = new task_arena(task_arena::attach());
248         }
249         if (!my_task_arena->is_active()) // failed to attach
250             my_task_arena->initialize(); // create a new, default-initialized arena
251         __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
252     }
253 
254 public:
255     //! Constructs a graph with isolated task_group_context
256     graph();
257 
258     //! Constructs a graph with use_this_context as context
259     explicit graph(task_group_context& use_this_context);
260 
261     //! Destroys the graph.
262     /** Calls wait_for_all, then destroys the root task and context. */
263     ~graph();
264 
265     //! Used to register that an external entity may still interact with the graph.
266     /** The graph will not return from wait_for_all until a matching number of release_wait calls is
267     made. */
268     void reserve_wait() override;
269 
270     //! Deregisters an external entity that may have interacted with the graph.
271     /** The graph will not return from wait_for_all until all the number of reserve_wait calls
272     matches the number of release_wait calls. */
273     void release_wait() override;
274 
275     //! Wait until graph is idle and the number of release_wait calls equals to the number of
276     //! reserve_wait calls.
277     /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */
278     void wait_for_all() {
279         cancelled = false;
280         caught_exception = false;
281         try_call([this] {
282             my_task_arena->execute([this] {
283                 wait(my_wait_context, *my_context);
284             });
285             cancelled = my_context->is_group_execution_cancelled();
286         }).on_exception([this] {
287             my_context->reset();
288             caught_exception = true;
289             cancelled = true;
290         });
291         // TODO: the "if" condition below is just a work-around to support the concurrent wait
292         // mode. The cancellation and exception mechanisms are still broken in this mode.
293         // Consider using task group not to re-implement the same functionality.
294         if (!(my_context->traits() & task_group_context::concurrent_wait)) {
295             my_context->reset();  // consistent with behavior in catch()
296         }
297     }
298 
299     // TODO revamp: consider adding getter for task_group_context.
300 
301     // ITERATORS
302     template<typename C, typename N>
303     friend class graph_iterator;
304 
305     // Graph iterator typedefs
306     typedef graph_iterator<graph, graph_node> iterator;
307     typedef graph_iterator<const graph, const graph_node> const_iterator;
308 
309     // Graph iterator constructors
310     //! start iterator
311     iterator begin();
312     //! end iterator
313     iterator end();
314     //! start const iterator
315     const_iterator begin() const;
316     //! end const iterator
317     const_iterator end() const;
318     //! start const iterator
319     const_iterator cbegin() const;
320     //! end const iterator
321     const_iterator cend() const;
322 
323     // thread-unsafe state reset.
324     void reset(reset_flags f = rf_reset_protocol);
325 
326     //! cancels execution of the associated task_group_context
327     void cancel();
328 
329     //! return status of graph execution
330     bool is_cancelled() { return cancelled; }
331     bool exception_thrown() { return caught_exception; }
332 
333 private:
334     wait_context my_wait_context;
335     task_group_context *my_context;
336     bool own_context;
337     bool cancelled;
338     bool caught_exception;
339     bool my_is_active;
340 
341     graph_node *my_nodes, *my_nodes_last;
342 
343     tbb::spin_mutex nodelist_mutex;
344     void register_node(graph_node *n);
345     void remove_node(graph_node *n);
346 
347     task_arena* my_task_arena;
348 
349     graph_task_priority_queue_t my_priority_queue;
350 
351     friend void activate_graph(graph& g);
352     friend void deactivate_graph(graph& g);
353     friend bool is_graph_active(graph& g);
354     friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
355     friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
356     friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);
357 
358     friend class task_arena_base;
359 
360 };  // class graph
361 
362 inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
363     auto allocator = my_allocator;
364     // TODO: investigate if direct call of derived destructor gives any benefits.
365     this->~graph_task();
366     allocator.deallocate(this, ed);
367 }
368 
369 inline void graph_task::finalize(const execution_data& ed) {
370     graph& g = my_graph;
371     destruct_and_deallocate(ed);
372     g.release_wait();
373 }
374 
375 inline task* graph_task::cancel(execution_data& ed) {
376     finalize(ed);
377     return nullptr;
378 }
379 
380 //********************************************************************************
381 // end of graph tasks helpers
382 //********************************************************************************
383 
384 
385 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
386 class get_graph_helper;
387 #endif
388 
389 //! The base of all graph nodes.
390 class graph_node : no_copy {
391     friend class graph;
392     template<typename C, typename N>
393     friend class graph_iterator;
394 
395 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
396     friend class get_graph_helper;
397 #endif
398 
399 protected:
400     graph& my_graph;
401     graph& graph_reference() const {
402         // TODO revamp: propagate graph_reference() method to all the reference places.
403         return my_graph;
404     }
405     graph_node* next = nullptr;
406     graph_node* prev = nullptr;
407 public:
408     explicit graph_node(graph& g);
409 
410     virtual ~graph_node();
411 
412 protected:
413     // performs the reset on an individual node.
414     virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
415 };  // class graph_node
416 
417 inline void activate_graph(graph& g) {
418     g.my_is_active = true;
419 }
420 
421 inline void deactivate_graph(graph& g) {
422     g.my_is_active = false;
423 }
424 
425 inline bool is_graph_active(graph& g) {
426     return g.my_is_active;
427 }
428 
429 inline graph_task* prioritize_task(graph& g, graph_task& gt) {
430     if( no_priority == gt.priority )
431         return &gt;
432 
433     //! Non-preemptive priority pattern. The original task is submitted as a work item to the
434     //! priority queue, and a new critical task is created to take and execute a work item with
435     //! the highest known priority. The reference counting responsibility is transferred (via
436     //! allocate_continuation) to the new task.
437     task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
438     __TBB_ASSERT( critical_task, "bad_alloc?" );
439     g.my_priority_queue.push(&gt);
440     using tbb::detail::d1::submit;
441     submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true );
442     return nullptr;
443 }
444 
445 //! Spawns a task inside graph arena
446 inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
447     if (is_graph_active(g)) {
448         task* gt = prioritize_task(g, arena_task);
449         if( !gt )
450             return;
451 
452         __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
453         submit( *gt, *g.my_task_arena, *g.my_context
454 #if __TBB_PREVIEW_CRITICAL_TASKS
455                 , /*as_critical=*/false
456 #endif
457         );
458     }
459 }
460 
461 // TODO revamp: unify *_in_graph_arena functions
462 
463 //! Enqueues a task inside graph arena
464 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
465     if (is_graph_active(g)) {
466         __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
467 
468         // TODO revamp: decide on the approach that does not postpone critical task
469         if( task* gt = prioritize_task(g, arena_task) )
470             submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
471     }
472 }
473 
474 } // namespace d1
475 } // namespace detail
476 } // namespace tbb
477 
478 #endif // __TBB_flow_graph_impl_H
479