1 /* 2 Copyright (c) 2005-2021 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_flow_graph_impl_H 18 #define __TBB_flow_graph_impl_H 19 20 // #include "../config.h" 21 #include "_task.h" 22 #include "../task_group.h" 23 #include "../task_arena.h" 24 #include "../flow_graph_abstractions.h" 25 26 #include "../concurrent_priority_queue.h" 27 28 #include <list> 29 30 namespace tbb { 31 namespace detail { 32 33 namespace d1 { 34 35 class graph_task; 36 static graph_task* const SUCCESSFULLY_ENQUEUED = (graph_task*)-1; 37 typedef unsigned int node_priority_t; 38 static const node_priority_t no_priority = node_priority_t(0); 39 40 class graph; 41 class graph_node; 42 43 template <typename GraphContainerType, typename GraphNodeType> 44 class graph_iterator { 45 friend class graph; 46 friend class graph_node; 47 public: 48 typedef size_t size_type; 49 typedef GraphNodeType value_type; 50 typedef GraphNodeType* pointer; 51 typedef GraphNodeType& reference; 52 typedef const GraphNodeType& const_reference; 53 typedef std::forward_iterator_tag iterator_category; 54 55 //! Copy constructor 56 graph_iterator(const graph_iterator& other) : 57 my_graph(other.my_graph), current_node(other.current_node) 58 {} 59 60 //! Assignment 61 graph_iterator& operator=(const graph_iterator& other) { 62 if (this != &other) { 63 my_graph = other.my_graph; 64 current_node = other.current_node; 65 } 66 return *this; 67 } 68 69 //! Dereference 70 reference operator*() const; 71 72 //! Dereference 73 pointer operator->() const; 74 75 //! Equality 76 bool operator==(const graph_iterator& other) const { 77 return ((my_graph == other.my_graph) && (current_node == other.current_node)); 78 } 79 80 #if !__TBB_CPP20_COMPARISONS_PRESENT 81 //! Inequality 82 bool operator!=(const graph_iterator& other) const { return !(operator==(other)); } 83 #endif 84 85 //! Pre-increment 86 graph_iterator& operator++() { 87 internal_forward(); 88 return *this; 89 } 90 91 //! Post-increment 92 graph_iterator operator++(int) { 93 graph_iterator result = *this; 94 operator++(); 95 return result; 96 } 97 98 private: 99 // the graph over which we are iterating 100 GraphContainerType *my_graph; 101 // pointer into my_graph's my_nodes list 102 pointer current_node; 103 104 //! Private initializing constructor for begin() and end() iterators 105 graph_iterator(GraphContainerType *g, bool begin); 106 void internal_forward(); 107 }; // class graph_iterator 108 109 // flags to modify the behavior of the graph reset(). Can be combined. 110 enum reset_flags { 111 rf_reset_protocol = 0, 112 rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body. 113 rf_clear_edges = 1 << 1 // delete edges 114 }; 115 116 void activate_graph(graph& g); 117 void deactivate_graph(graph& g); 118 bool is_graph_active(graph& g); 119 graph_task* prioritize_task(graph& g, graph_task& arena_task); 120 void spawn_in_graph_arena(graph& g, graph_task& arena_task); 121 void enqueue_in_graph_arena(graph &g, graph_task& arena_task); 122 123 class graph; 124 125 //! Base class for tasks generated by graph nodes. 126 class graph_task : public task { 127 public: 128 graph_task(graph& g, small_object_allocator& allocator 129 , node_priority_t node_priority = no_priority 130 ) 131 : my_graph(g) 132 , priority(node_priority) 133 , my_allocator(allocator) 134 {} 135 graph& my_graph; // graph instance the task belongs to 136 // TODO revamp: rename to my_priority 137 node_priority_t priority; 138 void destruct_and_deallocate(const execution_data& ed); 139 task* cancel(execution_data& ed) override; 140 protected: 141 void finalize(const execution_data& ed); 142 private: 143 // To organize task_list 144 graph_task* my_next{ nullptr }; 145 small_object_allocator my_allocator; 146 // TODO revamp: elaborate internal interfaces to avoid friends declarations 147 friend class graph_task_list; 148 friend graph_task* prioritize_task(graph& g, graph_task& gt); 149 }; 150 151 struct graph_task_comparator { 152 bool operator()(const graph_task* left, const graph_task* right) { 153 return left->priority < right->priority; 154 } 155 }; 156 157 typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t; 158 159 class priority_task_selector : public task { 160 public: 161 priority_task_selector(graph_task_priority_queue_t& priority_queue, small_object_allocator& allocator) 162 : my_priority_queue(priority_queue), my_allocator(allocator), my_task() {} 163 task* execute(execution_data& ed) override { 164 next_task(); 165 __TBB_ASSERT(my_task, nullptr); 166 task* t_next = my_task->execute(ed); 167 my_allocator.delete_object(this, ed); 168 return t_next; 169 } 170 task* cancel(execution_data& ed) override { 171 if (!my_task) { 172 next_task(); 173 } 174 __TBB_ASSERT(my_task, nullptr); 175 task* t_next = my_task->cancel(ed); 176 my_allocator.delete_object(this, ed); 177 return t_next; 178 } 179 private: 180 void next_task() { 181 // TODO revamp: hold functors in priority queue instead of real tasks 182 bool result = my_priority_queue.try_pop(my_task); 183 __TBB_ASSERT_EX(result, "Number of critical tasks for scheduler and tasks" 184 " in graph's priority queue mismatched"); 185 __TBB_ASSERT(my_task && my_task != SUCCESSFULLY_ENQUEUED, 186 "Incorrect task submitted to graph priority queue"); 187 __TBB_ASSERT(my_task->priority != no_priority, 188 "Tasks from graph's priority queue must have priority"); 189 } 190 191 graph_task_priority_queue_t& my_priority_queue; 192 small_object_allocator my_allocator; 193 graph_task* my_task; 194 }; 195 196 template <typename Receiver, typename Body> class run_and_put_task; 197 template <typename Body> class run_task; 198 199 //******************************************************************************** 200 // graph tasks helpers 201 //******************************************************************************** 202 203 //! The list of graph tasks 204 class graph_task_list : no_copy { 205 private: 206 graph_task* my_first; 207 graph_task** my_next_ptr; 208 public: 209 //! Construct empty list 210 graph_task_list() : my_first(nullptr), my_next_ptr(&my_first) {} 211 212 //! True if list is empty; false otherwise. 213 bool empty() const { return !my_first; } 214 215 //! Push task onto back of list. 216 void push_back(graph_task& task) { 217 task.my_next = nullptr; 218 *my_next_ptr = &task; 219 my_next_ptr = &task.my_next; 220 } 221 222 //! Pop the front task from the list. 223 graph_task& pop_front() { 224 __TBB_ASSERT(!empty(), "attempt to pop item from empty task_list"); 225 graph_task* result = my_first; 226 my_first = result->my_next; 227 if (!my_first) { 228 my_next_ptr = &my_first; 229 } 230 return *result; 231 } 232 }; 233 234 //! The graph class 235 /** This class serves as a handle to the graph */ 236 class graph : no_copy, public graph_proxy { 237 friend class graph_node; 238 239 void prepare_task_arena(bool reinit = false) { 240 if (reinit) { 241 __TBB_ASSERT(my_task_arena, "task arena is NULL"); 242 my_task_arena->terminate(); 243 my_task_arena->initialize(task_arena::attach()); 244 } 245 else { 246 __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL"); 247 my_task_arena = new task_arena(task_arena::attach()); 248 } 249 if (!my_task_arena->is_active()) // failed to attach 250 my_task_arena->initialize(); // create a new, default-initialized arena 251 __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active"); 252 } 253 254 public: 255 //! Constructs a graph with isolated task_group_context 256 graph(); 257 258 //! Constructs a graph with use_this_context as context 259 explicit graph(task_group_context& use_this_context); 260 261 //! Destroys the graph. 262 /** Calls wait_for_all, then destroys the root task and context. */ 263 ~graph(); 264 265 //! Used to register that an external entity may still interact with the graph. 266 /** The graph will not return from wait_for_all until a matching number of release_wait calls is 267 made. */ 268 void reserve_wait() override; 269 270 //! Deregisters an external entity that may have interacted with the graph. 271 /** The graph will not return from wait_for_all until all the number of reserve_wait calls 272 matches the number of release_wait calls. */ 273 void release_wait() override; 274 275 //! Wait until graph is idle and the number of release_wait calls equals to the number of 276 //! reserve_wait calls. 277 /** The waiting thread will go off and steal work while it is blocked in the wait_for_all. */ 278 void wait_for_all() { 279 cancelled = false; 280 caught_exception = false; 281 try_call([this] { 282 my_task_arena->execute([this] { 283 wait(my_wait_context, *my_context); 284 }); 285 cancelled = my_context->is_group_execution_cancelled(); 286 }).on_exception([this] { 287 my_context->reset(); 288 caught_exception = true; 289 cancelled = true; 290 }); 291 // TODO: the "if" condition below is just a work-around to support the concurrent wait 292 // mode. The cancellation and exception mechanisms are still broken in this mode. 293 // Consider using task group not to re-implement the same functionality. 294 if (!(my_context->traits() & task_group_context::concurrent_wait)) { 295 my_context->reset(); // consistent with behavior in catch() 296 } 297 } 298 299 // TODO revamp: consider adding getter for task_group_context. 300 301 // ITERATORS 302 template<typename C, typename N> 303 friend class graph_iterator; 304 305 // Graph iterator typedefs 306 typedef graph_iterator<graph, graph_node> iterator; 307 typedef graph_iterator<const graph, const graph_node> const_iterator; 308 309 // Graph iterator constructors 310 //! start iterator 311 iterator begin(); 312 //! end iterator 313 iterator end(); 314 //! start const iterator 315 const_iterator begin() const; 316 //! end const iterator 317 const_iterator end() const; 318 //! start const iterator 319 const_iterator cbegin() const; 320 //! end const iterator 321 const_iterator cend() const; 322 323 // thread-unsafe state reset. 324 void reset(reset_flags f = rf_reset_protocol); 325 326 //! cancels execution of the associated task_group_context 327 void cancel(); 328 329 //! return status of graph execution 330 bool is_cancelled() { return cancelled; } 331 bool exception_thrown() { return caught_exception; } 332 333 private: 334 wait_context my_wait_context; 335 task_group_context *my_context; 336 bool own_context; 337 bool cancelled; 338 bool caught_exception; 339 bool my_is_active; 340 341 graph_node *my_nodes, *my_nodes_last; 342 343 tbb::spin_mutex nodelist_mutex; 344 void register_node(graph_node *n); 345 void remove_node(graph_node *n); 346 347 task_arena* my_task_arena; 348 349 graph_task_priority_queue_t my_priority_queue; 350 351 friend void activate_graph(graph& g); 352 friend void deactivate_graph(graph& g); 353 friend bool is_graph_active(graph& g); 354 friend graph_task* prioritize_task(graph& g, graph_task& arena_task); 355 friend void spawn_in_graph_arena(graph& g, graph_task& arena_task); 356 friend void enqueue_in_graph_arena(graph &g, graph_task& arena_task); 357 358 friend class task_arena_base; 359 360 }; // class graph 361 362 inline void graph_task::destruct_and_deallocate(const execution_data& ed) { 363 auto allocator = my_allocator; 364 // TODO: investigate if direct call of derived destructor gives any benefits. 365 this->~graph_task(); 366 allocator.deallocate(this, ed); 367 } 368 369 inline void graph_task::finalize(const execution_data& ed) { 370 graph& g = my_graph; 371 destruct_and_deallocate(ed); 372 g.release_wait(); 373 } 374 375 inline task* graph_task::cancel(execution_data& ed) { 376 finalize(ed); 377 return nullptr; 378 } 379 380 //******************************************************************************** 381 // end of graph tasks helpers 382 //******************************************************************************** 383 384 385 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 386 class get_graph_helper; 387 #endif 388 389 //! The base of all graph nodes. 390 class graph_node : no_copy { 391 friend class graph; 392 template<typename C, typename N> 393 friend class graph_iterator; 394 395 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 396 friend class get_graph_helper; 397 #endif 398 399 protected: 400 graph& my_graph; 401 graph& graph_reference() const { 402 // TODO revamp: propagate graph_reference() method to all the reference places. 403 return my_graph; 404 } 405 graph_node* next = nullptr; 406 graph_node* prev = nullptr; 407 public: 408 explicit graph_node(graph& g); 409 410 virtual ~graph_node(); 411 412 protected: 413 // performs the reset on an individual node. 414 virtual void reset_node(reset_flags f = rf_reset_protocol) = 0; 415 }; // class graph_node 416 417 inline void activate_graph(graph& g) { 418 g.my_is_active = true; 419 } 420 421 inline void deactivate_graph(graph& g) { 422 g.my_is_active = false; 423 } 424 425 inline bool is_graph_active(graph& g) { 426 return g.my_is_active; 427 } 428 429 inline graph_task* prioritize_task(graph& g, graph_task& gt) { 430 if( no_priority == gt.priority ) 431 return > 432 433 //! Non-preemptive priority pattern. The original task is submitted as a work item to the 434 //! priority queue, and a new critical task is created to take and execute a work item with 435 //! the highest known priority. The reference counting responsibility is transferred (via 436 //! allocate_continuation) to the new task. 437 task* critical_task = gt.my_allocator.new_object<priority_task_selector>(g.my_priority_queue, gt.my_allocator); 438 __TBB_ASSERT( critical_task, "bad_alloc?" ); 439 g.my_priority_queue.push(>); 440 using tbb::detail::d1::submit; 441 submit( *critical_task, *g.my_task_arena, *g.my_context, /*as_critical=*/true ); 442 return nullptr; 443 } 444 445 //! Spawns a task inside graph arena 446 inline void spawn_in_graph_arena(graph& g, graph_task& arena_task) { 447 if (is_graph_active(g)) { 448 task* gt = prioritize_task(g, arena_task); 449 if( !gt ) 450 return; 451 452 __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL); 453 submit( *gt, *g.my_task_arena, *g.my_context 454 #if __TBB_PREVIEW_CRITICAL_TASKS 455 , /*as_critical=*/false 456 #endif 457 ); 458 } 459 } 460 461 // TODO revamp: unify *_in_graph_arena functions 462 463 //! Enqueues a task inside graph arena 464 inline void enqueue_in_graph_arena(graph &g, graph_task& arena_task) { 465 if (is_graph_active(g)) { 466 __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" ); 467 468 // TODO revamp: decide on the approach that does not postpone critical task 469 if( task* gt = prioritize_task(g, arena_task) ) 470 submit( *gt, *g.my_task_arena, *g.my_context, /*as_critical=*/false); 471 } 472 } 473 474 } // namespace d1 475 } // namespace detail 476 } // namespace tbb 477 478 #endif // __TBB_flow_graph_impl_H 479