149e08aacStbbdev /*
2*a088cfa0SKonstantin Boyarinov Copyright (c) 2005-2023 Intel Corporation
349e08aacStbbdev
449e08aacStbbdev Licensed under the Apache License, Version 2.0 (the "License");
549e08aacStbbdev you may not use this file except in compliance with the License.
649e08aacStbbdev You may obtain a copy of the License at
749e08aacStbbdev
849e08aacStbbdev http://www.apache.org/licenses/LICENSE-2.0
949e08aacStbbdev
1049e08aacStbbdev Unless required by applicable law or agreed to in writing, software
1149e08aacStbbdev distributed under the License is distributed on an "AS IS" BASIS,
1249e08aacStbbdev WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1349e08aacStbbdev See the License for the specific language governing permissions and
1449e08aacStbbdev limitations under the License.
1549e08aacStbbdev */
1649e08aacStbbdev
1749e08aacStbbdev #ifndef __TBB_flow_graph_H
1849e08aacStbbdev #define __TBB_flow_graph_H
1949e08aacStbbdev
2049e08aacStbbdev #include <atomic>
2149e08aacStbbdev #include <memory>
2249e08aacStbbdev #include <type_traits>
2349e08aacStbbdev
2449e08aacStbbdev #include "detail/_config.h"
2549e08aacStbbdev #include "detail/_namespace_injection.h"
2649e08aacStbbdev #include "spin_mutex.h"
2749e08aacStbbdev #include "null_mutex.h"
2849e08aacStbbdev #include "spin_rw_mutex.h"
2949e08aacStbbdev #include "null_rw_mutex.h"
3049e08aacStbbdev #include "detail/_pipeline_filters.h"
3149e08aacStbbdev #include "detail/_task.h"
3249e08aacStbbdev #include "detail/_small_object_pool.h"
3349e08aacStbbdev #include "cache_aligned_allocator.h"
3449e08aacStbbdev #include "detail/_exception.h"
3549e08aacStbbdev #include "detail/_template_helpers.h"
3649e08aacStbbdev #include "detail/_aggregator.h"
3749e08aacStbbdev #include "detail/_allocator_traits.h"
38478de5b1Stbbdev #include "detail/_utils.h"
3949e08aacStbbdev #include "profiling.h"
4049e08aacStbbdev #include "task_arena.h"
4149e08aacStbbdev
42734f0bc0SPablo Romero #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
4349e08aacStbbdev #if __INTEL_COMPILER
4449e08aacStbbdev // Disabled warning "routine is both inline and noinline"
4549e08aacStbbdev #pragma warning (push)
4649e08aacStbbdev #pragma warning( disable: 2196 )
4749e08aacStbbdev #endif
4849e08aacStbbdev #define __TBB_NOINLINE_SYM __attribute__((noinline))
4949e08aacStbbdev #else
5049e08aacStbbdev #define __TBB_NOINLINE_SYM
5149e08aacStbbdev #endif
5249e08aacStbbdev
5349e08aacStbbdev #include <tuple>
5449e08aacStbbdev #include <list>
5549e08aacStbbdev #include <queue>
56478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
57478de5b1Stbbdev #include <concepts>
58478de5b1Stbbdev #endif
5949e08aacStbbdev
6049e08aacStbbdev /** @file
6149e08aacStbbdev \brief The graph related classes and functions
6249e08aacStbbdev
6349e08aacStbbdev There are some applications that best express dependencies as messages
6449e08aacStbbdev passed between nodes in a graph. These messages may contain data or
simply act as signals that a predecessor has completed. The graph
6649e08aacStbbdev class and its associated node classes can be used to express such
6749e08aacStbbdev applications.
6849e08aacStbbdev */
6949e08aacStbbdev
7049e08aacStbbdev namespace tbb {
7149e08aacStbbdev namespace detail {
7249e08aacStbbdev
7349e08aacStbbdev namespace d1 {
7449e08aacStbbdev
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency {
    unlimited = 0,
    serial = 1
};
7749e08aacStbbdev
//! A generic null type
/** Empty placeholder type; in this file input_node uses it as its input_type
    because an input node has no input. */
struct null_type {};
8049e08aacStbbdev
//! An empty class used for messages that mean "I'm done"
/** Carries no data. It is the message type accepted by continue_receiver. */
class continue_msg {};
8349e08aacStbbdev
84478de5b1Stbbdev } // namespace d1
85478de5b1Stbbdev
86478de5b1Stbbdev #if __TBB_CPP20_CONCEPTS_PRESENT
87478de5b1Stbbdev namespace d0 {
88478de5b1Stbbdev
89478de5b1Stbbdev template <typename ReturnType, typename OutputType>
90478de5b1Stbbdev concept node_body_return_type = std::same_as<OutputType, tbb::detail::d1::continue_msg> ||
91*a088cfa0SKonstantin Boyarinov std::convertible_to<OutputType, ReturnType>;
92478de5b1Stbbdev
//! Requirements on a body used with an output type Output and invoked with a continue_msg.
/** Body must be copy constructible, and calling an lvalue Body with a const continue_msg&
    must yield a type that satisfies node_body_return_type for Output. */
// TODO: consider using std::invocable here
template <typename Body, typename Output>
concept continue_node_body = std::copy_constructible<Body> &&
                             requires( Body& body, const tbb::detail::d1::continue_msg& v ) {
                                 { body(v) } -> node_body_return_type<Output>;
                             };
99478de5b1Stbbdev
//! Requirements on a body that maps an Input to an Output.
/** Body must be copy constructible and invocable (in the std::invocable sense) as an lvalue
    with a const Input&; the invocation result must satisfy node_body_return_type for Output. */
template <typename Body, typename Input, typename Output>
concept function_node_body = std::copy_constructible<Body> &&
                             std::invocable<Body&, const Input&> &&
                             node_body_return_type<std::invoke_result_t<Body&, const Input&>, Output>;
104478de5b1Stbbdev
//! Requirements on a function object that produces a Key from an Input.
/** FunctionObject must be copy constructible and invocable as an lvalue with a const Input&;
    the invocation result must be convertible to Key. */
template <typename FunctionObject, typename Input, typename Key>
concept join_node_function_object = std::copy_constructible<FunctionObject> &&
                                    std::invocable<FunctionObject&, const Input&> &&
                                    std::convertible_to<std::invoke_result_t<FunctionObject&, const Input&>, Key>;
109478de5b1Stbbdev
//! Requirements on a body that generates values of type Output.
/** Body must be copy constructible, and calling an lvalue Body with a flow_control& must yield
    a type accepted by adaptive_same_as<Output> (adaptive_same_as is declared outside this file). */
template <typename Body, typename Output>
concept input_node_body = std::copy_constructible<Body> &&
                          requires( Body& body, tbb::detail::d1::flow_control& fc ) {
                              { body(fc) } -> adaptive_same_as<Output>;
                          };
115478de5b1Stbbdev
//! Requirements on a body that receives an Input together with a set of output ports.
/** Body must be copy constructible and invocable as an lvalue with (const Input&, OutputPortsType&).
    No constraint is placed on the invocation result. */
template <typename Body, typename Input, typename OutputPortsType>
concept multifunction_node_body = std::copy_constructible<Body> &&
                                  std::invocable<Body&, const Input&, OutputPortsType&>;
119478de5b1Stbbdev
//! Requirements on a function object that maps a Value to a sequence number.
/** Sequencer must be copy constructible and invocable as an lvalue with a const Value&;
    the invocation result must be convertible to std::size_t. */
template <typename Sequencer, typename Value>
concept sequencer = std::copy_constructible<Sequencer> &&
                    std::invocable<Sequencer&, const Value&> &&
                    std::convertible_to<std::invoke_result_t<Sequencer&, const Value&>, std::size_t>;
124478de5b1Stbbdev
//! Requirements on a body that receives an Input together with a gateway object.
/** Body must be copy constructible and invocable as an lvalue with (const Input&, GatewayType&).
    No constraint is placed on the invocation result. */
template <typename Body, typename Input, typename GatewayType>
concept async_node_body = std::copy_constructible<Body> &&
                          std::invocable<Body&, const Input&, GatewayType&>;
128478de5b1Stbbdev
129478de5b1Stbbdev } // namespace d0
130478de5b1Stbbdev #endif // __TBB_CPP20_CONCEPTS_PRESENT
131478de5b1Stbbdev
132478de5b1Stbbdev namespace d1 {
133478de5b1Stbbdev
13449e08aacStbbdev //! Forward declaration section
13549e08aacStbbdev template< typename T > class sender;
13649e08aacStbbdev template< typename T > class receiver;
13749e08aacStbbdev class continue_receiver;
13849e08aacStbbdev
13949e08aacStbbdev template< typename T, typename U > class limiter_node; // needed for resetting decrementer
14049e08aacStbbdev
14149e08aacStbbdev template<typename T, typename M> class successor_cache;
14249e08aacStbbdev template<typename T, typename M> class broadcast_cache;
14349e08aacStbbdev template<typename T, typename M> class round_robin_cache;
14449e08aacStbbdev template<typename T, typename M> class predecessor_cache;
14549e08aacStbbdev template<typename T, typename M> class reservable_predecessor_cache;
14649e08aacStbbdev
14749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
14849e08aacStbbdev namespace order {
14949e08aacStbbdev struct following;
15049e08aacStbbdev struct preceding;
15149e08aacStbbdev }
15249e08aacStbbdev template<typename Order, typename... Args> struct node_set;
15349e08aacStbbdev #endif
15449e08aacStbbdev
15549e08aacStbbdev
15649e08aacStbbdev } // namespace d1
15749e08aacStbbdev } // namespace detail
15849e08aacStbbdev } // namespace tbb
15949e08aacStbbdev
16049e08aacStbbdev //! The graph class
16149e08aacStbbdev #include "detail/_flow_graph_impl.h"
16249e08aacStbbdev
16349e08aacStbbdev namespace tbb {
16449e08aacStbbdev namespace detail {
16549e08aacStbbdev namespace d1 {
16649e08aacStbbdev
order_tasks(graph_task * first,graph_task * second)16749e08aacStbbdev static inline std::pair<graph_task*, graph_task*> order_tasks(graph_task* first, graph_task* second) {
16849e08aacStbbdev if (second->priority > first->priority)
16949e08aacStbbdev return std::make_pair(second, first);
17049e08aacStbbdev return std::make_pair(first, second);
17149e08aacStbbdev }
17249e08aacStbbdev
17349e08aacStbbdev // submit task if necessary. Returns the non-enqueued task if there is one.
combine_tasks(graph & g,graph_task * left,graph_task * right)17449e08aacStbbdev static inline graph_task* combine_tasks(graph& g, graph_task* left, graph_task* right) {
17549e08aacStbbdev // if no RHS task, don't change left.
17657f524caSIlya Isaev if (right == nullptr) return left;
17757f524caSIlya Isaev // right != nullptr
17857f524caSIlya Isaev if (left == nullptr) return right;
17949e08aacStbbdev if (left == SUCCESSFULLY_ENQUEUED) return right;
18049e08aacStbbdev // left contains a task
18149e08aacStbbdev if (right != SUCCESSFULLY_ENQUEUED) {
18249e08aacStbbdev // both are valid tasks
18349e08aacStbbdev auto tasks_pair = order_tasks(left, right);
18449e08aacStbbdev spawn_in_graph_arena(g, *tasks_pair.first);
18549e08aacStbbdev return tasks_pair.second;
18649e08aacStbbdev }
18749e08aacStbbdev return left;
18849e08aacStbbdev }
18949e08aacStbbdev
19049e08aacStbbdev //! Pure virtual template class that defines a sender of messages of type T
19149e08aacStbbdev template< typename T >
19249e08aacStbbdev class sender {
19349e08aacStbbdev public:
~sender()19449e08aacStbbdev virtual ~sender() {}
19549e08aacStbbdev
19649e08aacStbbdev //! Request an item from the sender
try_get(T &)19749e08aacStbbdev virtual bool try_get( T & ) { return false; }
19849e08aacStbbdev
19949e08aacStbbdev //! Reserves an item in the sender
try_reserve(T &)20049e08aacStbbdev virtual bool try_reserve( T & ) { return false; }
20149e08aacStbbdev
20249e08aacStbbdev //! Releases the reserved item
try_release()20349e08aacStbbdev virtual bool try_release( ) { return false; }
20449e08aacStbbdev
20549e08aacStbbdev //! Consumes the reserved item
try_consume()20649e08aacStbbdev virtual bool try_consume( ) { return false; }
20749e08aacStbbdev
20849e08aacStbbdev protected:
20949e08aacStbbdev //! The output type of this sender
21049e08aacStbbdev typedef T output_type;
21149e08aacStbbdev
21249e08aacStbbdev //! The successor type for this node
21349e08aacStbbdev typedef receiver<T> successor_type;
21449e08aacStbbdev
21549e08aacStbbdev //! Add a new successor to this node
21649e08aacStbbdev virtual bool register_successor( successor_type &r ) = 0;
21749e08aacStbbdev
21849e08aacStbbdev //! Removes a successor from this node
21949e08aacStbbdev virtual bool remove_successor( successor_type &r ) = 0;
22049e08aacStbbdev
22149e08aacStbbdev template<typename C>
22249e08aacStbbdev friend bool register_successor(sender<C>& s, receiver<C>& r);
22349e08aacStbbdev
22449e08aacStbbdev template<typename C>
22549e08aacStbbdev friend bool remove_successor (sender<C>& s, receiver<C>& r);
22649e08aacStbbdev }; // class sender<T>
22749e08aacStbbdev
22849e08aacStbbdev template<typename C>
register_successor(sender<C> & s,receiver<C> & r)22949e08aacStbbdev bool register_successor(sender<C>& s, receiver<C>& r) {
23049e08aacStbbdev return s.register_successor(r);
23149e08aacStbbdev }
23249e08aacStbbdev
23349e08aacStbbdev template<typename C>
remove_successor(sender<C> & s,receiver<C> & r)23449e08aacStbbdev bool remove_successor(sender<C>& s, receiver<C>& r) {
23549e08aacStbbdev return s.remove_successor(r);
23649e08aacStbbdev }
23749e08aacStbbdev
23849e08aacStbbdev //! Pure virtual template class that defines a receiver of messages of type T
23949e08aacStbbdev template< typename T >
24049e08aacStbbdev class receiver {
24149e08aacStbbdev public:
24249e08aacStbbdev //! Destructor
~receiver()24349e08aacStbbdev virtual ~receiver() {}
24449e08aacStbbdev
24549e08aacStbbdev //! Put an item to the receiver
try_put(const T & t)24649e08aacStbbdev bool try_put( const T& t ) {
24749e08aacStbbdev graph_task *res = try_put_task(t);
24849e08aacStbbdev if (!res) return false;
24949e08aacStbbdev if (res != SUCCESSFULLY_ENQUEUED) spawn_in_graph_arena(graph_reference(), *res);
25049e08aacStbbdev return true;
25149e08aacStbbdev }
25249e08aacStbbdev
25349e08aacStbbdev //! put item to successor; return task to run the successor if possible.
25449e08aacStbbdev protected:
25549e08aacStbbdev //! The input type of this receiver
25649e08aacStbbdev typedef T input_type;
25749e08aacStbbdev
25849e08aacStbbdev //! The predecessor type for this node
25949e08aacStbbdev typedef sender<T> predecessor_type;
26049e08aacStbbdev
26149e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
26249e08aacStbbdev template< typename X, typename Y > friend class broadcast_cache;
26349e08aacStbbdev template< typename X, typename Y > friend class round_robin_cache;
26449e08aacStbbdev virtual graph_task *try_put_task(const T& t) = 0;
26549e08aacStbbdev virtual graph& graph_reference() const = 0;
26649e08aacStbbdev
26749e08aacStbbdev template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()26849e08aacStbbdev virtual bool is_continue_receiver() { return false; }
26949e08aacStbbdev
27049e08aacStbbdev // TODO revamp: reconsider the inheritance and move node priority out of receiver
priority()27149e08aacStbbdev virtual node_priority_t priority() const { return no_priority; }
27249e08aacStbbdev
27349e08aacStbbdev //! Add a predecessor to the node
register_predecessor(predecessor_type &)27449e08aacStbbdev virtual bool register_predecessor( predecessor_type & ) { return false; }
27549e08aacStbbdev
27649e08aacStbbdev //! Remove a predecessor from the node
remove_predecessor(predecessor_type &)27749e08aacStbbdev virtual bool remove_predecessor( predecessor_type & ) { return false; }
27849e08aacStbbdev
27949e08aacStbbdev template <typename C>
28049e08aacStbbdev friend bool register_predecessor(receiver<C>& r, sender<C>& s);
28149e08aacStbbdev template <typename C>
28249e08aacStbbdev friend bool remove_predecessor (receiver<C>& r, sender<C>& s);
28349e08aacStbbdev }; // class receiver<T>
28449e08aacStbbdev
28549e08aacStbbdev template <typename C>
register_predecessor(receiver<C> & r,sender<C> & s)28649e08aacStbbdev bool register_predecessor(receiver<C>& r, sender<C>& s) {
28749e08aacStbbdev return r.register_predecessor(s);
28849e08aacStbbdev }
28949e08aacStbbdev
29049e08aacStbbdev template <typename C>
remove_predecessor(receiver<C> & r,sender<C> & s)29149e08aacStbbdev bool remove_predecessor(receiver<C>& r, sender<C>& s) {
29249e08aacStbbdev return r.remove_predecessor(s);
29349e08aacStbbdev }
29449e08aacStbbdev
29549e08aacStbbdev //! Base class for receivers of completion messages
29649e08aacStbbdev /** These receivers automatically reset, but cannot be explicitly waited on */
29749e08aacStbbdev class continue_receiver : public receiver< continue_msg > {
29849e08aacStbbdev protected:
29949e08aacStbbdev
30049e08aacStbbdev //! Constructor
continue_receiver(int number_of_predecessors,node_priority_t a_priority)30149e08aacStbbdev explicit continue_receiver( int number_of_predecessors, node_priority_t a_priority ) {
30249e08aacStbbdev my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
30349e08aacStbbdev my_current_count = 0;
30449e08aacStbbdev my_priority = a_priority;
30549e08aacStbbdev }
30649e08aacStbbdev
30749e08aacStbbdev //! Copy constructor
continue_receiver(const continue_receiver & src)30849e08aacStbbdev continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
30949e08aacStbbdev my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
31049e08aacStbbdev my_current_count = 0;
31149e08aacStbbdev my_priority = src.my_priority;
31249e08aacStbbdev }
31349e08aacStbbdev
31449e08aacStbbdev //! Increments the trigger threshold
register_predecessor(predecessor_type &)31549e08aacStbbdev bool register_predecessor( predecessor_type & ) override {
31649e08aacStbbdev spin_mutex::scoped_lock l(my_mutex);
31749e08aacStbbdev ++my_predecessor_count;
31849e08aacStbbdev return true;
31949e08aacStbbdev }
32049e08aacStbbdev
32149e08aacStbbdev //! Decrements the trigger threshold
32249e08aacStbbdev /** Does not check to see if the removal of the predecessor now makes the current count
32349e08aacStbbdev exceed the new threshold. So removing a predecessor while the graph is active can cause
32449e08aacStbbdev unexpected results. */
remove_predecessor(predecessor_type &)32549e08aacStbbdev bool remove_predecessor( predecessor_type & ) override {
32649e08aacStbbdev spin_mutex::scoped_lock l(my_mutex);
32749e08aacStbbdev --my_predecessor_count;
32849e08aacStbbdev return true;
32949e08aacStbbdev }
33049e08aacStbbdev
33149e08aacStbbdev //! The input type
33249e08aacStbbdev typedef continue_msg input_type;
33349e08aacStbbdev
33449e08aacStbbdev //! The predecessor type for this node
33549e08aacStbbdev typedef receiver<input_type>::predecessor_type predecessor_type;
33649e08aacStbbdev
33749e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
33849e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
33949e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
34049e08aacStbbdev // execute body is supposed to be too small to create a task for.
try_put_task(const input_type &)34149e08aacStbbdev graph_task* try_put_task( const input_type & ) override {
34249e08aacStbbdev {
34349e08aacStbbdev spin_mutex::scoped_lock l(my_mutex);
34449e08aacStbbdev if ( ++my_current_count < my_predecessor_count )
34549e08aacStbbdev return SUCCESSFULLY_ENQUEUED;
34649e08aacStbbdev else
34749e08aacStbbdev my_current_count = 0;
34849e08aacStbbdev }
34949e08aacStbbdev graph_task* res = execute();
35049e08aacStbbdev return res? res : SUCCESSFULLY_ENQUEUED;
35149e08aacStbbdev }
35249e08aacStbbdev
35349e08aacStbbdev spin_mutex my_mutex;
35449e08aacStbbdev int my_predecessor_count;
35549e08aacStbbdev int my_current_count;
35649e08aacStbbdev int my_initial_predecessor_count;
35749e08aacStbbdev node_priority_t my_priority;
35849e08aacStbbdev // the friend declaration in the base class did not eliminate the "protected class"
35949e08aacStbbdev // error in gcc 4.1.2
36049e08aacStbbdev template<typename U, typename V> friend class limiter_node;
36149e08aacStbbdev
reset_receiver(reset_flags f)36249e08aacStbbdev virtual void reset_receiver( reset_flags f ) {
36349e08aacStbbdev my_current_count = 0;
36449e08aacStbbdev if (f & rf_clear_edges) {
36549e08aacStbbdev my_predecessor_count = my_initial_predecessor_count;
36649e08aacStbbdev }
36749e08aacStbbdev }
36849e08aacStbbdev
36949e08aacStbbdev //! Does whatever should happen when the threshold is reached
37049e08aacStbbdev /** This should be very fast or else spawn a task. This is
37149e08aacStbbdev called while the sender is blocked in the try_put(). */
37249e08aacStbbdev virtual graph_task* execute() = 0;
37349e08aacStbbdev template<typename TT, typename M> friend class successor_cache;
is_continue_receiver()37449e08aacStbbdev bool is_continue_receiver() override { return true; }
37549e08aacStbbdev
priority()37649e08aacStbbdev node_priority_t priority() const override { return my_priority; }
37749e08aacStbbdev }; // class continue_receiver
37849e08aacStbbdev
37949e08aacStbbdev #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
//! Extracts a key of type K from message t by calling t.key().
/** K cannot be deduced and must be supplied explicitly by the caller; the result of t.key()
    is returned as K. */
template <typename K, typename T>
K key_from_message( const T &t ) {
    return t.key();
}
38449e08aacStbbdev #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
38549e08aacStbbdev
38649e08aacStbbdev } // d1
38749e08aacStbbdev } // detail
38849e08aacStbbdev } // tbb
38949e08aacStbbdev
39049e08aacStbbdev #include "detail/_flow_graph_trace_impl.h"
39149e08aacStbbdev #include "detail/_hash_compare.h"
39249e08aacStbbdev
39349e08aacStbbdev namespace tbb {
39449e08aacStbbdev namespace detail {
39549e08aacStbbdev namespace d1 {
39649e08aacStbbdev
39749e08aacStbbdev #include "detail/_flow_graph_body_impl.h"
39849e08aacStbbdev #include "detail/_flow_graph_cache_impl.h"
39949e08aacStbbdev #include "detail/_flow_graph_types_impl.h"
40049e08aacStbbdev
40149e08aacStbbdev using namespace graph_policy_namespace;
40249e08aacStbbdev
40349e08aacStbbdev template <typename C, typename N>
graph_iterator(C * g,bool begin)40457f524caSIlya Isaev graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(nullptr)
40549e08aacStbbdev {
40649e08aacStbbdev if (begin) current_node = my_graph->my_nodes;
40749e08aacStbbdev //else it is an end iterator by default
40849e08aacStbbdev }
40949e08aacStbbdev
41049e08aacStbbdev template <typename C, typename N>
41149e08aacStbbdev typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
41249e08aacStbbdev __TBB_ASSERT(current_node, "graph_iterator at end");
41349e08aacStbbdev return *operator->();
41449e08aacStbbdev }
41549e08aacStbbdev
41649e08aacStbbdev template <typename C, typename N>
41749e08aacStbbdev typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
41849e08aacStbbdev return current_node;
41949e08aacStbbdev }
42049e08aacStbbdev
42149e08aacStbbdev template <typename C, typename N>
internal_forward()42249e08aacStbbdev void graph_iterator<C,N>::internal_forward() {
42349e08aacStbbdev if (current_node) current_node = current_node->next;
42449e08aacStbbdev }
42549e08aacStbbdev
42649e08aacStbbdev //! Constructs a graph with isolated task_group_context
graph()42757f524caSIlya Isaev inline graph::graph() : my_wait_context(0), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
42849e08aacStbbdev prepare_task_arena();
42949e08aacStbbdev own_context = true;
43049e08aacStbbdev cancelled = false;
43149e08aacStbbdev caught_exception = false;
43249e08aacStbbdev my_context = new (r1::cache_aligned_allocate(sizeof(task_group_context))) task_group_context(FLOW_TASKS);
43349e08aacStbbdev fgt_graph(this);
43449e08aacStbbdev my_is_active = true;
43549e08aacStbbdev }
43649e08aacStbbdev
graph(task_group_context & use_this_context)43749e08aacStbbdev inline graph::graph(task_group_context& use_this_context) :
43857f524caSIlya Isaev my_wait_context(0), my_context(&use_this_context), my_nodes(nullptr), my_nodes_last(nullptr), my_task_arena(nullptr) {
43949e08aacStbbdev prepare_task_arena();
44049e08aacStbbdev own_context = false;
44149e08aacStbbdev cancelled = false;
44249e08aacStbbdev caught_exception = false;
44349e08aacStbbdev fgt_graph(this);
44449e08aacStbbdev my_is_active = true;
44549e08aacStbbdev }
44649e08aacStbbdev
~graph()44749e08aacStbbdev inline graph::~graph() {
44849e08aacStbbdev wait_for_all();
44949e08aacStbbdev if (own_context) {
45049e08aacStbbdev my_context->~task_group_context();
45149e08aacStbbdev r1::cache_aligned_deallocate(my_context);
45249e08aacStbbdev }
45349e08aacStbbdev delete my_task_arena;
45449e08aacStbbdev }
45549e08aacStbbdev
//! Reserves on the graph's wait context, then reports the reservation via fgt_reserve_wait
inline void graph::reserve_wait() {
    my_wait_context.reserve();
    fgt_reserve_wait(this);
}
46049e08aacStbbdev
//! Reports the release via fgt_release_wait, then releases on the graph's wait context
/** The order mirrors reserve_wait(): the trace call happens before the wait context is released. */
inline void graph::release_wait() {
    fgt_release_wait(this);
    my_wait_context.release();
}
46549e08aacStbbdev
register_node(graph_node * n)46649e08aacStbbdev inline void graph::register_node(graph_node *n) {
46757f524caSIlya Isaev n->next = nullptr;
46849e08aacStbbdev {
46949e08aacStbbdev spin_mutex::scoped_lock lock(nodelist_mutex);
47049e08aacStbbdev n->prev = my_nodes_last;
47149e08aacStbbdev if (my_nodes_last) my_nodes_last->next = n;
47249e08aacStbbdev my_nodes_last = n;
47349e08aacStbbdev if (!my_nodes) my_nodes = n;
47449e08aacStbbdev }
47549e08aacStbbdev }
47649e08aacStbbdev
remove_node(graph_node * n)47749e08aacStbbdev inline void graph::remove_node(graph_node *n) {
47849e08aacStbbdev {
47949e08aacStbbdev spin_mutex::scoped_lock lock(nodelist_mutex);
48049e08aacStbbdev __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
48149e08aacStbbdev if (n->prev) n->prev->next = n->next;
48249e08aacStbbdev if (n->next) n->next->prev = n->prev;
48349e08aacStbbdev if (my_nodes_last == n) my_nodes_last = n->prev;
48449e08aacStbbdev if (my_nodes == n) my_nodes = n->next;
48549e08aacStbbdev }
48657f524caSIlya Isaev n->prev = n->next = nullptr;
48749e08aacStbbdev }
48849e08aacStbbdev
reset(reset_flags f)48949e08aacStbbdev inline void graph::reset( reset_flags f ) {
49049e08aacStbbdev // reset context
49149e08aacStbbdev deactivate_graph(*this);
49249e08aacStbbdev
49349e08aacStbbdev my_context->reset();
49449e08aacStbbdev cancelled = false;
49549e08aacStbbdev caught_exception = false;
49649e08aacStbbdev // reset all the nodes comprising the graph
49749e08aacStbbdev for(iterator ii = begin(); ii != end(); ++ii) {
49849e08aacStbbdev graph_node *my_p = &(*ii);
49949e08aacStbbdev my_p->reset_node(f);
50049e08aacStbbdev }
50149e08aacStbbdev // Reattach the arena. Might be useful to run the graph in a particular task_arena
50249e08aacStbbdev // while not limiting graph lifetime to a single task_arena::execute() call.
50349e08aacStbbdev prepare_task_arena( /*reinit=*/true );
50449e08aacStbbdev activate_graph(*this);
50549e08aacStbbdev }
50649e08aacStbbdev
cancel()50749e08aacStbbdev inline void graph::cancel() {
50849e08aacStbbdev my_context->cancel_group_execution();
50949e08aacStbbdev }
51049e08aacStbbdev
begin()51149e08aacStbbdev inline graph::iterator graph::begin() { return iterator(this, true); }
51249e08aacStbbdev
end()51349e08aacStbbdev inline graph::iterator graph::end() { return iterator(this, false); }
51449e08aacStbbdev
begin()51549e08aacStbbdev inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
51649e08aacStbbdev
end()51749e08aacStbbdev inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
51849e08aacStbbdev
cbegin()51949e08aacStbbdev inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
52049e08aacStbbdev
cend()52149e08aacStbbdev inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
52249e08aacStbbdev
//! Binds the node to graph g and registers it in that graph's node list
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}
52649e08aacStbbdev
//! Removes the node from the node list of the graph it was registered with
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
53049e08aacStbbdev
53149e08aacStbbdev #include "detail/_flow_graph_node_impl.h"
53249e08aacStbbdev
53349e08aacStbbdev
53449e08aacStbbdev //! An executable node that acts as a source, i.e. it has no predecessors
53549e08aacStbbdev
53649e08aacStbbdev template < typename Output >
__TBB_requires(std::copyable<Output>)537478de5b1Stbbdev __TBB_requires(std::copyable<Output>)
53849e08aacStbbdev class input_node : public graph_node, public sender< Output > {
53949e08aacStbbdev public:
54049e08aacStbbdev //! The type of the output message, which is complete
54149e08aacStbbdev typedef Output output_type;
54249e08aacStbbdev
54349e08aacStbbdev //! The type of successors of this node
54449e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
54549e08aacStbbdev
54649e08aacStbbdev // Input node has no input type
54749e08aacStbbdev typedef null_type input_type;
54849e08aacStbbdev
//! Constructs an inactive input_node in graph g from a body
/** Two independent leaf copies of body are created: my_body, and my_init_body, which the
    copy constructor clones from. The node starts with my_active false and with no
    reserved or cached item. */
template< typename Body >
    __TBB_requires(input_node_body<Body, Output>)
__TBB_NOINLINE_SYM input_node( graph &g, Body body )
    : graph_node(g), my_active(false)
    , my_body( new input_body_leaf< output_type, Body>(body) )
    , my_init_body( new input_body_leaf< output_type, Body>(body) )
    , my_successors(this), my_reserved(false), my_has_cached_item(false)
{
    fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                       static_cast<sender<output_type> *>(this), this->my_body);
}
56149e08aacStbbdev
56249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Constructs the node in the graph of a node_set and connects it to that set
/** Delegates to the (graph&, Body) constructor using successors.graph_reference(), then calls
    make_edges(*this, successors). */
template <typename Body, typename... Successors>
    __TBB_requires(input_node_body<Body, Output>)
input_node( const node_set<order::preceding, Successors...>& successors, Body body )
    : input_node(successors.graph_reference(), body)
{
    make_edges(*this, successors);
}
57049e08aacStbbdev #endif
57149e08aacStbbdev
//! Copy constructor
/** The new node is registered in the same graph as src and starts inactive with no reserved
    or cached item. Both of its bodies are clones of src.my_init_body, not of src.my_body.
    The successor cache is constructed fresh for this node rather than copied from src. */
__TBB_NOINLINE_SYM input_node( const input_node& src )
    : graph_node(src.my_graph), sender<Output>()
    , my_active(false)
    , my_body(src.my_init_body->clone()), my_init_body(src.my_init_body->clone())
    , my_successors(this), my_reserved(false), my_has_cached_item(false)
{
    fgt_node_with_body(CODEPTR(), FLOW_INPUT_NODE, &this->my_graph,
                       static_cast<sender<output_type> *>(this), this->my_body);
}
58249e08aacStbbdev
//! The destructor; deletes both body objects owned by the node
~input_node() { delete my_body; delete my_init_body; }
58549e08aacStbbdev
58649e08aacStbbdev //! Add a new successor to this node
58749e08aacStbbdev bool register_successor( successor_type &r ) override {
58849e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
58949e08aacStbbdev my_successors.register_successor(r);
59049e08aacStbbdev if ( my_active )
59149e08aacStbbdev spawn_put();
59249e08aacStbbdev return true;
59349e08aacStbbdev }
59449e08aacStbbdev
//! Removes a successor from this node
/** The successor cache is updated under my_mutex. Always returns true. */
bool remove_successor( successor_type &r ) override {
    spin_mutex::scoped_lock lock(my_mutex);
    my_successors.remove_successor(r);
    return true;
}
60149e08aacStbbdev
60249e08aacStbbdev //! Request an item from the node
60349e08aacStbbdev bool try_get( output_type &v ) override {
60449e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
60549e08aacStbbdev if ( my_reserved )
60649e08aacStbbdev return false;
60749e08aacStbbdev
60849e08aacStbbdev if ( my_has_cached_item ) {
60949e08aacStbbdev v = my_cached_item;
61049e08aacStbbdev my_has_cached_item = false;
61149e08aacStbbdev return true;
61249e08aacStbbdev }
61349e08aacStbbdev // we've been asked to provide an item, but we have none. enqueue a task to
61449e08aacStbbdev // provide one.
61549e08aacStbbdev if ( my_active )
61649e08aacStbbdev spawn_put();
61749e08aacStbbdev return false;
61849e08aacStbbdev }
61949e08aacStbbdev
62049e08aacStbbdev //! Reserves an item.
62149e08aacStbbdev bool try_reserve( output_type &v ) override {
62249e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
62349e08aacStbbdev if ( my_reserved ) {
62449e08aacStbbdev return false;
62549e08aacStbbdev }
62649e08aacStbbdev
62749e08aacStbbdev if ( my_has_cached_item ) {
62849e08aacStbbdev v = my_cached_item;
62949e08aacStbbdev my_reserved = true;
63049e08aacStbbdev return true;
63149e08aacStbbdev } else {
63249e08aacStbbdev return false;
63349e08aacStbbdev }
63449e08aacStbbdev }
63549e08aacStbbdev
63649e08aacStbbdev //! Release a reserved item.
63749e08aacStbbdev /** true = item has been released and so remains in sender, dest must request or reserve future items */
63849e08aacStbbdev bool try_release( ) override {
63949e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
64049e08aacStbbdev __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
64149e08aacStbbdev my_reserved = false;
64249e08aacStbbdev if(!my_successors.empty())
64349e08aacStbbdev spawn_put();
64449e08aacStbbdev return true;
64549e08aacStbbdev }
64649e08aacStbbdev
64749e08aacStbbdev //! Consumes a reserved item
64849e08aacStbbdev bool try_consume( ) override {
64949e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
65049e08aacStbbdev __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
65149e08aacStbbdev my_reserved = false;
65249e08aacStbbdev my_has_cached_item = false;
65349e08aacStbbdev if ( !my_successors.empty() ) {
65449e08aacStbbdev spawn_put();
65549e08aacStbbdev }
65649e08aacStbbdev return true;
65749e08aacStbbdev }
65849e08aacStbbdev
65949e08aacStbbdev //! Activates a node that was created in the inactive state
66049e08aacStbbdev void activate() {
66149e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
66249e08aacStbbdev my_active = true;
66349e08aacStbbdev if (!my_successors.empty())
66449e08aacStbbdev spawn_put();
66549e08aacStbbdev }
66649e08aacStbbdev
66749e08aacStbbdev template<typename Body>
66849e08aacStbbdev Body copy_function_object() {
66949e08aacStbbdev input_body<output_type> &body_ref = *this->my_body;
67049e08aacStbbdev return dynamic_cast< input_body_leaf<output_type, Body> & >(body_ref).get_body();
67149e08aacStbbdev }
67249e08aacStbbdev
67349e08aacStbbdev protected:
67449e08aacStbbdev
67549e08aacStbbdev //! resets the input_node to its initial state
67649e08aacStbbdev void reset_node( reset_flags f) override {
67749e08aacStbbdev my_active = false;
67849e08aacStbbdev my_reserved = false;
67949e08aacStbbdev my_has_cached_item = false;
68049e08aacStbbdev
68149e08aacStbbdev if(f & rf_clear_edges) my_successors.clear();
68249e08aacStbbdev if(f & rf_reset_bodies) {
68349e08aacStbbdev input_body<output_type> *tmp = my_init_body->clone();
68449e08aacStbbdev delete my_body;
68549e08aacStbbdev my_body = tmp;
68649e08aacStbbdev }
68749e08aacStbbdev }
68849e08aacStbbdev
68949e08aacStbbdev private:
69049e08aacStbbdev spin_mutex my_mutex;
69149e08aacStbbdev bool my_active;
69249e08aacStbbdev input_body<output_type> *my_body;
69349e08aacStbbdev input_body<output_type> *my_init_body;
69449e08aacStbbdev broadcast_cache< output_type > my_successors;
69549e08aacStbbdev bool my_reserved;
69649e08aacStbbdev bool my_has_cached_item;
69749e08aacStbbdev output_type my_cached_item;
69849e08aacStbbdev
69949e08aacStbbdev // used by apply_body_bypass, can invoke body of node.
70049e08aacStbbdev bool try_reserve_apply_body(output_type &v) {
70149e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
70249e08aacStbbdev if ( my_reserved ) {
70349e08aacStbbdev return false;
70449e08aacStbbdev }
70549e08aacStbbdev if ( !my_has_cached_item ) {
70649e08aacStbbdev flow_control control;
70749e08aacStbbdev
70849e08aacStbbdev fgt_begin_body( my_body );
70949e08aacStbbdev
71049e08aacStbbdev my_cached_item = (*my_body)(control);
71149e08aacStbbdev my_has_cached_item = !control.is_pipeline_stopped;
71249e08aacStbbdev
71349e08aacStbbdev fgt_end_body( my_body );
71449e08aacStbbdev }
71549e08aacStbbdev if ( my_has_cached_item ) {
71649e08aacStbbdev v = my_cached_item;
71749e08aacStbbdev my_reserved = true;
71849e08aacStbbdev return true;
71949e08aacStbbdev } else {
72049e08aacStbbdev return false;
72149e08aacStbbdev }
72249e08aacStbbdev }
72349e08aacStbbdev
72449e08aacStbbdev graph_task* create_put_task() {
72549e08aacStbbdev small_object_allocator allocator{};
72649e08aacStbbdev typedef input_node_task_bypass< input_node<output_type> > task_type;
72749e08aacStbbdev graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
72849e08aacStbbdev my_graph.reserve_wait();
72949e08aacStbbdev return t;
73049e08aacStbbdev }
73149e08aacStbbdev
73249e08aacStbbdev //! Spawns a task that applies the body
73349e08aacStbbdev void spawn_put( ) {
73449e08aacStbbdev if(is_graph_active(this->my_graph)) {
73549e08aacStbbdev spawn_in_graph_arena(this->my_graph, *create_put_task());
73649e08aacStbbdev }
73749e08aacStbbdev }
73849e08aacStbbdev
73949e08aacStbbdev friend class input_node_task_bypass< input_node<output_type> >;
74049e08aacStbbdev //! Applies the body. Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
74149e08aacStbbdev graph_task* apply_body_bypass( ) {
74249e08aacStbbdev output_type v;
74349e08aacStbbdev if ( !try_reserve_apply_body(v) )
74457f524caSIlya Isaev return nullptr;
74549e08aacStbbdev
74649e08aacStbbdev graph_task *last_task = my_successors.try_put_task(v);
74749e08aacStbbdev if ( last_task )
74849e08aacStbbdev try_consume();
74949e08aacStbbdev else
75049e08aacStbbdev try_release();
75149e08aacStbbdev return last_task;
75249e08aacStbbdev }
75349e08aacStbbdev }; // class input_node
75449e08aacStbbdev
75549e08aacStbbdev //! Implements a function node that supports Input -> Output
75649e08aacStbbdev template<typename Input, typename Output = continue_msg, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input> && std::copy_constructible<Output>)757478de5b1Stbbdev __TBB_requires(std::default_initializable<Input> &&
758478de5b1Stbbdev std::copy_constructible<Input> &&
759478de5b1Stbbdev std::copy_constructible<Output>)
76049e08aacStbbdev class function_node
76149e08aacStbbdev : public graph_node
76249e08aacStbbdev , public function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
76349e08aacStbbdev , public function_output<Output>
76449e08aacStbbdev {
76549e08aacStbbdev typedef cache_aligned_allocator<Input> internals_allocator;
76649e08aacStbbdev
76749e08aacStbbdev public:
76849e08aacStbbdev typedef Input input_type;
76949e08aacStbbdev typedef Output output_type;
77049e08aacStbbdev typedef function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
77149e08aacStbbdev typedef function_input_queue<input_type, internals_allocator> input_queue_type;
77249e08aacStbbdev typedef function_output<output_type> fOutput_type;
77349e08aacStbbdev typedef typename input_impl_type::predecessor_type predecessor_type;
77449e08aacStbbdev typedef typename fOutput_type::successor_type successor_type;
77549e08aacStbbdev
77649e08aacStbbdev using input_impl_type::my_predecessors;
77749e08aacStbbdev
77849e08aacStbbdev //! Constructor
77949e08aacStbbdev // input_queue_type is allocated here, but destroyed in the function_input_base.
78049e08aacStbbdev // TODO: pass the graph_buffer_policy to the function_input_base so it can all
78149e08aacStbbdev // be done in one place. This would be an interface-breaking change.
78249e08aacStbbdev template< typename Body >
783478de5b1Stbbdev __TBB_requires(function_node_body<Body, Input, Output>)
78449e08aacStbbdev __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
78549e08aacStbbdev Body body, Policy = Policy(), node_priority_t a_priority = no_priority )
78649e08aacStbbdev : graph_node(g), input_impl_type(g, concurrency, body, a_priority),
78749e08aacStbbdev fOutput_type(g) {
78849e08aacStbbdev fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
78949e08aacStbbdev static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
79049e08aacStbbdev }
79149e08aacStbbdev
79249e08aacStbbdev template <typename Body>
793478de5b1Stbbdev __TBB_requires(function_node_body<Body, Input, Output>)
79449e08aacStbbdev function_node( graph& g, size_t concurrency, Body body, node_priority_t a_priority )
79549e08aacStbbdev : function_node(g, concurrency, body, Policy(), a_priority) {}
79649e08aacStbbdev
79749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
79849e08aacStbbdev template <typename Body, typename... Args>
799478de5b1Stbbdev __TBB_requires(function_node_body<Body, Input, Output>)
80049e08aacStbbdev function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
80149e08aacStbbdev Policy p = Policy(), node_priority_t a_priority = no_priority )
80249e08aacStbbdev : function_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
80349e08aacStbbdev make_edges_in_order(nodes, *this);
80449e08aacStbbdev }
80549e08aacStbbdev
80649e08aacStbbdev template <typename Body, typename... Args>
807478de5b1Stbbdev __TBB_requires(function_node_body<Body, Input, Output>)
80849e08aacStbbdev function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority )
80949e08aacStbbdev : function_node(nodes, concurrency, body, Policy(), a_priority) {}
81049e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
81149e08aacStbbdev
81249e08aacStbbdev //! Copy constructor
81349e08aacStbbdev __TBB_NOINLINE_SYM function_node( const function_node& src ) :
81449e08aacStbbdev graph_node(src.my_graph),
81549e08aacStbbdev input_impl_type(src),
81649e08aacStbbdev fOutput_type(src.my_graph) {
81749e08aacStbbdev fgt_node_with_body( CODEPTR(), FLOW_FUNCTION_NODE, &this->my_graph,
81849e08aacStbbdev static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
81949e08aacStbbdev }
82049e08aacStbbdev
82149e08aacStbbdev protected:
82249e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
82349e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
82449e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
82549e08aacStbbdev using input_impl_type::try_put_task;
82649e08aacStbbdev
82749e08aacStbbdev broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
82849e08aacStbbdev
82949e08aacStbbdev void reset_node(reset_flags f) override {
83049e08aacStbbdev input_impl_type::reset_function_input(f);
83149e08aacStbbdev // TODO: use clear() instead.
83249e08aacStbbdev if(f & rf_clear_edges) {
83349e08aacStbbdev successors().clear();
83449e08aacStbbdev my_predecessors.clear();
83549e08aacStbbdev }
83649e08aacStbbdev __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
83749e08aacStbbdev __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
83849e08aacStbbdev }
83949e08aacStbbdev
84049e08aacStbbdev }; // class function_node
84149e08aacStbbdev
84249e08aacStbbdev //! implements a function node that supports Input -> (set of outputs)
84349e08aacStbbdev // Output is a tuple of output types.
84449e08aacStbbdev template<typename Input, typename Output, typename Policy = queueing>
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)845478de5b1Stbbdev __TBB_requires(std::default_initializable<Input> &&
846478de5b1Stbbdev std::copy_constructible<Input>)
84749e08aacStbbdev class multifunction_node :
84849e08aacStbbdev public graph_node,
84949e08aacStbbdev public multifunction_input
85049e08aacStbbdev <
85149e08aacStbbdev Input,
85249e08aacStbbdev typename wrap_tuple_elements<
85349e08aacStbbdev std::tuple_size<Output>::value, // #elements in tuple
85449e08aacStbbdev multifunction_output, // wrap this around each element
85549e08aacStbbdev Output // the tuple providing the types
85649e08aacStbbdev >::type,
85749e08aacStbbdev Policy,
85849e08aacStbbdev cache_aligned_allocator<Input>
85949e08aacStbbdev >
86049e08aacStbbdev {
86149e08aacStbbdev typedef cache_aligned_allocator<Input> internals_allocator;
86249e08aacStbbdev
86349e08aacStbbdev protected:
86449e08aacStbbdev static const int N = std::tuple_size<Output>::value;
86549e08aacStbbdev public:
86649e08aacStbbdev typedef Input input_type;
86749e08aacStbbdev typedef null_type output_type;
86849e08aacStbbdev typedef typename wrap_tuple_elements<N,multifunction_output, Output>::type output_ports_type;
86949e08aacStbbdev typedef multifunction_input<
87049e08aacStbbdev input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
87149e08aacStbbdev typedef function_input_queue<input_type, internals_allocator> input_queue_type;
87249e08aacStbbdev private:
87349e08aacStbbdev using input_impl_type::my_predecessors;
87449e08aacStbbdev public:
87549e08aacStbbdev template<typename Body>
876478de5b1Stbbdev __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
87749e08aacStbbdev __TBB_NOINLINE_SYM multifunction_node(
87849e08aacStbbdev graph &g, size_t concurrency,
87949e08aacStbbdev Body body, Policy = Policy(), node_priority_t a_priority = no_priority
88049e08aacStbbdev ) : graph_node(g), input_impl_type(g, concurrency, body, a_priority) {
88149e08aacStbbdev fgt_multioutput_node_with_body<N>(
88249e08aacStbbdev CODEPTR(), FLOW_MULTIFUNCTION_NODE,
88349e08aacStbbdev &this->my_graph, static_cast<receiver<input_type> *>(this),
88449e08aacStbbdev this->output_ports(), this->my_body
88549e08aacStbbdev );
88649e08aacStbbdev }
88749e08aacStbbdev
88849e08aacStbbdev template <typename Body>
889478de5b1Stbbdev __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
89049e08aacStbbdev __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
89149e08aacStbbdev : multifunction_node(g, concurrency, body, Policy(), a_priority) {}
89249e08aacStbbdev
89349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
89449e08aacStbbdev template <typename Body, typename... Args>
895478de5b1Stbbdev __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
89649e08aacStbbdev __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
89749e08aacStbbdev Policy p = Policy(), node_priority_t a_priority = no_priority)
89849e08aacStbbdev : multifunction_node(nodes.graph_reference(), concurrency, body, p, a_priority) {
89949e08aacStbbdev make_edges_in_order(nodes, *this);
90049e08aacStbbdev }
90149e08aacStbbdev
90249e08aacStbbdev template <typename Body, typename... Args>
903478de5b1Stbbdev __TBB_requires(multifunction_node_body<Body, Input, output_ports_type>)
90449e08aacStbbdev __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
90549e08aacStbbdev : multifunction_node(nodes, concurrency, body, Policy(), a_priority) {}
90649e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
90749e08aacStbbdev
90849e08aacStbbdev __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
90949e08aacStbbdev graph_node(other.my_graph), input_impl_type(other) {
91049e08aacStbbdev fgt_multioutput_node_with_body<N>( CODEPTR(), FLOW_MULTIFUNCTION_NODE,
91149e08aacStbbdev &this->my_graph, static_cast<receiver<input_type> *>(this),
91249e08aacStbbdev this->output_ports(), this->my_body );
91349e08aacStbbdev }
91449e08aacStbbdev
91549e08aacStbbdev // all the guts are in multifunction_input...
91649e08aacStbbdev protected:
91749e08aacStbbdev void reset_node(reset_flags f) override { input_impl_type::reset(f); }
91849e08aacStbbdev }; // multifunction_node
91949e08aacStbbdev
92049e08aacStbbdev //! split_node: accepts a tuple as input, forwards each element of the tuple to its
92149e08aacStbbdev // successors. The node has unlimited concurrency, so it does not reject inputs.
92249e08aacStbbdev template<typename TupleType>
92349e08aacStbbdev class split_node : public graph_node, public receiver<TupleType> {
92449e08aacStbbdev static const int N = std::tuple_size<TupleType>::value;
92549e08aacStbbdev typedef receiver<TupleType> base_type;
92649e08aacStbbdev public:
92749e08aacStbbdev typedef TupleType input_type;
92849e08aacStbbdev typedef typename wrap_tuple_elements<
92949e08aacStbbdev N, // #elements in tuple
93049e08aacStbbdev multifunction_output, // wrap this around each element
93149e08aacStbbdev TupleType // the tuple providing the types
93249e08aacStbbdev >::type output_ports_type;
93349e08aacStbbdev
split_node(graph & g)93449e08aacStbbdev __TBB_NOINLINE_SYM explicit split_node(graph &g)
93549e08aacStbbdev : graph_node(g),
93649e08aacStbbdev my_output_ports(init_output_ports<output_ports_type>::call(g, my_output_ports))
93749e08aacStbbdev {
93849e08aacStbbdev fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
93949e08aacStbbdev static_cast<receiver<input_type> *>(this), this->output_ports());
94049e08aacStbbdev }
94149e08aacStbbdev
94249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
94349e08aacStbbdev template <typename... Args>
split_node(const node_set<Args...> & nodes)94449e08aacStbbdev __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
94549e08aacStbbdev make_edges_in_order(nodes, *this);
94649e08aacStbbdev }
94749e08aacStbbdev #endif
94849e08aacStbbdev
split_node(const split_node & other)94949e08aacStbbdev __TBB_NOINLINE_SYM split_node(const split_node& other)
95049e08aacStbbdev : graph_node(other.my_graph), base_type(other),
95149e08aacStbbdev my_output_ports(init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
95249e08aacStbbdev {
95349e08aacStbbdev fgt_multioutput_node<N>(CODEPTR(), FLOW_SPLIT_NODE, &this->my_graph,
95449e08aacStbbdev static_cast<receiver<input_type> *>(this), this->output_ports());
95549e08aacStbbdev }
95649e08aacStbbdev
output_ports()95749e08aacStbbdev output_ports_type &output_ports() { return my_output_ports; }
95849e08aacStbbdev
95949e08aacStbbdev protected:
try_put_task(const TupleType & t)96049e08aacStbbdev graph_task *try_put_task(const TupleType& t) override {
96149e08aacStbbdev // Sending split messages in parallel is not justified, as overheads would prevail.
96249e08aacStbbdev // Also, we do not have successors here. So we just tell the task returned here is successful.
96349e08aacStbbdev return emit_element<N>::emit_this(this->my_graph, t, output_ports());
96449e08aacStbbdev }
reset_node(reset_flags f)96549e08aacStbbdev void reset_node(reset_flags f) override {
96649e08aacStbbdev if (f & rf_clear_edges)
96749e08aacStbbdev clear_element<N>::clear_this(my_output_ports);
96849e08aacStbbdev
96949e08aacStbbdev __TBB_ASSERT(!(f & rf_clear_edges) || clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
97049e08aacStbbdev }
graph_reference()97149e08aacStbbdev graph& graph_reference() const override {
97249e08aacStbbdev return my_graph;
97349e08aacStbbdev }
97449e08aacStbbdev
97549e08aacStbbdev private:
97649e08aacStbbdev output_ports_type my_output_ports;
97749e08aacStbbdev };
97849e08aacStbbdev
97949e08aacStbbdev //! Implements an executable node that supports continue_msg -> Output
98049e08aacStbbdev template <typename Output, typename Policy = Policy<void> >
__TBB_requires(std::copy_constructible<Output>)981478de5b1Stbbdev __TBB_requires(std::copy_constructible<Output>)
98249e08aacStbbdev class continue_node : public graph_node, public continue_input<Output, Policy>,
98349e08aacStbbdev public function_output<Output> {
98449e08aacStbbdev public:
98549e08aacStbbdev typedef continue_msg input_type;
98649e08aacStbbdev typedef Output output_type;
98749e08aacStbbdev typedef continue_input<Output, Policy> input_impl_type;
98849e08aacStbbdev typedef function_output<output_type> fOutput_type;
98949e08aacStbbdev typedef typename input_impl_type::predecessor_type predecessor_type;
99049e08aacStbbdev typedef typename fOutput_type::successor_type successor_type;
99149e08aacStbbdev
99249e08aacStbbdev //! Constructor for executable node with continue_msg -> Output
99349e08aacStbbdev template <typename Body >
994478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
99549e08aacStbbdev __TBB_NOINLINE_SYM continue_node(
99649e08aacStbbdev graph &g,
99749e08aacStbbdev Body body, Policy = Policy(), node_priority_t a_priority = no_priority
99849e08aacStbbdev ) : graph_node(g), input_impl_type( g, body, a_priority ),
99949e08aacStbbdev fOutput_type(g) {
100049e08aacStbbdev fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
100149e08aacStbbdev
100249e08aacStbbdev static_cast<receiver<input_type> *>(this),
100349e08aacStbbdev static_cast<sender<output_type> *>(this), this->my_body );
100449e08aacStbbdev }
100549e08aacStbbdev
100649e08aacStbbdev template <typename Body>
1007478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
100849e08aacStbbdev continue_node( graph& g, Body body, node_priority_t a_priority )
100949e08aacStbbdev : continue_node(g, body, Policy(), a_priority) {}
101049e08aacStbbdev
101149e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
101249e08aacStbbdev template <typename Body, typename... Args>
1013478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
101449e08aacStbbdev continue_node( const node_set<Args...>& nodes, Body body,
101549e08aacStbbdev Policy p = Policy(), node_priority_t a_priority = no_priority )
101649e08aacStbbdev : continue_node(nodes.graph_reference(), body, p, a_priority ) {
101749e08aacStbbdev make_edges_in_order(nodes, *this);
101849e08aacStbbdev }
101949e08aacStbbdev template <typename Body, typename... Args>
1020478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
102149e08aacStbbdev continue_node( const node_set<Args...>& nodes, Body body, node_priority_t a_priority)
102249e08aacStbbdev : continue_node(nodes, body, Policy(), a_priority) {}
102349e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
102449e08aacStbbdev
102549e08aacStbbdev //! Constructor for executable node with continue_msg -> Output
102649e08aacStbbdev template <typename Body >
1027478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
102849e08aacStbbdev __TBB_NOINLINE_SYM continue_node(
102949e08aacStbbdev graph &g, int number_of_predecessors,
103049e08aacStbbdev Body body, Policy = Policy(), node_priority_t a_priority = no_priority
103149e08aacStbbdev ) : graph_node(g)
103249e08aacStbbdev , input_impl_type(g, number_of_predecessors, body, a_priority),
103349e08aacStbbdev fOutput_type(g) {
103449e08aacStbbdev fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
103549e08aacStbbdev static_cast<receiver<input_type> *>(this),
103649e08aacStbbdev static_cast<sender<output_type> *>(this), this->my_body );
103749e08aacStbbdev }
103849e08aacStbbdev
103949e08aacStbbdev template <typename Body>
1040478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
104149e08aacStbbdev continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t a_priority)
104249e08aacStbbdev : continue_node(g, number_of_predecessors, body, Policy(), a_priority) {}
104349e08aacStbbdev
104449e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
104549e08aacStbbdev template <typename Body, typename... Args>
1046478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
104749e08aacStbbdev continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
104849e08aacStbbdev Body body, Policy p = Policy(), node_priority_t a_priority = no_priority )
104949e08aacStbbdev : continue_node(nodes.graph_reference(), number_of_predecessors, body, p, a_priority) {
105049e08aacStbbdev make_edges_in_order(nodes, *this);
105149e08aacStbbdev }
105249e08aacStbbdev
105349e08aacStbbdev template <typename Body, typename... Args>
1054478de5b1Stbbdev __TBB_requires(continue_node_body<Body, Output>)
105549e08aacStbbdev continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
105649e08aacStbbdev Body body, node_priority_t a_priority )
105749e08aacStbbdev : continue_node(nodes, number_of_predecessors, body, Policy(), a_priority) {}
105849e08aacStbbdev #endif
105949e08aacStbbdev
106049e08aacStbbdev //! Copy constructor
106149e08aacStbbdev __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
106249e08aacStbbdev graph_node(src.my_graph), input_impl_type(src),
106349e08aacStbbdev function_output<Output>(src.my_graph) {
106449e08aacStbbdev fgt_node_with_body( CODEPTR(), FLOW_CONTINUE_NODE, &this->my_graph,
106549e08aacStbbdev static_cast<receiver<input_type> *>(this),
106649e08aacStbbdev static_cast<sender<output_type> *>(this), this->my_body );
106749e08aacStbbdev }
106849e08aacStbbdev
106949e08aacStbbdev protected:
107049e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
107149e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
107249e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
107349e08aacStbbdev using input_impl_type::try_put_task;
107449e08aacStbbdev broadcast_cache<output_type> &successors () override { return fOutput_type::my_successors; }
107549e08aacStbbdev
107649e08aacStbbdev void reset_node(reset_flags f) override {
107749e08aacStbbdev input_impl_type::reset_receiver(f);
107849e08aacStbbdev if(f & rf_clear_edges)successors().clear();
107949e08aacStbbdev __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
108049e08aacStbbdev }
108149e08aacStbbdev }; // continue_node
108249e08aacStbbdev
108349e08aacStbbdev //! Forwards messages of type T to all successors
108449e08aacStbbdev template <typename T>
108549e08aacStbbdev class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
108649e08aacStbbdev public:
108749e08aacStbbdev typedef T input_type;
108849e08aacStbbdev typedef T output_type;
108949e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
109049e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
109149e08aacStbbdev private:
109249e08aacStbbdev broadcast_cache<input_type> my_successors;
109349e08aacStbbdev public:
109449e08aacStbbdev
broadcast_node(graph & g)109549e08aacStbbdev __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g), my_successors(this) {
109649e08aacStbbdev fgt_node( CODEPTR(), FLOW_BROADCAST_NODE, &this->my_graph,
109749e08aacStbbdev static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
109849e08aacStbbdev }
109949e08aacStbbdev
110049e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
110149e08aacStbbdev template <typename... Args>
broadcast_node(const node_set<Args...> & nodes)110249e08aacStbbdev broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
110349e08aacStbbdev make_edges_in_order(nodes, *this);
110449e08aacStbbdev }
110549e08aacStbbdev #endif
110649e08aacStbbdev
110749e08aacStbbdev // Copy constructor
broadcast_node(const broadcast_node & src)110849e08aacStbbdev __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) : broadcast_node(src.my_graph) {}
110949e08aacStbbdev
111049e08aacStbbdev //! Adds a successor
register_successor(successor_type & r)111149e08aacStbbdev bool register_successor( successor_type &r ) override {
111249e08aacStbbdev my_successors.register_successor( r );
111349e08aacStbbdev return true;
111449e08aacStbbdev }
111549e08aacStbbdev
111649e08aacStbbdev //! Removes s as a successor
remove_successor(successor_type & r)111749e08aacStbbdev bool remove_successor( successor_type &r ) override {
111849e08aacStbbdev my_successors.remove_successor( r );
111949e08aacStbbdev return true;
112049e08aacStbbdev }
112149e08aacStbbdev
112249e08aacStbbdev protected:
112349e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
112449e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
112549e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
112649e08aacStbbdev //! build a task to run the successor if possible. Default is old behavior.
try_put_task(const T & t)112749e08aacStbbdev graph_task *try_put_task(const T& t) override {
112849e08aacStbbdev graph_task *new_task = my_successors.try_put_task(t);
112949e08aacStbbdev if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
113049e08aacStbbdev return new_task;
113149e08aacStbbdev }
113249e08aacStbbdev
graph_reference()113349e08aacStbbdev graph& graph_reference() const override {
113449e08aacStbbdev return my_graph;
113549e08aacStbbdev }
113649e08aacStbbdev
reset_node(reset_flags f)113749e08aacStbbdev void reset_node(reset_flags f) override {
113849e08aacStbbdev if (f&rf_clear_edges) {
113949e08aacStbbdev my_successors.clear();
114049e08aacStbbdev }
114149e08aacStbbdev __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
114249e08aacStbbdev }
114349e08aacStbbdev }; // broadcast_node
114449e08aacStbbdev
114549e08aacStbbdev //! Forwards messages in arbitrary order
114649e08aacStbbdev template <typename T>
114749e08aacStbbdev class buffer_node
114849e08aacStbbdev : public graph_node
114949e08aacStbbdev , public reservable_item_buffer< T, cache_aligned_allocator<T> >
115049e08aacStbbdev , public receiver<T>, public sender<T>
115149e08aacStbbdev {
115249e08aacStbbdev typedef cache_aligned_allocator<T> internals_allocator;
115349e08aacStbbdev
115449e08aacStbbdev public:
115549e08aacStbbdev typedef T input_type;
115649e08aacStbbdev typedef T output_type;
115749e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
115849e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
115949e08aacStbbdev typedef buffer_node<T> class_type;
116049e08aacStbbdev
116149e08aacStbbdev protected:
116249e08aacStbbdev typedef size_t size_type;
116349e08aacStbbdev round_robin_cache< T, null_rw_mutex > my_successors;
116449e08aacStbbdev
116549e08aacStbbdev friend class forward_task_bypass< class_type >;
116649e08aacStbbdev
116749e08aacStbbdev enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
116849e08aacStbbdev };
116949e08aacStbbdev
117049e08aacStbbdev // implements the aggregator_operation concept
117149e08aacStbbdev class buffer_operation : public aggregated_operation< buffer_operation > {
117249e08aacStbbdev public:
117349e08aacStbbdev char type;
117449e08aacStbbdev T* elem;
117549e08aacStbbdev graph_task* ltask;
117649e08aacStbbdev successor_type *r;
117749e08aacStbbdev
buffer_operation(const T & e,op_type t)117849e08aacStbbdev buffer_operation(const T& e, op_type t) : type(char(t))
117957f524caSIlya Isaev , elem(const_cast<T*>(&e)) , ltask(nullptr)
1180f2af7473Skboyarinov , r(nullptr)
118149e08aacStbbdev {}
buffer_operation(op_type t)1182f2af7473Skboyarinov buffer_operation(op_type t) : type(char(t)), elem(nullptr), ltask(nullptr), r(nullptr) {}
118349e08aacStbbdev };
118449e08aacStbbdev
118549e08aacStbbdev bool forwarder_busy;
118649e08aacStbbdev typedef aggregating_functor<class_type, buffer_operation> handler_type;
118749e08aacStbbdev friend class aggregating_functor<class_type, buffer_operation>;
118849e08aacStbbdev aggregator< handler_type, buffer_operation> my_aggregator;
118949e08aacStbbdev
handle_operations(buffer_operation * op_list)119049e08aacStbbdev virtual void handle_operations(buffer_operation *op_list) {
119149e08aacStbbdev handle_operations_impl(op_list, this);
119249e08aacStbbdev }
119349e08aacStbbdev
119449e08aacStbbdev template<typename derived_type>
handle_operations_impl(buffer_operation * op_list,derived_type * derived)119549e08aacStbbdev void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
119649e08aacStbbdev __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
119749e08aacStbbdev
119857f524caSIlya Isaev buffer_operation *tmp = nullptr;
119949e08aacStbbdev bool try_forwarding = false;
120049e08aacStbbdev while (op_list) {
120149e08aacStbbdev tmp = op_list;
120249e08aacStbbdev op_list = op_list->next;
120349e08aacStbbdev switch (tmp->type) {
120449e08aacStbbdev case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
120549e08aacStbbdev case rem_succ: internal_rem_succ(tmp); break;
120649e08aacStbbdev case req_item: internal_pop(tmp); break;
120749e08aacStbbdev case res_item: internal_reserve(tmp); break;
120849e08aacStbbdev case rel_res: internal_release(tmp); try_forwarding = true; break;
120949e08aacStbbdev case con_res: internal_consume(tmp); try_forwarding = true; break;
121049e08aacStbbdev case put_item: try_forwarding = internal_push(tmp); break;
121149e08aacStbbdev case try_fwd_task: internal_forward_task(tmp); break;
121249e08aacStbbdev }
121349e08aacStbbdev }
121449e08aacStbbdev
121549e08aacStbbdev derived->order();
121649e08aacStbbdev
121749e08aacStbbdev if (try_forwarding && !forwarder_busy) {
121849e08aacStbbdev if(is_graph_active(this->my_graph)) {
121949e08aacStbbdev forwarder_busy = true;
122049e08aacStbbdev typedef forward_task_bypass<class_type> task_type;
122149e08aacStbbdev small_object_allocator allocator{};
122249e08aacStbbdev graph_task* new_task = allocator.new_object<task_type>(graph_reference(), allocator, *this);
122349e08aacStbbdev my_graph.reserve_wait();
122449e08aacStbbdev // tmp should point to the last item handled by the aggregator. This is the operation
122549e08aacStbbdev // the handling thread enqueued. So modifying that record will be okay.
122649e08aacStbbdev // TODO revamp: check that the issue is still present
122749e08aacStbbdev // workaround for icc bug (at least 12.0 and 13.0)
122849e08aacStbbdev // error: function "tbb::flow::interfaceX::combine_tasks" cannot be called with the given argument list
122949e08aacStbbdev // argument types are: (graph, graph_task *, graph_task *)
123049e08aacStbbdev graph_task *z = tmp->ltask;
123149e08aacStbbdev graph &g = this->my_graph;
123249e08aacStbbdev tmp->ltask = combine_tasks(g, z, new_task); // in case the op generated a task
123349e08aacStbbdev }
123449e08aacStbbdev }
123549e08aacStbbdev } // handle_operations
123649e08aacStbbdev
grab_forwarding_task(buffer_operation & op_data)123749e08aacStbbdev inline graph_task *grab_forwarding_task( buffer_operation &op_data) {
123849e08aacStbbdev return op_data.ltask;
123949e08aacStbbdev }
124049e08aacStbbdev
enqueue_forwarding_task(buffer_operation & op_data)124149e08aacStbbdev inline bool enqueue_forwarding_task(buffer_operation &op_data) {
124249e08aacStbbdev graph_task *ft = grab_forwarding_task(op_data);
124349e08aacStbbdev if(ft) {
124449e08aacStbbdev spawn_in_graph_arena(graph_reference(), *ft);
124549e08aacStbbdev return true;
124649e08aacStbbdev }
124749e08aacStbbdev return false;
124849e08aacStbbdev }
124949e08aacStbbdev
125049e08aacStbbdev //! This is executed by an enqueued task, the "forwarder"
forward_task()125149e08aacStbbdev virtual graph_task *forward_task() {
125249e08aacStbbdev buffer_operation op_data(try_fwd_task);
125357f524caSIlya Isaev graph_task *last_task = nullptr;
125449e08aacStbbdev do {
125549e08aacStbbdev op_data.status = WAIT;
125657f524caSIlya Isaev op_data.ltask = nullptr;
125749e08aacStbbdev my_aggregator.execute(&op_data);
125849e08aacStbbdev
125949e08aacStbbdev // workaround for icc bug
126049e08aacStbbdev graph_task *xtask = op_data.ltask;
126149e08aacStbbdev graph& g = this->my_graph;
126249e08aacStbbdev last_task = combine_tasks(g, last_task, xtask);
126349e08aacStbbdev } while (op_data.status ==SUCCEEDED);
126449e08aacStbbdev return last_task;
126549e08aacStbbdev }
126649e08aacStbbdev
126749e08aacStbbdev //! Register successor
internal_reg_succ(buffer_operation * op)126849e08aacStbbdev virtual void internal_reg_succ(buffer_operation *op) {
1269f2af7473Skboyarinov __TBB_ASSERT(op->r, nullptr);
127049e08aacStbbdev my_successors.register_successor(*(op->r));
127149e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
127249e08aacStbbdev }
127349e08aacStbbdev
127449e08aacStbbdev //! Remove successor
internal_rem_succ(buffer_operation * op)127549e08aacStbbdev virtual void internal_rem_succ(buffer_operation *op) {
1276f2af7473Skboyarinov __TBB_ASSERT(op->r, nullptr);
127749e08aacStbbdev my_successors.remove_successor(*(op->r));
127849e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
127949e08aacStbbdev }
128049e08aacStbbdev
128149e08aacStbbdev private:
//! Hook called by handle_operations_impl after a batch; a plain buffer does nothing here.
void order() {
}
128349e08aacStbbdev
is_item_valid()128449e08aacStbbdev bool is_item_valid() {
128549e08aacStbbdev return this->my_item_valid(this->my_tail - 1);
128649e08aacStbbdev }
128749e08aacStbbdev
try_put_and_add_task(graph_task * & last_task)128849e08aacStbbdev void try_put_and_add_task(graph_task*& last_task) {
128949e08aacStbbdev graph_task *new_task = my_successors.try_put_task(this->back());
129049e08aacStbbdev if (new_task) {
129149e08aacStbbdev // workaround for icc bug
129249e08aacStbbdev graph& g = this->my_graph;
129349e08aacStbbdev last_task = combine_tasks(g, last_task, new_task);
129449e08aacStbbdev this->destroy_back();
129549e08aacStbbdev }
129649e08aacStbbdev }
129749e08aacStbbdev
129849e08aacStbbdev protected:
129949e08aacStbbdev //! Tries to forward valid items to successors
internal_forward_task(buffer_operation * op)130049e08aacStbbdev virtual void internal_forward_task(buffer_operation *op) {
130149e08aacStbbdev internal_forward_task_impl(op, this);
130249e08aacStbbdev }
130349e08aacStbbdev
130449e08aacStbbdev template<typename derived_type>
internal_forward_task_impl(buffer_operation * op,derived_type * derived)130549e08aacStbbdev void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
130649e08aacStbbdev __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
130749e08aacStbbdev
130849e08aacStbbdev if (this->my_reserved || !derived->is_item_valid()) {
130949e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
131049e08aacStbbdev this->forwarder_busy = false;
131149e08aacStbbdev return;
131249e08aacStbbdev }
131349e08aacStbbdev // Try forwarding, giving each successor a chance
131457f524caSIlya Isaev graph_task* last_task = nullptr;
131549e08aacStbbdev size_type counter = my_successors.size();
131649e08aacStbbdev for (; counter > 0 && derived->is_item_valid(); --counter)
131749e08aacStbbdev derived->try_put_and_add_task(last_task);
131849e08aacStbbdev
131949e08aacStbbdev op->ltask = last_task; // return task
132049e08aacStbbdev if (last_task && !counter) {
132149e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
132249e08aacStbbdev }
132349e08aacStbbdev else {
132449e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
132549e08aacStbbdev forwarder_busy = false;
132649e08aacStbbdev }
132749e08aacStbbdev }
132849e08aacStbbdev
internal_push(buffer_operation * op)132949e08aacStbbdev virtual bool internal_push(buffer_operation *op) {
1330f2af7473Skboyarinov __TBB_ASSERT(op->elem, nullptr);
133149e08aacStbbdev this->push_back(*(op->elem));
133249e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
133349e08aacStbbdev return true;
133449e08aacStbbdev }
133549e08aacStbbdev
internal_pop(buffer_operation * op)133649e08aacStbbdev virtual void internal_pop(buffer_operation *op) {
1337f2af7473Skboyarinov __TBB_ASSERT(op->elem, nullptr);
133849e08aacStbbdev if(this->pop_back(*(op->elem))) {
133949e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
134049e08aacStbbdev }
134149e08aacStbbdev else {
134249e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
134349e08aacStbbdev }
134449e08aacStbbdev }
134549e08aacStbbdev
internal_reserve(buffer_operation * op)134649e08aacStbbdev virtual void internal_reserve(buffer_operation *op) {
1347f2af7473Skboyarinov __TBB_ASSERT(op->elem, nullptr);
134849e08aacStbbdev if(this->reserve_front(*(op->elem))) {
134949e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
135049e08aacStbbdev }
135149e08aacStbbdev else {
135249e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
135349e08aacStbbdev }
135449e08aacStbbdev }
135549e08aacStbbdev
internal_consume(buffer_operation * op)135649e08aacStbbdev virtual void internal_consume(buffer_operation *op) {
135749e08aacStbbdev this->consume_front();
135849e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
135949e08aacStbbdev }
136049e08aacStbbdev
internal_release(buffer_operation * op)136149e08aacStbbdev virtual void internal_release(buffer_operation *op) {
136249e08aacStbbdev this->release_front();
136349e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
136449e08aacStbbdev }
136549e08aacStbbdev
136649e08aacStbbdev public:
136749e08aacStbbdev //! Constructor
buffer_node(graph & g)136849e08aacStbbdev __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
136949e08aacStbbdev : graph_node(g), reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
137049e08aacStbbdev sender<T>(), my_successors(this), forwarder_busy(false)
137149e08aacStbbdev {
137249e08aacStbbdev my_aggregator.initialize_handler(handler_type(this));
137349e08aacStbbdev fgt_node( CODEPTR(), FLOW_BUFFER_NODE, &this->my_graph,
137449e08aacStbbdev static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
137549e08aacStbbdev }
137649e08aacStbbdev
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Constructs the node in the graph of the given node set, then makes edges in order.
template <typename... Args>
buffer_node(const node_set<Args...>& nodes)
    : buffer_node(nodes.graph_reference())
{
    make_edges_in_order(nodes, *this);
}
#endif
138349e08aacStbbdev
138449e08aacStbbdev //! Copy constructor
buffer_node(const buffer_node & src)138549e08aacStbbdev __TBB_NOINLINE_SYM buffer_node( const buffer_node& src ) : buffer_node(src.my_graph) {}
138649e08aacStbbdev
138749e08aacStbbdev //
138849e08aacStbbdev // message sender implementation
138949e08aacStbbdev //
139049e08aacStbbdev
139149e08aacStbbdev //! Adds a new successor.
139249e08aacStbbdev /** Adds successor r to the list of successors; may forward tasks. */
register_successor(successor_type & r)139349e08aacStbbdev bool register_successor( successor_type &r ) override {
139449e08aacStbbdev buffer_operation op_data(reg_succ);
139549e08aacStbbdev op_data.r = &r;
139649e08aacStbbdev my_aggregator.execute(&op_data);
139749e08aacStbbdev (void)enqueue_forwarding_task(op_data);
139849e08aacStbbdev return true;
139949e08aacStbbdev }
140049e08aacStbbdev
140149e08aacStbbdev //! Removes a successor.
140249e08aacStbbdev /** Removes successor r from the list of successors.
140349e08aacStbbdev It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
remove_successor(successor_type & r)140449e08aacStbbdev bool remove_successor( successor_type &r ) override {
140549e08aacStbbdev // TODO revamp: investigate why full qualification is necessary here
140649e08aacStbbdev tbb::detail::d1::remove_predecessor(r, *this);
140749e08aacStbbdev buffer_operation op_data(rem_succ);
140849e08aacStbbdev op_data.r = &r;
140949e08aacStbbdev my_aggregator.execute(&op_data);
141049e08aacStbbdev // even though this operation does not cause a forward, if we are the handler, and
141149e08aacStbbdev // a forward is scheduled, we may be the first to reach this point after the aggregator,
141249e08aacStbbdev // and so should check for the task.
141349e08aacStbbdev (void)enqueue_forwarding_task(op_data);
141449e08aacStbbdev return true;
141549e08aacStbbdev }
141649e08aacStbbdev
141749e08aacStbbdev //! Request an item from the buffer_node
141849e08aacStbbdev /** true = v contains the returned item<BR>
141949e08aacStbbdev false = no item has been returned */
try_get(T & v)142049e08aacStbbdev bool try_get( T &v ) override {
142149e08aacStbbdev buffer_operation op_data(req_item);
142249e08aacStbbdev op_data.elem = &v;
142349e08aacStbbdev my_aggregator.execute(&op_data);
142449e08aacStbbdev (void)enqueue_forwarding_task(op_data);
142549e08aacStbbdev return (op_data.status==SUCCEEDED);
142649e08aacStbbdev }
142749e08aacStbbdev
142849e08aacStbbdev //! Reserves an item.
142949e08aacStbbdev /** false = no item can be reserved<BR>
143049e08aacStbbdev true = an item is reserved */
try_reserve(T & v)143149e08aacStbbdev bool try_reserve( T &v ) override {
143249e08aacStbbdev buffer_operation op_data(res_item);
143349e08aacStbbdev op_data.elem = &v;
143449e08aacStbbdev my_aggregator.execute(&op_data);
143549e08aacStbbdev (void)enqueue_forwarding_task(op_data);
143649e08aacStbbdev return (op_data.status==SUCCEEDED);
143749e08aacStbbdev }
143849e08aacStbbdev
143949e08aacStbbdev //! Release a reserved item.
144049e08aacStbbdev /** true = item has been released and so remains in sender */
try_release()144149e08aacStbbdev bool try_release() override {
144249e08aacStbbdev buffer_operation op_data(rel_res);
144349e08aacStbbdev my_aggregator.execute(&op_data);
144449e08aacStbbdev (void)enqueue_forwarding_task(op_data);
144549e08aacStbbdev return true;
144649e08aacStbbdev }
144749e08aacStbbdev
144849e08aacStbbdev //! Consumes a reserved item.
144949e08aacStbbdev /** true = item is removed from sender and reservation removed */
try_consume()145049e08aacStbbdev bool try_consume() override {
145149e08aacStbbdev buffer_operation op_data(con_res);
145249e08aacStbbdev my_aggregator.execute(&op_data);
145349e08aacStbbdev (void)enqueue_forwarding_task(op_data);
145449e08aacStbbdev return true;
145549e08aacStbbdev }
145649e08aacStbbdev
145749e08aacStbbdev protected:
145849e08aacStbbdev
145949e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
146049e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
146149e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
146249e08aacStbbdev //! receive an item, return a task *if possible
try_put_task(const T & t)146349e08aacStbbdev graph_task *try_put_task(const T &t) override {
146449e08aacStbbdev buffer_operation op_data(t, put_item);
146549e08aacStbbdev my_aggregator.execute(&op_data);
146649e08aacStbbdev graph_task *ft = grab_forwarding_task(op_data);
146749e08aacStbbdev // sequencer_nodes can return failure (if an item has been previously inserted)
146849e08aacStbbdev // We have to spawn the returned task if our own operation fails.
146949e08aacStbbdev
147049e08aacStbbdev if(ft && op_data.status ==FAILED) {
147149e08aacStbbdev // we haven't succeeded queueing the item, but for some reason the
147249e08aacStbbdev // call returned a task (if another request resulted in a successful
147349e08aacStbbdev // forward this could happen.) Queue the task and reset the pointer.
147457f524caSIlya Isaev spawn_in_graph_arena(graph_reference(), *ft); ft = nullptr;
147549e08aacStbbdev }
147649e08aacStbbdev else if(!ft && op_data.status ==SUCCEEDED) {
147749e08aacStbbdev ft = SUCCESSFULLY_ENQUEUED;
147849e08aacStbbdev }
147949e08aacStbbdev return ft;
148049e08aacStbbdev }
148149e08aacStbbdev
graph_reference()148249e08aacStbbdev graph& graph_reference() const override {
148349e08aacStbbdev return my_graph;
148449e08aacStbbdev }
148549e08aacStbbdev
148649e08aacStbbdev protected:
reset_node(reset_flags f)148749e08aacStbbdev void reset_node( reset_flags f) override {
148849e08aacStbbdev reservable_item_buffer<T, internals_allocator>::reset();
148949e08aacStbbdev // TODO: just clear structures
149049e08aacStbbdev if (f&rf_clear_edges) {
149149e08aacStbbdev my_successors.clear();
149249e08aacStbbdev }
149349e08aacStbbdev forwarder_busy = false;
149449e08aacStbbdev }
149549e08aacStbbdev }; // buffer_node
149649e08aacStbbdev
149749e08aacStbbdev //! Forwards messages in FIFO order
149849e08aacStbbdev template <typename T>
149949e08aacStbbdev class queue_node : public buffer_node<T> {
150049e08aacStbbdev protected:
150149e08aacStbbdev typedef buffer_node<T> base_type;
150249e08aacStbbdev typedef typename base_type::size_type size_type;
150349e08aacStbbdev typedef typename base_type::buffer_operation queue_operation;
150449e08aacStbbdev typedef queue_node class_type;
150549e08aacStbbdev
150649e08aacStbbdev private:
150749e08aacStbbdev template<typename> friend class buffer_node;
150849e08aacStbbdev
is_item_valid()150949e08aacStbbdev bool is_item_valid() {
151049e08aacStbbdev return this->my_item_valid(this->my_head);
151149e08aacStbbdev }
151249e08aacStbbdev
try_put_and_add_task(graph_task * & last_task)151349e08aacStbbdev void try_put_and_add_task(graph_task*& last_task) {
151449e08aacStbbdev graph_task *new_task = this->my_successors.try_put_task(this->front());
151549e08aacStbbdev if (new_task) {
151649e08aacStbbdev // workaround for icc bug
151749e08aacStbbdev graph& graph_ref = this->graph_reference();
151849e08aacStbbdev last_task = combine_tasks(graph_ref, last_task, new_task);
151949e08aacStbbdev this->destroy_front();
152049e08aacStbbdev }
152149e08aacStbbdev }
152249e08aacStbbdev
152349e08aacStbbdev protected:
internal_forward_task(queue_operation * op)152449e08aacStbbdev void internal_forward_task(queue_operation *op) override {
152549e08aacStbbdev this->internal_forward_task_impl(op, this);
152649e08aacStbbdev }
152749e08aacStbbdev
internal_pop(queue_operation * op)152849e08aacStbbdev void internal_pop(queue_operation *op) override {
152949e08aacStbbdev if ( this->my_reserved || !this->my_item_valid(this->my_head)){
153049e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
153149e08aacStbbdev }
153249e08aacStbbdev else {
153349e08aacStbbdev this->pop_front(*(op->elem));
153449e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
153549e08aacStbbdev }
153649e08aacStbbdev }
internal_reserve(queue_operation * op)153749e08aacStbbdev void internal_reserve(queue_operation *op) override {
153849e08aacStbbdev if (this->my_reserved || !this->my_item_valid(this->my_head)) {
153949e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
154049e08aacStbbdev }
154149e08aacStbbdev else {
154249e08aacStbbdev this->reserve_front(*(op->elem));
154349e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
154449e08aacStbbdev }
154549e08aacStbbdev }
internal_consume(queue_operation * op)154649e08aacStbbdev void internal_consume(queue_operation *op) override {
154749e08aacStbbdev this->consume_front();
154849e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
154949e08aacStbbdev }
155049e08aacStbbdev
155149e08aacStbbdev public:
155249e08aacStbbdev typedef T input_type;
155349e08aacStbbdev typedef T output_type;
155449e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
155549e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
155649e08aacStbbdev
155749e08aacStbbdev //! Constructor
queue_node(graph & g)155849e08aacStbbdev __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
155949e08aacStbbdev fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
156049e08aacStbbdev static_cast<receiver<input_type> *>(this),
156149e08aacStbbdev static_cast<sender<output_type> *>(this) );
156249e08aacStbbdev }
156349e08aacStbbdev
156449e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
156549e08aacStbbdev template <typename... Args>
queue_node(const node_set<Args...> & nodes)156649e08aacStbbdev queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
156749e08aacStbbdev make_edges_in_order(nodes, *this);
156849e08aacStbbdev }
156949e08aacStbbdev #endif
157049e08aacStbbdev
157149e08aacStbbdev //! Copy constructor
queue_node(const queue_node & src)157249e08aacStbbdev __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
157349e08aacStbbdev fgt_node( CODEPTR(), FLOW_QUEUE_NODE, &(this->my_graph),
157449e08aacStbbdev static_cast<receiver<input_type> *>(this),
157549e08aacStbbdev static_cast<sender<output_type> *>(this) );
157649e08aacStbbdev }
157749e08aacStbbdev
157849e08aacStbbdev
157949e08aacStbbdev protected:
reset_node(reset_flags f)158049e08aacStbbdev void reset_node( reset_flags f) override {
158149e08aacStbbdev base_type::reset_node(f);
158249e08aacStbbdev }
158349e08aacStbbdev }; // queue_node
158449e08aacStbbdev
158549e08aacStbbdev //! Forwards messages in sequence order
158649e08aacStbbdev template <typename T>
__TBB_requires(std::copyable<T>)1587478de5b1Stbbdev __TBB_requires(std::copyable<T>)
158849e08aacStbbdev class sequencer_node : public queue_node<T> {
158949e08aacStbbdev function_body< T, size_t > *my_sequencer;
159049e08aacStbbdev // my_sequencer should be a benign function and must be callable
159149e08aacStbbdev // from a parallel context. Does this mean it needn't be reset?
159249e08aacStbbdev public:
159349e08aacStbbdev typedef T input_type;
159449e08aacStbbdev typedef T output_type;
159549e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
159649e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
159749e08aacStbbdev
159849e08aacStbbdev //! Constructor
159949e08aacStbbdev template< typename Sequencer >
1600478de5b1Stbbdev __TBB_requires(sequencer<Sequencer, T>)
160149e08aacStbbdev __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
160249e08aacStbbdev my_sequencer(new function_body_leaf< T, size_t, Sequencer>(s) ) {
160349e08aacStbbdev fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
160449e08aacStbbdev static_cast<receiver<input_type> *>(this),
160549e08aacStbbdev static_cast<sender<output_type> *>(this) );
160649e08aacStbbdev }
160749e08aacStbbdev
160849e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
160949e08aacStbbdev template <typename Sequencer, typename... Args>
1610478de5b1Stbbdev __TBB_requires(sequencer<Sequencer, T>)
161149e08aacStbbdev sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
161249e08aacStbbdev : sequencer_node(nodes.graph_reference(), s) {
161349e08aacStbbdev make_edges_in_order(nodes, *this);
161449e08aacStbbdev }
161549e08aacStbbdev #endif
161649e08aacStbbdev
161749e08aacStbbdev //! Copy constructor
161849e08aacStbbdev __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T>(src),
161949e08aacStbbdev my_sequencer( src.my_sequencer->clone() ) {
162049e08aacStbbdev fgt_node( CODEPTR(), FLOW_SEQUENCER_NODE, &(this->my_graph),
162149e08aacStbbdev static_cast<receiver<input_type> *>(this),
162249e08aacStbbdev static_cast<sender<output_type> *>(this) );
162349e08aacStbbdev }
162449e08aacStbbdev
162549e08aacStbbdev //! Destructor
162649e08aacStbbdev ~sequencer_node() { delete my_sequencer; }
162749e08aacStbbdev
162849e08aacStbbdev protected:
162949e08aacStbbdev typedef typename buffer_node<T>::size_type size_type;
163049e08aacStbbdev typedef typename buffer_node<T>::buffer_operation sequencer_operation;
163149e08aacStbbdev
163249e08aacStbbdev private:
163349e08aacStbbdev bool internal_push(sequencer_operation *op) override {
163449e08aacStbbdev size_type tag = (*my_sequencer)(*(op->elem));
163549e08aacStbbdev #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
163649e08aacStbbdev if (tag < this->my_head) {
163749e08aacStbbdev // have already emitted a message with this tag
163849e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
163949e08aacStbbdev return false;
164049e08aacStbbdev }
164149e08aacStbbdev #endif
164249e08aacStbbdev // cannot modify this->my_tail now; the buffer would be inconsistent.
164349e08aacStbbdev size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
164449e08aacStbbdev
164549e08aacStbbdev if (this->size(new_tail) > this->capacity()) {
164649e08aacStbbdev this->grow_my_array(this->size(new_tail));
164749e08aacStbbdev }
164849e08aacStbbdev this->my_tail = new_tail;
164949e08aacStbbdev
165049e08aacStbbdev const op_stat res = this->place_item(tag, *(op->elem)) ? SUCCEEDED : FAILED;
165149e08aacStbbdev op->status.store(res, std::memory_order_release);
165249e08aacStbbdev return res ==SUCCEEDED;
165349e08aacStbbdev }
165449e08aacStbbdev }; // sequencer_node
165549e08aacStbbdev
165649e08aacStbbdev //! Forwards messages in priority order
165749e08aacStbbdev template<typename T, typename Compare = std::less<T>>
165849e08aacStbbdev class priority_queue_node : public buffer_node<T> {
165949e08aacStbbdev public:
166049e08aacStbbdev typedef T input_type;
166149e08aacStbbdev typedef T output_type;
166249e08aacStbbdev typedef buffer_node<T> base_type;
166349e08aacStbbdev typedef priority_queue_node class_type;
166449e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
166549e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
166649e08aacStbbdev
166749e08aacStbbdev //! Constructor
166849e08aacStbbdev __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
166949e08aacStbbdev : buffer_node<T>(g), compare(comp), mark(0) {
167049e08aacStbbdev fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
167149e08aacStbbdev static_cast<receiver<input_type> *>(this),
167249e08aacStbbdev static_cast<sender<output_type> *>(this) );
167349e08aacStbbdev }
167449e08aacStbbdev
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Constructs the node in the graph of the given node set, then makes edges in order.
template <typename... Args>
priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
    : priority_queue_node(nodes.graph_reference(), comp)
{
    make_edges_in_order(nodes, *this);
}
#endif
168249e08aacStbbdev
168349e08aacStbbdev //! Copy constructor
priority_queue_node(const priority_queue_node & src)168449e08aacStbbdev __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
168549e08aacStbbdev : buffer_node<T>(src), mark(0)
168649e08aacStbbdev {
168749e08aacStbbdev fgt_node( CODEPTR(), FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
168849e08aacStbbdev static_cast<receiver<input_type> *>(this),
168949e08aacStbbdev static_cast<sender<output_type> *>(this) );
169049e08aacStbbdev }
169149e08aacStbbdev
169249e08aacStbbdev protected:
169349e08aacStbbdev
reset_node(reset_flags f)169449e08aacStbbdev void reset_node( reset_flags f) override {
169549e08aacStbbdev mark = 0;
169649e08aacStbbdev base_type::reset_node(f);
169749e08aacStbbdev }
169849e08aacStbbdev
169949e08aacStbbdev typedef typename buffer_node<T>::size_type size_type;
170049e08aacStbbdev typedef typename buffer_node<T>::item_type item_type;
170149e08aacStbbdev typedef typename buffer_node<T>::buffer_operation prio_operation;
170249e08aacStbbdev
170349e08aacStbbdev //! Tries to forward valid items to successors
internal_forward_task(prio_operation * op)170449e08aacStbbdev void internal_forward_task(prio_operation *op) override {
170549e08aacStbbdev this->internal_forward_task_impl(op, this);
170649e08aacStbbdev }
170749e08aacStbbdev
handle_operations(prio_operation * op_list)170849e08aacStbbdev void handle_operations(prio_operation *op_list) override {
170949e08aacStbbdev this->handle_operations_impl(op_list, this);
171049e08aacStbbdev }
171149e08aacStbbdev
internal_push(prio_operation * op)171249e08aacStbbdev bool internal_push(prio_operation *op) override {
171349e08aacStbbdev prio_push(*(op->elem));
171449e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
171549e08aacStbbdev return true;
171649e08aacStbbdev }
171749e08aacStbbdev
internal_pop(prio_operation * op)171849e08aacStbbdev void internal_pop(prio_operation *op) override {
171949e08aacStbbdev // if empty or already reserved, don't pop
172049e08aacStbbdev if ( this->my_reserved == true || this->my_tail == 0 ) {
172149e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
172249e08aacStbbdev return;
172349e08aacStbbdev }
172449e08aacStbbdev
172549e08aacStbbdev *(op->elem) = prio();
172649e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
172749e08aacStbbdev prio_pop();
172849e08aacStbbdev
172949e08aacStbbdev }
173049e08aacStbbdev
173149e08aacStbbdev // pops the highest-priority item, saves copy
internal_reserve(prio_operation * op)173249e08aacStbbdev void internal_reserve(prio_operation *op) override {
173349e08aacStbbdev if (this->my_reserved == true || this->my_tail == 0) {
173449e08aacStbbdev op->status.store(FAILED, std::memory_order_release);
173549e08aacStbbdev return;
173649e08aacStbbdev }
173749e08aacStbbdev this->my_reserved = true;
173849e08aacStbbdev *(op->elem) = prio();
173949e08aacStbbdev reserved_item = *(op->elem);
174049e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
174149e08aacStbbdev prio_pop();
174249e08aacStbbdev }
174349e08aacStbbdev
internal_consume(prio_operation * op)174449e08aacStbbdev void internal_consume(prio_operation *op) override {
174549e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
174649e08aacStbbdev this->my_reserved = false;
174749e08aacStbbdev reserved_item = input_type();
174849e08aacStbbdev }
174949e08aacStbbdev
internal_release(prio_operation * op)175049e08aacStbbdev void internal_release(prio_operation *op) override {
175149e08aacStbbdev op->status.store(SUCCEEDED, std::memory_order_release);
175249e08aacStbbdev prio_push(reserved_item);
175349e08aacStbbdev this->my_reserved = false;
175449e08aacStbbdev reserved_item = input_type();
175549e08aacStbbdev }
175649e08aacStbbdev
175749e08aacStbbdev private:
175849e08aacStbbdev template<typename> friend class buffer_node;
175949e08aacStbbdev
order()176049e08aacStbbdev void order() {
176149e08aacStbbdev if (mark < this->my_tail) heapify();
176249e08aacStbbdev __TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
176349e08aacStbbdev }
176449e08aacStbbdev
is_item_valid()176549e08aacStbbdev bool is_item_valid() {
176649e08aacStbbdev return this->my_tail > 0;
176749e08aacStbbdev }
176849e08aacStbbdev
try_put_and_add_task(graph_task * & last_task)176949e08aacStbbdev void try_put_and_add_task(graph_task*& last_task) {
177049e08aacStbbdev graph_task * new_task = this->my_successors.try_put_task(this->prio());
177149e08aacStbbdev if (new_task) {
177249e08aacStbbdev // workaround for icc bug
177349e08aacStbbdev graph& graph_ref = this->graph_reference();
177449e08aacStbbdev last_task = combine_tasks(graph_ref, last_task, new_task);
177549e08aacStbbdev prio_pop();
177649e08aacStbbdev }
177749e08aacStbbdev }
177849e08aacStbbdev
177949e08aacStbbdev private:
178049e08aacStbbdev Compare compare;
178149e08aacStbbdev size_type mark;
178249e08aacStbbdev
178349e08aacStbbdev input_type reserved_item;
178449e08aacStbbdev
178549e08aacStbbdev // in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
prio_use_tail()178649e08aacStbbdev bool prio_use_tail() {
178749e08aacStbbdev __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
178849e08aacStbbdev return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
178949e08aacStbbdev }
179049e08aacStbbdev
179149e08aacStbbdev // prio_push: checks that the item will fit, expand array if necessary, put at end
prio_push(const T & src)179249e08aacStbbdev void prio_push(const T &src) {
179349e08aacStbbdev if ( this->my_tail >= this->my_array_size )
179449e08aacStbbdev this->grow_my_array( this->my_tail + 1 );
179549e08aacStbbdev (void) this->place_item(this->my_tail, src);
179649e08aacStbbdev ++(this->my_tail);
179749e08aacStbbdev __TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
179849e08aacStbbdev }
179949e08aacStbbdev
180049e08aacStbbdev // prio_pop: deletes highest priority item from the array, and if it is item
180149e08aacStbbdev // 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
180249e08aacStbbdev // and mark. Assumes the array has already been tested for emptiness; no failure.
prio_pop()180349e08aacStbbdev void prio_pop() {
180449e08aacStbbdev if (prio_use_tail()) {
180549e08aacStbbdev // there are newly pushed elements; last one higher than top
180649e08aacStbbdev // copy the data
180749e08aacStbbdev this->destroy_item(this->my_tail-1);
180849e08aacStbbdev --(this->my_tail);
180949e08aacStbbdev __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
181049e08aacStbbdev return;
181149e08aacStbbdev }
181249e08aacStbbdev this->destroy_item(0);
181349e08aacStbbdev if(this->my_tail > 1) {
181449e08aacStbbdev // push the last element down heap
181557f524caSIlya Isaev __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), nullptr);
181649e08aacStbbdev this->move_item(0,this->my_tail - 1);
181749e08aacStbbdev }
181849e08aacStbbdev --(this->my_tail);
181949e08aacStbbdev if(mark > this->my_tail) --mark;
182049e08aacStbbdev if (this->my_tail > 1) // don't reheap for heap of size 1
182149e08aacStbbdev reheap();
182249e08aacStbbdev __TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
182349e08aacStbbdev }
182449e08aacStbbdev
prio()182549e08aacStbbdev const T& prio() {
182649e08aacStbbdev return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
182749e08aacStbbdev }
182849e08aacStbbdev
182949e08aacStbbdev // turn array into heap
heapify()183049e08aacStbbdev void heapify() {
183149e08aacStbbdev if(this->my_tail == 0) {
183249e08aacStbbdev mark = 0;
183349e08aacStbbdev return;
183449e08aacStbbdev }
183549e08aacStbbdev if (!mark) mark = 1;
183649e08aacStbbdev for (; mark<this->my_tail; ++mark) { // for each unheaped element
183749e08aacStbbdev size_type cur_pos = mark;
183849e08aacStbbdev input_type to_place;
183949e08aacStbbdev this->fetch_item(mark,to_place);
184049e08aacStbbdev do { // push to_place up the heap
184149e08aacStbbdev size_type parent = (cur_pos-1)>>1;
184249e08aacStbbdev if (!compare(this->get_my_item(parent), to_place))
184349e08aacStbbdev break;
184449e08aacStbbdev this->move_item(cur_pos, parent);
184549e08aacStbbdev cur_pos = parent;
184649e08aacStbbdev } while( cur_pos );
184749e08aacStbbdev (void) this->place_item(cur_pos, to_place);
184849e08aacStbbdev }
184949e08aacStbbdev }
185049e08aacStbbdev
185149e08aacStbbdev // otherwise heapified array with new root element; rearrange to heap
reheap()185249e08aacStbbdev void reheap() {
185349e08aacStbbdev size_type cur_pos=0, child=1;
185449e08aacStbbdev while (child < mark) {
185549e08aacStbbdev size_type target = child;
185649e08aacStbbdev if (child+1<mark &&
185749e08aacStbbdev compare(this->get_my_item(child),
185849e08aacStbbdev this->get_my_item(child+1)))
185949e08aacStbbdev ++target;
186049e08aacStbbdev // target now has the higher priority child
186149e08aacStbbdev if (compare(this->get_my_item(target),
186249e08aacStbbdev this->get_my_item(cur_pos)))
186349e08aacStbbdev break;
186449e08aacStbbdev // swap
186549e08aacStbbdev this->swap_items(cur_pos, target);
186649e08aacStbbdev cur_pos = target;
186749e08aacStbbdev child = (cur_pos<<1)+1;
186849e08aacStbbdev }
186949e08aacStbbdev }
187049e08aacStbbdev }; // priority_queue_node
187149e08aacStbbdev
187249e08aacStbbdev //! Forwards messages only if the threshold has not been reached
187349e08aacStbbdev /** This node forwards items until its threshold is reached.
187449e08aacStbbdev It contains no buffering. If the downstream node rejects, the
187549e08aacStbbdev message is dropped. */
187649e08aacStbbdev template< typename T, typename DecrementType=continue_msg >
187749e08aacStbbdev class limiter_node : public graph_node, public receiver< T >, public sender< T > {
187849e08aacStbbdev public:
187949e08aacStbbdev typedef T input_type;
188049e08aacStbbdev typedef T output_type;
188149e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
188249e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
188349e08aacStbbdev //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
188449e08aacStbbdev
188549e08aacStbbdev private:
188649e08aacStbbdev size_t my_threshold;
188749e08aacStbbdev size_t my_count; // number of successful puts
188849e08aacStbbdev size_t my_tries; // number of active put attempts
18890815661eSIlya Mishin size_t my_future_decrement; // number of active decrement
189049e08aacStbbdev reservable_predecessor_cache< T, spin_mutex > my_predecessors;
189149e08aacStbbdev spin_mutex my_mutex;
189249e08aacStbbdev broadcast_cache< T > my_successors;
189349e08aacStbbdev
189449e08aacStbbdev //! The internal receiver< DecrementType > that adjusts the count
189549e08aacStbbdev threshold_regulator< limiter_node<T, DecrementType>, DecrementType > decrement;
189649e08aacStbbdev
decrement_counter(long long delta)189749e08aacStbbdev graph_task* decrement_counter( long long delta ) {
18980815661eSIlya Mishin if ( delta > 0 && size_t(delta) > my_threshold ) {
18990815661eSIlya Mishin delta = my_threshold;
19000815661eSIlya Mishin }
19010815661eSIlya Mishin
190249e08aacStbbdev {
190349e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
190425e2b1ddSIlya Mishin if ( delta > 0 && size_t(delta) > my_count ) {
19050815661eSIlya Mishin if( my_tries > 0 ) {
19060815661eSIlya Mishin my_future_decrement += (size_t(delta) - my_count);
19070815661eSIlya Mishin }
190849e08aacStbbdev my_count = 0;
190925e2b1ddSIlya Mishin }
191025e2b1ddSIlya Mishin else if ( delta < 0 && size_t(-delta) > my_threshold - my_count ) {
191149e08aacStbbdev my_count = my_threshold;
191225e2b1ddSIlya Mishin }
191325e2b1ddSIlya Mishin else {
191449e08aacStbbdev my_count -= size_t(delta); // absolute value of delta is sufficiently small
191549e08aacStbbdev }
191625e2b1ddSIlya Mishin __TBB_ASSERT(my_count <= my_threshold, "counter values are truncated to be inside the [0, threshold] interval");
191725e2b1ddSIlya Mishin }
191849e08aacStbbdev return forward_task();
191949e08aacStbbdev }
192049e08aacStbbdev
192149e08aacStbbdev // Let threshold_regulator call decrement_counter()
192249e08aacStbbdev friend class threshold_regulator< limiter_node<T, DecrementType>, DecrementType >;
192349e08aacStbbdev
192449e08aacStbbdev friend class forward_task_bypass< limiter_node<T,DecrementType> >;
192549e08aacStbbdev
check_conditions()192649e08aacStbbdev bool check_conditions() { // always called under lock
192749e08aacStbbdev return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
192849e08aacStbbdev }
192949e08aacStbbdev
193057f524caSIlya Isaev // only returns a valid task pointer or nullptr, never SUCCESSFULLY_ENQUEUED
forward_task()193149e08aacStbbdev graph_task* forward_task() {
193249e08aacStbbdev input_type v;
193357f524caSIlya Isaev graph_task* rval = nullptr;
193449e08aacStbbdev bool reserved = false;
19350815661eSIlya Mishin
193649e08aacStbbdev {
193749e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
193849e08aacStbbdev if ( check_conditions() )
193949e08aacStbbdev ++my_tries;
194049e08aacStbbdev else
194157f524caSIlya Isaev return nullptr;
194249e08aacStbbdev }
194349e08aacStbbdev
194449e08aacStbbdev //SUCCESS
194549e08aacStbbdev // if we can reserve and can put, we consume the reservation
194649e08aacStbbdev // we increment the count and decrement the tries
194749e08aacStbbdev if ( (my_predecessors.try_reserve(v)) == true ) {
194849e08aacStbbdev reserved = true;
194957f524caSIlya Isaev if ( (rval = my_successors.try_put_task(v)) != nullptr ) {
195049e08aacStbbdev {
195149e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
195249e08aacStbbdev ++my_count;
19530815661eSIlya Mishin if ( my_future_decrement ) {
19540815661eSIlya Mishin if ( my_count > my_future_decrement ) {
19550815661eSIlya Mishin my_count -= my_future_decrement;
19560815661eSIlya Mishin my_future_decrement = 0;
19570815661eSIlya Mishin }
19580815661eSIlya Mishin else {
19590815661eSIlya Mishin my_future_decrement -= my_count;
19600815661eSIlya Mishin my_count = 0;
19610815661eSIlya Mishin }
19620815661eSIlya Mishin }
196349e08aacStbbdev --my_tries;
196449e08aacStbbdev my_predecessors.try_consume();
196549e08aacStbbdev if ( check_conditions() ) {
196649e08aacStbbdev if ( is_graph_active(this->my_graph) ) {
196749e08aacStbbdev typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
196849e08aacStbbdev small_object_allocator allocator{};
196949e08aacStbbdev graph_task* rtask = allocator.new_object<task_type>( my_graph, allocator, *this );
197049e08aacStbbdev my_graph.reserve_wait();
197149e08aacStbbdev spawn_in_graph_arena(graph_reference(), *rtask);
197249e08aacStbbdev }
197349e08aacStbbdev }
197449e08aacStbbdev }
197549e08aacStbbdev return rval;
197649e08aacStbbdev }
197749e08aacStbbdev }
197849e08aacStbbdev //FAILURE
197949e08aacStbbdev //if we can't reserve, we decrement the tries
198049e08aacStbbdev //if we can reserve but can't put, we decrement the tries and release the reservation
198149e08aacStbbdev {
198249e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
198349e08aacStbbdev --my_tries;
198449e08aacStbbdev if (reserved) my_predecessors.try_release();
198549e08aacStbbdev if ( check_conditions() ) {
198649e08aacStbbdev if ( is_graph_active(this->my_graph) ) {
198749e08aacStbbdev small_object_allocator allocator{};
198849e08aacStbbdev typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
198949e08aacStbbdev graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
199049e08aacStbbdev my_graph.reserve_wait();
199149e08aacStbbdev __TBB_ASSERT(!rval, "Have two tasks to handle");
199249e08aacStbbdev return t;
199349e08aacStbbdev }
199449e08aacStbbdev }
199549e08aacStbbdev return rval;
199649e08aacStbbdev }
199749e08aacStbbdev }
199849e08aacStbbdev
initialize()199949e08aacStbbdev void initialize() {
200049e08aacStbbdev fgt_node(
200149e08aacStbbdev CODEPTR(), FLOW_LIMITER_NODE, &this->my_graph,
200249e08aacStbbdev static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
200349e08aacStbbdev static_cast<sender<output_type> *>(this)
200449e08aacStbbdev );
200549e08aacStbbdev }
200649e08aacStbbdev
200749e08aacStbbdev public:
200849e08aacStbbdev //! Constructor
limiter_node(graph & g,size_t threshold)200949e08aacStbbdev limiter_node(graph &g, size_t threshold)
20100815661eSIlya Mishin : graph_node(g), my_threshold(threshold), my_count(0), my_tries(0), my_future_decrement(0),
20110815661eSIlya Mishin my_predecessors(this), my_successors(this), decrement(this)
201249e08aacStbbdev {
201349e08aacStbbdev initialize();
201449e08aacStbbdev }
201549e08aacStbbdev
201649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
201749e08aacStbbdev template <typename... Args>
limiter_node(const node_set<Args...> & nodes,size_t threshold)201849e08aacStbbdev limiter_node(const node_set<Args...>& nodes, size_t threshold)
201949e08aacStbbdev : limiter_node(nodes.graph_reference(), threshold) {
202049e08aacStbbdev make_edges_in_order(nodes, *this);
202149e08aacStbbdev }
202249e08aacStbbdev #endif
202349e08aacStbbdev
202449e08aacStbbdev //! Copy constructor
limiter_node(const limiter_node & src)202549e08aacStbbdev limiter_node( const limiter_node& src ) : limiter_node(src.my_graph, src.my_threshold) {}
202649e08aacStbbdev
202749e08aacStbbdev //! The interface for accessing internal receiver< DecrementType > that adjusts the count
decrementer()202849e08aacStbbdev receiver<DecrementType>& decrementer() { return decrement; }
202949e08aacStbbdev
203049e08aacStbbdev //! Replace the current successor with this new successor
register_successor(successor_type & r)203149e08aacStbbdev bool register_successor( successor_type &r ) override {
203249e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
203349e08aacStbbdev bool was_empty = my_successors.empty();
203449e08aacStbbdev my_successors.register_successor(r);
203549e08aacStbbdev //spawn a forward task if this is the only successor
203649e08aacStbbdev if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
203749e08aacStbbdev if ( is_graph_active(this->my_graph) ) {
203849e08aacStbbdev small_object_allocator allocator{};
203949e08aacStbbdev typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
204049e08aacStbbdev graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
204149e08aacStbbdev my_graph.reserve_wait();
204249e08aacStbbdev spawn_in_graph_arena(graph_reference(), *t);
204349e08aacStbbdev }
204449e08aacStbbdev }
204549e08aacStbbdev return true;
204649e08aacStbbdev }
204749e08aacStbbdev
204849e08aacStbbdev //! Removes a successor from this node
204949e08aacStbbdev /** r.remove_predecessor(*this) is also called. */
remove_successor(successor_type & r)205049e08aacStbbdev bool remove_successor( successor_type &r ) override {
205149e08aacStbbdev // TODO revamp: investigate why qualification is needed for remove_predecessor() call
205249e08aacStbbdev tbb::detail::d1::remove_predecessor(r, *this);
205349e08aacStbbdev my_successors.remove_successor(r);
205449e08aacStbbdev return true;
205549e08aacStbbdev }
205649e08aacStbbdev
205749e08aacStbbdev //! Adds src to the list of cached predecessors.
register_predecessor(predecessor_type & src)205849e08aacStbbdev bool register_predecessor( predecessor_type &src ) override {
205949e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
206049e08aacStbbdev my_predecessors.add( src );
206149e08aacStbbdev if ( my_count + my_tries < my_threshold && !my_successors.empty() && is_graph_active(this->my_graph) ) {
206249e08aacStbbdev small_object_allocator allocator{};
206349e08aacStbbdev typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
206449e08aacStbbdev graph_task* t = allocator.new_object<task_type>(my_graph, allocator, *this);
206549e08aacStbbdev my_graph.reserve_wait();
206649e08aacStbbdev spawn_in_graph_arena(graph_reference(), *t);
206749e08aacStbbdev }
206849e08aacStbbdev return true;
206949e08aacStbbdev }
207049e08aacStbbdev
207149e08aacStbbdev //! Removes src from the list of cached predecessors.
remove_predecessor(predecessor_type & src)207249e08aacStbbdev bool remove_predecessor( predecessor_type &src ) override {
207349e08aacStbbdev my_predecessors.remove( src );
207449e08aacStbbdev return true;
207549e08aacStbbdev }
207649e08aacStbbdev
207749e08aacStbbdev protected:
207849e08aacStbbdev
207949e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
208049e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
208149e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
208249e08aacStbbdev //! Puts an item to this receiver
try_put_task(const T & t)208349e08aacStbbdev graph_task* try_put_task( const T &t ) override {
208449e08aacStbbdev {
208549e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
208649e08aacStbbdev if ( my_count + my_tries >= my_threshold )
208757f524caSIlya Isaev return nullptr;
208849e08aacStbbdev else
208949e08aacStbbdev ++my_tries;
209049e08aacStbbdev }
209149e08aacStbbdev
209249e08aacStbbdev graph_task* rtask = my_successors.try_put_task(t);
209349e08aacStbbdev if ( !rtask ) { // try_put_task failed.
209449e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
209549e08aacStbbdev --my_tries;
209649e08aacStbbdev if (check_conditions() && is_graph_active(this->my_graph)) {
209749e08aacStbbdev small_object_allocator allocator{};
209849e08aacStbbdev typedef forward_task_bypass<limiter_node<T, DecrementType>> task_type;
209949e08aacStbbdev rtask = allocator.new_object<task_type>(my_graph, allocator, *this);
210049e08aacStbbdev my_graph.reserve_wait();
210149e08aacStbbdev }
210249e08aacStbbdev }
210349e08aacStbbdev else {
210449e08aacStbbdev spin_mutex::scoped_lock lock(my_mutex);
210549e08aacStbbdev ++my_count;
21060815661eSIlya Mishin if ( my_future_decrement ) {
21070815661eSIlya Mishin if ( my_count > my_future_decrement ) {
21080815661eSIlya Mishin my_count -= my_future_decrement;
21090815661eSIlya Mishin my_future_decrement = 0;
21100815661eSIlya Mishin }
21110815661eSIlya Mishin else {
21120815661eSIlya Mishin my_future_decrement -= my_count;
21130815661eSIlya Mishin my_count = 0;
21140815661eSIlya Mishin }
21150815661eSIlya Mishin }
211649e08aacStbbdev --my_tries;
211749e08aacStbbdev }
211849e08aacStbbdev return rtask;
211949e08aacStbbdev }
212049e08aacStbbdev
graph_reference()212149e08aacStbbdev graph& graph_reference() const override { return my_graph; }
212249e08aacStbbdev
reset_node(reset_flags f)212349e08aacStbbdev void reset_node( reset_flags f ) override {
212449e08aacStbbdev my_count = 0;
212549e08aacStbbdev if ( f & rf_clear_edges ) {
212649e08aacStbbdev my_predecessors.clear();
212749e08aacStbbdev my_successors.clear();
212849e08aacStbbdev }
21290815661eSIlya Mishin else {
213049e08aacStbbdev my_predecessors.reset();
213149e08aacStbbdev }
213249e08aacStbbdev decrement.reset_receiver(f);
213349e08aacStbbdev }
213449e08aacStbbdev }; // limiter_node
213549e08aacStbbdev
213649e08aacStbbdev #include "detail/_flow_graph_join_impl.h"
213749e08aacStbbdev
213849e08aacStbbdev template<typename OutputTuple, typename JP=queueing> class join_node;
213949e08aacStbbdev
214049e08aacStbbdev template<typename OutputTuple>
214149e08aacStbbdev class join_node<OutputTuple,reserving>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
214249e08aacStbbdev private:
214349e08aacStbbdev static const int N = std::tuple_size<OutputTuple>::value;
214449e08aacStbbdev typedef unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
214549e08aacStbbdev public:
214649e08aacStbbdev typedef OutputTuple output_type;
214749e08aacStbbdev typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)214849e08aacStbbdev __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
214949e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
215049e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
215149e08aacStbbdev }
215249e08aacStbbdev
215349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
215449e08aacStbbdev template <typename... Args>
215549e08aacStbbdev __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
215649e08aacStbbdev make_edges_in_order(nodes, *this);
215749e08aacStbbdev }
215849e08aacStbbdev #endif
215949e08aacStbbdev
join_node(const join_node & other)216049e08aacStbbdev __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
216149e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_RESERVING, &this->my_graph,
216249e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
216349e08aacStbbdev }
216449e08aacStbbdev
216549e08aacStbbdev };
216649e08aacStbbdev
216749e08aacStbbdev template<typename OutputTuple>
216849e08aacStbbdev class join_node<OutputTuple,queueing>: public unfolded_join_node<std::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
216949e08aacStbbdev private:
217049e08aacStbbdev static const int N = std::tuple_size<OutputTuple>::value;
217149e08aacStbbdev typedef unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
217249e08aacStbbdev public:
217349e08aacStbbdev typedef OutputTuple output_type;
217449e08aacStbbdev typedef typename unfolded_type::input_ports_type input_ports_type;
join_node(graph & g)217549e08aacStbbdev __TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
217649e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
217749e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
217849e08aacStbbdev }
217949e08aacStbbdev
218049e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
218149e08aacStbbdev template <typename... Args>
218249e08aacStbbdev __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
218349e08aacStbbdev make_edges_in_order(nodes, *this);
218449e08aacStbbdev }
218549e08aacStbbdev #endif
218649e08aacStbbdev
join_node(const join_node & other)218749e08aacStbbdev __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
218849e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
218949e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
219049e08aacStbbdev }
219149e08aacStbbdev
219249e08aacStbbdev };
219349e08aacStbbdev
#if __TBB_CPP20_CONCEPTS_PRESENT
// Declaration-only helper: a call to it is well-formed only if there is exactly one
// function object per element of OutputTuple and every pair satisfies
// join_node_function_object<Functions[i], tuple_element[i], K>.
template <typename OutputTuple, typename K, typename... Functions, std::size_t... Indices>
void join_node_function_objects_helper( std::index_sequence<Indices...> )
    requires (std::tuple_size_v<OutputTuple> == sizeof...(Functions)) &&
             (... && join_node_function_object<Functions, std::tuple_element_t<Indices, OutputTuple>, K>);

// Satisfied when the helper above can be called with one index per function object.
template <typename OutputTuple, typename K, typename... Functions>
concept join_node_functions = requires {
    join_node_function_objects_helper<OutputTuple, K, Functions...>(
        std::make_index_sequence<sizeof...(Functions)>{});
};

#endif
2209478de5b1Stbbdev
221049e08aacStbbdev // template for key_matching join_node
221149e08aacStbbdev // tag_matching join_node is a specialization of key_matching, and is source-compatible.
221249e08aacStbbdev template<typename OutputTuple, typename K, typename KHash>
221349e08aacStbbdev class join_node<OutputTuple, key_matching<K, KHash> > : public unfolded_join_node<std::tuple_size<OutputTuple>::value,
221449e08aacStbbdev key_matching_port, OutputTuple, key_matching<K,KHash> > {
221549e08aacStbbdev private:
221649e08aacStbbdev static const int N = std::tuple_size<OutputTuple>::value;
221749e08aacStbbdev typedef unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
221849e08aacStbbdev public:
221949e08aacStbbdev typedef OutputTuple output_type;
222049e08aacStbbdev typedef typename unfolded_type::input_ports_type input_ports_type;
222149e08aacStbbdev
222249e08aacStbbdev #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
join_node(graph & g)222349e08aacStbbdev join_node(graph &g) : unfolded_type(g) {}
222449e08aacStbbdev #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
222549e08aacStbbdev
222649e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1>)2227478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1>)
222849e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
222949e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
223049e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
223149e08aacStbbdev }
223249e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2>)2233478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2>)
223449e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
223549e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
223649e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
223749e08aacStbbdev }
223849e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3>)2239478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3>)
224049e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
224149e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
224249e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
224349e08aacStbbdev }
224449e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4>)2245478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4>)
224649e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
224749e08aacStbbdev unfolded_type(g, b0, b1, b2, b3, b4) {
224849e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
224949e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
225049e08aacStbbdev }
225149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 6
225249e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
225349e08aacStbbdev typename __TBB_B5>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5>)2254478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5>)
225549e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
225649e08aacStbbdev unfolded_type(g, b0, b1, b2, b3, b4, b5) {
225749e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
225849e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
225949e08aacStbbdev }
226049e08aacStbbdev #endif
226149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 7
226249e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
226349e08aacStbbdev typename __TBB_B5, typename __TBB_B6>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6>)2264478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6>)
226549e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
226649e08aacStbbdev unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
226749e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
226849e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
226949e08aacStbbdev }
227049e08aacStbbdev #endif
227149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 8
227249e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
227349e08aacStbbdev typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6,__TBB_B7>)2274478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7>)
227549e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
227649e08aacStbbdev __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
227749e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
227849e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
227949e08aacStbbdev }
228049e08aacStbbdev #endif
228149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 9
228249e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
228349e08aacStbbdev typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6,__TBB_B7,__TBB_B8>)2284478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8>)
228549e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
228649e08aacStbbdev __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
228749e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
228849e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
228949e08aacStbbdev }
229049e08aacStbbdev #endif
229149e08aacStbbdev #if __TBB_VARIADIC_MAX >= 10
229249e08aacStbbdev template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
229349e08aacStbbdev typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
__TBB_requires(join_node_functions<OutputTuple,K,__TBB_B0,__TBB_B1,__TBB_B2,__TBB_B3,__TBB_B4,__TBB_B5,__TBB_B6,__TBB_B7,__TBB_B8,__TBB_B9>)2294478de5b1Stbbdev __TBB_requires(join_node_functions<OutputTuple, K, __TBB_B0, __TBB_B1, __TBB_B2, __TBB_B3, __TBB_B4, __TBB_B5, __TBB_B6, __TBB_B7, __TBB_B8, __TBB_B9>)
229549e08aacStbbdev __TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
229649e08aacStbbdev __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
229749e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
229849e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
229949e08aacStbbdev }
230049e08aacStbbdev #endif
230149e08aacStbbdev
230249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
2303b15aabb3Stbbdev template <
2304b15aabb3Stbbdev #if (__clang_major__ == 3 && __clang_minor__ == 4)
2305b15aabb3Stbbdev // clang 3.4 misdeduces 'Args...' for 'node_set' while it can cope with template template parameter.
2306b15aabb3Stbbdev template<typename...> class node_set,
2307b15aabb3Stbbdev #endif
2308b15aabb3Stbbdev typename... Args, typename... Bodies
2309b15aabb3Stbbdev >
2310478de5b1Stbbdev __TBB_requires((sizeof...(Bodies) == 0) || join_node_functions<OutputTuple, K, Bodies...>)
join_node(const node_set<Args...> & nodes,Bodies...bodies)231149e08aacStbbdev __TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
231249e08aacStbbdev : join_node(nodes.graph_reference(), bodies...) {
231349e08aacStbbdev make_edges_in_order(nodes, *this);
231449e08aacStbbdev }
2315478de5b1Stbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
231649e08aacStbbdev
join_node(const join_node & other)231749e08aacStbbdev __TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
231849e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
231949e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
232049e08aacStbbdev }
232149e08aacStbbdev
232249e08aacStbbdev };
232349e08aacStbbdev
232449e08aacStbbdev // indexer node
232549e08aacStbbdev #include "detail/_flow_graph_indexer_impl.h"
232649e08aacStbbdev
232749e08aacStbbdev // TODO: Implement interface with variadic template or tuple
232849e08aacStbbdev template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
232949e08aacStbbdev typename T4=null_type, typename T5=null_type, typename T6=null_type,
233049e08aacStbbdev typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
233149e08aacStbbdev
233249e08aacStbbdev //indexer node specializations
233349e08aacStbbdev template<typename T0>
233449e08aacStbbdev class indexer_node<T0> : public unfolded_indexer_node<std::tuple<T0> > {
233549e08aacStbbdev private:
233649e08aacStbbdev static const int N = 1;
233749e08aacStbbdev public:
233849e08aacStbbdev typedef std::tuple<T0> InputTuple;
233949e08aacStbbdev typedef tagged_msg<size_t, T0> output_type;
234049e08aacStbbdev typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)234149e08aacStbbdev __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
234249e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
234349e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
234449e08aacStbbdev }
234549e08aacStbbdev
234649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
234749e08aacStbbdev template <typename... Args>
indexer_node(const node_set<Args...> & nodes)234849e08aacStbbdev indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
234949e08aacStbbdev make_edges_in_order(nodes, *this);
235049e08aacStbbdev }
235149e08aacStbbdev #endif
235249e08aacStbbdev
235349e08aacStbbdev // Copy constructor
indexer_node(const indexer_node & other)235449e08aacStbbdev __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
235549e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
235649e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
235749e08aacStbbdev }
235849e08aacStbbdev };
235949e08aacStbbdev
236049e08aacStbbdev template<typename T0, typename T1>
236149e08aacStbbdev class indexer_node<T0, T1> : public unfolded_indexer_node<std::tuple<T0, T1> > {
236249e08aacStbbdev private:
236349e08aacStbbdev static const int N = 2;
236449e08aacStbbdev public:
236549e08aacStbbdev typedef std::tuple<T0, T1> InputTuple;
236649e08aacStbbdev typedef tagged_msg<size_t, T0, T1> output_type;
236749e08aacStbbdev typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)236849e08aacStbbdev __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
236949e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
237049e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
237149e08aacStbbdev }
237249e08aacStbbdev
237349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
237449e08aacStbbdev template <typename... Args>
indexer_node(const node_set<Args...> & nodes)237549e08aacStbbdev indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
237649e08aacStbbdev make_edges_in_order(nodes, *this);
237749e08aacStbbdev }
237849e08aacStbbdev #endif
237949e08aacStbbdev
238049e08aacStbbdev // Copy constructor
indexer_node(const indexer_node & other)238149e08aacStbbdev __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
238249e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
238349e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
238449e08aacStbbdev }
238549e08aacStbbdev
238649e08aacStbbdev };
238749e08aacStbbdev
238849e08aacStbbdev template<typename T0, typename T1, typename T2>
238949e08aacStbbdev class indexer_node<T0, T1, T2> : public unfolded_indexer_node<std::tuple<T0, T1, T2> > {
239049e08aacStbbdev private:
239149e08aacStbbdev static const int N = 3;
239249e08aacStbbdev public:
239349e08aacStbbdev typedef std::tuple<T0, T1, T2> InputTuple;
239449e08aacStbbdev typedef tagged_msg<size_t, T0, T1, T2> output_type;
239549e08aacStbbdev typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)239649e08aacStbbdev __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
239749e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
239849e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
239949e08aacStbbdev }
240049e08aacStbbdev
240149e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
240249e08aacStbbdev template <typename... Args>
indexer_node(const node_set<Args...> & nodes)240349e08aacStbbdev indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
240449e08aacStbbdev make_edges_in_order(nodes, *this);
240549e08aacStbbdev }
240649e08aacStbbdev #endif
240749e08aacStbbdev
240849e08aacStbbdev // Copy constructor
indexer_node(const indexer_node & other)240949e08aacStbbdev __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
241049e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
241149e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
241249e08aacStbbdev }
241349e08aacStbbdev
241449e08aacStbbdev };
241549e08aacStbbdev
241649e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3>
241749e08aacStbbdev class indexer_node<T0, T1, T2, T3> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3> > {
241849e08aacStbbdev private:
241949e08aacStbbdev static const int N = 4;
242049e08aacStbbdev public:
242149e08aacStbbdev typedef std::tuple<T0, T1, T2, T3> InputTuple;
242249e08aacStbbdev typedef tagged_msg<size_t, T0, T1, T2, T3> output_type;
242349e08aacStbbdev typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)242449e08aacStbbdev __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
242549e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
242649e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
242749e08aacStbbdev }
242849e08aacStbbdev
242949e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
243049e08aacStbbdev template <typename... Args>
indexer_node(const node_set<Args...> & nodes)243149e08aacStbbdev indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
243249e08aacStbbdev make_edges_in_order(nodes, *this);
243349e08aacStbbdev }
243449e08aacStbbdev #endif
243549e08aacStbbdev
243649e08aacStbbdev // Copy constructor
indexer_node(const indexer_node & other)243749e08aacStbbdev __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
243849e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
243949e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
244049e08aacStbbdev }
244149e08aacStbbdev
244249e08aacStbbdev };
244349e08aacStbbdev
244449e08aacStbbdev template<typename T0, typename T1, typename T2, typename T3, typename T4>
244549e08aacStbbdev class indexer_node<T0, T1, T2, T3, T4> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4> > {
244649e08aacStbbdev private:
244749e08aacStbbdev static const int N = 5;
244849e08aacStbbdev public:
244949e08aacStbbdev typedef std::tuple<T0, T1, T2, T3, T4> InputTuple;
245049e08aacStbbdev typedef tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
245149e08aacStbbdev typedef unfolded_indexer_node<InputTuple> unfolded_type;
indexer_node(graph & g)245249e08aacStbbdev __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
245349e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
245449e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
245549e08aacStbbdev }
245649e08aacStbbdev
245749e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
245849e08aacStbbdev template <typename... Args>
indexer_node(const node_set<Args...> & nodes)245949e08aacStbbdev indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
246049e08aacStbbdev make_edges_in_order(nodes, *this);
246149e08aacStbbdev }
246249e08aacStbbdev #endif
246349e08aacStbbdev
246449e08aacStbbdev // Copy constructor
indexer_node(const indexer_node & other)246549e08aacStbbdev __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
246649e08aacStbbdev fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
246749e08aacStbbdev this->input_ports(), static_cast< sender< output_type > *>(this) );
246849e08aacStbbdev }
246949e08aacStbbdev
247049e08aacStbbdev };
247149e08aacStbbdev
#if __TBB_VARIADIC_MAX >= 6
//! indexer_node specialization with six input ports (T0..T5).
/** output_type is tagged_msg<size_t, T0, ..., T5>; the node is handed to
    fgt_multiinput_node as a sender<output_type>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5> > {
private:
    //! Number of input ports; used as the template argument of fgt_multiinput_node.
    static const int N = 6;
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and calls fgt_multiinput_node for it.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in nodes.graph_reference(), then calls
    //! make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies the unfolded base and calls fgt_multiinput_node for the new node.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

};
#endif //variadic max 6
250149e08aacStbbdev
#if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization with seven input ports (T0..T6).
/** output_type is tagged_msg<size_t, T0, ..., T6>; the node is handed to
    fgt_multiinput_node as a sender<output_type>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    //! Number of input ports; used as the template argument of fgt_multiinput_node.
    static const int N = 7;
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and calls fgt_multiinput_node for it.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in nodes.graph_reference(), then calls
    //! make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies the unfolded base and calls fgt_multiinput_node for the new node.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

};
#endif //variadic max 7
253249e08aacStbbdev
#if __TBB_VARIADIC_MAX >= 8
//! indexer_node specialization with eight input ports (T0..T7).
/** output_type is tagged_msg<size_t, T0, ..., T7>; the node is handed to
    fgt_multiinput_node as a sender<output_type>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
private:
    //! Number of input ports; used as the template argument of fgt_multiinput_node.
    static const int N = 8;
public:
    typedef std::tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
    typedef tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
    typedef unfolded_indexer_node<InputTuple> unfolded_type;

    //! Constructs the node in graph g and calls fgt_multiinput_node for it.
    // Marked __TBB_NOINLINE_SYM like the constructors of every other
    // indexer_node variant that passes CODEPTR() to the tracing call.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in nodes.graph_reference(), then calls
    //! make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies the unfolded base and calls fgt_multiinput_node for the new node.
    // Marked __TBB_NOINLINE_SYM for the same consistency reason as the graph constructor.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), static_cast< sender< output_type > *>(this) );
    }

};
#endif //variadic max 8
256349e08aacStbbdev
#if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization with nine input ports (T0..T8).
/** output_type is tagged_msg<size_t, T0, ..., T8>; the node is handed to
    fgt_multiinput_node as a sender<output_type>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    //! Number of input ports; used as the template argument of fgt_multiinput_node.
    static const int N = 9;
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and calls fgt_multiinput_node for it.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in nodes.graph_reference(), then calls
    //! make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies the unfolded base and calls fgt_multiinput_node for the new node.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

};
#endif //variadic max 9
259449e08aacStbbdev
#if __TBB_VARIADIC_MAX >= 10
//! Primary indexer_node definition: ten input ports (T0..T9).
/** output_type is tagged_msg<size_t, T0, ..., T9>; the node is handed to
    fgt_multiinput_node as a sender<output_type>. */
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public unfolded_indexer_node<std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    //! Number of input ports; used as the template argument of fgt_multiinput_node.
    static const int N = 10;
public:
    using InputTuple = std::tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using output_type = tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>;
    using unfolded_type = unfolded_indexer_node<InputTuple>;

    //! Constructs the node in graph g and calls fgt_multiinput_node for it.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in nodes.graph_reference(), then calls
    //! make_edges_in_order(nodes, *this).
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif

    //! Copy constructor: copies the unfolded base and calls fgt_multiinput_node for the new node.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        sender<output_type>* const as_sender = static_cast< sender< output_type > *>(this);
        fgt_multiinput_node<N>( CODEPTR(), FLOW_INDEXER_NODE, &this->my_graph,
                                this->input_ports(), as_sender );
    }

};
#endif //variadic max 10
262549e08aacStbbdev
262649e08aacStbbdev template< typename T >
internal_make_edge(sender<T> & p,receiver<T> & s)262749e08aacStbbdev inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
262849e08aacStbbdev register_successor(p, s);
262949e08aacStbbdev fgt_make_edge( &p, &s );
263049e08aacStbbdev }
263149e08aacStbbdev
263249e08aacStbbdev //! Makes an edge between a single predecessor and a single successor
263349e08aacStbbdev template< typename T >
make_edge(sender<T> & p,receiver<T> & s)263449e08aacStbbdev inline void make_edge( sender<T> &p, receiver<T> &s ) {
263549e08aacStbbdev internal_make_edge( p, s );
263649e08aacStbbdev }
263749e08aacStbbdev
//! Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
/** Participates in overload resolution only when T::output_ports_type and
    V::input_ports_type both name types. */
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void make_edge( T& output, V& input ) {
    make_edge( std::get<0>(output.output_ports()),
               std::get<0>(input.input_ports()) );
}
264449e08aacStbbdev
264549e08aacStbbdev //Makes an edge from port 0 of a multi-output predecessor to a receiver.
264649e08aacStbbdev template< typename T, typename R,
264749e08aacStbbdev typename = typename T::output_ports_type >
make_edge(T & output,receiver<R> & input)264849e08aacStbbdev inline void make_edge( T& output, receiver<R>& input) {
264949e08aacStbbdev make_edge(std::get<0>(output.output_ports()), input);
265049e08aacStbbdev }
265149e08aacStbbdev
265249e08aacStbbdev //Makes an edge from a sender to port 0 of a multi-input successor.
265349e08aacStbbdev template< typename S, typename V,
265449e08aacStbbdev typename = typename V::input_ports_type >
make_edge(sender<S> & output,V & input)265549e08aacStbbdev inline void make_edge( sender<S>& output, V& input) {
265649e08aacStbbdev make_edge(output, std::get<0>(input.input_ports()));
265749e08aacStbbdev }
265849e08aacStbbdev
265949e08aacStbbdev template< typename T >
internal_remove_edge(sender<T> & p,receiver<T> & s)266049e08aacStbbdev inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
266149e08aacStbbdev remove_successor( p, s );
266249e08aacStbbdev fgt_remove_edge( &p, &s );
266349e08aacStbbdev }
266449e08aacStbbdev
266549e08aacStbbdev //! Removes an edge between a single predecessor and a single successor
266649e08aacStbbdev template< typename T >
remove_edge(sender<T> & p,receiver<T> & s)266749e08aacStbbdev inline void remove_edge( sender<T> &p, receiver<T> &s ) {
266849e08aacStbbdev internal_remove_edge( p, s );
266949e08aacStbbdev }
267049e08aacStbbdev
//! Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
/** Participates in overload resolution only when T::output_ports_type and
    V::input_ports_type both name types. */
template< typename T, typename V,
          typename = typename T::output_ports_type, typename = typename V::input_ports_type >
inline void remove_edge( T& output, V& input ) {
    remove_edge( std::get<0>(output.output_ports()),
                 std::get<0>(input.input_ports()) );
}
267749e08aacStbbdev
267849e08aacStbbdev //Removes an edge between port 0 of a multi-output predecessor and a receiver.
267949e08aacStbbdev template< typename T, typename R,
268049e08aacStbbdev typename = typename T::output_ports_type >
remove_edge(T & output,receiver<R> & input)268149e08aacStbbdev inline void remove_edge( T& output, receiver<R>& input) {
268249e08aacStbbdev remove_edge(std::get<0>(output.output_ports()), input);
268349e08aacStbbdev }
268449e08aacStbbdev //Removes an edge between a sender and port 0 of a multi-input successor.
268549e08aacStbbdev template< typename S, typename V,
268649e08aacStbbdev typename = typename V::input_ports_type >
remove_edge(sender<S> & output,V & input)268749e08aacStbbdev inline void remove_edge( sender<S>& output, V& input) {
268849e08aacStbbdev remove_edge(output, std::get<0>(input.input_ports()));
268949e08aacStbbdev }
269049e08aacStbbdev
//! Returns a copy of the body from a function or continue node
/** Body must be named explicitly by the caller; the result is whatever
    n.copy_function_object<Body>() returns. */
template< typename Body, typename Node >
Body copy_body( Node& n ) {
    return n.template copy_function_object<Body>();
}
269649e08aacStbbdev
//! composite_node: primary template, declared only.
/** The definitions are partial specializations on std::tuple input and
    output type lists. */
template< typename InputTuple, typename OutputTuple >
class composite_node;
269949e08aacStbbdev
270049e08aacStbbdev template< typename... InputTypes, typename... OutputTypes>
270149e08aacStbbdev class composite_node <std::tuple<InputTypes...>, std::tuple<OutputTypes...> > : public graph_node {
270249e08aacStbbdev
270349e08aacStbbdev public:
270449e08aacStbbdev typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
270549e08aacStbbdev typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
270649e08aacStbbdev
270749e08aacStbbdev private:
270849e08aacStbbdev std::unique_ptr<input_ports_type> my_input_ports;
270949e08aacStbbdev std::unique_ptr<output_ports_type> my_output_ports;
271049e08aacStbbdev
271149e08aacStbbdev static const size_t NUM_INPUTS = sizeof...(InputTypes);
271249e08aacStbbdev static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
271349e08aacStbbdev
271449e08aacStbbdev protected:
reset_node(reset_flags)271549e08aacStbbdev void reset_node(reset_flags) override {}
271649e08aacStbbdev
271749e08aacStbbdev public:
composite_node(graph & g)271849e08aacStbbdev composite_node( graph &g ) : graph_node(g) {
271949e08aacStbbdev fgt_multiinput_multioutput_node( CODEPTR(), FLOW_COMPOSITE_NODE, this, &this->my_graph );
272049e08aacStbbdev }
272149e08aacStbbdev
272249e08aacStbbdev template<typename T1, typename T2>
set_external_ports(T1 && input_ports_tuple,T2 && output_ports_tuple)272349e08aacStbbdev void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
272449e08aacStbbdev static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
272549e08aacStbbdev static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
272649e08aacStbbdev
272749e08aacStbbdev fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
272849e08aacStbbdev fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
272949e08aacStbbdev
273049e08aacStbbdev my_input_ports.reset( new input_ports_type(std::forward<T1>(input_ports_tuple)) );
273149e08aacStbbdev my_output_ports.reset( new output_ports_type(std::forward<T2>(output_ports_tuple)) );
273249e08aacStbbdev }
273349e08aacStbbdev
273449e08aacStbbdev template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)273549e08aacStbbdev void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
273649e08aacStbbdev
273749e08aacStbbdev template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)273849e08aacStbbdev void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
273949e08aacStbbdev
274049e08aacStbbdev
input_ports()274149e08aacStbbdev input_ports_type& input_ports() {
274249e08aacStbbdev __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
274349e08aacStbbdev return *my_input_ports;
274449e08aacStbbdev }
274549e08aacStbbdev
output_ports()274649e08aacStbbdev output_ports_type& output_ports() {
274749e08aacStbbdev __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
274849e08aacStbbdev return *my_output_ports;
274949e08aacStbbdev }
275049e08aacStbbdev }; // class composite_node
275149e08aacStbbdev
275249e08aacStbbdev //composite_node with only input ports
275349e08aacStbbdev template< typename... InputTypes>
275449e08aacStbbdev class composite_node <std::tuple<InputTypes...>, std::tuple<> > : public graph_node {
275549e08aacStbbdev public:
275649e08aacStbbdev typedef std::tuple< receiver<InputTypes>&... > input_ports_type;
275749e08aacStbbdev
275849e08aacStbbdev private:
275949e08aacStbbdev std::unique_ptr<input_ports_type> my_input_ports;
276049e08aacStbbdev static const size_t NUM_INPUTS = sizeof...(InputTypes);
276149e08aacStbbdev
276249e08aacStbbdev protected:
reset_node(reset_flags)276349e08aacStbbdev void reset_node(reset_flags) override {}
276449e08aacStbbdev
276549e08aacStbbdev public:
composite_node(graph & g)276649e08aacStbbdev composite_node( graph &g ) : graph_node(g) {
276749e08aacStbbdev fgt_composite( CODEPTR(), this, &g );
276849e08aacStbbdev }
276949e08aacStbbdev
277049e08aacStbbdev template<typename T>
set_external_ports(T && input_ports_tuple)277149e08aacStbbdev void set_external_ports(T&& input_ports_tuple) {
277249e08aacStbbdev static_assert(NUM_INPUTS == std::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
277349e08aacStbbdev
277449e08aacStbbdev fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, input_ports_tuple);
277549e08aacStbbdev
277649e08aacStbbdev my_input_ports.reset( new input_ports_type(std::forward<T>(input_ports_tuple)) );
277749e08aacStbbdev }
277849e08aacStbbdev
277949e08aacStbbdev template< typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)278049e08aacStbbdev void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
278149e08aacStbbdev
278249e08aacStbbdev template< typename... NodeTypes >
add_nodes(const NodeTypes &...n)278349e08aacStbbdev void add_nodes( const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
278449e08aacStbbdev
278549e08aacStbbdev
input_ports()278649e08aacStbbdev input_ports_type& input_ports() {
278749e08aacStbbdev __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
278849e08aacStbbdev return *my_input_ports;
278949e08aacStbbdev }
279049e08aacStbbdev
279149e08aacStbbdev }; // class composite_node
279249e08aacStbbdev
279349e08aacStbbdev //composite_nodes with only output_ports
279449e08aacStbbdev template<typename... OutputTypes>
279549e08aacStbbdev class composite_node <std::tuple<>, std::tuple<OutputTypes...> > : public graph_node {
279649e08aacStbbdev public:
279749e08aacStbbdev typedef std::tuple< sender<OutputTypes>&... > output_ports_type;
279849e08aacStbbdev
279949e08aacStbbdev private:
280049e08aacStbbdev std::unique_ptr<output_ports_type> my_output_ports;
280149e08aacStbbdev static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
280249e08aacStbbdev
280349e08aacStbbdev protected:
reset_node(reset_flags)280449e08aacStbbdev void reset_node(reset_flags) override {}
280549e08aacStbbdev
280649e08aacStbbdev public:
composite_node(graph & g)280749e08aacStbbdev __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
280849e08aacStbbdev fgt_composite( CODEPTR(), this, &g );
280949e08aacStbbdev }
281049e08aacStbbdev
281149e08aacStbbdev template<typename T>
set_external_ports(T && output_ports_tuple)281249e08aacStbbdev void set_external_ports(T&& output_ports_tuple) {
281349e08aacStbbdev static_assert(NUM_OUTPUTS == std::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
281449e08aacStbbdev
281549e08aacStbbdev fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
281649e08aacStbbdev
281749e08aacStbbdev my_output_ports.reset( new output_ports_type(std::forward<T>(output_ports_tuple)) );
281849e08aacStbbdev }
281949e08aacStbbdev
282049e08aacStbbdev template<typename... NodeTypes >
add_visible_nodes(const NodeTypes &...n)282149e08aacStbbdev void add_visible_nodes(const NodeTypes&... n) { add_nodes_impl(this, true, n...); }
282249e08aacStbbdev
282349e08aacStbbdev template<typename... NodeTypes >
add_nodes(const NodeTypes &...n)282449e08aacStbbdev void add_nodes(const NodeTypes&... n) { add_nodes_impl(this, false, n...); }
282549e08aacStbbdev
282649e08aacStbbdev
output_ports()282749e08aacStbbdev output_ports_type& output_ports() {
282849e08aacStbbdev __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
282949e08aacStbbdev return *my_output_ports;
283049e08aacStbbdev }
283149e08aacStbbdev
283249e08aacStbbdev }; // class composite_node
283349e08aacStbbdev
283449e08aacStbbdev template<typename Gateway>
283549e08aacStbbdev class async_body_base: no_assign {
283649e08aacStbbdev public:
283749e08aacStbbdev typedef Gateway gateway_type;
283849e08aacStbbdev
async_body_base(gateway_type * gateway)283949e08aacStbbdev async_body_base(gateway_type *gateway): my_gateway(gateway) { }
set_gateway(gateway_type * gateway)284049e08aacStbbdev void set_gateway(gateway_type *gateway) {
284149e08aacStbbdev my_gateway = gateway;
284249e08aacStbbdev }
284349e08aacStbbdev
284449e08aacStbbdev protected:
284549e08aacStbbdev gateway_type *my_gateway;
284649e08aacStbbdev };
284749e08aacStbbdev
284849e08aacStbbdev template<typename Input, typename Ports, typename Gateway, typename Body>
284949e08aacStbbdev class async_body: public async_body_base<Gateway> {
2850324afd9eSIlya Mishin private:
2851324afd9eSIlya Mishin Body my_body;
2852324afd9eSIlya Mishin
285349e08aacStbbdev public:
285449e08aacStbbdev typedef async_body_base<Gateway> base_type;
285549e08aacStbbdev typedef Gateway gateway_type;
285649e08aacStbbdev
async_body(const Body & body,gateway_type * gateway)285749e08aacStbbdev async_body(const Body &body, gateway_type *gateway)
285849e08aacStbbdev : base_type(gateway), my_body(body) { }
285949e08aacStbbdev
operator()2860*a088cfa0SKonstantin Boyarinov void operator()( const Input &v, Ports & ) noexcept(noexcept(tbb::detail::invoke(my_body, v, std::declval<gateway_type&>()))) {
2861*a088cfa0SKonstantin Boyarinov tbb::detail::invoke(my_body, v, *this->my_gateway);
286249e08aacStbbdev }
286349e08aacStbbdev
get_body()286449e08aacStbbdev Body get_body() { return my_body; }
286549e08aacStbbdev };
286649e08aacStbbdev
286749e08aacStbbdev //! Implements async node
286849e08aacStbbdev template < typename Input, typename Output,
286949e08aacStbbdev typename Policy = queueing_lightweight >
__TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)2870478de5b1Stbbdev __TBB_requires(std::default_initializable<Input> && std::copy_constructible<Input>)
287149e08aacStbbdev class async_node
287249e08aacStbbdev : public multifunction_node< Input, std::tuple< Output >, Policy >, public sender< Output >
287349e08aacStbbdev {
287449e08aacStbbdev typedef multifunction_node< Input, std::tuple< Output >, Policy > base_type;
287549e08aacStbbdev typedef multifunction_input<
287649e08aacStbbdev Input, typename base_type::output_ports_type, Policy, cache_aligned_allocator<Input>> mfn_input_type;
287749e08aacStbbdev
287849e08aacStbbdev public:
287949e08aacStbbdev typedef Input input_type;
288049e08aacStbbdev typedef Output output_type;
288149e08aacStbbdev typedef receiver<input_type> receiver_type;
288249e08aacStbbdev typedef receiver<output_type> successor_type;
288349e08aacStbbdev typedef sender<input_type> predecessor_type;
288449e08aacStbbdev typedef receiver_gateway<output_type> gateway_type;
288549e08aacStbbdev typedef async_body_base<gateway_type> async_body_base_type;
288649e08aacStbbdev typedef typename base_type::output_ports_type output_ports_type;
288749e08aacStbbdev
288849e08aacStbbdev private:
288949e08aacStbbdev class receiver_gateway_impl: public receiver_gateway<Output> {
289049e08aacStbbdev public:
289149e08aacStbbdev receiver_gateway_impl(async_node* node): my_node(node) {}
289249e08aacStbbdev void reserve_wait() override {
289349e08aacStbbdev fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
289449e08aacStbbdev my_node->my_graph.reserve_wait();
289549e08aacStbbdev }
289649e08aacStbbdev
289749e08aacStbbdev void release_wait() override {
2898b15aabb3Stbbdev async_node* n = my_node;
2899b15aabb3Stbbdev graph* g = &n->my_graph;
2900b15aabb3Stbbdev g->release_wait();
2901b15aabb3Stbbdev fgt_async_commit(static_cast<typename async_node::receiver_type *>(n), g);
290249e08aacStbbdev }
290349e08aacStbbdev
290449e08aacStbbdev //! Implements gateway_type::try_put for an external activity to submit a message to FG
290549e08aacStbbdev bool try_put(const Output &i) override {
290649e08aacStbbdev return my_node->try_put_impl(i);
290749e08aacStbbdev }
290849e08aacStbbdev
290949e08aacStbbdev private:
291049e08aacStbbdev async_node* my_node;
291149e08aacStbbdev } my_gateway;
291249e08aacStbbdev
291349e08aacStbbdev //The substitute of 'this' for member construction, to prevent compiler warnings
291449e08aacStbbdev async_node* self() { return this; }
291549e08aacStbbdev
291649e08aacStbbdev //! Implements gateway_type::try_put for an external activity to submit a message to FG
291749e08aacStbbdev bool try_put_impl(const Output &i) {
291849e08aacStbbdev multifunction_output<Output> &port_0 = output_port<0>(*this);
291949e08aacStbbdev broadcast_cache<output_type>& port_successors = port_0.successors();
292049e08aacStbbdev fgt_async_try_put_begin(this, &port_0);
292149e08aacStbbdev // TODO revamp: change to std::list<graph_task*>
292249e08aacStbbdev graph_task_list tasks;
292349e08aacStbbdev bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
292449e08aacStbbdev __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
292549e08aacStbbdev "Return status is inconsistent with the method operation." );
292649e08aacStbbdev
292749e08aacStbbdev while( !tasks.empty() ) {
292849e08aacStbbdev enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
292949e08aacStbbdev }
293049e08aacStbbdev fgt_async_try_put_end(this, &port_0);
293149e08aacStbbdev return is_at_least_one_put_successful;
293249e08aacStbbdev }
293349e08aacStbbdev
293449e08aacStbbdev public:
293549e08aacStbbdev template<typename Body>
2936478de5b1Stbbdev __TBB_requires(async_node_body<Body, input_type, gateway_type>)
293749e08aacStbbdev __TBB_NOINLINE_SYM async_node(
293849e08aacStbbdev graph &g, size_t concurrency,
293949e08aacStbbdev Body body, Policy = Policy(), node_priority_t a_priority = no_priority
294049e08aacStbbdev ) : base_type(
294149e08aacStbbdev g, concurrency,
294249e08aacStbbdev async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
294349e08aacStbbdev (body, &my_gateway), a_priority ), my_gateway(self()) {
294449e08aacStbbdev fgt_multioutput_node_with_body<1>(
294549e08aacStbbdev CODEPTR(), FLOW_ASYNC_NODE,
294649e08aacStbbdev &this->my_graph, static_cast<receiver<input_type> *>(this),
294749e08aacStbbdev this->output_ports(), this->my_body
294849e08aacStbbdev );
294949e08aacStbbdev }
295049e08aacStbbdev
2951478de5b1Stbbdev template <typename Body>
2952478de5b1Stbbdev __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2953b15aabb3Stbbdev __TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t a_priority)
2954b15aabb3Stbbdev : async_node(g, concurrency, body, Policy(), a_priority) {}
295549e08aacStbbdev
295649e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
295749e08aacStbbdev template <typename Body, typename... Args>
2958478de5b1Stbbdev __TBB_requires(async_node_body<Body, input_type, gateway_type>)
295949e08aacStbbdev __TBB_NOINLINE_SYM async_node(
296049e08aacStbbdev const node_set<Args...>& nodes, size_t concurrency, Body body,
296149e08aacStbbdev Policy = Policy(), node_priority_t a_priority = no_priority )
296249e08aacStbbdev : async_node(nodes.graph_reference(), concurrency, body, a_priority) {
296349e08aacStbbdev make_edges_in_order(nodes, *this);
296449e08aacStbbdev }
296549e08aacStbbdev
296649e08aacStbbdev template <typename Body, typename... Args>
2967478de5b1Stbbdev __TBB_requires(async_node_body<Body, input_type, gateway_type>)
2968b15aabb3Stbbdev __TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t a_priority)
2969b15aabb3Stbbdev : async_node(nodes, concurrency, body, Policy(), a_priority) {}
297049e08aacStbbdev #endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
297149e08aacStbbdev
297249e08aacStbbdev __TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
297349e08aacStbbdev static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
297449e08aacStbbdev static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
297549e08aacStbbdev
297649e08aacStbbdev fgt_multioutput_node_with_body<1>( CODEPTR(), FLOW_ASYNC_NODE,
297749e08aacStbbdev &this->my_graph, static_cast<receiver<input_type> *>(this),
297849e08aacStbbdev this->output_ports(), this->my_body );
297949e08aacStbbdev }
298049e08aacStbbdev
298149e08aacStbbdev gateway_type& gateway() {
298249e08aacStbbdev return my_gateway;
298349e08aacStbbdev }
298449e08aacStbbdev
298549e08aacStbbdev // Define sender< Output >
298649e08aacStbbdev
298749e08aacStbbdev //! Add a new successor to this node
298849e08aacStbbdev bool register_successor(successor_type&) override {
298949e08aacStbbdev __TBB_ASSERT(false, "Successors must be registered only via ports");
299049e08aacStbbdev return false;
299149e08aacStbbdev }
299249e08aacStbbdev
299349e08aacStbbdev //! Removes a successor from this node
299449e08aacStbbdev bool remove_successor(successor_type&) override {
299549e08aacStbbdev __TBB_ASSERT(false, "Successors must be removed only via ports");
299649e08aacStbbdev return false;
299749e08aacStbbdev }
299849e08aacStbbdev
299949e08aacStbbdev template<typename Body>
300049e08aacStbbdev Body copy_function_object() {
300149e08aacStbbdev typedef multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
300249e08aacStbbdev typedef async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
300349e08aacStbbdev mfn_body_type &body_ref = *this->my_body;
300449e08aacStbbdev async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
300549e08aacStbbdev return ab.get_body();
300649e08aacStbbdev }
300749e08aacStbbdev
300849e08aacStbbdev protected:
300949e08aacStbbdev
301049e08aacStbbdev void reset_node( reset_flags f) override {
301149e08aacStbbdev base_type::reset_node(f);
301249e08aacStbbdev }
301349e08aacStbbdev };
301449e08aacStbbdev
301549e08aacStbbdev #include "detail/_flow_graph_node_set_impl.h"
301649e08aacStbbdev
301749e08aacStbbdev template< typename T >
301849e08aacStbbdev class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
301949e08aacStbbdev public:
302049e08aacStbbdev typedef T input_type;
302149e08aacStbbdev typedef T output_type;
302249e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
302349e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
302449e08aacStbbdev
overwrite_node(graph & g)302549e08aacStbbdev __TBB_NOINLINE_SYM explicit overwrite_node(graph &g)
302649e08aacStbbdev : graph_node(g), my_successors(this), my_buffer_is_valid(false)
302749e08aacStbbdev {
302849e08aacStbbdev fgt_node( CODEPTR(), FLOW_OVERWRITE_NODE, &this->my_graph,
302949e08aacStbbdev static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
303049e08aacStbbdev }
303149e08aacStbbdev
303249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
303349e08aacStbbdev template <typename... Args>
overwrite_node(const node_set<Args...> & nodes)303449e08aacStbbdev overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
303549e08aacStbbdev make_edges_in_order(nodes, *this);
303649e08aacStbbdev }
303749e08aacStbbdev #endif
303849e08aacStbbdev
303949e08aacStbbdev //! Copy constructor; doesn't take anything from src; default won't work
overwrite_node(const overwrite_node & src)304049e08aacStbbdev __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) : overwrite_node(src.my_graph) {}
304149e08aacStbbdev
~overwrite_node()304249e08aacStbbdev ~overwrite_node() {}
304349e08aacStbbdev
register_successor(successor_type & s)304449e08aacStbbdev bool register_successor( successor_type &s ) override {
304549e08aacStbbdev spin_mutex::scoped_lock l( my_mutex );
304649e08aacStbbdev if (my_buffer_is_valid && is_graph_active( my_graph )) {
304749e08aacStbbdev // We have a valid value that must be forwarded immediately.
304849e08aacStbbdev bool ret = s.try_put( my_buffer );
304949e08aacStbbdev if ( ret ) {
305049e08aacStbbdev // We add the successor that accepted our put
305149e08aacStbbdev my_successors.register_successor( s );
305249e08aacStbbdev } else {
305349e08aacStbbdev // In case of reservation a race between the moment of reservation and register_successor can appear,
305449e08aacStbbdev // because failed reserve does not mean that register_successor is not ready to put a message immediately.
305549e08aacStbbdev // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
305649e08aacStbbdev // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
305749e08aacStbbdev small_object_allocator allocator{};
305849e08aacStbbdev typedef register_predecessor_task task_type;
305949e08aacStbbdev graph_task* t = allocator.new_object<task_type>(graph_reference(), allocator, *this, s);
306049e08aacStbbdev graph_reference().reserve_wait();
306149e08aacStbbdev spawn_in_graph_arena( my_graph, *t );
306249e08aacStbbdev }
306349e08aacStbbdev } else {
306449e08aacStbbdev // No valid value yet, just add as successor
306549e08aacStbbdev my_successors.register_successor( s );
306649e08aacStbbdev }
306749e08aacStbbdev return true;
306849e08aacStbbdev }
306949e08aacStbbdev
remove_successor(successor_type & s)307049e08aacStbbdev bool remove_successor( successor_type &s ) override {
307149e08aacStbbdev spin_mutex::scoped_lock l( my_mutex );
307249e08aacStbbdev my_successors.remove_successor(s);
307349e08aacStbbdev return true;
307449e08aacStbbdev }
307549e08aacStbbdev
try_get(input_type & v)307649e08aacStbbdev bool try_get( input_type &v ) override {
307749e08aacStbbdev spin_mutex::scoped_lock l( my_mutex );
307849e08aacStbbdev if ( my_buffer_is_valid ) {
307949e08aacStbbdev v = my_buffer;
308049e08aacStbbdev return true;
308149e08aacStbbdev }
308249e08aacStbbdev return false;
308349e08aacStbbdev }
308449e08aacStbbdev
308549e08aacStbbdev //! Reserves an item
try_reserve(T & v)308649e08aacStbbdev bool try_reserve( T &v ) override {
308749e08aacStbbdev return try_get(v);
308849e08aacStbbdev }
308949e08aacStbbdev
309049e08aacStbbdev //! Releases the reserved item
try_release()309149e08aacStbbdev bool try_release() override { return true; }
309249e08aacStbbdev
309349e08aacStbbdev //! Consumes the reserved item
try_consume()309449e08aacStbbdev bool try_consume() override { return true; }
309549e08aacStbbdev
is_valid()309649e08aacStbbdev bool is_valid() {
309749e08aacStbbdev spin_mutex::scoped_lock l( my_mutex );
309849e08aacStbbdev return my_buffer_is_valid;
309949e08aacStbbdev }
310049e08aacStbbdev
clear()310149e08aacStbbdev void clear() {
310249e08aacStbbdev spin_mutex::scoped_lock l( my_mutex );
310349e08aacStbbdev my_buffer_is_valid = false;
310449e08aacStbbdev }
310549e08aacStbbdev
310649e08aacStbbdev protected:
310749e08aacStbbdev
310849e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
310949e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
311049e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const input_type & v)311149e08aacStbbdev graph_task* try_put_task( const input_type &v ) override {
311249e08aacStbbdev spin_mutex::scoped_lock l( my_mutex );
311349e08aacStbbdev return try_put_task_impl(v);
311449e08aacStbbdev }
311549e08aacStbbdev
try_put_task_impl(const input_type & v)311649e08aacStbbdev graph_task * try_put_task_impl(const input_type &v) {
311749e08aacStbbdev my_buffer = v;
311849e08aacStbbdev my_buffer_is_valid = true;
311949e08aacStbbdev graph_task* rtask = my_successors.try_put_task(v);
312049e08aacStbbdev if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
312149e08aacStbbdev return rtask;
312249e08aacStbbdev }
312349e08aacStbbdev
graph_reference()312449e08aacStbbdev graph& graph_reference() const override {
312549e08aacStbbdev return my_graph;
312649e08aacStbbdev }
312749e08aacStbbdev
312849e08aacStbbdev //! Breaks an infinite loop between the node reservation and register_successor call
312949e08aacStbbdev struct register_predecessor_task : public graph_task {
register_predecessor_taskregister_predecessor_task313049e08aacStbbdev register_predecessor_task(
313149e08aacStbbdev graph& g, small_object_allocator& allocator, predecessor_type& owner, successor_type& succ)
313249e08aacStbbdev : graph_task(g, allocator), o(owner), s(succ) {};
313349e08aacStbbdev
executeregister_predecessor_task313449e08aacStbbdev task* execute(execution_data& ed) override {
313549e08aacStbbdev // TODO revamp: investigate why qualification is needed for register_successor() call
313649e08aacStbbdev using tbb::detail::d1::register_predecessor;
313749e08aacStbbdev using tbb::detail::d1::register_successor;
313849e08aacStbbdev if ( !register_predecessor(s, o) ) {
313949e08aacStbbdev register_successor(o, s);
314049e08aacStbbdev }
31416edb5c3aSVladimir Serov finalize<register_predecessor_task>(ed);
31426edb5c3aSVladimir Serov return nullptr;
31436edb5c3aSVladimir Serov }
31446edb5c3aSVladimir Serov
cancelregister_predecessor_task31456edb5c3aSVladimir Serov task* cancel(execution_data& ed) override {
31466edb5c3aSVladimir Serov finalize<register_predecessor_task>(ed);
314749e08aacStbbdev return nullptr;
314849e08aacStbbdev }
314949e08aacStbbdev
315049e08aacStbbdev predecessor_type& o;
315149e08aacStbbdev successor_type& s;
315249e08aacStbbdev };
315349e08aacStbbdev
315449e08aacStbbdev spin_mutex my_mutex;
315549e08aacStbbdev broadcast_cache< input_type, null_rw_mutex > my_successors;
315649e08aacStbbdev input_type my_buffer;
315749e08aacStbbdev bool my_buffer_is_valid;
315849e08aacStbbdev
reset_node(reset_flags f)315949e08aacStbbdev void reset_node( reset_flags f) override {
316049e08aacStbbdev my_buffer_is_valid = false;
316149e08aacStbbdev if (f&rf_clear_edges) {
316249e08aacStbbdev my_successors.clear();
316349e08aacStbbdev }
316449e08aacStbbdev }
316549e08aacStbbdev }; // overwrite_node
316649e08aacStbbdev
316749e08aacStbbdev template< typename T >
316849e08aacStbbdev class write_once_node : public overwrite_node<T> {
316949e08aacStbbdev public:
317049e08aacStbbdev typedef T input_type;
317149e08aacStbbdev typedef T output_type;
317249e08aacStbbdev typedef overwrite_node<T> base_type;
317349e08aacStbbdev typedef typename receiver<input_type>::predecessor_type predecessor_type;
317449e08aacStbbdev typedef typename sender<output_type>::successor_type successor_type;
317549e08aacStbbdev
317649e08aacStbbdev //! Constructor
write_once_node(graph & g)317749e08aacStbbdev __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
317849e08aacStbbdev fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
317949e08aacStbbdev static_cast<receiver<input_type> *>(this),
318049e08aacStbbdev static_cast<sender<output_type> *>(this) );
318149e08aacStbbdev }
318249e08aacStbbdev
318349e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
318449e08aacStbbdev template <typename... Args>
write_once_node(const node_set<Args...> & nodes)318549e08aacStbbdev write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
318649e08aacStbbdev make_edges_in_order(nodes, *this);
318749e08aacStbbdev }
318849e08aacStbbdev #endif
318949e08aacStbbdev
319049e08aacStbbdev //! Copy constructor: call base class copy constructor
write_once_node(const write_once_node & src)319149e08aacStbbdev __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
319249e08aacStbbdev fgt_node( CODEPTR(), FLOW_WRITE_ONCE_NODE, &(this->my_graph),
319349e08aacStbbdev static_cast<receiver<input_type> *>(this),
319449e08aacStbbdev static_cast<sender<output_type> *>(this) );
319549e08aacStbbdev }
319649e08aacStbbdev
319749e08aacStbbdev protected:
319849e08aacStbbdev template< typename R, typename B > friend class run_and_put_task;
319949e08aacStbbdev template<typename X, typename Y> friend class broadcast_cache;
320049e08aacStbbdev template<typename X, typename Y> friend class round_robin_cache;
try_put_task(const T & v)320149e08aacStbbdev graph_task *try_put_task( const T &v ) override {
320249e08aacStbbdev spin_mutex::scoped_lock l( this->my_mutex );
320357f524caSIlya Isaev return this->my_buffer_is_valid ? nullptr : this->try_put_task_impl(v);
320449e08aacStbbdev }
320549e08aacStbbdev }; // write_once_node
320649e08aacStbbdev
set_name(const graph & g,const char * name)320749e08aacStbbdev inline void set_name(const graph& g, const char *name) {
320849e08aacStbbdev fgt_graph_desc(&g, name);
320949e08aacStbbdev }
321049e08aacStbbdev
321149e08aacStbbdev template <typename Output>
set_name(const input_node<Output> & node,const char * name)321249e08aacStbbdev inline void set_name(const input_node<Output>& node, const char *name) {
321349e08aacStbbdev fgt_node_desc(&node, name);
321449e08aacStbbdev }
321549e08aacStbbdev
321649e08aacStbbdev template <typename Input, typename Output, typename Policy>
set_name(const function_node<Input,Output,Policy> & node,const char * name)321749e08aacStbbdev inline void set_name(const function_node<Input, Output, Policy>& node, const char *name) {
321849e08aacStbbdev fgt_node_desc(&node, name);
321949e08aacStbbdev }
322049e08aacStbbdev
322149e08aacStbbdev template <typename Output, typename Policy>
set_name(const continue_node<Output,Policy> & node,const char * name)322249e08aacStbbdev inline void set_name(const continue_node<Output,Policy>& node, const char *name) {
322349e08aacStbbdev fgt_node_desc(&node, name);
322449e08aacStbbdev }
322549e08aacStbbdev
322649e08aacStbbdev template <typename T>
set_name(const broadcast_node<T> & node,const char * name)322749e08aacStbbdev inline void set_name(const broadcast_node<T>& node, const char *name) {
322849e08aacStbbdev fgt_node_desc(&node, name);
322949e08aacStbbdev }
323049e08aacStbbdev
323149e08aacStbbdev template <typename T>
set_name(const buffer_node<T> & node,const char * name)323249e08aacStbbdev inline void set_name(const buffer_node<T>& node, const char *name) {
323349e08aacStbbdev fgt_node_desc(&node, name);
323449e08aacStbbdev }
323549e08aacStbbdev
323649e08aacStbbdev template <typename T>
set_name(const queue_node<T> & node,const char * name)323749e08aacStbbdev inline void set_name(const queue_node<T>& node, const char *name) {
323849e08aacStbbdev fgt_node_desc(&node, name);
323949e08aacStbbdev }
324049e08aacStbbdev
324149e08aacStbbdev template <typename T>
set_name(const sequencer_node<T> & node,const char * name)324249e08aacStbbdev inline void set_name(const sequencer_node<T>& node, const char *name) {
324349e08aacStbbdev fgt_node_desc(&node, name);
324449e08aacStbbdev }
324549e08aacStbbdev
324649e08aacStbbdev template <typename T, typename Compare>
set_name(const priority_queue_node<T,Compare> & node,const char * name)324749e08aacStbbdev inline void set_name(const priority_queue_node<T, Compare>& node, const char *name) {
324849e08aacStbbdev fgt_node_desc(&node, name);
324949e08aacStbbdev }
325049e08aacStbbdev
325149e08aacStbbdev template <typename T, typename DecrementType>
set_name(const limiter_node<T,DecrementType> & node,const char * name)325249e08aacStbbdev inline void set_name(const limiter_node<T, DecrementType>& node, const char *name) {
325349e08aacStbbdev fgt_node_desc(&node, name);
325449e08aacStbbdev }
325549e08aacStbbdev
325649e08aacStbbdev template <typename OutputTuple, typename JP>
set_name(const join_node<OutputTuple,JP> & node,const char * name)325749e08aacStbbdev inline void set_name(const join_node<OutputTuple, JP>& node, const char *name) {
325849e08aacStbbdev fgt_node_desc(&node, name);
325949e08aacStbbdev }
326049e08aacStbbdev
326149e08aacStbbdev template <typename... Types>
set_name(const indexer_node<Types...> & node,const char * name)326249e08aacStbbdev inline void set_name(const indexer_node<Types...>& node, const char *name) {
326349e08aacStbbdev fgt_node_desc(&node, name);
326449e08aacStbbdev }
326549e08aacStbbdev
326649e08aacStbbdev template <typename T>
set_name(const overwrite_node<T> & node,const char * name)326749e08aacStbbdev inline void set_name(const overwrite_node<T>& node, const char *name) {
326849e08aacStbbdev fgt_node_desc(&node, name);
326949e08aacStbbdev }
327049e08aacStbbdev
327149e08aacStbbdev template <typename T>
set_name(const write_once_node<T> & node,const char * name)327249e08aacStbbdev inline void set_name(const write_once_node<T>& node, const char *name) {
327349e08aacStbbdev fgt_node_desc(&node, name);
327449e08aacStbbdev }
327549e08aacStbbdev
327649e08aacStbbdev template<typename Input, typename Output, typename Policy>
set_name(const multifunction_node<Input,Output,Policy> & node,const char * name)327749e08aacStbbdev inline void set_name(const multifunction_node<Input, Output, Policy>& node, const char *name) {
327849e08aacStbbdev fgt_multioutput_node_desc(&node, name);
327949e08aacStbbdev }
328049e08aacStbbdev
328149e08aacStbbdev template<typename TupleType>
set_name(const split_node<TupleType> & node,const char * name)328249e08aacStbbdev inline void set_name(const split_node<TupleType>& node, const char *name) {
328349e08aacStbbdev fgt_multioutput_node_desc(&node, name);
328449e08aacStbbdev }
328549e08aacStbbdev
328649e08aacStbbdev template< typename InputTuple, typename OutputTuple >
set_name(const composite_node<InputTuple,OutputTuple> & node,const char * name)328749e08aacStbbdev inline void set_name(const composite_node<InputTuple, OutputTuple>& node, const char *name) {
328849e08aacStbbdev fgt_multiinput_multioutput_node_desc(&node, name);
328949e08aacStbbdev }
329049e08aacStbbdev
329149e08aacStbbdev template<typename Input, typename Output, typename Policy>
set_name(const async_node<Input,Output,Policy> & node,const char * name)329249e08aacStbbdev inline void set_name(const async_node<Input, Output, Policy>& node, const char *name)
329349e08aacStbbdev {
329449e08aacStbbdev fgt_multioutput_node_desc(&node, name);
329549e08aacStbbdev }
329649e08aacStbbdev } // d1
329749e08aacStbbdev } // detail
329849e08aacStbbdev } // tbb
329949e08aacStbbdev
330049e08aacStbbdev
330149e08aacStbbdev // Include deduction guides for node classes
330249e08aacStbbdev #include "detail/_flow_graph_nodes_deduction.h"
330349e08aacStbbdev
330449e08aacStbbdev namespace tbb {
330549e08aacStbbdev namespace flow {
330649e08aacStbbdev inline namespace v1 {
330749e08aacStbbdev using detail::d1::receiver;
330849e08aacStbbdev using detail::d1::sender;
330949e08aacStbbdev
331049e08aacStbbdev using detail::d1::serial;
331149e08aacStbbdev using detail::d1::unlimited;
331249e08aacStbbdev
331349e08aacStbbdev using detail::d1::reset_flags;
331449e08aacStbbdev using detail::d1::rf_reset_protocol;
331549e08aacStbbdev using detail::d1::rf_reset_bodies;
331649e08aacStbbdev using detail::d1::rf_clear_edges;
331749e08aacStbbdev
331849e08aacStbbdev using detail::d1::graph;
331949e08aacStbbdev using detail::d1::graph_node;
332049e08aacStbbdev using detail::d1::continue_msg;
332149e08aacStbbdev
332249e08aacStbbdev using detail::d1::input_node;
332349e08aacStbbdev using detail::d1::function_node;
332449e08aacStbbdev using detail::d1::multifunction_node;
332549e08aacStbbdev using detail::d1::split_node;
332649e08aacStbbdev using detail::d1::output_port;
332749e08aacStbbdev using detail::d1::indexer_node;
332849e08aacStbbdev using detail::d1::tagged_msg;
332949e08aacStbbdev using detail::d1::cast_to;
333049e08aacStbbdev using detail::d1::is_a;
333149e08aacStbbdev using detail::d1::continue_node;
333249e08aacStbbdev using detail::d1::overwrite_node;
333349e08aacStbbdev using detail::d1::write_once_node;
333449e08aacStbbdev using detail::d1::broadcast_node;
333549e08aacStbbdev using detail::d1::buffer_node;
333649e08aacStbbdev using detail::d1::queue_node;
333749e08aacStbbdev using detail::d1::sequencer_node;
333849e08aacStbbdev using detail::d1::priority_queue_node;
333949e08aacStbbdev using detail::d1::limiter_node;
334049e08aacStbbdev using namespace detail::d1::graph_policy_namespace;
334149e08aacStbbdev using detail::d1::join_node;
334249e08aacStbbdev using detail::d1::input_port;
334349e08aacStbbdev using detail::d1::copy_body;
334449e08aacStbbdev using detail::d1::make_edge;
334549e08aacStbbdev using detail::d1::remove_edge;
334649e08aacStbbdev using detail::d1::tag_value;
334749e08aacStbbdev using detail::d1::composite_node;
334849e08aacStbbdev using detail::d1::async_node;
334949e08aacStbbdev using detail::d1::node_priority_t;
335049e08aacStbbdev using detail::d1::no_priority;
335149e08aacStbbdev
335249e08aacStbbdev #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
335349e08aacStbbdev using detail::d1::follows;
335449e08aacStbbdev using detail::d1::precedes;
335549e08aacStbbdev using detail::d1::make_node_set;
335649e08aacStbbdev using detail::d1::make_edges;
335749e08aacStbbdev #endif
335849e08aacStbbdev
335949e08aacStbbdev } // v1
336049e08aacStbbdev } // flow
336149e08aacStbbdev
336249e08aacStbbdev using detail::d1::flow_control;
336349e08aacStbbdev
336449e08aacStbbdev namespace profiling {
336549e08aacStbbdev using detail::d1::set_name;
336649e08aacStbbdev } // profiling
336749e08aacStbbdev
336849e08aacStbbdev } // tbb
336949e08aacStbbdev
337049e08aacStbbdev
3371734f0bc0SPablo Romero #if TBB_USE_PROFILING_TOOLS && ( __unix__ || __APPLE__ )
337249e08aacStbbdev // We don't do pragma pop here, since it still gives warning on the USER side
337349e08aacStbbdev #undef __TBB_NOINLINE_SYM
337449e08aacStbbdev #endif
337549e08aacStbbdev
337649e08aacStbbdev #endif // __TBB_flow_graph_H
3377