xref: /oneTBB/include/oneapi/tbb/flow_graph.h (revision a088cfa0)
149e08aacStbbdev /*
2*a088cfa0SKonstantin Boyarinov     Copyright (c) 2005-2023 Intel Corporation
349e08aacStbbdev 
449e08aacStbbdev     Licensed under the Apache License, Version 2.0 (the "License");
549e08aacStbbdev     you may not use this file except in compliance with the License.
649e08aacStbbdev     You may obtain a copy of the License at
749e08aacStbbdev 
849e08aacStbbdev         http://www.apache.org/licenses/LICENSE-2.0
949e08aacStbbdev 
1049e08aacStbbdev     Unless required by applicable law or agreed to in writing, software
1149e08aacStbbdev     distributed under the License is distributed on an "AS IS" BASIS,
1249e08aacStbbdev     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1349e08aacStbbdev     See the License for the specific language governing permissions and
1449e08aacStbbdev     limitations under the License.
1549e08aacStbbdev */
1649e08aacStbbdev 
1749e08aacStbbdev #ifndef __TBB_flow_graph_H
1849e08aacStbbdev #define __TBB_flow_graph_H
1949e08aacStbbdev 
2049e08aacStbbdev #include <atomic>
2149e08aacStbbdev #include <memory>
2249e08aacStbbdev #include <type_traits>
2349e08aacStbbdev 
2449e08aacStbbdev #include "detail/_config.h"
2549e08aacStbbdev #include "detail/_namespace_injection.h"
2649e08aacStbbdev #include "spin_mutex.h"
2749e08aacStbbdev #include "null_mutex.h"
2849e08aacStbbdev #include "spin_rw_mutex.h"
2949e08aacStbbdev #include "null_rw_mutex.h"
3049e08aacStbbdev #include "detail/_pipeline_filters.h"
3149e08aacStbbdev #include "detail/_task.h"
3249e08aacStbbdev #include "detail/_small_object_pool.h"
3349e08aacStbbdev #include "cache_aligned_allocator.h"
3449e08aacStbbdev #include "detail/_exception.h"
3549e08aacStbbdev #include "detail/_template_helpers.h"
3649e08aacStbbdev #include "detail/_aggregator.h"
3749e08aacStbbdev #include "detail/_allocator_traits.h"
38478de5b1Stbbdev #include "detail/_utils.h"
3949e08aacStbbdev #include "profiling.h"
4049e08aacStbbdev #include "task_arena.h"
4149e08aacStbbdev 
42734f0bc0SPablo Romero #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
4349e08aacStbbdev    #if __INTEL_COMPILER
4449e08aacStbbdev        // Disabled warning "routine is both inline and noinline"
4549e08aacStbbdev        #pragma warning (push)
4649e08aacStbbdev        #pragma warning( disable: 2196 )
4749e08aacStbbdev    #endif
4849e08aacStbbdev    #define __TBB_NOINLINE_SYM __attribute__((noinline))
4949e08aacStbbdev #else
5049e08aacStbbdev    #define __TBB_NOINLINE_SYM
5149e08aacStbbdev #endif
5249e08aacStbbdev 
5349e08aacStbbdev #include <tuple>
5449e08aacStbbdev #include <list>
5549e08aacStbbdev #include <queue>
56478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
57478de5b1Stbbdev #include <concepts>
58478de5b1Stbbdev #endif
5949e08aacStbbdev 
6049e08aacStbbdev /** @file
6149e08aacStbbdev   \brief The graph related classes and functions
6249e08aacStbbdev 
6349e08aacStbbdev   There are some applications that best express dependencies as messages
6449e08aacStbbdev   passed between nodes in a graph.  These messages may contain data or
  simply act as signals that a predecessor has completed. The graph
6649e08aacStbbdev   class and its associated node classes can be used to express such
6749e08aacStbbdev   applications.
6849e08aacStbbdev */
6949e08aacStbbdev 
7049e08aacStbbdev namespace tbb {
7149e08aacStbbdev namespace detail {
7249e08aacStbbdev 
namespace d1 {

//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency {
    unlimited = 0,
    serial = 1
};

//! A generic null type
struct null_type {};

//! An empty class used for messages that mean "I'm done"
class continue_msg {};

} // namespace d1
85478de5b1Stbbdev 
#if __TBB_CPP20_CONCEPTS_PRESENT
namespace d0 {

//! Checks that the value returned by a node body can be used as the node's output.
/** ReturnType is the type produced by invoking the body; OutputType is the output type of
    the node. The constraint is satisfied when the output is continue_msg (the returned
    value is then not constrained), or when the returned value is convertible to the
    output type. */
template <typename ReturnType, typename OutputType>
concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
                                std::convertible_to<ReturnType, OutputType>;

//! Body of a continue_node: copy-constructible and callable with a continue_msg,
//! returning a type accepted by node_body_return_type for Output.
// TODO: consider using std::invocable here
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };

//! Body of a function_node: copy-constructible, invocable with const Input&, and its
//! result type is accepted by node_body_return_type for Output.
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
                             std::invocable<Body&, const Input&> &&
                             node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;

//! Function object used by a join_node: copy-constructible, invocable with
//! const Input&, and its result is convertible to Key.
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
                                    std::invocable<FunctionObject&, const Input&> &&
                                    std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;

//! Body of an input_node: copy-constructible and callable with a flow_control&,
//! with a result type checked by adaptive_same_as<Output>.
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, tbb::detail::d1::flow_control& fc ) {
                              { body(fc) } -> adaptive_same_as<Output>;
                          };

//! Body of a multifunction_node: copy-constructible and invocable with
//! (const Input&, OutputPortsType&). The result type is not constrained.
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
                                  std::invocable<Body&, const Input&, OutputPortsType&>;

//! Sequencer function object: copy-constructible, invocable with const Value&,
//! and its result is convertible to std::size_t.
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
                    std::invocable<Sequencer&, const Value&> &&
                    std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;

//! Body of an async_node: copy-constructible and invocable with
//! (const Input&, GatewayType&). The result type is not constrained.
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
                          std::invocable<Body&, const Input&, GatewayType&>;

} // namespace d0
#endif // __TBB_CPP20_CONCEPTS_PRESENT
131478de5b1Stbbdev 
namespace d1 {

//! Forward declaration section
template <typename T> class sender;
template <typename T> class receiver;
class continue_receiver;

// needed for resetting decrementer
template <typename T, typename U> class limiter_node;

// Successor and predecessor cache templates; only declared here.
template <typename T, typename M> class successor_cache;
template <typename T, typename M> class broadcast_cache;
template <typename T, typename M> class round_robin_cache;
template <typename T, typename M> class predecessor_cache;
template <typename T, typename M> class reservable_predecessor_cache;

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Order tags and the node_set template, declared only when the node-set preview is enabled.
namespace order {
struct following;
struct preceding;
}
template <typename Order, typename... Args> struct node_set;
#endif


} // namespace d1
15749e08aacStbbdev } // namespace detail
15849e08aacStbbdev } // namespace tbb
15949e08aacStbbdev 
16049e08aacStbbdev //! The graph class
16149e08aacStbbdev #include "detail/_flow_graph_impl.h"
16249e08aacStbbdev 
16349e08aacStbbdev namespace tbb {
16449e08aacStbbdev namespace detail {
16549e08aacStbbdev namespace d1 {
16649e08aacStbbdev 
order_tasks(graph_task * first,graph_task * second)16749e08aacStbbdev static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
16849e08aacStbbdev     if (second->priority > first->priority)
16949e08aacStbbdev         return std::make_pair(second, first);
17049e08aacStbbdev     return std::make_pair(first, second);
17149e08aacStbbdev }
17249e08aacStbbdev 
17349e08aacStbbdev // submit task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,graph_task * left,graph_task * right)17449e08aacStbbdev static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
17549e08aacStbbdev     // if no RHS task, don't change left.
17657f524caSIlya Isaev     if (right == nullptr) return left;
17757f524caSIlya Isaev     // right != nullptr
17857f524caSIlya Isaev     if (left == nullptr) return right;
17949e08aacStbbdev     if (left == SUCCESSFULLY_ENQUEUED) return right;
18049e08aacStbbdev     // left contains a task
18149e08aacStbbdev     if (right != SUCCESSFULLY_ENQUEUED) {
18249e08aacStbbdev         // both are valid tasks
18349e08aacStbbdev         auto tasks_pair = order_tasks(left, right);
18449e08aacStbbdev         spawn_in_graph_arena(g, *tasks_pair.first);
18549e08aacStbbdev         return tasks_pair.second;
18649e08aacStbbdev     }
18749e08aacStbbdev     return left;
18849e08aacStbbdev }
18949e08aacStbbdev 
19049e08aacStbbdev //! Pure virtual template class that defines a sender of messages of type T
19149e08aacStbbdev template< typename T >
19249e08aacStbbdev class sender {
19349e08aacStbbdev public:
~sender()19449e08aacStbbdev     virtual ~sender() {}
19549e08aacStbbdev 
19649e08aacStbbdev     //! Request an item from the sender
try_get(T &)19749e08aacStbbdev     virtual bool try_get( T & ) { return false; }
19849e08aacStbbdev 
19949e08aacStbbdev     //! Reserves an item in the sender
try_reserve(T &)20049e08aacStbbdev     virtual bool try_reserve( T & ) { return false; }
20149e08aacStbbdev 
20249e08aacStbbdev     //! Releases the reserved item
try_release()20349e08aacStbbdev     virtual bool try_release( ) { return false; }
20449e08aacStbbdev 
20549e08aacStbbdev     //! Consumes the reserved item
try_consume()20649e08aacStbbdev     virtual bool try_consume( ) { return false; }
20749e08aacStbbdev 
20849e08aacStbbdev protected:
20949e08aacStbbdev     //! The output type of this sender
21049e08aacStbbdev     typedef T output_type;
21149e08aacStbbdev 
21249e08aacStbbdev     //! The successor type for this node
21349e08aacStbbdev     typedef receiver<T> successor_type;
21449e08aacStbbdev 
21549e08aacStbbdev     //! Add a new successor to this node
21649e08aacStbbdev     virtual bool register_successor( successor_type &r ) = 0;
21749e08aacStbbdev 
21849e08aacStbbdev     //! Removes a successor from this node
21949e08aacStbbdev     virtual bool remove_successor( successor_type &r ) = 0;
22049e08aacStbbdev 
22149e08aacStbbdev     template<typename C>
22249e08aacStbbdev     friend bool register_successor(sender<C>& s, receiver<C>& r);
22349e08aacStbbdev 
22449e08aacStbbdev     template<typename C>
22549e08aacStbbdev     friend bool remove_successor  (sender<C>& s, receiver<C>& r);
22649e08aacStbbdev };  // class sender<T>
22749e08aacStbbdev 
22849e08aacStbbdev template<typename C>
register_successor(sender<C> & s,receiver<C> & r)22949e08aacStbbdev bool register_successor(sender<C>& s, receiver<C>& r) {
23049e08aacStbbdev     return s.register_successor(r);
23149e08aacStbbdev }
23249e08aacStbbdev 
23349e08aacStbbdev template<typename C>
remove_successor(sender<C> & s,receiver<C> & r)23449e08aacStbbdev bool remove_successor(sender<C>& s, receiver<C>& r) {
23549e08aacStbbdev     return s.remove_successor(r);
23649e08aacStbbdev }
23749e08aacStbbdev 
23849e08aacStbbdev //! Pure virtual template class that defines a receiver of messages of type T
23949e08aacStbbdev template< typename T >
24049e08aacStbbdev class receiver {
24149e08aacStbbdev public:
24249e08aacStbbdev     //! Destructor
~receiver()24349e08aacStbbdev     virtual ~receiver() {}
24449e08aacStbbdev 
24549e08aacStbbdev     //! Put an item to the receiver
try_put(const T & t)24649e08aacStbbdev     bool try_put( const T& t ) {
24749e08aacStbbdev         graph_task *res = try_put_task(t);
24849e08aacStbbdev         if (!res) return false;
24949e08aacStbbdev         if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
25049e08aacStbbdev         return true;
25149e08aacStbbdev     }
25249e08aacStbbdev 
25349e08aacStbbdev     //! put item to successor; return task to run the successor if possible.
25449e08aacStbbdev protected:
25549e08aacStbbdev     //! The input type of this receiver
25649e08aacStbbdev     typedef T input_type;
25749e08aacStbbdev 
25849e08aacStbbdev     //! The predecessor type for this node
25949e08aacStbbdev     typedef sender<T> predecessor_type;
26049e08aacStbbdev 
26149e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
26249e08aacStbbdev     template< typename X, typename Y > friend class broadcast_cache;
26349e08aacStbbdev     template< typename X, typename Y > friend class round_robin_cache;
26449e08aacStbbdev     virtual graph_task *try_put_task(const T& t) = 0;
26549e08aacStbbdev     virtual graph& graph_reference() const = 0;
26649e08aacStbbdev 
26749e08aacStbbdev     template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()26849e08aacStbbdev     virtual bool is_continue_receiver() { return false; }
26949e08aacStbbdev 
27049e08aacStbbdev     // TODO revamp: reconsider the inheritance and move node priority out of receiver
priority()27149e08aacStbbdev     virtual node_priority_t priority() const { return no_priority; }
27249e08aacStbbdev 
27349e08aacStbbdev     //! Add a predecessor to the node
register_predecessor(predecessor_type &)27449e08aacStbbdev     virtual bool register_predecessor( predecessor_type & ) { return false; }
27549e08aacStbbdev 
27649e08aacStbbdev     //! Remove a predecessor from the node
remove_predecessor(predecessor_type &)27749e08aacStbbdev     virtual bool remove_predecessor( predecessor_type & ) { return false; }
27849e08aacStbbdev 
27949e08aacStbbdev     template <typename C>
28049e08aacStbbdev     friend bool register_predecessor(receiver<C>& r, sender<C>& s);
28149e08aacStbbdev     template <typename C>
28249e08aacStbbdev     friend bool remove_predecessor  (receiver<C>& r, sender<C>& s);
28349e08aacStbbdev }; // class receiver<T>
28449e08aacStbbdev 
28549e08aacStbbdev template <typename C>
register_predecessor(receiver<C> & r,sender<C> & s)28649e08aacStbbdev bool register_predecessor(receiver<C>& r, sender<C>& s) {
28749e08aacStbbdev     return r.register_predecessor(s);
28849e08aacStbbdev }
28949e08aacStbbdev 
29049e08aacStbbdev template <typename C>
remove_predecessor(receiver<C> & r,sender<C> & s)29149e08aacStbbdev bool remove_predecessor(receiver<C>& r, sender<C>& s) {
29249e08aacStbbdev     return r.remove_predecessor(s);
29349e08aacStbbdev }
29449e08aacStbbdev 
29549e08aacStbbdev //! Base class for receivers of completion messages
29649e08aacStbbdev /** These receivers automatically reset, but cannot be explicitly waited on */
29749e08aacStbbdev class continue_receiver : public receiver< continue_msg > {
29849e08aacStbbdev protected:
29949e08aacStbbdev 
30049e08aacStbbdev     //! Constructor
continue_receiver(int number_of_predecessors,node_priority_t a_priority)30149e08aacStbbdev     explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
30249e08aacStbbdev         my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
30349e08aacStbbdev         my_current_count = 0;
30449e08aacStbbdev         my_priority = a_priority;
30549e08aacStbbdev     }
30649e08aacStbbdev 
30749e08aacStbbdev     //! Copy constructor
continue_receiver(const continue_receiver & src)30849e08aacStbbdev     continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
30949e08aacStbbdev         my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
31049e08aacStbbdev         my_current_count = 0;
31149e08aacStbbdev         my_priority = src.my_priority;
31249e08aacStbbdev     }
31349e08aacStbbdev 
31449e08aacStbbdev     //! Increments the trigger threshold
register_predecessor(predecessor_type &)31549e08aacStbbdev     bool register_predecessor( predecessor_type & ) override {
31649e08aacStbbdev         spin_mutex::scoped_lock l(my_mutex);
31749e08aacStbbdev         ++my_predecessor_count;
31849e08aacStbbdev         return true;
31949e08aacStbbdev     }
32049e08aacStbbdev 
32149e08aacStbbdev     //! Decrements the trigger threshold
32249e08aacStbbdev     /** Does not check to see if the removal of the predecessor now makes the current count
32349e08aacStbbdev         exceed the new threshold.  So removing a predecessor while the graph is active can cause
32449e08aacStbbdev         unexpected results. */
remove_predecessor(predecessor_type &)32549e08aacStbbdev     bool remove_predecessor( predecessor_type & ) override {
32649e08aacStbbdev         spin_mutex::scoped_lock l(my_mutex);
32749e08aacStbbdev         --my_predecessor_count;
32849e08aacStbbdev         return true;
32949e08aacStbbdev     }
33049e08aacStbbdev 
33149e08aacStbbdev     //! The input type
33249e08aacStbbdev     typedef continue_msg input_type;
33349e08aacStbbdev 
33449e08aacStbbdev     //! The predecessor type for this node
33549e08aacStbbdev     typedef receiver<input_type>::predecessor_type predecessor_type;
33649e08aacStbbdev 
33749e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
33849e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
33949e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
34049e08aacStbbdev     // execute body is supposed to be too small to create a task for.
try_put_task(const input_type &)34149e08aacStbbdev     graph_task* try_put_task( const input_type & ) override {
34249e08aacStbbdev         {
34349e08aacStbbdev             spin_mutex::scoped_lock l(my_mutex);
34449e08aacStbbdev             if ( ++my_current_count < my_predecessor_count )
34549e08aacStbbdev                 return SUCCESSFULLY_ENQUEUED;
34649e08aacStbbdev             else
34749e08aacStbbdev                 my_current_count = 0;
34849e08aacStbbdev         }
34949e08aacStbbdev         graph_task* res = execute();
35049e08aacStbbdev         return res? res : SUCCESSFULLY_ENQUEUED;
35149e08aacStbbdev     }
35249e08aacStbbdev 
35349e08aacStbbdev     spin_mutex my_mutex;
35449e08aacStbbdev     int my_predecessor_count;
35549e08aacStbbdev     int my_current_count;
35649e08aacStbbdev     int my_initial_predecessor_count;
35749e08aacStbbdev     node_priority_t my_priority;
35849e08aacStbbdev     // the friend declaration in the base class did not eliminate the "protected class"
35949e08aacStbbdev     // error in gcc 4.1.2
36049e08aacStbbdev     template<typename U, typename V> friend class limiter_node;
36149e08aacStbbdev 
reset_receiver(reset_flags f)36249e08aacStbbdev     virtual void reset_receiver( reset_flags f ) {
36349e08aacStbbdev         my_current_count = 0;
36449e08aacStbbdev         if (f & rf_clear_edges) {
36549e08aacStbbdev             my_predecessor_count = my_initial_predecessor_count;
36649e08aacStbbdev         }
36749e08aacStbbdev     }
36849e08aacStbbdev 
36949e08aacStbbdev     //! Does whatever should happen when the threshold is reached
37049e08aacStbbdev     /** This should be very fast or else spawn a task.  This is
37149e08aacStbbdev         called while the sender is blocked in the try_put(). */
37249e08aacStbbdev     virtual graph_task* execute() = 0;
37349e08aacStbbdev     template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()37449e08aacStbbdev     bool is_continue_receiver() override { return true; }
37549e08aacStbbdev 
priority()37649e08aacStbbdev     node_priority_t priority() const override { return my_priority; }
37749e08aacStbbdev }; // class continue_receiver
37849e08aacStbbdev 
37949e08aacStbbdev #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
38049e08aacStbbdev     template <typename K, typename T>
key_from_message(const T & t)38149e08aacStbbdev     K key_from_message( const T &t ) {
38249e08aacStbbdev         return t.key();
38349e08aacStbbdev     }
38449e08aacStbbdev #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
38549e08aacStbbdev 
38649e08aacStbbdev } // d1
38749e08aacStbbdev } // detail
38849e08aacStbbdev } // tbb
38949e08aacStbbdev 
39049e08aacStbbdev #include "detail/_flow_graph_trace_impl.h"
39149e08aacStbbdev #include "detail/_hash_compare.h"
39249e08aacStbbdev 
39349e08aacStbbdev namespace tbb {
39449e08aacStbbdev namespace detail {
39549e08aacStbbdev namespace d1 {
39649e08aacStbbdev 
39749e08aacStbbdev #include "detail/_flow_graph_body_impl.h"
39849e08aacStbbdev #include "detail/_flow_graph_cache_impl.h"
39949e08aacStbbdev #include "detail/_flow_graph_types_impl.h"
40049e08aacStbbdev 
40149e08aacStbbdev using namespace graph_policy_namespace;
40249e08aacStbbdev 
40349e08aacStbbdev template <typename C, typename N>
graph_iterator(C * g,bool begin)40457f524caSIlya Isaev graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
40549e08aacStbbdev {
40649e08aacStbbdev     if (begin) current_node = my_graph->my_nodes;
40749e08aacStbbdev     //else it is an end iterator by default
40849e08aacStbbdev }
40949e08aacStbbdev 
41049e08aacStbbdev template <typename C, typename N>
41149e08aacStbbdev typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
41249e08aacStbbdev     __TBB_ASSERT(current_node, "graph_iterator at end");
41349e08aacStbbdev     return *operator->();
41449e08aacStbbdev }
41549e08aacStbbdev 
41649e08aacStbbdev template <typename C, typename N>
41749e08aacStbbdev typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
41849e08aacStbbdev     return current_node;
41949e08aacStbbdev }
42049e08aacStbbdev 
42149e08aacStbbdev template <typename C, typename N>
internal_forward()42249e08aacStbbdev void graph_iterator<C,N>::internal_forward() {
42349e08aacStbbdev     if (current_node) current_node = current_node->next;
42449e08aacStbbdev }
42549e08aacStbbdev 
42649e08aacStbbdev //! Constructs a graph with isolated task_group_context
graph()42757f524caSIlya Isaev inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
42849e08aacStbbdev     prepare_task_arena();
42949e08aacStbbdev     own_context = true;
43049e08aacStbbdev     cancelled = false;
43149e08aacStbbdev     caught_exception = false;
43249e08aacStbbdev     my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
43349e08aacStbbdev     fgt_graph(this);
43449e08aacStbbdev     my_is_active = true;
43549e08aacStbbdev }
43649e08aacStbbdev 
graph(task_group_context & use_this_context)43749e08aacStbbdev inline graph::graph(task_group_context& use_this_context) :
43857f524caSIlya Isaev     my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
43949e08aacStbbdev     prepare_task_arena();
44049e08aacStbbdev     own_context = false;
44149e08aacStbbdev     cancelled = false;
44249e08aacStbbdev     caught_exception = false;
44349e08aacStbbdev     fgt_graph(this);
44449e08aacStbbdev     my_is_active = true;
44549e08aacStbbdev }
44649e08aacStbbdev 
~graph()44749e08aacStbbdev inline graph::~graph() {
44849e08aacStbbdev     wait_for_all();
44949e08aacStbbdev     if (own_context) {
45049e08aacStbbdev         my_context->~task_group_context();
45149e08aacStbbdev         r1::cache_aligned_deallocate(my_context);
45249e08aacStbbdev     }
45349e08aacStbbdev     delete my_task_arena;
45449e08aacStbbdev }
45549e08aacStbbdev 
reserve_wait()45649e08aacStbbdev inline void graph::reserve_wait() {
45749e08aacStbbdev     my_wait_context.reserve();
45849e08aacStbbdev     fgt_reserve_wait(this);
45949e08aacStbbdev }
46049e08aacStbbdev 
release_wait()46149e08aacStbbdev inline void graph::release_wait() {
46249e08aacStbbdev     fgt_release_wait(this);
46349e08aacStbbdev     my_wait_context.release();
46449e08aacStbbdev }
46549e08aacStbbdev 
register_node(graph_node * n)46649e08aacStbbdev inline void graph::register_node(graph_node *n) {
46757f524caSIlya Isaev     n->next = nullptr;
46849e08aacStbbdev     {
46949e08aacStbbdev         spin_mutex::scoped_lock lock(nodelist_mutex);
47049e08aacStbbdev         n->prev = my_nodes_last;
47149e08aacStbbdev         if (my_nodes_last) my_nodes_last->next = n;
47249e08aacStbbdev         my_nodes_last = n;
47349e08aacStbbdev         if (!my_nodes) my_nodes = n;
47449e08aacStbbdev     }
47549e08aacStbbdev }
47649e08aacStbbdev 
remove_node(graph_node * n)47749e08aacStbbdev inline void graph::remove_node(graph_node *n) {
47849e08aacStbbdev     {
47949e08aacStbbdev         spin_mutex::scoped_lock lock(nodelist_mutex);
48049e08aacStbbdev         __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
48149e08aacStbbdev         if (n->prev) n->prev->next = n->next;
48249e08aacStbbdev         if (n->next) n->next->prev = n->prev;
48349e08aacStbbdev         if (my_nodes_last == n) my_nodes_last = n->prev;
48449e08aacStbbdev         if (my_nodes == n) my_nodes = n->next;
48549e08aacStbbdev     }
48657f524caSIlya Isaev     n->prev = n->next = nullptr;
48749e08aacStbbdev }
48849e08aacStbbdev 
reset(reset_flags f)48949e08aacStbbdev inline void graph::reset( reset_flags f ) {
49049e08aacStbbdev     // reset context
49149e08aacStbbdev     deactivate_graph(*this);
49249e08aacStbbdev 
49349e08aacStbbdev     my_context->reset();
49449e08aacStbbdev     cancelled = false;
49549e08aacStbbdev     caught_exception = false;
49649e08aacStbbdev     // reset all the nodes comprising the graph
49749e08aacStbbdev     for(iterator ii = begin(); ii != end(); ++ii) {
49849e08aacStbbdev         graph_node *my_p = &(*ii);
49949e08aacStbbdev         my_p->reset_node(f);
50049e08aacStbbdev     }
50149e08aacStbbdev     // Reattach the arena. Might be useful to run the graph in a particular task_arena
50249e08aacStbbdev     // while not limiting graph lifetime to a single task_arena::execute() call.
50349e08aacStbbdev     prepare_task_arena( /*reinit=*/true );
50449e08aacStbbdev     activate_graph(*this);
50549e08aacStbbdev }
50649e08aacStbbdev 
cancel()50749e08aacStbbdev inline void graph::cancel() {
50849e08aacStbbdev     my_context->cancel_group_execution();
50949e08aacStbbdev }
51049e08aacStbbdev 
begin()51149e08aacStbbdev inline graph::iterator graph::begin() { return iterator(this, true); }
51249e08aacStbbdev 
end()51349e08aacStbbdev inline graph::iterator graph::end() { return iterator(this, false); }
51449e08aacStbbdev 
begin()51549e08aacStbbdev inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
51649e08aacStbbdev 
end()51749e08aacStbbdev inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
51849e08aacStbbdev 
cbegin()51949e08aacStbbdev inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
52049e08aacStbbdev 
cend()52149e08aacStbbdev inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
52249e08aacStbbdev 
graph_node(graph & g)52349e08aacStbbdev inline graph_node::graph_node(graph& g) : my_graph(g) {
52449e08aacStbbdev     my_graph.register_node(this);
52549e08aacStbbdev }
52649e08aacStbbdev 
~graph_node()52749e08aacStbbdev inline graph_node::~graph_node() {
52849e08aacStbbdev     my_graph.remove_node(this);
52949e08aacStbbdev }
53049e08aacStbbdev 
53149e08aacStbbdev #include "detail/_flow_graph_node_impl.h"
53249e08aacStbbdev 
53349e08aacStbbdev 
53449e08aacStbbdev //! An executable node that acts as a source, i.e. it has no predecessors
53549e08aacStbbdev 
53649e08aacStbbdev template < typename Output >
__TBB_requires(std::copyable<Output>)537478de5b1Stbbdev     __TBB_requires(std::copyable<Output>)
53849e08aacStbbdev class input_node : public graph_node, public sender< Output > {
53949e08aacStbbdev public:
54049e08aacStbbdev     //! The type of the output message, which is complete
54149e08aacStbbdev     typedef Output output_type;
54249e08aacStbbdev 
54349e08aacStbbdev     //! The type of successors of this node
54449e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
54549e08aacStbbdev 
54649e08aacStbbdev     // Input node has no input type
54749e08aacStbbdev     typedef null_type input_type;
54849e08aacStbbdev 
54949e08aacStbbdev     //! Constructor for a node with a successor
55049e08aacStbbdev     template< typename Body >
551478de5b1Stbbdev         __TBB_requires(input_node_body<Body, Output>)
55249e08aacStbbdev      __TBB_NOINLINE_SYM input_node( graph &g, Body body )
55349e08aacStbbdev          : graph_node(g), my_active(false)
55449e08aacStbbdev          , my_body( new input_body_leaf< output_type, Body>(body) )
55549e08aacStbbdev          , my_init_body( new input_body_leaf< output_type, Body>(body) )
55649e08aacStbbdev          , my_successors(this), my_reserved(false), my_has_cached_item(false)
55749e08aacStbbdev     {
55849e08aacStbbdev         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
55949e08aacStbbdev                            static_cast<sender<output_type> *>(this), this->my_body);
56049e08aacStbbdev     }
56149e08aacStbbdev 
56249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
56349e08aacStbbdev     template <typename Body, typename... Successors>
564478de5b1Stbbdev         __TBB_requires(input_node_body<Body, Output>)
56549e08aacStbbdev     input_node( const node_set<order::preceding, Successors...>& successors, Body body )
56649e08aacStbbdev         : input_node(successors.graph_reference(), body)
56749e08aacStbbdev     {
56849e08aacStbbdev         make_edges(*this, successors);
56949e08aacStbbdev     }
57049e08aacStbbdev #endif
57149e08aacStbbdev 
57249e08aacStbbdev     //! Copy constructor
57349e08aacStbbdev     __TBB_NOINLINE_SYM input_node( const input_node& src )
57449e08aacStbbdev         : graph_node(src.my_graph), sender<Output>()
57549e08aacStbbdev         , my_active(false)
57649e08aacStbbdev         , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
57749e08aacStbbdev         , my_successors(this), my_reserved(false), my_has_cached_item(false)
57849e08aacStbbdev     {
57949e08aacStbbdev         fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
58049e08aacStbbdev                            static_cast<sender<output_type> *>(this), this->my_body);
58149e08aacStbbdev     }
58249e08aacStbbdev 
58349e08aacStbbdev     //! The destructor
58449e08aacStbbdev     ~input_node() { delete my_body; delete my_init_body; }
58549e08aacStbbdev 
58649e08aacStbbdev     //! Add a new successor to this node
58749e08aacStbbdev     bool register_successor( successor_type &r ) override {
58849e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
58949e08aacStbbdev         my_successors.register_successor(r);
59049e08aacStbbdev         if ( my_active )
59149e08aacStbbdev             spawn_put();
59249e08aacStbbdev         return true;
59349e08aacStbbdev     }
59449e08aacStbbdev 
59549e08aacStbbdev     //! Removes a successor from this node
59649e08aacStbbdev     bool remove_successor( successor_type &r ) override {
59749e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
59849e08aacStbbdev         my_successors.remove_successor(r);
59949e08aacStbbdev         return true;
60049e08aacStbbdev     }
60149e08aacStbbdev 
60249e08aacStbbdev     //! Request an item from the node
60349e08aacStbbdev     bool try_get( output_type &v ) override {
60449e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
60549e08aacStbbdev         if ( my_reserved )
60649e08aacStbbdev             return false;
60749e08aacStbbdev 
60849e08aacStbbdev         if ( my_has_cached_item ) {
60949e08aacStbbdev             v = my_cached_item;
61049e08aacStbbdev             my_has_cached_item = false;
61149e08aacStbbdev             return true;
61249e08aacStbbdev         }
61349e08aacStbbdev         // we've been asked to provide an item, but we have none.  enqueue a task to
61449e08aacStbbdev         // provide one.
61549e08aacStbbdev         if ( my_active )
61649e08aacStbbdev             spawn_put();
61749e08aacStbbdev         return false;
61849e08aacStbbdev     }
61949e08aacStbbdev 
62049e08aacStbbdev     //! Reserves an item.
62149e08aacStbbdev     bool try_reserve( output_type &v ) override {
62249e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
62349e08aacStbbdev         if ( my_reserved ) {
62449e08aacStbbdev             return false;
62549e08aacStbbdev         }
62649e08aacStbbdev 
62749e08aacStbbdev         if ( my_has_cached_item ) {
62849e08aacStbbdev             v = my_cached_item;
62949e08aacStbbdev             my_reserved = true;
63049e08aacStbbdev             return true;
63149e08aacStbbdev         } else {
63249e08aacStbbdev             return false;
63349e08aacStbbdev         }
63449e08aacStbbdev     }
63549e08aacStbbdev 
63649e08aacStbbdev     //! Release a reserved item.
63749e08aacStbbdev     /** true = item has been released and so remains in sender, dest must request or reserve future items */
63849e08aacStbbdev     bool try_release( ) override {
63949e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
64049e08aacStbbdev         __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
64149e08aacStbbdev         my_reserved = false;
64249e08aacStbbdev         if(!my_successors.empty())
64349e08aacStbbdev             spawn_put();
64449e08aacStbbdev         return true;
64549e08aacStbbdev     }
64649e08aacStbbdev 
64749e08aacStbbdev     //! Consumes a reserved item
64849e08aacStbbdev     bool try_consume( ) override {
64949e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
65049e08aacStbbdev         __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
65149e08aacStbbdev         my_reserved = false;
65249e08aacStbbdev         my_has_cached_item = false;
65349e08aacStbbdev         if ( !my_successors.empty() ) {
65449e08aacStbbdev             spawn_put();
65549e08aacStbbdev         }
65649e08aacStbbdev         return true;
65749e08aacStbbdev     }
65849e08aacStbbdev 
65949e08aacStbbdev     //! Activates a node that was created in the inactive state
66049e08aacStbbdev     void activate() {
66149e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
66249e08aacStbbdev         my_active = true;
66349e08aacStbbdev         if (!my_successors.empty())
66449e08aacStbbdev             spawn_put();
66549e08aacStbbdev     }
66649e08aacStbbdev 
66749e08aacStbbdev     template<typename Body>
66849e08aacStbbdev     Body copy_function_object() {
66949e08aacStbbdev         input_body<output_type> &body_ref = *this->my_body;
67049e08aacStbbdev         return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
67149e08aacStbbdev     }
67249e08aacStbbdev 
67349e08aacStbbdev protected:
67449e08aacStbbdev 
67549e08aacStbbdev     //! resets the input_node to its initial state
67649e08aacStbbdev     void reset_node( reset_flags f) override {
67749e08aacStbbdev         my_active = false;
67849e08aacStbbdev         my_reserved = false;
67949e08aacStbbdev         my_has_cached_item = false;
68049e08aacStbbdev 
68149e08aacStbbdev         if(f & rf_clear_edges) my_successors.clear();
68249e08aacStbbdev         if(f & rf_reset_bodies) {
68349e08aacStbbdev             input_body<output_type> *tmp = my_init_body->clone();
68449e08aacStbbdev             delete my_body;
68549e08aacStbbdev             my_body = tmp;
68649e08aacStbbdev         }
68749e08aacStbbdev     }
68849e08aacStbbdev 
68949e08aacStbbdev private:
69049e08aacStbbdev     spin_mutex my_mutex;
69149e08aacStbbdev     bool my_active;
69249e08aacStbbdev     input_body<output_type> *my_body;
69349e08aacStbbdev     input_body<output_type> *my_init_body;
69449e08aacStbbdev     broadcast_cache< output_type > my_successors;
69549e08aacStbbdev     bool my_reserved;
69649e08aacStbbdev     bool my_has_cached_item;
69749e08aacStbbdev     output_type my_cached_item;
69849e08aacStbbdev 
69949e08aacStbbdev     // used by apply_body_bypass, can invoke body of node.
70049e08aacStbbdev     bool try_reserve_apply_body(output_type &v) {
70149e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
70249e08aacStbbdev         if ( my_reserved ) {
70349e08aacStbbdev             return false;
70449e08aacStbbdev         }
70549e08aacStbbdev         if ( !my_has_cached_item ) {
70649e08aacStbbdev             flow_control control;
70749e08aacStbbdev 
70849e08aacStbbdev             fgt_begin_body( my_body );
70949e08aacStbbdev 
71049e08aacStbbdev             my_cached_item = (*my_body)(control);
71149e08aacStbbdev             my_has_cached_item = !control.is_pipeline_stopped;
71249e08aacStbbdev 
71349e08aacStbbdev             fgt_end_body( my_body );
71449e08aacStbbdev         }
71549e08aacStbbdev         if ( my_has_cached_item ) {
71649e08aacStbbdev             v = my_cached_item;
71749e08aacStbbdev             my_reserved = true;
71849e08aacStbbdev             return true;
71949e08aacStbbdev         } else {
72049e08aacStbbdev             return false;
72149e08aacStbbdev         }
72249e08aacStbbdev     }
72349e08aacStbbdev 
72449e08aacStbbdev     graph_task* create_put_task() {
72549e08aacStbbdev         small_object_allocator allocator{};
72649e08aacStbbdev         typedef input_node_task_bypass< input_node<output_type> > task_type;
72749e08aacStbbdev         graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
72849e08aacStbbdev         my_graph.reserve_wait();
72949e08aacStbbdev         return t;
73049e08aacStbbdev     }
73149e08aacStbbdev 
73249e08aacStbbdev     //! Spawns a task that applies the body
73349e08aacStbbdev     void spawn_put( ) {
73449e08aacStbbdev         if(is_graph_active(this->my_graph)) {
73549e08aacStbbdev             spawn_in_graph_arena(this->my_graph, *create_put_task());
73649e08aacStbbdev         }
73749e08aacStbbdev     }
73849e08aacStbbdev 
73949e08aacStbbdev     friend class input_node_task_bypass< input_node<output_type> >;
74049e08aacStbbdev     //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
74149e08aacStbbdev     graph_task* apply_body_bypass( ) {
74249e08aacStbbdev         output_type v;
74349e08aacStbbdev         if ( !try_reserve_apply_body(v) )
74457f524caSIlya Isaev             return nullptr;
74549e08aacStbbdev 
74649e08aacStbbdev         graph_task *last_task = my_successors.try_put_task(v);
74749e08aacStbbdev         if ( last_task )
74849e08aacStbbdev             try_consume();
74949e08aacStbbdev         else
75049e08aacStbbdev             try_release();
75149e08aacStbbdev         return last_task;
75249e08aacStbbdev     }
75349e08aacStbbdev };  // class input_node
75449e08aacStbbdev 
75549e08aacStbbdev //! Implements a function node that supports Input -> Output
75649e08aacStbbdev template<typename Input, typename Output = continue_msg, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input> && std::copy_constructible<Output>)757478de5b1Stbbdev     __TBB_requires(std::default_initializable<Input> &&
758478de5b1Stbbdev                    std::copy_constructible<Input> &&
759478de5b1Stbbdev                    std::copy_constructible<Output>)
76049e08aacStbbdev class function_node
76149e08aacStbbdev     : public graph_node
76249e08aacStbbdev     , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
76349e08aacStbbdev     , public function_output<Output>
76449e08aacStbbdev {
76549e08aacStbbdev     typedef cache_aligned_allocator<Input> internals_allocator;
76649e08aacStbbdev 
76749e08aacStbbdev public:
76849e08aacStbbdev     typedef Input input_type;
76949e08aacStbbdev     typedef Output output_type;
77049e08aacStbbdev     typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
77149e08aacStbbdev     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
77249e08aacStbbdev     typedef function_output<output_type> fOutput_type;
77349e08aacStbbdev     typedef typename input_impl_type::predecessor_type predecessor_type;
77449e08aacStbbdev     typedef typename fOutput_type::successor_type successor_type;
77549e08aacStbbdev 
77649e08aacStbbdev     using input_impl_type::my_predecessors;
77749e08aacStbbdev 
77849e08aacStbbdev     //! Constructor
77949e08aacStbbdev     // input_queue_type is allocated here, but destroyed in the function_input_base.
78049e08aacStbbdev     // TODO: pass the graph_buffer_policy to the function_input_base so it can all
78149e08aacStbbdev     // be done in one place.  This would be an interface-breaking change.
78249e08aacStbbdev     template< typename Body >
783478de5b1Stbbdev         __TBB_requires(function_node_body<Body, Input, Output>)
78449e08aacStbbdev      __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
78549e08aacStbbdev                    Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
78649e08aacStbbdev         : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
78749e08aacStbbdev           fOutput_type(g) {
78849e08aacStbbdev         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
78949e08aacStbbdev                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
79049e08aacStbbdev     }
79149e08aacStbbdev 
79249e08aacStbbdev     template <typename Body>
793478de5b1Stbbdev         __TBB_requires(function_node_body<Body, Input, Output>)
79449e08aacStbbdev     function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
79549e08aacStbbdev         : function_node(g, concurrency, body, Policy(), a_priority) {}
79649e08aacStbbdev 
79749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
79849e08aacStbbdev     template <typename Body, typename... Args>
799478de5b1Stbbdev         __TBB_requires(function_node_body<Body, Input, Output>)
80049e08aacStbbdev     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
80149e08aacStbbdev                    Policy p = Policy(), node_priority_t a_priority = no_priority )
80249e08aacStbbdev         : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
80349e08aacStbbdev         make_edges_in_order(nodes, *this);
80449e08aacStbbdev     }
80549e08aacStbbdev 
80649e08aacStbbdev     template <typename Body, typename... Args>
807478de5b1Stbbdev         __TBB_requires(function_node_body<Body, Input, Output>)
80849e08aacStbbdev     function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
80949e08aacStbbdev         : function_node(nodes, concurrency, body, Policy(), a_priority) {}
81049e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
81149e08aacStbbdev 
81249e08aacStbbdev     //! Copy constructor
81349e08aacStbbdev     __TBB_NOINLINE_SYM function_node( const function_node& src ) :
81449e08aacStbbdev         graph_node(src.my_graph),
81549e08aacStbbdev         input_impl_type(src),
81649e08aacStbbdev         fOutput_type(src.my_graph) {
81749e08aacStbbdev         fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
81849e08aacStbbdev                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
81949e08aacStbbdev     }
82049e08aacStbbdev 
82149e08aacStbbdev protected:
82249e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
82349e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
82449e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
82549e08aacStbbdev     using input_impl_type::try_put_task;
82649e08aacStbbdev 
82749e08aacStbbdev     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
82849e08aacStbbdev 
82949e08aacStbbdev     void reset_node(reset_flags f) override {
83049e08aacStbbdev         input_impl_type::reset_function_input(f);
83149e08aacStbbdev         // TODO: use clear() instead.
83249e08aacStbbdev         if(f & rf_clear_edges) {
83349e08aacStbbdev             successors().clear();
83449e08aacStbbdev             my_predecessors.clear();
83549e08aacStbbdev         }
83649e08aacStbbdev         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
83749e08aacStbbdev         __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
83849e08aacStbbdev     }
83949e08aacStbbdev 
84049e08aacStbbdev };  // class function_node
84149e08aacStbbdev 
84249e08aacStbbdev //! implements a function node that supports Input -> (set of outputs)
84349e08aacStbbdev // Output is a tuple of output types.
84449e08aacStbbdev template<typename Input, typename Output, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)845478de5b1Stbbdev     __TBB_requires(std::default_initializable<Input> &&
846478de5b1Stbbdev                    std::copy_constructible<Input>)
84749e08aacStbbdev class multifunction_node :
84849e08aacStbbdev     public graph_node,
84949e08aacStbbdev     public multifunction_input
85049e08aacStbbdev     <
85149e08aacStbbdev         Input,
85249e08aacStbbdev         typename wrap_tuple_elements<
85349e08aacStbbdev             std::tuple_size<Output>::value,  // #elements in tuple
85449e08aacStbbdev             multifunction_output,  // wrap this around each element
85549e08aacStbbdev             Output // the tuple providing the types
85649e08aacStbbdev         >::type,
85749e08aacStbbdev         Policy,
85849e08aacStbbdev         cache_aligned_allocator<Input>
85949e08aacStbbdev     >
86049e08aacStbbdev {
86149e08aacStbbdev     typedef cache_aligned_allocator<Input> internals_allocator;
86249e08aacStbbdev 
86349e08aacStbbdev protected:
86449e08aacStbbdev     static const int N = std::tuple_size<Output>::value;
86549e08aacStbbdev public:
86649e08aacStbbdev     typedef Input input_type;
86749e08aacStbbdev     typedef null_type output_type;
86849e08aacStbbdev     typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
86949e08aacStbbdev     typedef multifunction_input<
87049e08aacStbbdev         input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
87149e08aacStbbdev     typedef function_input_queue<input_type, internals_allocator> input_queue_type;
87249e08aacStbbdev private:
87349e08aacStbbdev     using input_impl_type::my_predecessors;
87449e08aacStbbdev public:
87549e08aacStbbdev     template<typename Body>
876478de5b1Stbbdev         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
87749e08aacStbbdev     __TBB_NOINLINE_SYM multifunction_node(
87849e08aacStbbdev         graph &g, size_t concurrency,
87949e08aacStbbdev         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
88049e08aacStbbdev     ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
88149e08aacStbbdev         fgt_multioutput_node_with_body<N>(
88249e08aacStbbdev             CODEPTR(), FLOW_MULTIFUNCTION_NODE,
88349e08aacStbbdev             &this->my_graph, static_cast<receiver<input_type> *>(this),
88449e08aacStbbdev             this->output_ports(), this->my_body
88549e08aacStbbdev         );
88649e08aacStbbdev     }
88749e08aacStbbdev 
88849e08aacStbbdev     template <typename Body>
889478de5b1Stbbdev         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
89049e08aacStbbdev     __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
89149e08aacStbbdev         : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
89249e08aacStbbdev 
89349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
89449e08aacStbbdev     template <typename Body, typename... Args>
895478de5b1Stbbdev         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
89649e08aacStbbdev     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
89749e08aacStbbdev                        Policy p = Policy(), node_priority_t a_priority = no_priority)
89849e08aacStbbdev         : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
89949e08aacStbbdev         make_edges_in_order(nodes, *this);
90049e08aacStbbdev     }
90149e08aacStbbdev 
90249e08aacStbbdev     template <typename Body, typename... Args>
903478de5b1Stbbdev         __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
90449e08aacStbbdev     __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
90549e08aacStbbdev         : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
90649e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
90749e08aacStbbdev 
90849e08aacStbbdev     __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
90949e08aacStbbdev         graph_node(other.my_graph), input_impl_type(other) {
91049e08aacStbbdev         fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
91149e08aacStbbdev                 &this->my_graph, static_cast<receiver<input_type> *>(this),
91249e08aacStbbdev                 this->output_ports(), this->my_body );
91349e08aacStbbdev     }
91449e08aacStbbdev 
91549e08aacStbbdev     // all the guts are in multifunction_input...
91649e08aacStbbdev protected:
91749e08aacStbbdev     void reset_node(reset_flags f) override { input_impl_type::reset(f); }
91849e08aacStbbdev };  // multifunction_node
91949e08aacStbbdev 
92049e08aacStbbdev //! split_node: accepts a tuple as input, forwards each element of the tuple to its
92149e08aacStbbdev //  successors.  The node has unlimited concurrency, so it does not reject inputs.
92249e08aacStbbdev template<typename TupleType>
92349e08aacStbbdev class split_node : public graph_node, public receiver<TupleType> {
92449e08aacStbbdev     static const int N = std::tuple_size<TupleType>::value;
92549e08aacStbbdev     typedef receiver<TupleType> base_type;
92649e08aacStbbdev public:
92749e08aacStbbdev     typedef TupleType input_type;
92849e08aacStbbdev     typedef typename wrap_tuple_elements<
92949e08aacStbbdev             N,  // #elements in tuple
93049e08aacStbbdev             multifunction_output,  // wrap this around each element
93149e08aacStbbdev             TupleType // the tuple providing the types
93249e08aacStbbdev         >::type  output_ports_type;
93349e08aacStbbdev 
split_node(graph & g)93449e08aacStbbdev     __TBB_NOINLINE_SYM explicit split_node(graph &g)
93549e08aacStbbdev         : graph_node(g),
93649e08aacStbbdev           my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
93749e08aacStbbdev     {
93849e08aacStbbdev         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
93949e08aacStbbdev             static_cast<receiver<input_type> *>(this), this->output_ports());
94049e08aacStbbdev     }
94149e08aacStbbdev 
94249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
94349e08aacStbbdev     template <typename... Args>
split_node(const node_set<Args...> & nodes)94449e08aacStbbdev     __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
94549e08aacStbbdev         make_edges_in_order(nodes, *this);
94649e08aacStbbdev     }
94749e08aacStbbdev #endif
94849e08aacStbbdev 
split_node(const split_node & other)94949e08aacStbbdev     __TBB_NOINLINE_SYM split_node(const split_node& other)
95049e08aacStbbdev         : graph_node(other.my_graph), base_type(other),
95149e08aacStbbdev           my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
95249e08aacStbbdev     {
95349e08aacStbbdev         fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
95449e08aacStbbdev             static_cast<receiver<input_type> *>(this), this->output_ports());
95549e08aacStbbdev     }
95649e08aacStbbdev 
output_ports()95749e08aacStbbdev     output_ports_type &output_ports() { return my_output_ports; }
95849e08aacStbbdev 
95949e08aacStbbdev protected:
try_put_task(const TupleType & t)96049e08aacStbbdev     graph_task *try_put_task(const TupleType& t) override {
96149e08aacStbbdev         // Sending split messages in parallel is not justified, as overheads would prevail.
96249e08aacStbbdev         // Also, we do not have successors here. So we just tell the task returned here is successful.
96349e08aacStbbdev         return emit_element<N>::emit_this(this->my_graph, t, output_ports());
96449e08aacStbbdev     }
reset_node(reset_flags f)96549e08aacStbbdev     void reset_node(reset_flags f) override {
96649e08aacStbbdev         if (f & rf_clear_edges)
96749e08aacStbbdev             clear_element<N>::clear_this(my_output_ports);
96849e08aacStbbdev 
96949e08aacStbbdev         __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
97049e08aacStbbdev     }
graph_reference()97149e08aacStbbdev     graph& graph_reference() const override {
97249e08aacStbbdev         return my_graph;
97349e08aacStbbdev     }
97449e08aacStbbdev 
97549e08aacStbbdev private:
97649e08aacStbbdev     output_ports_type my_output_ports;
97749e08aacStbbdev };
97849e08aacStbbdev 
97949e08aacStbbdev //! Implements an executable node that supports continue_msg -> Output
98049e08aacStbbdev template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)981478de5b1Stbbdev     __TBB_requires(std::copy_constructible<Output>)
98249e08aacStbbdev class continue_node : public graph_node, public continue_input<Output, Policy>,
98349e08aacStbbdev                       public function_output<Output> {
98449e08aacStbbdev public:
98549e08aacStbbdev     typedef continue_msg input_type;
98649e08aacStbbdev     typedef Output output_type;
98749e08aacStbbdev     typedef continue_input<Output, Policy> input_impl_type;
98849e08aacStbbdev     typedef function_output<output_type> fOutput_type;
98949e08aacStbbdev     typedef typename input_impl_type::predecessor_type predecessor_type;
99049e08aacStbbdev     typedef typename fOutput_type::successor_type successor_type;
99149e08aacStbbdev 
99249e08aacStbbdev     //! Constructor for executable node with continue_msg -> Output
99349e08aacStbbdev     template <typename Body >
994478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
99549e08aacStbbdev     __TBB_NOINLINE_SYM continue_node(
99649e08aacStbbdev         graph &g,
99749e08aacStbbdev         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
99849e08aacStbbdev     ) : graph_node(g), input_impl_type( g, body, a_priority ),
99949e08aacStbbdev         fOutput_type(g) {
100049e08aacStbbdev         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
100149e08aacStbbdev 
100249e08aacStbbdev                                            static_cast<receiver<input_type> *>(this),
100349e08aacStbbdev                                            static_cast<sender<output_type> *>(this), this->my_body );
100449e08aacStbbdev     }
100549e08aacStbbdev 
100649e08aacStbbdev     template <typename Body>
1007478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
100849e08aacStbbdev     continue_node( graph& g, Body body, node_priority_t a_priority )
100949e08aacStbbdev         : continue_node(g, body, Policy(), a_priority) {}
101049e08aacStbbdev 
101149e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
101249e08aacStbbdev     template <typename Body, typename... Args>
1013478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
101449e08aacStbbdev     continue_node( const node_set<Args...>& nodes, Body body,
101549e08aacStbbdev                    Policy p = Policy(), node_priority_t a_priority = no_priority )
101649e08aacStbbdev         : continue_node(nodes.graph_reference(), body, p, a_priority ) {
101749e08aacStbbdev         make_edges_in_order(nodes, *this);
101849e08aacStbbdev     }
101949e08aacStbbdev     template <typename Body, typename... Args>
1020478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
102149e08aacStbbdev     continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
102249e08aacStbbdev         : continue_node(nodes, body, Policy(), a_priority) {}
102349e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
102449e08aacStbbdev 
102549e08aacStbbdev     //! Constructor for executable node with continue_msg -> Output
102649e08aacStbbdev     template <typename Body >
1027478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
102849e08aacStbbdev     __TBB_NOINLINE_SYM continue_node(
102949e08aacStbbdev         graph &g, int number_of_predecessors,
103049e08aacStbbdev         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
103149e08aacStbbdev     ) : graph_node(g)
103249e08aacStbbdev       , input_impl_type(g, number_of_predecessors, body, a_priority),
103349e08aacStbbdev         fOutput_type(g) {
103449e08aacStbbdev         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
103549e08aacStbbdev                                            static_cast<receiver<input_type> *>(this),
103649e08aacStbbdev                                            static_cast<sender<output_type> *>(this), this->my_body );
103749e08aacStbbdev     }
103849e08aacStbbdev 
103949e08aacStbbdev     template <typename Body>
1040478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
104149e08aacStbbdev     continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
104249e08aacStbbdev         : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
104349e08aacStbbdev 
104449e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
104549e08aacStbbdev     template <typename Body, typename... Args>
1046478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
104749e08aacStbbdev     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
104849e08aacStbbdev                    Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
104949e08aacStbbdev         : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
105049e08aacStbbdev         make_edges_in_order(nodes, *this);
105149e08aacStbbdev     }
105249e08aacStbbdev 
105349e08aacStbbdev     template <typename Body, typename... Args>
1054478de5b1Stbbdev         __TBB_requires(continue_node_body<Body, Output>)
105549e08aacStbbdev     continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
105649e08aacStbbdev                    Body body, node_priority_t a_priority )
105749e08aacStbbdev         : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
105849e08aacStbbdev #endif
105949e08aacStbbdev 
106049e08aacStbbdev     //! Copy constructor
106149e08aacStbbdev     __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
106249e08aacStbbdev         graph_node(src.my_graph), input_impl_type(src),
106349e08aacStbbdev         function_output<Output>(src.my_graph) {
106449e08aacStbbdev         fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
106549e08aacStbbdev                                            static_cast<receiver<input_type> *>(this),
106649e08aacStbbdev                                            static_cast<sender<output_type> *>(this), this->my_body );
106749e08aacStbbdev     }
106849e08aacStbbdev 
106949e08aacStbbdev protected:
107049e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
107149e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
107249e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
107349e08aacStbbdev     using input_impl_type::try_put_task;
107449e08aacStbbdev     broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
107549e08aacStbbdev 
107649e08aacStbbdev     void reset_node(reset_flags f) override {
107749e08aacStbbdev         input_impl_type::reset_receiver(f);
107849e08aacStbbdev         if(f & rf_clear_edges)successors().clear();
107949e08aacStbbdev         __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
108049e08aacStbbdev     }
108149e08aacStbbdev };  // continue_node
108249e08aacStbbdev 
108349e08aacStbbdev //! Forwards messages of type T to all successors
108449e08aacStbbdev template <typename T>
108549e08aacStbbdev class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
108649e08aacStbbdev public:
108749e08aacStbbdev     typedef T input_type;
108849e08aacStbbdev     typedef T output_type;
108949e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
109049e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
109149e08aacStbbdev private:
109249e08aacStbbdev     broadcast_cache<input_type> my_successors;
109349e08aacStbbdev public:
109449e08aacStbbdev 
broadcast_node(graph & g)109549e08aacStbbdev     __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
109649e08aacStbbdev         fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
109749e08aacStbbdev                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
109849e08aacStbbdev     }
109949e08aacStbbdev 
110049e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
110149e08aacStbbdev     template <typename... Args>
broadcast_node(const node_set<Args...> & nodes)110249e08aacStbbdev     broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
110349e08aacStbbdev         make_edges_in_order(nodes, *this);
110449e08aacStbbdev     }
110549e08aacStbbdev #endif
110649e08aacStbbdev 
110749e08aacStbbdev     // Copy constructor
broadcast_node(const broadcast_node & src)110849e08aacStbbdev     __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
110949e08aacStbbdev 
111049e08aacStbbdev     //! Adds a successor
register_successor(successor_type & r)111149e08aacStbbdev     bool register_successor( successor_type &r ) override {
111249e08aacStbbdev         my_successors.register_successor( r );
111349e08aacStbbdev         return true;
111449e08aacStbbdev     }
111549e08aacStbbdev 
111649e08aacStbbdev     //! Removes s as a successor
remove_successor(successor_type & r)111749e08aacStbbdev     bool remove_successor( successor_type &r ) override {
111849e08aacStbbdev         my_successors.remove_successor( r );
111949e08aacStbbdev         return true;
112049e08aacStbbdev     }
112149e08aacStbbdev 
112249e08aacStbbdev protected:
112349e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
112449e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
112549e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
112649e08aacStbbdev     //! build a task to run the successor if possible.  Default is old behavior.
try_put_task(const T & t)112749e08aacStbbdev     graph_task *try_put_task(const T& t) override {
112849e08aacStbbdev         graph_task *new_task = my_successors.try_put_task(t);
112949e08aacStbbdev         if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
113049e08aacStbbdev         return new_task;
113149e08aacStbbdev     }
113249e08aacStbbdev 
graph_reference()113349e08aacStbbdev     graph& graph_reference() const override {
113449e08aacStbbdev         return my_graph;
113549e08aacStbbdev     }
113649e08aacStbbdev 
reset_node(reset_flags f)113749e08aacStbbdev     void reset_node(reset_flags f) override {
113849e08aacStbbdev         if (f&rf_clear_edges) {
113949e08aacStbbdev            my_successors.clear();
114049e08aacStbbdev         }
114149e08aacStbbdev         __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
114249e08aacStbbdev     }
114349e08aacStbbdev };  // broadcast_node
114449e08aacStbbdev 
114549e08aacStbbdev //! Forwards messages in arbitrary order
114649e08aacStbbdev template <typename T>
114749e08aacStbbdev class buffer_node
114849e08aacStbbdev     : public graph_node
114949e08aacStbbdev     , public reservable_item_buffer< T, cache_aligned_allocator<T> >
115049e08aacStbbdev     , public receiver<T>, public sender<T>
115149e08aacStbbdev {
115249e08aacStbbdev     typedef cache_aligned_allocator<T> internals_allocator;
115349e08aacStbbdev 
115449e08aacStbbdev public:
115549e08aacStbbdev     typedef T input_type;
115649e08aacStbbdev     typedef T output_type;
115749e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
115849e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
115949e08aacStbbdev     typedef buffer_node<T> class_type;
116049e08aacStbbdev 
116149e08aacStbbdev protected:
116249e08aacStbbdev     typedef size_t size_type;
116349e08aacStbbdev     round_robin_cache< T, null_rw_mutex > my_successors;
116449e08aacStbbdev 
116549e08aacStbbdev     friend class forward_task_bypass< class_type >;
116649e08aacStbbdev 
116749e08aacStbbdev     enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
116849e08aacStbbdev     };
116949e08aacStbbdev 
117049e08aacStbbdev     // implements the aggregator_operation concept
117149e08aacStbbdev     class buffer_operation : public aggregated_operation< buffer_operation > {
117249e08aacStbbdev     public:
117349e08aacStbbdev         char type;
117449e08aacStbbdev         T* elem;
117549e08aacStbbdev         graph_task* ltask;
117649e08aacStbbdev         successor_type *r;
117749e08aacStbbdev 
buffer_operation(const T & e,op_type t)117849e08aacStbbdev         buffer_operation(const T& e, op_type t) : type(char(t))
117957f524caSIlya Isaev                                                   , elem(const_cast<T*>(&e)) , ltask(nullptr)
1180f2af7473Skboyarinov                                                   , r(nullptr)
118149e08aacStbbdev         {}
buffer_operation(op_type t)1182f2af7473Skboyarinov         buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
118349e08aacStbbdev     };
118449e08aacStbbdev 
118549e08aacStbbdev     bool forwarder_busy;
118649e08aacStbbdev     typedef aggregating_functor<class_type, buffer_operation> handler_type;
118749e08aacStbbdev     friend class aggregating_functor<class_type, buffer_operation>;
118849e08aacStbbdev     aggregator< handler_type, buffer_operation> my_aggregator;
118949e08aacStbbdev 
handle_operations(buffer_operation * op_list)119049e08aacStbbdev     virtual void handle_operations(buffer_operation *op_list) {
119149e08aacStbbdev         handle_operations_impl(op_list, this);
119249e08aacStbbdev     }
119349e08aacStbbdev 
119449e08aacStbbdev     template<typename derived_type>
handle_operations_impl(buffer_operation * op_list,derived_type * derived)119549e08aacStbbdev     void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
119649e08aacStbbdev         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
119749e08aacStbbdev 
119857f524caSIlya Isaev         buffer_operation *tmp = nullptr;
119949e08aacStbbdev         bool try_forwarding = false;
120049e08aacStbbdev         while (op_list) {
120149e08aacStbbdev             tmp = op_list;
120249e08aacStbbdev             op_list = op_list->next;
120349e08aacStbbdev             switch (tmp->type) {
120449e08aacStbbdev             case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
120549e08aacStbbdev             case rem_succ: internal_rem_succ(tmp); break;
120649e08aacStbbdev             case req_item: internal_pop(tmp); break;
120749e08aacStbbdev             case res_item: internal_reserve(tmp); break;
120849e08aacStbbdev             case rel_res:  internal_release(tmp); try_forwarding = true; break;
120949e08aacStbbdev             case con_res:  internal_consume(tmp); try_forwarding = true; break;
121049e08aacStbbdev             case put_item: try_forwarding = internal_push(tmp); break;
121149e08aacStbbdev             case try_fwd_task: internal_forward_task(tmp); break;
121249e08aacStbbdev             }
121349e08aacStbbdev         }
121449e08aacStbbdev 
121549e08aacStbbdev         derived->order();
121649e08aacStbbdev 
121749e08aacStbbdev         if (try_forwarding && !forwarder_busy) {
121849e08aacStbbdev             if(is_graph_active(this->my_graph)) {
121949e08aacStbbdev                 forwarder_busy = true;
122049e08aacStbbdev                 typedef forward_task_bypass<class_type> task_type;
122149e08aacStbbdev                 small_object_allocator allocator{};
122249e08aacStbbdev                 graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
122349e08aacStbbdev                 my_graph.reserve_wait();
122449e08aacStbbdev                 // tmp should point to the last item handled by the aggregator.  This is the operation
122549e08aacStbbdev                 // the handling thread enqueued.  So modifying that record will be okay.
122649e08aacStbbdev                 // TODO revamp: check that the issue is still present
122749e08aacStbbdev                 // workaround for icc bug  (at least 12.0 and 13.0)
122849e08aacStbbdev                 // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
122949e08aacStbbdev                 //        argument types are: (graph, graph_task *, graph_task *)
123049e08aacStbbdev                 graph_task *z = tmp->ltask;
123149e08aacStbbdev                 graph &g = this->my_graph;
123249e08aacStbbdev                 tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
123349e08aacStbbdev             }
123449e08aacStbbdev         }
123549e08aacStbbdev     }  // handle_operations
123649e08aacStbbdev 
grab_forwarding_task(buffer_operation & op_data)123749e08aacStbbdev     inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
123849e08aacStbbdev         return op_data.ltask;
123949e08aacStbbdev     }
124049e08aacStbbdev 
enqueue_forwarding_task(buffer_operation & op_data)124149e08aacStbbdev     inline bool enqueue_forwarding_task(buffer_operation &op_data) {
124249e08aacStbbdev         graph_task *ft = grab_forwarding_task(op_data);
124349e08aacStbbdev         if(ft) {
124449e08aacStbbdev             spawn_in_graph_arena(graph_reference(), *ft);
124549e08aacStbbdev             return true;
124649e08aacStbbdev         }
124749e08aacStbbdev         return false;
124849e08aacStbbdev     }
124949e08aacStbbdev 
125049e08aacStbbdev     //! This is executed by an enqueued task, the "forwarder"
forward_task()125149e08aacStbbdev     virtual graph_task *forward_task() {
125249e08aacStbbdev         buffer_operation op_data(try_fwd_task);
125357f524caSIlya Isaev         graph_task *last_task = nullptr;
125449e08aacStbbdev         do {
125549e08aacStbbdev             op_data.status = WAIT;
125657f524caSIlya Isaev             op_data.ltask = nullptr;
125749e08aacStbbdev             my_aggregator.execute(&op_data);
125849e08aacStbbdev 
125949e08aacStbbdev             // workaround for icc bug
126049e08aacStbbdev             graph_task *xtask = op_data.ltask;
126149e08aacStbbdev             graph& g = this->my_graph;
126249e08aacStbbdev             last_task = combine_tasks(g, last_task, xtask);
126349e08aacStbbdev         } while (op_data.status ==SUCCEEDED);
126449e08aacStbbdev         return last_task;
126549e08aacStbbdev     }
126649e08aacStbbdev 
126749e08aacStbbdev     //! Register successor
internal_reg_succ(buffer_operation * op)126849e08aacStbbdev     virtual void internal_reg_succ(buffer_operation *op) {
1269f2af7473Skboyarinov         __TBB_ASSERT(op->r, nullptr);
127049e08aacStbbdev         my_successors.register_successor(*(op->r));
127149e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
127249e08aacStbbdev     }
127349e08aacStbbdev 
127449e08aacStbbdev     //! Remove successor
internal_rem_succ(buffer_operation * op)127549e08aacStbbdev     virtual void internal_rem_succ(buffer_operation *op) {
1276f2af7473Skboyarinov         __TBB_ASSERT(op->r, nullptr);
127749e08aacStbbdev         my_successors.remove_successor(*(op->r));
127849e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
127949e08aacStbbdev     }
128049e08aacStbbdev 
128149e08aacStbbdev private:
order()128249e08aacStbbdev     void order() {}
128349e08aacStbbdev 
is_item_valid()128449e08aacStbbdev     bool is_item_valid() {
128549e08aacStbbdev         return this->my_item_valid(this->my_tail - 1);
128649e08aacStbbdev     }
128749e08aacStbbdev 
try_put_and_add_task(graph_task * & last_task)128849e08aacStbbdev     void try_put_and_add_task(graph_task*& last_task) {
128949e08aacStbbdev         graph_task *new_task = my_successors.try_put_task(this->back());
129049e08aacStbbdev         if (new_task) {
129149e08aacStbbdev             // workaround for icc bug
129249e08aacStbbdev             graph& g = this->my_graph;
129349e08aacStbbdev             last_task = combine_tasks(g, last_task, new_task);
129449e08aacStbbdev             this->destroy_back();
129549e08aacStbbdev         }
129649e08aacStbbdev     }
129749e08aacStbbdev 
129849e08aacStbbdev protected:
129949e08aacStbbdev     //! Tries to forward valid items to successors
internal_forward_task(buffer_operation * op)130049e08aacStbbdev     virtual void internal_forward_task(buffer_operation *op) {
130149e08aacStbbdev         internal_forward_task_impl(op, this);
130249e08aacStbbdev     }
130349e08aacStbbdev 
130449e08aacStbbdev     template<typename derived_type>
internal_forward_task_impl(buffer_operation * op,derived_type * derived)130549e08aacStbbdev     void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
130649e08aacStbbdev         __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
130749e08aacStbbdev 
130849e08aacStbbdev         if (this->my_reserved || !derived->is_item_valid()) {
130949e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
131049e08aacStbbdev             this->forwarder_busy = false;
131149e08aacStbbdev             return;
131249e08aacStbbdev         }
131349e08aacStbbdev         // Try forwarding, giving each successor a chance
131457f524caSIlya Isaev         graph_task* last_task = nullptr;
131549e08aacStbbdev         size_type counter = my_successors.size();
131649e08aacStbbdev         for (; counter > 0 && derived->is_item_valid(); --counter)
131749e08aacStbbdev             derived->try_put_and_add_task(last_task);
131849e08aacStbbdev 
131949e08aacStbbdev         op->ltask = last_task;  // return task
132049e08aacStbbdev         if (last_task && !counter) {
132149e08aacStbbdev             op->status.store(SUCCEEDED, std::memory_order_release);
132249e08aacStbbdev         }
132349e08aacStbbdev         else {
132449e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
132549e08aacStbbdev             forwarder_busy = false;
132649e08aacStbbdev         }
132749e08aacStbbdev     }
132849e08aacStbbdev 
internal_push(buffer_operation * op)132949e08aacStbbdev     virtual bool internal_push(buffer_operation *op) {
1330f2af7473Skboyarinov         __TBB_ASSERT(op->elem, nullptr);
133149e08aacStbbdev         this->push_back(*(op->elem));
133249e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
133349e08aacStbbdev         return true;
133449e08aacStbbdev     }
133549e08aacStbbdev 
internal_pop(buffer_operation * op)133649e08aacStbbdev     virtual void internal_pop(buffer_operation *op) {
1337f2af7473Skboyarinov         __TBB_ASSERT(op->elem, nullptr);
133849e08aacStbbdev         if(this->pop_back(*(op->elem))) {
133949e08aacStbbdev             op->status.store(SUCCEEDED, std::memory_order_release);
134049e08aacStbbdev         }
134149e08aacStbbdev         else {
134249e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
134349e08aacStbbdev         }
134449e08aacStbbdev     }
134549e08aacStbbdev 
internal_reserve(buffer_operation * op)134649e08aacStbbdev     virtual void internal_reserve(buffer_operation *op) {
1347f2af7473Skboyarinov         __TBB_ASSERT(op->elem, nullptr);
134849e08aacStbbdev         if(this->reserve_front(*(op->elem))) {
134949e08aacStbbdev             op->status.store(SUCCEEDED, std::memory_order_release);
135049e08aacStbbdev         }
135149e08aacStbbdev         else {
135249e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
135349e08aacStbbdev         }
135449e08aacStbbdev     }
135549e08aacStbbdev 
internal_consume(buffer_operation * op)135649e08aacStbbdev     virtual void internal_consume(buffer_operation *op) {
135749e08aacStbbdev         this->consume_front();
135849e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
135949e08aacStbbdev     }
136049e08aacStbbdev 
internal_release(buffer_operation * op)136149e08aacStbbdev     virtual void internal_release(buffer_operation *op) {
136249e08aacStbbdev         this->release_front();
136349e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
136449e08aacStbbdev     }
136549e08aacStbbdev 
136649e08aacStbbdev public:
136749e08aacStbbdev     //! Constructor
buffer_node(graph & g)136849e08aacStbbdev     __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
136949e08aacStbbdev         : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
137049e08aacStbbdev           sender<T>(), my_successors(this), forwarder_busy(false)
137149e08aacStbbdev     {
137249e08aacStbbdev         my_aggregator.initialize_handler(handler_type(this));
137349e08aacStbbdev         fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
137449e08aacStbbdev                                  static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
137549e08aacStbbdev     }
137649e08aacStbbdev 
137749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
137849e08aacStbbdev     template <typename... Args>
buffer_node(const node_set<Args...> & nodes)137949e08aacStbbdev     buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
138049e08aacStbbdev         make_edges_in_order(nodes, *this);
138149e08aacStbbdev     }
138249e08aacStbbdev #endif
138349e08aacStbbdev 
138449e08aacStbbdev     //! Copy constructor
buffer_node(const buffer_node & src)138549e08aacStbbdev     __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
138649e08aacStbbdev 
138749e08aacStbbdev     //
138849e08aacStbbdev     // message sender implementation
138949e08aacStbbdev     //
139049e08aacStbbdev 
139149e08aacStbbdev     //! Adds a new successor.
139249e08aacStbbdev     /** Adds successor r to the list of successors; may forward tasks.  */
register_successor(successor_type & r)139349e08aacStbbdev     bool register_successor( successor_type &r ) override {
139449e08aacStbbdev         buffer_operation op_data(reg_succ);
139549e08aacStbbdev         op_data.r = &r;
139649e08aacStbbdev         my_aggregator.execute(&op_data);
139749e08aacStbbdev         (void)enqueue_forwarding_task(op_data);
139849e08aacStbbdev         return true;
139949e08aacStbbdev     }
140049e08aacStbbdev 
140149e08aacStbbdev     //! Removes a successor.
140249e08aacStbbdev     /** Removes successor r from the list of successors.
140349e08aacStbbdev         It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
remove_successor(successor_type & r)140449e08aacStbbdev     bool remove_successor( successor_type &r ) override {
140549e08aacStbbdev         // TODO revamp: investigate why full qualification is necessary here
140649e08aacStbbdev         tbb::detail::d1::remove_predecessor(r, *this);
140749e08aacStbbdev         buffer_operation op_data(rem_succ);
140849e08aacStbbdev         op_data.r = &r;
140949e08aacStbbdev         my_aggregator.execute(&op_data);
141049e08aacStbbdev         // even though this operation does not cause a forward, if we are the handler, and
141149e08aacStbbdev         // a forward is scheduled, we may be the first to reach this point after the aggregator,
141249e08aacStbbdev         // and so should check for the task.
141349e08aacStbbdev         (void)enqueue_forwarding_task(op_data);
141449e08aacStbbdev         return true;
141549e08aacStbbdev     }
141649e08aacStbbdev 
141749e08aacStbbdev     //! Request an item from the buffer_node
141849e08aacStbbdev     /**  true = v contains the returned item<BR>
141949e08aacStbbdev          false = no item has been returned */
try_get(T & v)142049e08aacStbbdev     bool try_get( T &v ) override {
142149e08aacStbbdev         buffer_operation op_data(req_item);
142249e08aacStbbdev         op_data.elem = &v;
142349e08aacStbbdev         my_aggregator.execute(&op_data);
142449e08aacStbbdev         (void)enqueue_forwarding_task(op_data);
142549e08aacStbbdev         return (op_data.status==SUCCEEDED);
142649e08aacStbbdev     }
142749e08aacStbbdev 
142849e08aacStbbdev     //! Reserves an item.
142949e08aacStbbdev     /**  false = no item can be reserved<BR>
143049e08aacStbbdev          true = an item is reserved */
try_reserve(T & v)143149e08aacStbbdev     bool try_reserve( T &v ) override {
143249e08aacStbbdev         buffer_operation op_data(res_item);
143349e08aacStbbdev         op_data.elem = &v;
143449e08aacStbbdev         my_aggregator.execute(&op_data);
143549e08aacStbbdev         (void)enqueue_forwarding_task(op_data);
143649e08aacStbbdev         return (op_data.status==SUCCEEDED);
143749e08aacStbbdev     }
143849e08aacStbbdev 
143949e08aacStbbdev     //! Release a reserved item.
144049e08aacStbbdev     /**  true = item has been released and so remains in sender */
try_release()144149e08aacStbbdev     bool try_release() override {
144249e08aacStbbdev         buffer_operation op_data(rel_res);
144349e08aacStbbdev         my_aggregator.execute(&op_data);
144449e08aacStbbdev         (void)enqueue_forwarding_task(op_data);
144549e08aacStbbdev         return true;
144649e08aacStbbdev     }
144749e08aacStbbdev 
144849e08aacStbbdev     //! Consumes a reserved item.
144949e08aacStbbdev     /** true = item is removed from sender and reservation removed */
try_consume()145049e08aacStbbdev     bool try_consume() override {
145149e08aacStbbdev         buffer_operation op_data(con_res);
145249e08aacStbbdev         my_aggregator.execute(&op_data);
145349e08aacStbbdev         (void)enqueue_forwarding_task(op_data);
145449e08aacStbbdev         return true;
145549e08aacStbbdev     }
145649e08aacStbbdev 
145749e08aacStbbdev protected:
145849e08aacStbbdev 
145949e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
146049e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
146149e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
146249e08aacStbbdev     //! receive an item, return a task *if possible
try_put_task(const T & t)146349e08aacStbbdev     graph_task *try_put_task(const T &t) override {
146449e08aacStbbdev         buffer_operation op_data(t, put_item);
146549e08aacStbbdev         my_aggregator.execute(&op_data);
146649e08aacStbbdev         graph_task *ft = grab_forwarding_task(op_data);
146749e08aacStbbdev         // sequencer_nodes can return failure (if an item has been previously inserted)
146849e08aacStbbdev         // We have to spawn the returned task if our own operation fails.
146949e08aacStbbdev 
147049e08aacStbbdev         if(ft && op_data.status ==FAILED) {
147149e08aacStbbdev             // we haven't succeeded queueing the item, but for some reason the
147249e08aacStbbdev             // call returned a task (if another request resulted in a successful
147349e08aacStbbdev             // forward this could happen.)  Queue the task and reset the pointer.
147457f524caSIlya Isaev             spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
147549e08aacStbbdev         }
147649e08aacStbbdev         else if(!ft && op_data.status ==SUCCEEDED) {
147749e08aacStbbdev             ft = SUCCESSFULLY_ENQUEUED;
147849e08aacStbbdev         }
147949e08aacStbbdev         return ft;
148049e08aacStbbdev     }
148149e08aacStbbdev 
graph_reference()148249e08aacStbbdev     graph& graph_reference() const override {
148349e08aacStbbdev         return my_graph;
148449e08aacStbbdev     }
148549e08aacStbbdev 
148649e08aacStbbdev protected:
reset_node(reset_flags f)148749e08aacStbbdev     void reset_node( reset_flags f) override {
148849e08aacStbbdev         reservable_item_buffer<T, internals_allocator>::reset();
148949e08aacStbbdev         // TODO: just clear structures
149049e08aacStbbdev         if (f&rf_clear_edges) {
149149e08aacStbbdev             my_successors.clear();
149249e08aacStbbdev         }
149349e08aacStbbdev         forwarder_busy = false;
149449e08aacStbbdev     }
149549e08aacStbbdev };  // buffer_node
149649e08aacStbbdev 
149749e08aacStbbdev //! Forwards messages in FIFO order
149849e08aacStbbdev template <typename T>
149949e08aacStbbdev class queue_node : public buffer_node<T> {
150049e08aacStbbdev protected:
150149e08aacStbbdev     typedef buffer_node<T> base_type;
150249e08aacStbbdev     typedef typename base_type::size_type size_type;
150349e08aacStbbdev     typedef typename base_type::buffer_operation queue_operation;
150449e08aacStbbdev     typedef queue_node class_type;
150549e08aacStbbdev 
150649e08aacStbbdev private:
150749e08aacStbbdev     template<typename> friend class buffer_node;
150849e08aacStbbdev 
is_item_valid()150949e08aacStbbdev     bool is_item_valid() {
151049e08aacStbbdev         return this->my_item_valid(this->my_head);
151149e08aacStbbdev     }
151249e08aacStbbdev 
try_put_and_add_task(graph_task * & last_task)151349e08aacStbbdev     void try_put_and_add_task(graph_task*& last_task) {
151449e08aacStbbdev         graph_task *new_task = this->my_successors.try_put_task(this->front());
151549e08aacStbbdev         if (new_task) {
151649e08aacStbbdev             // workaround for icc bug
151749e08aacStbbdev             graph& graph_ref = this->graph_reference();
151849e08aacStbbdev             last_task = combine_tasks(graph_ref, last_task, new_task);
151949e08aacStbbdev             this->destroy_front();
152049e08aacStbbdev         }
152149e08aacStbbdev     }
152249e08aacStbbdev 
152349e08aacStbbdev protected:
internal_forward_task(queue_operation * op)152449e08aacStbbdev     void internal_forward_task(queue_operation *op) override {
152549e08aacStbbdev         this->internal_forward_task_impl(op, this);
152649e08aacStbbdev     }
152749e08aacStbbdev 
internal_pop(queue_operation * op)152849e08aacStbbdev     void internal_pop(queue_operation *op) override {
152949e08aacStbbdev         if ( this->my_reserved || !this->my_item_valid(this->my_head)){
153049e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
153149e08aacStbbdev         }
153249e08aacStbbdev         else {
153349e08aacStbbdev             this->pop_front(*(op->elem));
153449e08aacStbbdev             op->status.store(SUCCEEDED, std::memory_order_release);
153549e08aacStbbdev         }
153649e08aacStbbdev     }
internal_reserve(queue_operation * op)153749e08aacStbbdev     void internal_reserve(queue_operation *op) override {
153849e08aacStbbdev         if (this->my_reserved || !this->my_item_valid(this->my_head)) {
153949e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
154049e08aacStbbdev         }
154149e08aacStbbdev         else {
154249e08aacStbbdev             this->reserve_front(*(op->elem));
154349e08aacStbbdev             op->status.store(SUCCEEDED, std::memory_order_release);
154449e08aacStbbdev         }
154549e08aacStbbdev     }
internal_consume(queue_operation * op)154649e08aacStbbdev     void internal_consume(queue_operation *op) override {
154749e08aacStbbdev         this->consume_front();
154849e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
154949e08aacStbbdev     }
155049e08aacStbbdev 
155149e08aacStbbdev public:
155249e08aacStbbdev     typedef T input_type;
155349e08aacStbbdev     typedef T output_type;
155449e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
155549e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
155649e08aacStbbdev 
155749e08aacStbbdev     //! Constructor
queue_node(graph & g)155849e08aacStbbdev     __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
155949e08aacStbbdev         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
156049e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
156149e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
156249e08aacStbbdev     }
156349e08aacStbbdev 
156449e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
156549e08aacStbbdev     template <typename... Args>
queue_node(const node_set<Args...> & nodes)156649e08aacStbbdev     queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
156749e08aacStbbdev         make_edges_in_order(nodes, *this);
156849e08aacStbbdev     }
156949e08aacStbbdev #endif
157049e08aacStbbdev 
157149e08aacStbbdev     //! Copy constructor
queue_node(const queue_node & src)157249e08aacStbbdev     __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
157349e08aacStbbdev         fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
157449e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
157549e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
157649e08aacStbbdev     }
157749e08aacStbbdev 
157849e08aacStbbdev 
157949e08aacStbbdev protected:
reset_node(reset_flags f)158049e08aacStbbdev     void reset_node( reset_flags f) override {
158149e08aacStbbdev         base_type::reset_node(f);
158249e08aacStbbdev     }
158349e08aacStbbdev };  // queue_node
158449e08aacStbbdev 
158549e08aacStbbdev //! Forwards messages in sequence order
158649e08aacStbbdev template <typename T>
__TBB_requires(std::copyable<T>)1587478de5b1Stbbdev     __TBB_requires(std::copyable<T>)
158849e08aacStbbdev class sequencer_node : public queue_node<T> {
158949e08aacStbbdev     function_body< T, size_t > *my_sequencer;
159049e08aacStbbdev     // my_sequencer should be a benign function and must be callable
159149e08aacStbbdev     // from a parallel context.  Does this mean it needn't be reset?
159249e08aacStbbdev public:
159349e08aacStbbdev     typedef T input_type;
159449e08aacStbbdev     typedef T output_type;
159549e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
159649e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
159749e08aacStbbdev 
159849e08aacStbbdev     //! Constructor
159949e08aacStbbdev     template< typename Sequencer >
1600478de5b1Stbbdev         __TBB_requires(sequencer<Sequencer, T>)
160149e08aacStbbdev     __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
160249e08aacStbbdev         my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
160349e08aacStbbdev         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
160449e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
160549e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
160649e08aacStbbdev     }
160749e08aacStbbdev 
160849e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
160949e08aacStbbdev     template <typename Sequencer, typename... Args>
1610478de5b1Stbbdev         __TBB_requires(sequencer<Sequencer, T>)
161149e08aacStbbdev     sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
161249e08aacStbbdev         : sequencer_node(nodes.graph_reference(), s) {
161349e08aacStbbdev         make_edges_in_order(nodes, *this);
161449e08aacStbbdev     }
161549e08aacStbbdev #endif
161649e08aacStbbdev 
161749e08aacStbbdev     //! Copy constructor
161849e08aacStbbdev     __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
161949e08aacStbbdev         my_sequencer( src.my_sequencer->clone() ) {
162049e08aacStbbdev         fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
162149e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
162249e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
162349e08aacStbbdev     }
162449e08aacStbbdev 
162549e08aacStbbdev     //! Destructor
162649e08aacStbbdev     ~sequencer_node() { delete my_sequencer; }
162749e08aacStbbdev 
162849e08aacStbbdev protected:
162949e08aacStbbdev     typedef typename buffer_node<T>::size_type size_type;
163049e08aacStbbdev     typedef typename buffer_node<T>::buffer_operation sequencer_operation;
163149e08aacStbbdev 
163249e08aacStbbdev private:
163349e08aacStbbdev     bool internal_push(sequencer_operation *op) override {
163449e08aacStbbdev         size_type tag = (*my_sequencer)(*(op->elem));
163549e08aacStbbdev #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
163649e08aacStbbdev         if (tag < this->my_head) {
163749e08aacStbbdev             // have already emitted a message with this tag
163849e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
163949e08aacStbbdev             return false;
164049e08aacStbbdev         }
164149e08aacStbbdev #endif
164249e08aacStbbdev         // cannot modify this->my_tail now; the buffer would be inconsistent.
164349e08aacStbbdev         size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
164449e08aacStbbdev 
164549e08aacStbbdev         if (this->size(new_tail) > this->capacity()) {
164649e08aacStbbdev             this->grow_my_array(this->size(new_tail));
164749e08aacStbbdev         }
164849e08aacStbbdev         this->my_tail = new_tail;
164949e08aacStbbdev 
165049e08aacStbbdev         const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
165149e08aacStbbdev         op->status.store(res, std::memory_order_release);
165249e08aacStbbdev         return res ==SUCCEEDED;
165349e08aacStbbdev     }
165449e08aacStbbdev };  // sequencer_node
165549e08aacStbbdev 
165649e08aacStbbdev //! Forwards messages in priority order
165749e08aacStbbdev template<typename T, typename Compare = std::less<T>>
165849e08aacStbbdev class priority_queue_node : public buffer_node<T> {
165949e08aacStbbdev public:
166049e08aacStbbdev     typedef T input_type;
166149e08aacStbbdev     typedef T output_type;
166249e08aacStbbdev     typedef buffer_node<T> base_type;
166349e08aacStbbdev     typedef priority_queue_node class_type;
166449e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
166549e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
166649e08aacStbbdev 
166749e08aacStbbdev     //! Constructor
166849e08aacStbbdev     __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
166949e08aacStbbdev         : buffer_node<T>(g), compare(comp), mark(0) {
167049e08aacStbbdev         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
167149e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
167249e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
167349e08aacStbbdev     }
167449e08aacStbbdev 
167549e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
167649e08aacStbbdev     template <typename... Args>
167749e08aacStbbdev     priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
167849e08aacStbbdev         : priority_queue_node(nodes.graph_reference(), comp) {
167949e08aacStbbdev         make_edges_in_order(nodes, *this);
168049e08aacStbbdev     }
168149e08aacStbbdev #endif
168249e08aacStbbdev 
168349e08aacStbbdev     //! Copy constructor
priority_queue_node(const priority_queue_node & src)168449e08aacStbbdev     __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
168549e08aacStbbdev         : buffer_node<T>(src), mark(0)
168649e08aacStbbdev     {
168749e08aacStbbdev         fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
168849e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
168949e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
169049e08aacStbbdev     }
169149e08aacStbbdev 
169249e08aacStbbdev protected:
169349e08aacStbbdev 
reset_node(reset_flags f)169449e08aacStbbdev     void reset_node( reset_flags f) override {
169549e08aacStbbdev         mark = 0;
169649e08aacStbbdev         base_type::reset_node(f);
169749e08aacStbbdev     }
169849e08aacStbbdev 
169949e08aacStbbdev     typedef typename buffer_node<T>::size_type size_type;
170049e08aacStbbdev     typedef typename buffer_node<T>::item_type item_type;
170149e08aacStbbdev     typedef typename buffer_node<T>::buffer_operation prio_operation;
170249e08aacStbbdev 
170349e08aacStbbdev     //! Tries to forward valid items to successors
internal_forward_task(prio_operation * op)170449e08aacStbbdev     void internal_forward_task(prio_operation *op) override {
170549e08aacStbbdev         this->internal_forward_task_impl(op, this);
170649e08aacStbbdev     }
170749e08aacStbbdev 
handle_operations(prio_operation * op_list)170849e08aacStbbdev     void handle_operations(prio_operation *op_list) override {
170949e08aacStbbdev         this->handle_operations_impl(op_list, this);
171049e08aacStbbdev     }
171149e08aacStbbdev 
internal_push(prio_operation * op)171249e08aacStbbdev     bool internal_push(prio_operation *op) override {
171349e08aacStbbdev         prio_push(*(op->elem));
171449e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
171549e08aacStbbdev         return true;
171649e08aacStbbdev     }
171749e08aacStbbdev 
internal_pop(prio_operation * op)171849e08aacStbbdev     void internal_pop(prio_operation *op) override {
171949e08aacStbbdev         // if empty or already reserved, don't pop
172049e08aacStbbdev         if ( this->my_reserved == true || this->my_tail == 0 ) {
172149e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
172249e08aacStbbdev             return;
172349e08aacStbbdev         }
172449e08aacStbbdev 
172549e08aacStbbdev         *(op->elem) = prio();
172649e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
172749e08aacStbbdev         prio_pop();
172849e08aacStbbdev 
172949e08aacStbbdev     }
173049e08aacStbbdev 
173149e08aacStbbdev     // pops the highest-priority item, saves copy
internal_reserve(prio_operation * op)173249e08aacStbbdev     void internal_reserve(prio_operation *op) override {
173349e08aacStbbdev         if (this->my_reserved == true || this->my_tail == 0) {
173449e08aacStbbdev             op->status.store(FAILED, std::memory_order_release);
173549e08aacStbbdev             return;
173649e08aacStbbdev         }
173749e08aacStbbdev         this->my_reserved = true;
173849e08aacStbbdev         *(op->elem) = prio();
173949e08aacStbbdev         reserved_item = *(op->elem);
174049e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
174149e08aacStbbdev         prio_pop();
174249e08aacStbbdev     }
174349e08aacStbbdev 
internal_consume(prio_operation * op)174449e08aacStbbdev     void internal_consume(prio_operation *op) override {
174549e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
174649e08aacStbbdev         this->my_reserved = false;
174749e08aacStbbdev         reserved_item = input_type();
174849e08aacStbbdev     }
174949e08aacStbbdev 
internal_release(prio_operation * op)175049e08aacStbbdev     void internal_release(prio_operation *op) override {
175149e08aacStbbdev         op->status.store(SUCCEEDED, std::memory_order_release);
175249e08aacStbbdev         prio_push(reserved_item);
175349e08aacStbbdev         this->my_reserved = false;
175449e08aacStbbdev         reserved_item = input_type();
175549e08aacStbbdev     }
175649e08aacStbbdev 
175749e08aacStbbdev private:
175849e08aacStbbdev     template<typename> friend class buffer_node;
175949e08aacStbbdev 
order()176049e08aacStbbdev     void order() {
176149e08aacStbbdev         if (mark < this->my_tail) heapify();
176249e08aacStbbdev         __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
176349e08aacStbbdev     }
176449e08aacStbbdev 
is_item_valid()176549e08aacStbbdev     bool is_item_valid() {
176649e08aacStbbdev         return this->my_tail > 0;
176749e08aacStbbdev     }
176849e08aacStbbdev 
try_put_and_add_task(graph_task * & last_task)176949e08aacStbbdev     void try_put_and_add_task(graph_task*& last_task) {
177049e08aacStbbdev         graph_task * new_task = this->my_successors.try_put_task(this->prio());
177149e08aacStbbdev         if (new_task) {
177249e08aacStbbdev             // workaround for icc bug
177349e08aacStbbdev             graph& graph_ref = this->graph_reference();
177449e08aacStbbdev             last_task = combine_tasks(graph_ref, last_task, new_task);
177549e08aacStbbdev             prio_pop();
177649e08aacStbbdev         }
177749e08aacStbbdev     }
177849e08aacStbbdev 
177949e08aacStbbdev private:
178049e08aacStbbdev     Compare compare;
178149e08aacStbbdev     size_type mark;
178249e08aacStbbdev 
178349e08aacStbbdev     input_type reserved_item;
178449e08aacStbbdev 
178549e08aacStbbdev     // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
prio_use_tail()178649e08aacStbbdev     bool prio_use_tail() {
178749e08aacStbbdev         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
178849e08aacStbbdev         return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
178949e08aacStbbdev     }
179049e08aacStbbdev 
179149e08aacStbbdev     // prio_push: checks that the item will fit, expand array if necessary, put at end
prio_push(const T & src)179249e08aacStbbdev     void prio_push(const T &src) {
179349e08aacStbbdev         if ( this->my_tail >= this->my_array_size )
179449e08aacStbbdev             this->grow_my_array( this->my_tail + 1 );
179549e08aacStbbdev         (void) this->place_item(this->my_tail, src);
179649e08aacStbbdev         ++(this->my_tail);
179749e08aacStbbdev         __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
179849e08aacStbbdev     }
179949e08aacStbbdev 
180049e08aacStbbdev     // prio_pop: deletes highest priority item from the array, and if it is item
180149e08aacStbbdev     // 0, move last item to 0 and reheap.  If end of array, just destroy and decrement tail
180249e08aacStbbdev     // and mark.  Assumes the array has already been tested for emptiness; no failure.
prio_pop()180349e08aacStbbdev     void prio_pop()  {
180449e08aacStbbdev         if (prio_use_tail()) {
180549e08aacStbbdev             // there are newly pushed elements; last one higher than top
180649e08aacStbbdev             // copy the data
180749e08aacStbbdev             this->destroy_item(this->my_tail-1);
180849e08aacStbbdev             --(this->my_tail);
180949e08aacStbbdev             __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
181049e08aacStbbdev             return;
181149e08aacStbbdev         }
181249e08aacStbbdev         this->destroy_item(0);
181349e08aacStbbdev         if(this->my_tail > 1) {
181449e08aacStbbdev             // push the last element down heap
181557f524caSIlya Isaev             __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
181649e08aacStbbdev             this->move_item(0,this->my_tail - 1);
181749e08aacStbbdev         }
181849e08aacStbbdev         --(this->my_tail);
181949e08aacStbbdev         if(mark > this->my_tail) --mark;
182049e08aacStbbdev         if (this->my_tail > 1) // don't reheap for heap of size 1
182149e08aacStbbdev             reheap();
182249e08aacStbbdev         __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
182349e08aacStbbdev     }
182449e08aacStbbdev 
prio()182549e08aacStbbdev     const T& prio() {
182649e08aacStbbdev         return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
182749e08aacStbbdev     }
182849e08aacStbbdev 
182949e08aacStbbdev     // turn array into heap
heapify()183049e08aacStbbdev     void heapify() {
183149e08aacStbbdev         if(this->my_tail == 0) {
183249e08aacStbbdev             mark = 0;
183349e08aacStbbdev             return;
183449e08aacStbbdev         }
183549e08aacStbbdev         if (!mark) mark = 1;
183649e08aacStbbdev         for (; mark<this->my_tail; ++mark) { // for each unheaped element
183749e08aacStbbdev             size_type cur_pos = mark;
183849e08aacStbbdev             input_type to_place;
183949e08aacStbbdev             this->fetch_item(mark,to_place);
184049e08aacStbbdev             do { // push to_place up the heap
184149e08aacStbbdev                 size_type parent = (cur_pos-1)>>1;
184249e08aacStbbdev                 if (!compare(this->get_my_item(parent), to_place))
184349e08aacStbbdev                     break;
184449e08aacStbbdev                 this->move_item(cur_pos, parent);
184549e08aacStbbdev                 cur_pos = parent;
184649e08aacStbbdev             } while( cur_pos );
184749e08aacStbbdev             (void) this->place_item(cur_pos, to_place);
184849e08aacStbbdev         }
184949e08aacStbbdev     }
185049e08aacStbbdev 
185149e08aacStbbdev     // otherwise heapified array with new root element; rearrange to heap
reheap()185249e08aacStbbdev     void reheap() {
185349e08aacStbbdev         size_type cur_pos=0, child=1;
185449e08aacStbbdev         while (child < mark) {
185549e08aacStbbdev             size_type target = child;
185649e08aacStbbdev             if (child+1<mark &&
185749e08aacStbbdev                 compare(this->get_my_item(child),
185849e08aacStbbdev                         this->get_my_item(child+1)))
185949e08aacStbbdev                 ++target;
186049e08aacStbbdev             // target now has the higher priority child
186149e08aacStbbdev             if (compare(this->get_my_item(target),
186249e08aacStbbdev                         this->get_my_item(cur_pos)))
186349e08aacStbbdev                 break;
186449e08aacStbbdev             // swap
186549e08aacStbbdev             this->swap_items(cur_pos, target);
186649e08aacStbbdev             cur_pos = target;
186749e08aacStbbdev             child = (cur_pos<<1)+1;
186849e08aacStbbdev         }
186949e08aacStbbdev     }
187049e08aacStbbdev };  // priority_queue_node
187149e08aacStbbdev 
187249e08aacStbbdev //! Forwards messages only if the threshold has not been reached
187349e08aacStbbdev /** This node forwards items until its threshold is reached.
187449e08aacStbbdev     It contains no buffering.  If the downstream node rejects, the
187549e08aacStbbdev     message is dropped. */
187649e08aacStbbdev template< typename T, typename DecrementType=continue_msg >
187749e08aacStbbdev class limiter_node : public graph_node, public receiver< T >, public sender< T > {
187849e08aacStbbdev public:
187949e08aacStbbdev     typedef T input_type;
188049e08aacStbbdev     typedef T output_type;
188149e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
188249e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
188349e08aacStbbdev     //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
188449e08aacStbbdev 
188549e08aacStbbdev private:
188649e08aacStbbdev     size_t my_threshold;
188749e08aacStbbdev     size_t my_count; // number of successful puts
188849e08aacStbbdev     size_t my_tries; // number of active put attempts
18890815661eSIlya Mishin     size_t my_future_decrement; // number of active decrement
189049e08aacStbbdev     reservable_predecessor_cache< T, spin_mutex > my_predecessors;
189149e08aacStbbdev     spin_mutex my_mutex;
189249e08aacStbbdev     broadcast_cache< T > my_successors;
189349e08aacStbbdev 
189449e08aacStbbdev     //! The internal receiver< DecrementType > that adjusts the count
189549e08aacStbbdev     threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
189649e08aacStbbdev 
decrement_counter(long long delta)189749e08aacStbbdev     graph_task* decrement_counter( long long delta ) {
18980815661eSIlya Mishin         if ( delta > 0 && size_t(delta) > my_threshold ) {
18990815661eSIlya Mishin             delta = my_threshold;
19000815661eSIlya Mishin         }
19010815661eSIlya Mishin 
190249e08aacStbbdev         {
190349e08aacStbbdev             spin_mutex::scoped_lock lock(my_mutex);
190425e2b1ddSIlya Mishin             if ( delta > 0 && size_t(delta) > my_count ) {
19050815661eSIlya Mishin                 if( my_tries > 0 ) {
19060815661eSIlya Mishin                     my_future_decrement += (size_t(delta) - my_count);
19070815661eSIlya Mishin                 }
190849e08aacStbbdev                 my_count = 0;
190925e2b1ddSIlya Mishin             }
191025e2b1ddSIlya Mishin             else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
191149e08aacStbbdev                 my_count = my_threshold;
191225e2b1ddSIlya Mishin             }
191325e2b1ddSIlya Mishin             else {
191449e08aacStbbdev                 my_count -= size_t(delta); // absolute value of delta is sufficiently small
191549e08aacStbbdev             }
191625e2b1ddSIlya Mishin             __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
191725e2b1ddSIlya Mishin         }
191849e08aacStbbdev         return forward_task();
191949e08aacStbbdev     }
192049e08aacStbbdev 
192149e08aacStbbdev     // Let threshold_regulator call decrement_counter()
192249e08aacStbbdev     friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
192349e08aacStbbdev 
192449e08aacStbbdev     friend class forward_task_bypass< limiter_node<T,DecrementType> >;
192549e08aacStbbdev 
check_conditions()192649e08aacStbbdev     bool check_conditions() {  // always called under lock
192749e08aacStbbdev         return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
192849e08aacStbbdev     }
192949e08aacStbbdev 
193057f524caSIlya Isaev     // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
forward_task()193149e08aacStbbdev     graph_task* forward_task() {
193249e08aacStbbdev         input_type v;
193357f524caSIlya Isaev         graph_task* rval = nullptr;
193449e08aacStbbdev         bool reserved = false;
19350815661eSIlya Mishin 
193649e08aacStbbdev         {
193749e08aacStbbdev             spin_mutex::scoped_lock lock(my_mutex);
193849e08aacStbbdev             if ( check_conditions() )
193949e08aacStbbdev                 ++my_tries;
194049e08aacStbbdev             else
194157f524caSIlya Isaev                 return nullptr;
194249e08aacStbbdev         }
194349e08aacStbbdev 
194449e08aacStbbdev         //SUCCESS
194549e08aacStbbdev         // if we can reserve and can put, we consume the reservation
194649e08aacStbbdev         // we increment the count and decrement the tries
194749e08aacStbbdev         if ( (my_predecessors.try_reserve(v)) == true ) {
194849e08aacStbbdev             reserved = true;
194957f524caSIlya Isaev             if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
195049e08aacStbbdev                 {
195149e08aacStbbdev                     spin_mutex::scoped_lock lock(my_mutex);
195249e08aacStbbdev                     ++my_count;
19530815661eSIlya Mishin                     if ( my_future_decrement ) {
19540815661eSIlya Mishin                         if ( my_count > my_future_decrement ) {
19550815661eSIlya Mishin                             my_count -= my_future_decrement;
19560815661eSIlya Mishin                             my_future_decrement = 0;
19570815661eSIlya Mishin                         }
19580815661eSIlya Mishin                         else {
19590815661eSIlya Mishin                             my_future_decrement -= my_count;
19600815661eSIlya Mishin                             my_count = 0;
19610815661eSIlya Mishin                         }
19620815661eSIlya Mishin                     }
196349e08aacStbbdev                     --my_tries;
196449e08aacStbbdev                     my_predecessors.try_consume();
196549e08aacStbbdev                     if ( check_conditions() ) {
196649e08aacStbbdev                         if ( is_graph_active(this->my_graph) ) {
196749e08aacStbbdev                             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
196849e08aacStbbdev                             small_object_allocator allocator{};
196949e08aacStbbdev                             graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
197049e08aacStbbdev                             my_graph.reserve_wait();
197149e08aacStbbdev                             spawn_in_graph_arena(graph_reference(), *rtask);
197249e08aacStbbdev                         }
197349e08aacStbbdev                     }
197449e08aacStbbdev                 }
197549e08aacStbbdev                 return rval;
197649e08aacStbbdev             }
197749e08aacStbbdev         }
197849e08aacStbbdev         //FAILURE
197949e08aacStbbdev         //if we can't reserve, we decrement the tries
198049e08aacStbbdev         //if we can reserve but can't put, we decrement the tries and release the reservation
198149e08aacStbbdev         {
198249e08aacStbbdev             spin_mutex::scoped_lock lock(my_mutex);
198349e08aacStbbdev             --my_tries;
198449e08aacStbbdev             if (reserved) my_predecessors.try_release();
198549e08aacStbbdev             if ( check_conditions() ) {
198649e08aacStbbdev                 if ( is_graph_active(this->my_graph) ) {
198749e08aacStbbdev                     small_object_allocator allocator{};
198849e08aacStbbdev                     typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
198949e08aacStbbdev                     graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
199049e08aacStbbdev                     my_graph.reserve_wait();
199149e08aacStbbdev                     __TBB_ASSERT(!rval, "Have two tasks to handle");
199249e08aacStbbdev                     return t;
199349e08aacStbbdev                 }
199449e08aacStbbdev             }
199549e08aacStbbdev             return rval;
199649e08aacStbbdev         }
199749e08aacStbbdev     }
199849e08aacStbbdev 
initialize()199949e08aacStbbdev     void initialize() {
200049e08aacStbbdev         fgt_node(
200149e08aacStbbdev             CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
200249e08aacStbbdev             static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
200349e08aacStbbdev             static_cast<sender<output_type> *>(this)
200449e08aacStbbdev         );
200549e08aacStbbdev     }
200649e08aacStbbdev 
200749e08aacStbbdev public:
200849e08aacStbbdev     //! Constructor
limiter_node(graph & g,size_t threshold)200949e08aacStbbdev     limiter_node(graph &g, size_t threshold)
20100815661eSIlya Mishin         : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
20110815661eSIlya Mishin         my_predecessors(this), my_successors(this), decrement(this)
201249e08aacStbbdev     {
201349e08aacStbbdev         initialize();
201449e08aacStbbdev     }
201549e08aacStbbdev 
201649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
201749e08aacStbbdev     template <typename... Args>
limiter_node(const node_set<Args...> & nodes,size_t threshold)201849e08aacStbbdev     limiter_node(const node_set<Args...>& nodes, size_t threshold)
201949e08aacStbbdev         : limiter_node(nodes.graph_reference(), threshold) {
202049e08aacStbbdev         make_edges_in_order(nodes, *this);
202149e08aacStbbdev     }
202249e08aacStbbdev #endif
202349e08aacStbbdev 
202449e08aacStbbdev     //! Copy constructor
limiter_node(const limiter_node & src)202549e08aacStbbdev     limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
202649e08aacStbbdev 
202749e08aacStbbdev     //! The interface for accessing internal receiver< DecrementType > that adjusts the count
decrementer()202849e08aacStbbdev     receiver<DecrementType>& decrementer() { return decrement; }
202949e08aacStbbdev 
203049e08aacStbbdev     //! Replace the current successor with this new successor
register_successor(successor_type & r)203149e08aacStbbdev     bool register_successor( successor_type &r ) override {
203249e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
203349e08aacStbbdev         bool was_empty = my_successors.empty();
203449e08aacStbbdev         my_successors.register_successor(r);
203549e08aacStbbdev         //spawn a forward task if this is the only successor
203649e08aacStbbdev         if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
203749e08aacStbbdev             if ( is_graph_active(this->my_graph) ) {
203849e08aacStbbdev                 small_object_allocator allocator{};
203949e08aacStbbdev                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
204049e08aacStbbdev                 graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
204149e08aacStbbdev                 my_graph.reserve_wait();
204249e08aacStbbdev                 spawn_in_graph_arena(graph_reference(), *t);
204349e08aacStbbdev             }
204449e08aacStbbdev         }
204549e08aacStbbdev         return true;
204649e08aacStbbdev     }
204749e08aacStbbdev 
204849e08aacStbbdev     //! Removes a successor from this node
204949e08aacStbbdev     /** r.remove_predecessor(*this) is also called. */
remove_successor(successor_type & r)205049e08aacStbbdev     bool remove_successor( successor_type &r ) override {
205149e08aacStbbdev         // TODO revamp: investigate why qualification is needed for remove_predecessor() call
205249e08aacStbbdev         tbb::detail::d1::remove_predecessor(r, *this);
205349e08aacStbbdev         my_successors.remove_successor(r);
205449e08aacStbbdev         return true;
205549e08aacStbbdev     }
205649e08aacStbbdev 
205749e08aacStbbdev     //! Adds src to the list of cached predecessors.
register_predecessor(predecessor_type & src)205849e08aacStbbdev     bool register_predecessor( predecessor_type &src ) override {
205949e08aacStbbdev         spin_mutex::scoped_lock lock(my_mutex);
206049e08aacStbbdev         my_predecessors.add( src );
206149e08aacStbbdev         if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
206249e08aacStbbdev             small_object_allocator allocator{};
206349e08aacStbbdev             typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
206449e08aacStbbdev             graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
206549e08aacStbbdev             my_graph.reserve_wait();
206649e08aacStbbdev             spawn_in_graph_arena(graph_reference(), *t);
206749e08aacStbbdev         }
206849e08aacStbbdev         return true;
206949e08aacStbbdev     }
207049e08aacStbbdev 
207149e08aacStbbdev     //! Removes src from the list of cached predecessors.
remove_predecessor(predecessor_type & src)207249e08aacStbbdev     bool remove_predecessor( predecessor_type &src ) override {
207349e08aacStbbdev         my_predecessors.remove( src );
207449e08aacStbbdev         return true;
207549e08aacStbbdev     }
207649e08aacStbbdev 
207749e08aacStbbdev protected:
207849e08aacStbbdev 
207949e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
208049e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
208149e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
208249e08aacStbbdev     //! Puts an item to this receiver
try_put_task(const T & t)208349e08aacStbbdev     graph_task* try_put_task( const T &t ) override {
208449e08aacStbbdev         {
208549e08aacStbbdev             spin_mutex::scoped_lock lock(my_mutex);
208649e08aacStbbdev             if ( my_count + my_tries >= my_threshold )
208757f524caSIlya Isaev                 return nullptr;
208849e08aacStbbdev             else
208949e08aacStbbdev                 ++my_tries;
209049e08aacStbbdev         }
209149e08aacStbbdev 
209249e08aacStbbdev         graph_task* rtask = my_successors.try_put_task(t);
209349e08aacStbbdev         if ( !rtask ) {  // try_put_task failed.
209449e08aacStbbdev             spin_mutex::scoped_lock lock(my_mutex);
209549e08aacStbbdev             --my_tries;
209649e08aacStbbdev             if (check_conditions() && is_graph_active(this->my_graph)) {
209749e08aacStbbdev                 small_object_allocator allocator{};
209849e08aacStbbdev                 typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
209949e08aacStbbdev                 rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
210049e08aacStbbdev                 my_graph.reserve_wait();
210149e08aacStbbdev             }
210249e08aacStbbdev         }
210349e08aacStbbdev         else {
210449e08aacStbbdev             spin_mutex::scoped_lock lock(my_mutex);
210549e08aacStbbdev             ++my_count;
21060815661eSIlya Mishin             if ( my_future_decrement ) {
21070815661eSIlya Mishin                 if ( my_count > my_future_decrement ) {
21080815661eSIlya Mishin                     my_count -= my_future_decrement;
21090815661eSIlya Mishin                     my_future_decrement = 0;
21100815661eSIlya Mishin                 }
21110815661eSIlya Mishin                 else {
21120815661eSIlya Mishin                     my_future_decrement -= my_count;
21130815661eSIlya Mishin                     my_count = 0;
21140815661eSIlya Mishin                 }
21150815661eSIlya Mishin             }
211649e08aacStbbdev             --my_tries;
211749e08aacStbbdev         }
211849e08aacStbbdev         return rtask;
211949e08aacStbbdev     }
212049e08aacStbbdev 
graph_reference()212149e08aacStbbdev     graph& graph_reference() const override { return my_graph; }
212249e08aacStbbdev 
reset_node(reset_flags f)212349e08aacStbbdev     void reset_node( reset_flags f ) override {
212449e08aacStbbdev         my_count = 0;
212549e08aacStbbdev         if ( f & rf_clear_edges ) {
212649e08aacStbbdev             my_predecessors.clear();
212749e08aacStbbdev             my_successors.clear();
212849e08aacStbbdev         }
21290815661eSIlya Mishin         else {
213049e08aacStbbdev             my_predecessors.reset();
213149e08aacStbbdev         }
213249e08aacStbbdev         decrement.reset_receiver(f);
213349e08aacStbbdev     }
213449e08aacStbbdev };  // limiter_node
213549e08aacStbbdev 
213649e08aacStbbdev #include "detail/_flow_graph_join_impl.h"
213749e08aacStbbdev 
213849e08aacStbbdev template<typename OutputTuple, typename JP=queueing> class join_node;
213949e08aacStbbdev 
214049e08aacStbbdev template<typename OutputTuple>
214149e08aacStbbdev class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
214249e08aacStbbdev private:
214349e08aacStbbdev     static const int N = std::tuple_size<OutputTuple>::value;
214449e08aacStbbdev     typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
214549e08aacStbbdev public:
214649e08aacStbbdev     typedef OutputTuple output_type;
214749e08aacStbbdev     typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)214849e08aacStbbdev      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
214949e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
215049e08aacStbbdev                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
215149e08aacStbbdev     }
215249e08aacStbbdev 
215349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
215449e08aacStbbdev     template <typename... Args>
215549e08aacStbbdev     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
215649e08aacStbbdev         make_edges_in_order(nodes, *this);
215749e08aacStbbdev     }
215849e08aacStbbdev #endif
215949e08aacStbbdev 
join_node(const join_node & other)216049e08aacStbbdev     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
216149e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
216249e08aacStbbdev                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
216349e08aacStbbdev     }
216449e08aacStbbdev 
216549e08aacStbbdev };
216649e08aacStbbdev 
216749e08aacStbbdev template<typename OutputTuple>
216849e08aacStbbdev class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
216949e08aacStbbdev private:
217049e08aacStbbdev     static const int N = std::tuple_size<OutputTuple>::value;
217149e08aacStbbdev     typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
217249e08aacStbbdev public:
217349e08aacStbbdev     typedef OutputTuple output_type;
217449e08aacStbbdev     typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)217549e08aacStbbdev      __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
217649e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
217749e08aacStbbdev                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
217849e08aacStbbdev     }
217949e08aacStbbdev 
218049e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
218149e08aacStbbdev     template <typename... Args>
218249e08aacStbbdev     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
218349e08aacStbbdev         make_edges_in_order(nodes, *this);
218449e08aacStbbdev     }
218549e08aacStbbdev #endif
218649e08aacStbbdev 
join_node(const join_node & other)218749e08aacStbbdev     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
218849e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
218949e08aacStbbdev                                             this->input_ports(), static_cast< sender< output_type > *>(this) );
219049e08aacStbbdev     }
219149e08aacStbbdev 
219249e08aacStbbdev };
219349e08aacStbbdev 
2194478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
2195478de5b1Stbbdev // Helper function which is well-formed only if all of the elements in OutputTuple
2196478de5b1Stbbdev // satisfies join_node_function_object<body[i], tuple[i], K>
2197478de5b1Stbbdev template <typename OutputTuple, typename K,
2198478de5b1Stbbdev           typename... Functions, std::size_t... Idx>
2199478de5b1Stbbdev void join_node_function_objects_helper( std::index_sequence<Idx...> )
2200478de5b1Stbbdev     requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
2201478de5b1Stbbdev              (... && join_node_function_object<Functions, std::tuple_element_t<Idx, OutputTuple>, K>);
2202478de5b1Stbbdev 
2203478de5b1Stbbdev template <typename OutputTuple, typename K, typename... Functions>
2204478de5b1Stbbdev concept join_node_functions = requires {
2205478de5b1Stbbdev     join_node_function_objects_helper<OutputTuple, K, Functions...>(std::make_index_sequence<sizeof...(Functions)>{});
2206478de5b1Stbbdev };
2207478de5b1Stbbdev 
2208478de5b1Stbbdev #endif
2209478de5b1Stbbdev 
221049e08aacStbbdev // template for key_matching join_node
221149e08aacStbbdev // tag_matching join_node is a specialization of key_matching, and is source-compatible.
221249e08aacStbbdev template<typename OutputTuple, typename K, typename KHash>
221349e08aacStbbdev class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
221449e08aacStbbdev       key_matching_port, OutputTuple, key_matching<K,KHash> > {
221549e08aacStbbdev private:
221649e08aacStbbdev     static const int N = std::tuple_size<OutputTuple>::value;
221749e08aacStbbdev     typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
221849e08aacStbbdev public:
221949e08aacStbbdev     typedef OutputTuple output_type;
222049e08aacStbbdev     typedef typename unfolded_type::input_ports_type input_ports_type;
222149e08aacStbbdev 
222249e08aacStbbdev #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
join_node(graph & g)222349e08aacStbbdev     join_node(graph &g) : unfolded_type(g) {}
222449e08aacStbbdev #endif  /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
222549e08aacStbbdev 
222649e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1>)2227478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
222849e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
222949e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
223049e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
223149e08aacStbbdev     }
223249e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2>)2233478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
223449e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
223549e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
223649e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
223749e08aacStbbdev     }
223849e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3>)2239478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
224049e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
224149e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
224249e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
224349e08aacStbbdev     }
224449e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4>)2245478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
224649e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
224749e08aacStbbdev             unfolded_type(g, b0, b1, b2, b3, b4) {
224849e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
224949e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
225049e08aacStbbdev     }
225149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 6
225249e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
225349e08aacStbbdev         typename __TBB_B5>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5>)2254478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
225549e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
225649e08aacStbbdev             unfolded_type(g, b0, b1, b2, b3, b4, b5) {
225749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
225849e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
225949e08aacStbbdev     }
226049e08aacStbbdev #endif
226149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 7
226249e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
226349e08aacStbbdev         typename __TBB_B5, typename __TBB_B6>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6>)2264478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
226549e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
226649e08aacStbbdev             unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
226749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
226849e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
226949e08aacStbbdev     }
227049e08aacStbbdev #endif
227149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 8
227249e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
227349e08aacStbbdev         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6,__TBB_B7>)2274478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
227549e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
227649e08aacStbbdev             __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
227749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
227849e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
227949e08aacStbbdev     }
228049e08aacStbbdev #endif
228149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 9
228249e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
228349e08aacStbbdev         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6,__TBB_B7,__TBB_B8>)2284478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
228549e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
228649e08aacStbbdev             __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
228749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
228849e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
228949e08aacStbbdev     }
229049e08aacStbbdev #endif
229149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 10
229249e08aacStbbdev     template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
229349e08aacStbbdev         typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6,__TBB_B7,__TBB_B8,__TBB_B9>)2294478de5b1Stbbdev         __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
229549e08aacStbbdev      __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
229649e08aacStbbdev             __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
229749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
229849e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
229949e08aacStbbdev     }
230049e08aacStbbdev #endif
230149e08aacStbbdev 
230249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2303b15aabb3Stbbdev     template <
2304b15aabb3Stbbdev #if (__clang_major__ == 3 && __clang_minor__ == 4)
2305b15aabb3Stbbdev         // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
2306b15aabb3Stbbdev         template<typename...> class node_set,
2307b15aabb3Stbbdev #endif
2308b15aabb3Stbbdev         typename... Args, typename... Bodies
2309b15aabb3Stbbdev     >
2310478de5b1Stbbdev     __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
join_node(const node_set<Args...> & nodes,Bodies...bodies)231149e08aacStbbdev     __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
231249e08aacStbbdev         : join_node(nodes.graph_reference(), bodies...) {
231349e08aacStbbdev         make_edges_in_order(nodes, *this);
231449e08aacStbbdev     }
2315478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
231649e08aacStbbdev 
join_node(const join_node & other)231749e08aacStbbdev     __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
231849e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
231949e08aacStbbdev                                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
232049e08aacStbbdev     }
232149e08aacStbbdev 
232249e08aacStbbdev };
232349e08aacStbbdev 
232449e08aacStbbdev // indexer node
232549e08aacStbbdev #include "detail/_flow_graph_indexer_impl.h"
232649e08aacStbbdev 
232749e08aacStbbdev // TODO: Implement interface with variadic template or tuple
232849e08aacStbbdev template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
232949e08aacStbbdev                       typename T4=null_type, typename T5=null_type, typename T6=null_type,
233049e08aacStbbdev                       typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
233149e08aacStbbdev 
233249e08aacStbbdev //indexer node specializations
233349e08aacStbbdev template<typename T0>
233449e08aacStbbdev class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
233549e08aacStbbdev private:
233649e08aacStbbdev     static const int N = 1;
233749e08aacStbbdev public:
233849e08aacStbbdev     typedef std::tuple<T0> InputTuple;
233949e08aacStbbdev     typedef tagged_msg<size_t, T0> output_type;
234049e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)234149e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
234249e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
234349e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
234449e08aacStbbdev     }
234549e08aacStbbdev 
234649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
234749e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)234849e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
234949e08aacStbbdev         make_edges_in_order(nodes, *this);
235049e08aacStbbdev     }
235149e08aacStbbdev #endif
235249e08aacStbbdev 
235349e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)235449e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
235549e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
235649e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
235749e08aacStbbdev     }
235849e08aacStbbdev };
235949e08aacStbbdev 
236049e08aacStbbdev template<typename T0, typename T1>
236149e08aacStbbdev class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
236249e08aacStbbdev private:
236349e08aacStbbdev     static const int N = 2;
236449e08aacStbbdev public:
236549e08aacStbbdev     typedef std::tuple<T0, T1> InputTuple;
236649e08aacStbbdev     typedef tagged_msg<size_t, T0, T1> output_type;
236749e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)236849e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
236949e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
237049e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
237149e08aacStbbdev     }
237249e08aacStbbdev 
237349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
237449e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)237549e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
237649e08aacStbbdev         make_edges_in_order(nodes, *this);
237749e08aacStbbdev     }
237849e08aacStbbdev #endif
237949e08aacStbbdev 
238049e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)238149e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
238249e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
238349e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
238449e08aacStbbdev     }
238549e08aacStbbdev 
238649e08aacStbbdev };
238749e08aacStbbdev 
238849e08aacStbbdev template<typename T0, typename T1, typename T2>
238949e08aacStbbdev class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
239049e08aacStbbdev private:
239149e08aacStbbdev     static const int N = 3;
239249e08aacStbbdev public:
239349e08aacStbbdev     typedef std::tuple<T0, T1, T2> InputTuple;
239449e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2> output_type;
239549e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)239649e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
239749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
239849e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
239949e08aacStbbdev     }
240049e08aacStbbdev 
240149e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
240249e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)240349e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
240449e08aacStbbdev         make_edges_in_order(nodes, *this);
240549e08aacStbbdev     }
240649e08aacStbbdev #endif
240749e08aacStbbdev 
240849e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)240949e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
241049e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
241149e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
241249e08aacStbbdev     }
241349e08aacStbbdev 
241449e08aacStbbdev };
241549e08aacStbbdev 
241649e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3>
241749e08aacStbbdev class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
241849e08aacStbbdev private:
241949e08aacStbbdev     static const int N = 4;
242049e08aacStbbdev public:
242149e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3> InputTuple;
242249e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
242349e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)242449e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
242549e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
242649e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
242749e08aacStbbdev     }
242849e08aacStbbdev 
242949e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
243049e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)243149e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
243249e08aacStbbdev         make_edges_in_order(nodes, *this);
243349e08aacStbbdev     }
243449e08aacStbbdev #endif
243549e08aacStbbdev 
243649e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)243749e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
243849e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
243949e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
244049e08aacStbbdev     }
244149e08aacStbbdev 
244249e08aacStbbdev };
244349e08aacStbbdev 
244449e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4>
244549e08aacStbbdev class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
244649e08aacStbbdev private:
244749e08aacStbbdev     static const int N = 5;
244849e08aacStbbdev public:
244949e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
245049e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
245149e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)245249e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
245349e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
245449e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
245549e08aacStbbdev     }
245649e08aacStbbdev 
245749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
245849e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)245949e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
246049e08aacStbbdev         make_edges_in_order(nodes, *this);
246149e08aacStbbdev     }
246249e08aacStbbdev #endif
246349e08aacStbbdev 
246449e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)246549e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
246649e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
246749e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
246849e08aacStbbdev     }
246949e08aacStbbdev 
247049e08aacStbbdev };
247149e08aacStbbdev 
247249e08aacStbbdev #if __TBB_VARIADIC_MAX >= 6
247349e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
247449e08aacStbbdev class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
247549e08aacStbbdev private:
247649e08aacStbbdev     static const int N = 6;
247749e08aacStbbdev public:
247849e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3, T4, T5> InputTuple;
247949e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
248049e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)248149e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
248249e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
248349e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
248449e08aacStbbdev     }
248549e08aacStbbdev 
248649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
248749e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)248849e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
248949e08aacStbbdev         make_edges_in_order(nodes, *this);
249049e08aacStbbdev     }
249149e08aacStbbdev #endif
249249e08aacStbbdev 
249349e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)249449e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
249549e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
249649e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
249749e08aacStbbdev     }
249849e08aacStbbdev 
249949e08aacStbbdev };
250049e08aacStbbdev #endif //variadic max 6
250149e08aacStbbdev 
250249e08aacStbbdev #if __TBB_VARIADIC_MAX >= 7
250349e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
250449e08aacStbbdev          typename T6>
250549e08aacStbbdev class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
250649e08aacStbbdev private:
250749e08aacStbbdev     static const int N = 7;
250849e08aacStbbdev public:
250949e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
251049e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
251149e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)251249e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
251349e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
251449e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
251549e08aacStbbdev     }
251649e08aacStbbdev 
251749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
251849e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)251949e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
252049e08aacStbbdev         make_edges_in_order(nodes, *this);
252149e08aacStbbdev     }
252249e08aacStbbdev #endif
252349e08aacStbbdev 
252449e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)252549e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
252649e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
252749e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
252849e08aacStbbdev     }
252949e08aacStbbdev 
253049e08aacStbbdev };
253149e08aacStbbdev #endif //variadic max 7
253249e08aacStbbdev 
253349e08aacStbbdev #if __TBB_VARIADIC_MAX >= 8
253449e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
253549e08aacStbbdev          typename T6, typename T7>
253649e08aacStbbdev class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
253749e08aacStbbdev private:
253849e08aacStbbdev     static const int N = 8;
253949e08aacStbbdev public:
254049e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
254149e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
254249e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)254349e08aacStbbdev     indexer_node(graph& g) : unfolded_type(g) {
254449e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
254549e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
254649e08aacStbbdev     }
254749e08aacStbbdev 
254849e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
254949e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)255049e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
255149e08aacStbbdev         make_edges_in_order(nodes, *this);
255249e08aacStbbdev     }
255349e08aacStbbdev #endif
255449e08aacStbbdev 
255549e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)255649e08aacStbbdev     indexer_node( const indexer_node& other ) : unfolded_type(other) {
255749e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
255849e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
255949e08aacStbbdev     }
256049e08aacStbbdev 
256149e08aacStbbdev };
256249e08aacStbbdev #endif //variadic max 8
256349e08aacStbbdev 
256449e08aacStbbdev #if __TBB_VARIADIC_MAX >= 9
256549e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
256649e08aacStbbdev          typename T6, typename T7, typename T8>
256749e08aacStbbdev class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
256849e08aacStbbdev private:
256949e08aacStbbdev     static const int N = 9;
257049e08aacStbbdev public:
257149e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
257249e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
257349e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)257449e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
257549e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
257649e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
257749e08aacStbbdev     }
257849e08aacStbbdev 
257949e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
258049e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)258149e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
258249e08aacStbbdev         make_edges_in_order(nodes, *this);
258349e08aacStbbdev     }
258449e08aacStbbdev #endif
258549e08aacStbbdev 
258649e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)258749e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
258849e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
258949e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
259049e08aacStbbdev     }
259149e08aacStbbdev 
259249e08aacStbbdev };
259349e08aacStbbdev #endif //variadic max 9
259449e08aacStbbdev 
259549e08aacStbbdev #if __TBB_VARIADIC_MAX >= 10
259649e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
259749e08aacStbbdev          typename T6, typename T7, typename T8, typename T9>
259849e08aacStbbdev class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
259949e08aacStbbdev private:
260049e08aacStbbdev     static const int N = 10;
260149e08aacStbbdev public:
260249e08aacStbbdev     typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
260349e08aacStbbdev     typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
260449e08aacStbbdev     typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)260549e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
260649e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
260749e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
260849e08aacStbbdev     }
260949e08aacStbbdev 
261049e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
261149e08aacStbbdev     template <typename... Args>
indexer_node(const node_set<Args...> & nodes)261249e08aacStbbdev     indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
261349e08aacStbbdev         make_edges_in_order(nodes, *this);
261449e08aacStbbdev     }
261549e08aacStbbdev #endif
261649e08aacStbbdev 
261749e08aacStbbdev     // Copy constructor
indexer_node(const indexer_node & other)261849e08aacStbbdev     __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
261949e08aacStbbdev         fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
262049e08aacStbbdev                                            this->input_ports(), static_cast< sender< output_type > *>(this) );
262149e08aacStbbdev     }
262249e08aacStbbdev 
262349e08aacStbbdev };
262449e08aacStbbdev #endif //variadic max 10
262549e08aacStbbdev 
262649e08aacStbbdev template< typename T >
internal_make_edge(sender<T> & p,receiver<T> & s)262749e08aacStbbdev inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
262849e08aacStbbdev     register_successor(p, s);
262949e08aacStbbdev     fgt_make_edge( &p, &s );
263049e08aacStbbdev }
263149e08aacStbbdev 
263249e08aacStbbdev //! Makes an edge between a single predecessor and a single successor
263349e08aacStbbdev template< typename T >
make_edge(sender<T> & p,receiver<T> & s)263449e08aacStbbdev inline void make_edge( sender<T> &p, receiver<T> &s ) {
263549e08aacStbbdev     internal_make_edge( p, s );
263649e08aacStbbdev }
263749e08aacStbbdev 
//! Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
/** The two defaulted template parameters remove this overload from consideration
    unless T declares output_ports_type and V declares input_ports_type. */
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input )
{
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port = std::get<0>(input.input_ports());
    make_edge(first_output_port, first_input_port);
}
264449e08aacStbbdev 
264549e08aacStbbdev //Makes an edge from port 0 of a multi-output predecessor to a receiver.
264649e08aacStbbdev template< typename T, typename R,
264749e08aacStbbdev           typename = typename T::output_ports_type >
make_edge(T & output,receiver<R> & input)264849e08aacStbbdev inline void make_edge( T& output, receiver<R>& input) {
264949e08aacStbbdev      make_edge(std::get<0>(output.output_ports()), input);
265049e08aacStbbdev }
265149e08aacStbbdev 
265249e08aacStbbdev //Makes an edge from a sender to port 0 of a multi-input successor.
265349e08aacStbbdev template< typename S,  typename V,
265449e08aacStbbdev           typename = typename V::input_ports_type >
make_edge(sender<S> & output,V & input)265549e08aacStbbdev inline void make_edge( sender<S>& output, V& input) {
265649e08aacStbbdev      make_edge(output, std::get<0>(input.input_ports()));
265749e08aacStbbdev }
265849e08aacStbbdev 
265949e08aacStbbdev template< typename T >
internal_remove_edge(sender<T> & p,receiver<T> & s)266049e08aacStbbdev inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
266149e08aacStbbdev     remove_successor( p, s );
266249e08aacStbbdev     fgt_remove_edge( &p, &s );
266349e08aacStbbdev }
266449e08aacStbbdev 
266549e08aacStbbdev //! Removes an edge between a single predecessor and a single successor
266649e08aacStbbdev template< typename T >
remove_edge(sender<T> & p,receiver<T> & s)266749e08aacStbbdev inline void remove_edge( sender<T> &p, receiver<T> &s ) {
266849e08aacStbbdev     internal_remove_edge( p, s );
266949e08aacStbbdev }
267049e08aacStbbdev 
//! Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
/** The two defaulted template parameters remove this overload from consideration
    unless T declares output_ports_type and V declares input_ports_type. */
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input )
{
    auto& first_output_port = std::get<0>(output.output_ports());
    auto& first_input_port = std::get<0>(input.input_ports());
    remove_edge(first_output_port, first_input_port);
}
267749e08aacStbbdev 
267849e08aacStbbdev //Removes an edge between port 0 of a multi-output predecessor and a receiver.
267949e08aacStbbdev template< typename T, typename R,
268049e08aacStbbdev           typename = typename T::output_ports_type >
remove_edge(T & output,receiver<R> & input)268149e08aacStbbdev inline void remove_edge( T& output, receiver<R>& input) {
268249e08aacStbbdev      remove_edge(std::get<0>(output.output_ports()), input);
268349e08aacStbbdev }
268449e08aacStbbdev //Removes an edge between a sender and port 0 of a multi-input successor.
268549e08aacStbbdev template< typename S,  typename V,
268649e08aacStbbdev           typename = typename V::input_ports_type >
remove_edge(sender<S> & output,V & input)268749e08aacStbbdev inline void remove_edge( sender<S>& output, V& input) {
268849e08aacStbbdev      remove_edge(output, std::get<0>(input.input_ports()));
268949e08aacStbbdev }
269049e08aacStbbdev 
//! Returns a copy of the body from a function or continue node
/** Body must be named explicitly by the caller; the result is whatever
    n.copy_function_object<Body>() returns. */
template< typename Body, typename Node >
Body copy_body( Node& n )
{
    return n.template copy_function_object<Body>();
}
269649e08aacStbbdev 
269749e08aacStbbdev //composite_node
269849e08aacStbbdev template< typename InputTuple, typename OutputTuple > class composite_node;
269949e08aacStbbdev 
270049e08aacStbbdev template< typename... InputTypes, typename... OutputTypes>
270149e08aacStbbdev class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
270249e08aacStbbdev 
270349e08aacStbbdev public:
270449e08aacStbbdev     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
270549e08aacStbbdev     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
270649e08aacStbbdev 
270749e08aacStbbdev private:
270849e08aacStbbdev     std::unique_ptr<input_ports_type> my_input_ports;
270949e08aacStbbdev     std::unique_ptr<output_ports_type> my_output_ports;
271049e08aacStbbdev 
271149e08aacStbbdev     static const size_t NUM_INPUTS = sizeof...(InputTypes);
271249e08aacStbbdev     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
271349e08aacStbbdev 
271449e08aacStbbdev protected:
reset_node(reset_flags)271549e08aacStbbdev     void reset_node(reset_flags) override {}
271649e08aacStbbdev 
271749e08aacStbbdev public:
composite_node(graph & g)271849e08aacStbbdev     composite_node( graph &g ) : graph_node(g) {
271949e08aacStbbdev         fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
272049e08aacStbbdev     }
272149e08aacStbbdev 
272249e08aacStbbdev     template<typename T1, typename T2>
set_external_ports(T1 && input_ports_tuple,T2 && output_ports_tuple)272349e08aacStbbdev     void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
272449e08aacStbbdev         static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
272549e08aacStbbdev         static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
272649e08aacStbbdev 
272749e08aacStbbdev         fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
272849e08aacStbbdev         fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
272949e08aacStbbdev 
273049e08aacStbbdev         my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
273149e08aacStbbdev         my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
273249e08aacStbbdev     }
273349e08aacStbbdev 
273449e08aacStbbdev     template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)273549e08aacStbbdev     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
273649e08aacStbbdev 
273749e08aacStbbdev     template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)273849e08aacStbbdev     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
273949e08aacStbbdev 
274049e08aacStbbdev 
input_ports()274149e08aacStbbdev     input_ports_type& input_ports() {
274249e08aacStbbdev          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
274349e08aacStbbdev          return *my_input_ports;
274449e08aacStbbdev     }
274549e08aacStbbdev 
output_ports()274649e08aacStbbdev     output_ports_type& output_ports() {
274749e08aacStbbdev          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
274849e08aacStbbdev          return *my_output_ports;
274949e08aacStbbdev     }
275049e08aacStbbdev };  // class composite_node
275149e08aacStbbdev 
275249e08aacStbbdev //composite_node with only input ports
275349e08aacStbbdev template< typename... InputTypes>
275449e08aacStbbdev class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
275549e08aacStbbdev public:
275649e08aacStbbdev     typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
275749e08aacStbbdev 
275849e08aacStbbdev private:
275949e08aacStbbdev     std::unique_ptr<input_ports_type> my_input_ports;
276049e08aacStbbdev     static const size_t NUM_INPUTS = sizeof...(InputTypes);
276149e08aacStbbdev 
276249e08aacStbbdev protected:
reset_node(reset_flags)276349e08aacStbbdev     void reset_node(reset_flags) override {}
276449e08aacStbbdev 
276549e08aacStbbdev public:
composite_node(graph & g)276649e08aacStbbdev     composite_node( graph &g ) : graph_node(g) {
276749e08aacStbbdev         fgt_composite( CODEPTR(), this, &g );
276849e08aacStbbdev     }
276949e08aacStbbdev 
277049e08aacStbbdev    template<typename T>
set_external_ports(T && input_ports_tuple)277149e08aacStbbdev    void set_external_ports(T&& input_ports_tuple) {
277249e08aacStbbdev        static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
277349e08aacStbbdev 
277449e08aacStbbdev        fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
277549e08aacStbbdev 
277649e08aacStbbdev        my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
277749e08aacStbbdev    }
277849e08aacStbbdev 
277949e08aacStbbdev     template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)278049e08aacStbbdev     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
278149e08aacStbbdev 
278249e08aacStbbdev     template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)278349e08aacStbbdev     void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
278449e08aacStbbdev 
278549e08aacStbbdev 
input_ports()278649e08aacStbbdev     input_ports_type& input_ports() {
278749e08aacStbbdev          __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
278849e08aacStbbdev          return *my_input_ports;
278949e08aacStbbdev     }
279049e08aacStbbdev 
279149e08aacStbbdev };  // class composite_node
279249e08aacStbbdev 
279349e08aacStbbdev //composite_nodes with only output_ports
279449e08aacStbbdev template<typename... OutputTypes>
279549e08aacStbbdev class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
279649e08aacStbbdev public:
279749e08aacStbbdev     typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
279849e08aacStbbdev 
279949e08aacStbbdev private:
280049e08aacStbbdev     std::unique_ptr<output_ports_type> my_output_ports;
280149e08aacStbbdev     static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
280249e08aacStbbdev 
280349e08aacStbbdev protected:
reset_node(reset_flags)280449e08aacStbbdev     void reset_node(reset_flags) override {}
280549e08aacStbbdev 
280649e08aacStbbdev public:
composite_node(graph & g)280749e08aacStbbdev     __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
280849e08aacStbbdev         fgt_composite( CODEPTR(), this, &g );
280949e08aacStbbdev     }
281049e08aacStbbdev 
281149e08aacStbbdev    template<typename T>
set_external_ports(T && output_ports_tuple)281249e08aacStbbdev    void set_external_ports(T&& output_ports_tuple) {
281349e08aacStbbdev        static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
281449e08aacStbbdev 
281549e08aacStbbdev        fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
281649e08aacStbbdev 
281749e08aacStbbdev        my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
281849e08aacStbbdev    }
281949e08aacStbbdev 
282049e08aacStbbdev     template<typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)282149e08aacStbbdev     void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
282249e08aacStbbdev 
282349e08aacStbbdev     template<typename... NodeTypes >
add_nodes(const NodeTypes &...n)282449e08aacStbbdev     void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
282549e08aacStbbdev 
282649e08aacStbbdev 
output_ports()282749e08aacStbbdev     output_ports_type& output_ports() {
282849e08aacStbbdev          __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
282949e08aacStbbdev          return *my_output_ports;
283049e08aacStbbdev     }
283149e08aacStbbdev 
283249e08aacStbbdev };  // class composite_node
283349e08aacStbbdev 
283449e08aacStbbdev template<typename Gateway>
283549e08aacStbbdev class async_body_base: no_assign {
283649e08aacStbbdev public:
283749e08aacStbbdev     typedef Gateway gateway_type;
283849e08aacStbbdev 
async_body_base(gateway_type * gateway)283949e08aacStbbdev     async_body_base(gateway_type *gateway): my_gateway(gateway) { }
set_gateway(gateway_type * gateway)284049e08aacStbbdev     void set_gateway(gateway_type *gateway) {
284149e08aacStbbdev         my_gateway = gateway;
284249e08aacStbbdev     }
284349e08aacStbbdev 
284449e08aacStbbdev protected:
284549e08aacStbbdev     gateway_type *my_gateway;
284649e08aacStbbdev };
284749e08aacStbbdev 
284849e08aacStbbdev template<typename Input, typename Ports, typename Gateway, typename Body>
284949e08aacStbbdev class async_body: public async_body_base<Gateway> {
2850324afd9eSIlya Mishin private:
2851324afd9eSIlya Mishin     Body my_body;
2852324afd9eSIlya Mishin 
285349e08aacStbbdev public:
285449e08aacStbbdev     typedef async_body_base<Gateway> base_type;
285549e08aacStbbdev     typedef Gateway gateway_type;
285649e08aacStbbdev 
async_body(const Body & body,gateway_type * gateway)285749e08aacStbbdev     async_body(const Body &body, gateway_type *gateway)
285849e08aacStbbdev         : base_type(gateway), my_body(body) { }
285949e08aacStbbdev 
operator()2860*a088cfa0SKonstantin Boyarinov     void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
2861*a088cfa0SKonstantin Boyarinov         tbb::detail::invoke(my_body, v, *this->my_gateway);
286249e08aacStbbdev     }
286349e08aacStbbdev 
get_body()286449e08aacStbbdev     Body get_body() { return my_body; }
286549e08aacStbbdev };
286649e08aacStbbdev 
286749e08aacStbbdev //! Implements async node
286849e08aacStbbdev template < typename Input, typename Output,
286949e08aacStbbdev            typename Policy = queueing_lightweight >
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)2870478de5b1Stbbdev     __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
287149e08aacStbbdev class async_node
287249e08aacStbbdev     : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
287349e08aacStbbdev {
287449e08aacStbbdev     typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
287549e08aacStbbdev     typedef multifunction_input<
287649e08aacStbbdev         Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
287749e08aacStbbdev 
287849e08aacStbbdev public:
287949e08aacStbbdev     typedef Input input_type;
288049e08aacStbbdev     typedef Output output_type;
288149e08aacStbbdev     typedef receiver<input_type> receiver_type;
288249e08aacStbbdev     typedef receiver<output_type> successor_type;
288349e08aacStbbdev     typedef sender<input_type> predecessor_type;
288449e08aacStbbdev     typedef receiver_gateway<output_type> gateway_type;
288549e08aacStbbdev     typedef async_body_base<gateway_type> async_body_base_type;
288649e08aacStbbdev     typedef typename base_type::output_ports_type output_ports_type;
288749e08aacStbbdev 
288849e08aacStbbdev private:
288949e08aacStbbdev     class receiver_gateway_impl: public receiver_gateway<Output> {
289049e08aacStbbdev     public:
289149e08aacStbbdev         receiver_gateway_impl(async_node* node): my_node(node) {}
289249e08aacStbbdev         void reserve_wait() override {
289349e08aacStbbdev             fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
289449e08aacStbbdev             my_node->my_graph.reserve_wait();
289549e08aacStbbdev         }
289649e08aacStbbdev 
289749e08aacStbbdev         void release_wait() override {
2898b15aabb3Stbbdev             async_node* n = my_node;
2899b15aabb3Stbbdev             graph* g = &n->my_graph;
2900b15aabb3Stbbdev             g->release_wait();
2901b15aabb3Stbbdev             fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
290249e08aacStbbdev         }
290349e08aacStbbdev 
290449e08aacStbbdev         //! Implements gateway_type::try_put for an external activity to submit a message to FG
290549e08aacStbbdev         bool try_put(const Output &i) override {
290649e08aacStbbdev             return my_node->try_put_impl(i);
290749e08aacStbbdev         }
290849e08aacStbbdev 
290949e08aacStbbdev     private:
291049e08aacStbbdev         async_node* my_node;
291149e08aacStbbdev     } my_gateway;
291249e08aacStbbdev 
291349e08aacStbbdev     //The substitute of 'this' for member construction, to prevent compiler warnings
291449e08aacStbbdev     async_node* self() { return this; }
291549e08aacStbbdev 
291649e08aacStbbdev     //! Implements gateway_type::try_put for an external activity to submit a message to FG
291749e08aacStbbdev     bool try_put_impl(const Output &i) {
291849e08aacStbbdev         multifunction_output<Output> &port_0 = output_port<0>(*this);
291949e08aacStbbdev         broadcast_cache<output_type>& port_successors = port_0.successors();
292049e08aacStbbdev         fgt_async_try_put_begin(this, &port_0);
292149e08aacStbbdev         // TODO revamp: change to std::list<graph_task*>
292249e08aacStbbdev         graph_task_list tasks;
292349e08aacStbbdev         bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
292449e08aacStbbdev         __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
292549e08aacStbbdev                       "Return status is inconsistent with the method operation." );
292649e08aacStbbdev 
292749e08aacStbbdev         while( !tasks.empty() ) {
292849e08aacStbbdev             enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
292949e08aacStbbdev         }
293049e08aacStbbdev         fgt_async_try_put_end(this, &port_0);
293149e08aacStbbdev         return is_at_least_one_put_successful;
293249e08aacStbbdev     }
293349e08aacStbbdev 
293449e08aacStbbdev public:
293549e08aacStbbdev     template<typename Body>
2936478de5b1Stbbdev         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
293749e08aacStbbdev     __TBB_NOINLINE_SYM async_node(
293849e08aacStbbdev         graph &g, size_t concurrency,
293949e08aacStbbdev         Body body, Policy = Policy(), node_priority_t a_priority = no_priority
294049e08aacStbbdev     ) : base_type(
294149e08aacStbbdev         g, concurrency,
294249e08aacStbbdev         async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
294349e08aacStbbdev         (body, &my_gateway), a_priority ), my_gateway(self()) {
294449e08aacStbbdev         fgt_multioutput_node_with_body<1>(
294549e08aacStbbdev             CODEPTR(), FLOW_ASYNC_NODE,
294649e08aacStbbdev             &this->my_graph, static_cast<receiver<input_type> *>(this),
294749e08aacStbbdev             this->output_ports(), this->my_body
294849e08aacStbbdev         );
294949e08aacStbbdev     }
295049e08aacStbbdev 
2951478de5b1Stbbdev     template <typename Body>
2952478de5b1Stbbdev         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2953b15aabb3Stbbdev     __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
2954b15aabb3Stbbdev         : async_node(g, concurrency, body, Policy(), a_priority) {}
295549e08aacStbbdev 
295649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
295749e08aacStbbdev     template <typename Body, typename... Args>
2958478de5b1Stbbdev         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
295949e08aacStbbdev     __TBB_NOINLINE_SYM async_node(
296049e08aacStbbdev         const node_set<Args...>& nodes, size_t concurrency, Body body,
296149e08aacStbbdev         Policy = Policy(), node_priority_t a_priority = no_priority )
296249e08aacStbbdev         : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
296349e08aacStbbdev         make_edges_in_order(nodes, *this);
296449e08aacStbbdev     }
296549e08aacStbbdev 
296649e08aacStbbdev     template <typename Body, typename... Args>
2967478de5b1Stbbdev         __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2968b15aabb3Stbbdev     __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
2969b15aabb3Stbbdev         : async_node(nodes, concurrency, body, Policy(), a_priority) {}
297049e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
297149e08aacStbbdev 
297249e08aacStbbdev     __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
297349e08aacStbbdev         static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
297449e08aacStbbdev         static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
297549e08aacStbbdev 
297649e08aacStbbdev         fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
297749e08aacStbbdev                 &this->my_graph, static_cast<receiver<input_type> *>(this),
297849e08aacStbbdev                 this->output_ports(), this->my_body );
297949e08aacStbbdev     }
298049e08aacStbbdev 
298149e08aacStbbdev     gateway_type& gateway() {
298249e08aacStbbdev         return my_gateway;
298349e08aacStbbdev     }
298449e08aacStbbdev 
298549e08aacStbbdev     // Define sender< Output >
298649e08aacStbbdev 
298749e08aacStbbdev     //! Add a new successor to this node
298849e08aacStbbdev     bool register_successor(successor_type&) override {
298949e08aacStbbdev         __TBB_ASSERT(false, "Successors must be registered only via ports");
299049e08aacStbbdev         return false;
299149e08aacStbbdev     }
299249e08aacStbbdev 
299349e08aacStbbdev     //! Removes a successor from this node
299449e08aacStbbdev     bool remove_successor(successor_type&) override {
299549e08aacStbbdev         __TBB_ASSERT(false, "Successors must be removed only via ports");
299649e08aacStbbdev         return false;
299749e08aacStbbdev     }
299849e08aacStbbdev 
299949e08aacStbbdev     template<typename Body>
300049e08aacStbbdev     Body copy_function_object() {
300149e08aacStbbdev         typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
300249e08aacStbbdev         typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
300349e08aacStbbdev         mfn_body_type &body_ref = *this->my_body;
300449e08aacStbbdev         async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
300549e08aacStbbdev         return ab.get_body();
300649e08aacStbbdev     }
300749e08aacStbbdev 
300849e08aacStbbdev protected:
300949e08aacStbbdev 
301049e08aacStbbdev     void reset_node( reset_flags f) override {
301149e08aacStbbdev        base_type::reset_node(f);
301249e08aacStbbdev     }
301349e08aacStbbdev };
301449e08aacStbbdev 
301549e08aacStbbdev #include "detail/_flow_graph_node_set_impl.h"
301649e08aacStbbdev 
301749e08aacStbbdev template< typename T >
301849e08aacStbbdev class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
301949e08aacStbbdev public:
302049e08aacStbbdev     typedef T input_type;
302149e08aacStbbdev     typedef T output_type;
302249e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
302349e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
302449e08aacStbbdev 
overwrite_node(graph & g)302549e08aacStbbdev     __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
302649e08aacStbbdev         : graph_node(g), my_successors(this), my_buffer_is_valid(false)
302749e08aacStbbdev     {
302849e08aacStbbdev         fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
302949e08aacStbbdev                   static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
303049e08aacStbbdev     }
303149e08aacStbbdev 
303249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
303349e08aacStbbdev     template <typename... Args>
overwrite_node(const node_set<Args...> & nodes)303449e08aacStbbdev     overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
303549e08aacStbbdev         make_edges_in_order(nodes, *this);
303649e08aacStbbdev     }
303749e08aacStbbdev #endif
303849e08aacStbbdev 
303949e08aacStbbdev     //! Copy constructor; doesn't take anything from src; default won't work
overwrite_node(const overwrite_node & src)304049e08aacStbbdev     __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
304149e08aacStbbdev 
~overwrite_node()304249e08aacStbbdev     ~overwrite_node() {}
304349e08aacStbbdev 
register_successor(successor_type & s)304449e08aacStbbdev     bool register_successor( successor_type &s ) override {
304549e08aacStbbdev         spin_mutex::scoped_lock l( my_mutex );
304649e08aacStbbdev         if (my_buffer_is_valid && is_graph_active( my_graph )) {
304749e08aacStbbdev             // We have a valid value that must be forwarded immediately.
304849e08aacStbbdev             bool ret = s.try_put( my_buffer );
304949e08aacStbbdev             if ( ret ) {
305049e08aacStbbdev                 // We add the successor that accepted our put
305149e08aacStbbdev                 my_successors.register_successor( s );
305249e08aacStbbdev             } else {
305349e08aacStbbdev                 // In case of reservation a race between the moment of reservation and register_successor can appear,
305449e08aacStbbdev                 // because failed reserve does not mean that register_successor is not ready to put a message immediately.
305549e08aacStbbdev                 // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
305649e08aacStbbdev                 // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
305749e08aacStbbdev                 small_object_allocator allocator{};
305849e08aacStbbdev                 typedef register_predecessor_task task_type;
305949e08aacStbbdev                 graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
306049e08aacStbbdev                 graph_reference().reserve_wait();
306149e08aacStbbdev                 spawn_in_graph_arena( my_graph, *t );
306249e08aacStbbdev             }
306349e08aacStbbdev         } else {
306449e08aacStbbdev             // No valid value yet, just add as successor
306549e08aacStbbdev             my_successors.register_successor( s );
306649e08aacStbbdev         }
306749e08aacStbbdev         return true;
306849e08aacStbbdev     }
306949e08aacStbbdev 
remove_successor(successor_type & s)307049e08aacStbbdev     bool remove_successor( successor_type &s ) override {
307149e08aacStbbdev         spin_mutex::scoped_lock l( my_mutex );
307249e08aacStbbdev         my_successors.remove_successor(s);
307349e08aacStbbdev         return true;
307449e08aacStbbdev     }
307549e08aacStbbdev 
try_get(input_type & v)307649e08aacStbbdev     bool try_get( input_type &v ) override {
307749e08aacStbbdev         spin_mutex::scoped_lock l( my_mutex );
307849e08aacStbbdev         if ( my_buffer_is_valid ) {
307949e08aacStbbdev             v = my_buffer;
308049e08aacStbbdev             return true;
308149e08aacStbbdev         }
308249e08aacStbbdev         return false;
308349e08aacStbbdev     }
308449e08aacStbbdev 
308549e08aacStbbdev     //! Reserves an item
try_reserve(T & v)308649e08aacStbbdev     bool try_reserve( T &v ) override {
308749e08aacStbbdev         return try_get(v);
308849e08aacStbbdev     }
308949e08aacStbbdev 
309049e08aacStbbdev     //! Releases the reserved item
try_release()309149e08aacStbbdev     bool try_release() override { return true; }
309249e08aacStbbdev 
309349e08aacStbbdev     //! Consumes the reserved item
try_consume()309449e08aacStbbdev     bool try_consume() override { return true; }
309549e08aacStbbdev 
is_valid()309649e08aacStbbdev     bool is_valid() {
309749e08aacStbbdev        spin_mutex::scoped_lock l( my_mutex );
309849e08aacStbbdev        return my_buffer_is_valid;
309949e08aacStbbdev     }
310049e08aacStbbdev 
clear()310149e08aacStbbdev     void clear() {
310249e08aacStbbdev        spin_mutex::scoped_lock l( my_mutex );
310349e08aacStbbdev        my_buffer_is_valid = false;
310449e08aacStbbdev     }
310549e08aacStbbdev 
310649e08aacStbbdev protected:
310749e08aacStbbdev 
310849e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
310949e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
311049e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const input_type & v)311149e08aacStbbdev     graph_task* try_put_task( const input_type &v ) override {
311249e08aacStbbdev         spin_mutex::scoped_lock l( my_mutex );
311349e08aacStbbdev         return try_put_task_impl(v);
311449e08aacStbbdev     }
311549e08aacStbbdev 
try_put_task_impl(const input_type & v)311649e08aacStbbdev     graph_task * try_put_task_impl(const input_type &v) {
311749e08aacStbbdev         my_buffer = v;
311849e08aacStbbdev         my_buffer_is_valid = true;
311949e08aacStbbdev         graph_task* rtask = my_successors.try_put_task(v);
312049e08aacStbbdev         if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
312149e08aacStbbdev         return rtask;
312249e08aacStbbdev     }
312349e08aacStbbdev 
graph_reference()312449e08aacStbbdev     graph& graph_reference() const override {
312549e08aacStbbdev         return my_graph;
312649e08aacStbbdev     }
312749e08aacStbbdev 
312849e08aacStbbdev     //! Breaks an infinite loop between the node reservation and register_successor call
312949e08aacStbbdev     struct register_predecessor_task : public graph_task {
register_predecessor_taskregister_predecessor_task313049e08aacStbbdev         register_predecessor_task(
313149e08aacStbbdev             graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
313249e08aacStbbdev             : graph_task(g, allocator), o(owner), s(succ) {};
313349e08aacStbbdev 
executeregister_predecessor_task313449e08aacStbbdev         task* execute(execution_data& ed) override {
313549e08aacStbbdev             // TODO revamp: investigate why qualification is needed for register_successor() call
313649e08aacStbbdev             using tbb::detail::d1::register_predecessor;
313749e08aacStbbdev             using tbb::detail::d1::register_successor;
313849e08aacStbbdev             if ( !register_predecessor(s, o) ) {
313949e08aacStbbdev                 register_successor(o, s);
314049e08aacStbbdev             }
31416edb5c3aSVladimir Serov             finalize<register_predecessor_task>(ed);
31426edb5c3aSVladimir Serov             return nullptr;
31436edb5c3aSVladimir Serov         }
31446edb5c3aSVladimir Serov 
cancelregister_predecessor_task31456edb5c3aSVladimir Serov         task* cancel(execution_data& ed) override {
31466edb5c3aSVladimir Serov             finalize<register_predecessor_task>(ed);
314749e08aacStbbdev             return nullptr;
314849e08aacStbbdev         }
314949e08aacStbbdev 
315049e08aacStbbdev         predecessor_type& o;
315149e08aacStbbdev         successor_type& s;
315249e08aacStbbdev     };
315349e08aacStbbdev 
315449e08aacStbbdev     spin_mutex my_mutex;
315549e08aacStbbdev     broadcast_cache< input_type, null_rw_mutex > my_successors;
315649e08aacStbbdev     input_type my_buffer;
315749e08aacStbbdev     bool my_buffer_is_valid;
315849e08aacStbbdev 
reset_node(reset_flags f)315949e08aacStbbdev     void reset_node( reset_flags f) override {
316049e08aacStbbdev         my_buffer_is_valid = false;
316149e08aacStbbdev        if (f&rf_clear_edges) {
316249e08aacStbbdev            my_successors.clear();
316349e08aacStbbdev        }
316449e08aacStbbdev     }
316549e08aacStbbdev };  // overwrite_node
316649e08aacStbbdev 
316749e08aacStbbdev template< typename T >
316849e08aacStbbdev class write_once_node : public overwrite_node<T> {
316949e08aacStbbdev public:
317049e08aacStbbdev     typedef T input_type;
317149e08aacStbbdev     typedef T output_type;
317249e08aacStbbdev     typedef overwrite_node<T> base_type;
317349e08aacStbbdev     typedef typename receiver<input_type>::predecessor_type predecessor_type;
317449e08aacStbbdev     typedef typename sender<output_type>::successor_type successor_type;
317549e08aacStbbdev 
317649e08aacStbbdev     //! Constructor
write_once_node(graph & g)317749e08aacStbbdev     __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
317849e08aacStbbdev         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
317949e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
318049e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
318149e08aacStbbdev     }
318249e08aacStbbdev 
318349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
318449e08aacStbbdev     template <typename... Args>
write_once_node(const node_set<Args...> & nodes)318549e08aacStbbdev     write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
318649e08aacStbbdev         make_edges_in_order(nodes, *this);
318749e08aacStbbdev     }
318849e08aacStbbdev #endif
318949e08aacStbbdev 
319049e08aacStbbdev     //! Copy constructor: call base class copy constructor
write_once_node(const write_once_node & src)319149e08aacStbbdev     __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
319249e08aacStbbdev         fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
319349e08aacStbbdev                                  static_cast<receiver<input_type> *>(this),
319449e08aacStbbdev                                  static_cast<sender<output_type> *>(this) );
319549e08aacStbbdev     }
319649e08aacStbbdev 
319749e08aacStbbdev protected:
319849e08aacStbbdev     template< typename R, typename B > friend class run_and_put_task;
319949e08aacStbbdev     template<typename X, typename Y> friend class broadcast_cache;
320049e08aacStbbdev     template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const T & v)320149e08aacStbbdev     graph_task *try_put_task( const T &v ) override {
320249e08aacStbbdev         spin_mutex::scoped_lock l( this->my_mutex );
320357f524caSIlya Isaev         return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
320449e08aacStbbdev     }
320549e08aacStbbdev }; // write_once_node
320649e08aacStbbdev 
set_name(const graph & g,const char * name)320749e08aacStbbdev inline void set_name(const graph& g, const char *name) {
320849e08aacStbbdev     fgt_graph_desc(&g, name);
320949e08aacStbbdev }
321049e08aacStbbdev 
321149e08aacStbbdev template <typename Output>
set_name(const input_node<Output> & node,const char * name)321249e08aacStbbdev inline void set_name(const input_node<Output>& node, const char *name) {
321349e08aacStbbdev     fgt_node_desc(&node, name);
321449e08aacStbbdev }
321549e08aacStbbdev 
321649e08aacStbbdev template <typename Input, typename Output, typename Policy>
set_name(const function_node<Input,Output,Policy> & node,const char * name)321749e08aacStbbdev inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
321849e08aacStbbdev     fgt_node_desc(&node, name);
321949e08aacStbbdev }
322049e08aacStbbdev 
322149e08aacStbbdev template <typename Output, typename Policy>
set_name(const continue_node<Output,Policy> & node,const char * name)322249e08aacStbbdev inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
322349e08aacStbbdev     fgt_node_desc(&node, name);
322449e08aacStbbdev }
322549e08aacStbbdev 
322649e08aacStbbdev template <typename T>
set_name(const broadcast_node<T> & node,const char * name)322749e08aacStbbdev inline void set_name(const broadcast_node<T>& node, const char *name) {
322849e08aacStbbdev     fgt_node_desc(&node, name);
322949e08aacStbbdev }
323049e08aacStbbdev 
323149e08aacStbbdev template <typename T>
set_name(const buffer_node<T> & node,const char * name)323249e08aacStbbdev inline void set_name(const buffer_node<T>& node, const char *name) {
323349e08aacStbbdev     fgt_node_desc(&node, name);
323449e08aacStbbdev }
323549e08aacStbbdev 
323649e08aacStbbdev template <typename T>
set_name(const queue_node<T> & node,const char * name)323749e08aacStbbdev inline void set_name(const queue_node<T>& node, const char *name) {
323849e08aacStbbdev     fgt_node_desc(&node, name);
323949e08aacStbbdev }
324049e08aacStbbdev 
324149e08aacStbbdev template <typename T>
set_name(const sequencer_node<T> & node,const char * name)324249e08aacStbbdev inline void set_name(const sequencer_node<T>& node, const char *name) {
324349e08aacStbbdev     fgt_node_desc(&node, name);
324449e08aacStbbdev }
324549e08aacStbbdev 
324649e08aacStbbdev template <typename T, typename Compare>
set_name(const priority_queue_node<T,Compare> & node,const char * name)324749e08aacStbbdev inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
324849e08aacStbbdev     fgt_node_desc(&node, name);
324949e08aacStbbdev }
325049e08aacStbbdev 
325149e08aacStbbdev template <typename T, typename DecrementType>
set_name(const limiter_node<T,DecrementType> & node,const char * name)325249e08aacStbbdev inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
325349e08aacStbbdev     fgt_node_desc(&node, name);
325449e08aacStbbdev }
325549e08aacStbbdev 
325649e08aacStbbdev template <typename OutputTuple, typename JP>
set_name(const join_node<OutputTuple,JP> & node,const char * name)325749e08aacStbbdev inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
325849e08aacStbbdev     fgt_node_desc(&node, name);
325949e08aacStbbdev }
326049e08aacStbbdev 
326149e08aacStbbdev template <typename... Types>
set_name(const indexer_node<Types...> & node,const char * name)326249e08aacStbbdev inline void set_name(const indexer_node<Types...>& node, const char *name) {
326349e08aacStbbdev     fgt_node_desc(&node, name);
326449e08aacStbbdev }
326549e08aacStbbdev 
326649e08aacStbbdev template <typename T>
set_name(const overwrite_node<T> & node,const char * name)326749e08aacStbbdev inline void set_name(const overwrite_node<T>& node, const char *name) {
326849e08aacStbbdev     fgt_node_desc(&node, name);
326949e08aacStbbdev }
327049e08aacStbbdev 
327149e08aacStbbdev template <typename T>
set_name(const write_once_node<T> & node,const char * name)327249e08aacStbbdev inline void set_name(const write_once_node<T>& node, const char *name) {
327349e08aacStbbdev     fgt_node_desc(&node, name);
327449e08aacStbbdev }
327549e08aacStbbdev 
327649e08aacStbbdev template<typename Input, typename Output, typename Policy>
set_name(const multifunction_node<Input,Output,Policy> & node,const char * name)327749e08aacStbbdev inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
327849e08aacStbbdev     fgt_multioutput_node_desc(&node, name);
327949e08aacStbbdev }
328049e08aacStbbdev 
328149e08aacStbbdev template<typename TupleType>
set_name(const split_node<TupleType> & node,const char * name)328249e08aacStbbdev inline void set_name(const split_node<TupleType>& node, const char *name) {
328349e08aacStbbdev     fgt_multioutput_node_desc(&node, name);
328449e08aacStbbdev }
328549e08aacStbbdev 
328649e08aacStbbdev template< typename InputTuple, typename OutputTuple >
set_name(const composite_node<InputTuple,OutputTuple> & node,const char * name)328749e08aacStbbdev inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
328849e08aacStbbdev     fgt_multiinput_multioutput_node_desc(&node, name);
328949e08aacStbbdev }
329049e08aacStbbdev 
329149e08aacStbbdev template<typename Input, typename Output, typename Policy>
set_name(const async_node<Input,Output,Policy> & node,const char * name)329249e08aacStbbdev inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
329349e08aacStbbdev {
329449e08aacStbbdev     fgt_multioutput_node_desc(&node, name);
329549e08aacStbbdev }
329649e08aacStbbdev } // d1
329749e08aacStbbdev } // detail
329849e08aacStbbdev } // tbb
329949e08aacStbbdev 
330049e08aacStbbdev 
330149e08aacStbbdev // Include deduction guides for node classes
330249e08aacStbbdev #include "detail/_flow_graph_nodes_deduction.h"
330349e08aacStbbdev 
330449e08aacStbbdev namespace tbb {
330549e08aacStbbdev namespace flow {
330649e08aacStbbdev inline namespace v1 {
330749e08aacStbbdev     using detail::d1::receiver;
330849e08aacStbbdev     using detail::d1::sender;
330949e08aacStbbdev 
331049e08aacStbbdev     using detail::d1::serial;
331149e08aacStbbdev     using detail::d1::unlimited;
331249e08aacStbbdev 
331349e08aacStbbdev     using detail::d1::reset_flags;
331449e08aacStbbdev     using detail::d1::rf_reset_protocol;
331549e08aacStbbdev     using detail::d1::rf_reset_bodies;
331649e08aacStbbdev     using detail::d1::rf_clear_edges;
331749e08aacStbbdev 
331849e08aacStbbdev     using detail::d1::graph;
331949e08aacStbbdev     using detail::d1::graph_node;
332049e08aacStbbdev     using detail::d1::continue_msg;
332149e08aacStbbdev 
332249e08aacStbbdev     using detail::d1::input_node;
332349e08aacStbbdev     using detail::d1::function_node;
332449e08aacStbbdev     using detail::d1::multifunction_node;
332549e08aacStbbdev     using detail::d1::split_node;
332649e08aacStbbdev     using detail::d1::output_port;
332749e08aacStbbdev     using detail::d1::indexer_node;
332849e08aacStbbdev     using detail::d1::tagged_msg;
332949e08aacStbbdev     using detail::d1::cast_to;
333049e08aacStbbdev     using detail::d1::is_a;
333149e08aacStbbdev     using detail::d1::continue_node;
333249e08aacStbbdev     using detail::d1::overwrite_node;
333349e08aacStbbdev     using detail::d1::write_once_node;
333449e08aacStbbdev     using detail::d1::broadcast_node;
333549e08aacStbbdev     using detail::d1::buffer_node;
333649e08aacStbbdev     using detail::d1::queue_node;
333749e08aacStbbdev     using detail::d1::sequencer_node;
333849e08aacStbbdev     using detail::d1::priority_queue_node;
333949e08aacStbbdev     using detail::d1::limiter_node;
334049e08aacStbbdev     using namespace detail::d1::graph_policy_namespace;
334149e08aacStbbdev     using detail::d1::join_node;
334249e08aacStbbdev     using detail::d1::input_port;
334349e08aacStbbdev     using detail::d1::copy_body;
334449e08aacStbbdev     using detail::d1::make_edge;
334549e08aacStbbdev     using detail::d1::remove_edge;
334649e08aacStbbdev     using detail::d1::tag_value;
334749e08aacStbbdev     using detail::d1::composite_node;
334849e08aacStbbdev     using detail::d1::async_node;
334949e08aacStbbdev     using detail::d1::node_priority_t;
335049e08aacStbbdev     using detail::d1::no_priority;
335149e08aacStbbdev 
335249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
335349e08aacStbbdev     using detail::d1::follows;
335449e08aacStbbdev     using detail::d1::precedes;
335549e08aacStbbdev     using detail::d1::make_node_set;
335649e08aacStbbdev     using detail::d1::make_edges;
335749e08aacStbbdev #endif
335849e08aacStbbdev 
335949e08aacStbbdev } // v1
336049e08aacStbbdev } // flow
336149e08aacStbbdev 
336249e08aacStbbdev     using detail::d1::flow_control;
336349e08aacStbbdev 
336449e08aacStbbdev namespace profiling {
336549e08aacStbbdev     using detail::d1::set_name;
336649e08aacStbbdev } // profiling
336749e08aacStbbdev 
336849e08aacStbbdev } // tbb
336949e08aacStbbdev 
337049e08aacStbbdev 
#if TBB_USE_PROFILING_TOOLS && (__unix__ || __APPLE__)
   // No pragma pop is issued here, because it still produces a warning on the USER side
   #undef __TBB_NOINLINE_SYM
#endif
337549e08aacStbbdev 
337649e08aacStbbdev #endif // __TBB_flow_graph_H
3377