1 /* 2 Copyright (c) 2005-2022 Intel Corporation 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef __TBB_flow_graph_H 18 #define __TBB_flow_graph_H 19 20 #include <atomic> 21 #include <memory> 22 #include <type_traits> 23 24 #include "detail/_config.h" 25 #include "detail/_namespace_injection.h" 26 #include "spin_mutex.h" 27 #include "null_mutex.h" 28 #include "spin_rw_mutex.h" 29 #include "null_rw_mutex.h" 30 #include "detail/_pipeline_filters.h" 31 #include "detail/_task.h" 32 #include "detail/_small_object_pool.h" 33 #include "cache_aligned_allocator.h" 34 #include "detail/_exception.h" 35 #include "detail/_template_helpers.h" 36 #include "detail/_aggregator.h" 37 #include "detail/_allocator_traits.h" 38 #include "detail/_utils.h" 39 #include "profiling.h" 40 #include "task_arena.h" 41 42 #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ ) 43 #if __INTEL_COMPILER 44 // Disabled warning "routine is both inline and noinline" 45 #pragma warning (push) 46 #pragma warning( disable: 2196 ) 47 #endif 48 #define __TBB_NOINLINE_SYM __attribute__((noinline)) 49 #else 50 #define __TBB_NOINLINE_SYM 51 #endif 52 53 #include <tuple> 54 #include <list> 55 #include <queue> 56 #if __TBB_CPP20_CONCEPTS_PRESENT 57 #include <concepts> 58 #endif 59 60 /** @file 61 \brief The graph related classes and functions 62 63 There are some applications that best express dependencies as messages 64 passed between nodes in a graph. 
These messages may contain data or
    simply act as signals that a predecessor has completed. The graph
    class and its associated node classes can be used to express such
    applications.
*/

namespace tbb {
namespace detail {

namespace d1 {

//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };

//! A generic null type
struct null_type {};

//! An empty class used for messages that mean "I'm done"
class continue_msg {};

} // namespace d1

#if __TBB_CPP20_CONCEPTS_PRESENT
namespace d0 {

// C++20 concepts that constrain the user-provided bodies of flow graph nodes.

// A node body may return either the node's declared output type or continue_msg.
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
                                std::same_as<OutputType, ReturnType>;

// Body of a continue_node: copy-constructible, callable with a continue_msg.
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
        { body(v) } -> node_body_return_type<Output>;
    };

// Body of a function_node: copy-constructible, callable with the input type.
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const Input& v ) {
        { body(v) } -> node_body_return_type<Output>;
    };

// Key-extraction function object used by key-matching join_node.
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
    requires( FunctionObject& func, const Input& v ) {
        { func(v) } -> adaptive_same_as<Key>;
    };

// Body of an input_node: callable with a flow_control, yielding the output type.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
    requires( Body& body, tbb::detail::d1::flow_control& fc ) {
        { body(fc) } -> adaptive_same_as<Output>;
    };

// Body of a multifunction_node: callable with the input and the tuple of output ports.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const Input& v, OutputPortsType& p ) {
        body(v, p);
    };

// Sequencer for a sequencer_node: maps a value to its sequence number.
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
    requires( Sequencer& seq, const Value& value ) {
        { seq(value) } -> adaptive_same_as<std::size_t>;
    };

// Body of an async_node: callable with the input and the gateway used to submit results.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
    requires( Body& body, const Input& v, GatewayType& gateway ) {
        body(v, gateway);
    };

} // namespace d0
#endif // __TBB_CPP20_CONCEPTS_PRESENT

namespace d1 {

//! Forward declaration section
template< typename T > class sender;
template< typename T > class receiver;
class continue_receiver;

template< typename T, typename U > class limiter_node; // needed for resetting decrementer

template<typename T, typename M> class successor_cache;
template<typename T, typename M> class broadcast_cache;
template<typename T, typename M> class round_robin_cache;
template<typename T, typename M> class predecessor_cache;
template<typename T, typename M> class reservable_predecessor_cache;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
namespace order {
struct following;
struct preceding;
}
template<typename Order, typename... Args> struct node_set;
#endif


} // namespace d1
} // namespace detail
} // namespace tbb

//! The graph class
#include "detail/_flow_graph_impl.h"

namespace tbb {
namespace detail {
namespace d1 {

// Orders two tasks by priority: the higher-priority task becomes .first of the pair.
static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
    if (second->priority > first->priority)
        return std::make_pair(second, first);
    return std::make_pair(first, second);
}

// submit task if necessary. Returns the non-enqueued task if there is one.
180 static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) { 181 // if no RHS task, don't change left. 182 if (right == nullptr) return left; 183 // right != nullptr 184 if (left == nullptr) return right; 185 if (left == SUCCESSFULLY_ENQUEUED) return right; 186 // left contains a task 187 if (right != SUCCESSFULLY_ENQUEUED) { 188 // both are valid tasks 189 auto tasks_pair = order_tasks(left, right); 190 spawn_in_graph_arena(g, *tasks_pair.first); 191 return tasks_pair.second; 192 } 193 return left; 194 } 195 196 //! Pure virtual template class that defines a sender of messages of type T 197 template< typename T > 198 class sender { 199 public: 200 virtual ~sender() {} 201 202 //! Request an item from the sender 203 virtual bool try_get( T & ) { return false; } 204 205 //! Reserves an item in the sender 206 virtual bool try_reserve( T & ) { return false; } 207 208 //! Releases the reserved item 209 virtual bool try_release( ) { return false; } 210 211 //! Consumes the reserved item 212 virtual bool try_consume( ) { return false; } 213 214 protected: 215 //! The output type of this sender 216 typedef T output_type; 217 218 //! The successor type for this node 219 typedef receiver<T> successor_type; 220 221 //! Add a new successor to this node 222 virtual bool register_successor( successor_type &r ) = 0; 223 224 //! Removes a successor from this node 225 virtual bool remove_successor( successor_type &r ) = 0; 226 227 template<typename C> 228 friend bool register_successor(sender<C>& s, receiver<C>& r); 229 230 template<typename C> 231 friend bool remove_successor (sender<C>& s, receiver<C>& r); 232 }; // class sender<T> 233 234 template<typename C> 235 bool register_successor(sender<C>& s, receiver<C>& r) { 236 return s.register_successor(r); 237 } 238 239 template<typename C> 240 bool remove_successor(sender<C>& s, receiver<C>& r) { 241 return s.remove_successor(r); 242 } 243 244 //! 
//! Pure virtual template class that defines a receiver of messages of type T
template< typename T >
class receiver {
public:
    //! Destructor
    virtual ~receiver() {}

    //! Put an item to the receiver
    bool try_put( const T& t ) {
        graph_task *res = try_put_task(t);
        if (!res) return false;
        // Any real task returned (i.e. not the SUCCESSFULLY_ENQUEUED sentinel)
        // must be spawned in the graph's arena here.
        if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    //! put item to successor; return task to run the successor if possible.
protected:
    //! The input type of this receiver
    typedef T input_type;

    //! The predecessor type for this node
    typedef sender<T> predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class broadcast_cache;
    template< typename X, typename Y > friend class round_robin_cache;
    virtual graph_task *try_put_task(const T& t) = 0;
    virtual graph& graph_reference() const = 0;

    template<typename TT, typename M> friend class successor_cache;
    virtual bool is_continue_receiver() { return false; }

    // TODO revamp: reconsider the inheritance and move node priority out of receiver
    virtual node_priority_t priority() const { return no_priority; }

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }

    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }

    template <typename C>
    friend bool register_predecessor(receiver<C>& r, sender<C>& s);
    template <typename C>
    friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
}; // class receiver<T>

// Free-function counterparts of the protected virtuals above.
template <typename C>
bool register_predecessor(receiver<C>& r, sender<C>& s) {
    return r.register_predecessor(s);
}

template <typename C>
bool remove_predecessor(receiver<C>& r, sender<C>& s) {
    return r.remove_predecessor(s);
}

//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on */
class continue_receiver : public receiver< continue_msg > {
protected:

    //! Constructor
    explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
        my_priority = a_priority;
    }

    //! Copy constructor
    continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        // A copy starts from the source's initial threshold, not its current one.
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
        my_priority = src.my_priority;
    }

    //! Increments the trigger threshold
    bool register_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }

    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold. So removing a predecessor while the graph is active can cause
        unexpected results. */
    bool remove_predecessor( predecessor_type & ) override {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }
    //! The input type
    typedef continue_msg input_type;

    //! The predecessor type for this node
    typedef receiver<input_type>::predecessor_type predecessor_type;

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    // execute body is supposed to be too small to create a task for.
    graph_task* try_put_task( const input_type & ) override {
        {
            spin_mutex::scoped_lock l(my_mutex);
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;
            else
                my_current_count = 0; // threshold reached: reset for the next round
        }
        // execute() is called outside the lock, while the sender blocks in try_put().
        graph_task* res = execute();
        return res? res : SUCCESSFULLY_ENQUEUED;
    }

    spin_mutex my_mutex;
    int my_predecessor_count;         // current trigger threshold
    int my_current_count;             // signals received since the last trigger
    int my_initial_predecessor_count; // threshold restored by reset_receiver
    node_priority_t my_priority;
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U, typename V> friend class limiter_node;

    virtual void reset_receiver( reset_flags f ) {
        my_current_count = 0;
        if (f & rf_clear_edges) {
            my_predecessor_count = my_initial_predecessor_count;
        }
    }

    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task. This is
        called while the sender is blocked in the try_put(). */
    virtual graph_task* execute() = 0;
    template<typename TT, typename M> friend class successor_cache;
    bool is_continue_receiver() override { return true; }

    node_priority_t priority() const override { return my_priority; }
}; // class continue_receiver

#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Default key extraction for message-based key matching: the message type is
// expected to expose a key() accessor.
template <typename K, typename T>
K key_from_message( const T &t ) {
    return t.key();
}
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */

} // d1
} // detail
} // tbb

#include "detail/_flow_graph_trace_impl.h"
#include "detail/_hash_compare.h"

namespace tbb {
namespace detail {
namespace d1 {

#include "detail/_flow_graph_body_impl.h"
#include "detail/_flow_graph_cache_impl.h"
#include "detail/_flow_graph_types_impl.h"

using namespace graph_policy_namespace;

template <typename C, typename N>
graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
{
    if (begin) current_node = my_graph->my_nodes;
    //else it is an end iterator by default
}

template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
    __TBB_ASSERT(current_node, "graph_iterator at end");
    return *operator->();
}

template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}

// Advance to the next node in the graph's intrusive node list (nullptr == end).
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}
//! Constructs a graph with isolated task_group_context
inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = true;
    cancelled = false;
    caught_exception = false;
    my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
    fgt_graph(this);
    my_is_active = true;
}

//! Constructs a graph that uses (but does not own) the given task_group_context
inline graph::graph(task_group_context& use_this_context) :
    my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
    prepare_task_arena();
    own_context = false;
    cancelled = false;
    caught_exception = false;
    fgt_graph(this);
    my_is_active = true;
}

inline graph::~graph() {
    // Drain all outstanding graph work before tearing down the context/arena.
    wait_for_all();
    if (own_context) {
        my_context->~task_group_context();
        r1::cache_aligned_deallocate(my_context);
    }
    delete my_task_arena;
}

inline void graph::reserve_wait() {
    my_wait_context.reserve();
    fgt_reserve_wait(this);
}

inline void graph::release_wait() {
    fgt_release_wait(this);
    my_wait_context.release();
}

// Appends the node to the graph's doubly-linked node list (under nodelist_mutex).
inline void graph::register_node(graph_node *n) {
    n->next = nullptr;
    {
        spin_mutex::scoped_lock lock(nodelist_mutex);
        n->prev = my_nodes_last;
        if (my_nodes_last) my_nodes_last->next = n;
        my_nodes_last = n;
        if (!my_nodes) my_nodes = n;
    }
}

// Unlinks the node from the graph's node list (under nodelist_mutex).
inline void graph::remove_node(graph_node *n) {
    {
        spin_mutex::scoped_lock lock(nodelist_mutex);
        __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
        if (n->prev) n->prev->next = n->next;
        if (n->next) n->next->prev = n->prev;
        if (my_nodes_last == n) my_nodes_last = n->prev;
        if (my_nodes == n) my_nodes = n->next;
    }
    n->prev = n->next = nullptr;
}

inline void graph::reset( reset_flags f ) {
    // reset context
    deactivate_graph(*this);

    my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    activate_graph(*this);
}

inline void graph::cancel() {
    my_context->cancel_group_execution();
}

inline graph::iterator graph::begin() { return iterator(this, true); }

inline graph::iterator graph::end() { return iterator(this, false); }

inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::end() const { return const_iterator(this, false); }

inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }

inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }

// A graph_node registers itself with its owning graph on construction...
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}

// ...and unregisters itself on destruction.
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}

#include "detail/_flow_graph_node_impl.h"


//! An executable node that acts as a source, i.e. it has no predecessors

template < typename Output >
__TBB_requires(std::copyable<Output>)
class input_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;

    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;

    // Input node has no input type
    typedef null_type input_type;
    //! Constructor for a node with a successor
    template< typename Body >
    __TBB_requires(input_node_body<Body, Output>)
    __TBB_NOINLINE_SYM input_node( graph &g, Body body )
        : graph_node(g), my_active(false)
        , my_body( new input_body_leaf< output_type, Body>(body) )
        , my_init_body( new input_body_leaf< output_type, Body>(body) )
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Successors>
    __TBB_requires(input_node_body<Body, Output>)
    input_node( const node_set<order::preceding, Successors...>& successors, Body body )
        : input_node(successors.graph_reference(), body)
    {
        make_edges(*this, successors);
    }
#endif

    //! Copy constructor
    __TBB_NOINLINE_SYM input_node( const input_node& src )
        : graph_node(src.my_graph), sender<Output>()
        , my_active(false)
        // Copies start from the source's pristine (initial) body, not its current state.
        , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
        , my_successors(this), my_reserved(false), my_has_cached_item(false)
    {
        fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                           static_cast<sender<output_type> *>(this), this->my_body);
    }

    //! The destructor
    ~input_node() { delete my_body; delete my_init_body; }

    //! Add a new successor to this node
    bool register_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();
        return true;
    }

    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }

    //! Request an item from the node
    bool try_get( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none.  enqueue a task to
        // provide one.
        if ( my_active )
            spawn_put();
        return false;
    }

    //! Reserves an item.
    bool try_reserve( output_type &v ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }

        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        // The retained item may now be offered to successors again.
        if(!my_successors.empty())
            spawn_put();
        return true;
    }

    //! Consumes a reserved item
    bool try_consume( ) override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }

    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }

    //! Returns a copy of the user body; throws std::bad_cast if Body is not the stored type.
    template<typename Body>
    Body copy_function_object() {
        input_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
    }

protected:
    //! resets the input_node to its initial state
    void reset_node( reset_flags f) override {
        my_active = false;
        my_reserved = false;
        my_has_cached_item = false;

        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            // Restore the body to a clone of its pristine (construction-time) copy.
            input_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }

private:
    spin_mutex my_mutex;
    bool my_active;
    input_body<output_type> *my_body;      // current (possibly stateful) body
    input_body<output_type> *my_init_body; // pristine copy used for reset/copy
    broadcast_cache< output_type > my_successors;
    bool my_reserved;
    bool my_has_cached_item;
    output_type my_cached_item;

    // used by apply_body_bypass, can invoke body of node.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            flow_control control;

            fgt_begin_body( my_body );

            my_cached_item = (*my_body)(control);
            // The body signals end-of-stream through flow_control::stop().
            my_has_cached_item = !control.is_pipeline_stopped;

            fgt_end_body( my_body );
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }

    // Allocates the task that will invoke the body; pairs with the graph's wait reservation.
    graph_task* create_put_task() {
        small_object_allocator allocator{};
        typedef input_node_task_bypass< input_node<output_type> > task_type;
        graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
        my_graph.reserve_wait();
        return t;
    }

    //! Spawns a task that applies the body
    void spawn_put( ) {
        if(is_graph_active(this->my_graph)) {
            spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }

    friend class input_node_task_bypass< input_node<output_type> >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    graph_task* apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return nullptr;

        graph_task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();   // a successor accepted the item
        else
            try_release();   // nobody took it; keep it cached for later
        return last_task;
    }
};  // class input_node

//! Implements a function node that supports Input -> Output
template<typename Input, typename Output = continue_msg, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> &&
               std::copy_constructible<Input> &&
               std::copy_constructible<Output>)
class function_node
    : public graph_node
    , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
    , public function_output<Output>
{
    typedef cache_aligned_allocator<Input> internals_allocator;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    using input_impl_type::my_predecessors;

    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place.  This would be an interface-breaking change.
    template< typename Body >
    __TBB_requires(function_node_body<Body, Input, Output>)
    __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
                   Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
        : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
          fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor taking a priority with the default policy
    template <typename Body>
    __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that build the node from a node_set and wire the edges immediately
    template <typename Body, typename... Args>
    __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(function_node_body<Body, Input, Output>)
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
        : function_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    using input_impl_type::try_put_task;

    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    void reset_node(reset_flags f) override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }

};  // class function_node

//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.
template<typename Input, typename Output, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> &&
               std::copy_constructible<Input>)
class multifunction_node :
    public graph_node,
    public multifunction_input
    <
        Input,
        typename wrap_tuple_elements<
            std::tuple_size<Output>::value,  // #elements in tuple
            multifunction_output,            // wrap this around each element
            Output                           // the tuple providing the types
        >::type,
        Policy,
        cache_aligned_allocator<Input>
    >
{
    typedef cache_aligned_allocator<Input> internals_allocator;

protected:
    static const int N = std::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
    typedef multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructor
    template<typename Body>
    __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
        fgt_multioutput_node_with_body<N>(
            CODEPTR(), FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Constructor taking a priority with the default policy
    template <typename Body>
    __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructors that build the node from a node_set and wire the edges immediately
    template <typename Body, typename... Args>
    __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       Policy p = Policy(), node_priority_t a_priority = no_priority)
        : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }

    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) override { input_impl_type::reset(f); }
};  // multifunction_node

//! split_node: accepts a tuple as input, forwards each element of the tuple to its
//  successors.  The node has unlimited concurrency, so it does not reject inputs.
template<typename TupleType>
class split_node : public graph_node, public receiver<TupleType> {
    static const int N = std::tuple_size<TupleType>::value;
    typedef receiver<TupleType> base_type;
public:
    typedef TupleType input_type;
    typedef typename wrap_tuple_elements<
            N,                    // #elements in tuple
            multifunction_output, // wrap this around each element
            TupleType             // the tuple providing the types
        >::type output_ports_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit split_node(graph &g)
        : graph_node(g),
          my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), this->output_ports());
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor that builds the node from a node_set and wires the edges immediately
    template <typename... Args>
    __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor
    __TBB_NOINLINE_SYM split_node(const split_node& other)
        : graph_node(other.my_graph), base_type(other),
          my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
    {
        fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), this->output_ports());
    }

    output_ports_type &output_ports() { return my_output_ports; }

protected:
    graph_task *try_put_task(const TupleType& t) override {
        // Sending split messages in parallel is not justified, as overheads would prevail.
        // Also, we do not have successors here. So we just tell the task returned here is successful.
        return emit_element<N>::emit_this(this->my_graph, t, output_ports());
    }
    void reset_node(reset_flags f) override {
        if (f & rf_clear_edges)
            clear_element<N>::clear_this(my_output_ports);

        __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
    }
    graph& graph_reference() const override {
        return my_graph;
    }

private:
    output_ports_type my_output_ports;
};

//! Implements an executable node that supports continue_msg -> Output
template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)
class continue_node : public graph_node, public continue_input<Output, Policy>,
                      public function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef continue_input<Output, Policy> input_impl_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type( g, body, a_priority ),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,

                static_cast<receiver<input_type> *>(this),
                static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Constructor taking a priority with the default policy
    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, Body body, node_priority_t a_priority )
        : continue_node(g, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), body, p, a_priority ) {
        make_edges_in_order(nodes, *this);
    }
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
        : continue_node(nodes, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    //  (explicit number of predecessors sets the initial trigger threshold)
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, body, a_priority),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this),
                static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
        : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename...
//! Implements an executable node that supports continue_msg -> Output
template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)
class continue_node : public graph_node, public continue_input<Output, Policy>,
                      public function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef continue_input<Output, Policy> input_impl_type;
    typedef function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;

    //! Constructor for executable node with continue_msg -> Output
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g), input_impl_type( g, body, a_priority ),
        fOutput_type(g) {
        // Register this node (and its body) with the profiling/tracing tools.
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

    //! Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, Body body, node_priority_t a_priority )
        : continue_node(g, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph shared by `nodes` and wires the edges.
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body,
                   Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), body, p, a_priority ) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
        : continue_node(nodes, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Constructor for executable node with continue_msg -> Output
    //  with an explicit number of predecessors to wait for before firing.
    template <typename Body >
    __TBB_requires(continue_node_body<Body, Output>)
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, body, a_priority),
        fOutput_type(g) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

    template <typename Body>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
        : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(continue_node_body<Body, Output>)
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t a_priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
#endif

    //! Copy constructor: creates a fresh node in the same graph; edges are not copied.
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        function_output<Output>(src.my_graph) {
        fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
                            static_cast<receiver<input_type> *>(this),
                            static_cast<sender<output_type> *>(this), this->my_body );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    // Input handling (counting predecessors, running the body) lives in continue_input.
    using input_impl_type::try_put_task;
    broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }

    //! Resets the predecessor count and, when requested, drops successor edges.
    void reset_node(reset_flags f) override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
};  // continue_node
Forwards messages of type T to all successors 1090 template <typename T> 1091 class broadcast_node : public graph_node, public receiver<T>, public sender<T> { 1092 public: 1093 typedef T input_type; 1094 typedef T output_type; 1095 typedef typename receiver<input_type>::predecessor_type predecessor_type; 1096 typedef typename sender<output_type>::successor_type successor_type; 1097 private: 1098 broadcast_cache<input_type> my_successors; 1099 public: 1100 1101 __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) { 1102 fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph, 1103 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) ); 1104 } 1105 1106 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 1107 template <typename... Args> 1108 broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) { 1109 make_edges_in_order(nodes, *this); 1110 } 1111 #endif 1112 1113 // Copy constructor 1114 __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {} 1115 1116 //! Adds a successor 1117 bool register_successor( successor_type &r ) override { 1118 my_successors.register_successor( r ); 1119 return true; 1120 } 1121 1122 //! Removes s as a successor 1123 bool remove_successor( successor_type &r ) override { 1124 my_successors.remove_successor( r ); 1125 return true; 1126 } 1127 1128 protected: 1129 template< typename R, typename B > friend class run_and_put_task; 1130 template<typename X, typename Y> friend class broadcast_cache; 1131 template<typename X, typename Y> friend class round_robin_cache; 1132 //! build a task to run the successor if possible. Default is old behavior. 
1133 graph_task *try_put_task(const T& t) override { 1134 graph_task *new_task = my_successors.try_put_task(t); 1135 if (!new_task) new_task = SUCCESSFULLY_ENQUEUED; 1136 return new_task; 1137 } 1138 1139 graph& graph_reference() const override { 1140 return my_graph; 1141 } 1142 1143 void reset_node(reset_flags f) override { 1144 if (f&rf_clear_edges) { 1145 my_successors.clear(); 1146 } 1147 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node"); 1148 } 1149 }; // broadcast_node 1150 1151 //! Forwards messages in arbitrary order 1152 template <typename T> 1153 class buffer_node 1154 : public graph_node 1155 , public reservable_item_buffer< T, cache_aligned_allocator<T> > 1156 , public receiver<T>, public sender<T> 1157 { 1158 typedef cache_aligned_allocator<T> internals_allocator; 1159 1160 public: 1161 typedef T input_type; 1162 typedef T output_type; 1163 typedef typename receiver<input_type>::predecessor_type predecessor_type; 1164 typedef typename sender<output_type>::successor_type successor_type; 1165 typedef buffer_node<T> class_type; 1166 1167 protected: 1168 typedef size_t size_type; 1169 round_robin_cache< T, null_rw_mutex > my_successors; 1170 1171 friend class forward_task_bypass< class_type >; 1172 1173 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task 1174 }; 1175 1176 // implements the aggregator_operation concept 1177 class buffer_operation : public aggregated_operation< buffer_operation > { 1178 public: 1179 char type; 1180 T* elem; 1181 graph_task* ltask; 1182 successor_type *r; 1183 1184 buffer_operation(const T& e, op_type t) : type(char(t)) 1185 , elem(const_cast<T*>(&e)) , ltask(nullptr) 1186 , r(nullptr) 1187 {} 1188 buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {} 1189 }; 1190 1191 bool forwarder_busy; 1192 typedef aggregating_functor<class_type, buffer_operation> handler_type; 1193 friend class 
//! Forwards messages in arbitrary order
/** All buffer mutations funnel through an aggregator: callers build a
    buffer_operation record, submit it via my_aggregator.execute(), and one
    thread at a time drains the pending operation list in handle_operations.
    This serializes access to the underlying reservable_item_buffer without a
    conventional lock held across the whole call. */
template <typename T>
class buffer_node
    : public graph_node
    , public reservable_item_buffer< T, cache_aligned_allocator<T> >
    , public receiver<T>, public sender<T>
{
    typedef cache_aligned_allocator<T> internals_allocator;

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef buffer_node<T> class_type;

protected:
    typedef size_t size_type;
    round_robin_cache< T, null_rw_mutex > my_successors;

    friend class forward_task_bypass< class_type >;

    //! Tags identifying which internal_* handler a buffer_operation requests.
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
    };

    // implements the aggregator_operation concept
    class buffer_operation : public aggregated_operation< buffer_operation > {
    public:
        char type;              // one of op_type, narrowed for compactness
        T* elem;                // item to insert, or destination for pop/reserve
        graph_task* ltask;      // task produced while handling this operation, if any
        successor_type *r;      // successor argument for reg_succ/rem_succ

        buffer_operation(const T& e, op_type t) : type(char(t))
            , elem(const_cast<T*>(&e)) , ltask(nullptr)
            , r(nullptr)
        {}
        buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
    };

    //! True while a forwarder task is outstanding; prevents spawning a second one.
    bool forwarder_busy;
    typedef aggregating_functor<class_type, buffer_operation> handler_type;
    friend class aggregating_functor<class_type, buffer_operation>;
    aggregator< handler_type, buffer_operation> my_aggregator;

    //! Aggregator callback: drains the pending operation list (virtual so
    //! derived nodes can redirect to their own dispatch).
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }

    //! Dispatches each queued operation, then possibly spawns a forwarder task.
    /** `derived` statically dispatches order()/is_item_valid()/try_put_and_add_task()
        to the most-derived node type without virtual calls. */
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        buffer_operation *tmp = nullptr;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res:  internal_release(tmp); try_forwarding = true; break;
            case con_res:  internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
            }
        }

        // Hook for derived nodes that must restore an invariant after a batch
        // of operations (e.g. priority_queue_node re-heapifies here).
        derived->order();

        if (try_forwarding && !forwarder_busy) {
            if(is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                typedef forward_task_bypass<class_type> task_type;
                small_object_allocator allocator{};
                graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
                my_graph.reserve_wait();
                // tmp should point to the last item handled by the aggregator. This is the operation
                // the handling thread enqueued. So modifying that record will be okay.
                // TODO revamp: check that the issue is still present
                // workaround for icc bug (at least 12.0 and 13.0)
                // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
                //        argument types are: (graph, graph_task *, graph_task *)
                graph_task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
            }
        }
    }  // handle_operations

    //! Extracts (without clearing) the task an operation produced, if any.
    inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
        return op_data.ltask;
    }

    //! Spawns the task produced by op_data, if any; returns whether one was spawned.
    inline bool enqueue_forwarding_task(buffer_operation &op_data) {
        graph_task *ft = grab_forwarding_task(op_data);
        if(ft) {
            spawn_in_graph_arena(graph_reference(), *ft);
            return true;
        }
        return false;
    }

    //! This is executed by an enqueued task, the "forwarder"
    /** Repeatedly submits try_fwd_task operations until one fails, combining
        any tasks generated along the way into a single returned task. */
    virtual graph_task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        graph_task *last_task = nullptr;
        do {
            op_data.status = WAIT;
            op_data.ltask = nullptr;
            my_aggregator.execute(&op_data);

            // workaround for icc bug
            graph_task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status ==SUCCEEDED);
        return last_task;
    }

    //! Register successor
    virtual void internal_reg_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.register_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Remove successor
    virtual void internal_rem_succ(buffer_operation *op) {
        __TBB_ASSERT(op->r, nullptr);
        my_successors.remove_successor(*(op->r));
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

private:
    //! No post-batch invariant to restore for a plain buffer (see handle_operations_impl).
    void order() {}

    //! True when the back of the buffer holds a live item.
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }

    //! Offers the back item to a successor; on acceptance removes it and
    //! folds the produced task into last_task.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task *new_task = my_successors.try_put_task(this->back());
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }

protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }

    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");

        // A reserved buffer, or one with no valid item, cannot forward;
        // clearing forwarder_busy lets a future operation spawn a new forwarder.
        if (this->my_reserved || !derived->is_item_valid()) {
            op->status.store(FAILED, std::memory_order_release);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        graph_task* last_task = nullptr;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);

        op->ltask = last_task;  // return task
        if (last_task && !counter) {
            // every successor slot was used and produced/accepted work: keep forwarding
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
            forwarder_busy = false;
        }
    }

    //! Appends the item; returns true so the aggregator attempts forwarding.
    virtual bool internal_push(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        this->push_back(*(op->elem));
        op->status.store(SUCCEEDED, std::memory_order_release);
        return true;
    }

    //! Pops the back item into *op->elem; FAILED when the buffer is empty.
    virtual void internal_pop(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        if(this->pop_back(*(op->elem))) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Reserves the front item, copying it into *op->elem; FAILED if unavailable.
    virtual void internal_reserve(buffer_operation *op) {
        __TBB_ASSERT(op->elem, nullptr);
        if(this->reserve_front(*(op->elem))) {
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
        else {
            op->status.store(FAILED, std::memory_order_release);
        }
    }

    //! Completes a reservation: the reserved item is removed for good.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

    //! Cancels a reservation: the reserved item stays in the buffer.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

public:
    //! Constructor
    __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
        : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
          sender<T>(), my_successors(this), forwarder_busy(false)
    {
        my_aggregator.initialize_handler(handler_type(this));
        fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph shared by `nodes` and wires the edges.
    template <typename... Args>
    buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: a fresh, empty node in the same graph; contents/edges not copied.
    __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}

    //
    // message sender implementation
    //

    //! Adds a new successor.
    /** Adds successor r to the list of successors; may forward tasks.  */
    bool register_successor( successor_type &r ) override {
        buffer_operation op_data(reg_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Removes a successor.
    /** Removes successor r from the list of successors.
        It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
    bool remove_successor( successor_type &r ) override {
        // TODO revamp: investigate why full qualification is necessary here
        tbb::detail::d1::remove_predecessor(r, *this);
        buffer_operation op_data(rem_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        // even though this operation does not cause a forward, if we are the handler, and
        // a forward is scheduled, we may be the first to reach this point after the aggregator,
        // and so should check for the task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Request an item from the buffer_node
    /**  true = v contains the returned item<BR>
         false = no item has been returned */
    bool try_get( T &v ) override {
        buffer_operation op_data(req_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }

    //! Reserves an item.
    /**  false = no item can be reserved<BR>
         true = an item is reserved */
    bool try_reserve( T &v ) override {
        buffer_operation op_data(res_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status==SUCCEEDED);
    }

    //! Release a reserved item.
    /**  true = item has been released and so remains in sender */
    bool try_release() override {
        buffer_operation op_data(rel_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

    //! Consumes a reserved item.
    /** true = item is removed from sender and reservation removed */
    bool try_consume() override {
        buffer_operation op_data(con_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }

protected:

    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    //! receive an item, return a task *if possible
    graph_task *try_put_task(const T &t) override {
        buffer_operation op_data(t, put_item);
        my_aggregator.execute(&op_data);
        graph_task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.

        if(ft && op_data.status ==FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.)  Queue the task and reset the pointer.
            spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
        }
        else if(!ft && op_data.status ==SUCCEEDED) {
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }

    graph& graph_reference() const override {
        return my_graph;
    }

protected:
    //! Empties the buffer and, when requested, drops successor edges.
    void reset_node( reset_flags f) override {
        reservable_item_buffer<T, internals_allocator>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
        forwarder_busy = false;
    }
};  // buffer_node
//! Forwards messages in FIFO order
/** Reuses buffer_node's aggregator machinery but overrides the item-selection
    hooks so that the head of the buffer, not the tail, is popped/reserved. */
template <typename T>
class queue_node : public buffer_node<T> {
protected:
    typedef buffer_node<T> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;

private:
    template<typename> friend class buffer_node;

    //! FIFO validity check: the *front* item, not the back, must be live.
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }

    //! Offers the front item to a successor; on acceptance removes it and
    //! folds the produced task into last_task.
    void try_put_and_add_task(graph_task*& last_task) {
        graph_task *new_task = this->my_successors.try_put_task(this->front());
        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            this->destroy_front();
        }
    }

protected:
    void internal_forward_task(queue_operation *op) override {
        this->internal_forward_task_impl(op, this);
    }

    //! Pops the *front* item into *op->elem; FAILED when reserved or empty.
    void internal_pop(queue_operation *op) override {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            op->status.store(FAILED, std::memory_order_release);
        }
        else {
            this->pop_front(*(op->elem));
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
    }

    //! Reserves the front item; FAILED when already reserved or empty.
    void internal_reserve(queue_operation *op) override {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            op->status.store(FAILED, std::memory_order_release);
        }
        else {
            this->reserve_front(*(op->elem));
            op->status.store(SUCCEEDED, std::memory_order_release);
        }
    }

    //! Completes a reservation: the reserved front item is removed for good.
    void internal_consume(queue_operation *op) override {
        this->consume_front();
        op->status.store(SUCCEEDED, std::memory_order_release);
    }

public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    //! Constructor
    __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
        fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph shared by `nodes` and wires the edges.
    template <typename... Args>
    queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: a fresh, empty node in the same graph.
    __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
        fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }


protected:
    void reset_node( reset_flags f) override {
        base_type::reset_node(f);
    }
};  // queue_node
1598 public: 1599 typedef T input_type; 1600 typedef T output_type; 1601 typedef typename receiver<input_type>::predecessor_type predecessor_type; 1602 typedef typename sender<output_type>::successor_type successor_type; 1603 1604 //! Constructor 1605 template< typename Sequencer > 1606 __TBB_requires(sequencer<Sequencer, T>) 1607 __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g), 1608 my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) { 1609 fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph), 1610 static_cast<receiver<input_type> *>(this), 1611 static_cast<sender<output_type> *>(this) ); 1612 } 1613 1614 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 1615 template <typename Sequencer, typename... Args> 1616 __TBB_requires(sequencer<Sequencer, T>) 1617 sequencer_node( const node_set<Args...>& nodes, const Sequencer& s) 1618 : sequencer_node(nodes.graph_reference(), s) { 1619 make_edges_in_order(nodes, *this); 1620 } 1621 #endif 1622 1623 //! Copy constructor 1624 __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src), 1625 my_sequencer( src.my_sequencer->clone() ) { 1626 fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph), 1627 static_cast<receiver<input_type> *>(this), 1628 static_cast<sender<output_type> *>(this) ); 1629 } 1630 1631 //! Destructor 1632 ~sequencer_node() { delete my_sequencer; } 1633 1634 protected: 1635 typedef typename buffer_node<T>::size_type size_type; 1636 typedef typename buffer_node<T>::buffer_operation sequencer_operation; 1637 1638 private: 1639 bool internal_push(sequencer_operation *op) override { 1640 size_type tag = (*my_sequencer)(*(op->elem)); 1641 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES 1642 if (tag < this->my_head) { 1643 // have already emitted a message with this tag 1644 op->status.store(FAILED, std::memory_order_release); 1645 return false; 1646 } 1647 #endif 1648 // cannot modify this->my_tail now; the buffer would be inconsistent. 
1649 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail; 1650 1651 if (this->size(new_tail) > this->capacity()) { 1652 this->grow_my_array(this->size(new_tail)); 1653 } 1654 this->my_tail = new_tail; 1655 1656 const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED; 1657 op->status.store(res, std::memory_order_release); 1658 return res ==SUCCEEDED; 1659 } 1660 }; // sequencer_node 1661 1662 //! Forwards messages in priority order 1663 template<typename T, typename Compare = std::less<T>> 1664 class priority_queue_node : public buffer_node<T> { 1665 public: 1666 typedef T input_type; 1667 typedef T output_type; 1668 typedef buffer_node<T> base_type; 1669 typedef priority_queue_node class_type; 1670 typedef typename receiver<input_type>::predecessor_type predecessor_type; 1671 typedef typename sender<output_type>::successor_type successor_type; 1672 1673 //! Constructor 1674 __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() ) 1675 : buffer_node<T>(g), compare(comp), mark(0) { 1676 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph), 1677 static_cast<receiver<input_type> *>(this), 1678 static_cast<sender<output_type> *>(this) ); 1679 } 1680 1681 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 1682 template <typename... Args> 1683 priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare()) 1684 : priority_queue_node(nodes.graph_reference(), comp) { 1685 make_edges_in_order(nodes, *this); 1686 } 1687 #endif 1688 1689 //! 
Copy constructor 1690 __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src ) 1691 : buffer_node<T>(src), mark(0) 1692 { 1693 fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph), 1694 static_cast<receiver<input_type> *>(this), 1695 static_cast<sender<output_type> *>(this) ); 1696 } 1697 1698 protected: 1699 1700 void reset_node( reset_flags f) override { 1701 mark = 0; 1702 base_type::reset_node(f); 1703 } 1704 1705 typedef typename buffer_node<T>::size_type size_type; 1706 typedef typename buffer_node<T>::item_type item_type; 1707 typedef typename buffer_node<T>::buffer_operation prio_operation; 1708 1709 //! Tries to forward valid items to successors 1710 void internal_forward_task(prio_operation *op) override { 1711 this->internal_forward_task_impl(op, this); 1712 } 1713 1714 void handle_operations(prio_operation *op_list) override { 1715 this->handle_operations_impl(op_list, this); 1716 } 1717 1718 bool internal_push(prio_operation *op) override { 1719 prio_push(*(op->elem)); 1720 op->status.store(SUCCEEDED, std::memory_order_release); 1721 return true; 1722 } 1723 1724 void internal_pop(prio_operation *op) override { 1725 // if empty or already reserved, don't pop 1726 if ( this->my_reserved == true || this->my_tail == 0 ) { 1727 op->status.store(FAILED, std::memory_order_release); 1728 return; 1729 } 1730 1731 *(op->elem) = prio(); 1732 op->status.store(SUCCEEDED, std::memory_order_release); 1733 prio_pop(); 1734 1735 } 1736 1737 // pops the highest-priority item, saves copy 1738 void internal_reserve(prio_operation *op) override { 1739 if (this->my_reserved == true || this->my_tail == 0) { 1740 op->status.store(FAILED, std::memory_order_release); 1741 return; 1742 } 1743 this->my_reserved = true; 1744 *(op->elem) = prio(); 1745 reserved_item = *(op->elem); 1746 op->status.store(SUCCEEDED, std::memory_order_release); 1747 prio_pop(); 1748 } 1749 1750 void internal_consume(prio_operation *op) override { 1751 
op->status.store(SUCCEEDED, std::memory_order_release); 1752 this->my_reserved = false; 1753 reserved_item = input_type(); 1754 } 1755 1756 void internal_release(prio_operation *op) override { 1757 op->status.store(SUCCEEDED, std::memory_order_release); 1758 prio_push(reserved_item); 1759 this->my_reserved = false; 1760 reserved_item = input_type(); 1761 } 1762 1763 private: 1764 template<typename> friend class buffer_node; 1765 1766 void order() { 1767 if (mark < this->my_tail) heapify(); 1768 __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify"); 1769 } 1770 1771 bool is_item_valid() { 1772 return this->my_tail > 0; 1773 } 1774 1775 void try_put_and_add_task(graph_task*& last_task) { 1776 graph_task * new_task = this->my_successors.try_put_task(this->prio()); 1777 if (new_task) { 1778 // workaround for icc bug 1779 graph& graph_ref = this->graph_reference(); 1780 last_task = combine_tasks(graph_ref, last_task, new_task); 1781 prio_pop(); 1782 } 1783 } 1784 1785 private: 1786 Compare compare; 1787 size_type mark; 1788 1789 input_type reserved_item; 1790 1791 // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item 1792 bool prio_use_tail() { 1793 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test"); 1794 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1)); 1795 } 1796 1797 // prio_push: checks that the item will fit, expand array if necessary, put at end 1798 void prio_push(const T &src) { 1799 if ( this->my_tail >= this->my_array_size ) 1800 this->grow_my_array( this->my_tail + 1 ); 1801 (void) this->place_item(this->my_tail, src); 1802 ++(this->my_tail); 1803 __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push"); 1804 } 1805 1806 // prio_pop: deletes highest priority item from the array, and if it is item 1807 // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail 1808 // and mark. 
Assumes the array has already been tested for emptiness; no failure. 1809 void prio_pop() { 1810 if (prio_use_tail()) { 1811 // there are newly pushed elements; last one higher than top 1812 // copy the data 1813 this->destroy_item(this->my_tail-1); 1814 --(this->my_tail); 1815 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop"); 1816 return; 1817 } 1818 this->destroy_item(0); 1819 if(this->my_tail > 1) { 1820 // push the last element down heap 1821 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr); 1822 this->move_item(0,this->my_tail - 1); 1823 } 1824 --(this->my_tail); 1825 if(mark > this->my_tail) --mark; 1826 if (this->my_tail > 1) // don't reheap for heap of size 1 1827 reheap(); 1828 __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop"); 1829 } 1830 1831 const T& prio() { 1832 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0); 1833 } 1834 1835 // turn array into heap 1836 void heapify() { 1837 if(this->my_tail == 0) { 1838 mark = 0; 1839 return; 1840 } 1841 if (!mark) mark = 1; 1842 for (; mark<this->my_tail; ++mark) { // for each unheaped element 1843 size_type cur_pos = mark; 1844 input_type to_place; 1845 this->fetch_item(mark,to_place); 1846 do { // push to_place up the heap 1847 size_type parent = (cur_pos-1)>>1; 1848 if (!compare(this->get_my_item(parent), to_place)) 1849 break; 1850 this->move_item(cur_pos, parent); 1851 cur_pos = parent; 1852 } while( cur_pos ); 1853 (void) this->place_item(cur_pos, to_place); 1854 } 1855 } 1856 1857 // otherwise heapified array with new root element; rearrange to heap 1858 void reheap() { 1859 size_type cur_pos=0, child=1; 1860 while (child < mark) { 1861 size_type target = child; 1862 if (child+1<mark && 1863 compare(this->get_my_item(child), 1864 this->get_my_item(child+1))) 1865 ++target; 1866 // target now has the higher priority child 1867 if (compare(this->get_my_item(target), 1868 this->get_my_item(cur_pos))) 1869 break; 1870 // swap 1871 
this->swap_items(cur_pos, target); 1872 cur_pos = target; 1873 child = (cur_pos<<1)+1; 1874 } 1875 } 1876 }; // priority_queue_node 1877 1878 //! Forwards messages only if the threshold has not been reached 1879 /** This node forwards items until its threshold is reached. 1880 It contains no buffering. If the downstream node rejects, the 1881 message is dropped. */ 1882 template< typename T, typename DecrementType=continue_msg > 1883 class limiter_node : public graph_node, public receiver< T >, public sender< T > { 1884 public: 1885 typedef T input_type; 1886 typedef T output_type; 1887 typedef typename receiver<input_type>::predecessor_type predecessor_type; 1888 typedef typename sender<output_type>::successor_type successor_type; 1889 //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later. 1890 1891 private: 1892 size_t my_threshold; 1893 size_t my_count; // number of successful puts 1894 size_t my_tries; // number of active put attempts 1895 size_t my_future_decrement; // number of active decrement 1896 reservable_predecessor_cache< T, spin_mutex > my_predecessors; 1897 spin_mutex my_mutex; 1898 broadcast_cache< T > my_successors; 1899 1900 //! 
The internal receiver< DecrementType > that adjusts the count 1901 threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement; 1902 1903 graph_task* decrement_counter( long long delta ) { 1904 if ( delta > 0 && size_t(delta) > my_threshold ) { 1905 delta = my_threshold; 1906 } 1907 1908 { 1909 spin_mutex::scoped_lock lock(my_mutex); 1910 if ( delta > 0 && size_t(delta) > my_count ) { 1911 if( my_tries > 0 ) { 1912 my_future_decrement += (size_t(delta) - my_count); 1913 } 1914 my_count = 0; 1915 } 1916 else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) { 1917 my_count = my_threshold; 1918 } 1919 else { 1920 my_count -= size_t(delta); // absolute value of delta is sufficiently small 1921 } 1922 __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval"); 1923 } 1924 return forward_task(); 1925 } 1926 1927 // Let threshold_regulator call decrement_counter() 1928 friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >; 1929 1930 friend class forward_task_bypass< limiter_node<T,DecrementType> >; 1931 1932 bool check_conditions() { // always called under lock 1933 return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() ); 1934 } 1935 1936 // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED 1937 graph_task* forward_task() { 1938 input_type v; 1939 graph_task* rval = nullptr; 1940 bool reserved = false; 1941 1942 { 1943 spin_mutex::scoped_lock lock(my_mutex); 1944 if ( check_conditions() ) 1945 ++my_tries; 1946 else 1947 return nullptr; 1948 } 1949 1950 //SUCCESS 1951 // if we can reserve and can put, we consume the reservation 1952 // we increment the count and decrement the tries 1953 if ( (my_predecessors.try_reserve(v)) == true ) { 1954 reserved = true; 1955 if ( (rval = my_successors.try_put_task(v)) != nullptr ) { 1956 { 1957 spin_mutex::scoped_lock lock(my_mutex); 1958 ++my_count; 1959 if 
( my_future_decrement ) { 1960 if ( my_count > my_future_decrement ) { 1961 my_count -= my_future_decrement; 1962 my_future_decrement = 0; 1963 } 1964 else { 1965 my_future_decrement -= my_count; 1966 my_count = 0; 1967 } 1968 } 1969 --my_tries; 1970 my_predecessors.try_consume(); 1971 if ( check_conditions() ) { 1972 if ( is_graph_active(this->my_graph) ) { 1973 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type; 1974 small_object_allocator allocator{}; 1975 graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this ); 1976 my_graph.reserve_wait(); 1977 spawn_in_graph_arena(graph_reference(), *rtask); 1978 } 1979 } 1980 } 1981 return rval; 1982 } 1983 } 1984 //FAILURE 1985 //if we can't reserve, we decrement the tries 1986 //if we can reserve but can't put, we decrement the tries and release the reservation 1987 { 1988 spin_mutex::scoped_lock lock(my_mutex); 1989 --my_tries; 1990 if (reserved) my_predecessors.try_release(); 1991 if ( check_conditions() ) { 1992 if ( is_graph_active(this->my_graph) ) { 1993 small_object_allocator allocator{}; 1994 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type; 1995 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this); 1996 my_graph.reserve_wait(); 1997 __TBB_ASSERT(!rval, "Have two tasks to handle"); 1998 return t; 1999 } 2000 } 2001 return rval; 2002 } 2003 } 2004 2005 void initialize() { 2006 fgt_node( 2007 CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph, 2008 static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement), 2009 static_cast<sender<output_type> *>(this) 2010 ); 2011 } 2012 2013 public: 2014 //! 
Constructor 2015 limiter_node(graph &g, size_t threshold) 2016 : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0), 2017 my_predecessors(this), my_successors(this), decrement(this) 2018 { 2019 initialize(); 2020 } 2021 2022 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2023 template <typename... Args> 2024 limiter_node(const node_set<Args...>& nodes, size_t threshold) 2025 : limiter_node(nodes.graph_reference(), threshold) { 2026 make_edges_in_order(nodes, *this); 2027 } 2028 #endif 2029 2030 //! Copy constructor 2031 limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {} 2032 2033 //! The interface for accessing internal receiver< DecrementType > that adjusts the count 2034 receiver<DecrementType>& decrementer() { return decrement; } 2035 2036 //! Replace the current successor with this new successor 2037 bool register_successor( successor_type &r ) override { 2038 spin_mutex::scoped_lock lock(my_mutex); 2039 bool was_empty = my_successors.empty(); 2040 my_successors.register_successor(r); 2041 //spawn a forward task if this is the only successor 2042 if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) { 2043 if ( is_graph_active(this->my_graph) ) { 2044 small_object_allocator allocator{}; 2045 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type; 2046 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this); 2047 my_graph.reserve_wait(); 2048 spawn_in_graph_arena(graph_reference(), *t); 2049 } 2050 } 2051 return true; 2052 } 2053 2054 //! Removes a successor from this node 2055 /** r.remove_predecessor(*this) is also called. */ 2056 bool remove_successor( successor_type &r ) override { 2057 // TODO revamp: investigate why qualification is needed for remove_predecessor() call 2058 tbb::detail::d1::remove_predecessor(r, *this); 2059 my_successors.remove_successor(r); 2060 return true; 2061 } 2062 2063 //! 
Adds src to the list of cached predecessors. 2064 bool register_predecessor( predecessor_type &src ) override { 2065 spin_mutex::scoped_lock lock(my_mutex); 2066 my_predecessors.add( src ); 2067 if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) { 2068 small_object_allocator allocator{}; 2069 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type; 2070 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this); 2071 my_graph.reserve_wait(); 2072 spawn_in_graph_arena(graph_reference(), *t); 2073 } 2074 return true; 2075 } 2076 2077 //! Removes src from the list of cached predecessors. 2078 bool remove_predecessor( predecessor_type &src ) override { 2079 my_predecessors.remove( src ); 2080 return true; 2081 } 2082 2083 protected: 2084 2085 template< typename R, typename B > friend class run_and_put_task; 2086 template<typename X, typename Y> friend class broadcast_cache; 2087 template<typename X, typename Y> friend class round_robin_cache; 2088 //! Puts an item to this receiver 2089 graph_task* try_put_task( const T &t ) override { 2090 { 2091 spin_mutex::scoped_lock lock(my_mutex); 2092 if ( my_count + my_tries >= my_threshold ) 2093 return nullptr; 2094 else 2095 ++my_tries; 2096 } 2097 2098 graph_task* rtask = my_successors.try_put_task(t); 2099 if ( !rtask ) { // try_put_task failed. 
2100 spin_mutex::scoped_lock lock(my_mutex); 2101 --my_tries; 2102 if (check_conditions() && is_graph_active(this->my_graph)) { 2103 small_object_allocator allocator{}; 2104 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type; 2105 rtask = allocator.new_object<task_type>(my_graph, allocator, *this); 2106 my_graph.reserve_wait(); 2107 } 2108 } 2109 else { 2110 spin_mutex::scoped_lock lock(my_mutex); 2111 ++my_count; 2112 if ( my_future_decrement ) { 2113 if ( my_count > my_future_decrement ) { 2114 my_count -= my_future_decrement; 2115 my_future_decrement = 0; 2116 } 2117 else { 2118 my_future_decrement -= my_count; 2119 my_count = 0; 2120 } 2121 } 2122 --my_tries; 2123 } 2124 return rtask; 2125 } 2126 2127 graph& graph_reference() const override { return my_graph; } 2128 2129 void reset_node( reset_flags f ) override { 2130 my_count = 0; 2131 if ( f & rf_clear_edges ) { 2132 my_predecessors.clear(); 2133 my_successors.clear(); 2134 } 2135 else { 2136 my_predecessors.reset(); 2137 } 2138 decrement.reset_receiver(f); 2139 } 2140 }; // limiter_node 2141 2142 #include "detail/_flow_graph_join_impl.h" 2143 2144 template<typename OutputTuple, typename JP=queueing> class join_node; 2145 2146 template<typename OutputTuple> 2147 class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> { 2148 private: 2149 static const int N = std::tuple_size<OutputTuple>::value; 2150 typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type; 2151 public: 2152 typedef OutputTuple output_type; 2153 typedef typename unfolded_type::input_ports_type input_ports_type; 2154 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) { 2155 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph, 2156 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2157 } 2158 2159 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2160 template 
<typename... Args> 2161 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) { 2162 make_edges_in_order(nodes, *this); 2163 } 2164 #endif 2165 2166 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) { 2167 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph, 2168 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2169 } 2170 2171 }; 2172 2173 template<typename OutputTuple> 2174 class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> { 2175 private: 2176 static const int N = std::tuple_size<OutputTuple>::value; 2177 typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type; 2178 public: 2179 typedef OutputTuple output_type; 2180 typedef typename unfolded_type::input_ports_type input_ports_type; 2181 __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) { 2182 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph, 2183 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2184 } 2185 2186 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2187 template <typename... Args> 2188 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) { 2189 make_edges_in_order(nodes, *this); 2190 } 2191 #endif 2192 2193 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) { 2194 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph, 2195 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2196 } 2197 2198 }; 2199 2200 #if __TBB_CPP20_CONCEPTS_PRESENT 2201 // Helper function which is well-formed only if all of the elements in OutputTuple 2202 // satisfies join_node_function_object<body[i], tuple[i], K> 2203 template <typename OutputTuple, typename K, 2204 typename... 
Functions, std::size_t... Idx> 2205 void join_node_function_objects_helper( std::index_sequence<Idx...> ) 2206 requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) && 2207 (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>); 2208 2209 template <typename OutputTuple, typename K, typename... Functions> 2210 concept join_node_functions = requires { 2211 join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{}); 2212 }; 2213 2214 #endif 2215 2216 // template for key_matching join_node 2217 // tag_matching join_node is a specialization of key_matching, and is source-compatible. 2218 template<typename OutputTuple, typename K, typename KHash> 2219 class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value, 2220 key_matching_port, OutputTuple, key_matching<K,KHash> > { 2221 private: 2222 static const int N = std::tuple_size<OutputTuple>::value; 2223 typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type; 2224 public: 2225 typedef OutputTuple output_type; 2226 typedef typename unfolded_type::input_ports_type input_ports_type; 2227 2228 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 2229 join_node(graph &g) : unfolded_type(g) {} 2230 #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */ 2231 2232 template<typename __TBB_B0, typename __TBB_B1> 2233 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>) 2234 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) { 2235 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2236 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2237 } 2238 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2> 2239 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>) 2240 __TBB_NOINLINE_SYM 
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) { 2241 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2242 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2243 } 2244 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3> 2245 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>) 2246 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) { 2247 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2248 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2249 } 2250 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4> 2251 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>) 2252 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) : 2253 unfolded_type(g, b0, b1, b2, b3, b4) { 2254 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2255 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2256 } 2257 #if __TBB_VARIADIC_MAX >= 6 2258 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4, 2259 typename __TBB_B5> 2260 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>) 2261 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) : 2262 unfolded_type(g, b0, b1, b2, b3, b4, b5) { 2263 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2264 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2265 } 2266 #endif 2267 #if __TBB_VARIADIC_MAX >= 7 2268 template<typename __TBB_B0, typename __TBB_B1, 
typename __TBB_B2, typename __TBB_B3, typename __TBB_B4, 2269 typename __TBB_B5, typename __TBB_B6> 2270 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>) 2271 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) : 2272 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) { 2273 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2274 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2275 } 2276 #endif 2277 #if __TBB_VARIADIC_MAX >= 8 2278 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4, 2279 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7> 2280 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>) 2281 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, 2282 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) { 2283 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2284 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2285 } 2286 #endif 2287 #if __TBB_VARIADIC_MAX >= 9 2288 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4, 2289 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8> 2290 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>) 2291 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, 2292 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) { 2293 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 
2294 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2295 } 2296 #endif 2297 #if __TBB_VARIADIC_MAX >= 10 2298 template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4, 2299 typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9> 2300 __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>) 2301 __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, 2302 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) { 2303 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2304 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2305 } 2306 #endif 2307 2308 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2309 template < 2310 #if (__clang_major__ == 3 && __clang_minor__ == 4) 2311 // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter. 2312 template<typename...> class node_set, 2313 #endif 2314 typename... Args, typename... Bodies 2315 > 2316 __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>) 2317 __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies) 2318 : join_node(nodes.graph_reference(), bodies...) 
{ 2319 make_edges_in_order(nodes, *this); 2320 } 2321 #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2322 2323 __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) { 2324 fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph, 2325 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2326 } 2327 2328 }; 2329 2330 // indexer node 2331 #include "detail/_flow_graph_indexer_impl.h" 2332 2333 // TODO: Implement interface with variadic template or tuple 2334 template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type, 2335 typename T4=null_type, typename T5=null_type, typename T6=null_type, 2336 typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node; 2337 2338 //indexer node specializations 2339 template<typename T0> 2340 class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > { 2341 private: 2342 static const int N = 1; 2343 public: 2344 typedef std::tuple<T0> InputTuple; 2345 typedef tagged_msg<size_t, T0> output_type; 2346 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2347 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2348 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2349 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2350 } 2351 2352 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2353 template <typename... 
Args> 2354 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2355 make_edges_in_order(nodes, *this); 2356 } 2357 #endif 2358 2359 // Copy constructor 2360 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2361 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2362 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2363 } 2364 }; 2365 2366 template<typename T0, typename T1> 2367 class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > { 2368 private: 2369 static const int N = 2; 2370 public: 2371 typedef std::tuple<T0, T1> InputTuple; 2372 typedef tagged_msg<size_t, T0, T1> output_type; 2373 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2374 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2375 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2376 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2377 } 2378 2379 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2380 template <typename... 
Args> 2381 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2382 make_edges_in_order(nodes, *this); 2383 } 2384 #endif 2385 2386 // Copy constructor 2387 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2388 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2389 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2390 } 2391 2392 }; 2393 2394 template<typename T0, typename T1, typename T2> 2395 class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > { 2396 private: 2397 static const int N = 3; 2398 public: 2399 typedef std::tuple<T0, T1, T2> InputTuple; 2400 typedef tagged_msg<size_t, T0, T1, T2> output_type; 2401 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2402 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2403 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2404 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2405 } 2406 2407 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2408 template <typename... 
Args> 2409 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2410 make_edges_in_order(nodes, *this); 2411 } 2412 #endif 2413 2414 // Copy constructor 2415 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2416 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2417 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2418 } 2419 2420 }; 2421 2422 template<typename T0, typename T1, typename T2, typename T3> 2423 class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > { 2424 private: 2425 static const int N = 4; 2426 public: 2427 typedef std::tuple<T0, T1, T2, T3> InputTuple; 2428 typedef tagged_msg<size_t, T0, T1, T2, T3> output_type; 2429 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2430 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2431 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2432 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2433 } 2434 2435 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2436 template <typename... 
Args> 2437 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2438 make_edges_in_order(nodes, *this); 2439 } 2440 #endif 2441 2442 // Copy constructor 2443 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2444 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2445 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2446 } 2447 2448 }; 2449 2450 template<typename T0, typename T1, typename T2, typename T3, typename T4> 2451 class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > { 2452 private: 2453 static const int N = 5; 2454 public: 2455 typedef std::tuple<T0, T1, T2, T3, T4> InputTuple; 2456 typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type; 2457 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2458 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2459 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2460 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2461 } 2462 2463 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2464 template <typename... 
Args> 2465 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2466 make_edges_in_order(nodes, *this); 2467 } 2468 #endif 2469 2470 // Copy constructor 2471 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2472 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2473 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2474 } 2475 2476 }; 2477 2478 #if __TBB_VARIADIC_MAX >= 6 2479 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5> 2480 class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > { 2481 private: 2482 static const int N = 6; 2483 public: 2484 typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple; 2485 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type; 2486 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2487 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2488 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2489 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2490 } 2491 2492 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2493 template <typename... 
Args> 2494 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2495 make_edges_in_order(nodes, *this); 2496 } 2497 #endif 2498 2499 // Copy constructor 2500 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2501 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2502 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2503 } 2504 2505 }; 2506 #endif //variadic max 6 2507 2508 #if __TBB_VARIADIC_MAX >= 7 2509 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5, 2510 typename T6> 2511 class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > { 2512 private: 2513 static const int N = 7; 2514 public: 2515 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple; 2516 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type; 2517 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2518 __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) { 2519 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2520 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2521 } 2522 2523 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2524 template <typename... 
Args> 2525 indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) { 2526 make_edges_in_order(nodes, *this); 2527 } 2528 #endif 2529 2530 // Copy constructor 2531 __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) { 2532 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2533 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2534 } 2535 2536 }; 2537 #endif //variadic max 7 2538 2539 #if __TBB_VARIADIC_MAX >= 8 2540 template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5, 2541 typename T6, typename T7> 2542 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > { 2543 private: 2544 static const int N = 8; 2545 public: 2546 typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple; 2547 typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type; 2548 typedef unfolded_indexer_node<InputTuple> unfolded_type; 2549 indexer_node(graph& g) : unfolded_type(g) { 2550 fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph, 2551 this->input_ports(), static_cast< sender< output_type > *>(this) ); 2552 } 2553 2554 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 2555 template <typename... 
Args>
    // Preview feature: construct from a node_set and create edges to/from the
    // listed nodes in their declaration order.
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: delegates state copying to unfolded_type and
    // re-registers this node instance with the profiling tools.
    indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 8

#if __TBB_VARIADIC_MAX >= 9
//! Nine-input specialization of indexer_node.
//  Tags each incoming message with the (size_t) index of the port it arrived on.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;   // number of input ports, reported to the profiler
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview feature: construct from a node_set and wire edges in order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: delegates to unfolded_type, re-registers with profiler.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 9

#if __TBB_VARIADIC_MAX >= 10
//! Ten-input (primary/default) specialization of indexer_node.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;  // number of input ports, reported to the profiler
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview feature: construct from a node_set and wire edges in order.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    // Copy constructor: delegates to unfolded_type, re-registers with profiler.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 10

//! Connects a sender to a receiver and reports the new edge to the profiling tools.
template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
    register_successor(p, s);
    fgt_make_edge( &p, &s );
}

//! Makes an edge between a single predecessor and a single successor
template< typename T >
inline void make_edge( sender<T> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}

//! Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
//  SFINAE on the nested port-tuple typedefs selects this overload only for multiport nodes.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input) {
    make_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
}

//! Makes an edge from port 0 of a multi-output predecessor to a receiver.
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void make_edge( T& output, receiver<R>& input) {
    make_edge(std::get<0>(output.output_ports()), input);
}

//! Makes an edge from a sender to port 0 of a multi-input successor.
template< typename S, typename V,
          typename = typename V::input_ports_type >
inline void make_edge( sender<S>& output, V& input) {
    make_edge(output, std::get<0>(input.input_ports()));
}

//! Disconnects a sender from a receiver and reports the removal to the profiling tools.
template< typename T >
inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
    remove_successor( p, s );
    fgt_remove_edge( &p, &s );
}

//! Removes an edge between a single predecessor and a single successor
template< typename T >
inline void remove_edge( sender<T> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}

//! Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input) {
    remove_edge(std::get<0>(output.output_ports()), std::get<0>(input.input_ports()));
}

//! Removes an edge between port 0 of a multi-output predecessor and a receiver.
template< typename T, typename R,
          typename = typename T::output_ports_type >
inline void remove_edge( T& output, receiver<R>& input) {
    remove_edge(std::get<0>(output.output_ports()), input);
}

//! Removes an edge between a sender and port 0 of a multi-input successor.
template< typename S, typename V,
          typename = typename V::input_ports_type >
inline void remove_edge( sender<S>& output, V& input) {
    remove_edge(output, std::get<0>(input.input_ports()));
}

//! Returns a copy of the body from a function or continue node
template< typename Body, typename Node >
Body copy_body( Node &n ) {
    return n.template copy_function_object<Body>();
}

//! composite_node: packages a subgraph behind a fixed set of externally visible
//! input and output ports. Only specializations on tuples are defined.
template< typename InputTuple, typename OutputTuple > class composite_node;

//! Specialization with both input and output ports.
template< typename... InputTypes, typename... OutputTypes>
class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {

public:
    typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    // Port tuples hold references to inner nodes' ports; set via set_external_ports.
    std::unique_ptr<input_ports_type> my_input_ports;
    std::unique_ptr<output_ports_type> my_output_ports;

    static const size_t NUM_INPUTS = sizeof...(InputTypes);
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    // Nothing to reset: inner nodes are reset by the graph that owns them.
    void reset_node(reset_flags) override {}

public:
    composite_node( graph &g ) : graph_node(g) {
        fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
    }

    //! Binds the externally visible ports to ports of inner nodes.
    //  Must be called before input_ports()/output_ports() are used.
    template<typename T1, typename T2>
    void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

        fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
        fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

        my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
        my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
    }

    //! Registers inner nodes so they appear inside this node in profiling traces.
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    //! Registers inner nodes without making them visible in profiling traces.
    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }

    input_ports_type& input_ports() {
        __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
        return *my_input_ports;
    }

    output_ports_type& output_ports() {
        __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
        return *my_output_ports;
    }
}; // class composite_node

//! composite_node specialization with only input ports.
template< typename... InputTypes>
class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
public:
    typedef std::tuple< receiver<InputTypes>&... > input_ports_type;

private:
    std::unique_ptr<input_ports_type> my_input_ports;
    static const size_t NUM_INPUTS = sizeof...(InputTypes);

protected:
    void reset_node(reset_flags) override {}

public:
    composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

    //! Binds the externally visible input ports to ports of inner nodes.
    template<typename T>
    void set_external_ports(T&& input_ports_tuple) {
        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");

        fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);

        my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
    }

    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template< typename... NodeTypes >
    void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }

    input_ports_type& input_ports() {
        __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
        return *my_input_ports;
    }

}; // class composite_node

//! composite_node specialization with only output ports.
template<typename... OutputTypes>
class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
public:
    typedef std::tuple< sender<OutputTypes>&... > output_ports_type;

private:
    std::unique_ptr<output_ports_type> my_output_ports;
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);

protected:
    void reset_node(reset_flags) override {}

public:
    __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
        fgt_composite( CODEPTR(), this, &g );
    }

    //! Binds the externally visible output ports to ports of inner nodes.
    template<typename T>
    void set_external_ports(T&& output_ports_tuple) {
        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");

        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);

        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
    }

    template<typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }

    template<typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }

    output_ports_type& output_ports() {
        __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
        return *my_output_ports;
    }

}; // class composite_node

//! Common base for async_node bodies: holds the gateway pointer through which
//! the user body hands results back to the flow graph.
template<typename Gateway>
class async_body_base: no_assign {
public:
    typedef Gateway gateway_type;

    async_body_base(gateway_type *gateway): my_gateway(gateway) { }
    // Re-points the gateway; used after copy-constructing a node so the body
    // targets the new node's gateway rather than the source node's.
    void set_gateway(gateway_type *gateway) {
        my_gateway = gateway;
    }

protected:
    gateway_type *my_gateway;
};

//! Wraps the user-provided async body so it can be invoked as a
//! multifunction_node body: forwards the input plus the gateway reference.
template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
private:
    Body my_body;

public:
    typedef async_body_base<Gateway> base_type;
    typedef Gateway gateway_type;

    async_body(const Body &body, gateway_type *gateway)
        : base_type(gateway), my_body(body) { }

    // The Ports argument is unused: results are submitted via the gateway,
    // not pushed directly to the output ports.
    void operator()( const Input &v, Ports & ) noexcept(noexcept(my_body(v, std::declval<gateway_type&>()))) {
        my_body(v, *this->my_gateway);
    }

    Body get_body() { return my_body; }
};
//! Implements async node: lets an external activity (thread, device, service)
//! receive inputs from the graph and submit results back through a gateway.
template < typename Input, typename Output,
           typename Policy = queueing_lightweight >
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
class async_node
    : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
{
    typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
    typedef multifunction_input<
        Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;

public:
    typedef Input input_type;
    typedef Output output_type;
    typedef receiver<input_type> receiver_type;
    typedef receiver<output_type> successor_type;
    typedef sender<input_type> predecessor_type;
    typedef receiver_gateway<output_type> gateway_type;
    typedef async_body_base<gateway_type> async_body_base_type;
    typedef typename base_type::output_ports_type output_ports_type;

private:
    //! The gateway handed to the user body; routes wait-reservation and
    //! result submission back to the owning async_node.
    class receiver_gateway_impl: public receiver_gateway<Output> {
    public:
        receiver_gateway_impl(async_node* node): my_node(node) {}
        // Tells the graph that an external activity is in flight so that
        // wait_for_all() does not return prematurely.
        void reserve_wait() override {
            fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
            my_node->my_graph.reserve_wait();
        }

        // Releases a previously reserved wait.
        void release_wait() override {
            async_node* n = my_node;
            graph* g = &n->my_graph;
            g->release_wait();
            fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
        }

        //! Implements gateway_type::try_put for an external activity to submit a message to FG
        bool try_put(const Output &i) override {
            return my_node->try_put_impl(i);
        }

    private:
        async_node* my_node;
    } my_gateway;

    //The substitute of 'this' for member construction, to prevent compiler warnings
    async_node* self() { return this; }

    //! Broadcasts a submitted message to output-port-0 successors and enqueues
    //! the resulting tasks into the graph arena.
    bool try_put_impl(const Output &i) {
        multifunction_output<Output> &port_0 = output_port<0>(*this);
        broadcast_cache<output_type>& port_successors = port_0.successors();
        fgt_async_try_put_begin(this, &port_0);
        // TODO revamp: change to std::list<graph_task*>
        graph_task_list tasks;
        bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
        __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
                      "Return status is inconsistent with the method operation." );

        while( !tasks.empty() ) {
            enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
        }
        fgt_async_try_put_end(this, &port_0);
        return is_at_least_one_put_successful;
    }

public:
    //! Constructs an async_node with the given concurrency limit, body,
    //! policy, and (optional) priority.
    template<typename Body>
    __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        graph &g, size_t concurrency,
        Body body, Policy = Policy(), node_priority_t a_priority = no_priority
    ) : base_type(
            g, concurrency,
            async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
                (body, &my_gateway), a_priority ), my_gateway(self()) {
        fgt_multioutput_node_with_body<1>(
            CODEPTR(), FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }

    //! Convenience overload: priority without an explicit policy object.
    template <typename Body>
    __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(g, concurrency, body, Policy(), a_priority) {}

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview feature: construct from a node_set and wire edges in order.
    template <typename Body, typename... Args>
    __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(
        const node_set<Args...>& nodes, size_t concurrency, Body body,
        Policy = Policy(), node_priority_t a_priority = no_priority )
        : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
        make_edges_in_order(nodes, *this);
    }

    template <typename Body, typename... Args>
    __TBB_requires(async_node_body<Body, input_type, gateway_type>)
    __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
        : async_node(nodes, concurrency, body, Policy(), a_priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET

    //! Copy constructor: the copied bodies still point at the source node's
    //! gateway, so both stored bodies are re-targeted to this node's gateway.
    __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
        static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
        static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);

        fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body );
    }

    //! The gateway the external activity uses to talk back to the graph.
    gateway_type& gateway() {
        return my_gateway;
    }

    // Define sender< Output >

    //! Add a new successor to this node
    bool register_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be registered only via ports");
        return false;
    }

    //! Removes a successor from this node
    bool remove_successor(successor_type&) override {
        __TBB_ASSERT(false, "Successors must be removed only via ports");
        return false;
    }

    //! Extracts a copy of the user body out of the stored async_body wrapper.
    template<typename Body>
    Body copy_function_object() {
        typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
        typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
        mfn_body_type &body_ref = *this->my_body;
        async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
        return ab.get_body();
    }

protected:

    void reset_node( reset_flags f) override {
        base_type::reset_node(f);
    }
};

#include "detail/_flow_graph_node_set_impl.h"

//! Single-item buffer that always accepts a put, overwriting the stored value;
//! broadcasts each new value to its successors.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;

    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
        : graph_node(g), my_successors(this), my_buffer_is_valid(false)
    {
        fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Preview feature: construct from a node_set and wire edges in order.
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
Copy constructor; doesn't take anything from src; default won't work 3046 __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {} 3047 3048 ~overwrite_node() {} 3049 3050 bool register_successor( successor_type &s ) override { 3051 spin_mutex::scoped_lock l( my_mutex ); 3052 if (my_buffer_is_valid && is_graph_active( my_graph )) { 3053 // We have a valid value that must be forwarded immediately. 3054 bool ret = s.try_put( my_buffer ); 3055 if ( ret ) { 3056 // We add the successor that accepted our put 3057 my_successors.register_successor( s ); 3058 } else { 3059 // In case of reservation a race between the moment of reservation and register_successor can appear, 3060 // because failed reserve does not mean that register_successor is not ready to put a message immediately. 3061 // We have some sort of infinite loop: reserving node tries to set pull state for the edge, 3062 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation. 3063 small_object_allocator allocator{}; 3064 typedef register_predecessor_task task_type; 3065 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s); 3066 graph_reference().reserve_wait(); 3067 spawn_in_graph_arena( my_graph, *t ); 3068 } 3069 } else { 3070 // No valid value yet, just add as successor 3071 my_successors.register_successor( s ); 3072 } 3073 return true; 3074 } 3075 3076 bool remove_successor( successor_type &s ) override { 3077 spin_mutex::scoped_lock l( my_mutex ); 3078 my_successors.remove_successor(s); 3079 return true; 3080 } 3081 3082 bool try_get( input_type &v ) override { 3083 spin_mutex::scoped_lock l( my_mutex ); 3084 if ( my_buffer_is_valid ) { 3085 v = my_buffer; 3086 return true; 3087 } 3088 return false; 3089 } 3090 3091 //! Reserves an item 3092 bool try_reserve( T &v ) override { 3093 return try_get(v); 3094 } 3095 3096 //! 
Releases the reserved item 3097 bool try_release() override { return true; } 3098 3099 //! Consumes the reserved item 3100 bool try_consume() override { return true; } 3101 3102 bool is_valid() { 3103 spin_mutex::scoped_lock l( my_mutex ); 3104 return my_buffer_is_valid; 3105 } 3106 3107 void clear() { 3108 spin_mutex::scoped_lock l( my_mutex ); 3109 my_buffer_is_valid = false; 3110 } 3111 3112 protected: 3113 3114 template< typename R, typename B > friend class run_and_put_task; 3115 template<typename X, typename Y> friend class broadcast_cache; 3116 template<typename X, typename Y> friend class round_robin_cache; 3117 graph_task* try_put_task( const input_type &v ) override { 3118 spin_mutex::scoped_lock l( my_mutex ); 3119 return try_put_task_impl(v); 3120 } 3121 3122 graph_task * try_put_task_impl(const input_type &v) { 3123 my_buffer = v; 3124 my_buffer_is_valid = true; 3125 graph_task* rtask = my_successors.try_put_task(v); 3126 if (!rtask) rtask = SUCCESSFULLY_ENQUEUED; 3127 return rtask; 3128 } 3129 3130 graph& graph_reference() const override { 3131 return my_graph; 3132 } 3133 3134 //! 
Breaks an infinite loop between the node reservation and register_successor call 3135 struct register_predecessor_task : public graph_task { 3136 register_predecessor_task( 3137 graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ) 3138 : graph_task(g, allocator), o(owner), s(succ) {}; 3139 3140 task* execute(execution_data& ed) override { 3141 // TODO revamp: investigate why qualification is needed for register_successor() call 3142 using tbb::detail::d1::register_predecessor; 3143 using tbb::detail::d1::register_successor; 3144 if ( !register_predecessor(s, o) ) { 3145 register_successor(o, s); 3146 } 3147 finalize<register_predecessor_task>(ed); 3148 return nullptr; 3149 } 3150 3151 task* cancel(execution_data& ed) override { 3152 finalize<register_predecessor_task>(ed); 3153 return nullptr; 3154 } 3155 3156 predecessor_type& o; 3157 successor_type& s; 3158 }; 3159 3160 spin_mutex my_mutex; 3161 broadcast_cache< input_type, null_rw_mutex > my_successors; 3162 input_type my_buffer; 3163 bool my_buffer_is_valid; 3164 3165 void reset_node( reset_flags f) override { 3166 my_buffer_is_valid = false; 3167 if (f&rf_clear_edges) { 3168 my_successors.clear(); 3169 } 3170 } 3171 }; // overwrite_node 3172 3173 template< typename T > 3174 class write_once_node : public overwrite_node<T> { 3175 public: 3176 typedef T input_type; 3177 typedef T output_type; 3178 typedef overwrite_node<T> base_type; 3179 typedef typename receiver<input_type>::predecessor_type predecessor_type; 3180 typedef typename sender<output_type>::successor_type successor_type; 3181 3182 //! Constructor 3183 __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) { 3184 fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph), 3185 static_cast<receiver<input_type> *>(this), 3186 static_cast<sender<output_type> *>(this) ); 3187 } 3188 3189 #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET 3190 template <typename... 
Args>
    // Preview feature: construct from a node_set and wire edges in order.
    write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: call base class copy constructor
    __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
        fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
                  static_cast<receiver<input_type> *>(this),
                  static_cast<sender<output_type> *>(this) );
    }

protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class broadcast_cache;
    template<typename X, typename Y> friend class round_robin_cache;
    // Rejects the put if a value is already buffered; otherwise stores it
    // via the base-class implementation (write-once semantics).
    graph_task *try_put_task( const T &v ) override {
        spin_mutex::scoped_lock l( this->my_mutex );
        return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
    }
}; // write_once_node

//! set_name overloads: attach a human-readable name to a graph or node
//! for the profiling/tracing tools.
inline void set_name(const graph& g, const char *name) {
    fgt_graph_desc(&g, name);
}

template <typename Output>
inline void set_name(const input_node<Output>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Input, typename Output, typename Policy>
inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename Output, typename Policy>
inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const broadcast_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const buffer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const queue_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const sequencer_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename Compare>
inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T, typename DecrementType>
inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename OutputTuple, typename JP>
inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename... Types>
inline void set_name(const indexer_node<Types...>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const overwrite_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template <typename T>
inline void set_name(const write_once_node<T>& node, const char *name) {
    fgt_node_desc(&node, name);
}

template<typename Input, typename Output, typename Policy>
inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template<typename TupleType>
inline void set_name(const split_node<TupleType>& node, const char *name) {
    fgt_multioutput_node_desc(&node, name);
}

template< typename InputTuple, typename OutputTuple >
inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
    fgt_multiinput_multioutput_node_desc(&node, name);
}

template<typename Input, typename Output, typename Policy>
inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
{
    fgt_multioutput_node_desc(&node, name);
}
} // d1
} // detail
} // tbb


// Include deduction guides for node classes
#include "detail/_flow_graph_nodes_deduction.h"

// Public API surface: re-export the implementation names into tbb::flow::v1.
namespace tbb {
namespace flow {
inline namespace v1 {
    using detail::d1::receiver;
    using detail::d1::sender;

    using detail::d1::serial;
    using detail::d1::unlimited;

    using detail::d1::reset_flags;
    using detail::d1::rf_reset_protocol;
    using detail::d1::rf_reset_bodies;
    using detail::d1::rf_clear_edges;

    using detail::d1::graph;
    using detail::d1::graph_node;
    using detail::d1::continue_msg;

    using detail::d1::input_node;
    using detail::d1::function_node;
    using detail::d1::multifunction_node;
    using detail::d1::split_node;
    using detail::d1::output_port;
    using detail::d1::indexer_node;
    using detail::d1::tagged_msg;
    using detail::d1::cast_to;
    using detail::d1::is_a;
    using detail::d1::continue_node;
    using detail::d1::overwrite_node;
    using detail::d1::write_once_node;
    using detail::d1::broadcast_node;
    using detail::d1::buffer_node;
    using detail::d1::queue_node;
    using detail::d1::sequencer_node;
    using detail::d1::priority_queue_node;
    using detail::d1::limiter_node;
    using namespace detail::d1::graph_policy_namespace;
    using detail::d1::join_node;
    using detail::d1::input_port;
    using detail::d1::copy_body;
    using detail::d1::make_edge;
    using detail::d1::remove_edge;
    using detail::d1::tag_value;
    using detail::d1::composite_node;
    using detail::d1::async_node;
    using detail::d1::node_priority_t;
    using detail::d1::no_priority;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    using detail::d1::follows;
    using detail::d1::precedes;
    using detail::d1::make_node_set;
    using detail::d1::make_edges;
#endif

} // v1
} // flow

    using detail::d1::flow_control;

namespace profiling {
    using detail::d1::set_name;
} // profiling

} // tbb


#if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
   // We don't do pragma pop here, since it still gives warning on the USER side
   #undef __TBB_NOINLINE_SYM
#endif

#endif // __TBB_flow_graph_H