1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_flow_graph_impl_H 18 #define __TBB_flow_graph_impl_H 19 20 // #include "../config.h" 21 #include "_task.h" 22 #include "../task_group.h" 23 #include "../task_arena.h" 24 #include "../flow_graph_abstractions.h" 25 26 #include "../concurrent_priority_queue.h" 27 28 #include <list> 29 30 namespace tbb { 31 namespace detail { 32 33 namespace d1 { 34 35 class graph_task; 36 static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1; 37 typedef unsigned int node_priority_t; 38 static const node_priority_t no_priority = node_priority_t(0); 39 40 class graph; 41 class graph_node; 42 43 template <typename GraphContainerType, typename GraphNodeType> 44 class graph_iterator { 45 friend class graph; 46 friend class graph_node; 47 public: 48 typedef size_t size_type; 49 typedef GraphNodeType value_type; 50 typedef GraphNodeType* pointer; 51 typedef GraphNodeType& reference; 52 typedef const GraphNodeType& const_reference; 53 typedef std::forward_iterator_tag iterator_category; 54 55 //! Copy constructor 56 graph_iterator(const graph_iterator& other) : 57 my_graph(other.my_graph), current_node(other.current_node) 58 {} 59 60 //! Assignment 61 graph_iterator& operator=(const graph_iterator& other) { 62 if (this != &other) { 63 my_graph = other.my_graph; 64 current_node = other.current_node; 65 } 66 return *this; 67 } 68 69 //! Dereference 70 reference operator*() const; 71 72 //! Dereference 73 pointer operator->() const; 74 75 //! Equality 76 bool operator==(const graph_iterator& other) const { 77 return ((my_graph == other.my_graph) && (current_node == other.current_node)); 78 } 79 80 #if !__TBB_CPP20_COMPARISONS_PRESENT 81 //! Inequality 82 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); } 83 #endif 84 85 //! Pre-increment 86 graph_iterator& operator++() { 87 internal_forward(); 88 return *this; 89 } 90 91 //! Post-increment 92 graph_iterator operator++(int) { 93 graph_iterator result = *this; 94 operator++(); 95 return result; 96 } 97 98 private: 99 // the graph over which we are iterating 100 GraphContainerType *my_graph; 101 // pointer into my_graph's my_nodes list 102 pointer current_node; 103 104 //! Private initializing constructor for begin() and end() iterators 105 graph_iterator(GraphContainerType *g, bool begin); 106 void internal_forward(); 107 }; // class graph_iterator 108 109 // flags to modify the behavior of the graph reset(). Can be combined. 110 enum reset_flags { 111 rf_reset_protocol = 0, 112 rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body. 113 rf_clear_edges = 1 << 1 // delete edges 114 }; 115 116 void activate_graph(graph& g); 117 void deactivate_graph(graph& g); 118 bool is_graph_active(graph& g); 119 graph_task* prioritize_task(graph& g, graph_task& arena_task); 120 void spawn_in_graph_arena(graph& g, graph_task& arena_task); 121 void enqueue_in_graph_arena(graph &g, graph_task& arena_task); 122 123 class graph; 124 125 //! Base class for tasks generated by graph nodes. 126 class graph_task : public task { 127 public: 128 graph_task(graph& g, small_object_allocator& allocator 129 , node_priority_t node_priority = no_priority 130 ) 131 : my_graph(g) 132 , priority(node_priority) 133 , my_allocator(allocator) 134 {} 135 graph& my_graph; // graph instance the task belongs to 136 // TODO revamp: rename to my_priority 137 node_priority_t priority; 138 template <typename DerivedType> 139 void destruct_and_deallocate(const execution_data& ed); 140 protected: 141 template <typename DerivedType> 142 void finalize(const execution_data& ed); 143 private: 144 // To organize task_list 145 graph_task* my_next{ nullptr }; 146 small_object_allocator my_allocator; 147 // TODO revamp: elaborate internal interfaces to avoid friends declarations 148 friend class graph_task_list; 149 friend graph_task* prioritize_task(graph& g, graph_task& gt); 150 }; 151 152 struct graph_task_comparator { 153 bool operator()(const graph_task* left, const graph_task* right) { 154 return left->priority < right->priority; 155 } 156 }; 157 158 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t; 159 160 class priority_task_selector : public task { 161 public: 162 priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator) 163 : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {} 164 task* execute(execution_data& ed) override { 165 next_task(); 166 __TBB_ASSERT(my_task, nullptr); 167 task* t_next = my_task->execute(ed); 168 my_allocator.delete_object(this, ed); 169 return t_next; 170 } 171 task* cancel(execution_data& ed) override { 172 if (!my_task) { 173 next_task(); 174 } 175 __TBB_ASSERT(my_task, nullptr); 176 task* t_next = my_task->cancel(ed); 177 my_allocator.delete_object(this, ed); 178 return t_next; 179 } 180 private: 181 void next_task() { 182 // TODO revamp: hold functors in priority queue instead of real tasks 183 bool result = my_priority_queue.try_pop(my_task); 184 __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks" 185 " in graph's priority queue mismatched"); 186 __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED, 187 "Incorrect task submitted to graph priority queue"); 188 __TBB_ASSERT(my_task->priority != no_priority, 189 "Tasks from graph's priority queue must have priority"); 190 } 191 192 graph_task_priority_queue_t& my_priority_queue; 193 small_object_allocator my_allocator; 194 graph_task* my_task; 195 }; 196 197 template <typename Receiver, typename Body> class run_and_put_task; 198 template <typename Body> class run_task; 199 200 //******************************************************************************** 201 // graph tasks helpers 202 //******************************************************************************** 203 204 //! The list of graph tasks 205 class graph_task_list : no_copy { 206 private: 207 graph_task* my_first; 208 graph_task** my_next_ptr; 209 public: 210 //! Construct empty list 211 graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {} 212 213 //! True if list is empty; false otherwise. 214 bool empty() const { return !my_first; } 215 216 //! Push task onto back of list. 217 void push_back(graph_task& task) { 218 task.my_next = nullptr; 219 *my_next_ptr = &task; 220 my_next_ptr = &task.my_next; 221 } 222 223 //! Pop the front task from the list. 224 graph_task& pop_front() { 225 __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list"); 226 graph_task* result = my_first; 227 my_first = result->my_next; 228 if (!my_first) { 229 my_next_ptr = &my_first; 230 } 231 return *result; 232 } 233 }; 234 235 //! The graph class 236 /** This class serves as a handle to the graph */ 237 class graph : no_copy, public graph_proxy { 238 friend class graph_node; 239 240 void prepare_task_arena(bool reinit = false) { 241 if (reinit) { 242 __TBB_ASSERT(my_task_arena, "task arena is nullptr"); 243 my_task_arena->terminate(); 244 my_task_arena->initialize(task_arena::attach()); 245 } 246 else { 247 __TBB_ASSERT(my_task_arena == nullptr, "task arena is not nullptr"); 248 my_task_arena = new task_arena(task_arena::attach()); 249 } 250 if (!my_task_arena->is_active()) // failed to attach 251 my_task_arena->initialize(); // create a new, default-initialized arena 252 __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active"); 253 } 254 255 public: 256 //! Constructs a graph with isolated task_group_context 257 graph(); 258 259 //! Constructs a graph with use_this_context as context 260 explicit graph(task_group_context& use_this_context); 261 262 //! Destroys the graph. 263 /** Calls wait_for_all, then destroys the root task and context. */ 264 ~graph(); 265 266 //! Used to register that an external entity may still interact with the graph. 267 /** The graph will not return from wait_for_all until a matching number of release_wait calls is 268 made. */ 269 void reserve_wait() override; 270 271 //! Deregisters an external entity that may have interacted with the graph. 272 /** The graph will not return from wait_for_all until all the number of reserve_wait calls 273 matches the number of release_wait calls. */ 274 void release_wait() override; 275 276 //! Wait until graph is idle and the number of release_wait calls equals to the number of 277 //! reserve_wait calls. 278 /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */ 279 void wait_for_all() { 280 cancelled = false; 281 caught_exception = false; 282 try_call([this] { 283 my_task_arena->execute([this] { 284 wait(my_wait_context, *my_context); 285 }); 286 cancelled = my_context->is_group_execution_cancelled(); 287 }).on_exception([this] { 288 my_context->reset(); 289 caught_exception = true; 290 cancelled = true; 291 }); 292 // TODO: the "if" condition below is just a work-around to support the concurrent wait 293 // mode. The cancellation and exception mechanisms are still broken in this mode. 294 // Consider using task group not to re-implement the same functionality. 295 if (!(my_context->traits() & task_group_context::concurrent_wait)) { 296 my_context->reset(); // consistent with behavior in catch() 297 } 298 } 299 300 // TODO revamp: consider adding getter for task_group_context. 301 302 // ITERATORS 303 template<typename C, typename N> 304 friend class graph_iterator; 305 306 // Graph iterator typedefs 307 typedef graph_iterator<graph, graph_node> iterator; 308 typedef graph_iterator<const graph, const graph_node> const_iterator; 309 310 // Graph iterator constructors 311 //! start iterator 312 iterator begin(); 313 //! end iterator 314 iterator end(); 315 //! start const iterator 316 const_iterator begin() const; 317 //! end const iterator 318 const_iterator end() const; 319 //! start const iterator 320 const_iterator cbegin() const; 321 //! end const iterator 322 const_iterator cend() const; 323 324 // thread-unsafe state reset. 325 void reset(reset_flags f = rf_reset_protocol); 326 327 //! cancels execution of the associated task_group_context 328 void cancel(); 329 330 //! return status of graph execution 331 bool is_cancelled() { return cancelled; } 332 bool exception_thrown() { return caught_exception; } 333 334 private: 335 wait_context my_wait_context; 336 task_group_context *my_context; 337 bool own_context; 338 bool cancelled; 339 bool caught_exception; 340 bool my_is_active; 341 342 graph_node *my_nodes, *my_nodes_last; 343 344 tbb::spin_mutex nodelist_mutex; 345 void register_node(graph_node *n); 346 void remove_node(graph_node *n); 347 348 task_arena* my_task_arena; 349 350 graph_task_priority_queue_t my_priority_queue; 351 352 friend void activate_graph(graph& g); 353 friend void deactivate_graph(graph& g); 354 friend bool is_graph_active(graph& g); 355 friend graph_task* prioritize_task(graph& g, graph_task& arena_task); 356 friend void spawn_in_graph_arena(graph& g, graph_task& arena_task); 357 friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task); 358 359 friend class task_arena_base; 360 361 }; // class graph 362 363 template<typename DerivedType> 364 inline void graph_task::destruct_and_deallocate(const execution_data& ed) { 365 auto allocator = my_allocator; 366 // TODO: investigate if direct call of derived destructor gives any benefits. 367 this->~graph_task(); 368 allocator.deallocate(static_cast<DerivedType*>(this), ed); 369 } 370 371 template<typename DerivedType> 372 inline void graph_task::finalize(const execution_data& ed) { 373 graph& g = my_graph; 374 destruct_and_deallocate<DerivedType>(ed); 375 g.release_wait(); 376 } 377 378 //******************************************************************************** 379 // end of graph tasks helpers 380 //******************************************************************************** 381 382 383 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 384 class get_graph_helper; 385 #endif 386 387 //! The base of all graph nodes. 388 class graph_node : no_copy { 389 friend class graph; 390 template<typename C, typename N> 391 friend class graph_iterator; 392 393 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 394 friend class get_graph_helper; 395 #endif 396 397 protected: 398 graph& my_graph; 399 graph& graph_reference() const { 400 // TODO revamp: propagate graph_reference() method to all the reference places. 401 return my_graph; 402 } 403 graph_node* next = nullptr; 404 graph_node* prev = nullptr; 405 public: 406 explicit graph_node(graph& g); 407 408 virtual ~graph_node(); 409 410 protected: 411 // performs the reset on an individual node. 412 virtual void reset_node(reset_flags f = rf_reset_protocol) = 0; 413 }; // class graph_node 414 415 inline void activate_graph(graph& g) { 416 g.my_is_active = true; 417 } 418 419 inline void deactivate_graph(graph& g) { 420 g.my_is_active = false; 421 } 422 423 inline bool is_graph_active(graph& g) { 424 return g.my_is_active; 425 } 426 427 inline graph_task* prioritize_task(graph& g, graph_task& gt) { 428 if( no_priority == gt.priority ) 429 return > 430 431 //! Non-preemptive priority pattern. The original task is submitted as a work item to the 432 //! priority queue, and a new critical task is created to take and execute a work item with 433 //! the highest known priority. The reference counting responsibility is transferred (via 434 //! allocate_continuation) to the new task. 435 task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator); 436 __TBB_ASSERT( critical_task, "bad_alloc?" ); 437 g.my_priority_queue.push(>); 438 using tbb::detail::d1::submit; 439 submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true ); 440 return nullptr; 441 } 442 443 //! Spawns a task inside graph arena 444 inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) { 445 if (is_graph_active(g)) { 446 task* gt = prioritize_task(g, arena_task); 447 if( !gt ) 448 return; 449 450 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), nullptr); 451 submit( *gt, *g.my_task_arena, *g.my_context 452 #if __TBB_PREVIEW_CRITICAL_TASKS 453 , /*as_critical=*/false 454 #endif 455 ); 456 } 457 } 458 459 // TODO revamp: unify *_in_graph_arena functions 460 461 //! Enqueues a task inside graph arena 462 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) { 463 if (is_graph_active(g)) { 464 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" ); 465 466 // TODO revamp: decide on the approach that does not postpone critical task 467 if( task* gt = prioritize_task(g, arena_task) ) 468 submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false); 469 } 470 } 471 472 } // namespace d1 473 } // namespace detail 474 } // namespace tbb 475 476 #endif // __TBB_flow_graph_impl_H 477