1 /*
2 Copyright (c) 2005-2022 Intel Corporation
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 #ifndef __TBB_flow_graph_impl_H
18 #define __TBB_flow_graph_impl_H
19
20 // #include "../config.h"
21 #include "_task.h"
22 #include "../task_group.h"
23 #include "../task_arena.h"
24 #include "../flow_graph_abstractions.h"
25
26 #include "../concurrent_priority_queue.h"
27
28 #include <list>
29
30 namespace tbb {
31 namespace detail {
32
33 namespace d1 {
34
class graph_task;
// Sentinel value distinguishing "the task was enqueued" from a real task pointer
// returned by senders/receivers; never dereferenced.
static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1;
typedef unsigned int node_priority_t;
// Default priority: tasks with this value bypass the graph's priority queue
// (see prioritize_task below).
static const node_priority_t no_priority = node_priority_t(0);

class graph;
class graph_node;
42
//! Forward iterator over the nodes registered with a graph.
/** Instantiated with const or non-const GraphContainerType/GraphNodeType so
    that a single definition serves as both graph::iterator and
    graph::const_iterator. Traversal (internal_forward) and the begin/end
    constructor are defined out of line. */
template <typename GraphContainerType, typename GraphNodeType>
class graph_iterator {
    friend class graph;
    friend class graph_node;
public:
    typedef size_t size_type;
    typedef GraphNodeType value_type;
    typedef GraphNodeType* pointer;
    typedef GraphNodeType& reference;
    typedef const GraphNodeType& const_reference;
    typedef std::forward_iterator_tag iterator_category;

    //! Copy constructor
    graph_iterator(const graph_iterator& other) :
        my_graph(other.my_graph), current_node(other.current_node)
    {}

    //! Assignment
    graph_iterator& operator=(const graph_iterator& other) {
        if (this != &other) {
            my_graph = other.my_graph;
            current_node = other.current_node;
        }
        return *this;
    }

    //! Dereference
    reference operator*() const;

    //! Dereference
    pointer operator->() const;

    //! Equality: same graph and same position.
    bool operator==(const graph_iterator& other) const {
        return ((my_graph == other.my_graph) && (current_node == other.current_node));
    }

#if !__TBB_CPP20_COMPARISONS_PRESENT
    //! Inequality (synthesized by the compiler from operator== in C++20 mode)
    bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
#endif

    //! Pre-increment
    graph_iterator& operator++() {
        internal_forward();
        return *this;
    }

    //! Post-increment
    graph_iterator operator++(int) {
        graph_iterator result = *this;
        operator++();
        return result;
    }

private:
    // the graph over which we are iterating
    GraphContainerType *my_graph;
    // pointer into my_graph's my_nodes list
    pointer current_node;

    //! Private initializing constructor for begin() and end() iterators
    graph_iterator(GraphContainerType *g, bool begin);
    void internal_forward();
}; // class graph_iterator
108
// flags to modify the behavior of the graph reset(). Can be combined.
enum reset_flags {
    rf_reset_protocol = 0,      // default: reset node/edge protocol state only
    rf_reset_bodies = 1 << 0,   // delete the current node body, reset to a copy of the initial node body.
    rf_clear_edges = 1 << 1     // delete edges
};
115
// Forward declarations of the graph/arena helpers defined near the bottom of
// this header; all of them are friends of class graph and manipulate its
// private state.
void activate_graph(graph& g);
void deactivate_graph(graph& g);
bool is_graph_active(graph& g);
graph_task* prioritize_task(graph& g, graph_task& arena_task);
void spawn_in_graph_arena(graph& g, graph_task& arena_task);
void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

class graph;
124
//! Base class for tasks generated by graph nodes.
/** Keeps a reference to the owning graph, an optional priority, and a copy of
    the small-object allocator that created it so the task can return its own
    memory after execution (see destruct_and_deallocate/finalize below). */
class graph_task : public task {
public:
    graph_task(graph& g, small_object_allocator& allocator
               , node_priority_t node_priority = no_priority
    )
        : my_graph(g)
        , priority(node_priority)
        , my_allocator(allocator)
    {}
    graph& my_graph; // graph instance the task belongs to
    // TODO revamp: rename to my_priority
    node_priority_t priority;
    //! Destroys *this (as DerivedType) and returns its memory to my_allocator.
    template <typename DerivedType>
    void destruct_and_deallocate(const execution_data& ed);
protected:
    //! destruct_and_deallocate, then release the graph's wait reservation.
    template <typename DerivedType>
    void finalize(const execution_data& ed);
private:
    // To organize task_list
    graph_task* my_next{ nullptr };
    small_object_allocator my_allocator;
    // TODO revamp: elaborate internal interfaces to avoid friends declarations
    friend class graph_task_list;
    friend graph_task* prioritize_task(graph& g, graph_task& gt);
};
151
152 struct graph_task_comparator {
operatorgraph_task_comparator153 bool operator()(const graph_task* left, const graph_task* right) {
154 return left->priority < right->priority;
155 }
156 };
157
158 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
159
//! Critical task that pops and runs one task from the graph's priority queue.
/** One selector is submitted per prioritized work item (see prioritize_task);
    on execution it pops some queued task with the highest known priority --
    not necessarily the item that caused its submission -- runs it, and then
    destroys itself through the stored allocator. */
class priority_task_selector : public task {
public:
    priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator)
        : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {}
    task* execute(execution_data& ed) override {
        next_task();
        __TBB_ASSERT(my_task, nullptr);
        task* t_next = my_task->execute(ed);
        // Self-destruct before returning the bypassed task; *this was allocated
        // from the originating task's allocator in prioritize_task.
        my_allocator.delete_object(this, ed);
        return t_next;
    }
    task* cancel(execution_data& ed) override {
        // Even on cancellation a queued work item must be drained and
        // cancelled, so pop one if execute() has not done it yet.
        if (!my_task) {
            next_task();
        }
        __TBB_ASSERT(my_task, nullptr);
        task* t_next = my_task->cancel(ed);
        my_allocator.delete_object(this, ed);
        return t_next;
    }
private:
    //! Pops the next work item into my_task; the pop must succeed because a
    //! selector is only submitted after a matching push.
    void next_task() {
        // TODO revamp: hold functors in priority queue instead of real tasks
        bool result = my_priority_queue.try_pop(my_task);
        __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks"
                        " in graph's priority queue mismatched");
        __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED,
                     "Incorrect task submitted to graph priority queue");
        __TBB_ASSERT(my_task->priority != no_priority,
                     "Tasks from graph's priority queue must have priority");
    }

    graph_task_priority_queue_t& my_priority_queue;
    small_object_allocator my_allocator;
    graph_task* my_task; // popped work item; null until next_task() runs
};
196
// Forward declarations of node-generated task types defined elsewhere.
template <typename Receiver, typename Body> class run_and_put_task;
template <typename Body> class run_task;
199
200 //********************************************************************************
201 // graph tasks helpers
202 //********************************************************************************
203
204 //! The list of graph tasks
205 class graph_task_list : no_copy {
206 private:
207 graph_task* my_first;
208 graph_task** my_next_ptr;
209 public:
210 //! Construct empty list
graph_task_list()211 graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {}
212
213 //! True if list is empty; false otherwise.
empty()214 bool empty() const { return !my_first; }
215
216 //! Push task onto back of list.
push_back(graph_task & task)217 void push_back(graph_task& task) {
218 task.my_next = nullptr;
219 *my_next_ptr = &task;
220 my_next_ptr = &task.my_next;
221 }
222
223 //! Pop the front task from the list.
pop_front()224 graph_task& pop_front() {
225 __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list");
226 graph_task* result = my_first;
227 my_first = result->my_next;
228 if (!my_first) {
229 my_next_ptr = &my_first;
230 }
231 return *result;
232 }
233 };
234
//! The graph class
/** This class serves as a handle to the graph */
class graph : no_copy, public graph_proxy {
    friend class graph_node;

    //! Creates (or, with reinit==true, recreates) the arena used to run graph
    //! tasks, preferring to attach to the calling thread's current arena.
    void prepare_task_arena(bool reinit = false) {
        if (reinit) {
            __TBB_ASSERT(my_task_arena, "task arena is nullptr");
            my_task_arena->terminate();
            my_task_arena->initialize(task_arena::attach());
        }
        else {
            __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr");
            my_task_arena = new task_arena(task_arena::attach());
        }
        if (!my_task_arena->is_active()) // failed to attach
            my_task_arena->initialize(); // create a new, default-initialized arena
        __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
    }

public:
    //! Constructs a graph with isolated task_group_context
    graph();

    //! Constructs a graph with use_this_context as context
    explicit graph(task_group_context& use_this_context);

    //! Destroys the graph.
    /** Calls wait_for_all, then destroys the root task and context. */
    ~graph();

    //! Used to register that an external entity may still interact with the graph.
    /** The graph will not return from wait_for_all until a matching number of release_wait calls is
    made. */
    void reserve_wait() override;

    //! Deregisters an external entity that may have interacted with the graph.
    /** The graph will not return from wait_for_all until all the number of reserve_wait calls
    matches the number of release_wait calls. */
    void release_wait() override;

    //! Wait until graph is idle and the number of release_wait calls equals to the number of
    //! reserve_wait calls.
    /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */
    void wait_for_all() {
        cancelled = false;
        caught_exception = false;
        // Block inside the graph's arena so the waiting thread can execute
        // pending graph tasks instead of idling.
        try_call([this] {
            my_task_arena->execute([this] {
                wait(my_wait_context, *my_context);
            });
            cancelled = my_context->is_group_execution_cancelled();
        }).on_exception([this] {
            my_context->reset();
            caught_exception = true;
            cancelled = true;
        });
        // TODO: the "if" condition below is just a work-around to support the concurrent wait
        // mode. The cancellation and exception mechanisms are still broken in this mode.
        // Consider using task group not to re-implement the same functionality.
        if (!(my_context->traits() & task_group_context::concurrent_wait)) {
            my_context->reset(); // consistent with behavior in catch()
        }
    }

    // TODO revamp: consider adding getter for task_group_context.

    // ITERATORS
    template<typename C, typename N>
    friend class graph_iterator;

    // Graph iterator typedefs
    typedef graph_iterator<graph, graph_node> iterator;
    typedef graph_iterator<const graph, const graph_node> const_iterator;

    // Graph iterator constructors
    //! start iterator
    iterator begin();
    //! end iterator
    iterator end();
    //! start const iterator
    const_iterator begin() const;
    //! end const iterator
    const_iterator end() const;
    //! start const iterator
    const_iterator cbegin() const;
    //! end const iterator
    const_iterator cend() const;

    // thread-unsafe state reset.
    void reset(reset_flags f = rf_reset_protocol);

    //! cancels execution of the associated task_group_context
    void cancel();

    //! return status of graph execution
    bool is_cancelled() { return cancelled; }
    bool exception_thrown() { return caught_exception; }

private:
    wait_context my_wait_context;
    task_group_context *my_context;
    // NOTE(review): presumably true when my_context was allocated by the
    // default constructor and must be destroyed by ~graph -- confirm in the
    // out-of-line ctor/dtor definitions.
    bool own_context;
    bool cancelled;          // last wait_for_all observed a cancelled context
    bool caught_exception;   // last wait_for_all caught an exception
    bool my_is_active;       // toggled by activate_graph/deactivate_graph

    // Intrusive list of registered nodes (head/tail), guarded by nodelist_mutex.
    graph_node *my_nodes, *my_nodes_last;

    tbb::spin_mutex nodelist_mutex;
    void register_node(graph_node *n);
    void remove_node(graph_node *n);

    // Created by prepare_task_arena; presumably deleted in ~graph -- confirm.
    task_arena* my_task_arena;

    // Holds tasks submitted with a non-default priority (see prioritize_task).
    graph_task_priority_queue_t my_priority_queue;

    friend void activate_graph(graph& g);
    friend void deactivate_graph(graph& g);
    friend bool is_graph_active(graph& g);
    friend graph_task* prioritize_task(graph& g, graph_task& arena_task);
    friend void spawn_in_graph_arena(graph& g, graph_task& arena_task);
    friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task);

    friend class task_arena_base;

}; // class graph
362
//! Destroys *this and returns its memory to the small-object allocator.
/** DerivedType must be the task's dynamic type so the correct allocation size
    is released. */
template<typename DerivedType>
inline void graph_task::destruct_and_deallocate(const execution_data& ed) {
    // Copy the allocator out first: it is a member of *this and would be
    // destroyed by the destructor call below.
    auto allocator = my_allocator;
    // TODO: investigate if direct call of derived destructor gives any benefits.
    this->~graph_task();
    allocator.deallocate(static_cast<DerivedType*>(this), ed);
}
370
//! Destroys the task and releases one wait reservation on its graph.
template<typename DerivedType>
inline void graph_task::finalize(const execution_data& ed) {
    // Save the graph reference: *this is deallocated by the next call and
    // my_graph must not be touched afterwards.
    graph& g = my_graph;
    destruct_and_deallocate<DerivedType>(ed);
    g.release_wait();
}
377
378 //********************************************************************************
379 // end of graph tasks helpers
380 //********************************************************************************
381
382
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Helper granting the preview node-set machinery access to a node's graph.
class get_graph_helper;
#endif
386
//! The base of all graph nodes.
/** Registers itself with its graph on construction (see out-of-line ctor) and
    participates in the graph's intrusive node list via next/prev. */
class graph_node : no_copy {
    friend class graph;
    template<typename C, typename N>
    friend class graph_iterator;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    friend class get_graph_helper;
#endif

protected:
    graph& my_graph;
    graph& graph_reference() const {
        // TODO revamp: propagate graph_reference() method to all the reference places.
        return my_graph;
    }
    // Doubly-linked list links; presumably maintained by
    // graph::register_node/remove_node -- confirm in their definitions.
    graph_node* next = nullptr;
    graph_node* prev = nullptr;
public:
    explicit graph_node(graph& g);

    virtual ~graph_node();

protected:
    // performs the reset on an individual node.
    virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
}; // class graph_node
414
activate_graph(graph & g)415 inline void activate_graph(graph& g) {
416 g.my_is_active = true;
417 }
418
deactivate_graph(graph & g)419 inline void deactivate_graph(graph& g) {
420 g.my_is_active = false;
421 }
422
is_graph_active(graph & g)423 inline bool is_graph_active(graph& g) {
424 return g.my_is_active;
425 }
426
prioritize_task(graph & g,graph_task & gt)427 inline graph_task* prioritize_task(graph& g, graph_task& gt) {
428 if( no_priority == gt.priority )
429 return >
430
431 //! Non-preemptive priority pattern. The original task is submitted as a work item to the
432 //! priority queue, and a new critical task is created to take and execute a work item with
433 //! the highest known priority. The reference counting responsibility is transferred (via
434 //! allocate_continuation) to the new task.
435 task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator);
436 __TBB_ASSERT( critical_task, "bad_alloc?" );
437 g.my_priority_queue.push(>);
438 using tbb::detail::d1::submit;
439 submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true );
440 return nullptr;
441 }
442
//! Spawns a task inside graph arena
/** No-op when the graph is inactive. Tasks with a priority are diverted to the
    priority queue by prioritize_task, which then returns null. */
inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) {
    if (is_graph_active(g)) {
        task* gt = prioritize_task(g, arena_task);
        // Null means the task went to the priority queue and a critical task
        // was already submitted on its behalf; nothing to spawn here.
        if( !gt )
            return;

        __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr);
        submit( *gt, *g.my_task_arena, *g.my_context
#if __TBB_PREVIEW_CRITICAL_TASKS
                , /*as_critical=*/false
#endif
        );
    }
}
458
459 // TODO revamp: unify *_in_graph_arena functions
460
461 //! Enqueues a task inside graph arena
enqueue_in_graph_arena(graph & g,graph_task & arena_task)462 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) {
463 if (is_graph_active(g)) {
464 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
465
466 // TODO revamp: decide on the approach that does not postpone critical task
467 if( task* gt = prioritize_task(g, arena_task) )
468 submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false);
469 }
470 }
471
472 } // namespace d1
473 } // namespace detail
474 } // namespace tbb
475
476 #endif // __TBB_flow_graph_impl_H
477